Improve consumers implementation

This commit is contained in:
2024-11-13 18:28:15 +01:00
parent 67830129a0
commit 252b920119
5 changed files with 113 additions and 31 deletions

30
src/consumers/abstract.ts Normal file
View File

@@ -0,0 +1,30 @@
import { Dayjs } from "dayjs";
import { Config } from "../config";
import { Measurement } from "../fetcher/types";
import { ConsumerConfig } from "./types";
export abstract class Consumer<C extends ConsumerConfig> {
public abstract readonly name: string;
protected abstract requiredFields: readonly (keyof C)[];
protected abstract publish(config: C, date: Dayjs, measurement: Measurement): void;
#validate(config: Partial<C>): C {
for (const field of this.requiredFields) {
if (config[field] === undefined) {
throw new Error(`The '${String(field)}' configuration field of'${this.name}' consumer is required`)
}
}
return config as C;
}
consume(config: Config, date: Dayjs, measurement: Measurement): void {
const cfg = config.consumers[this.name] as Partial<C>;
if (cfg.enable !== true) {
return
}
this.publish(this.#validate(cfg), date, measurement);
}
};

View File

@@ -3,9 +3,18 @@ import { Config } from "../config";
import { MQTTConsumer } from "./mqtt";
import { Measurement } from "../fetcher/types";
/**
* Registered consumers
*/
const consumers = [
new MQTTConsumer()
];
/**
* Runs all registered consumers for the measurement.
* @param config global configuration
* @param date date of measurement
* @param measurement measurement data
*/
export const consume = (config: Config, date: Dayjs, measurement: Measurement) =>
consumers.forEach(consumer => consumer.consume(config, date, measurement));

View File

@@ -0,0 +1,31 @@
import fs from 'fs';
import mqtt from "mqtt";
import { Measurement } from "../../fetcher/types";
import { Dayjs } from 'dayjs';
import { MQTTConfig } from './types';
import { Consumer } from '../abstract';
export * from './types';
export class MQTTConsumer extends Consumer<MQTTConfig> {
public name = "mqtt";
protected requiredFields = ['brokerURL', 'usernamePath', 'passwordPath'] as const;
protected publish({ brokerURL, usernamePath, passwordPath, clientId, prefix, publishOptions }: MQTTConfig, date: Dayjs, measurement: Measurement) {
const client = mqtt.connect(brokerURL, {
username: fs.readFileSync(usernamePath, 'utf8'),
password: fs.readFileSync(passwordPath, 'utf8'),
clientId: clientId || "tauron-scrapper"
});
const topicPrefix = prefix || 'tauron';
client.on('connect', () => {
client.publish(`${topicPrefix}/energy/daily`, measurement.energyForDay.sum.toString(), publishOptions);
client.publish(`${topicPrefix}/energy/monthly`, measurement.energyForMonth.sum.toString(), publishOptions);
client.publish(`${topicPrefix}/energy/yearly`, measurement.energyForYear.sum.toString(), publishOptions);
client.publish(`${topicPrefix}/reading`, measurement.reading.C.toString(), publishOptions);
client.end();
});
}
}

View File

@@ -0,0 +1,38 @@
import mqtt from "mqtt";
import { ConsumerConfig } from "../types";
export type MQTTConfig = ConsumerConfig & {
/**
* The URL to broker.
* Example: mqtt://localhost:1882
*/
brokerURL: string;
/**
* Path to file containing a username of MQTT user.
*/
usernamePath: string;
/**
* Path to file containing a password of MQTT user.
*/
passwordPath: string;
/**
* Optional client ID used to connect to MQTT (visible in MQTT broker logs).
* Default: tauron-scrapper
*/
clientId?: string;
/**
* Optional prefix under which the messages will be published.
* Default: tauron
*/
prefix?: string;
/**
* Optional configuration of published messages (i.e. QoS, whether the message should be retained etc.).
*/
publishOptions?: mqtt.IClientPublishOptions;
}

View File

@@ -1,33 +1,7 @@
import { Dayjs } from "dayjs";
import { Measurement } from "../fetcher/types";
import { Config } from "../config";
export type ConsumerConfig = {
/**
* Whether given consumer should be invoked.
* Default: false
*/
enable?: boolean;
}
export abstract class Consumer<C extends ConsumerConfig> {
public abstract readonly name: string;
protected abstract requiredFields: readonly (keyof C)[];
protected abstract publish(config: C, date: Dayjs, measurement: Measurement): void;
#validate(config: Partial<C>): C {
for (const field of this.requiredFields) {
if (config[field] === undefined) {
throw new Error(`The '${String(field)}' configuration field of'${this.name}' consumer is required`)
}
}
return config as C;
}
consume(config: Config, date: Dayjs, measurement: Measurement): void {
const cfg = config.consumers[this.name] as Partial<C>;
if (cfg.enable !== true) {
return
}
this.publish(this.#validate(cfg), date, measurement);
}
};
}