import Papa from "papaparse";
import iconv from "iconv-lite";
import { createParser } from "@/parser";
import { Actual, ActualImportResult } from "@/backend";
import { Config } from "@/types/config";
import { Readable } from "stream";
import { Transaction } from "@/types/transaction";

/**
 * Outcome of a single import run.
 */
export type ImportResult = {
  /** Transactions returned by the parser's `reconcile()` step. */
  transactions: Transaction[];
  /** Value returned by `Actual.submit()` for those transactions. */
  result: ActualImportResult;
  /** Raw CSV rows for which the parser's `pushTransaction()` returned a falsy value. */
  skipped: string[][];
};

/**
 * Reads CSV data from `stream`, feeds every row to the parser created for the
 * given profile/server pair, then reconciles the collected transactions and
 * submits them through an `Actual` backend instance.
 *
 * The input is decoded with the profile's `encoding` (falling back to
 * `"utf8"`) before being parsed by Papa Parse in Node stream mode.
 *
 * Rows are handed to the parser strictly one after another, in arrival
 * order, and reconciliation only starts once the last row has been fully
 * processed. Any failure (stream error, decode/parse error, row handling
 * error, reconcile/submit error) rejects the returned promise; nothing is
 * submitted after a failure has been recorded.
 *
 * @param stream - Readable source of the raw (still encoded) CSV bytes.
 * @param profile - Key into `config.profiles`.
 * @param server - Key into `config.servers`.
 * @param config - Configuration holding the profile and server definitions.
 * @param dryRun - Forwarded unchanged to the `Actual` constructor.
 *   NOTE(review): presumably prevents real writes; confirm in `@/backend`.
 * @returns The reconciled transactions, the submit result and the skipped rows.
 * @throws Error (as a rejection) when `profile` or `server` is not present in `config`.
 */
export async function importTransactions(
  stream: Readable,
  profile: string,
  server: string,
  config: Config,
  dryRun?: boolean,
): Promise<ImportResult> {
  const profileConfig = config.profiles[profile];
  if (!profileConfig) {
    throw new Error(`Unknown profile: ${profile}`);
  }

  const serverConfig = config.servers[server];
  if (!serverConfig) {
    throw new Error(`Unknown server: ${server}`);
  }

  const parser = createParser(profileConfig, serverConfig);
  const actualServer = new Actual(serverConfig, dryRun);
  const skipped: string[][] = [];

  return new Promise<ImportResult>((resolve, reject) => {
    // Set once the promise has been rejected; later rows and the final
    // submit step check it so a broken import is never partially submitted.
    let failed = false;

    // Tail of the row-processing chain. Each incoming row is appended here so
    // rows are pushed sequentially even though 'data' listeners are not awaited.
    let pendingRows: Promise<void> = Promise.resolve();

    const fail = (error: unknown): void => {
      if (failed) {
        return;
      }
      failed = true;
      reject(error);
    };

    const handleRow = (data: string[]): void => {
      pendingRows = pendingRows
        .then(async () => {
          if (failed) {
            return;
          }
          const pushed = await parser.pushTransaction(data);
          if (!pushed) {
            skipped.push(data);
          }
        })
        // Route row failures into the returned promise right away instead of
        // leaving an unhandled rejection; this also keeps the chain resolvable.
        .catch(fail);
    };

    const handleClose = async (): Promise<void> => {
      // Wait for rows that are still in flight before reconciling.
      await pendingRows;
      if (failed) {
        return;
      }
      try {
        const transactions = await parser.reconcile();
        const result = await actualServer.submit(transactions);
        resolve({ transactions, result, skipped });
      } catch (e) {
        console.error(e);
        reject(e);
      }
    };

    const decoder = iconv.decodeStream(profileConfig.encoding ?? "utf8");
    const csv = Papa.parse(Papa.NODE_STREAM_INPUT);

    // pipe() does not forward errors downstream, so every stage needs its
    // own listener; otherwise the promise would never settle on failure.
    stream.on("error", fail);
    decoder.on("error", fail);
    csv.on("error", fail);

    stream
      .pipe(decoder)
      .pipe(csv)
      .on("data", handleRow)
      .on("close", () => {
        void handleClose();
      });
  });
}