Add support for adding raw transactions

This commit is contained in:
2025-04-12 23:29:53 +02:00
parent 900145a4d0
commit 53e8474706
5 changed files with 122 additions and 52 deletions

View File

@@ -12,53 +12,56 @@ export type ImportResult = {
skipped: string[][];
};
export async function importTransactions(stream: Readable, profile: string, server: string, config: Config, dryRun?: boolean): Promise<ImportResult> {
return new Promise((resolve, reject) => {
const profileConfig = config.profiles[profile];
const submitTransactions = (load: (server: Actual, t: Transaction[]) => Promise<ActualImportResult>) => async (stream: Readable, profile: string, server: string, config: Config, dryRun?: boolean): Promise<ImportResult> => new Promise((resolve, reject) => {
const profileConfig = config.profiles[profile];
if (!profileConfig) {
throw new Error(`Unknown profile: ${profile}`);
}
if (!profileConfig) {
throw new Error(`Unknown profile: ${profile}`);
}
const serverConfig = config.servers[server];
if(!serverConfig) {
throw new Error(`Unknown server: ${server}`);
}
const serverConfig = config.servers[server];
if(!serverConfig) {
throw new Error(`Unknown server: ${server}`);
}
const parser = createParser(profileConfig, serverConfig);
const parser = createParser(profileConfig, serverConfig);
const actualServer = new Actual(serverConfig, dryRun);
const skipped: string[][] = [];
const actualServer = new Actual(serverConfig, dryRun);
const skipped: string[][] = [];
const handleRow = async (data: string[]) => {
const pushed = await parser.pushTransaction(data);
if (!pushed) {
skipped.push(data);
}
};
const handleClose = async () => {
try {
const transactions = await parser.reconcile();
const result = await actualServer.submit(transactions);
resolve({
transactions,
result,
skipped
});
} catch (e) {
console.error(e);
reject(e);
}
};
const handleRow = async (data: string[]) => {
const pushed = await parser.pushTransaction(data);
stream
.pipe(iconv.decodeStream(profileConfig.encoding ?? "utf8"))
.pipe(Papa.parse(Papa.NODE_STREAM_INPUT))
.on('data', handleRow)
.on('close', handleClose);
});
};
if (!pushed) {
skipped.push(data);
}
};
const handleClose = async () => {
try {
const transactions = await parser.reconcile();
const result = await load(actualServer, transactions);
resolve({
transactions,
result,
skipped
});
} catch (e) {
console.error(e);
reject(e);
}
};
stream
.pipe(iconv.decodeStream(profileConfig.encoding ?? "utf8"))
.pipe(Papa.parse(Papa.NODE_STREAM_INPUT))
.on('data', handleRow)
.on('close', handleClose);
});
export const importTransactions = submitTransactions((s, t) => s.import(t));
export const addTransactions = async (stream: Readable, profile: string, server: string, config: Config, dryRun?: boolean, learnCategories?: boolean, runTransfers?: boolean): Promise<ImportResult> =>
submitTransactions((s, t) => s.add(t, !!learnCategories, !!runTransfers))(stream, profile, server, config, dryRun);