From 1c93ddf7782e07593aaaec17cf147ca56622bd89 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antoine=20de=20Chevign=C3=A9?= Date: Fri, 9 Jan 2026 20:26:49 +0100 Subject: [PATCH 1/2] Add real-time RPC failure reporting for explorer auto-deactivation When blockSync/receiptSync jobs encounter RPC errors, they now immediately report failures to the explorer model. After 3 consecutive failures, the explorer's sync is automatically disabled. Changes: - Add POST /api/explorers/:slug/syncFailure endpoint for PM2 server - Modify blockSync job to report RPC failures directly via model - Modify receiptSync job to report RPC failures directly via model - Add tests for new endpoint (6 test cases) Rate-limited requests are excluded from failure counting since they are expected behavior, not actual RPC failures. Co-Authored-By: Claude Opus 4.5 --- run/api/explorers.js | 42 +++++++++++++++++ run/jobs/blockSync.js | 23 +++++++++ run/jobs/receiptSync.js | 23 +++++++++ run/tests/api/explorers.test.js | 84 +++++++++++++++++++++++++++++++++ 4 files changed, 172 insertions(+) diff --git a/run/api/explorers.js b/run/api/explorers.js index 8bd9f050..97c6940a 100644 --- a/run/api/explorers.js +++ b/run/api/explorers.js @@ -481,6 +481,48 @@ router.post('/syncExplorers', secretMiddleware, async (req, res, next) => { } }); +/** + * Report a sync failure for an explorer + * Used by PM2 server and sync jobs to report RPC failures + * @route POST /api/explorers/:slug/syncFailure + * @param {string} slug - Explorer slug + * @param {string} reason - Failure reason (e.g., 'rpc_error', 'rpc_unreachable') + * @param {string} source - Source of the failure (e.g., 'pm2', 'blockSync', 'receiptSync') + * @returns {object} - Result with disabled status, attempts count, and message + */ +router.post('/:slug/syncFailure', secretMiddleware, async (req, res, next) => { + try { + const { slug } = req.params; + const { reason, source } = req.body; + + const explorer = await Explorer.findOne({ where: { slug } }); + if (!explorer) { + return res.status(404).json({ error: 'Explorer not found' }); + } + + const result = await explorer.incrementSyncFailures(reason || 'rpc_error'); + + logger.info({ + message: 'Sync failure reported', + explorerSlug: slug, + reason: reason || 'rpc_error', + source: source || 'unknown', + attempts: result.attempts, + disabled: result.disabled + }); + + res.status(200).json({ + disabled: result.disabled, + attempts: result.attempts, + message: result.disabled + ? `Sync auto-disabled after ${result.attempts} failures` + : `Failure recorded (attempt ${result.attempts}/3)` + }); + } catch(error) { + unmanagedError(error, req, next); + } +}); + router.put('/:id/stopSync', [authMiddleware], async (req, res, next) => { try { if (!req.params.id) diff --git a/run/jobs/blockSync.js b/run/jobs/blockSync.js index abc29596..15306cf2 100644 --- a/run/jobs/blockSync.js +++ b/run/jobs/blockSync.js @@ -91,6 +91,29 @@ module.exports = async job => { block = await providerConnector.fetchRawBlockWithTransactions(data.blockNumber); } catch(error) { const priority = job.opts.priority || (data.source == 'cli-light' ? 1 : 10); + + // Report RPC failure to explorer (except for rate limiting which is expected) + if (error.message != 'Rate limited' && workspace.explorer && workspace.explorer.shouldSync) { + try { + const result = await workspace.explorer.incrementSyncFailures('rpc_error'); + if (result.disabled) { + logger.info({ + message: 'Explorer auto-disabled due to RPC failures in blockSync', + explorerId: workspace.explorer.id, + workspaceId: workspace.id, + attempts: result.attempts + }); + return 'Sync disabled due to repeated RPC failures'; + } + } catch (reportError) { + logger.warn({ + message: 'Failed to report sync failure', + error: reportError.message, + workspaceId: workspace.id + }); + } + } + if (error.message == 'Rate limited') { return enqueue('blockSync', `blockSync-${workspace.id}-${data.blockNumber}-${Date.now()}`, { userId: workspace.user.firebaseUserId, diff --git a/run/jobs/receiptSync.js b/run/jobs/receiptSync.js index ecaa9ea3..7c88d45c 100644 --- a/run/jobs/receiptSync.js +++ b/run/jobs/receiptSync.js @@ -119,6 +119,29 @@ module.exports = async job => { receipt = await providerConnector.fetchTransactionReceipt(transaction.hash); } catch(error) { const priority = job.opts.priority || (data.source == 'cli-light' ? 1 : 10); + + // Report RPC failure to explorer (except for rate limiting which is expected) + if (error.message != 'Rate limited' && workspace.explorer && workspace.explorer.shouldSync) { + try { + const result = await workspace.explorer.incrementSyncFailures('rpc_error'); + if (result.disabled) { + logger.info({ + message: 'Explorer auto-disabled due to RPC failures in receiptSync', + explorerId: workspace.explorer.id, + workspaceId: workspace.id, + attempts: result.attempts + }); + return 'Sync disabled due to repeated RPC failures'; + } + } catch (reportError) { + logger.warn({ + message: 'Failed to report sync failure', + error: reportError.message, + workspaceId: workspace.id + }); + } + } + if (error.message == 'Rate limited') { return enqueue('receiptSync', `receiptSync-${workspace.id}-${transaction.hash}-${Date.now()}`, { transactionId: transaction.id, diff --git a/run/tests/api/explorers.test.js b/run/tests/api/explorers.test.js index 967f2b63..6b01ef63 100644 --- a/run/tests/api/explorers.test.js +++ b/run/tests/api/explorers.test.js @@ -412,6 +412,90 @@ describe(`POST ${BASE_URL}/syncExplorers`, () => { }); }); +describe(`POST ${BASE_URL}/:slug/syncFailure`, () => { + it('Should return 401 if no secret provided', (done) => { + request.post(`${BASE_URL}/test-explorer/syncFailure`) + .send({ reason: 'rpc_error', source: 'blockSync' }) + .expect(401) + .then(() => done()); + }); + + it('Should return 401 if invalid secret provided', (done) => { + request.post(`${BASE_URL}/test-explorer/syncFailure?secret=invalid`) + .send({ reason: 'rpc_error', source: 'blockSync' }) + .expect(401) + .then(() => done()); + }); + + it('Should return 404 if explorer not found', (done) => { + jest.spyOn(Explorer, 'findOne').mockResolvedValueOnce(null); + request.post(`${BASE_URL}/nonexistent/syncFailure?secret=secret`) + .send({ reason: 'rpc_error', source: 'blockSync' }) + .expect(404) + .then(({ body }) => { + expect(body.error).toEqual('Explorer not found'); + done(); + }); + }); + + it('Should increment failure counter and return result', (done) => { + const mockIncrementSyncFailures = jest.fn().mockResolvedValue({ disabled: false, attempts: 1 }); + jest.spyOn(Explorer, 'findOne').mockResolvedValueOnce({ + id: 1, + slug: 'test-explorer', + incrementSyncFailures: mockIncrementSyncFailures + }); + + request.post(`${BASE_URL}/test-explorer/syncFailure?secret=secret`) + .send({ reason: 'rpc_error', source: 'blockSync' }) + .expect(200) + .then(({ body }) => { + expect(mockIncrementSyncFailures).toHaveBeenCalledWith('rpc_error'); + expect(body.disabled).toBe(false); + expect(body.attempts).toBe(1); + expect(body.message).toContain('Failure recorded'); + done(); + }); + }); + + it('Should auto-disable after threshold and return disabled status', (done) => { + const mockIncrementSyncFailures = jest.fn().mockResolvedValue({ disabled: true, attempts: 3 }); + jest.spyOn(Explorer, 'findOne').mockResolvedValueOnce({ + id: 1, + slug: 'test-explorer', + incrementSyncFailures: mockIncrementSyncFailures + }); + + request.post(`${BASE_URL}/test-explorer/syncFailure?secret=secret`) + .send({ reason: 'rpc_unreachable', source: 'pm2' }) + .expect(200) + .then(({ body }) => { + expect(mockIncrementSyncFailures).toHaveBeenCalledWith('rpc_unreachable'); + expect(body.disabled).toBe(true); + expect(body.attempts).toBe(3); + expect(body.message).toContain('auto-disabled'); + done(); + }); + }); + + it('Should use default reason if not provided', (done) => { + const mockIncrementSyncFailures = jest.fn().mockResolvedValue({ disabled: false, attempts: 1 }); + jest.spyOn(Explorer, 'findOne').mockResolvedValueOnce({ + id: 1, + slug: 'test-explorer', + incrementSyncFailures: mockIncrementSyncFailures + }); + + request.post(`${BASE_URL}/test-explorer/syncFailure?secret=secret`) + .send({}) + .expect(200) + .then(() => { + expect(mockIncrementSyncFailures).toHaveBeenCalledWith('rpc_error'); + done(); + }); + }); +}); + describe(`PUT ${BASE_URL}/:id/stopSync`, () => { it('Should return an error if cannot find explorer', (done) => { jest.spyOn(db, 'getExplorerById').mockResolvedValueOnce(null); From a3cd6098ed9ee16920ffa038490d9026468850f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antoine=20de=20Chevign=C3=A9?= Date: Fri, 9 Jan 2026 20:30:56 +0100 Subject: [PATCH 2/2] Refactor RPC failure reporting with shared helper - Extract duplicate failure reporting logic to run/lib/syncHelpers.js - Exclude timeout errors from failure counting (only actual RPC failures) - Use SYNC_FAILURE_THRESHOLD constant instead of hardcoded value - Add comprehensive unit tests for syncHelpers (7 test cases) Co-Authored-By: Claude Opus 4.5 --- run/api/explorers.js | 3 +- run/jobs/blockSync.js | 25 ++----- run/jobs/receiptSync.js | 25 ++----- run/lib/syncHelpers.js | 61 ++++++++++++++++ run/tests/lib/syncHelpers.test.js | 112 ++++++++++++++++++++++++++++++ 5 files changed, 185 insertions(+), 41 deletions(-) create mode 100644 run/lib/syncHelpers.js create mode 100644 run/tests/lib/syncHelpers.test.js diff --git a/run/api/explorers.js b/run/api/explorers.js index 97c6940a..ad64e08c 100644 --- a/run/api/explorers.js +++ b/run/api/explorers.js @@ -49,6 +49,7 @@ const { getSupportedOpNetworks, isOpNetworkSupported } = require('../lib/opNetwo const authMiddleware = require('../middlewares/auth'); const stripeMiddleware = require('../middlewares/stripe'); const secretMiddleware = require('../middlewares/secret'); +const { SYNC_FAILURE_THRESHOLD } = require('../lib/syncHelpers'); /** * Get orbit config for a given explorer @@ -516,7 +517,7 @@ router.post('/:slug/syncFailure', secretMiddleware, async (req, res, next) => { attempts: result.attempts, message: result.disabled ? `Sync auto-disabled after ${result.attempts} failures` - : `Failure recorded (attempt ${result.attempts}/3)` + : `Failure recorded (attempt ${result.attempts}/${SYNC_FAILURE_THRESHOLD})` }); } catch(error) { unmanagedError(error, req, next); diff --git a/run/jobs/blockSync.js b/run/jobs/blockSync.js index 15306cf2..76966438 100644 --- a/run/jobs/blockSync.js +++ b/run/jobs/blockSync.js @@ -14,6 +14,7 @@ const { enqueue, bulkEnqueue } = require('../lib/queue'); const RateLimiter = require('../lib/rateLimiter'); const constants = require('../constants/orbit'); const { isBatchTransaction, getBatchInfo } = require('../lib/opBatches'); +const { reportRpcFailure } = require('../lib/syncHelpers'); module.exports = async job => { const data = job.data; @@ -92,26 +93,10 @@ module.exports = async job => { } catch(error) { const priority = job.opts.priority || (data.source == 'cli-light' ? 1 : 10); - // Report RPC failure to explorer (except for rate limiting which is expected) - if (error.message != 'Rate limited' && workspace.explorer && workspace.explorer.shouldSync) { - try { - const result = await workspace.explorer.incrementSyncFailures('rpc_error'); - if (result.disabled) { - logger.info({ - message: 'Explorer auto-disabled due to RPC failures in blockSync', - explorerId: workspace.explorer.id, - workspaceId: workspace.id, - attempts: result.attempts - }); - return 'Sync disabled due to repeated RPC failures'; - } - } catch (reportError) { - logger.warn({ - message: 'Failed to report sync failure', - error: reportError.message, - workspaceId: workspace.id - }); - } + // Report RPC failure to explorer (excludes rate limiting and timeouts) + const failureResult = await reportRpcFailure(error, workspace.explorer, 'blockSync', workspace.id); + if (failureResult.shouldStop) { + return failureResult.message; } if (error.message == 'Rate limited') { diff --git a/run/jobs/receiptSync.js b/run/jobs/receiptSync.js index 7c88d45c..c3efc198 100644 --- a/run/jobs/receiptSync.js +++ b/run/jobs/receiptSync.js @@ -10,6 +10,7 @@ const { processRawRpcObject } = require('../lib/utils'); const { enqueue } = require('../lib/queue'); const RateLimiter = require('../lib/rateLimiter'); const logger = require('../lib/logger'); +const { reportRpcFailure } = require('../lib/syncHelpers'); const { isTransactionDepositedEvent, isDisputeGameCreatedEvent, @@ -120,26 +121,10 @@ module.exports = async job => { } catch(error) { const priority = job.opts.priority || (data.source == 'cli-light' ? 1 : 10); - // Report RPC failure to explorer (except for rate limiting which is expected) - if (error.message != 'Rate limited' && workspace.explorer && workspace.explorer.shouldSync) { - try { - const result = await workspace.explorer.incrementSyncFailures('rpc_error'); - if (result.disabled) { - logger.info({ - message: 'Explorer auto-disabled due to RPC failures in receiptSync', - explorerId: workspace.explorer.id, - workspaceId: workspace.id, - attempts: result.attempts - }); - return 'Sync disabled due to repeated RPC failures'; - } - } catch (reportError) { - logger.warn({ - message: 'Failed to report sync failure', - error: reportError.message, - workspaceId: workspace.id - }); - } + // Report RPC failure to explorer (excludes rate limiting and timeouts) + const failureResult = await reportRpcFailure(error, workspace.explorer, 'receiptSync', workspace.id); + if (failureResult.shouldStop) { + return failureResult.message; } if (error.message == 'Rate limited') { diff --git a/run/lib/syncHelpers.js b/run/lib/syncHelpers.js new file mode 100644 index 00000000..759d49d3 --- /dev/null +++ b/run/lib/syncHelpers.js @@ -0,0 +1,61 @@ +/** + * @fileoverview Shared helper functions for sync jobs. + * @module lib/syncHelpers + */ + +const logger = require('./logger'); + +// Re-export the threshold constant for use in API responses +const SYNC_FAILURE_THRESHOLD = 3; + +/** + * Report an RPC failure to the explorer and handle auto-disable logic. + * Excludes rate-limited and timeout errors from failure counting since + * they are expected/transient conditions, not actual RPC failures. + * + * @param {Error} error - The error that occurred + * @param {Object} explorer - The explorer model instance + * @param {string} jobName - Name of the job reporting the failure (e.g., 'blockSync', 'receiptSync') + * @param {number} workspaceId - ID of the workspace + * @returns {Promise<{shouldStop: boolean, message: string|null}>} - Whether the job should stop and optional message + */ +async function reportRpcFailure(error, explorer, jobName, workspaceId) { + // Don't count rate limiting or timeouts as failures - they are expected/transient + if (error.message === 'Rate limited' || error.message.startsWith('Timed out after')) { + return { shouldStop: false, message: null }; + } + + // Only report if explorer exists and sync is enabled + if (!explorer || !explorer.shouldSync) { + return { shouldStop: false, message: null }; + } + + try { + const result = await explorer.incrementSyncFailures('rpc_error'); + if (result.disabled) { + logger.info({ + message: `Explorer auto-disabled due to RPC failures in ${jobName}`, + explorerId: explorer.id, + workspaceId: workspaceId, + attempts: result.attempts + }); + return { + shouldStop: true, + message: 'Sync disabled due to repeated RPC failures' + }; + } + } catch (reportError) { + logger.warn({ + message: 'Failed to report sync failure', + error: reportError.message, + workspaceId: workspaceId + }); + } + + return { shouldStop: false, message: null }; +} + +module.exports = { + reportRpcFailure, + SYNC_FAILURE_THRESHOLD +}; diff --git a/run/tests/lib/syncHelpers.test.js b/run/tests/lib/syncHelpers.test.js new file mode 100644 index 00000000..d489d888 --- /dev/null +++ b/run/tests/lib/syncHelpers.test.js @@ -0,0 +1,112 @@ +require('../mocks/lib/queue'); +require('../mocks/lib/firebase'); + +const { reportRpcFailure, SYNC_FAILURE_THRESHOLD } = require('../../lib/syncHelpers'); + +describe('syncHelpers', () => { + describe('SYNC_FAILURE_THRESHOLD', () => { + it('Should export the sync failure threshold constant', () => { + expect(SYNC_FAILURE_THRESHOLD).toBe(3); + }); + }); + + describe('reportRpcFailure', () => { + it('Should not count rate-limited errors as failures', async () => { + const error = new Error('Rate limited'); + const explorer = { + id: 1, + shouldSync: true, + incrementSyncFailures: jest.fn() + }; + + const result = await reportRpcFailure(error, explorer, 'blockSync', 123); + + expect(result.shouldStop).toBe(false); + expect(result.message).toBeNull(); + expect(explorer.incrementSyncFailures).not.toHaveBeenCalled(); + }); + + it('Should not count timeout errors as failures', async () => { + const error = new Error('Timed out after 30000ms'); + const explorer = { + id: 1, + shouldSync: true, + incrementSyncFailures: jest.fn() + }; + + const result = await reportRpcFailure(error, explorer, 'blockSync', 123); + + expect(result.shouldStop).toBe(false); + expect(result.message).toBeNull(); + expect(explorer.incrementSyncFailures).not.toHaveBeenCalled(); + }); + + it('Should count other RPC errors as failures', async () => { + const error = new Error('Connection refused'); + const explorer = { + id: 1, + shouldSync: true, + incrementSyncFailures: jest.fn().mockResolvedValue({ disabled: false, attempts: 1 }) + }; + + const result = await reportRpcFailure(error, explorer, 'blockSync', 123); + + expect(result.shouldStop).toBe(false); + expect(result.message).toBeNull(); + expect(explorer.incrementSyncFailures).toHaveBeenCalledWith('rpc_error'); + }); + + it('Should return shouldStop=true when explorer is auto-disabled', async () => { + const error = new Error('Connection refused'); + const explorer = { + id: 1, + shouldSync: true, + incrementSyncFailures: jest.fn().mockResolvedValue({ disabled: true, attempts: 3 }) + }; + + const result = await reportRpcFailure(error, explorer, 'blockSync', 123); + + expect(result.shouldStop).toBe(true); + expect(result.message).toBe('Sync disabled due to repeated RPC failures'); + }); + + it('Should not report if explorer is null', async () => { + const error = new Error('Connection refused'); + + const result = await reportRpcFailure(error, null, 'blockSync', 123); + + expect(result.shouldStop).toBe(false); + expect(result.message).toBeNull(); + }); + + it('Should not report if explorer.shouldSync is false', async () => { + const error = new Error('Connection refused'); + const explorer = { + id: 1, + shouldSync: false, + incrementSyncFailures: jest.fn() + }; + + const result = await reportRpcFailure(error, explorer, 'blockSync', 123); + + expect(result.shouldStop).toBe(false); + expect(result.message).toBeNull(); + expect(explorer.incrementSyncFailures).not.toHaveBeenCalled(); + }); + + it('Should handle incrementSyncFailures errors gracefully', async () => { + const error = new Error('Connection refused'); + const explorer = { + id: 1, + shouldSync: true, + incrementSyncFailures: jest.fn().mockRejectedValue(new Error('DB error')) + }; + + // Should not throw + const result = await reportRpcFailure(error, explorer, 'blockSync', 123); + + expect(result.shouldStop).toBe(false); + expect(result.message).toBeNull(); + }); + }); +});