Skip to content

Commit

Permalink
feat(service): support multiple sync nodes
Browse files Browse the repository at this point in the history
failover p2p service support for clusters of bitcore nodes
  • Loading branch information
nitsujlangston committed Dec 26, 2018
1 parent 4f24286 commit 66385ce
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 4 deletions.
24 changes: 24 additions & 0 deletions packages/bitcore-node/src/models/state.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { BaseModel } from './base';
import { ObjectID } from 'mongodb';
import os from 'os';

export type IState = {
_id?: ObjectID;
Expand All @@ -13,6 +14,29 @@ export class State extends BaseModel<IState> {
allowedPaging = [];

onConnect() {}

async getSingletonState() {
return this.collection.findOneAndUpdate(
{},
{ $setOnInsert: { created: new Date()}},
{ upsert: true }
);
}

async getSyncingNode(params: { chain: string, network: string }): Promise<string> {
const { chain, network } = params;
const state = await this.getSingletonState();
return state.value![`syncingNode:${chain}:${network}`];
}

async selfNominateSyncingNode(params: { chain: string, network: string, lastHeartBeat: any }) {
const { chain, network, lastHeartBeat } = params;
const singleState = await this.getSingletonState();
this.collection.findOneAndUpdate(
{ _id: singleState.value!._id, $or: [{ [`syncingNode:${chain}:${network}`]: { $exists: false } }, { [`syncingNode:${chain}:${network}`]: lastHeartBeat }]},
{ $set: { [`syncingNode:${chain}:${network}`]: `${os.hostname}:${process.pid}:${Date.now()}` } }
);
}
}

export let StateModel = new State();
27 changes: 23 additions & 4 deletions packages/bitcore-node/src/services/p2p.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { TransactionModel } from '../models/transaction';
import { Bitcoin } from '../types/namespaces/Bitcoin';
import { StateModel } from '../models/state';
import { SpentHeightIndicators } from '../types/Coin';
import os from 'os';
const Chain = require('../chain');
const LRU = require('lru-cache');

Expand All @@ -22,6 +23,7 @@ export class P2pService {
private pool: any;
private invCache: any;
private initialSyncComplete: boolean;
private isSyncingNode: boolean;
constructor(params) {
const { chain, network, chainConfig } = params;
this.chain = chain;
Expand All @@ -32,6 +34,7 @@ export class P2pService {
this.events = new EventEmitter();
this.syncing = false;
this.initialSyncComplete = false;
this.isSyncingNode = false;
this.invCache = new LRU({ max: 10000 });
this.messages = new this.bitcoreP2p.Messages({
network: this.bitcoreLib.Networks.get(this.network)
Expand Down Expand Up @@ -76,7 +79,7 @@ export class P2pService {
network: this.network,
hash
});
if (!this.invCache.get(hash)) {
if (this.isSyncingNode && !this.invCache.get(hash)) {
this.processTransaction(message.transaction);
this.events.emit('transaction', message.transaction);
}
Expand All @@ -93,7 +96,7 @@ export class P2pService {
hash
});

if (!this.invCache.get(hash)) {
if (this.isSyncingNode && !this.invCache.get(hash)) {
this.invCache.set(hash);
this.events.emit(hash, message.block);
this.events.emit('block', message.block);
Expand All @@ -112,7 +115,7 @@ export class P2pService {
});

this.pool.on('peerinv', (peer, message) => {
if (!this.syncing) {
if (this.isSyncingNode && !this.syncing) {
const filtered = message.inventory.filter(inv => {
const hash = this.bitcoreLib.encoding
.BufferReader(inv.hash)
Expand Down Expand Up @@ -304,6 +307,22 @@ export class P2pService {
logger.debug(`Started worker for chain ${this.chain}`);
this.setupListeners();
await this.connect();
this.sync();
setInterval(async () => {
const syncingNode = await StateModel.getSyncingNode({chain: this.chain, network: this.network});
const [hostname, pid] = syncingNode.split(':');
const amSyncingNode = (hostname === os.hostname() && pid === process.pid.toString());
if (amSyncingNode) {
StateModel.selfNominateSyncingNode({ chain: this.chain, network: this.network, lastHeartBeat: syncingNode });
if (!this.isSyncingNode) {
logger.info(`This worker is now the syncing node for ${this.chain} ${this.network}`);
this.isSyncingNode = true;
this.sync();
}
} else {
setTimeout(() => {
StateModel.selfNominateSyncingNode({ chain: this.chain, network: this.network, lastHeartBeat: syncingNode });
}, 5000)
}
}, 1000);
}
}

0 comments on commit 66385ce

Please # to comment.