Skip to content

Commit 2a0c618

Browse files
committed
PM-3200 - switch away from using pgboss's workers
1 parent 723a847 commit 2a0c618

File tree

2 files changed

+78
-49
lines changed

2 files changed

+78
-49
lines changed

src/shared/modules/global/queue-scheduler.service.ts

Lines changed: 78 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@ import {
77
import * as PgBoss from 'pg-boss';
88
import { policies, Queue } from 'pg-boss';
99

10+
const PGBOSS_JOB_POLLING_INTERVAL = parseInt(
11+
process.env.PGBOSS_JOB_POLLING_INTERVAL || '1000',
12+
10,
13+
);
14+
1015
/**
1116
* QueueSchedulerService
1217
*/
@@ -16,11 +21,6 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy {
1621
private boss: PgBoss;
1722
private $start;
1823

19-
private jobsHandlersMap = new Map<
20-
string,
21-
(resolution?: 'fail' | 'complete', result?: any) => void
22-
>();
23-
2424
get isEnabled() {
2525
return String(process.env.DISPATCH_AI_REVIEW_WORKFLOWS) === 'true';
2626
}
@@ -115,26 +115,7 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy {
115115
return;
116116
}
117117

118-
if (resolution === 'fail') {
119-
// IMPORTANT!
120-
// thes 4 operations will update the cache for the active singletons in the database
121-
// and will allow the jobs queue to go next or retry
122-
await this.boss.cancel(queueName, jobId);
123-
await this.boss.getQueueStats(queueName);
124-
await this.boss.supervise(queueName);
125-
await this.boss.resume(queueName, jobId);
126-
}
127-
128-
if (this.jobsHandlersMap.has(jobId)) {
129-
this.logger.log(
130-
`Found job handler for ${jobId}. Calling with '${resolution}' resolution.`,
131-
);
132-
this.jobsHandlersMap.get(jobId)?.call(null, resolution);
133-
this.jobsHandlersMap.delete(jobId);
134-
this.logger.log('JobHandlers left:', [...this.jobsHandlersMap.keys()]);
135-
} else {
136-
await this.boss[resolution](queueName, jobId);
137-
}
118+
await this.boss[resolution](queueName, jobId);
138119

139120
this.logger.log(`Job ${jobId} ${resolution} called.`);
140121

@@ -168,16 +149,79 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy {
168149
await this.createQueue(queueName);
169150
}
170151

171-
return this.boss.work(queueName, handlerFn);
152+
/**
153+
* Continuously polls a job queue and processes jobs one at a time.
154+
*
155+
* @typeParam T - The type of the job payload/data.
156+
*
157+
* @remarks
158+
* - Fetches a single job from the queue (batchSize = 1) via `this.boss.fetch<T>(queueName, { batchSize: 1 })`.
159+
* - If a job is returned, logs the job and invokes `handlerFn([job])`, awaiting its completion.
160+
* - After each iteration (whether a job was found or not), schedules the next poll using
161+
* `setTimeout(..., PGBOSS_JOB_POLLING_INTERVAL)` to avoid deep recursion and to yield to the event loop.
162+
* - The scheduled, recursive invocation calls `poll().catch(() => {})` so that errors from those future
163+
* invocations are swallowed and do not produce unhandled promise rejections. Note that errors thrown
164+
* by the current invocation (for example from `handlerFn`) will propagate to the caller of this invocation
165+
* unless the caller handles them.
166+
*
167+
* @returns A Promise that resolves when this single poll iteration completes.
168+
*/
169+
const poll = async () => {
170+
try {
171+
// guard: ensure boss still exists and service still enabled
172+
if (!this.boss || !this.isEnabled) {
173+
this.logger.warn(
174+
`Polling for queue "${queueName}" stopped: boss not available or service disabled.`,
175+
);
176+
return;
177+
}
178+
179+
const [job] = await this.boss.fetch<T>(queueName, { batchSize: 1 });
180+
if (job) {
181+
// avoid throwing from here to keep the loop alive
182+
try {
183+
this.logger.log(
184+
`Starting job processing for job ${job.id} from queue "${queueName}"`,
185+
);
186+
this.logger.debug(
187+
`Job ${job.id} payload: ${JSON.stringify(job.data)}`,
188+
);
189+
} catch {
190+
// ignore stringify errors
191+
}
192+
193+
try {
194+
await handlerFn([job]);
195+
} catch (err) {
196+
this.logger.error(
197+
`Handler error while processing job ${job.id} from "${queueName}": ${
198+
(err && (err as Error).message) || err
199+
}`,
200+
err,
201+
);
202+
// don't rethrow so the scheduled next poll still runs
203+
}
204+
}
205+
} catch (err) {
206+
this.logger.error(
207+
`Error fetching job from queue "${queueName}": ${
208+
(err && (err as Error).message) || err
209+
}`,
210+
err,
211+
);
212+
// swallow to avoid unhandled promise rejection; next poll still scheduled
213+
} finally {
214+
// schedule next poll (non-blocking). Any errors from the scheduled invocation are logged.
215+
setTimeout(() => {
216+
poll().catch((err) =>
217+
this.logger.error('Unhandled poll error', err),
218+
);
219+
}, PGBOSS_JOB_POLLING_INTERVAL);
220+
}
221+
};
222+
223+
await poll();
172224
}),
173225
);
174226
}
175-
176-
registerJobHandler(
177-
jobId: string,
178-
handler: (resolution?: string, result?: any) => void,
179-
) {
180-
this.logger.log(`Registering job handler for job ${jobId}.`);
181-
this.jobsHandlersMap.set(jobId, handler);
182-
}
183227
}

src/shared/modules/global/workflow-queue.handler.ts

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -104,21 +104,6 @@ export class WorkflowQueueHandler implements OnModuleInit {
104104
},
105105
});
106106

107-
// return not-resolved promise,
108-
// this will put a pause on the job
109-
// until it is marked as completed via webhook call
110-
await new Promise<void>((resolve, reject) => {
111-
this.scheduler.registerJobHandler(
112-
job.id,
113-
(resolution: string = 'complete', result: any) => {
114-
this.logger.log(
115-
`Job handler called with ${resolution} and ${result}`,
116-
);
117-
(resolution === 'fail' ? reject : resolve)(result);
118-
},
119-
);
120-
});
121-
122107
this.logger.log(`Job ${job.id} promise finished.`);
123108
}
124109

0 commit comments

Comments
 (0)