From 66385ce615451913f066199e3d174c618b3e243d Mon Sep 17 00:00:00 2001 From: Justin Langston Date: Wed, 26 Dec 2018 15:13:04 -0500 Subject: [PATCH] feat(service): support multiple sync nodes failover p2p service support for clusters of bitcore nodes --- packages/bitcore-node/src/models/state.ts | 24 ++++++++++++++++++++ packages/bitcore-node/src/services/p2p.ts | 27 +++++++++++++++++++---- 2 files changed, 47 insertions(+), 4 deletions(-) diff --git a/packages/bitcore-node/src/models/state.ts b/packages/bitcore-node/src/models/state.ts index cc0e44d3f11..a566dcf72f9 100644 --- a/packages/bitcore-node/src/models/state.ts +++ b/packages/bitcore-node/src/models/state.ts @@ -1,5 +1,6 @@ import { BaseModel } from './base'; import { ObjectID } from 'mongodb'; +import os from 'os'; export type IState = { _id?: ObjectID; @@ -13,6 +14,29 @@ export class State extends BaseModel { allowedPaging = []; onConnect() {} + + async getSingletonState() { + return this.collection.findOneAndUpdate( + {}, + { $setOnInsert: { created: new Date()}}, + { upsert: true } + ); + } + + async getSyncingNode(params: { chain: string, network: string }): Promise { + const { chain, network } = params; + const state = await this.getSingletonState(); + return state.value![`syncingNode:${chain}:${network}`]; + } + + async selfNominateSyncingNode(params: { chain: string, network: string, lastHeartBeat: any }) { + const { chain, network, lastHeartBeat } = params; + const singleState = await this.getSingletonState(); + this.collection.findOneAndUpdate( + { _id: singleState.value!._id, $or: [{ [`syncingNode:${chain}:${network}`]: { $exists: false } }, { [`syncingNode:${chain}:${network}`]: lastHeartBeat }]}, + { $set: { [`syncingNode:${chain}:${network}`]: `${os.hostname}:${process.pid}:${Date.now()}` } } + ); + } } export let StateModel = new State(); diff --git a/packages/bitcore-node/src/services/p2p.ts b/packages/bitcore-node/src/services/p2p.ts index e4fc7e83257..a8ae021f229 100644 --- a/packages/bitcore-node/src/services/p2p.ts +++ b/packages/bitcore-node/src/services/p2p.ts @@ -7,6 +7,7 @@ import { TransactionModel } from '../models/transaction'; import { Bitcoin } from '../types/namespaces/Bitcoin'; import { StateModel } from '../models/state'; import { SpentHeightIndicators } from '../types/Coin'; +import os from 'os'; const Chain = require('../chain'); const LRU = require('lru-cache'); @@ -22,6 +23,7 @@ export class P2pService { private pool: any; private invCache: any; private initialSyncComplete: boolean; + private isSyncingNode: boolean; constructor(params) { const { chain, network, chainConfig } = params; this.chain = chain; @@ -32,6 +34,7 @@ export class P2pService { this.events = new EventEmitter(); this.syncing = false; this.initialSyncComplete = false; + this.isSyncingNode = false; this.invCache = new LRU({ max: 10000 }); this.messages = new this.bitcoreP2p.Messages({ network: this.bitcoreLib.Networks.get(this.network) @@ -76,7 +79,7 @@ export class P2pService { network: this.network, hash }); - if (!this.invCache.get(hash)) { + if (this.isSyncingNode && !this.invCache.get(hash)) { this.processTransaction(message.transaction); this.events.emit('transaction', message.transaction); } @@ -93,7 +96,7 @@ export class P2pService { hash }); - if (!this.invCache.get(hash)) { + if (this.isSyncingNode && !this.invCache.get(hash)) { this.invCache.set(hash); this.events.emit(hash, message.block); this.events.emit('block', message.block); @@ -112,7 +115,7 @@ export class P2pService { }); this.pool.on('peerinv', (peer, message) => { - if (!this.syncing) { + if (this.isSyncingNode && !this.syncing) { const filtered = message.inventory.filter(inv => { const hash = this.bitcoreLib.encoding .BufferReader(inv.hash) @@ -304,6 +307,22 @@ export class P2pService { logger.debug(`Started worker for chain ${this.chain}`); this.setupListeners(); await this.connect(); - this.sync(); + setInterval(async () => { + const syncingNode = await StateModel.getSyncingNode({chain: this.chain, network: this.network}); + const [hostname, pid] = syncingNode.split(':'); + const amSyncingNode = (hostname === os.hostname() && pid === process.pid.toString()); + if (amSyncingNode) { + StateModel.selfNominateSyncingNode({ chain: this.chain, network: this.network, lastHeartBeat: syncingNode }); + if (!this.isSyncingNode) { + logger.info(`This worker is now the syncing node for ${this.chain} ${this.network}`); + this.isSyncingNode = true; + this.sync(); + } + } else { + setTimeout(() => { + StateModel.selfNominateSyncingNode({ chain: this.chain, network: this.network, lastHeartBeat: syncingNode }); + }, 5000) + } + }, 1000); } }