From 372b273c8a5f0d5e0a2a96934b502e5cd41c85f3 Mon Sep 17 00:00:00 2001 From: Justin Langston Date: Sun, 27 Jan 2019 15:40:01 -0500 Subject: [PATCH 1/3] fix(sync): handle sync node going awol --- packages/bitcore-node/src/services/p2p.ts | 44 +++++++++++++---------- 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/packages/bitcore-node/src/services/p2p.ts b/packages/bitcore-node/src/services/p2p.ts index 1a491dffd06..e32eac35b3a 100644 --- a/packages/bitcore-node/src/services/p2p.ts +++ b/packages/bitcore-node/src/services/p2p.ts @@ -70,7 +70,7 @@ export class P2pWorker { private invCacheLimits: any; private initialSyncComplete: boolean; private isSyncingNode: boolean; - private syncingNodeHeartBeat?: NodeJS.Timer; + private stopping?: boolean; private blockModel: BlockModel; constructor({ chain, network, chainConfig, blockModel = BlockStorage }) { this.blockModel = blockModel; @@ -389,43 +389,51 @@ export class P2pWorker { this.syncing = false; } - registerSyncingNode() { - this.syncingNodeHeartBeat = setInterval(async () => { + async registerSyncingNode() { + while(!this.stopping) { const syncingNode = await StateStorage.getSyncingNode({ chain: this.chain, network: this.network }); if (!syncingNode) { - return StateStorage.selfNominateSyncingNode({ + StateStorage.selfNominateSyncingNode({ chain: this.chain, network: this.network, lastHeartBeat: syncingNode }); + continue; } - const [hostname, pid] = syncingNode.split(':'); - const amSyncingNode = hostname === os.hostname() && pid === process.pid.toString(); + const [hostname, pid, timestamp] = syncingNode.split(':'); + const amSyncingNode = hostname === os.hostname() && pid === process.pid.toString() && Date.now() - parseInt(timestamp) < 1000; if (amSyncingNode) { - StateStorage.selfNominateSyncingNode({ chain: this.chain, network: this.network, lastHeartBeat: syncingNode }); + StateStorage.selfNominateSyncingNode({ + chain: this.chain, + network: this.network, + lastHeartBeat: syncingNode + }); if (!this.isSyncingNode) { logger.info(`This worker is now the syncing node for ${this.chain} ${this.network}`); this.isSyncingNode = true; this.sync(); } } else { - setTimeout(() => { - StateStorage.selfNominateSyncingNode({ - chain: this.chain, - network: this.network, - lastHeartBeat: syncingNode - }); - }, 10000); + if (this.isSyncingNode) { + logger.info(`This worker is no longer syncing node for ${this.chain} ${this.network}`); + this.isSyncingNode = false; + await new Promise(resolve => setTimeout(resolve, 100000)); + } + await new Promise(resolve => setTimeout(resolve, 10000)); + StateStorage.selfNominateSyncingNode({ + chain: this.chain, + network: this.network, + lastHeartBeat: syncingNode + }); } - }, 500); + await new Promise(resolve => setTimeout(resolve, 500)); + } } async stop() { + this.stopping = true; logger.debug(`Stopping worker for chain ${this.chain}`); await this.disconnect(); - if (this.syncingNodeHeartBeat) { - clearInterval(this.syncingNodeHeartBeat); - } } async start() { From 31fb049a7336b1203d4e83ddc62cb4a63b9200b3 Mon Sep 17 00:00:00 2001 From: Micah Riggan Date: Mon, 28 Jan 2019 10:22:22 -0500 Subject: [PATCH 2/3] using wait function --- packages/bitcore-node/src/services/p2p.ts | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/packages/bitcore-node/src/services/p2p.ts b/packages/bitcore-node/src/services/p2p.ts index 3fe30fc74d5..7b4d3a9eae9 100644 --- a/packages/bitcore-node/src/services/p2p.ts +++ b/packages/bitcore-node/src/services/p2p.ts @@ -302,9 +302,7 @@ export class P2pWorker { let parentTip = await ChainStateProvider.getLocalTip({ chain: parentChain, network }); while (!parentTip || parentTip.height < forkHeight) { logger.info(`Waiting until ${parentChain} syncs before ${chain} ${network}`); - await new Promise(resolve => { - setTimeout(resolve, 5000); - }); + await wait(5000); parentTip = await ChainStateProvider.getLocalTip({ chain: parentChain, network }); } } @@ -418,16 +416,16 @@ export class P2pWorker { if (this.isSyncingNode) { logger.info(`This worker is no longer syncing node for ${this.chain} ${this.network}`); this.isSyncingNode = false; - await new Promise(resolve => setTimeout(resolve, 100000)); + await wait(100000); } - await new Promise(resolve => setTimeout(resolve, 10000)); + await wait(10000); StateStorage.selfNominateSyncingNode({ chain: this.chain, network: this.network, lastHeartBeat: syncingNode }); } - await new Promise(resolve => setTimeout(resolve, 500)); + await wait(500); } } From dd1a0bdb99a194dae7d6a52183c794aef55ae9ed Mon Sep 17 00:00:00 2001 From: Micah Riggan Date: Mon, 28 Jan 2019 11:43:50 -0500 Subject: [PATCH 3/3] Updating sync to return the current syncing promise, so you can await sync and it will resolve with the sync is done, even if that particular call didn't add any blocks, but some previous call did --- packages/bitcore-node/src/services/p2p.ts | 137 ++++++++++-------- .../test/integration/websocket.integration.ts | 19 ++- 2 files changed, 88 insertions(+), 68 deletions(-) diff --git a/packages/bitcore-node/src/services/p2p.ts b/packages/bitcore-node/src/services/p2p.ts index 7b4d3a9eae9..9457817c91c 100644 --- a/packages/bitcore-node/src/services/p2p.ts +++ b/packages/bitcore-node/src/services/p2p.ts @@ -8,7 +8,7 @@ import { StateStorage } from '../models/state'; import { SpentHeightIndicators } from '../types/Coin'; import os from 'os'; import { Config, ConfigService } from './config'; -import { wait } from "../utils/wait"; +import { wait } from '../utils/wait'; const Chain = require('../chain'); export class P2pManager { @@ -63,7 +63,8 @@ export class P2pWorker { private bitcoreP2p: any; private chainConfig: any; private events: EventEmitter; - private syncing: boolean; + private isSyncing: boolean; + public syncer?: Promise; private messages: any; private pool: any; private connectInterval?: NodeJS.Timer; @@ -81,7 +82,7 @@ export class P2pWorker { this.bitcoreP2p = Chain[this.chain].p2p; this.chainConfig = chainConfig; this.events = new EventEmitter(); - this.syncing = false; + this.isSyncing = false; this.initialSyncComplete = false; this.isSyncingNode = false; this.invCache = {}; @@ -149,7 +150,7 @@ export class P2pWorker { network: this.network, hash }); - if (this.isSyncingNode && !this.isCachedInv(this.bitcoreP2p.Inventory.TYPE.TX, hash) && !this.syncing) { + if (this.isSyncingNode && !this.isCachedInv(this.bitcoreP2p.Inventory.TYPE.TX, hash) && !this.isSyncing) { this.cacheInv(this.bitcoreP2p.Inventory.TYPE.TX, hash); this.processTransaction(message.transaction); this.events.emit('transaction', message.transaction); @@ -170,10 +171,10 @@ export class P2pWorker { if (!blockInCache) { this.cacheInv(this.bitcoreP2p.Inventory.TYPE.BLOCK, hash); } - if (this.isSyncingNode && (!blockInCache || this.syncing)) { + if (this.isSyncingNode && (!blockInCache || this.isSyncing)) { this.events.emit(hash, message.block); this.events.emit('block', message.block); - if (!this.syncing) { + if (!this.isSyncing) { this.sync(); } } @@ -190,7 +191,7 @@ export class P2pWorker { }); this.pool.on('peerinv', (peer, message) => { - if (this.isSyncingNode && !this.syncing) { + if (this.isSyncingNode && !this.isSyncing) { const filtered = message.inventory.filter(inv => { const hash = this.bitcoreLib.encoding .BufferReader(inv.hash) @@ -288,71 +289,78 @@ export class P2pWorker { } async sync() { - if (this.syncing) { - return; - } - this.syncing = true; - const { chain, chainConfig, network } = this; - const { parentChain, forkHeight } = chainConfig; - const state = await StateStorage.collection.findOne({}); - this.initialSyncComplete = - state && state.initialSyncComplete && state.initialSyncComplete.includes(`${chain}:${network}`); - let tip = await ChainStateProvider.getLocalTip({ chain, network }); - if (parentChain && (!tip || tip.height < forkHeight)) { - let parentTip = await ChainStateProvider.getLocalTip({ chain: parentChain, network }); - while (!parentTip || parentTip.height < forkHeight) { - logger.info(`Waiting until ${parentChain} syncs before ${chain} ${network}`); - await wait(5000); - parentTip = await ChainStateProvider.getLocalTip({ chain: parentChain, network }); - } + if (this.isSyncing) { + return this.syncer; } + this.isSyncing = true; + this.syncer = new Promise(async (resolve, reject) => { + try { + const { chain, chainConfig, network } = this; + const { parentChain, forkHeight } = chainConfig; + const state = await StateStorage.collection.findOne({}); + this.initialSyncComplete = + state && state.initialSyncComplete && state.initialSyncComplete.includes(`${chain}:${network}`); + let tip = await ChainStateProvider.getLocalTip({ chain, network }); + if (parentChain && (!tip || tip.height < forkHeight)) { + let parentTip = await ChainStateProvider.getLocalTip({ chain: parentChain, network }); + while (!parentTip || parentTip.height < forkHeight) { + logger.info(`Waiting until ${parentChain} syncs before ${chain} ${network}`); + await wait(5000); + parentTip = await ChainStateProvider.getLocalTip({ chain: parentChain, network }); + } + } - const getHeaders = async () => { - const locators = await ChainStateProvider.getLocatorHashes({ chain, network }); - return this.getHeaders(locators); - }; + const getHeaders = async () => { + const locators = await ChainStateProvider.getLocatorHashes({ chain, network }); + return this.getHeaders(locators); + }; - let headers = await getHeaders(); - while (headers.length > 0) { - tip = await ChainStateProvider.getLocalTip({ chain, network }); - let currentHeight = tip ? tip.height : 0; - let lastLog = 0; - logger.info(`Syncing ${headers.length} blocks for ${chain} ${network}`); - for (const header of headers) { - try { - const block = await this.getBlock(header.hash); - await this.processBlock(block); - currentHeight++; - if (Date.now() - lastLog > 100) { - logger.info(`Sync `, { - chain, - network, - height: currentHeight - }); - lastLog = Date.now(); + let headers = await getHeaders(); + while (headers.length > 0) { + tip = await ChainStateProvider.getLocalTip({ chain, network }); + let currentHeight = tip ? tip.height : 0; + let lastLog = 0; + logger.info(`Syncing ${headers.length} blocks for ${chain} ${network}`); + for (const header of headers) { + try { + const block = await this.getBlock(header.hash); + await this.processBlock(block); + currentHeight++; + if (Date.now() - lastLog > 100) { + logger.info(`Sync `, { + chain, + network, + height: currentHeight + }); + lastLog = Date.now(); + } + } catch (err) { + logger.error(`Error syncing ${chain} ${network}`, err); + this.isSyncing = false; + return this.sync(); + } } - } catch (err) { - logger.error(`Error syncing ${chain} ${network}`, err); - this.syncing = false; - return this.sync(); + headers = await getHeaders(); } + logger.info(`${chain}:${network} up to date.`); + this.isSyncing = false; + await StateStorage.collection.findOneAndUpdate( + {}, + { $addToSet: { initialSyncComplete: `${chain}:${network}` } }, + { upsert: true } + ); + resolve(); + } catch (e) { + reject(e); } - headers = await getHeaders(); - } - logger.info(`${chain}:${network} up to date.`); - this.syncing = false; - StateStorage.collection.findOneAndUpdate( - {}, - { $addToSet: { initialSyncComplete: `${chain}:${network}` } }, - { upsert: true } - ); - return true; + }); + return this.syncer; } async resync(from: number, to: number) { const { chain, network } = this; let currentHeight = Math.max(1, from); - this.syncing = true; + this.isSyncing = true; while (currentHeight < to) { const locatorHashes = await ChainStateProvider.getLocatorHashes({ chain, @@ -385,11 +393,11 @@ export class P2pWorker { } } } - this.syncing = false; + this.isSyncing = false; } async registerSyncingNode() { - while(!this.stopping) { + while (!this.stopping) { const syncingNode = await StateStorage.getSyncingNode({ chain: this.chain, network: this.network }); if (!syncingNode) { StateStorage.selfNominateSyncingNode({ @@ -400,7 +408,8 @@ export class P2pWorker { continue; } const [hostname, pid, timestamp] = syncingNode.split(':'); - const amSyncingNode = hostname === os.hostname() && pid === process.pid.toString() && Date.now() - parseInt(timestamp) < 1000; + const amSyncingNode = + hostname === os.hostname() && pid === process.pid.toString() && Date.now() - parseInt(timestamp) < 1000; if (amSyncingNode) { StateStorage.selfNominateSyncingNode({ chain: this.chain, diff --git a/packages/bitcore-node/test/integration/websocket.integration.ts b/packages/bitcore-node/test/integration/websocket.integration.ts index f1f28562006..0f60074fcf3 100644 --- a/packages/bitcore-node/test/integration/websocket.integration.ts +++ b/packages/bitcore-node/test/integration/websocket.integration.ts @@ -15,18 +15,31 @@ const chainConfig = config.chains[chain][network]; const creds = chainConfig.rpc; const rpc = new AsyncRPC(creds.username, creds.password, creds.host, creds.port); +let p2pWorker; + describe('Websockets', function() { this.timeout(50000); before(async () => { await resetDatabase(); }); - it('should get a new block when one is generated', async () => { - const p2pWorker = new P2pWorker({ + beforeEach(() => { + p2pWorker = new P2pWorker({ chain, network, chainConfig }); + }); + + afterEach(async () => { + try { + await p2pWorker.stop(); + } catch (e) { + console.log('Error stopping p2p worker'); + } + }); + + it('should get a new block when one is generated', async () => { await p2pWorker.start(); await rpc.generate(5); @@ -46,8 +59,6 @@ describe('Websockets', function() { if (beforeGenTip != null && afterGenTip != null) { expect(beforeGenTip.height).to.be.lt(afterGenTip.height); } - - await p2pWorker.stop(); }); it('should get a websocket event when a block is added', async () => {