/**
 * Record one sync failure against the explorer identified by its slug.
 * The route is guarded by secretMiddleware.
 *
 * @route POST /api/explorers/:slug/syncFailure
 * @param {string} slug - Explorer slug (URL parameter)
 * @param {string} [reason] - Failure reason from the body; 'rpc_error' is used when missing or falsy
 * @param {string} [source] - Reporter name from the body; logged as 'unknown' when missing or falsy
 * @returns {object} - { disabled, attempts, message }, or a 404 { error } when no explorer matches the slug
 */
router.post('/:slug/syncFailure', secretMiddleware, async (req, res, next) => {
    try {
        const slug = req.params.slug;
        const failureReason = req.body.reason || 'rpc_error';
        const failureSource = req.body.source || 'unknown';

        const explorer = await Explorer.findOne({ where: { slug } });
        if (!explorer) {
            return res.status(404).json({ error: 'Explorer not found' });
        }

        const { disabled, attempts } = await explorer.incrementSyncFailures(failureReason);

        logger.info({
            message: 'Sync failure reported',
            explorerSlug: slug,
            reason: failureReason,
            source: failureSource,
            attempts,
            disabled
        });

        // The human-readable message depends on whether this report disabled sync.
        let message;
        if (disabled) {
            message = `Sync auto-disabled after ${attempts} failures`;
        } else {
            message = `Failure recorded (attempt ${attempts}/${SYNC_FAILURE_THRESHOLD})`;
        }

        res.status(200).json({ disabled, attempts, message });
    } catch(error) {
        unmanagedError(error, req, next);
    }
});
/**
 * Report an RPC failure to the explorer and tell the calling job whether it should stop.
 *
 * Rate-limited and timeout errors are not counted: they are expected/transient
 * conditions rather than a broken RPC endpoint. Nothing is counted either when
 * there is no explorer or when its sync is not enabled.
 *
 * This function never throws on a malformed `error` value: it is called from
 * inside the sync jobs' catch blocks, where a secondary exception would hide
 * the original RPC error and skip the jobs' own retry handling.
 *
 * @param {Error|*} error - The value the RPC call rejected with (normally an Error)
 * @param {Object} explorer - The explorer model instance (may be null/undefined)
 * @param {string} jobName - Name of the job reporting the failure (e.g., 'blockSync', 'receiptSync')
 * @param {number} workspaceId - ID of the workspace
 * @returns {Promise<{shouldStop: boolean, message: string|null}>} - shouldStop is true only
 *   when this report caused the explorer to be disabled; message is then a short explanation
 */
async function reportRpcFailure(error, explorer, jobName, workspaceId) {
    // A rejected value is not guaranteed to be an Error with a string message
    // (it can be null, a string, or a plain object), so normalise before matching.
    const errorMessage = error && typeof error.message === 'string' ? error.message : '';

    // Don't count rate limiting or timeouts as failures - they are expected/transient
    if (errorMessage === 'Rate limited' || errorMessage.startsWith('Timed out after')) {
        return { shouldStop: false, message: null };
    }

    // Only report if explorer exists and sync is enabled
    if (!explorer || !explorer.shouldSync) {
        return { shouldStop: false, message: null };
    }

    try {
        const result = await explorer.incrementSyncFailures('rpc_error');
        if (result.disabled) {
            logger.info({
                message: `Explorer auto-disabled due to RPC failures in ${jobName}`,
                explorerId: explorer.id,
                workspaceId: workspaceId,
                attempts: result.attempts
            });
            return {
                shouldStop: true,
                message: 'Sync disabled due to repeated RPC failures'
            };
        }
    } catch (reportError) {
        // Best effort: failing to record the failure must not break the sync job itself.
        logger.warn({
            message: 'Failed to report sync failure',
            error: reportError.message,
            workspaceId: workspaceId
        });
    }

    return { shouldStop: false, message: null };
}
// Suite from run/tests/api/explorers.test.js.
// Tests are async functions rather than `done` callbacks: with `done`, a failed
// `.expect(...)` or `expect(...)` rejects the request promise, `done` is never
// called, and Jest reports a timeout instead of the real assertion failure.
describe(`POST ${BASE_URL}/:slug/syncFailure`, () => {
    it('Should return 401 if no secret provided', async () => {
        await request.post(`${BASE_URL}/test-explorer/syncFailure`)
            .send({ reason: 'rpc_error', source: 'blockSync' })
            .expect(401);
    });

    it('Should return 401 if invalid secret provided', async () => {
        await request.post(`${BASE_URL}/test-explorer/syncFailure?secret=invalid`)
            .send({ reason: 'rpc_error', source: 'blockSync' })
            .expect(401);
    });

    it('Should return 404 if explorer not found', async () => {
        jest.spyOn(Explorer, 'findOne').mockResolvedValueOnce(null);

        const { body } = await request.post(`${BASE_URL}/nonexistent/syncFailure?secret=secret`)
            .send({ reason: 'rpc_error', source: 'blockSync' })
            .expect(404);

        expect(body.error).toEqual('Explorer not found');
    });

    it('Should increment failure counter and return result', async () => {
        const mockIncrementSyncFailures = jest.fn().mockResolvedValue({ disabled: false, attempts: 1 });
        jest.spyOn(Explorer, 'findOne').mockResolvedValueOnce({
            id: 1,
            slug: 'test-explorer',
            incrementSyncFailures: mockIncrementSyncFailures
        });

        const { body } = await request.post(`${BASE_URL}/test-explorer/syncFailure?secret=secret`)
            .send({ reason: 'rpc_error', source: 'blockSync' })
            .expect(200);

        expect(mockIncrementSyncFailures).toHaveBeenCalledWith('rpc_error');
        expect(body.disabled).toBe(false);
        expect(body.attempts).toBe(1);
        expect(body.message).toContain('Failure recorded');
    });

    it('Should auto-disable after threshold and return disabled status', async () => {
        const mockIncrementSyncFailures = jest.fn().mockResolvedValue({ disabled: true, attempts: 3 });
        jest.spyOn(Explorer, 'findOne').mockResolvedValueOnce({
            id: 1,
            slug: 'test-explorer',
            incrementSyncFailures: mockIncrementSyncFailures
        });

        const { body } = await request.post(`${BASE_URL}/test-explorer/syncFailure?secret=secret`)
            .send({ reason: 'rpc_unreachable', source: 'pm2' })
            .expect(200);

        expect(mockIncrementSyncFailures).toHaveBeenCalledWith('rpc_unreachable');
        expect(body.disabled).toBe(true);
        expect(body.attempts).toBe(3);
        expect(body.message).toContain('auto-disabled');
    });

    it('Should use default reason if not provided', async () => {
        const mockIncrementSyncFailures = jest.fn().mockResolvedValue({ disabled: false, attempts: 1 });
        jest.spyOn(Explorer, 'findOne').mockResolvedValueOnce({
            id: 1,
            slug: 'test-explorer',
            incrementSyncFailures: mockIncrementSyncFailures
        });

        await request.post(`${BASE_URL}/test-explorer/syncFailure?secret=secret`)
            .send({})
            .expect(200);

        expect(mockIncrementSyncFailures).toHaveBeenCalledWith('rpc_error');
    });
});

// Suite from run/tests/lib/syncHelpers.test.js.
describe('syncHelpers', () => {
    // Minimal explorer stub exposing only what reportRpcFailure reads.
    const buildExplorer = (incrementSyncFailures, shouldSync = true) => ({
        id: 1,
        shouldSync,
        incrementSyncFailures
    });

    // Shared assertion for every "job keeps going" outcome.
    const expectNotStopped = (result) => {
        expect(result.shouldStop).toBe(false);
        expect(result.message).toBeNull();
    };

    describe('SYNC_FAILURE_THRESHOLD', () => {
        it('Should export the sync failure threshold constant', () => {
            expect(SYNC_FAILURE_THRESHOLD).toBe(3);
        });
    });

    describe('reportRpcFailure', () => {
        it('Should not count rate-limited errors as failures', async () => {
            const explorer = buildExplorer(jest.fn());

            const result = await reportRpcFailure(new Error('Rate limited'), explorer, 'blockSync', 123);

            expectNotStopped(result);
            expect(explorer.incrementSyncFailures).not.toHaveBeenCalled();
        });

        it('Should not count timeout errors as failures', async () => {
            const explorer = buildExplorer(jest.fn());

            const result = await reportRpcFailure(new Error('Timed out after 30000ms'), explorer, 'blockSync', 123);

            expectNotStopped(result);
            expect(explorer.incrementSyncFailures).not.toHaveBeenCalled();
        });

        it('Should count other RPC errors as failures', async () => {
            const explorer = buildExplorer(jest.fn().mockResolvedValue({ disabled: false, attempts: 1 }));

            const result = await reportRpcFailure(new Error('Connection refused'), explorer, 'blockSync', 123);

            expectNotStopped(result);
            expect(explorer.incrementSyncFailures).toHaveBeenCalledWith('rpc_error');
        });

        it('Should return shouldStop=true when explorer is auto-disabled', async () => {
            const explorer = buildExplorer(jest.fn().mockResolvedValue({ disabled: true, attempts: 3 }));

            const result = await reportRpcFailure(new Error('Connection refused'), explorer, 'blockSync', 123);

            expect(result.shouldStop).toBe(true);
            expect(result.message).toBe('Sync disabled due to repeated RPC failures');
        });

        it('Should not report if explorer is null', async () => {
            const result = await reportRpcFailure(new Error('Connection refused'), null, 'blockSync', 123);

            expectNotStopped(result);
        });

        it('Should not report if explorer.shouldSync is false', async () => {
            const explorer = buildExplorer(jest.fn(), false);

            const result = await reportRpcFailure(new Error('Connection refused'), explorer, 'blockSync', 123);

            expectNotStopped(result);
            expect(explorer.incrementSyncFailures).not.toHaveBeenCalled();
        });

        it('Should handle incrementSyncFailures errors gracefully', async () => {
            const explorer = buildExplorer(jest.fn().mockRejectedValue(new Error('DB error')));

            // Should not throw
            const result = await reportRpcFailure(new Error('Connection refused'), explorer, 'blockSync', 123);

            expectNotStopped(result);
        });
    });
});