Skip to content

Commit

Permalink
Trying to fix #1
Browse files Browse the repository at this point in the history
Fixed some other issues and improved tests
  • Loading branch information
claustres committed Sep 8, 2017
1 parent 7c0e0ff commit 236fc90
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 48 deletions.
20 changes: 18 additions & 2 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,39 @@ export default function init () {

// This publisher publishes an event each time a local app service is registered
app.servicePublisher = new cote.Publisher({
name: 'feathers services publisher #' + app.uuid,
name: 'feathers services publisher',
namespace: 'services',
broadcasts: ['service']
});
// Also each time a new node pops up so that it does not depend of the initialization order of the apps
app.servicePublisher.on('cote:added', data => {
// console.log(data)
Object.entries(app.services).forEach(([path, service]) => {
app.servicePublisher.publish('service', { uuid: app.uuid, path });
debug('Republished local service on path ' + path);
});
});
// This subscriber listen to an event each time a remote app service has been registered
app.serviceSubscriber = new cote.Subscriber({
name: 'feathers services subscriber #' + app.uuid,
name: 'feathers services subscriber',
namespace: 'services',
subscribesTo: ['service']
});
app.servicePublisher.on('cote:added', data => {
// console.log(data)
});
// When a remote service is declared create the local proxy interface to it
app.serviceSubscriber.on('service', (serviceDescriptor) => {
// Do not register our own services
if (serviceDescriptor.uuid === app.uuid) {
debug('Do not register service as remote on path ' + serviceDescriptor.path);
return;
}
// Skip already registered services
if (app.service(serviceDescriptor.path)) {
debug('Already registered service as remote on path ' + serviceDescriptor.path);
return;
}
app.use(serviceDescriptor.path, new RemoteService(serviceDescriptor));
debug('Registered remote service on path ' + serviceDescriptor.path);
// dispatch an event internally through node so that async processes can run
Expand Down
22 changes: 11 additions & 11 deletions src/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ class RemoteService {
setup (app, path) {
// Create the request manager to remote ones for this service
this.requester = new cote.Requester({
name: path + ' requester #' + app.uuid,
name: path + ' requester',
namespace: path,
requests: ['find', 'get', 'create', 'update', 'patch', 'remove']
});
this.path = path;
debug('Requester created for remote service on path ' + this.path);
// Create the subscriber to listen to events from other nodes
this.serviceEventsSubscriber = new cote.Subscriber({
name: path + ' events subscriber #' + app.uuid,
name: path + ' events subscriber',
namespace: path,
subscribesTo: ['created', 'updated', 'patched', 'removed']
});
Expand Down Expand Up @@ -67,15 +67,15 @@ class RemoteService {

update (id, data, params) {
debug('Requesting update() remote service on path ' + this.path);
return this.requester.send({ type: 'update', id, params }).then(result => {
return this.requester.send({ type: 'update', id, data, params }).then(result => {
debug('Successfully update() remote service on path ' + this.path);
return result;
});
}

patch (id, data, params) {
debug('Requesting patch() remote service on path ' + this.path);
return this.requester.send({ type: 'patch', id, params }).then(result => {
return this.requester.send({ type: 'patch', id, data, params }).then(result => {
debug('Successfully patch() remote service on path ' + this.path);
return result;
});
Expand All @@ -95,7 +95,7 @@ class LocalService extends cote.Responder {
constructor (options) {
const app = options.app;
const path = options.path;
super({ name: path + ' responder #' + app.uuid, namespace: path, respondsTo: ['find', 'get', 'create', 'update', 'patch', 'remove'] });
super({ name: path + ' responder', namespace: path, respondsTo: ['find', 'get', 'create', 'update', 'patch', 'remove'] });
debug('Responder created for local service on path ' + path);
let service = app.service(path);

Expand All @@ -109,43 +109,43 @@ class LocalService extends cote.Responder {
});
this.on('get', (req) => {
debug('Responding get() remote service on path ' + path);
service.get(req.id, req.params).then((result) => {
return service.get(req.id, req.params).then((result) => {
debug('Successfully get() local service on path ' + path);
return result;
});
});
this.on('create', (req) => {
debug('Responding create() remote service on path ' + path);
service.create(req.data, req.params).then((result) => {
return service.create(req.data, req.params).then((result) => {
debug('Successfully create() local service on path ' + path);
return result;
});
});
this.on('update', (req) => {
debug('Responding update() remote service on path ' + path);
service.update(req.id, req.data, req.params).then((result) => {
return service.update(req.id, req.data, req.params).then((result) => {
debug('Successfully update() local service on path ' + path);
return result;
});
});
this.on('patch', (req) => {
debug('Responding patch() remote service on path ' + path);
service.patch(req.id, req.data, req.params).then((result) => {
return service.patch(req.id, req.data, req.params).then((result) => {
debug('Successfully patch() local service on path ' + path);
return result;
});
});
this.on('remove', (req) => {
debug('Responding remove() remote service on path ' + path);
service.remove(req.id, req.params).then((result) => {
return service.remove(req.id, req.params).then((result) => {
debug('Successfully remove() local service on path ' + path);
return result;
});
});

// Dispatch events to other nodes
this.serviceEventsPublisher = new cote.Publisher({
name: path + ' events publisher #' + app.uuid,
name: path + ' events publisher',
namespace: path,
broadcasts: ['created', 'updated', 'patched', 'removed']
});
Expand Down
171 changes: 136 additions & 35 deletions test/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,12 @@ function clone (obj) {
}

describe('feathers-distributed', () => {
let firstApp, secondApp, firstServer, secondServer, localService, remoteService,
firstClient, secondClient, localClientService, remoteClientService;
let apps = [];
let servers = [];
let services = [];
let clients = [];
let clientServices = [];
const nbApps = 3;

function createApp () {
let app = feathers();
Expand All @@ -44,72 +48,169 @@ describe('feathers-distributed', () => {

before(() => {
chailint(chai, util);
firstApp = createApp();
secondApp = createApp();
for (let i = 0; i < nbApps; i++) {
apps[i] = createApp();
}
});

it('is CommonJS compatible', () => {
expect(typeof plugin).to.equal('function');
});

it('registers the plugin/services', (done) => {
firstApp.configure(plugin);
firstApp.use('users', memory({ store: clone(store), startId }));
localService = firstApp.service('users');
expect(localService).toExist();
firstServer = firstApp.listen(8081);
firstServer.once('listening', _ => {
secondApp.configure(plugin);
secondApp.on('service', data => {
function waitForService (apps, services, i) {
return new Promise((resolve, reject) => {
apps[i].on('service', data => {
if (data.path === 'users') {
remoteService = secondApp.service('users');
expect(remoteService).toExist();
secondServer = secondApp.listen(8082);
secondServer.once('listening', _ => done());
services[i] = apps[i].service('users');
expect(services[i]).toExist();
resolve(data.path);
}
});
});
}
function waitForListen (server) {
return new Promise((resolve, reject) => {
server.once('listening', _ => resolve());
});
}

it('registers the plugin/services', () => {
let promises = [];
for (let i = 0; i < nbApps; i++) {
apps[i].configure(plugin);
// Only the first app has a local service
if (i === 0) {
apps[i].use('users', memory({ store: clone(store), startId }));
services[i] = apps[i].service('users');
expect(services[i]).toExist();
} else {
// For remote services we have to wait they are registered
promises.push(waitForService(apps, services, i));
}
}
return Promise.all(promises)
.then(pathes => {
promises = [];
for (let i = 0; i < nbApps; i++) {
servers[i] = apps[i].listen(8080 + i);
promises.push(waitForListen(servers[i]));
}
return Promise.all(promises);
});
})
// Let enough time to process
.timeout(10000);

it('initiate the clients', () => {
firstClient = client().configure(socketioClient(io('http://localhost:8081')));
expect(firstClient).toExist();
// The first client will target the first app
localClientService = firstClient.service('users');
expect(localClientService).toExist();
secondClient = client().configure(socketioClient(io('http://localhost:8082')));
expect(secondClient).toExist();
// The second client will target the first app through the second one
remoteClientService = secondClient.service('users');
expect(remoteClientService).toExist();
for (let i = 0; i < nbApps; i++) {
const url = 'http://localhost:' + (8080 + i);
clients[i] = client().configure(socketioClient(io(url)));
expect(clients[i]).toExist();
clientServices[i] = clients[i].service('users');
expect(clientServices[i]).toExist();
}
})
// Let enough time to process
.timeout(10000);

it('dispatch service calls from remote to local', () => {
return remoteClientService.find({})
it('dispatch find service calls from remote to local', () => {
return clientServices[1].find({})
.then(users => {
expect(users.length > 0).beTrue();
});
})
// Let enough time to process
.timeout(5000);

it('dispatch service events from local to remote', (done) => {
remoteClientService.on('created', user => {
expect(user).toExist();
it('dispatch get service calls from remote to local', () => {
return clientServices[1].get(1)
.then(user => {
expect(user.id === 1).beTrue();
});
})
// Let enough time to process
.timeout(5000);

it('dispatch create service calls from remote to local', () => {
return clientServices[1].create({ name: 'Donald Doe' })
.then(user => {
expect(user.id === startId).beTrue();
});
})
// Let enough time to process
.timeout(5000);

it('dispatch update service calls from remote to local', () => {
return clientServices[1].update(startId, { name: 'Donald Dover' })
.then(user => {
expect(user.name === 'Donald Dover').beTrue();
});
})
// Let enough time to process
.timeout(5000);

it('dispatch patch service calls from remote to local', () => {
return clientServices[1].patch(startId, { name: 'Donald Doe' })
.then(user => {
expect(user.name === 'Donald Doe').beTrue();
});
})
// Let enough time to process
.timeout(5000);

it('dispatch remove service calls from remote to local', () => {
return clientServices[1].remove(startId)
.then(user => {
expect(user.id === startId).beTrue();
});
})
// Let enough time to process
.timeout(5000);

it('dispatch create service events from local to remote', (done) => {
clientServices[2].on('created', user => {
expect(user.id === startId + 1).beTrue();
done();
});
clientServices[0].create({ name: 'Donald Doe' });
})
// Let enough time to process
.timeout(5000);

it('dispatch update service events from local to remote', (done) => {
clientServices[2].on('updated', user => {
expect(user.name === 'Donald Dover').beTrue();
done();
});
clientServices[0].update(startId + 1, { name: 'Donald Dover' });
})
// Let enough time to process
.timeout(5000);

it('dispatch patch service events from local to remote', (done) => {
clientServices[2].on('patched', user => {
expect(user.name === 'Donald Doe').beTrue();
done();
});
clientServices[0].patch(startId + 1, { name: 'Donald Doe' });
})
// Let enough time to process
.timeout(5000);

it('dispatch remove service events from local to remote', (done) => {
clientServices[2].on('removed', user => {
expect(user.id === startId + 1).beTrue();
done();
});
localClientService.create({ name: 'Donald Doe' });
clientServices[0].remove(startId + 1);
})
// Let enough time to process
.timeout(5000);

// Cleanup
after(() => {
if (firstServer) firstServer.close();
if (secondServer) secondServer.close();
for (let i = 0; i < nbApps; i++) {
servers[i].close();
}
});
});

0 comments on commit 236fc90

Please # to comment.