Implement scaffolding for InfluxDB consumer

This commit is contained in:
2024-11-13 22:45:09 +01:00
parent eeb457ad48
commit f31593148c
4 changed files with 87 additions and 11 deletions

View File

@@ -2,12 +2,14 @@ import { Dayjs } from "dayjs";
import { Config } from "../config"; import { Config } from "../config";
import { MQTTConsumer } from "./mqtt"; import { MQTTConsumer } from "./mqtt";
import { Measurement } from "../fetcher/types"; import { Measurement } from "../fetcher/types";
import { InfluxDBConsumer } from "./influxdb";
/** /**
* Registered consumers * Registered consumers
*/ */
const consumers = [ const consumers = [
new MQTTConsumer() new MQTTConsumer(),
new InfluxDBConsumer()
]; ];
/** /**
@@ -16,5 +18,10 @@ const consumers = [
* @param date date of measurement * @param date date of measurement
* @param measurement measurement data * @param measurement measurement data
*/ */
export const consume = (config: Config, date: Dayjs, measurement: Measurement) => export const consume = (config: Config, date: Dayjs, measurement: Measurement) => consumers.forEach(consumer => {
consumers.forEach(consumer => consumer.consume(config, date, measurement)); try {
consumer.consume(config, date, measurement);
} catch (exception) {
console.error(`Exception during '${consumer.name}' consumer evaluation: ${exception}`);
}
});

View File

@@ -0,0 +1,58 @@
import { InfluxDB, Point } from "@influxdata/influxdb-client";
import { Consumer } from "../abstract";
import { InfluxDBConfig } from "./types";
import { Dayjs } from "dayjs";
import { Measurement } from "../../fetcher/types";
export class InfluxDBConsumer extends Consumer<InfluxDBConfig> {
public name = 'influxdb';
protected requiredFields = [
'databaseURL',
'apiToken',
'organization',
'bucket',
] as const;
protected async publish({ databaseURL, apiToken, organization, bucket }: InfluxDBConfig, date: Dayjs, measurement: Measurement) {
const db = new InfluxDB({
url: databaseURL,
token: apiToken,
});
const api = db.getWriteApi(organization, bucket);
const hourlyEnergy = measurement.energyForDay.values.map((value, index) => new Point('energy')
.timestamp(date.clone().set('hour', index+1).toDate())
.tag('profile', 'hourly')
.floatField('value', value)
);
const dailyEnergy = new Point('energy')
.timestamp(date.toDate())
.tag('profile', 'daily')
.floatField('value', measurement.energyForDay.sum);
const monthlyEnergy = new Point('energy')
.timestamp(date.toDate())
.tag('profile', 'monthly')
.floatField('value', measurement.energyForMonth.sum);
const annualyEnergy = new Point('energy')
.timestamp(date.toDate())
.tag('profile', 'annualy')
.floatField('value', measurement.energyForYear.sum);
const reading = new Point('reading')
.timestamp(date.clone().set('hour', 23).set('minute', 59).set('second', 59).toDate())
.floatField('value', measurement.reading.C);
hourlyEnergy.forEach(api.writePoint);
api.writePoint(dailyEnergy);
api.writePoint(monthlyEnergy);
api.writePoint(annualyEnergy);
api.writePoint(reading);
api.close();
}
}

View File

@@ -0,0 +1,8 @@
import { ConsumerConfig } from "../types";
export type InfluxDBConfig = ConsumerConfig & {
databaseURL: string;
apiToken: string;
organization: string;
bucket: string;
};

View File

@@ -14,18 +14,21 @@ export class Fetcher {
} }
/** /**
* Collects the measurements for a specific date. * Collects the measurements for a specific date and runs the consumers on the masurement data.
* @param date - the measurement date (note, that 'todays' date may not be available at the time being when request is made) * @param date - the measurement date (note, that 'todays' date may not be available at the time being when request is made)
* @returns the measurement data * @returns the measurement data
*/ */
async fetch(date: Dayjs): Promise<Measurement> { async fetch(date: Dayjs): Promise<Measurement> {
const monthEnd = date.endOf('month'); const normalizedDate = date.startOf('day');
const endDay = monthEnd.isBefore(dayjs(), 'day') ? monthEnd : date; console.log(`Fetching measurements for: ${normalizedDate}`);
const energyForDay = await this.#service.getEnergyForDay(date); const monthEnd = normalizedDate.endOf('month');
const energyForMonth = await this.#service.getEnergyForRange(date.startOf('month'), endDay); const endDay = monthEnd.isBefore(dayjs(), 'day') ? monthEnd : normalizedDate;
const energyForYear = await this.#service.getEnergyForYear(date.format('YYYY'));
const reading = await this.#service.getReadingForRange(date, date); const energyForDay = await this.#service.getEnergyForDay(normalizedDate);
const energyForMonth = await this.#service.getEnergyForRange(normalizedDate.startOf('month'), endDay);
const energyForYear = await this.#service.getEnergyForYear(normalizedDate.format('YYYY'));
const reading = await this.#service.getReadingForRange(normalizedDate, normalizedDate);
const measurement: Measurement = { const measurement: Measurement = {
energyForDay, energyForDay,
@@ -34,7 +37,7 @@ export class Fetcher {
reading: reading?.[0], reading: reading?.[0],
}; };
consume(this.#config, date, measurement); consume(this.#config, normalizedDate, measurement);
return measurement; return measurement;
} }