Split preparation and submission of transaction in Actual backend
This commit is contained in:
@@ -7,6 +7,11 @@ import { utils } from "@actual-app/api";
|
||||
|
||||
export type ActualImportResult = Awaited<ReturnType<typeof api.importTransactions>>;
|
||||
|
||||
export type SubmitOptions =
|
||||
| { mode: 'add', learnCategories?: boolean, runTransfers?: boolean }
|
||||
| { mode: 'import' }
|
||||
| { mode: 'dry-run' };
|
||||
|
||||
type ActualTransaction = {
|
||||
id?: string;
|
||||
account?: string;
|
||||
@@ -40,11 +45,9 @@ type ActualPayee = {
|
||||
|
||||
export class Actual {
|
||||
#config: ServerConfig;
|
||||
#dryRun: boolean;
|
||||
|
||||
constructor(config: ServerConfig, dryRun: boolean = false) {
|
||||
constructor(config: ServerConfig) {
|
||||
this.#config = config;
|
||||
this.#dryRun = dryRun;
|
||||
}
|
||||
|
||||
#map(transaction: Transaction, accounts: ActualAccount[], payees: ActualPayee[]): ActualTransaction {
|
||||
@@ -94,21 +97,7 @@ export class Actual {
|
||||
return actualTransaction;
|
||||
}
|
||||
|
||||
async import(transactions: Transaction[]): Promise<ActualImportResult> {
|
||||
return this.#submit(transactions, t => api.importTransactions(t.account, [t]));
|
||||
}
|
||||
|
||||
async add(transactions: Transaction[], learnCategories: boolean, runTransfers: boolean): Promise<ActualImportResult> {
|
||||
return this.#submit(transactions, async t => {
|
||||
|
||||
const result = await api.addTransactions(t.account, [t], { learnCategories, runTransfers });
|
||||
return result === "ok"
|
||||
? { added: [t], updated: [], errors: [] }
|
||||
: { added: [], updated: [], errors: [{ message: `Cannot add transaction: ${t}` }] };
|
||||
});
|
||||
}
|
||||
|
||||
async #submit(transactions: Transaction[], add: (t: ActualTransaction) => Promise<ActualImportResult>): Promise<ActualImportResult> {
|
||||
async #api<T>(fn: () => Promise<T>): Promise<T> {
|
||||
try {
|
||||
await mkdir(this.#config.data, { recursive: true });
|
||||
} catch(e) {}
|
||||
@@ -120,33 +109,34 @@ export class Actual {
|
||||
});
|
||||
|
||||
await api.downloadBudget(this.#config.budget);
|
||||
|
||||
const accounts: ActualAccount[] = await api.getAccounts();
|
||||
const payees: ActualPayee[] = await api.getPayees();
|
||||
|
||||
const toImport = transactions.map(t => this.#map(t, accounts, payees))
|
||||
try {
|
||||
return await fn();
|
||||
} finally {
|
||||
await api.shutdown();
|
||||
};
|
||||
}
|
||||
|
||||
async #printTransactions(transactions: ActualTransaction[]) {
|
||||
const data: { value: (t: ActualTransaction) => string|undefined, header: string, pad: number }[] = [
|
||||
{ value: t => t.payee !== undefined ? "Yes" : "No", header: "Known payee", pad: 12 },
|
||||
{ value: t => t.imported_payee, header: "Imported payee", pad: 70 },
|
||||
{ value: t => t.amount?.toString(), header: "Amount", pad: 10 },
|
||||
{ value: t => t.notes, header: "Notes", pad: 100 }
|
||||
];
|
||||
|
||||
const header = data.map(d => d.header.padEnd(d.pad)).join(" ");
|
||||
const rows = transactions.map(transaction => data.map(d => (d.value(transaction) ?? "").padEnd(d.pad)).join(" "));
|
||||
|
||||
console.log(header);
|
||||
console.log("-".repeat(Math.max(...rows.map(x => x.length)) ?? header.length));
|
||||
rows.forEach(x => console.log(x));
|
||||
}
|
||||
|
||||
async #submitTransactions(transactions: ActualTransaction[], add: (t: ActualTransaction) => Promise<ActualImportResult>): Promise<ActualImportResult> {
|
||||
const output: ActualImportResult = { added: [], updated: [], errors: [] };
|
||||
|
||||
|
||||
if (this.#dryRun) {
|
||||
const data: { value: (t: ActualTransaction) => string|undefined, header: string, pad: number }[] = [
|
||||
{ value: t => t.payee !== undefined ? "Yes" : "No", header: "Known payee", pad: 12 },
|
||||
{ value: t => t.imported_payee, header: "Imported payee", pad: 70 },
|
||||
{ value: t => t.amount?.toString(), header: "Amount", pad: 10 },
|
||||
{ value: t => t.notes, header: "Notes", pad: 100 }
|
||||
];
|
||||
|
||||
const header = data.map(d => d.header.padEnd(d.pad)).join(" ");
|
||||
const rows = toImport.map(transaction => data.map(d => (d.value(transaction) ?? "").padEnd(d.pad)).join(" "));
|
||||
|
||||
console.log(header);
|
||||
console.log("-".repeat(Math.max(...rows.map(x => x.length)) ?? header.length));
|
||||
rows.forEach(x => console.log(x));
|
||||
|
||||
}
|
||||
|
||||
else for (const transaction of toImport) {
|
||||
for (const transaction of transactions) {
|
||||
const result = await add(transaction);
|
||||
|
||||
output.added.push(...result.added);
|
||||
@@ -155,11 +145,47 @@ export class Actual {
|
||||
|
||||
await sleep(100);
|
||||
}
|
||||
|
||||
await api.shutdown();
|
||||
|
||||
|
||||
return output;
|
||||
}
|
||||
|
||||
async #doSubmit(transactions: ActualTransaction[], opts: SubmitOptions): Promise<ActualImportResult> {
|
||||
switch(opts.mode) {
|
||||
case 'add':
|
||||
return this.#submitTransactions(transactions, async t => {
|
||||
const result = await api.addTransactions(t.account, [t], opts);
|
||||
|
||||
return result === "ok"
|
||||
? { added: [t], updated: [], errors: [] }
|
||||
: { added: [], updated: [], errors: [{ message: `Cannot add transaction: ${t}` }] };
|
||||
});
|
||||
|
||||
case 'import':
|
||||
return this.#submitTransactions(transactions, t => api.importTransactions(t.account, [t]));
|
||||
}
|
||||
|
||||
this.#printTransactions(transactions);
|
||||
|
||||
return { added: [], updated: [], errors: [] };
|
||||
}
|
||||
|
||||
async prepare(transactions: Transaction[]): Promise<ActualTransaction[]> {
|
||||
const accounts: ActualAccount[] = await api.getAccounts();
|
||||
const payees: ActualPayee[] = await api.getPayees();
|
||||
|
||||
return transactions.map(t => this.#map(t, accounts, payees));
|
||||
}
|
||||
|
||||
async submit(transactions: ActualTransaction[], opts: SubmitOptions): Promise<ActualImportResult> {
|
||||
return this.#api(() => this.#doSubmit(transactions, opts));
|
||||
}
|
||||
|
||||
async load(transactions: Transaction[], opts: SubmitOptions): Promise<ActualImportResult> {
|
||||
return this.#api(async () => {
|
||||
const prepared = await this.prepare(transactions);
|
||||
return this.#doSubmit(prepared, opts);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async function sleep(ms: number): Promise<void> {
|
||||
|
||||
132
src/cli/index.ts
132
src/cli/index.ts
@@ -2,81 +2,29 @@ import fs from "fs";
|
||||
import { program } from "commander";
|
||||
import { AddOptions, ImportOptions } from "@/types/cli";
|
||||
import { loadConfig } from "./config";
|
||||
import { addTransactions, importTransactions } from "@/runner";
|
||||
import { submitTransactions } from "@/runner";
|
||||
import { serve } from "@/server";
|
||||
import { SubmitOptions } from "@/backend";
|
||||
|
||||
export function run(...args: string[]) {
|
||||
program
|
||||
.name("actual-importer")
|
||||
.version("0.0.1")
|
||||
function doSubmit<O extends ImportOptions | AddOptions>(parseOpts: (o: O) => SubmitOptions) {
|
||||
return async (file: string, options: O) => {
|
||||
const config = parseConfig(options.config, options.set);
|
||||
|
||||
if (Object.keys(config?.profiles ?? {}).length === 0) {
|
||||
throw new Error(`No profiles defined in the ${options.config}`);
|
||||
}
|
||||
|
||||
if (Object.keys(config?.servers ?? {}).length === 0) {
|
||||
throw new Error(`No servers defined in the ${options.config}`);
|
||||
}
|
||||
|
||||
const profile = options.profile ?? config.defaultProfile ?? Object.keys(config.profiles)[0];
|
||||
const server = options.server ?? config.defaultServer ?? Object.keys(config.servers)[0];
|
||||
|
||||
program
|
||||
.command("add <csv-file>")
|
||||
.requiredOption("-c, --config <file>", "sets the path to the YAML file with configuration")
|
||||
.option("-d, --dry-run", "simulates the import by printing out what will be imported")
|
||||
.option("-p, --profile <name>", "sets the desired profile to invoke")
|
||||
.option("-s, --server <name>", "sets the desired server to upload transactions to")
|
||||
.option("-t, --transfers", "whether transfer transactions should be resolved")
|
||||
.option("-l, --learn", "whether new rules for category assignment should be created")
|
||||
.option("-x, --set <arg>", "overrides the config option for this specific run (arg: <key>=<name>, i.e. profiles.myprofile.parser=pl.ing", (v: string, prev: string[]) => prev.concat([v]), [])
|
||||
.action(doAdd)
|
||||
|
||||
program
|
||||
.command("import <csv-file>")
|
||||
.requiredOption("-c, --config <file>", "sets the path to the YAML file with configuration")
|
||||
.option("-d, --dry-run", "simulates the import by printing out what will be imported")
|
||||
.option("-p, --profile <name>", "sets the desired profile to invoke")
|
||||
.option("-s, --server <name>", "sets the desired server to upload transactions to")
|
||||
.option("-x, --set <arg>", "overrides the config option for this specific run (arg: <key>=<name>, i.e. profiles.myprofile.parser=pl.ing", (v: string, prev: string[]) => prev.concat([v]), [])
|
||||
.action(doImport)
|
||||
|
||||
program
|
||||
.command("serve <port>")
|
||||
.requiredOption("-c, --config <file>", "sets the path to the YAML file with configuration")
|
||||
.option("-x, --set <arg>", "overrides the config option for this specific run (arg: <key>=<name>, i.e. profiles.myprofile.parser=pl.ing", (v: string, prev: string[]) => prev.concat([v]), [])
|
||||
.action(doServe)
|
||||
|
||||
program.parse(args);
|
||||
}
|
||||
|
||||
async function doAdd(file: string, options: AddOptions) {
|
||||
const config = parseConfig(options.config, options.set);
|
||||
|
||||
if (Object.keys(config?.profiles ?? {}).length === 0) {
|
||||
throw new Error(`No profiles defined in the ${options.config}`);
|
||||
const opts: SubmitOptions = options.dryRun ? { mode: 'dry-run' } : parseOpts(options);
|
||||
|
||||
await submitTransactions(fs.createReadStream(file), profile, server, config, opts);
|
||||
}
|
||||
|
||||
if (Object.keys(config?.servers ?? {}).length === 0) {
|
||||
throw new Error(`No servers defined in the ${options.config}`);
|
||||
}
|
||||
|
||||
const profile = options.profile ?? config.defaultProfile ?? Object.keys(config.profiles)[0];
|
||||
const server = options.server ?? config.defaultServer ?? Object.keys(config.servers)[0];
|
||||
|
||||
const result = await addTransactions(fs.createReadStream(file), profile, server, config, options.dryRun, options.learn, options.transfers);
|
||||
}
|
||||
|
||||
async function doImport(file: string, options: ImportOptions) {
|
||||
const config = parseConfig(options.config, options.set);
|
||||
|
||||
if (Object.keys(config?.profiles ?? {}).length === 0) {
|
||||
throw new Error(`No profiles defined in the ${options.config}`);
|
||||
}
|
||||
|
||||
if (Object.keys(config?.servers ?? {}).length === 0) {
|
||||
throw new Error(`No servers defined in the ${options.config}`);
|
||||
}
|
||||
|
||||
const profile = options.profile ?? config.defaultProfile ?? Object.keys(config.profiles)[0];
|
||||
const server = options.server ?? config.defaultServer ?? Object.keys(config.servers)[0];
|
||||
|
||||
const result = await importTransactions(fs.createReadStream(file), profile, server, config, options.dryRun);
|
||||
}
|
||||
|
||||
function doServe(port: string, options: any) {
|
||||
const config = parseConfig(options.config, options.set);
|
||||
|
||||
serve(config, Number.parseInt(port));
|
||||
}
|
||||
|
||||
function parseConfig(path: string, set: string[]) {
|
||||
@@ -103,3 +51,45 @@ function parseConfig(path: string, set: string[]) {
|
||||
|
||||
return config;
|
||||
}
|
||||
|
||||
export function run(...args: string[]) {
|
||||
program
|
||||
.name("actual-importer")
|
||||
.version("0.0.1")
|
||||
|
||||
program
|
||||
.command("add <csv-file>")
|
||||
.requiredOption("-c, --config <file>", "sets the path to the YAML file with configuration")
|
||||
.option("-d, --dry-run", "simulates the import by printing out what will be imported")
|
||||
.option("-p, --profile <name>", "sets the desired profile to invoke")
|
||||
.option("-s, --server <name>", "sets the desired server to upload transactions to")
|
||||
.option("-t, --transfers", "whether transfer transactions should be resolved")
|
||||
.option("-l, --learn", "whether new rules for category assignment should be created")
|
||||
.option("-x, --set <arg>", "overrides the config option for this specific run (arg: <key>=<name>, i.e. profiles.myprofile.parser=pl.ing", (v: string, prev: string[]) => prev.concat([v]), [])
|
||||
.action(doSubmit<AddOptions>(o => ({
|
||||
mode: 'add',
|
||||
learnCategories: o.learn,
|
||||
runTransfers: o.transfers
|
||||
})));
|
||||
|
||||
program
|
||||
.command("import <csv-file>")
|
||||
.requiredOption("-c, --config <file>", "sets the path to the YAML file with configuration")
|
||||
.option("-d, --dry-run", "simulates the import by printing out what will be imported")
|
||||
.option("-p, --profile <name>", "sets the desired profile to invoke")
|
||||
.option("-s, --server <name>", "sets the desired server to upload transactions to")
|
||||
.option("-x, --set <arg>", "overrides the config option for this specific run (arg: <key>=<name>, i.e. profiles.myprofile.parser=pl.ing", (v: string, prev: string[]) => prev.concat([v]), [])
|
||||
.action(doSubmit<ImportOptions>(o => ({ mode: 'import' })))
|
||||
|
||||
program
|
||||
.command("serve <port>")
|
||||
.requiredOption("-c, --config <file>", "sets the path to the YAML file with configuration")
|
||||
.option("-x, --set <arg>", "overrides the config option for this specific run (arg: <key>=<name>, i.e. profiles.myprofile.parser=pl.ing", (v: string, prev: string[]) => prev.concat([v]), [])
|
||||
.action((port: string, options: any) => {
|
||||
const config = parseConfig(options.config, options.set);
|
||||
|
||||
serve(config, Number.parseInt(port));
|
||||
});
|
||||
|
||||
program.parse(args);
|
||||
}
|
||||
@@ -1,7 +1,7 @@
|
||||
import Papa from "papaparse";
|
||||
import iconv from "iconv-lite";
|
||||
import { createParser } from "@/parser";
|
||||
import { Actual, ActualImportResult } from "@/backend";
|
||||
import { Actual, ActualImportResult, SubmitOptions } from "@/backend";
|
||||
import { Config } from "@/types/config";
|
||||
import { Readable } from "stream";
|
||||
import { Transaction } from "@/types/transaction";
|
||||
@@ -12,7 +12,7 @@ export type ImportResult = {
|
||||
skipped: string[][];
|
||||
};
|
||||
|
||||
const submitTransactions = (load: (server: Actual, t: Transaction[]) => Promise<ActualImportResult>) => async (stream: Readable, profile: string, server: string, config: Config, dryRun?: boolean): Promise<ImportResult> => new Promise((resolve, reject) => {
|
||||
export const submitTransactions = async (stream: Readable, profile: string, server: string, config: Config, opts: SubmitOptions): Promise<ImportResult> => new Promise((resolve, reject) => {
|
||||
const profileConfig = config.profiles[profile];
|
||||
|
||||
if (!profileConfig) {
|
||||
@@ -26,7 +26,7 @@ const submitTransactions = (load: (server: Actual, t: Transaction[]) => Promise<
|
||||
|
||||
const parser = createParser(profileConfig, serverConfig);
|
||||
|
||||
const actualServer = new Actual(serverConfig, dryRun);
|
||||
const actualServer = new Actual(serverConfig);
|
||||
const skipped: string[][] = [];
|
||||
|
||||
const handleRow = async (data: string[]) => {
|
||||
@@ -40,7 +40,7 @@ const submitTransactions = (load: (server: Actual, t: Transaction[]) => Promise<
|
||||
const handleClose = async () => {
|
||||
try {
|
||||
const transactions = await parser.reconcile();
|
||||
const result = await load(actualServer, transactions);
|
||||
const result = await actualServer.load(transactions, opts);
|
||||
|
||||
resolve({
|
||||
transactions,
|
||||
@@ -59,9 +59,4 @@ const submitTransactions = (load: (server: Actual, t: Transaction[]) => Promise<
|
||||
.pipe(Papa.parse(Papa.NODE_STREAM_INPUT, profileConfig.csv ?? parser.csvConfig))
|
||||
.on('data', handleRow)
|
||||
.on('close', handleClose);
|
||||
});
|
||||
|
||||
export const importTransactions = submitTransactions((s, t) => s.import(t));
|
||||
|
||||
export const addTransactions = async (stream: Readable, profile: string, server: string, config: Config, dryRun?: boolean, learnCategories?: boolean, runTransfers?: boolean): Promise<ImportResult> =>
|
||||
submitTransactions((s, t) => s.add(t, !!learnCategories, !!runTransfers))(stream, profile, server, config, dryRun);
|
||||
});
|
||||
@@ -1,4 +1,5 @@
|
||||
import { addTransactions, ImportResult, importTransactions } from "@/runner";
|
||||
import { SubmitOptions } from "@/backend";
|
||||
import { ImportResult, submitTransactions } from "@/runner";
|
||||
import { Config } from "@/types/config";
|
||||
import express from "express";
|
||||
import multer from "multer";
|
||||
@@ -65,9 +66,14 @@ export function serve(config: Config, port: number) {
|
||||
stream.push(req.file.buffer);
|
||||
stream.push(null);
|
||||
|
||||
const result = mode === "add"
|
||||
? await addTransactions(stream, profile, server, config, false, readCheckbox(learn), readCheckbox(transfers))
|
||||
: await importTransactions(stream, profile, server, config);
|
||||
const opts: SubmitOptions = {
|
||||
mode: mode as 'import'|'add',
|
||||
learnCategories: readCheckbox(learn),
|
||||
runTransfers: readCheckbox(transfers)
|
||||
};
|
||||
|
||||
const result = await submitTransactions(stream, profile, server, config, opts);
|
||||
|
||||
res.send(formatResult(result));
|
||||
});
|
||||
|
||||
|
||||
@@ -66,5 +66,5 @@ export function parseAmount(input?: string): number|undefined {
|
||||
}
|
||||
|
||||
export function deduplicateSpaces(input: string): string {
|
||||
return input.replaceAll(/\s+/, " ");
|
||||
return input.replaceAll(/\s+/g, " ");
|
||||
}
|
||||
Reference in New Issue
Block a user