From 78f85518bcf6a0d39946bd9df3140636c6d5eeba Mon Sep 17 00:00:00 2001 From: utnim2 Date: Fri, 5 Sep 2025 02:41:08 +0530 Subject: [PATCH 1/9] feat: implemented dynamic datasource removing --- .../src/indexer/dynamic-ds.service.spec.ts | 141 +++++++++++++++++- .../src/indexer/dynamic-ds.service.ts | 38 ++++- .../node-core/src/indexer/indexer.manager.ts | 23 +++ .../worker/worker.dynamic-ds.service.ts | 7 + packages/types-core/src/global.ts | 3 +- packages/types-core/src/interfaces.ts | 1 + 6 files changed, 205 insertions(+), 8 deletions(-) diff --git a/packages/node-core/src/indexer/dynamic-ds.service.spec.ts b/packages/node-core/src/indexer/dynamic-ds.service.spec.ts index c56aa05927..479efbd522 100644 --- a/packages/node-core/src/indexer/dynamic-ds.service.spec.ts +++ b/packages/node-core/src/indexer/dynamic-ds.service.spec.ts @@ -15,8 +15,8 @@ class TestDynamicDsService extends DynamicDsService { let datasourceParams: DatasourceParams[] = initData; @@ -40,7 +41,7 @@ const mockMetadata = (initData: DatasourceParams[] = []) => { describe('DynamicDsService', () => { let service: TestDynamicDsService; const project = { - templates: [{name: 'Test'}], + templates: [{name: 'Test'}, {name: 'Other'}], } as any as ISubqueryProject; beforeEach(() => { @@ -70,6 +71,67 @@ describe('DynamicDsService', () => { ]); }); + it('can destroy a dynamic datasource', async () => { + const meta = mockMetadata([testParam1, testParam2]); + await service.init(meta); + + await service.destroyDynamicDatasource('Test', 50); + + const updatedParams = (service as any)._datasourceParams; + expect(updatedParams[0]).toEqual({...testParam1, endBlock: 50}); + expect(updatedParams[1]).toEqual(testParam2); + + const datasources = (service as any)._datasources; + expect(datasources[0].endBlock).toBe(50); + }); + + it('throws error when destroying non-existent datasource', async () => { + const meta = mockMetadata([testParam1]); + await service.init(meta); + + await expect(service.destroyDynamicDatasource('NonExistent', 50)).rejects.toThrow( + 'Dynamic datasource with template name "NonExistent" not found' + ); + }); + + it('throws error when destroying already destroyed datasource', async () => { + const destroyedParam = {...testParam1, endBlock: 30}; + const meta = mockMetadata([destroyedParam]); + await service.init(meta); + + await expect(service.destroyDynamicDatasource('Test', 50)).rejects.toThrow( + 'Dynamic datasource "Test" is already destroyed' + ); + }); + + it('allows creating new datasource after destroying existing one', async () => { + const meta = mockMetadata([testParam1]); + await service.init(meta); + + expect((service as any)._datasourceParams).toEqual([testParam1]); + + await service.destroyDynamicDatasource('Test', 50); + + const paramsAfterDestroy = (service as any)._datasourceParams; + expect(paramsAfterDestroy[0]).toEqual({...testParam1, endBlock: 50}); + + const newParam = {templateName: 'Test', startBlock: 60}; + await service.createDynamicDatasource(newParam); + + const finalParams = (service as any)._datasourceParams; + const destroyedCount = finalParams.filter((p) => p.endBlock !== undefined).length; + const activeCount = finalParams.filter((p) => p.endBlock === undefined).length; + + expect(destroyedCount).toBeGreaterThanOrEqual(1); + expect(activeCount).toBeGreaterThanOrEqual(1); + + const destroyedParam = finalParams.find((p) => p.startBlock === 1 && p.endBlock === 50); + expect(destroyedParam).toBeDefined(); + + const newParamFound = finalParams.find((p) => p.startBlock === 60 && !p.endBlock); + expect(newParamFound).toBeDefined(); + }); + it('resets dynamic datasources', async () => { const meta = mockMetadata([testParam1, testParam2, testParam3, testParam4]); await service.init(meta); @@ -83,6 +145,25 @@ describe('DynamicDsService', () => { ]); }); + it('handles reset after datasource destruction correctly', async () => { + const params = [testParam1, testParam2, testParam3, testParam4]; + const meta = mockMetadata(params); + await service.init(meta); + + await service.destroyDynamicDatasource('Test', 25); // Destroys testParam1 + + const paramsAfterDestroy = (service as any)._datasourceParams; + expect(paramsAfterDestroy[0]).toEqual({...testParam1, endBlock: 25}); + + // Reset to block 2 (should keep testParam1 and testParam2) + await service.resetDynamicDatasource(2, null as any); + + const paramsAfterReset = (service as any)._datasourceParams; + expect(paramsAfterReset).toHaveLength(2); + expect(paramsAfterReset[0]).toEqual({...testParam1, endBlock: 25}); + expect(paramsAfterReset[1]).toEqual(testParam2); + }); + it('getDynamicDatasources with force reloads from metadata', async () => { const meta = mockMetadata([testParam1, testParam2]); await service.init(meta); @@ -107,6 +188,29 @@ describe('DynamicDsService', () => { ]); }); + it('loads destroyed datasources with endBlock correctly', async () => { + const destroyedParam = {...testParam1, endBlock: 100}; + const meta = mockMetadata([destroyedParam, testParam2]); + await service.init(meta); + + const datasources = await service.getDynamicDatasources(); + expect(datasources).toHaveLength(2); + expect((datasources[0] as any).endBlock).toBe(100); + expect((datasources[1] as any).endBlock).toBeUndefined(); + }); + + it('updates metadata correctly when destroying datasource', async () => { + const meta = mockMetadata([testParam1, testParam2]); + await service.init(meta); + + await service.destroyDynamicDatasource('Test', 75); + + const metadataParams = await meta.find('dynamicDatasources'); + expect(metadataParams).toBeDefined(); + expect(metadataParams![0]).toEqual({...testParam1, endBlock: 75}); + expect(metadataParams![1]).toEqual(testParam2); + }); + it('can find a template and cannot mutate the template', () => { const template1 = service.getTemplate('Test', 1); const template2 = service.getTemplate('Test', 2); @@ -120,4 +224,35 @@ describe('DynamicDsService', () => { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion expect(project.templates![0]).toEqual({name: 'Test'}); }); + + it('can create template with endBlock', () => { + const template = service.getTemplate('Test', 1, 100); + + expect(template.startBlock).toBe(1); + expect((template as any).endBlock).toBe(100); + expect((template as any).name).toBeUndefined(); + }); + + it('handles multiple templates with same name during destruction', async () => { + const param1 = {templateName: 'Test', startBlock: 1}; + const param2 = {templateName: 'Test', startBlock: 5}; + const param3 = {templateName: 'Other', startBlock: 3}; + + const meta = mockMetadata([param1, param2, param3]); + await service.init(meta); + + // Should destroy the first matching one + await service.destroyDynamicDatasource('Test', 10); + + const updatedParams = (service as any)._datasourceParams; + expect(updatedParams[0]).toEqual({...param1, endBlock: 10}); + expect(updatedParams[1]).toEqual(param2); // Not destroyed + expect(updatedParams[2]).toEqual(param3); // Not destroyed + }); + + it('throws error when service not initialized for destruction', async () => { + await expect(service.destroyDynamicDatasource('Test', 50)).rejects.toThrow( + 'DynamicDsService has not been initialized' + ); + }); }); diff --git a/packages/node-core/src/indexer/dynamic-ds.service.ts b/packages/node-core/src/indexer/dynamic-ds.service.ts index 708ab5f3bb..17a309d22b 100644 --- a/packages/node-core/src/indexer/dynamic-ds.service.ts +++ b/packages/node-core/src/indexer/dynamic-ds.service.ts @@ -19,11 +19,13 @@ export interface DatasourceParams { templateName: string; args?: Record; startBlock: number; + endBlock?: number; } export interface IDynamicDsService { dynamicDatasources: DS[]; createDynamicDatasource(params: DatasourceParams): Promise; + destroyDynamicDatasource(templateName: string, currentBlockHeight: number): Promise; getDynamicDatasources(forceReload?: boolean): Promise; } @@ -91,6 +93,34 @@ export class DynamicDsService { + if (!this._datasources || !this._datasourceParams) { + throw new Error('DynamicDsService has not been initialized'); + } + + const dsIndex = this._datasourceParams.findIndex((params) => params.templateName === templateName); + if (dsIndex === -1) { + throw new Error(`Dynamic datasource with template name "${templateName}" not found`); + } + + const dsParam = this._datasourceParams[dsIndex]; + + if (dsParam.endBlock !== undefined) { + throw new Error(`Dynamic datasource "${templateName}" is already destroyed`); + } + + const updatedParams = {...dsParam, endBlock: currentBlockHeight}; + this._datasourceParams[dsIndex] = updatedParams; + + if (this._datasources[dsIndex]) { + (this._datasources[dsIndex] as any).endBlock = currentBlockHeight; + } + + await this.metadata.set(METADATA_KEY, this._datasourceParams, tx); + + logger.info(`Destroyed dynamic datasource "${templateName}" at block ${currentBlockHeight}`); + } + // Not force only seems to be used for project changes async getDynamicDatasources(forceReload?: boolean): Promise { // Workers should not cache this result in order to keep in sync @@ -117,19 +147,19 @@ export class DynamicDsService t.name === templateName); if (!t) { throw new Error(`Unable to find matching template in project for name: "${templateName}"`); } const {name, ...template} = cloneDeep(t); - return {...template, startBlock} as DS; + return {...template, startBlock, endBlock} as DS; } private async getDatasource(params: DatasourceParams): Promise { - const dsObj = this.getTemplate(params.templateName, params.startBlock); + const dsObj = this.getTemplate(params.templateName, params.startBlock, params.endBlock); try { await this.blockchainService.updateDynamicDs(params, dsObj); diff --git a/packages/node-core/src/indexer/indexer.manager.ts b/packages/node-core/src/indexer/indexer.manager.ts index 9400f3707f..6d9b954993 100644 --- a/packages/node-core/src/indexer/indexer.manager.ts +++ b/packages/node-core/src/indexer/indexer.manager.ts @@ -116,6 +116,29 @@ export abstract class BaseIndexerManager< dynamicDsCreated = true; }, 'createDynamicDatasource'); + // Inject function to destroy ds into vm + vm.freeze(async (templateName?: string) => { + if (!templateName) { + throw new Error('Cannot destroy datasource: template name must be provided'); + } + + await this.dynamicDsService.destroyDynamicDatasource(templateName, blockHeight); + + // Mark datasources with this template for removal from current processing + filteredDataSources.forEach((fds) => { + const dsParams = this.dynamicDsService.dynamicDatasources.find((dynamicDs) => { + // Find the corresponding params for this datasource + const params = (this.dynamicDsService as any)._datasourceParams?.find( + (p: any) => p.templateName === templateName && p.startBlock === (fds as any).startBlock + ); + return params !== undefined; + }); + if (dsParams) { + (fds as any).endBlock = blockHeight; + } + }); + }, 'destroyDynamicDatasource'); + return vm; }); } diff --git a/packages/node-core/src/indexer/worker/worker.dynamic-ds.service.ts b/packages/node-core/src/indexer/worker/worker.dynamic-ds.service.ts index 3c6e456a08..3e09bfc070 100644 --- a/packages/node-core/src/indexer/worker/worker.dynamic-ds.service.ts +++ b/packages/node-core/src/indexer/worker/worker.dynamic-ds.service.ts @@ -7,11 +7,13 @@ import {DatasourceParams, IDynamicDsService} from '../dynamic-ds.service'; export type HostDynamicDS = { dynamicDsCreateDynamicDatasource: (params: DatasourceParams) => Promise; + dynamicDsDestroyDynamicDatasource: (templateName: string, currentBlockHeight: number) => Promise; dynamicDsGetDynamicDatasources: () => Promise; }; export const hostDynamicDsKeys: (keyof HostDynamicDS)[] = [ 'dynamicDsCreateDynamicDatasource', + 'dynamicDsDestroyDynamicDatasource', 'dynamicDsGetDynamicDatasources', ]; @@ -32,6 +34,10 @@ export class WorkerDynamicDsService implements IDynamicDsService { return this.host.dynamicDsCreateDynamicDatasource(JSON.parse(JSON.stringify(params))); } + async destroyDynamicDatasource(templateName: string, currentBlockHeight: number): Promise { + return this.host.dynamicDsDestroyDynamicDatasource(templateName, currentBlockHeight); + } + async getDynamicDatasources(): Promise { return this.host.dynamicDsGetDynamicDatasources(); } @@ -40,6 +46,7 @@ export class WorkerDynamicDsService implements IDynamicDsService { export function dynamicDsHostFunctions(dynamicDsService: IDynamicDsService): HostDynamicDS { return { dynamicDsCreateDynamicDatasource: dynamicDsService.createDynamicDatasource.bind(dynamicDsService), + dynamicDsDestroyDynamicDatasource: dynamicDsService.destroyDynamicDatasource.bind(dynamicDsService), dynamicDsGetDynamicDatasources: dynamicDsService.getDynamicDatasources.bind(dynamicDsService), }; } diff --git a/packages/types-core/src/global.ts b/packages/types-core/src/global.ts index cbaf1be173..66d1c33861 100644 --- a/packages/types-core/src/global.ts +++ b/packages/types-core/src/global.ts @@ -2,7 +2,7 @@ // SPDX-License-Identifier: GPL-3.0 import type Pino from 'pino'; -import {Cache, DynamicDatasourceCreator} from './interfaces'; +import {Cache, DynamicDatasourceCreator, DynamicDatasourceDestructor} from './interfaces'; import {Store} from './store'; // base global @@ -12,4 +12,5 @@ declare global { const cache: Cache; const chainId: string; const createDynamicDatasource: DynamicDatasourceCreator; + const destroyDynamicDatasource: DynamicDatasourceDestructor; } diff --git a/packages/types-core/src/interfaces.ts b/packages/types-core/src/interfaces.ts index bde78e77ac..28e1406103 100644 --- a/packages/types-core/src/interfaces.ts +++ b/packages/types-core/src/interfaces.ts @@ -2,6 +2,7 @@ // SPDX-License-Identifier: GPL-3.0 export type DynamicDatasourceCreator = (name: string, args: Record) => Promise; +export type DynamicDatasourceDestructor = (name?: string) => Promise; export interface Cache = Record> { set(key: keyof T, value: T[keyof T]): Promise; From 323a622b441228bfa9d2b27446f091187ef05af0 Mon Sep 17 00:00:00 2001 From: utnim2 Date: Fri, 17 Oct 2025 11:51:45 +0530 Subject: [PATCH 2/9] feat: implemented listing of dynamic datasources and to remove it by index noi --- .../src/indexer/dynamic-ds.service.spec.ts | 218 +++++++++++++++++- .../src/indexer/dynamic-ds.service.ts | 78 +++++-- .../node-core/src/indexer/indexer.manager.ts | 9 +- .../worker/worker.dynamic-ds.service.ts | 18 +- packages/types-core/src/global.ts | 3 +- packages/types-core/src/interfaces.ts | 12 +- 6 files changed, 309 insertions(+), 29 deletions(-) diff --git a/packages/node-core/src/indexer/dynamic-ds.service.spec.ts b/packages/node-core/src/indexer/dynamic-ds.service.spec.ts index 479efbd522..1fa846072a 100644 --- a/packages/node-core/src/indexer/dynamic-ds.service.spec.ts +++ b/packages/node-core/src/indexer/dynamic-ds.service.spec.ts @@ -75,7 +75,8 @@ describe('DynamicDsService', () => { const meta = mockMetadata([testParam1, testParam2]); await service.init(meta); - await service.destroyDynamicDatasource('Test', 50); + // Destroy specific datasource by index + await service.destroyDynamicDatasource('Test', 50, 0); const updatedParams = (service as any)._datasourceParams; expect(updatedParams[0]).toEqual({...testParam1, endBlock: 50}); @@ -100,7 +101,7 @@ describe('DynamicDsService', () => { await service.init(meta); await expect(service.destroyDynamicDatasource('Test', 50)).rejects.toThrow( - 'Dynamic datasource "Test" is already destroyed' + 'Dynamic datasource with template name "Test" not found or already destroyed' ); }); @@ -110,7 +111,8 @@ describe('DynamicDsService', () => { expect((service as any)._datasourceParams).toEqual([testParam1]); - await service.destroyDynamicDatasource('Test', 50); + // Destroy by index + await service.destroyDynamicDatasource('Test', 50, 0); const paramsAfterDestroy = (service as any)._datasourceParams; expect(paramsAfterDestroy[0]).toEqual({...testParam1, endBlock: 50}); @@ -150,7 +152,8 @@ describe('DynamicDsService', () => { const meta = mockMetadata(params); await service.init(meta); - await service.destroyDynamicDatasource('Test', 25); // Destroys testParam1 + // Destroy only the first datasource by index + await service.destroyDynamicDatasource('Test', 25, 0); const paramsAfterDestroy = (service as any)._datasourceParams; expect(paramsAfterDestroy[0]).toEqual({...testParam1, endBlock: 25}); @@ -203,7 +206,8 @@ describe('DynamicDsService', () => { const meta = mockMetadata([testParam1, testParam2]); await service.init(meta); - await service.destroyDynamicDatasource('Test', 75); + // Destroy first datasource by index + await service.destroyDynamicDatasource('Test', 75, 0); const metadataParams = await meta.find('dynamicDatasources'); expect(metadataParams).toBeDefined(); @@ -241,8 +245,8 @@ describe('DynamicDsService', () => { const meta = mockMetadata([param1, param2, param3]); await service.init(meta); - // Should destroy the first matching one - await service.destroyDynamicDatasource('Test', 10); + // Should destroy the first matching one by index + await service.destroyDynamicDatasource('Test', 10, 0); const updatedParams = (service as any)._datasourceParams; expect(updatedParams[0]).toEqual({...param1, endBlock: 10}); @@ -255,4 +259,204 @@ describe('DynamicDsService', () => { 'DynamicDsService has not been initialized' ); }); + + describe('getDynamicDatasourcesByTemplate', () => { + it('returns list of active datasources for a template', async () => { + const meta = mockMetadata([testParam1, testParam2, testParam3, testParamOther]); + await service.init(meta); + + const testDatasources = service.getDynamicDatasourcesByTemplate('Test'); + + expect(testDatasources).toHaveLength(3); + expect(testDatasources[0]).toEqual({ + index: 0, + templateName: 'Test', + startBlock: 1, + endBlock: undefined, + args: undefined, + }); + expect(testDatasources[1]).toEqual({ + index: 1, + templateName: 'Test', + startBlock: 2, + endBlock: undefined, + args: undefined, + }); + expect(testDatasources[2]).toEqual({ + index: 2, + templateName: 'Test', + startBlock: 3, + endBlock: undefined, + args: undefined, + }); + }); + + it('excludes destroyed datasources from list', async () => { + const destroyedParam = {...testParam1, endBlock: 50}; + const meta = mockMetadata([destroyedParam, testParam2, testParam3]); + await service.init(meta); + + const datasources = service.getDynamicDatasourcesByTemplate('Test'); + + expect(datasources).toHaveLength(2); + expect(datasources[0].startBlock).toBe(2); + expect(datasources[1].startBlock).toBe(3); + }); + + it('returns empty array when no datasources match template', async () => { + const meta = mockMetadata([testParamOther]); + await service.init(meta); + + const datasources = service.getDynamicDatasourcesByTemplate('Test'); + + expect(datasources).toEqual([]); + }); + + it('includes args in datasource info when present', async () => { + const paramWithArgs = {...testParam1, args: {address: '0x123', tokenId: 1}}; + const meta = mockMetadata([paramWithArgs]); + await service.init(meta); + + const datasources = service.getDynamicDatasourcesByTemplate('Test'); + + expect(datasources).toHaveLength(1); + expect(datasources[0].args).toEqual({address: '0x123', tokenId: 1}); + }); + + it('throws error when service not initialized', () => { + expect(() => service.getDynamicDatasourcesByTemplate('Test')).toThrow( + 'DynamicDsService has not been initialized' + ); + }); + }); + + describe('destroyDynamicDatasource with index', () => { + it('destroys specific datasource by index', async () => { + const meta = mockMetadata([testParam1, testParam2, testParam3, testParamOther]); + await service.init(meta); + + await service.destroyDynamicDatasource('Test', 50, 1); + + const updatedParams = (service as any)._datasourceParams; + expect(updatedParams[0]).toEqual(testParam1); // Not destroyed + expect(updatedParams[1]).toEqual({...testParam2, endBlock: 50}); // Destroyed + expect(updatedParams[2]).toEqual(testParam3); // Not destroyed + expect(updatedParams[3]).toEqual(testParamOther); // Not destroyed + }); + + it('destroys all datasources when no index provided', async () => { + const meta = mockMetadata([testParam1, testParam2, testParam3, testParamOther]); + await service.init(meta); + + await service.destroyDynamicDatasource('Test', 75); + + const updatedParams = (service as any)._datasourceParams; + expect(updatedParams[0]).toEqual({...testParam1, endBlock: 75}); + expect(updatedParams[1]).toEqual({...testParam2, endBlock: 75}); + expect(updatedParams[2]).toEqual({...testParam3, endBlock: 75}); + expect(updatedParams[3]).toEqual(testParamOther); // Different template, not destroyed + }); + + it('throws error when index is out of bounds', async () => { + const meta = mockMetadata([testParam1, testParam2]); + await service.init(meta); + + await expect(service.destroyDynamicDatasource('Test', 50, 5)).rejects.toThrow( + 'Index 5 is out of bounds. There are 2 active datasource(s) for template "Test"' + ); + }); + + it('throws error when index is negative', async () => { + const meta = mockMetadata([testParam1, testParam2]); + await service.init(meta); + + await expect(service.destroyDynamicDatasource('Test', 50, -1)).rejects.toThrow( + 'Index -1 is out of bounds. There are 2 active datasource(s) for template "Test"' + ); + }); + + it('throws error when trying to destroy with index but no active datasources exist', async () => { + const destroyedParam = {...testParam1, endBlock: 30}; + const meta = mockMetadata([destroyedParam]); + await service.init(meta); + + await expect(service.destroyDynamicDatasource('Test', 50, 0)).rejects.toThrow( + 'Dynamic datasource with template name "Test" not found or already destroyed' + ); + }); + + it('correctly handles index after some datasources are destroyed', async () => { + const meta = mockMetadata([testParam1, testParam2, testParam3, testParam4]); + await service.init(meta); + + // Destroy the first one + await service.destroyDynamicDatasource('Test', 40, 0); + + // Now indices are: [1->0, 2->1, 3->2] + const activeDatasources = service.getDynamicDatasourcesByTemplate('Test'); + expect(activeDatasources).toHaveLength(3); + expect(activeDatasources[0].startBlock).toBe(2); + expect(activeDatasources[1].startBlock).toBe(3); + expect(activeDatasources[2].startBlock).toBe(4); + + // Destroy what is now at index 1 (was originally testParam3) + await service.destroyDynamicDatasource('Test', 60, 1); + + const updatedParams = (service as any)._datasourceParams; + expect(updatedParams[0]).toEqual({...testParam1, endBlock: 40}); + expect(updatedParams[1]).toEqual(testParam2); // Still active + expect(updatedParams[2]).toEqual({...testParam3, endBlock: 60}); + expect(updatedParams[3]).toEqual(testParam4); // Still active + }); + + it('updates datasources in memory correctly when destroying by index', async () => { + const meta = mockMetadata([testParam1, testParam2, testParam3]); + await service.init(meta); + + await service.destroyDynamicDatasource('Test', 100, 1); + + const datasources = (service as any)._datasources; + expect(datasources[0].endBlock).toBeUndefined(); + expect(datasources[1].endBlock).toBe(100); + expect(datasources[2].endBlock).toBeUndefined(); + }); + + it('logs correctly when destroying single datasource', async () => { + const meta = mockMetadata([testParam1, testParam2]); + await service.init(meta); + + const logSpy = jest.spyOn(console, 'info').mockImplementation(); + + await service.destroyDynamicDatasource('Test', 50, 0); + + // The logger.info should be called with single datasource message + // Note: actual implementation uses getLogger which might not be console + logSpy.mockRestore(); + }); + + it('logs correctly when destroying all datasources', async () => { + const meta = mockMetadata([testParam1, testParam2, testParam3]); + await service.init(meta); + + const logSpy = jest.spyOn(console, 'info').mockImplementation(); + + await service.destroyDynamicDatasource('Test', 50); + + // The logger.info should be called with multiple datasources message + logSpy.mockRestore(); + }); + + it('allows destroying datasources from different templates independently', async () => { + const meta = mockMetadata([testParam1, testParam2, testParamOther]); + await service.init(meta); + + await service.destroyDynamicDatasource('Test', 50, 0); + await service.destroyDynamicDatasource('Other', 60, 0); + + const updatedParams = (service as any)._datasourceParams; + expect(updatedParams[0]).toEqual({...testParam1, endBlock: 50}); + expect(updatedParams[1]).toEqual(testParam2); // Not destroyed + expect(updatedParams[2]).toEqual({...testParamOther, endBlock: 60}); + }); + }); }); diff --git a/packages/node-core/src/indexer/dynamic-ds.service.ts b/packages/node-core/src/indexer/dynamic-ds.service.ts index 17a309d22b..9621dc1c62 100644 --- a/packages/node-core/src/indexer/dynamic-ds.service.ts +++ b/packages/node-core/src/indexer/dynamic-ds.service.ts @@ -2,7 +2,7 @@ // SPDX-License-Identifier: GPL-3.0 import {Inject, Injectable} from '@nestjs/common'; -import {BaseCustomDataSource, BaseDataSource, BaseTemplateDataSource} from '@subql/types-core'; +import {BaseCustomDataSource, BaseDataSource, BaseTemplateDataSource, DynamicDatasourceInfo} from '@subql/types-core'; import {Transaction} from '@subql/x-sequelize'; import {cloneDeep} from 'lodash'; import {IBlockchainService} from '../blockchain.service'; @@ -25,8 +25,9 @@ export interface DatasourceParams { export interface IDynamicDsService { dynamicDatasources: DS[]; createDynamicDatasource(params: DatasourceParams): Promise; - destroyDynamicDatasource(templateName: string, currentBlockHeight: number): Promise; + destroyDynamicDatasource(templateName: string, currentBlockHeight: number, index?: number): Promise; getDynamicDatasources(forceReload?: boolean): Promise; + getDynamicDatasourcesByTemplate(templateName: string): DynamicDatasourceInfo[]; } @Injectable() @@ -93,32 +94,79 @@ export class DynamicDsService { - if (!this._datasources || !this._datasourceParams) { + getDynamicDatasourcesByTemplate(templateName: string): DynamicDatasourceInfo[] { + if (!this._datasourceParams) { throw new Error('DynamicDsService has not been initialized'); } - const dsIndex = this._datasourceParams.findIndex((params) => params.templateName === templateName); - if (dsIndex === -1) { - throw new Error(`Dynamic datasource with template name "${templateName}" not found`); + const matchingDatasources = this._datasourceParams + .map((params, globalIndex) => ({params, globalIndex})) + .filter(({params}) => params.templateName === templateName && params.endBlock === undefined); + + return matchingDatasources.map(({globalIndex, params}, index) => ({ + index, + templateName: params.templateName, + startBlock: params.startBlock, + endBlock: params.endBlock, + args: params.args, + })); + } + + async destroyDynamicDatasource( + templateName: string, + currentBlockHeight: number, + index?: number, + tx?: Transaction + ): Promise { + if (!this._datasources || !this._datasourceParams) { + throw new Error('DynamicDsService has not been initialized'); } - const dsParam = this._datasourceParams[dsIndex]; + // Get all matching datasources for this template + const matchingDatasources = this._datasourceParams + .map((params, globalIndex) => ({params, globalIndex})) + .filter(({params}) => params.templateName === templateName && params.endBlock === undefined); + + if (matchingDatasources.length === 0) { + throw new Error(`Dynamic datasource with template name "${templateName}" not found or already destroyed`); + } - if (dsParam.endBlock !== undefined) { - throw new Error(`Dynamic datasource "${templateName}" is already destroyed`); + // Determine which datasources to destroy + let datasourcesToDestroy: {params: DatasourceParams; globalIndex: number}[]; + + if (index !== undefined) { + // Destroy specific datasource by index + if (index < 0 || index >= matchingDatasources.length) { + throw new Error( + `Index ${index} is out of bounds. There are ${matchingDatasources.length} active datasource(s) for template "${templateName}"` + ); + } + datasourcesToDestroy = [matchingDatasources[index]]; + } else { + // Destroy all matching datasources + datasourcesToDestroy = matchingDatasources; } - const updatedParams = {...dsParam, endBlock: currentBlockHeight}; - this._datasourceParams[dsIndex] = updatedParams; + // Update each datasource + for (const {globalIndex} of datasourcesToDestroy) { + const dsParam = this._datasourceParams[globalIndex]; + const updatedParams = {...dsParam, endBlock: currentBlockHeight}; + this._datasourceParams[globalIndex] = updatedParams; - if (this._datasources[dsIndex]) { - (this._datasources[dsIndex] as any).endBlock = currentBlockHeight; + if (this._datasources[globalIndex]) { + (this._datasources[globalIndex] as any).endBlock = currentBlockHeight; + } } await this.metadata.set(METADATA_KEY, this._datasourceParams, tx); - logger.info(`Destroyed dynamic datasource "${templateName}" at block ${currentBlockHeight}`); + if (datasourcesToDestroy.length === 1) { + logger.info(`Destroyed dynamic datasource "${templateName}" at block ${currentBlockHeight}`); + } else { + logger.info( + `Destroyed ${datasourcesToDestroy.length} dynamic datasource(s) with template "${templateName}" at block ${currentBlockHeight}` + ); + } } // Not force only seems to be used for project changes diff --git a/packages/node-core/src/indexer/indexer.manager.ts b/packages/node-core/src/indexer/indexer.manager.ts index 6d9b954993..619b4262cd 100644 --- a/packages/node-core/src/indexer/indexer.manager.ts +++ b/packages/node-core/src/indexer/indexer.manager.ts @@ -116,13 +116,18 @@ export abstract class BaseIndexerManager< dynamicDsCreated = true; }, 'createDynamicDatasource'); + // Inject function to get dynamic datasources by template into vm + vm.freeze((templateName: string) => { + return this.dynamicDsService.getDynamicDatasourcesByTemplate(templateName); + }, 'getDynamicDatasources'); + // Inject function to destroy ds into vm - vm.freeze(async (templateName?: string) => { + vm.freeze(async (templateName?: string, index?: number) => { if (!templateName) { throw new Error('Cannot destroy datasource: template name must be provided'); } - await this.dynamicDsService.destroyDynamicDatasource(templateName, blockHeight); + await this.dynamicDsService.destroyDynamicDatasource(templateName, blockHeight, index); // Mark datasources with this template for removal from current processing filteredDataSources.forEach((fds) => { diff --git a/packages/node-core/src/indexer/worker/worker.dynamic-ds.service.ts b/packages/node-core/src/indexer/worker/worker.dynamic-ds.service.ts index 3e09bfc070..9b4c8097dc 100644 --- a/packages/node-core/src/indexer/worker/worker.dynamic-ds.service.ts +++ b/packages/node-core/src/indexer/worker/worker.dynamic-ds.service.ts @@ -3,18 +3,25 @@ import {isMainThread} from 'node:worker_threads'; import {Injectable} from '@nestjs/common'; +import {DynamicDatasourceInfo} from '@subql/types-core'; import {DatasourceParams, IDynamicDsService} from '../dynamic-ds.service'; export type HostDynamicDS = { dynamicDsCreateDynamicDatasource: (params: DatasourceParams) => Promise; - dynamicDsDestroyDynamicDatasource: (templateName: string, currentBlockHeight: number) => Promise; + dynamicDsDestroyDynamicDatasource: ( + templateName: string, + currentBlockHeight: number, + index?: number + ) => Promise; dynamicDsGetDynamicDatasources: () => Promise; + dynamicDsGetDynamicDatasourcesByTemplate: (templateName: string) => DynamicDatasourceInfo[]; }; export const hostDynamicDsKeys: (keyof HostDynamicDS)[] = [ 'dynamicDsCreateDynamicDatasource', 'dynamicDsDestroyDynamicDatasource', 'dynamicDsGetDynamicDatasources', + 'dynamicDsGetDynamicDatasourcesByTemplate', ]; @Injectable() @@ -34,13 +41,17 @@ export class WorkerDynamicDsService implements IDynamicDsService { return this.host.dynamicDsCreateDynamicDatasource(JSON.parse(JSON.stringify(params))); } - async destroyDynamicDatasource(templateName: string, currentBlockHeight: number): Promise { - return this.host.dynamicDsDestroyDynamicDatasource(templateName, currentBlockHeight); + async destroyDynamicDatasource(templateName: string, currentBlockHeight: number, index?: number): Promise { + return this.host.dynamicDsDestroyDynamicDatasource(templateName, currentBlockHeight, index); } async getDynamicDatasources(): Promise { return this.host.dynamicDsGetDynamicDatasources(); } + + getDynamicDatasourcesByTemplate(templateName: string): DynamicDatasourceInfo[] { + return this.host.dynamicDsGetDynamicDatasourcesByTemplate(templateName); + } } export function dynamicDsHostFunctions(dynamicDsService: IDynamicDsService): HostDynamicDS { @@ -48,5 +59,6 @@ export function dynamicDsHostFunctions(dynamicDsService: IDynamicDsService) => Promise; -export type DynamicDatasourceDestructor = (name?: string) => Promise; +export type DynamicDatasourceDestructor = (name?: string, index?: number) => Promise; + +export interface DynamicDatasourceInfo { + index: number; + templateName: string; + startBlock: number; + endBlock?: number; + args?: Record; +} + +export type DynamicDatasourceGetter = (templateName: string) => Promise; export interface Cache = Record> { set(key: keyof T, value: T[keyof T]): Promise; From 4971c3d90d19182a698b1b9b0d747b1848cd8634 Mon Sep 17 00:00:00 2001 From: utnim2 Date: Wed, 22 Oct 2025 21:31:47 +0530 Subject: [PATCH 3/9] refactor: make index not optional and also destroyed dynamic datasources should be removed from filteredDatasources --- .../src/indexer/dynamic-ds.service.spec.ts | 44 +------------ .../src/indexer/dynamic-ds.service.ts | 65 +++++++++---------- .../node-core/src/indexer/indexer.manager.ts | 32 +++++---- .../worker/worker.dynamic-ds.service.ts | 8 +-- packages/types-core/src/interfaces.ts | 2 +- 5 files changed, 50 insertions(+), 101 deletions(-) diff --git a/packages/node-core/src/indexer/dynamic-ds.service.spec.ts b/packages/node-core/src/indexer/dynamic-ds.service.spec.ts index 1fa846072a..59c9503cfb 100644 --- a/packages/node-core/src/indexer/dynamic-ds.service.spec.ts +++ b/packages/node-core/src/indexer/dynamic-ds.service.spec.ts @@ -90,7 +90,7 @@ describe('DynamicDsService', () => { const meta = mockMetadata([testParam1]); await service.init(meta); - await expect(service.destroyDynamicDatasource('NonExistent', 50)).rejects.toThrow( + await expect(service.destroyDynamicDatasource('NonExistent', 50, 0)).rejects.toThrow( 'Dynamic datasource with template name "NonExistent" not found' ); }); @@ -100,7 +100,7 @@ describe('DynamicDsService', () => { const meta = mockMetadata([destroyedParam]); await service.init(meta); - await expect(service.destroyDynamicDatasource('Test', 50)).rejects.toThrow( + await expect(service.destroyDynamicDatasource('Test', 50, 0)).rejects.toThrow( 'Dynamic datasource with template name "Test" not found or already destroyed' ); }); @@ -255,7 +255,7 @@ describe('DynamicDsService', () => { }); it('throws error when service not initialized for destruction', async () => { - await expect(service.destroyDynamicDatasource('Test', 50)).rejects.toThrow( + await expect(service.destroyDynamicDatasource('Test', 50, 0)).rejects.toThrow( 'DynamicDsService has not been initialized' ); }); @@ -344,19 +344,6 @@ describe('DynamicDsService', () => { expect(updatedParams[3]).toEqual(testParamOther); // Not destroyed }); - it('destroys all datasources when no index provided', async () => { - const meta = mockMetadata([testParam1, testParam2, testParam3, testParamOther]); - await service.init(meta); - - await service.destroyDynamicDatasource('Test', 75); - - const updatedParams = (service as any)._datasourceParams; - expect(updatedParams[0]).toEqual({...testParam1, endBlock: 75}); - expect(updatedParams[1]).toEqual({...testParam2, endBlock: 75}); - expect(updatedParams[2]).toEqual({...testParam3, endBlock: 75}); - expect(updatedParams[3]).toEqual(testParamOther); // Different template, not destroyed - }); - it('throws error when index is out of bounds', async () => { const meta = mockMetadata([testParam1, testParam2]); await service.init(meta); @@ -421,31 +408,6 @@ describe('DynamicDsService', () => { expect(datasources[2].endBlock).toBeUndefined(); }); - it('logs correctly when destroying single datasource', async () => { - const meta = mockMetadata([testParam1, testParam2]); - await service.init(meta); - - const logSpy = jest.spyOn(console, 'info').mockImplementation(); - - await service.destroyDynamicDatasource('Test', 50, 0); - - // The logger.info should be called with single datasource message - // Note: actual implementation uses getLogger which might not be console - logSpy.mockRestore(); - }); - - it('logs correctly when destroying all datasources', async () => { - const meta = mockMetadata([testParam1, testParam2, testParam3]); - await service.init(meta); - - const logSpy = jest.spyOn(console, 'info').mockImplementation(); - - await service.destroyDynamicDatasource('Test', 50); - - // The logger.info should be called with multiple datasources message - logSpy.mockRestore(); - }); - it('allows destroying datasources from different templates independently', async () => { const meta = mockMetadata([testParam1, testParam2, testParamOther]); await service.init(meta); diff --git a/packages/node-core/src/indexer/dynamic-ds.service.ts b/packages/node-core/src/indexer/dynamic-ds.service.ts index 9621dc1c62..6f41dce7f3 100644 --- a/packages/node-core/src/indexer/dynamic-ds.service.ts +++ b/packages/node-core/src/indexer/dynamic-ds.service.ts @@ -25,7 +25,7 @@ export interface DatasourceParams { export interface IDynamicDsService { dynamicDatasources: DS[]; createDynamicDatasource(params: DatasourceParams): Promise; - destroyDynamicDatasource(templateName: string, currentBlockHeight: number, index?: number): Promise; + destroyDynamicDatasource(templateName: string, currentBlockHeight: number, index: number): Promise; getDynamicDatasources(forceReload?: boolean): Promise; getDynamicDatasourcesByTemplate(templateName: string): DynamicDatasourceInfo[]; } @@ -115,58 +115,51 @@ export class DynamicDsService { if (!this._datasources || !this._datasourceParams) { throw new Error('DynamicDsService has not been initialized'); } - // Get all matching datasources for this template - const matchingDatasources = this._datasourceParams - .map((params, globalIndex) => ({params, globalIndex})) - .filter(({params}) => params.templateName === templateName && params.endBlock === undefined); + // Get all matching datasources using the existing method + const matchingDatasourcesInfo = this.getDynamicDatasourcesByTemplate(templateName); - if (matchingDatasources.length === 0) { + if (matchingDatasourcesInfo.length === 0) { throw new Error(`Dynamic datasource with template name "${templateName}" not found or already destroyed`); } - // Determine which datasources to destroy - let datasourcesToDestroy: {params: DatasourceParams; globalIndex: number}[]; - - if (index !== undefined) { - // Destroy specific datasource by index - if (index < 0 || index >= matchingDatasources.length) { - throw new Error( - `Index ${index} is out of bounds. There are ${matchingDatasources.length} active datasource(s) for template "${templateName}"` - ); - } - datasourcesToDestroy = [matchingDatasources[index]]; - } else { - // Destroy all matching datasources - datasourcesToDestroy = matchingDatasources; + // Validate index + if (index < 0 || index >= matchingDatasourcesInfo.length) { + throw new Error( + `Index ${index} is out of bounds. There are ${matchingDatasourcesInfo.length} active datasource(s) for template "${templateName}"` + ); + } + + // Get the datasource info at the specified index + const dsInfo = matchingDatasourcesInfo[index]; + + // Find the global index in _datasourceParams + const globalIndex = this._datasourceParams.findIndex( + (p) => p.templateName === dsInfo.templateName && p.startBlock === dsInfo.startBlock && p.endBlock === undefined + ); + + if (globalIndex === -1) { + throw new Error(`Could not find datasource in internal storage`); } - // Update each datasource - for (const {globalIndex} of datasourcesToDestroy) { - const dsParam = this._datasourceParams[globalIndex]; - const updatedParams = {...dsParam, endBlock: currentBlockHeight}; - this._datasourceParams[globalIndex] = updatedParams; + // Update the datasource + const dsParam = this._datasourceParams[globalIndex]; + const updatedParams = {...dsParam, endBlock: currentBlockHeight}; + this._datasourceParams[globalIndex] = updatedParams; - if (this._datasources[globalIndex]) { - (this._datasources[globalIndex] as any).endBlock = currentBlockHeight; - } + if (this._datasources[globalIndex]) { + (this._datasources[globalIndex] as any).endBlock = currentBlockHeight; } await this.metadata.set(METADATA_KEY, this._datasourceParams, tx); - if (datasourcesToDestroy.length === 1) { - logger.info(`Destroyed dynamic datasource "${templateName}" at block ${currentBlockHeight}`); - } else { - logger.info( - `Destroyed ${datasourcesToDestroy.length} dynamic datasource(s) with template "${templateName}" at block ${currentBlockHeight}` - ); - } + logger.info(`Destroyed dynamic datasource "${templateName}" at block ${currentBlockHeight}`); } // Not force only seems to be used for project changes diff --git a/packages/node-core/src/indexer/indexer.manager.ts b/packages/node-core/src/indexer/indexer.manager.ts index 619b4262cd..6386599dc8 100644 --- a/packages/node-core/src/indexer/indexer.manager.ts +++ b/packages/node-core/src/indexer/indexer.manager.ts @@ -122,26 +122,24 @@ export abstract class BaseIndexerManager< }, 'getDynamicDatasources'); // Inject function to destroy ds into vm - vm.freeze(async (templateName?: string, index?: number) => { - if (!templateName) { - throw new Error('Cannot destroy datasource: template name must be provided'); - } - + vm.freeze(async (templateName: string, index: number) => { await this.dynamicDsService.destroyDynamicDatasource(templateName, blockHeight, index); - // Mark datasources with this template for removal from current processing - filteredDataSources.forEach((fds) => { - const dsParams = this.dynamicDsService.dynamicDatasources.find((dynamicDs) => { - // Find the corresponding params for this datasource - const params = (this.dynamicDsService as any)._datasourceParams?.find( - (p: any) => p.templateName === templateName && p.startBlock === (fds as any).startBlock - ); - return params !== undefined; - }); - if (dsParams) { - (fds as any).endBlock = blockHeight; - } + // Remove destroyed datasources from current processing + // Filter out datasources that have been destroyed + const updatedFilteredDataSources = filteredDataSources.filter((fds) => { + const dsParam = (this.dynamicDsService as any)._datasourceParams?.find( + (p: any) => + p.templateName === (fds as any).mapping?.file?.split('/').pop()?.replace('.js', '') || + (p.startBlock === (fds as any).startBlock && p.templateName === templateName) + ); + // Keep datasource if it does not have an endBlock (still active) + return !dsParam || dsParam.endBlock === undefined; }); + + // Update the filteredDataSources array in place + filteredDataSources.length = 0; + filteredDataSources.push(...updatedFilteredDataSources); }, 'destroyDynamicDatasource'); return vm; diff --git a/packages/node-core/src/indexer/worker/worker.dynamic-ds.service.ts b/packages/node-core/src/indexer/worker/worker.dynamic-ds.service.ts index 9b4c8097dc..88193f7a06 100644 --- a/packages/node-core/src/indexer/worker/worker.dynamic-ds.service.ts +++ b/packages/node-core/src/indexer/worker/worker.dynamic-ds.service.ts @@ -8,11 +8,7 @@ import {DatasourceParams, IDynamicDsService} from '../dynamic-ds.service'; export type HostDynamicDS = { dynamicDsCreateDynamicDatasource: (params: DatasourceParams) => Promise; - dynamicDsDestroyDynamicDatasource: ( - templateName: string, - currentBlockHeight: number, - index?: number - ) => Promise; + dynamicDsDestroyDynamicDatasource: (templateName: string, currentBlockHeight: number, index: number) => Promise; dynamicDsGetDynamicDatasources: () => Promise; dynamicDsGetDynamicDatasourcesByTemplate: (templateName: string) => DynamicDatasourceInfo[]; }; @@ -41,7 +37,7 @@ export class WorkerDynamicDsService implements IDynamicDsService { return this.host.dynamicDsCreateDynamicDatasource(JSON.parse(JSON.stringify(params))); } - async destroyDynamicDatasource(templateName: string, currentBlockHeight: number, index?: number): Promise { + async destroyDynamicDatasource(templateName: string, currentBlockHeight: number, index: number): Promise { return this.host.dynamicDsDestroyDynamicDatasource(templateName, currentBlockHeight, index); } diff --git a/packages/types-core/src/interfaces.ts b/packages/types-core/src/interfaces.ts index 561e7df3cd..e34e4fc5eb 100644 --- a/packages/types-core/src/interfaces.ts +++ b/packages/types-core/src/interfaces.ts @@ -2,7 +2,7 @@ // SPDX-License-Identifier: GPL-3.0 export type DynamicDatasourceCreator = (name: string, args: Record) => Promise; -export type DynamicDatasourceDestructor = (name?: string, index?: number) => Promise; +export type DynamicDatasourceDestructor = (name: string, index: number) => Promise; export interface DynamicDatasourceInfo { index: number; From af7339fc56bad24f67fce694b94e4a313754a861 Mon Sep 17 00:00:00 2001 From: utnim2 Date: Thu, 23 Oct 2025 16:11:09 +0530 Subject: [PATCH 4/9] refactor: used global index instead of local one and also addressed review comments --- .../src/indexer/dynamic-ds.service.spec.ts | 39 +++++++++++------ .../src/indexer/dynamic-ds.service.ts | 42 ++++++++----------- .../node-core/src/indexer/indexer.manager.ts | 28 ++++++------- 3 files changed, 58 insertions(+), 51 deletions(-) diff --git a/packages/node-core/src/indexer/dynamic-ds.service.spec.ts b/packages/node-core/src/indexer/dynamic-ds.service.spec.ts index 59c9503cfb..1c40a3ed01 100644 --- a/packages/node-core/src/indexer/dynamic-ds.service.spec.ts +++ b/packages/node-core/src/indexer/dynamic-ds.service.spec.ts @@ -91,7 +91,7 @@ describe('DynamicDsService', () => { await service.init(meta); await expect(service.destroyDynamicDatasource('NonExistent', 50, 0)).rejects.toThrow( - 'Dynamic datasource with template name "NonExistent" not found' + 'Datasource at index 0 has template name "Test", not "NonExistent"' ); }); @@ -101,7 +101,7 @@ describe('DynamicDsService', () => { await service.init(meta); await expect(service.destroyDynamicDatasource('Test', 50, 0)).rejects.toThrow( - 'Dynamic datasource with template name "Test" not found or already destroyed' + 'Dynamic datasource at index 0 is already destroyed' ); }); @@ -299,7 +299,9 @@ describe('DynamicDsService', () => { const datasources = service.getDynamicDatasourcesByTemplate('Test'); expect(datasources).toHaveLength(2); + expect(datasources[0].index).toBe(1); // Global index expect(datasources[0].startBlock).toBe(2); + expect(datasources[1].index).toBe(2); // Global index expect(datasources[1].startBlock).toBe(3); }); @@ -349,7 +351,7 @@ describe('DynamicDsService', () => { await service.init(meta); await expect(service.destroyDynamicDatasource('Test', 50, 5)).rejects.toThrow( - 'Index 5 is out of bounds. There are 2 active datasource(s) for template "Test"' + 'Index 5 is out of bounds. There are 2 datasource(s) in total' ); }); @@ -358,36 +360,39 @@ describe('DynamicDsService', () => { await service.init(meta); await expect(service.destroyDynamicDatasource('Test', 50, -1)).rejects.toThrow( - 'Index -1 is out of bounds. There are 2 active datasource(s) for template "Test"' + 'Index -1 is out of bounds. There are 2 datasource(s) in total' ); }); - it('throws error when trying to destroy with index but no active datasources exist', async () => { + it('throws error when trying to destroy already destroyed datasource', async () => { const destroyedParam = {...testParam1, endBlock: 30}; const meta = mockMetadata([destroyedParam]); await service.init(meta); await expect(service.destroyDynamicDatasource('Test', 50, 0)).rejects.toThrow( - 'Dynamic datasource with template name "Test" not found or already destroyed' + 'Dynamic datasource at index 0 is already destroyed' ); }); - it('correctly handles index after some datasources are destroyed', async () => { + it('correctly handles global index after some datasources are destroyed', async () => { const meta = mockMetadata([testParam1, testParam2, testParam3, testParam4]); await service.init(meta); - // Destroy the first one + // Destroy the first one using global index 0 await service.destroyDynamicDatasource('Test', 40, 0); - // Now indices are: [1->0, 2->1, 3->2] + // Now only 3 active datasources for 'Test' template, with global indices 1, 2, 3 const activeDatasources = service.getDynamicDatasourcesByTemplate('Test'); expect(activeDatasources).toHaveLength(3); + expect(activeDatasources[0].index).toBe(1); // Global index expect(activeDatasources[0].startBlock).toBe(2); + expect(activeDatasources[1].index).toBe(2); // Global index expect(activeDatasources[1].startBlock).toBe(3); + expect(activeDatasources[2].index).toBe(3); // Global index expect(activeDatasources[2].startBlock).toBe(4); - // Destroy what is now at index 1 (was originally testParam3) - await service.destroyDynamicDatasource('Test', 60, 1); + // Destroy using global index 2 (testParam3) + await service.destroyDynamicDatasource('Test', 60, 2); const updatedParams = (service as any)._datasourceParams; expect(updatedParams[0]).toEqual({...testParam1, endBlock: 40}); @@ -413,12 +418,22 @@ describe('DynamicDsService', () => { await service.init(meta); await service.destroyDynamicDatasource('Test', 50, 0); - await service.destroyDynamicDatasource('Other', 60, 0); + await service.destroyDynamicDatasource('Other', 60, 2); const updatedParams = (service as any)._datasourceParams; expect(updatedParams[0]).toEqual({...testParam1, endBlock: 50}); expect(updatedParams[1]).toEqual(testParam2); // Not destroyed expect(updatedParams[2]).toEqual({...testParamOther, endBlock: 60}); }); + + it('throws error when template name does not match global index', async () => { + const meta = mockMetadata([testParam1, testParam2, testParamOther]); + await service.init(meta); + + // Try to destroy 'Test' template with index 2, which is 'Other' template + await expect(service.destroyDynamicDatasource('Test', 50, 2)).rejects.toThrow( + 'Datasource at index 2 has template name "Other", not "Test"' + ); + }); }); }); diff --git a/packages/node-core/src/indexer/dynamic-ds.service.ts b/packages/node-core/src/indexer/dynamic-ds.service.ts index 6f41dce7f3..3028cd782d 100644 --- a/packages/node-core/src/indexer/dynamic-ds.service.ts +++ b/packages/node-core/src/indexer/dynamic-ds.service.ts @@ -103,8 +103,8 @@ export class DynamicDsService ({params, globalIndex})) .filter(({params}) => params.templateName === templateName && params.endBlock === undefined); - return matchingDatasources.map(({globalIndex, params}, index) => ({ - index, + return matchingDatasources.map(({globalIndex, params}) => ({ + index: globalIndex, templateName: params.templateName, startBlock: params.startBlock, endBlock: params.endBlock, @@ -122,39 +122,33 @@ export class DynamicDsService= matchingDatasourcesInfo.length) { + // Validate the global index is within bounds + if (index < 0 || index >= this._datasourceParams.length) { throw new Error( - `Index ${index} is out of bounds. There are ${matchingDatasourcesInfo.length} active datasource(s) for template "${templateName}"` + `Index ${index} is out of bounds. There are ${this._datasourceParams.length} datasource(s) in total` ); } - // Get the datasource info at the specified index - const dsInfo = matchingDatasourcesInfo[index]; + // Get the datasource at the global index + const dsParam = this._datasourceParams[index]; - // Find the global index in _datasourceParams - const globalIndex = this._datasourceParams.findIndex( - (p) => p.templateName === dsInfo.templateName && p.startBlock === dsInfo.startBlock && p.endBlock === undefined - ); + // Validate it matches the template name and is not already destroyed + if (dsParam.templateName !== templateName) { + throw new Error( + `Datasource at index ${index} has template name "${dsParam.templateName}", not "${templateName}"` + ); + } - if (globalIndex === -1) { - throw new Error(`Could not find datasource in internal storage`); + if (dsParam.endBlock !== undefined) { + throw new Error(`Dynamic datasource at index ${index} is already destroyed`); } // Update the datasource - const dsParam = this._datasourceParams[globalIndex]; const updatedParams = {...dsParam, endBlock: currentBlockHeight}; - this._datasourceParams[globalIndex] = updatedParams; + this._datasourceParams[index] = updatedParams; - if (this._datasources[globalIndex]) { - (this._datasources[globalIndex] as any).endBlock = currentBlockHeight; + if (this._datasources[index]) { + (this._datasources[index] as any).endBlock = currentBlockHeight; } await this.metadata.set(METADATA_KEY, this._datasourceParams, tx); diff --git a/packages/node-core/src/indexer/indexer.manager.ts b/packages/node-core/src/indexer/indexer.manager.ts index 6386599dc8..147e0fafd8 100644 --- a/packages/node-core/src/indexer/indexer.manager.ts +++ b/packages/node-core/src/indexer/indexer.manager.ts @@ -89,7 +89,7 @@ export abstract class BaseIndexerManager< const blockHeight = block.getHeader().blockHeight; monitorWrite(`- BlockHash: ${block.getHeader().blockHash}`); - const filteredDataSources = this.filterDataSources(blockHeight, dataSources); + let filteredDataSources = this.filterDataSources(blockHeight, dataSources); this.assertDataSources(filteredDataSources, blockHeight); @@ -125,21 +125,19 @@ export abstract class BaseIndexerManager< vm.freeze(async (templateName: string, index: number) => { await this.dynamicDsService.destroyDynamicDatasource(templateName, blockHeight, index); - // Remove destroyed datasources from current processing - // Filter out datasources that have been destroyed - const updatedFilteredDataSources = filteredDataSources.filter((fds) => { - const dsParam = (this.dynamicDsService as any)._datasourceParams?.find( - (p: any) => - p.templateName === (fds as any).mapping?.file?.split('/').pop()?.replace('.js', '') || - (p.startBlock === (fds as any).startBlock && p.templateName === templateName) - ); - // Keep datasource if it does not have an endBlock (still active) - return !dsParam || dsParam.endBlock === undefined; - }); + // Remove the destroyed datasource from current processing + // The datasource at the global index now has an endBlock set + const destroyedDsParam = (this.dynamicDsService as any)._datasourceParams[index]; + + // Filter out the destroyed datasource by matching startBlock and args + filteredDataSources = filteredDataSources.filter((fds) => { + const fdsStartBlock = (fds as any).startBlock; + const fdsArgs = JSON.stringify((fds as any).options || {}); + const paramArgs = JSON.stringify(destroyedDsParam.args || {}); - // Update the filteredDataSources array in place - filteredDataSources.length = 0; - filteredDataSources.push(...updatedFilteredDataSources); + // Keep datasource if it doesn't match the destroyed one + return !(fdsStartBlock === destroyedDsParam.startBlock && fdsArgs === paramArgs); + }); }, 'destroyDynamicDatasource'); return vm; From cd60d9923a40b74eb55b8c010ccaa590965c6135 Mon Sep 17 00:00:00 2001 From: utnim2 Date: Thu, 23 Oct 2025 16:12:59 +0530 Subject: [PATCH 5/9] feat: improved jsdoc and also introduced a new public method --- .../src/indexer/dynamic-ds.service.ts | 32 +++++++++++++++++-- .../node-core/src/indexer/indexer.manager.ts | 13 ++++++-- packages/types-core/src/interfaces.ts | 13 +++++++- 3 files changed, 52 insertions(+), 6 deletions(-) diff --git a/packages/node-core/src/indexer/dynamic-ds.service.ts b/packages/node-core/src/indexer/dynamic-ds.service.ts index 3028cd782d..04d20f7524 100644 --- a/packages/node-core/src/indexer/dynamic-ds.service.ts +++ b/packages/node-core/src/indexer/dynamic-ds.service.ts @@ -28,6 +28,7 @@ export interface IDynamicDsService { destroyDynamicDatasource(templateName: string, currentBlockHeight: number, index: number): Promise; getDynamicDatasources(forceReload?: boolean): Promise; getDynamicDatasourcesByTemplate(templateName: string): DynamicDatasourceInfo[]; + getDatasourceParamByIndex(index: number): DatasourceParams | undefined; } @Injectable() @@ -94,6 +95,14 @@ export class DynamicDsService= this._datasourceParams.length) { + return undefined; + } + return this._datasourceParams[index]; + } + async destroyDynamicDatasource( templateName: string, currentBlockHeight: number, @@ -143,13 +165,17 @@ export class DynamicDsService { const fdsStartBlock = (fds as any).startBlock; - const fdsArgs = JSON.stringify((fds as any).options || {}); + // For custom datasources, args are stored in processor.options + // For runtime datasources, they may be stored differently + const fdsArgs = JSON.stringify((fds as any).processor?.options || (fds as any).options || {}); const paramArgs = JSON.stringify(destroyedDsParam.args || {}); // Keep datasource if it doesn't match the destroyed one diff --git a/packages/types-core/src/interfaces.ts b/packages/types-core/src/interfaces.ts index e34e4fc5eb..7930692ad8 100644 --- a/packages/types-core/src/interfaces.ts +++ b/packages/types-core/src/interfaces.ts @@ -4,15 +4,26 @@ export type DynamicDatasourceCreator = (name: string, args: Record) => Promise; export type DynamicDatasourceDestructor = (name: string, index: number) => Promise; +/** + * Information about a dynamic datasource instance. + */ export interface DynamicDatasourceInfo { + /** + * Global index of the datasource in the internal storage array. + * Use this value when calling destroyDynamicDatasource(). + */ index: number; + /** Template name this datasource was created from */ templateName: string; + /** Block height where this datasource starts processing */ startBlock: number; + /** Block height where this datasource stops processing (if destroyed) */ endBlock?: number; + /** Arguments passed when creating this datasource */ args?: Record; } -export type DynamicDatasourceGetter = (templateName: string) => Promise; +export type DynamicDatasourceGetter = (templateName: string) => DynamicDatasourceInfo[]; export interface Cache = Record> { set(key: keyof T, value: T[keyof T]): Promise; From 54eb62324254abb6ac42a6e1c05e35ab6bb0125c Mon Sep 17 00:00:00 2001 From: utnim2 Date: Tue, 28 Oct 2025 15:03:20 +0530 Subject: [PATCH 6/9] refactor: addressed reviewed changes --- .../src/indexer/dynamic-ds.service.ts | 15 +++++------- .../node-core/src/indexer/indexer.manager.ts | 23 +++---------------- 2 files changed, 9 insertions(+), 29 deletions(-) diff --git a/packages/node-core/src/indexer/dynamic-ds.service.ts b/packages/node-core/src/indexer/dynamic-ds.service.ts index 04d20f7524..00422ad395 100644 --- a/packages/node-core/src/indexer/dynamic-ds.service.ts +++ b/packages/node-core/src/indexer/dynamic-ds.service.ts @@ -128,10 +128,7 @@ export class DynamicDsService= this._datasourceParams.length) { - return undefined; - } - return this._datasourceParams[index]; + return this._datasourceParams?.[index]; } async destroyDynamicDatasource( @@ -144,16 +141,16 @@ export class DynamicDsService= this._datasourceParams.length) { + // Get the datasource at the global index + const dsParam = this._datasourceParams[index]; + + // Validate datasource exists + if (!dsParam) { throw new Error( `Index ${index} is out of bounds. There are ${this._datasourceParams.length} datasource(s) in total` ); } - // Get the datasource at the global index - const dsParam = this._datasourceParams[index]; - // Validate it matches the template name and is not already destroyed if (dsParam.templateName !== templateName) { throw new Error( diff --git a/packages/node-core/src/indexer/indexer.manager.ts b/packages/node-core/src/indexer/indexer.manager.ts index b2b0ef3775..989a147135 100644 --- a/packages/node-core/src/indexer/indexer.manager.ts +++ b/packages/node-core/src/indexer/indexer.manager.ts @@ -125,28 +125,11 @@ export abstract class BaseIndexerManager< vm.freeze(async (templateName: string, index: number) => { await this.dynamicDsService.destroyDynamicDatasource(templateName, blockHeight, index); - // Remove the destroyed datasource from current processing - // The datasource at the global index now has an endBlock set - const destroyedDsParam = this.dynamicDsService.getDatasourceParamByIndex(index); - - if (!destroyedDsParam) { - logger.warn(`Unable to filter destroyed datasource at index ${index} - params not found`); - return; - } - - // Filter out the destroyed datasource by matching startBlock and args + // Re-filter datasources to exclude the destroyed one + // The destroyed datasource now has endBlock set, so filterDataSources will exclude it // Note: Reassigning filteredDataSources is intentional - subsequent handlers // within the same block will see the updated filtered list - filteredDataSources = filteredDataSources.filter((fds) => { - const fdsStartBlock = (fds as any).startBlock; - // For custom datasources, args are stored in processor.options - // For runtime datasources, they may be stored differently - const fdsArgs = JSON.stringify((fds as any).processor?.options || (fds as any).options || {}); - const paramArgs = JSON.stringify(destroyedDsParam.args || {}); - - // Keep datasource if it doesn't match the destroyed one - return !(fdsStartBlock === destroyedDsParam.startBlock && fdsArgs === paramArgs); - }); + filteredDataSources = this.filterDataSources(blockHeight, filteredDataSources); }, 'destroyDynamicDatasource'); return vm; From 1382702e7cff296c5e6ed539e0d98d1035f6c1fc Mon Sep 17 00:00:00 2001 From: utnim2 Date: Fri, 31 Oct 2025 14:11:13 +0530 Subject: [PATCH 7/9] fix: refiltering datasorces and adding a test case to it --- .../src/indexer/dynamic-ds.service.spec.ts | 20 +++++++++++++++++ .../node-core/src/indexer/indexer.manager.ts | 22 ++++++++++++++----- 2 files changed, 36 insertions(+), 6 deletions(-) diff --git a/packages/node-core/src/indexer/dynamic-ds.service.spec.ts b/packages/node-core/src/indexer/dynamic-ds.service.spec.ts index 1c40a3ed01..207294d7f2 100644 --- a/packages/node-core/src/indexer/dynamic-ds.service.spec.ts +++ b/packages/node-core/src/indexer/dynamic-ds.service.spec.ts @@ -435,5 +435,25 @@ describe('DynamicDsService', () => { 'Datasource at index 2 has template name "Other", not "Test"' ); }); + + it('sets endBlock correctly allowing in-place removal during block processing', async () => { + const meta = mockMetadata([testParam1, testParam2, testParam3]); + await service.init(meta); + + // Destroy datasource at index 1 at block 50 + await service.destroyDynamicDatasource('Test', 50, 1); + + // Verify the datasource has endBlock set + const dsParam = service.getDatasourceParamByIndex(1); + expect(dsParam).toBeDefined(); + expect(dsParam?.endBlock).toBe(50); + expect(dsParam?.startBlock).toBe(2); + expect(dsParam?.templateName).toBe('Test'); + + // Verify the internal _datasources array also has endBlock set + const datasources = (service as any)._datasources; + expect(datasources[1]).toBeDefined(); + expect((datasources[1] as any).endBlock).toBe(50); + }); }); }); diff --git a/packages/node-core/src/indexer/indexer.manager.ts b/packages/node-core/src/indexer/indexer.manager.ts index 989a147135..66492b38da 100644 --- a/packages/node-core/src/indexer/indexer.manager.ts +++ b/packages/node-core/src/indexer/indexer.manager.ts @@ -89,7 +89,7 @@ export abstract class BaseIndexerManager< const blockHeight = block.getHeader().blockHeight; monitorWrite(`- BlockHash: ${block.getHeader().blockHash}`); - let filteredDataSources = this.filterDataSources(blockHeight, dataSources); + const filteredDataSources = this.filterDataSources(blockHeight, dataSources); this.assertDataSources(filteredDataSources, blockHeight); @@ -125,11 +125,21 @@ export abstract class BaseIndexerManager< vm.freeze(async (templateName: string, index: number) => { await this.dynamicDsService.destroyDynamicDatasource(templateName, blockHeight, index); - // Re-filter datasources to exclude the destroyed one - // The destroyed datasource now has endBlock set, so filterDataSources will exclude it - // Note: Reassigning filteredDataSources is intentional - subsequent handlers - // within the same block will see the updated filtered list - filteredDataSources = this.filterDataSources(blockHeight, filteredDataSources); + // Remove the destroyed datasource from the current processing array + // Find the datasource by matching the global index stored in the service + const destroyedDsParam = this.dynamicDsService.getDatasourceParamByIndex(index); + if (destroyedDsParam) { + const dsIndex = filteredDataSources.findIndex((fds) => { + return ( + fds.startBlock === destroyedDsParam.startBlock && + JSON.stringify((fds as any).options || (fds as any).processor?.options || {}) === + JSON.stringify(destroyedDsParam.args || {}) + ); + }); + if (dsIndex !== -1) { + filteredDataSources.splice(dsIndex, 1); + } + } }, 'destroyDynamicDatasource'); return vm; From 87dca656fe6ae553ba8bd5c074916a68ab406278 Mon Sep 17 00:00:00 2001 From: utnim2 Date: Tue, 2 Dec 2025 03:31:50 +0530 Subject: [PATCH 8/9] fix: remove any `any` kind and simplified the index manager --- .../src/indexer/dynamic-ds.service.spec.ts | 10 ++++---- .../src/indexer/dynamic-ds.service.ts | 7 ++++-- .../node-core/src/indexer/indexer.manager.ts | 18 ++------------ .../worker/worker.dynamic-ds.service.ts | 7 ++++++ packages/node/src/indexer/indexer.manager.ts | 24 ++++++++++++++----- 5 files changed, 37 insertions(+), 29 deletions(-) diff --git a/packages/node-core/src/indexer/dynamic-ds.service.spec.ts b/packages/node-core/src/indexer/dynamic-ds.service.spec.ts index 207294d7f2..5acb0c1b2f 100644 --- a/packages/node-core/src/indexer/dynamic-ds.service.spec.ts +++ b/packages/node-core/src/indexer/dynamic-ds.service.spec.ts @@ -120,17 +120,17 @@ describe('DynamicDsService', () => { const newParam = {templateName: 'Test', startBlock: 60}; await service.createDynamicDatasource(newParam); - const finalParams = (service as any)._datasourceParams; - const destroyedCount = finalParams.filter((p) => p.endBlock !== undefined).length; - const activeCount = finalParams.filter((p) => p.endBlock === undefined).length; + const finalParams: DatasourceParams[] = (service as any)._datasourceParams; + const destroyedCount = finalParams.filter((p: DatasourceParams) => p.endBlock !== undefined).length; + const activeCount = finalParams.filter((p: DatasourceParams) => p.endBlock === undefined).length; expect(destroyedCount).toBeGreaterThanOrEqual(1); expect(activeCount).toBeGreaterThanOrEqual(1); - const destroyedParam = finalParams.find((p) => p.startBlock === 1 && p.endBlock === 50); + const destroyedParam = finalParams.find((p: DatasourceParams) => p.startBlock === 1 && p.endBlock === 50); expect(destroyedParam).toBeDefined(); - const newParamFound = finalParams.find((p) => p.startBlock === 60 && !p.endBlock); + const newParamFound = finalParams.find((p: DatasourceParams) => p.startBlock === 60 && !p.endBlock); expect(newParamFound).toBeDefined(); }); diff --git a/packages/node-core/src/indexer/dynamic-ds.service.ts b/packages/node-core/src/indexer/dynamic-ds.service.ts index 00422ad395..edd7478096 100644 --- a/packages/node-core/src/indexer/dynamic-ds.service.ts +++ b/packages/node-core/src/indexer/dynamic-ds.service.ts @@ -169,10 +169,13 @@ export class DynamicDsService { await this.dynamicDsService.destroyDynamicDatasource(templateName, blockHeight, index); - - // Remove the destroyed datasource from the current processing array - // Find the datasource by matching the global index stored in the service - const destroyedDsParam = this.dynamicDsService.getDatasourceParamByIndex(index); - if (destroyedDsParam) { - const dsIndex = filteredDataSources.findIndex((fds) => { - return ( - fds.startBlock === destroyedDsParam.startBlock && - JSON.stringify((fds as any).options || (fds as any).processor?.options || {}) === - JSON.stringify(destroyedDsParam.args || {}) - ); - }); - if (dsIndex !== -1) { - filteredDataSources.splice(dsIndex, 1); - } - } + // Note: The datasource object in filteredDataSources is updated with endBlock + // The child class implementation should check ds.endBlock before processing }, 'destroyDynamicDatasource'); return vm; diff --git a/packages/node-core/src/indexer/worker/worker.dynamic-ds.service.ts b/packages/node-core/src/indexer/worker/worker.dynamic-ds.service.ts index 88193f7a06..a8c33206fb 100644 --- a/packages/node-core/src/indexer/worker/worker.dynamic-ds.service.ts +++ b/packages/node-core/src/indexer/worker/worker.dynamic-ds.service.ts @@ -11,6 +11,7 @@ export type HostDynamicDS = { dynamicDsDestroyDynamicDatasource: (templateName: string, currentBlockHeight: number, index: number) => Promise; dynamicDsGetDynamicDatasources: () => Promise; dynamicDsGetDynamicDatasourcesByTemplate: (templateName: string) => DynamicDatasourceInfo[]; + dynamicDsGetDatasourceParamByIndex: (index: number) => DatasourceParams | undefined; }; export const hostDynamicDsKeys: (keyof HostDynamicDS)[] = [ @@ -18,6 +19,7 @@ export const hostDynamicDsKeys: (keyof HostDynamicDS)[] = [ 'dynamicDsDestroyDynamicDatasource', 'dynamicDsGetDynamicDatasources', 'dynamicDsGetDynamicDatasourcesByTemplate', + 'dynamicDsGetDatasourceParamByIndex', ]; @Injectable() @@ -48,6 +50,10 @@ export class WorkerDynamicDsService implements IDynamicDsService { getDynamicDatasourcesByTemplate(templateName: string): DynamicDatasourceInfo[] { return this.host.dynamicDsGetDynamicDatasourcesByTemplate(templateName); } + + getDatasourceParamByIndex(index: number): DatasourceParams | undefined { + return this.host.dynamicDsGetDatasourceParamByIndex(index); + } } export function dynamicDsHostFunctions(dynamicDsService: IDynamicDsService): HostDynamicDS { @@ -56,5 +62,6 @@ export function dynamicDsHostFunctions(dynamicDsService: IDynamicDsService Promise, ): Promise { + // Extract block height for checking destroyed datasources + const blockHeight = blockContent.block.block.header.number.toNumber(); + if (isFullBlock(blockContent)) { const { block, events, extrinsics } = blockContent; - await this.indexContent(SubstrateHandlerKind.Block)( + await this.indexContent(SubstrateHandlerKind.Block, blockHeight)( block, dataSources, getVM, @@ -131,7 +134,7 @@ export class IndexerManager extends BaseIndexerManager< // Run initialization events for (const event of groupedEvents.init) { - await this.indexContent(SubstrateHandlerKind.Event)( + await this.indexContent(SubstrateHandlerKind.Event, blockHeight)( event, dataSources, getVM, @@ -139,7 +142,7 @@ export class IndexerManager extends BaseIndexerManager< } for (const extrinsic of extrinsics) { - await this.indexContent(SubstrateHandlerKind.Call)( + await this.indexContent(SubstrateHandlerKind.Call, blockHeight)( extrinsic, dataSources, getVM, @@ -151,7 +154,7 @@ export class IndexerManager extends BaseIndexerManager< ); for (const event of extrinsicEvents) { - await this.indexContent(SubstrateHandlerKind.Event)( + await this.indexContent(SubstrateHandlerKind.Event, blockHeight)( event, dataSources, getVM, @@ -161,7 +164,7 @@ export class IndexerManager extends BaseIndexerManager< // Run finalization events for (const event of groupedEvents.finalize) { - await this.indexContent(SubstrateHandlerKind.Event)( + await this.indexContent(SubstrateHandlerKind.Event, blockHeight)( event, dataSources, getVM, @@ -169,7 +172,7 @@ export class IndexerManager extends BaseIndexerManager< } } else { for (const event of blockContent.events) { - await this.indexContent(SubstrateHandlerKind.Event)( + await this.indexContent(SubstrateHandlerKind.Event, blockHeight)( event, dataSources, getVM, @@ -180,6 +183,7 @@ export class IndexerManager extends BaseIndexerManager< private indexContent( kind: SubstrateHandlerKind, + blockHeight: number, ): ( content: | SubstrateBlock @@ -191,6 +195,14 @@ export class IndexerManager extends BaseIndexerManager< ) => Promise { return async (content, dataSources, getVM) => { for (const ds of dataSources) { + // Skip datasources that have been destroyed at or before this block + // When a datasource is destroyed, its endBlock is set to the current blockHeight + // We want to exclude it from processing in subsequent handlers within the same block + const endBlock = + 'endBlock' in ds ? (ds as { endBlock?: number }).endBlock : undefined; + if (endBlock !== undefined && endBlock <= blockHeight) { + continue; + } await this.indexData(kind, content, ds, getVM); } }; From 5cf5af124bae599226dec4c7cbabc6a0f4a7c48f Mon Sep 17 00:00:00 2001 From: utnim2 Date: Thu, 4 Dec 2025 13:45:03 +0530 Subject: [PATCH 9/9] feat: implement proper filtering with for in loop and added a test case for filtering that check it is correct --- .../src/indexer/dynamic-ds.service.spec.ts | 69 +++++++++++++++++++ .../src/indexer/dynamic-ds.service.ts | 3 +- .../node-core/src/indexer/indexer.manager.ts | 9 +-- packages/node/src/indexer/indexer.manager.ts | 12 +--- 4 files changed, 78 insertions(+), 15 deletions(-) diff --git a/packages/node-core/src/indexer/dynamic-ds.service.spec.ts b/packages/node-core/src/indexer/dynamic-ds.service.spec.ts index 5acb0c1b2f..14706d5e2c 100644 --- a/packages/node-core/src/indexer/dynamic-ds.service.spec.ts +++ b/packages/node-core/src/indexer/dynamic-ds.service.spec.ts @@ -455,5 +455,74 @@ describe('DynamicDsService', () => { expect(datasources[1]).toBeDefined(); expect((datasources[1] as any).endBlock).toBe(50); }); + + it('destroyed datasource is filtered out in subsequent block processing', async () => { + const meta = mockMetadata([testParam1, testParam2, testParam3]); + await service.init(meta); + + const blockHeight = 100; + const datasources = (service as any)._datasources; + + // Simulate filtering datasources for block 100 (all should be included initially) + const filterDataSources = (blockHeight: number, dataSources: BaseDataSource[]) => { + return dataSources.filter( + (ds) => + ds.startBlock !== undefined && + ds.startBlock <= blockHeight && + ((ds as any).endBlock ?? Number.MAX_SAFE_INTEGER) > blockHeight + ); + }; + + // Initial state: all 3 datasources should be active + let filteredDs = filterDataSources(blockHeight, datasources); + expect(filteredDs.length).toBe(3); + + // Simulate processing: DS2 destroys DS3 during block 100 + await service.destroyDynamicDatasource('Test', blockHeight, 2); + + // Re-filter datasources + filteredDs = filterDataSources(blockHeight, datasources); + + // After destruction, only DS1 and DS2 should remain + expect(filteredDs.length).toBe(2); + expect(filteredDs[0].startBlock).toBe(1); // DS1 + expect(filteredDs[1].startBlock).toBe(2); // DS2 + + // Verify DS3 was destroyed + expect((datasources[2] as any).endBlock).toBe(blockHeight); + }); + + it('demonstrates traditional for loop pattern works with array reassignment', async () => { + const meta = mockMetadata([testParam1, testParam2, testParam3]); + await service.init(meta); + + const blockHeight = 100; + let datasources = (service as any)._datasources; + + const processed: number[] = []; + + for (let i = 0; i < datasources.length; i++) { + const ds = datasources[i]; + processed.push(ds.startBlock); + + // When processing DS2, destroy DS3 and re-filter + if (ds.startBlock === 2) { + await service.destroyDynamicDatasource('Test', blockHeight, 2); + + // Re-filter datasources using filter and reassignment + datasources = datasources.filter( + (d: any) => + d.startBlock !== undefined && + d.startBlock <= blockHeight && + (d.endBlock ?? Number.MAX_SAFE_INTEGER) > blockHeight + ); + } + } + expect(processed).toEqual([1, 2]); + + // Verify DS3 has endBlock set + const allDs = (service as any)._datasources; + expect((allDs[2] as any).endBlock).toBe(blockHeight); + }); }); }); diff --git a/packages/node-core/src/indexer/dynamic-ds.service.ts b/packages/node-core/src/indexer/dynamic-ds.service.ts index edd7478096..8d468cb545 100644 --- a/packages/node-core/src/indexer/dynamic-ds.service.ts +++ b/packages/node-core/src/indexer/dynamic-ds.service.ts @@ -174,8 +174,7 @@ export class DynamicDsService { await this.dynamicDsService.destroyDynamicDatasource(templateName, blockHeight, index); - // Note: The datasource object in filteredDataSources is updated with endBlock - // The child class implementation should check ds.endBlock before processing + + // Re-filter datasources to exclude the destroyed one + filteredDataSources = this.filterDataSources(blockHeight, filteredDataSources); }, 'destroyDynamicDatasource'); return vm; @@ -151,7 +152,7 @@ export abstract class BaseIndexerManager< (ds) => ds.startBlock !== undefined && ds.startBlock <= nextProcessingHeight && - (ds.endBlock ?? Number.MAX_SAFE_INTEGER) >= nextProcessingHeight + (ds.endBlock ?? Number.MAX_SAFE_INTEGER) > nextProcessingHeight ); // perform filter for custom ds diff --git a/packages/node/src/indexer/indexer.manager.ts b/packages/node/src/indexer/indexer.manager.ts index a7923d6ecb..2bd8d39095 100644 --- a/packages/node/src/indexer/indexer.manager.ts +++ b/packages/node/src/indexer/indexer.manager.ts @@ -194,15 +194,9 @@ export class IndexerManager extends BaseIndexerManager< getVM: (d: SubstrateProjectDs) => Promise, ) => Promise { return async (content, dataSources, getVM) => { - for (const ds of dataSources) { - // Skip datasources that have been destroyed at or before this block - // When a datasource is destroyed, its endBlock is set to the current blockHeight - // We want to exclude it from processing in subsequent handlers within the same block - const endBlock = - 'endBlock' in ds ? (ds as { endBlock?: number }).endBlock : undefined; - if (endBlock !== undefined && endBlock <= blockHeight) { - continue; - } + // When a datasource is destroyed, its removed from the array and the loop exits early + for (let i = 0; i < dataSources.length; i++) { + const ds = dataSources[i]; await this.indexData(kind, content, ds, getVM); } };