From e6d8cbc392e78ea4d2eb5caf37913aed05bab7ee Mon Sep 17 00:00:00 2001 From: Jon Binney Date: Mon, 24 Mar 2014 17:26:16 -0700 Subject: [PATCH] use queue_size for publishers --- .../capabilities/advertise.py | 7 +++--- .../rosbridge_library/capabilities/publish.py | 5 ++-- .../rosbridge_library/internal/publishers.py | 23 +++++++++++-------- 3 files changed, 20 insertions(+), 15 deletions(-) diff --git a/rosbridge_library/src/rosbridge_library/capabilities/advertise.py b/rosbridge_library/src/rosbridge_library/capabilities/advertise.py index ab30fda96..7c16e0d26 100644 --- a/rosbridge_library/src/rosbridge_library/capabilities/advertise.py +++ b/rosbridge_library/src/rosbridge_library/capabilities/advertise.py @@ -54,9 +54,9 @@ def __init__(self, client_id, topic): def unregister(self): manager.unregister(self.client_id, self.topic) - def register_advertisement(self, msg_type, adv_id=None, latch=False): + def register_advertisement(self, msg_type, adv_id=None, latch=False, queue_size=100): # Register with the publisher manager, propagating any exception - manager.register(self.client_id, self.topic, msg_type, latch=latch) + manager.register(self.client_id, self.topic, msg_type, latch=latch, queue_size=queue_size) self.clients[adv_id] = True @@ -94,6 +94,7 @@ def advertise(self, message): topic = message["topic"] msg_type = message["type"] latch = message.get("latch", False) + queue_size = message.get("queue_size", 100) # Create the Registration if one doesn't yet exist if not topic in self._registrations: @@ -101,7 +102,7 @@ def advertise(self, message): self._registrations[topic] = Registration(client_id, topic) # Register, propagating any exceptions - self._registrations[topic].register_advertisement(msg_type, aid, latch) + self._registrations[topic].register_advertisement(msg_type, aid, latch, queue_size) def unadvertise(self, message): # Pull out the ID diff --git a/rosbridge_library/src/rosbridge_library/capabilities/publish.py b/rosbridge_library/src/rosbridge_library/capabilities/publish.py index 6c9e978c8..95601c1ac 100644 --- a/rosbridge_library/src/rosbridge_library/capabilities/publish.py +++ b/rosbridge_library/src/rosbridge_library/capabilities/publish.py @@ -54,17 +54,18 @@ def publish(self, message): self.basic_type_check(message, self.publish_msg_fields) topic = message["topic"] latch = message.get("latch", False) + queue_size = message.get("queue_size", 100) # Register as a publishing client, propagating any exceptions client_id = self.protocol.client_id - manager.register(client_id, topic, latch=latch) + manager.register(client_id, topic, latch=latch, queue_size=queue_size) self._published[topic] = True # Get the message if one was provided msg = message.get("msg", {}) # Publish the message - manager.publish(client_id, topic, msg, latch=latch) + manager.publish(client_id, topic, msg, latch=latch, queue_size=queue_size) def finish(self): client_id = self.protocol.client_id diff --git a/rosbridge_library/src/rosbridge_library/internal/publishers.py b/rosbridge_library/src/rosbridge_library/internal/publishers.py index 16f9e3cfe..8680fa153 100644 --- a/rosbridge_library/src/rosbridge_library/internal/publishers.py +++ b/rosbridge_library/src/rosbridge_library/internal/publishers.py @@ -122,7 +122,7 @@ class MultiPublisher(): Provides an API to publish messages and register clients that are using this publisher """ - def __init__(self, topic, msg_type=None, latched_client_id=None): + def __init__(self, topic, msg_type=None, latched_client_id=None, queue_size=100): """ Register a publisher on the specified topic. Keyword arguments: @@ -164,7 +164,7 @@ def __init__(self, topic, msg_type=None, latched_client_id=None): self.latched_client_id = latched_client_id self.topic = topic self.msg_class = msg_class - self.publisher = Publisher(topic, msg_class, latch=(latched_client_id!=None)) + self.publisher = Publisher(topic, msg_class, latch=(latched_client_id!=None), queue_size=queue_size) self.listener = PublisherConsistencyListener() self.listener.attach(self.publisher) @@ -255,17 +255,18 @@ class PublisherManager(): def __init__(self): self._publishers = {} - def register(self, client_id, topic, msg_type=None, latch=False): + def register(self, client_id, topic, msg_type=None, latch=False, queue_size=100): """ Register a publisher on the specified topic. Publishers are shared between clients, so a single MultiPublisher instance is created per topic, even if multiple clients register. Keyword arguments: - client_id -- the ID of the client making this request - topic -- the name of the topic to publish on - msg_type -- (optional) the type to publish - latch -- (optional) whether to make this publisher latched + client_id -- the ID of the client making this request + topic -- the name of the topic to publish on + msg_type -- (optional) the type to publish + latch -- (optional) whether to make this publisher latched + queue_size -- (optional) rospy publisher queue_size to use Throws: Exception -- exceptions are propagated from the MultiPublisher if @@ -275,7 +276,8 @@ def register(self, client_id, topic, msg_type=None, latch=False): """ latched_client_id = client_id if latch else None if not topic in self._publishers: - self._publishers[topic] = MultiPublisher(topic, msg_type, latched_client_id) + self._publishers[topic] = MultiPublisher(topic, msg_type, latched_client_id, + queue_size=queue_size) elif latch and self._publishers[topic].latched_client_id != client_id: logwarn("Client ID %s attempted to register topic [%s] as latched " + "but this topic was previously registered." % (client_id, topic)) @@ -320,7 +322,7 @@ def unregister_all(self, client_id): for topic in self._publishers.keys(): self.unregister(client_id, topic) - def publish(self, client_id, topic, msg, latch=False): + def publish(self, client_id, topic, msg, latch=False, queue_size=100): """ Publish a message on the given topic. Tries to create a publisher on the topic if one does not already exist. @@ -330,6 +332,7 @@ def publish(self, client_id, topic, msg, latch=False): topic -- the topic to publish the message on msg -- a JSON-like dict of fields and values latch -- (optional) whether to make this publisher latched + queue_size -- (optional) rospy publisher queue_size to use Throws: Exception -- a variety of exceptions are propagated. They can be @@ -337,7 +340,7 @@ def publish(self, client_id, topic, msg, latch=False): or if the provided msg does not map to the msg class of the publisher. """ - self.register(client_id, topic, latch=latch) + self.register(client_id, topic, latch=latch, queue_size=queue_size) self._publishers[topic].publish(msg)