diff --git a/docs/ai-workflow-flow.full-diagram.png b/docs/ai-workflow-flow.full-diagram.png new file mode 100755 index 0000000..f7ae183 Binary files /dev/null and b/docs/ai-workflow-flow.full-diagram.png differ diff --git a/docs/ai-workflow-flow.pgboss-diagram.png b/docs/ai-workflow-flow.pgboss-diagram.png new file mode 100755 index 0000000..599e92e Binary files /dev/null and b/docs/ai-workflow-flow.pgboss-diagram.png differ diff --git a/src/shared/modules/global/queue-scheduler.service.ts b/src/shared/modules/global/queue-scheduler.service.ts index ed6c889..d3cc02d 100644 --- a/src/shared/modules/global/queue-scheduler.service.ts +++ b/src/shared/modules/global/queue-scheduler.service.ts @@ -7,6 +7,10 @@ import { import * as PgBoss from 'pg-boss'; import { policies, Queue } from 'pg-boss'; +const PGBOSS_JOB_POLLING_INTERVAL_SEC = parseFloat( + process.env.PGBOSS_JOB_POLLING_INTERVAL_SEC || '10', +); + /** * QueueSchedulerService */ @@ -16,11 +20,6 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy { private boss: PgBoss; private $start; - private jobsHandlersMap = new Map< - string, - (resolution?: 'fail' | 'complete', result?: any) => void - >(); - get isEnabled() { return String(process.env.DISPATCH_AI_REVIEW_WORKFLOWS) === 'true'; } @@ -115,26 +114,7 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy { return; } - if (resolution === 'fail') { - // IMPORTANT! - // thes 4 operations will update the cache for the active singletons in the database - // and will allow the jobs queue to go next or retry - await this.boss.cancel(queueName, jobId); - await this.boss.getQueueStats(queueName); - await this.boss.supervise(queueName); - await this.boss.resume(queueName, jobId); - } - - if (this.jobsHandlersMap.has(jobId)) { - this.logger.log( - `Found job handler for ${jobId}. Calling with '${resolution}' resolution.`, - ); - this.jobsHandlersMap.get(jobId)?.call(null, resolution); - this.jobsHandlersMap.delete(jobId); - this.logger.log('JobHandlers left:', [...this.jobsHandlersMap.keys()]); - } else { - await this.boss[resolution](queueName, jobId); - } + await this.boss[resolution](queueName, jobId); this.logger.log(`Job ${jobId} ${resolution} called.`); @@ -168,16 +148,79 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy { await this.createQueue(queueName); } - return this.boss.work(queueName, handlerFn); + /** + * Continuously polls a job queue and processes jobs one at a time. + * + * @typeParam T - The type of the job payload/data. + * + * @remarks + * - Fetches a single job from the queue (batchSize = 1) via `this.boss.fetch(queueName, { batchSize: 1 })`. + * - If a job is returned, logs the job and invokes `handlerFn([job])`, awaiting its completion. + * - After each iteration (whether a job was found or not), schedules the next poll using + * `setTimeout(..., PGBOSS_JOB_POLLING_INTERVAL)` to avoid deep recursion and to yield to the event loop. + * - The scheduled, recursive invocation calls `poll().catch(() => {})` so that errors from those future + * invocations are swallowed and do not produce unhandled promise rejections. Note that errors thrown + * by the current invocation (for example from `handlerFn`) will propagate to the caller of this invocation + * unless the caller handles them. + * + * @returns A Promise that resolves when this single poll iteration completes. + */ + const poll = async () => { + try { + // guard: ensure boss still exists and service still enabled + if (!this.boss || !this.isEnabled) { + this.logger.warn( + `Polling for queue "${queueName}" stopped: boss not available or service disabled.`, + ); + return; + } + + const [job] = await this.boss.fetch(queueName, { batchSize: 1 }); + if (job) { + // avoid throwing from here to keep the loop alive + try { + this.logger.log( + `Starting job processing for job ${job.id} from queue "${queueName}"`, + ); + this.logger.debug( + `Job ${job.id} payload: ${JSON.stringify(job.data)}`, + ); + } catch { + // ignore stringify errors + } + + try { + await handlerFn([job]); + } catch (err) { + this.logger.error( + `Handler error while processing job ${job.id} from "${queueName}": ${ + (err && (err as Error).message) || err + }`, + err, + ); + // don't rethrow so the scheduled next poll still runs + } + } + } catch (err) { + this.logger.error( + `Error fetching job from queue "${queueName}": ${ + (err && (err as Error).message) || err + }`, + err, + ); + // swallow to avoid unhandled promise rejection; next poll still scheduled + } finally { + // schedule next poll (non-blocking). Any errors from the scheduled invocation are logged. + setTimeout(() => { + poll().catch((err) => + this.logger.error('Unhandled poll error', err), + ); + }, PGBOSS_JOB_POLLING_INTERVAL_SEC * 1000); + } + }; + + await poll(); }), ); } - - registerJobHandler( - jobId: string, - handler: (resolution?: string, result?: any) => void, - ) { - this.logger.log(`Registering job handler for job ${jobId}.`); - this.jobsHandlersMap.set(jobId, handler); - } } diff --git a/src/shared/modules/global/workflow-queue.handler.ts b/src/shared/modules/global/workflow-queue.handler.ts index 629a3e1..1c230bd 100644 --- a/src/shared/modules/global/workflow-queue.handler.ts +++ b/src/shared/modules/global/workflow-queue.handler.ts @@ -104,21 +104,6 @@ export class WorkflowQueueHandler implements OnModuleInit { }, }); - // return not-resolved promise, - // this will put a pause on the job - // until it is marked as completed via webhook call - await new Promise((resolve, reject) => { - this.scheduler.registerJobHandler( - job.id, - (resolution: string = 'complete', result: any) => { - this.logger.log( - `Job handler called with ${resolution} and ${result}`, - ); - (resolution === 'fail' ? reject : resolve)(result); - }, - ); - }); - this.logger.log(`Job ${job.id} promise finished.`); }