From f31593148cfa1009e5e2a25bc6a60f7ce37abe24 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Pluta?= Date: Wed, 13 Nov 2024 22:45:09 +0100 Subject: [PATCH] Implement scaffolding for InfluxDB consumer --- src/consumers/index.ts | 13 ++++++-- src/consumers/influxdb/index.ts | 58 +++++++++++++++++++++++++++++++++ src/consumers/influxdb/types.ts | 8 +++++ src/fetcher/index.ts | 19 ++++++----- 4 files changed, 87 insertions(+), 11 deletions(-) create mode 100644 src/consumers/influxdb/index.ts create mode 100644 src/consumers/influxdb/types.ts diff --git a/src/consumers/index.ts b/src/consumers/index.ts index b5531ac..eeb77ad 100644 --- a/src/consumers/index.ts +++ b/src/consumers/index.ts @@ -2,12 +2,14 @@ import { Dayjs } from "dayjs"; import { Config } from "../config"; import { MQTTConsumer } from "./mqtt"; import { Measurement } from "../fetcher/types"; +import { InfluxDBConsumer } from "./influxdb"; /** * Registered consumers */ const consumers = [ - new MQTTConsumer() + new MQTTConsumer(), + new InfluxDBConsumer() ]; /** @@ -16,5 +18,10 @@ const consumers = [ * @param date date of measurement * @param measurement measurement data */ -export const consume = (config: Config, date: Dayjs, measurement: Measurement) => - consumers.forEach(consumer => consumer.consume(config, date, measurement)); \ No newline at end of file +export const consume = (config: Config, date: Dayjs, measurement: Measurement) => consumers.forEach(consumer => { + try { + consumer.consume(config, date, measurement); + } catch (exception) { + console.error(`Exception during '${consumer.name}' consumer evaluation: ${exception}`); + } +}); \ No newline at end of file diff --git a/src/consumers/influxdb/index.ts b/src/consumers/influxdb/index.ts new file mode 100644 index 0000000..556006a --- /dev/null +++ b/src/consumers/influxdb/index.ts @@ -0,0 +1,58 @@ +import { InfluxDB, Point } from "@influxdata/influxdb-client"; +import { Consumer } from "../abstract"; +import { InfluxDBConfig } from "./types"; +import { Dayjs } from "dayjs"; +import { Measurement } from "../../fetcher/types"; + +export class InfluxDBConsumer extends Consumer { + public name = 'influxdb'; + + protected requiredFields = [ + 'databaseURL', + 'apiToken', + 'organization', + 'bucket', + ] as const; + + protected async publish({ databaseURL, apiToken, organization, bucket }: InfluxDBConfig, date: Dayjs, measurement: Measurement) { + const db = new InfluxDB({ + url: databaseURL, + token: apiToken, + }); + + const api = db.getWriteApi(organization, bucket); + + const hourlyEnergy = measurement.energyForDay.values.map((value, index) => new Point('energy') + .timestamp(date.clone().set('hour', index+1).toDate()) + .tag('profile', 'hourly') + .floatField('value', value) + ); + + const dailyEnergy = new Point('energy') + .timestamp(date.toDate()) + .tag('profile', 'daily') + .floatField('value', measurement.energyForDay.sum); + + const monthlyEnergy = new Point('energy') + .timestamp(date.toDate()) + .tag('profile', 'monthly') + .floatField('value', measurement.energyForMonth.sum); + + const annualyEnergy = new Point('energy') + .timestamp(date.toDate()) + .tag('profile', 'annualy') + .floatField('value', measurement.energyForYear.sum); + + const reading = new Point('reading') + .timestamp(date.clone().set('hour', 23).set('minute', 59).set('second', 59).toDate()) + .floatField('value', measurement.reading.C); + + hourlyEnergy.forEach(api.writePoint); + api.writePoint(dailyEnergy); + api.writePoint(monthlyEnergy); + api.writePoint(annualyEnergy); + api.writePoint(reading); + + api.close(); + } +} \ No newline at end of file diff --git a/src/consumers/influxdb/types.ts b/src/consumers/influxdb/types.ts new file mode 100644 index 0000000..c1aa55d --- /dev/null +++ b/src/consumers/influxdb/types.ts @@ -0,0 +1,8 @@ +import { ConsumerConfig } from "../types"; + +export type InfluxDBConfig = ConsumerConfig & { + databaseURL: string; + apiToken: string; + organization: string; + bucket: string; +}; \ No newline at end of file diff --git a/src/fetcher/index.ts b/src/fetcher/index.ts index 181ade7..1fef882 100644 --- a/src/fetcher/index.ts +++ b/src/fetcher/index.ts @@ -14,18 +14,21 @@ export class Fetcher { } /** - * Collects the measurements for a specific date. + * Collects the measurements for a specific date and runs the consumers on the masurement data. * @param date - the measurement date (note, that 'todays' date may not be available at the time being when request is made) * @returns the measurement data */ async fetch(date: Dayjs): Promise { - const monthEnd = date.endOf('month'); - const endDay = monthEnd.isBefore(dayjs(), 'day') ? monthEnd : date; + const normalizedDate = date.startOf('day'); + console.log(`Fetching measurements for: ${normalizedDate}`); - const energyForDay = await this.#service.getEnergyForDay(date); - const energyForMonth = await this.#service.getEnergyForRange(date.startOf('month'), endDay); - const energyForYear = await this.#service.getEnergyForYear(date.format('YYYY')); - const reading = await this.#service.getReadingForRange(date, date); + const monthEnd = normalizedDate.endOf('month'); + const endDay = monthEnd.isBefore(dayjs(), 'day') ? monthEnd : normalizedDate; + + const energyForDay = await this.#service.getEnergyForDay(normalizedDate); + const energyForMonth = await this.#service.getEnergyForRange(normalizedDate.startOf('month'), endDay); + const energyForYear = await this.#service.getEnergyForYear(normalizedDate.format('YYYY')); + const reading = await this.#service.getReadingForRange(normalizedDate, normalizedDate); const measurement: Measurement = { energyForDay, @@ -34,7 +37,7 @@ export class Fetcher { reading: reading?.[0], }; - consume(this.#config, date, measurement); + consume(this.#config, normalizedDate, measurement); return measurement; }