Skip to content

Commit

Permalink
Add factory for creating block dispatcher (#2701)
Browse files Browse the repository at this point in the history
* Add factory for creating block dispatcher

* Update changelog

* Update changelog
  • Loading branch information
stwiname authored Feb 26, 2025
1 parent da45288 commit cde9a9a
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 90 deletions.
2 changes: 2 additions & 0 deletions packages/node-core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Added
- BlockDispatcher factory function and expose workerData (#2701)

## [17.0.1] - 2025-02-25
### Fixed
Expand Down
77 changes: 77 additions & 0 deletions packages/node-core/src/indexer/blockDispatcher/factory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2020-2025 SubQuery Pte Ltd authors & contributors
// SPDX-License-Identifier: GPL-3.0

import {EventEmitter2} from '@nestjs/event-emitter';
import {BaseDataSource} from '@subql/types-core';
import {IApiConnectionSpecific} from '../../api.service';
import {IBlockchainService} from '../../blockchain.service';
import {IProjectUpgradeService, NodeConfig} from '../../configure';
import {ConnectionPoolStateManager} from '../connectionPoolState.manager';
import {DynamicDsService} from '../dynamic-ds.service';
import {InMemoryCacheService} from '../inMemoryCache.service';
import {MonitorService} from '../monitor.service';
import {PoiSyncService} from '../poi';
import {ProjectService} from '../project.service';
import {StoreService} from '../store.service';
import {IStoreModelProvider} from '../storeModelProvider';
import {IIndexerManager, ISubqueryProject} from '../types';
import {UnfinalizedBlocksService} from '../unfinalizedBlocks.service';
import {createIndexerWorker, IBaseIndexerWorker} from '../worker';
import {IBlockDispatcher} from './base-block-dispatcher';
import {BlockDispatcher} from './block-dispatcher';
import {WorkerBlockDispatcher} from './worker-block-dispatcher';

export const blockDispatcherFactory =
<DS extends BaseDataSource, Block, ApiConn extends IApiConnectionSpecific, Worker extends IBaseIndexerWorker>(
workerPath: string,
workerFns: Parameters<typeof createIndexerWorker<Worker, ApiConn, Block, DS>>[1]
) =>
(
nodeConfig: NodeConfig,
eventEmitter: EventEmitter2,
projectService: ProjectService<DS>,
projectUpgradeService: IProjectUpgradeService,
cacheService: InMemoryCacheService,
storeService: StoreService,
storeModelProvider: IStoreModelProvider,
poiSyncService: PoiSyncService,
project: ISubqueryProject,
dynamicDsService: DynamicDsService<DS>,
unfinalizedBlocks: UnfinalizedBlocksService,
connectionPoolState: ConnectionPoolStateManager<ApiConn>,
blockchainService: IBlockchainService<DS>,
indexerManager: IIndexerManager<Block, DS>,
monitorService?: MonitorService
): IBlockDispatcher<Block> => {
return nodeConfig.workers
? new WorkerBlockDispatcher<DS, Worker, Block, ApiConn>(
nodeConfig,
eventEmitter,
projectService,
projectUpgradeService,
storeService,
storeModelProvider,
cacheService,
poiSyncService,
dynamicDsService,
unfinalizedBlocks,
connectionPoolState,
project,
blockchainService,
workerPath,
workerFns,
monitorService
)
: new BlockDispatcher(
nodeConfig,
eventEmitter,
projectService,
projectUpgradeService,
storeService,
storeModelProvider,
poiSyncService,
project,
blockchainService,
indexerManager
);
};
1 change: 1 addition & 0 deletions packages/node-core/src/indexer/blockDispatcher/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
export * from './base-block-dispatcher';
export * from './block-dispatcher';
export * from './worker-block-dispatcher';
export * from './factory';
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@
// SPDX-License-Identifier: GPL-3.0

import assert from 'assert';
import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { Interval } from '@nestjs/schedule';
import { BaseDataSource } from '@subql/types-core';
import { last } from 'lodash';
import { IApiConnectionSpecific } from '../../api.service';
import { IBlockchainService } from '../../blockchain.service';
import { NodeConfig } from '../../configure';
import { IProjectUpgradeService } from '../../configure/ProjectUpgrade.service';
import { IndexerEvent } from '../../events';
import {Inject, Injectable, OnApplicationShutdown} from '@nestjs/common';
import {EventEmitter2} from '@nestjs/event-emitter';
import {Interval} from '@nestjs/schedule';
import {BaseDataSource} from '@subql/types-core';
import {last} from 'lodash';
import {IApiConnectionSpecific} from '../../api.service';
import {IBlockchainService} from '../../blockchain.service';
import {NodeConfig} from '../../configure';
import {IProjectUpgradeService} from '../../configure/ProjectUpgrade.service';
import {IndexerEvent} from '../../events';
import {
ConnectionPoolStateManager,
createIndexerWorker,
Expand All @@ -23,15 +23,15 @@ import {
TerminateableWorker,
UnfinalizedBlocksService,
} from '../../indexer';
import { getLogger } from '../../logger';
import { monitorWrite } from '../../process';
import { AutoQueue, isTaskFlushedError } from '../../utils';
import { MonitorServiceInterface } from '../monitor.service';
import { StoreService } from '../store.service';
import { IStoreModelProvider } from '../storeModelProvider';
import { ISubqueryProject, IProjectService } from '../types';
import { isBlockUnavailableError } from '../worker/utils';
import { BaseBlockDispatcher } from './base-block-dispatcher';
import {getLogger} from '../../logger';
import {monitorWrite} from '../../process';
import {AutoQueue, isTaskFlushedError} from '../../utils';
import {MonitorServiceInterface} from '../monitor.service';
import {StoreService} from '../store.service';
import {IStoreModelProvider} from '../storeModelProvider';
import {ISubqueryProject, IProjectService} from '../types';
import {isBlockUnavailableError} from '../worker/utils';
import {BaseBlockDispatcher} from './base-block-dispatcher';

const logger = getLogger('WorkerBlockDispatcherService');

Expand All @@ -47,13 +47,14 @@ function initAutoQueue<T>(

@Injectable()
export class WorkerBlockDispatcher<
DS extends BaseDataSource = BaseDataSource,
Worker extends IBaseIndexerWorker = IBaseIndexerWorker,
Block = any,
ApiConn extends IApiConnectionSpecific = IApiConnectionSpecific
>
DS extends BaseDataSource = BaseDataSource,
Worker extends IBaseIndexerWorker = IBaseIndexerWorker,
Block = any,
ApiConn extends IApiConnectionSpecific = IApiConnectionSpecific,
>
extends BaseBlockDispatcher<AutoQueue<void>, DS, Block>
implements OnApplicationShutdown {
implements OnApplicationShutdown
{
protected workers: TerminateableWorker<Worker>[] = [];
private numWorkers: number;
private isShutdown = false;
Expand All @@ -76,7 +77,8 @@ export class WorkerBlockDispatcher<
@Inject('IBlockchainService') private blockchainService: IBlockchainService<DS>,
workerPath: string,
workerFns: Parameters<typeof createIndexerWorker<Worker, ApiConn, Block, DS>>[1],
monitorService?: MonitorServiceInterface
monitorService?: MonitorServiceInterface,
workerData?: unknown
) {
super(
nodeConfig,
Expand All @@ -102,7 +104,8 @@ export class WorkerBlockDispatcher<
connectionPoolState,
project.root,
projectService.startHeight,
monitorService
monitorService,
workerData
);
// initAutoQueue will assert that workers is set. unfortunately we cant do anything before the super call
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
Expand Down Expand Up @@ -170,7 +173,7 @@ export class WorkerBlockDispatcher<
// Used to compare before and after as a way to check if queue was flushed
const bufferedHeight = this.latestBufferedHeight;

const pendingBlock = this.blockchainService.fetchBlockWorker(worker, height, { workers: this.workers });
const pendingBlock = this.blockchainService.fetchBlockWorker(worker, height, {workers: this.workers});

const processBlock = async () => {
try {
Expand All @@ -183,7 +186,7 @@ export class WorkerBlockDispatcher<
await this.preProcessBlock(header);

monitorWrite(`Processing from worker #${workerIdx}`);
const { dynamicDsCreated, reindexBlockHeader } = await worker.processBlock(height);
const {dynamicDsCreated, reindexBlockHeader} = await worker.processBlock(height);

await this.postProcessBlock(header, {
dynamicDsCreated,
Expand Down
2 changes: 2 additions & 0 deletions packages/node/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Changed
- Use blockDispatcherFactory from node-core (#2701)

## [5.9.0] - 2025-02-25
### Changed
Expand Down
71 changes: 10 additions & 61 deletions packages/node/src/indexer/fetch.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,21 @@ import {
StoreService,
NodeConfig,
ConnectionPoolStateManager,
IProjectUpgradeService,
PoiSyncService,
InMemoryCacheService,
MonitorService,
CoreModule,
IStoreModelProvider,
ConnectionPoolService,
UnfinalizedBlocksService,
BlockDispatcher,
DsProcessorService,
ProjectService,
DynamicDsService,
WorkerBlockDispatcher,
FetchService,
DictionaryService,
blockDispatcherFactory,
} from '@subql/node-core';
import { SubstrateDatasource } from '@subql/types';
import { BlockchainService } from '../blockchain.service';
import { SubqueryProject } from '../configure/SubqueryProject';
import { ApiService } from './api.service';
import { ApiPromiseConnection } from './apiPromise.connection';
import { SubstrateDictionaryService } from './dictionary/substrateDictionary.service';
Expand Down Expand Up @@ -56,7 +52,6 @@ import { IIndexerWorker } from './worker/worker';
provide: 'IBlockchainService',
useClass: BlockchainService,
},
/* START: Move to node core */
DsProcessorService,
DynamicDsService,
{
Expand All @@ -67,64 +62,18 @@ import { IIndexerWorker } from './worker/worker';
useClass: ProjectService,
provide: 'IProjectService',
},
/* END: Move to node core */
IndexerManager,
{
provide: 'IBlockDispatcher',
useFactory: (
nodeConfig: NodeConfig,
eventEmitter: EventEmitter2,
projectService: ProjectService<SubstrateDatasource>,
projectUpgradeService: IProjectUpgradeService,
cacheService: InMemoryCacheService,
storeService: StoreService,
storeModelProvider: IStoreModelProvider,
poiSyncService: PoiSyncService,
project: SubqueryProject,
dynamicDsService: DynamicDsService<SubstrateDatasource>,
unfinalizedBlocks: UnfinalizedBlocksService,
connectionPoolState: ConnectionPoolStateManager<ApiPromiseConnection>,
blockchainService: BlockchainService,
indexerManager: IndexerManager,
monitorService?: MonitorService,
) => {
return nodeConfig.workers
? new WorkerBlockDispatcher<
SubstrateDatasource,
IIndexerWorker,
BlockContent | LightBlockContent,
ApiPromiseConnection
>(
nodeConfig,
eventEmitter,
projectService,
projectUpgradeService,
storeService,
storeModelProvider,
cacheService,
poiSyncService,
dynamicDsService,
unfinalizedBlocks,
connectionPoolState,
project,
blockchainService,
path.resolve(__dirname, '../../dist/indexer/worker/worker.js'),
['syncRuntimeService', 'getSpecFromMap'],
monitorService,
)
: new BlockDispatcher(
nodeConfig,
eventEmitter,
projectService,
projectUpgradeService,
storeService,
storeModelProvider,
poiSyncService,
project,
blockchainService,
indexerManager,
);
},
useFactory: blockDispatcherFactory<
SubstrateDatasource,
BlockContent | LightBlockContent,
ApiPromiseConnection,
IIndexerWorker
>(path.resolve(__dirname, '../../dist/indexer/worker/worker.js'), [
'syncRuntimeService',
'getSpecFromMap',
]),
inject: [
NodeConfig,
EventEmitter2,
Expand Down

0 comments on commit cde9a9a

Please # to comment.