From 252b920119c092daca5cb34abc31c2a3d8444c9f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Pluta?= Date: Wed, 13 Nov 2024 18:28:15 +0100 Subject: [PATCH] Improve consumers implementation --- src/consumers/abstract.ts | 30 +++++++++++++++++++++++++++++ src/consumers/index.ts | 9 +++++++++ src/consumers/mqtt/index.ts | 31 ++++++++++++++++++++++++++++++ src/consumers/mqtt/types.ts | 38 +++++++++++++++++++++++++++++++++++++ src/consumers/types.ts | 36 +++++------------------------------ 5 files changed, 113 insertions(+), 31 deletions(-) create mode 100644 src/consumers/abstract.ts create mode 100644 src/consumers/mqtt/index.ts create mode 100644 src/consumers/mqtt/types.ts diff --git a/src/consumers/abstract.ts b/src/consumers/abstract.ts new file mode 100644 index 0000000..42d556c --- /dev/null +++ b/src/consumers/abstract.ts @@ -0,0 +1,30 @@ +import { Dayjs } from "dayjs"; +import { Config } from "../config"; +import { Measurement } from "../fetcher/types"; +import { ConsumerConfig } from "./types"; + +export abstract class Consumer { + public abstract readonly name: string; + protected abstract requiredFields: readonly (keyof C)[]; + protected abstract publish(config: C, date: Dayjs, measurement: Measurement): void; + + #validate(config: Partial): C { + for (const field of this.requiredFields) { + if (config[field] === undefined) { + throw new Error(`The '${String(field)}' configuration field of'${this.name}' consumer is required`) + } + } + + return config as C; + } + + consume(config: Config, date: Dayjs, measurement: Measurement): void { + const cfg = config.consumers[this.name] as Partial; + + if (cfg.enable !== true) { + return + } + + this.publish(this.#validate(cfg), date, measurement); + } +}; \ No newline at end of file diff --git a/src/consumers/index.ts b/src/consumers/index.ts index 0e898f9..b5531ac 100644 --- a/src/consumers/index.ts +++ b/src/consumers/index.ts @@ -3,9 +3,18 @@ import { Config } from "../config"; import { MQTTConsumer } from "./mqtt"; import { Measurement } from "../fetcher/types"; +/** + * Registered consumers + */ const consumers = [ new MQTTConsumer() ]; +/** + * Runs all registered consumers for the measurement. + * @param config global configuration + * @param date date of measurement + * @param measurement measurement data + */ export const consume = (config: Config, date: Dayjs, measurement: Measurement) => consumers.forEach(consumer => consumer.consume(config, date, measurement)); \ No newline at end of file diff --git a/src/consumers/mqtt/index.ts b/src/consumers/mqtt/index.ts new file mode 100644 index 0000000..355113a --- /dev/null +++ b/src/consumers/mqtt/index.ts @@ -0,0 +1,31 @@ +import fs from 'fs'; +import mqtt from "mqtt"; +import { Measurement } from "../../fetcher/types"; +import { Dayjs } from 'dayjs'; +import { MQTTConfig } from './types'; +import { Consumer } from '../abstract'; + +export * from './types'; + +export class MQTTConsumer extends Consumer { + public name = "mqtt"; + protected requiredFields = ['brokerURL', 'usernamePath', 'passwordPath'] as const; + + protected publish({ brokerURL, usernamePath, passwordPath, clientId, prefix, publishOptions }: MQTTConfig, date: Dayjs, measurement: Measurement) { + const client = mqtt.connect(brokerURL, { + username: fs.readFileSync(usernamePath, 'utf8'), + password: fs.readFileSync(passwordPath, 'utf8'), + clientId: clientId || "tauron-scrapper" + }); + + const topicPrefix = prefix || 'tauron'; + + client.on('connect', () => { + client.publish(`${topicPrefix}/energy/daily`, measurement.energyForDay.sum.toString(), publishOptions); + client.publish(`${topicPrefix}/energy/monthly`, measurement.energyForMonth.sum.toString(), publishOptions); + client.publish(`${topicPrefix}/energy/yearly`, measurement.energyForYear.sum.toString(), publishOptions); + client.publish(`${topicPrefix}/reading`, measurement.reading.C.toString(), publishOptions); + client.end(); + }); + } +} \ No newline at end of file diff --git a/src/consumers/mqtt/types.ts b/src/consumers/mqtt/types.ts new file mode 100644 index 0000000..ac4f5e5 --- /dev/null +++ b/src/consumers/mqtt/types.ts @@ -0,0 +1,38 @@ +import mqtt from "mqtt"; +import { ConsumerConfig } from "../types"; + +export type MQTTConfig = ConsumerConfig & { + + /** + * The URL to broker. + * Example: mqtt://localhost:1882 + */ + brokerURL: string; + + /** + * Path to file containing a username of MQTT user. + */ + usernamePath: string; + + /** + * Path to file containing a password of MQTT user. + */ + passwordPath: string; + + /** + * Optional client ID used to connect to MQTT (visible in MQTT broker logs). + * Default: tauron-scrapper + */ + clientId?: string; + + /** + * Optional prefix under which the messages will be published. + * Default: tauron + */ + prefix?: string; + + /** + * Optional configuration of published messages (i.e. QoS, whether the message should be retained etc.). + */ + publishOptions?: mqtt.IClientPublishOptions; +} \ No newline at end of file diff --git a/src/consumers/types.ts b/src/consumers/types.ts index 1ab5528..52437d8 100644 --- a/src/consumers/types.ts +++ b/src/consumers/types.ts @@ -1,33 +1,7 @@ -import { Dayjs } from "dayjs"; -import { Measurement } from "../fetcher/types"; -import { Config } from "../config"; - export type ConsumerConfig = { + /** + * Whether given consumer should be invoked. + * Default: false + */ enable?: boolean; -} - -export abstract class Consumer { - public abstract readonly name: string; - protected abstract requiredFields: readonly (keyof C)[]; - protected abstract publish(config: C, date: Dayjs, measurement: Measurement): void; - - #validate(config: Partial): C { - for (const field of this.requiredFields) { - if (config[field] === undefined) { - throw new Error(`The '${String(field)}' configuration field of'${this.name}' consumer is required`) - } - } - - return config as C; - } - - consume(config: Config, date: Dayjs, measurement: Measurement): void { - const cfg = config.consumers[this.name] as Partial; - - if (cfg.enable !== true) { - return - } - - this.publish(this.#validate(cfg), date, measurement); - } -}; \ No newline at end of file +} \ No newline at end of file