Files
actual-importer/src/runner/index.ts

108 lines
3.1 KiB
TypeScript

import Papa from "papaparse";
import iconv from "iconv-lite";
import { createParser } from "@/parser";
import { Actual, ActualImportResult, SubmitOptions } from "@/backend";
import { Config } from "@/types/config";
import { Readable } from "stream";
import { Transaction } from "@/types/transaction";
export type PrepareResult = {
transactions: Transaction[];
skipped: string[][];
};
export type ImportResult = PrepareResult & {
result: ActualImportResult;
};
export const getTransactions = async (server: string, config: Config, limit: number = 5): Promise<Transaction[]> => {
const serverConfig = config.servers[server];
if(!serverConfig) {
throw new Error(`Unknown server: ${server}`);
}
const actualServer = new Actual(serverConfig);
return await actualServer.getTransactions(limit);
};
export const getTransactionsFromRange = async (server: string, config: Config, start: string, end: string): Promise<Transaction[]> => {
const serverConfig = config.servers[server];
if(!serverConfig) {
throw new Error(`Unknown server: ${server}`);
}
const actualServer = new Actual(serverConfig);
return await actualServer.getTransactionsFromRange(start, end);
};
export const prepareTransactions = async (stream: Readable, profile: string, server: string, config: Config): Promise<PrepareResult> => new Promise((resolve, reject) => {
const profileConfig = config.profiles[profile];
if (!profileConfig) {
throw new Error(`Unknown profile: ${profile}`);
}
const serverConfig = config.servers[server];
if(!serverConfig) {
throw new Error(`Unknown server: ${server}`);
}
const parser = createParser(profileConfig, serverConfig);
const skipped: string[][] = [];
const handleRow = async (data: string[]) => {
const pushed = await parser.pushTransaction(data);
if (!pushed) {
skipped.push(data);
}
};
const handleClose = async () => {
try {
const transactions = await parser.reconcile();
resolve({
transactions,
skipped
});
} catch (e: unknown) {
console.error(e);
reject(e);
}
};
stream
.pipe(iconv.decodeStream(profileConfig.encoding ?? parser.csvEncoding ?? "utf8"))
.pipe(Papa.parse(Papa.NODE_STREAM_INPUT, profileConfig.csv ?? parser.csvConfig))
.on('data', handleRow)
.on('close', handleClose);
});
export const submitTransactions = async (transactions: Transaction[], server: string, config: Config, opts: SubmitOptions): Promise<ActualImportResult> => {
const serverConfig = config.servers[server];
if(!serverConfig) {
throw new Error(`Unknown server: ${server}`);
}
const actualServer = new Actual(serverConfig);
return await actualServer.load(transactions, opts);
};
export const loadTransactions = async (stream: Readable, profile: string, server: string, config: Config, opts: SubmitOptions): Promise<ImportResult> => {
const prepared = await prepareTransactions(stream, profile, server, config)
const result = await submitTransactions(prepared.transactions, server, config, opts);
return {
...prepared,
result
}
};