Skip to content

Commit

Permalink
Rework MongoService into MongoConnection component
Browse files Browse the repository at this point in the history
  • Loading branch information
znerol committed May 18, 2016
1 parent e705ebc commit cadf2ca
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 39 deletions.
66 changes: 28 additions & 38 deletions spreadflow_mongodb/proc.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,31 +37,29 @@ def run(self, reactor):
reactor.callFromThread(self.d.callback, result)
return True

class MongoService(object):
class MongoConnection(object):

reactor = None
_client = None
_connection = None
_queue = None
_thread = None

def __init__(self, db = 'test_database', uri = 'mongodb://localhost:27017/'):
def __init__(self, uri='mongodb://localhost:27017/'):
self.uri = uri
self.db = db

@defer.inlineCallbacks
def attach(self, dispatcher, reactor):
self.reactor = reactor

@defer.inlineCallbacks
def start(self):
self._queue = Queue()
self._thread = threading.Thread(target=self._mongo_worker, args=(self.reactor, self._queue))
self._thread.start()

self._client = yield self.cmd(pymongo.MongoClient, self.uri)

@defer.inlineCallbacks
def join(self):
def detach(self):
if self._thread:
if self._thread.is_alive():
yield self._worker_stop()
Expand All @@ -73,18 +71,23 @@ def join(self):
self._client = None
self._queue = None
self._thread = None

def detach(self):
self.reactor = None

@defer.inlineCallbacks
def __call__(self, item, send):
if item[0] == 'collection':
_, database, collection, name, args, kwargs = item
func = getattr(self._client[database][collection], name)
result = yield self.cmd(func, *args, **kwargs)
send(result, self)
else:
raise RuntimeError('Cannot handle command ' + item[0])

def cmd(self, f, *args, **kwargs):
d = defer.Deferred()
self._queue.put(WorkerJob(d, f, *args, **kwargs))
return d

def collection(self, collection):
return self._client[self.db][collection]

def _worker_stop(self):
d = defer.Deferred()
self._queue.put(WorkerStop(d))
Expand All @@ -98,41 +101,28 @@ def _mongo_worker(reactor, queue):
alive = job.run(reactor)
queue.task_done()

class MongoDestination(object):

_collection = None
class MongoCollectionDeltaSync(object):

def __init__(self, service, collection='test_collection', reset=False):
self.service = service
def __init__(self, database='test_database', collection='test_collection', reset=False):
self.database = database
self.collection = collection
self.reset = reset

@defer.inlineCallbacks
def start(self):
self._collection = self.service.collection(self.collection)
if self.reset:
yield self.service.cmd(self._collection.remove, {})

def join(self):
self._collection = None

@defer.inlineCallbacks
def __call__(self, item, send):
deferreds = []
if (self.reset):
cmd = self._format_cmd('delete_many', {})
send(cmd, self)
self.reset = False

if len(item['deletes']):
deletes = item['deletes'][:]
deferreds.append(self.service.cmd(self._collection.remove, {'_id':{'$in':deletes}}))
query = {'_id': {'$in': item['deletes'][:]}}
cmd = self._format_cmd('delete_many', query)
send(cmd, self)

if len(item['inserts']):
docs = [dict(item['data'][oid].items() + [('_id', oid)]) for oid in item['inserts']]
deferreds.append(self.service.cmd(self._collection.insert, docs))

if len(deferreds):
yield defer.DeferredList(deferreds, fireOnOneErrback=True)

send(item, self)
cmd = self._format_cmd('insert_many', docs)
send(cmd, self)

@property
def dependencies(self):
yield (self, self.service)
def _format_cmd(self, name, *args):
return "collection", self.database, self.collection, name, args, {}
2 changes: 1 addition & 1 deletion spreadflow_mongodb/test/test_mongodb_proc.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from twisted.trial import unittest

from spreadflow_mongodb.proc import MongoService, MongoDestination
from spreadflow_mongodb.proc import MongoConnection, MongoCollectionDeltaSync


class SpreadflowMongoDBTestCase(unittest.TestCase):
Expand Down

0 comments on commit cadf2ca

Please # to comment.