Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added docs/ai-workflow-flow.full-diagram.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/ai-workflow-flow.pgboss-diagram.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
111 changes: 77 additions & 34 deletions src/shared/modules/global/queue-scheduler.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import {
import * as PgBoss from 'pg-boss';
import { policies, Queue } from 'pg-boss';

const PGBOSS_JOB_POLLING_INTERVAL_SEC = parseFloat(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[💡 performance]
Consider using Number instead of parseFloat for converting environment variables to numbers. Number is generally more performant and sufficient for this use case, as it handles both integer and floating-point conversions.

process.env.PGBOSS_JOB_POLLING_INTERVAL_SEC || '10',
);

/**
* QueueSchedulerService
*/
Expand All @@ -16,11 +20,6 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy {
private boss: PgBoss;
private $start;

private jobsHandlersMap = new Map<
string,
(resolution?: 'fail' | 'complete', result?: any) => void
>();

get isEnabled() {
return String(process.env.DISPATCH_AI_REVIEW_WORKFLOWS) === 'true';
}
Expand Down Expand Up @@ -115,26 +114,7 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy {
return;
}

if (resolution === 'fail') {
// IMPORTANT!
// thes 4 operations will update the cache for the active singletons in the database
// and will allow the jobs queue to go next or retry
await this.boss.cancel(queueName, jobId);
await this.boss.getQueueStats(queueName);
await this.boss.supervise(queueName);
await this.boss.resume(queueName, jobId);
}

if (this.jobsHandlersMap.has(jobId)) {
this.logger.log(
`Found job handler for ${jobId}. Calling with '${resolution}' resolution.`,
);
this.jobsHandlersMap.get(jobId)?.call(null, resolution);
this.jobsHandlersMap.delete(jobId);
this.logger.log('JobHandlers left:', [...this.jobsHandlersMap.keys()]);
} else {
await this.boss[resolution](queueName, jobId);
}
await this.boss[resolution](queueName, jobId);

this.logger.log(`Job ${jobId} ${resolution} called.`);

Expand Down Expand Up @@ -168,16 +148,79 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy {
await this.createQueue(queueName);
}

return this.boss.work(queueName, handlerFn);
/**
* Continuously polls a job queue and processes jobs one at a time.
*
* @typeParam T - The type of the job payload/data.
*
* @remarks
* - Fetches a single job from the queue (batchSize = 1) via `this.boss.fetch<T>(queueName, { batchSize: 1 })`.
* - If a job is returned, logs the job and invokes `handlerFn([job])`, awaiting its completion.
* - After each iteration (whether a job was found or not), schedules the next poll using
* `setTimeout(..., PGBOSS_JOB_POLLING_INTERVAL)` to avoid deep recursion and to yield to the event loop.
* - The scheduled, recursive invocation calls `poll().catch(() => {})` so that errors from those future
* invocations are swallowed and do not produce unhandled promise rejections. Note that errors thrown
* by the current invocation (for example from `handlerFn`) will propagate to the caller of this invocation
* unless the caller handles them.
*
* @returns A Promise that resolves when this single poll iteration completes.
*/
const poll = async () => {
try {
// guard: ensure boss still exists and service still enabled
if (!this.boss || !this.isEnabled) {
this.logger.warn(
`Polling for queue "${queueName}" stopped: boss not available or service disabled.`,
);
return;
}

const [job] = await this.boss.fetch<T>(queueName, { batchSize: 1 });
if (job) {
// avoid throwing from here to keep the loop alive
try {
this.logger.log(
`Starting job processing for job ${job.id} from queue "${queueName}"`,
);
this.logger.debug(
`Job ${job.id} payload: ${JSON.stringify(job.data)}`,
);
} catch {
// ignore stringify errors
}

try {
await handlerFn([job]);
} catch (err) {
this.logger.error(
`Handler error while processing job ${job.id} from "${queueName}": ${
(err && (err as Error).message) || err
}`,
err,
);
// don't rethrow so the scheduled next poll still runs
}
}
} catch (err) {
this.logger.error(
`Error fetching job from queue "${queueName}": ${
(err && (err as Error).message) || err
}`,
err,
);
// swallow to avoid unhandled promise rejection; next poll still scheduled
} finally {
// schedule next poll (non-blocking). Any errors from the scheduled invocation are logged.
setTimeout(() => {
poll().catch((err) =>
this.logger.error('Unhandled poll error', err),
);
}, PGBOSS_JOB_POLLING_INTERVAL_SEC * 1000);
}
};

await poll();
}),
);
}

registerJobHandler(
jobId: string,
handler: (resolution?: string, result?: any) => void,
) {
this.logger.log(`Registering job handler for job ${jobId}.`);
this.jobsHandlersMap.set(jobId, handler);
}
}
15 changes: 0 additions & 15 deletions src/shared/modules/global/workflow-queue.handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,21 +104,6 @@ export class WorkflowQueueHandler implements OnModuleInit {
},
});

// return not-resolved promise,
// this will put a pause on the job
// until it is marked as completed via webhook call
await new Promise<void>((resolve, reject) => {
this.scheduler.registerJobHandler(
job.id,
(resolution: string = 'complete', result: any) => {
this.logger.log(
`Job handler called with ${resolution} and ${result}`,
);
(resolution === 'fail' ? reject : resolve)(result);
},
);
});

this.logger.log(`Job ${job.id} promise finished.`);
}

Expand Down
Loading