Skip to content

Commit

Permalink
Merge pull request bitpay#1952 from nitsujlangston/stepDownSyncNode
Browse files Browse the repository at this point in the history
fix(sync): handle sync node going awol
  • Loading branch information
micahriggan authored Jan 28, 2019
2 parents 86a9c34 + dd1a0bd commit bd46b05
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 86 deletions.
179 changes: 97 additions & 82 deletions packages/bitcore-node/src/services/p2p.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { StateStorage } from '../models/state';
import { SpentHeightIndicators } from '../types/Coin';
import os from 'os';
import { Config, ConfigService } from './config';
import { wait } from "../utils/wait";
import { wait } from '../utils/wait';
const Chain = require('../chain');

export class P2pManager {
Expand Down Expand Up @@ -63,15 +63,16 @@ export class P2pWorker {
private bitcoreP2p: any;
private chainConfig: any;
private events: EventEmitter;
private syncing: boolean;
private isSyncing: boolean;
public syncer?: Promise<void>;
private messages: any;
private pool: any;
private connectInterval?: NodeJS.Timer;
private invCache: any;
private invCacheLimits: any;
private initialSyncComplete: boolean;
private isSyncingNode: boolean;
private syncingNodeHeartBeat?: NodeJS.Timer;
private stopping?: boolean;
private blockModel: BlockModel;
constructor({ chain, network, chainConfig, blockModel = BlockStorage }) {
this.blockModel = blockModel;
Expand All @@ -81,7 +82,7 @@ export class P2pWorker {
this.bitcoreP2p = Chain[this.chain].p2p;
this.chainConfig = chainConfig;
this.events = new EventEmitter();
this.syncing = false;
this.isSyncing = false;
this.initialSyncComplete = false;
this.isSyncingNode = false;
this.invCache = {};
Expand Down Expand Up @@ -149,7 +150,7 @@ export class P2pWorker {
network: this.network,
hash
});
if (this.isSyncingNode && !this.isCachedInv(this.bitcoreP2p.Inventory.TYPE.TX, hash) && !this.syncing) {
if (this.isSyncingNode && !this.isCachedInv(this.bitcoreP2p.Inventory.TYPE.TX, hash) && !this.isSyncing) {
this.cacheInv(this.bitcoreP2p.Inventory.TYPE.TX, hash);
this.processTransaction(message.transaction);
this.events.emit('transaction', message.transaction);
Expand All @@ -170,10 +171,10 @@ export class P2pWorker {
if (!blockInCache) {
this.cacheInv(this.bitcoreP2p.Inventory.TYPE.BLOCK, hash);
}
if (this.isSyncingNode && (!blockInCache || this.syncing)) {
if (this.isSyncingNode && (!blockInCache || this.isSyncing)) {
this.events.emit(hash, message.block);
this.events.emit('block', message.block);
if (!this.syncing) {
if (!this.isSyncing) {
this.sync();
}
}
Expand All @@ -190,7 +191,7 @@ export class P2pWorker {
});

this.pool.on('peerinv', (peer, message) => {
if (this.isSyncingNode && !this.syncing) {
if (this.isSyncingNode && !this.isSyncing) {
const filtered = message.inventory.filter(inv => {
const hash = this.bitcoreLib.encoding
.BufferReader(inv.hash)
Expand Down Expand Up @@ -288,73 +289,78 @@ export class P2pWorker {
}

async sync() {
if (this.syncing) {
return;
}
this.syncing = true;
const { chain, chainConfig, network } = this;
const { parentChain, forkHeight } = chainConfig;
const state = await StateStorage.collection.findOne({});
this.initialSyncComplete =
state && state.initialSyncComplete && state.initialSyncComplete.includes(`${chain}:${network}`);
let tip = await ChainStateProvider.getLocalTip({ chain, network });
if (parentChain && (!tip || tip.height < forkHeight)) {
let parentTip = await ChainStateProvider.getLocalTip({ chain: parentChain, network });
while (!parentTip || parentTip.height < forkHeight) {
logger.info(`Waiting until ${parentChain} syncs before ${chain} ${network}`);
await new Promise(resolve => {
setTimeout(resolve, 5000);
});
parentTip = await ChainStateProvider.getLocalTip({ chain: parentChain, network });
}
if (this.isSyncing) {
return this.syncer;
}
this.isSyncing = true;
this.syncer = new Promise(async (resolve, reject) => {
try {
const { chain, chainConfig, network } = this;
const { parentChain, forkHeight } = chainConfig;
const state = await StateStorage.collection.findOne({});
this.initialSyncComplete =
state && state.initialSyncComplete && state.initialSyncComplete.includes(`${chain}:${network}`);
let tip = await ChainStateProvider.getLocalTip({ chain, network });
if (parentChain && (!tip || tip.height < forkHeight)) {
let parentTip = await ChainStateProvider.getLocalTip({ chain: parentChain, network });
while (!parentTip || parentTip.height < forkHeight) {
logger.info(`Waiting until ${parentChain} syncs before ${chain} ${network}`);
await wait(5000);
parentTip = await ChainStateProvider.getLocalTip({ chain: parentChain, network });
}
}

const getHeaders = async () => {
const locators = await ChainStateProvider.getLocatorHashes({ chain, network });
return this.getHeaders(locators);
};
const getHeaders = async () => {
const locators = await ChainStateProvider.getLocatorHashes({ chain, network });
return this.getHeaders(locators);
};

let headers = await getHeaders();
while (headers.length > 0) {
tip = await ChainStateProvider.getLocalTip({ chain, network });
let currentHeight = tip ? tip.height : 0;
let lastLog = 0;
logger.info(`Syncing ${headers.length} blocks for ${chain} ${network}`);
for (const header of headers) {
try {
const block = await this.getBlock(header.hash);
await this.processBlock(block);
currentHeight++;
if (Date.now() - lastLog > 100) {
logger.info(`Sync `, {
chain,
network,
height: currentHeight
});
lastLog = Date.now();
let headers = await getHeaders();
while (headers.length > 0) {
tip = await ChainStateProvider.getLocalTip({ chain, network });
let currentHeight = tip ? tip.height : 0;
let lastLog = 0;
logger.info(`Syncing ${headers.length} blocks for ${chain} ${network}`);
for (const header of headers) {
try {
const block = await this.getBlock(header.hash);
await this.processBlock(block);
currentHeight++;
if (Date.now() - lastLog > 100) {
logger.info(`Sync `, {
chain,
network,
height: currentHeight
});
lastLog = Date.now();
}
} catch (err) {
logger.error(`Error syncing ${chain} ${network}`, err);
this.isSyncing = false;
return this.sync();
}
}
} catch (err) {
logger.error(`Error syncing ${chain} ${network}`, err);
this.syncing = false;
return this.sync();
headers = await getHeaders();
}
logger.info(`${chain}:${network} up to date.`);
this.isSyncing = false;
await StateStorage.collection.findOneAndUpdate(
{},
{ $addToSet: { initialSyncComplete: `${chain}:${network}` } },
{ upsert: true }
);
resolve();
} catch (e) {
reject(e);
}
headers = await getHeaders();
}
logger.info(`${chain}:${network} up to date.`);
this.syncing = false;
StateStorage.collection.findOneAndUpdate(
{},
{ $addToSet: { initialSyncComplete: `${chain}:${network}` } },
{ upsert: true }
);
return true;
});
return this.syncer;
}

async resync(from: number, to: number) {
const { chain, network } = this;
let currentHeight = Math.max(1, from);
this.syncing = true;
this.isSyncing = true;
while (currentHeight < to) {
const locatorHashes = await ChainStateProvider.getLocatorHashes({
chain,
Expand Down Expand Up @@ -387,46 +393,55 @@ export class P2pWorker {
}
}
}
this.syncing = false;
this.isSyncing = false;
}

registerSyncingNode() {
this.syncingNodeHeartBeat = setInterval(async () => {
async registerSyncingNode() {
while (!this.stopping) {
const syncingNode = await StateStorage.getSyncingNode({ chain: this.chain, network: this.network });
if (!syncingNode) {
return StateStorage.selfNominateSyncingNode({
StateStorage.selfNominateSyncingNode({
chain: this.chain,
network: this.network,
lastHeartBeat: syncingNode
});
continue;
}
const [hostname, pid] = syncingNode.split(':');
const amSyncingNode = hostname === os.hostname() && pid === process.pid.toString();
const [hostname, pid, timestamp] = syncingNode.split(':');
const amSyncingNode =
hostname === os.hostname() && pid === process.pid.toString() && Date.now() - parseInt(timestamp) < 1000;
if (amSyncingNode) {
StateStorage.selfNominateSyncingNode({ chain: this.chain, network: this.network, lastHeartBeat: syncingNode });
StateStorage.selfNominateSyncingNode({
chain: this.chain,
network: this.network,
lastHeartBeat: syncingNode
});
if (!this.isSyncingNode) {
logger.info(`This worker is now the syncing node for ${this.chain} ${this.network}`);
this.isSyncingNode = true;
this.sync();
}
} else {
setTimeout(() => {
StateStorage.selfNominateSyncingNode({
chain: this.chain,
network: this.network,
lastHeartBeat: syncingNode
});
}, 10000);
if (this.isSyncingNode) {
logger.info(`This worker is no longer syncing node for ${this.chain} ${this.network}`);
this.isSyncingNode = false;
await wait(100000);
}
await wait(10000);
StateStorage.selfNominateSyncingNode({
chain: this.chain,
network: this.network,
lastHeartBeat: syncingNode
});
}
}, 500);
await wait(500);
}
}

async stop() {
this.stopping = true;
logger.debug(`Stopping worker for chain ${this.chain}`);
await this.disconnect();
if (this.syncingNodeHeartBeat) {
clearInterval(this.syncingNodeHeartBeat);
}
}

async start() {
Expand Down
19 changes: 15 additions & 4 deletions packages/bitcore-node/test/integration/websocket.integration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,31 @@ const chainConfig = config.chains[chain][network];
const creds = chainConfig.rpc;
const rpc = new AsyncRPC(creds.username, creds.password, creds.host, creds.port);

let p2pWorker;

describe('Websockets', function() {
this.timeout(50000);
before(async () => {
await resetDatabase();
});

it('should get a new block when one is generated', async () => {
const p2pWorker = new P2pWorker({
beforeEach(() => {
p2pWorker = new P2pWorker({
chain,
network,
chainConfig
});
});

afterEach(async () => {
try {
await p2pWorker.stop();
} catch (e) {
console.log('Error stopping p2p worker');
}
});

it('should get a new block when one is generated', async () => {
await p2pWorker.start();

await rpc.generate(5);
Expand All @@ -46,8 +59,6 @@ describe('Websockets', function() {
if (beforeGenTip != null && afterGenTip != null) {
expect(beforeGenTip.height).to.be.lt(afterGenTip.height);
}

await p2pWorker.stop();
});

it('should get a websocket event when a block is added', async () => {
Expand Down

0 comments on commit bd46b05

Please # to comment.