
chore: Optimize garbage collection for Topology instance #33


Merged: 1 commit, Apr 14, 2025
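The main change extracts the body of the crawl loop into a private runCrawlIteration method. A likely reading of the title's GC motivation: a local such as topology in a function that never returns can stay reachable from that function's frame while the loop waits between iterations, whereas a local of a helper becomes unreachable as soon as the helper returns. A minimal sketch of the pattern, with hypothetical names that are not this repo's code:

// Minimal sketch of the extraction pattern (hypothetical names, not this repo's code).
// Before: the large object is a local of a function that never returns, so the
// engine may keep it reachable from the frame while the loop waits.
async function crawlForever(): Promise<void> {
    while (true) {
        const topology = await buildTopology() // can stay retained during the wait below
        await analyze(topology)
        await sleep(60 * 1000)
    }
}

// After: the large object is a local of a helper whose frame is discarded on
// return, so each iteration's topology is collectible during the delay.
async function crawlForeverExtracted(): Promise<void> {
    while (true) {
        await runIteration()
        await sleep(60 * 1000)
    }
}

async function runIteration(): Promise<void> {
    const topology = await buildTopology()
    await analyze(topology)
}

// Hypothetical helpers so the sketch is self-contained.
async function buildTopology(): Promise<{ nodes: string[] }> {
    return { nodes: [] }
}
async function analyze(_topology: { nodes: string[] }): Promise<void> {}
function sleep(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms))
}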
44 changes: 23 additions & 21 deletions src/crawler/Crawler.ts
@@ -135,20 +135,7 @@ export class Crawler {
         this.client.on('streamCreated', this.onStreamCreated)
         let iterationIndex = 0
         while (true) {
-            try {
-                const topology = await crawlTopology(
-                    networkNodeFacade,
-                    this.client.getEntryPoints(),
-                    (nodeInfo: NormalizedNodeInfo) => nodeInfo.controlLayer.neighbors,
-                    `full-${Date.now()}`
-                )
-                await this.nodeRepository.replaceNetworkTopology(topology)
-                await this.analyzeContractStreams(topology, this.subscribeGate)
-            } catch (e) {
-                logger.error('Error', { err: e })
-                await wait(RECOVERY_DELAY)
-            }
-            logger.info('Crawl iteration completed')
+            await this.runCrawlIteration(networkNodeFacade)
             if ((iterationCount === undefined) || (iterationIndex < iterationCount - 1)) {
                 await wait(this.config.crawler.iterationDelay)
                 iterationIndex++
@@ -158,9 +145,25 @@ export class Crawler {
         }
     }
 
+    private async runCrawlIteration(networkNodeFacade: NetworkNodeFacade): Promise<void> {
+        try {
+            const topology = await crawlTopology(
+                networkNodeFacade,
+                this.client.getEntryPoints(),
+                (nodeInfo: NormalizedNodeInfo) => nodeInfo.controlLayer.neighbors,
+                `full-${Date.now()}`
+            )
+            await this.nodeRepository.replaceNetworkTopology(topology)
+            await this.analyzeContractStreams(topology)
+        } catch (e) {
+            logger.error('Error', { err: e })
+            await wait(RECOVERY_DELAY)
+        }
+        logger.info('Crawl iteration completed')
+    }
+
     private async analyzeContractStreams(
-        topology: Topology,
-        subscribeGate: SubscribeGate
+        topology: Topology
     ): Promise<void> {
         // wrap this.client.getAllStreams() with retry because in streamr-docker-dev environment
         // the graph-node dependency may not be available immediately after the service has
@@ -176,7 +179,7 @@
         const workedThreadLimit = pLimit(MAX_SUBSCRIPTION_COUNT)
         await Promise.all(sortedContractStreams.map((stream: Stream) => {
             return workedThreadLimit(async () => {
-                await this.analyzeStream(stream.id, await stream.getMetadata(), topology, subscribeGate)
+                await this.analyzeStream(stream.id, await stream.getMetadata(), topology)
             })
         }))
 
@@ -186,8 +189,7 @@
     private async analyzeStream(
         id: StreamID,
         metadata: StreamMetadata,
-        topology: Topology,
-        subscribeGate: SubscribeGate
+        topology: Topology
     ): Promise<void> {
         logger.info(`Analyze ${id}`)
         const peersByPartition = new Map<number, Set<DhtAddress>>
@@ -204,7 +206,7 @@
                 [...peersByPartition.keys()],
                 isPublicStream(subscriberCount),
                 await this.client.getNetworkNodeFacade(),
-                subscribeGate,
+                this.subscribeGate!,
                 this.config
             )
             : { messagesPerSecond: 0, bytesPerSecond: 0 }
@@ -275,7 +277,7 @@
             return (streamPartitions.map((sp) => sp.contentDeliveryLayerNeighbors.map((n) => n.peerDescriptor))).flat()
         }, `stream-${payload.streamId}-${Date.now()}`)
         // TODO could add new nodes and neighbors to NodeRepository?
-        await this.analyzeStream(payload.streamId, payload.metadata, topology, this.subscribeGate!)
+        await this.analyzeStream(payload.streamId, payload.metadata, topology)
     } catch (e: any) {
         logger.error(`Failed to handle new stream ${payload.streamId}`, e)
     }
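The secondary change stops threading SubscribeGate through analyzeContractStreams and analyzeStream as a parameter; both paths now read the crawler's own field, and the non-null assertion this.subscribeGate! encodes the expectation that the gate is initialized before analysis runs. A minimal sketch of that shape, using hypothetical stubs rather than the repo's real types:

// Hypothetical stub standing in for the repo's SubscribeGate.
class SubscribeGate {
    async waitUntilOpen(): Promise<void> {}
}

class CrawlerSketch {
    private subscribeGate?: SubscribeGate

    async start(): Promise<void> {
        this.subscribeGate = new SubscribeGate() // initialized before any analysis
        await this.analyzeStream('stream-id')
    }

    private async analyzeStream(id: string): Promise<void> {
        // Mirrors the diff's `this.subscribeGate!`: the assertion assumes
        // start() has already set the field.
        await this.subscribeGate!.waitUntilOpen()
        console.log(`analyzed ${id}`)
    }
}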