Files
nanobot-ts/src/cron/service.ts

208 lines
6.3 KiB
TypeScript

import { existsSync, mkdirSync, readFileSync, statSync, writeFileSync } from 'node:fs';
import { dirname, join } from 'node:path';
import { CronExpressionParser } from 'cron-parser';
import { type CronJob, CronJobSchema, CronStoreSchema } from './types.ts';
/**
 * Handler that `CronService` invokes to actually run a job.
 *
 * The service awaits the returned promise: a resolved promise is recorded on
 * the job as status `'ok'`, a rejection as status `'error'` with the
 * stringified rejection reason stored as the job's last error.
 */
export type OnJobCallback = (job: CronJob) => Promise<void>;
/**
 * File-backed scheduler for cron jobs.
 *
 * Jobs are kept in memory and mirrored to `<workspace>/cron/jobs.json`. Each
 * enabled job with a computable next run owns one `setTimeout` timer; when it
 * fires the `onJob` callback is awaited, the outcome is recorded in the job's
 * state, and the job is re-armed unless it is a one-shot (`deleteAfterRun`).
 *
 * NOTE: when the jobs file is modified externally it is re-read on
 * `listJobs()` / `status()`, but timers are not re-armed for the reloaded
 * jobs; call `start()` again if that is required.
 */
export class CronService {
  /**
   * Largest delay Node's `setTimeout` honours (2^31 - 1 ms, about 24.8 days).
   * Anything larger is silently replaced by a 1 ms delay, so longer waits have
   * to be split into several timers.
   */
  private static readonly MAX_TIMER_DELAY_MS = 2 ** 31 - 1;

  private readonly _filePath: string;
  private _jobs: Map<string, CronJob> = new Map();
  private readonly _timers: Map<string, ReturnType<typeof setTimeout>> = new Map();
  private readonly _onJob: OnJobCallback;
  /** mtime of the jobs file as of our last read/write; used to detect external edits. */
  private _lastMtime = 0;

  /**
   * @param workspacePath Directory under which `cron/jobs.json` is stored.
   * @param onJob Callback awaited every time a job runs.
   */
  constructor(workspacePath: string, onJob: OnJobCallback) {
    this._filePath = join(workspacePath, 'cron', 'jobs.json');
    this._onJob = onJob;
    this._load();
  }

  // ---------------------------------------------------------------------------
  // Persistence
  // ---------------------------------------------------------------------------

  /**
   * Replace the in-memory jobs with the contents of the jobs file, if present.
   * An unreadable or invalid file is logged and otherwise ignored, leaving the
   * current in-memory jobs (none, at construction time) untouched.
   */
  private _load(): void {
    if (!existsSync(this._filePath)) return;
    try {
      const mtimeMs = statSync(this._filePath).mtimeMs;
      const raw = readFileSync(this._filePath, 'utf8');
      // Remember the mtime before parsing so a corrupt file is reported once
      // instead of being re-parsed on every reload check.
      this._lastMtime = mtimeMs;
      const store = CronStoreSchema.parse(JSON.parse(raw));
      this._jobs = new Map(store.jobs.map((j) => [j.id, j]));
    } catch (err) {
      console.error(`[cron] Failed to load ${this._filePath}; ignoring its contents:`, err);
    }
  }

  /** Validate and write all jobs to the jobs file, creating its directory if needed. */
  private _save(): void {
    const store = CronStoreSchema.parse({ jobs: [...this._jobs.values()] });
    // `dirname` handles both `/` and `\` separators; `recursive` makes this a
    // no-op when the directory already exists.
    mkdirSync(dirname(this._filePath), { recursive: true });
    writeFileSync(this._filePath, JSON.stringify(store, null, 2), 'utf8');
    this._lastMtime = statSync(this._filePath).mtimeMs;
  }

  /** Re-read the jobs file when its mtime differs from the one we last saw. */
  private _reloadIfChanged(): void {
    if (!existsSync(this._filePath)) return;
    const mtime = statSync(this._filePath).mtimeMs;
    if (mtime !== this._lastMtime) this._load();
  }

  // ---------------------------------------------------------------------------
  // Public API
  // ---------------------------------------------------------------------------

  /** Return a snapshot array of all jobs, picking up external file edits first. */
  listJobs(): CronJob[] {
    this._reloadIfChanged();
    return [...this._jobs.values()];
  }

  /**
   * Validate, store, persist and arm a new job. A job with the same id is
   * replaced.
   *
   * @returns The stored job, including its initial state and timestamps.
   * @throws If the job does not satisfy `CronJobSchema`.
   */
  addJob(job: Omit<CronJob, 'state' | 'createdAtMs' | 'updatedAtMs'>): CronJob {
    const now = Date.now();
    const full = CronJobSchema.parse({ ...job, state: {}, createdAtMs: now, updatedAtMs: now });
    this._jobs.set(full.id, full);
    this._save();
    this._arm(full);
    return full;
  }

  /**
   * Cancel and delete a job.
   *
   * @returns `false` when no job with that id exists.
   */
  removeJob(id: string): boolean {
    if (!this._jobs.has(id)) return false;
    this._clearTimer(id);
    this._jobs.delete(id);
    this._save();
    return true;
  }

  /**
   * Enable or disable a job; enabling arms its timer, disabling cancels it.
   *
   * @returns `false` when no job with that id exists.
   */
  enableJob(id: string, enabled: boolean): boolean {
    const job = this._jobs.get(id);
    if (!job) return false;
    const updated: CronJob = { ...job, enabled, updatedAtMs: Date.now() };
    this._jobs.set(id, updated);
    this._save();
    if (enabled) this._arm(updated);
    else this._clearTimer(id);
    return true;
  }

  /**
   * Run a job immediately, regardless of its schedule or enabled flag.
   *
   * @returns A human-readable result message (also used for "not found").
   */
  async runJob(id: string): Promise<string> {
    const job = this._jobs.get(id);
    if (!job) return `Error: job ${id} not found`;
    await this._execute(job);
    return `Job ${id} executed.`;
  }

  /** One line per job: `[ON|OFF] <id> "<name>" next=<ISO timestamp|N/A>`. */
  status(): string {
    const jobs = this.listJobs();
    if (jobs.length === 0) return 'No cron jobs configured.';
    return jobs
      .map((j) => {
        const next = j.state.nextRunAtMs ? new Date(j.state.nextRunAtMs).toISOString() : 'N/A';
        return `[${j.enabled ? 'ON' : 'OFF'}] ${j.id} "${j.name}" next=${next}`;
      })
      .join('\n');
  }

  /** Arm all loaded jobs. Call once after construction. */
  start(): void {
    for (const job of this._jobs.values()) {
      if (job.enabled) this._arm(job);
    }
  }

  /** Cancel every pending timer. Jobs stay stored and can be re-armed with `start()`. */
  stop(): void {
    for (const timer of this._timers.values()) clearTimeout(timer);
    this._timers.clear();
  }

  // ---------------------------------------------------------------------------
  // Scheduling internals
  // ---------------------------------------------------------------------------

  /**
   * (Re)schedule a job: cancel any pending timer, compute the next run, record
   * it in the job's state, persist, and start a timer. Does nothing for
   * disabled jobs or jobs without a future run.
   */
  private _arm(job: CronJob): void {
    const id = job.id;
    this._clearTimer(id);
    if (!job.enabled) return;
    const delayMs = this._nextDelayMs(job);
    if (delayMs === null) return;

    const now = Date.now();
    const updated: CronJob = {
      ...job,
      state: { ...job.state, nextRunAtMs: now + delayMs },
      updatedAtMs: now,
    };
    this._jobs.set(id, updated);
    this._save();

    // setTimeout turns delays above 2^31 - 1 ms into 1 ms, which would run a
    // far-future job almost immediately. Wait in capped steps instead and
    // recompute the remaining delay each time a capped timer expires.
    const capped = delayMs > CronService.MAX_TIMER_DELAY_MS;
    const waitMs = capped ? CronService.MAX_TIMER_DELAY_MS : delayMs;
    const timer = setTimeout(() => {
      this._timers.delete(id);
      if (capped) {
        this._rearm(id);
        return;
      }
      // Nothing awaits the tick, so surface failures instead of leaving an
      // unhandled rejection.
      this._tick(id).catch((err: unknown) => {
        console.error(`[cron] Job ${id} tick failed:`, err);
      });
    }, waitMs);
    this._timers.set(id, timer);
  }

  /** Arm the current version of a job from a timer callback, logging instead of throwing. */
  private _rearm(id: string): void {
    try {
      const job = this._jobs.get(id);
      if (job) this._arm(job);
    } catch (err) {
      console.error(`[cron] Failed to re-arm job ${id}:`, err);
    }
  }

  /** Timer handler: run the job if it still exists and is enabled, then re-arm it. */
  private async _tick(id: string): Promise<void> {
    this._timers.delete(id);
    const job = this._jobs.get(id);
    if (!job || !job.enabled) return;
    await this._execute(job);
    // Re-arm unless it was deleted or is a one-shot
    const current = this._jobs.get(id);
    if (current && !current.deleteAfterRun) this._arm(current);
  }

  /**
   * Await the job callback and record the outcome (`lastRunAtMs`,
   * `lastStatus`, `lastError`) on the job, then persist. A one-shot job that
   * succeeded is deleted instead.
   */
  private async _execute(job: CronJob): Promise<void> {
    let lastStatus: 'ok' | 'error' = 'ok';
    let lastError: string | null = null;
    try {
      await this._onJob(job);
    } catch (err) {
      lastStatus = 'error';
      lastError = String(err);
    }

    // The callback may have taken a while: merge into whatever the job looks
    // like now so concurrent enable/disable changes survive, and do not bring
    // back a job that was removed while it was running.
    const current = this._jobs.get(job.id);
    if (!current) return;

    const finishedAt = Date.now();
    if (lastStatus === 'ok' && current.deleteAfterRun) {
      this._clearTimer(job.id);
      this._jobs.delete(job.id);
    } else {
      this._jobs.set(job.id, {
        ...current,
        state: { ...current.state, lastRunAtMs: finishedAt, lastStatus, lastError },
        updatedAtMs: finishedAt,
      });
    }
    this._save();
  }

  /**
   * Milliseconds until the job's next run, or `null` when there is none:
   * - `at`: time left until `atMs`; `null` once that moment has passed.
   * - `every`: remainder of `everyMs` since the last run (0 if never run or overdue).
   * - `cron`: time until the expression's next occurrence; `null` if it cannot be parsed.
   */
  private _nextDelayMs(job: CronJob): number | null {
    const { schedule } = job;
    const now = Date.now();
    if (schedule.kind === 'at') {
      const delay = schedule.atMs - now;
      return delay > 0 ? delay : null;
    }
    if (schedule.kind === 'every') {
      const elapsed = now - (job.state.lastRunAtMs ?? 0);
      return Math.max(0, schedule.everyMs - elapsed);
    }
    if (schedule.kind === 'cron') {
      try {
        const interval = CronExpressionParser.parse(schedule.expr, { tz: schedule.tz });
        return interval.next().getTime() - now;
      } catch {
        console.error(`[cron] Failed to parse expression for job ${job.id}: ${schedule.expr}`);
        return null;
      }
    }
    return null;
  }

  /** Cancel and forget the pending timer for a job, if any. */
  private _clearTimer(id: string): void {
    const timer = this._timers.get(id);
    if (timer !== undefined) {
      clearTimeout(timer);
      this._timers.delete(id);
    }
  }
}