Skip to content

Commit

Permalink
Merge pull request #86 from jonbinney/queue-size
Browse files Browse the repository at this point in the history
use queue_size for publishers
  • Loading branch information
jihoonl committed Mar 27, 2014
2 parents 4fa96b2 + e6d8cbc commit 7e94663
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ def __init__(self, client_id, topic):
def unregister(self):
manager.unregister(self.client_id, self.topic)

def register_advertisement(self, msg_type, adv_id=None, latch=False):
def register_advertisement(self, msg_type, adv_id=None, latch=False, queue_size=100):
# Register with the publisher manager, propagating any exception
manager.register(self.client_id, self.topic, msg_type, latch=latch)
manager.register(self.client_id, self.topic, msg_type, latch=latch, queue_size=queue_size)

self.clients[adv_id] = True

Expand Down Expand Up @@ -94,14 +94,15 @@ def advertise(self, message):
topic = message["topic"]
msg_type = message["type"]
latch = message.get("latch", False)
queue_size = message.get("queue_size", 100)

# Create the Registration if one doesn't yet exist
if not topic in self._registrations:
client_id = self.protocol.client_id
self._registrations[topic] = Registration(client_id, topic)

# Register, propagating any exceptions
self._registrations[topic].register_advertisement(msg_type, aid, latch)
self._registrations[topic].register_advertisement(msg_type, aid, latch, queue_size)

def unadvertise(self, message):
# Pull out the ID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,18 @@ def publish(self, message):
self.basic_type_check(message, self.publish_msg_fields)
topic = message["topic"]
latch = message.get("latch", False)
queue_size = message.get("queue_size", 100)

# Register as a publishing client, propagating any exceptions
client_id = self.protocol.client_id
manager.register(client_id, topic, latch=latch)
manager.register(client_id, topic, latch=latch, queue_size=queue_size)
self._published[topic] = True

# Get the message if one was provided
msg = message.get("msg", {})

# Publish the message
manager.publish(client_id, topic, msg, latch=latch)
manager.publish(client_id, topic, msg, latch=latch, queue_size=queue_size)

def finish(self):
client_id = self.protocol.client_id
Expand Down
23 changes: 13 additions & 10 deletions rosbridge_library/src/rosbridge_library/internal/publishers.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ class MultiPublisher():
Provides an API to publish messages and register clients that are using
this publisher """

def __init__(self, topic, msg_type=None, latched_client_id=None):
def __init__(self, topic, msg_type=None, latched_client_id=None, queue_size=100):
""" Register a publisher on the specified topic.
Keyword arguments:
Expand Down Expand Up @@ -164,7 +164,7 @@ def __init__(self, topic, msg_type=None, latched_client_id=None):
self.latched_client_id = latched_client_id
self.topic = topic
self.msg_class = msg_class
self.publisher = Publisher(topic, msg_class, latch=(latched_client_id!=None))
self.publisher = Publisher(topic, msg_class, latch=(latched_client_id!=None), queue_size=queue_size)
self.listener = PublisherConsistencyListener()
self.listener.attach(self.publisher)

Expand Down Expand Up @@ -255,17 +255,18 @@ class PublisherManager():
def __init__(self):
self._publishers = {}

def register(self, client_id, topic, msg_type=None, latch=False):
def register(self, client_id, topic, msg_type=None, latch=False, queue_size=100):
""" Register a publisher on the specified topic.
Publishers are shared between clients, so a single MultiPublisher
instance is created per topic, even if multiple clients register.
Keyword arguments:
client_id -- the ID of the client making this request
topic -- the name of the topic to publish on
msg_type -- (optional) the type to publish
latch -- (optional) whether to make this publisher latched
client_id -- the ID of the client making this request
topic -- the name of the topic to publish on
msg_type -- (optional) the type to publish
latch -- (optional) whether to make this publisher latched
queue_size -- (optional) rospy publisher queue_size to use
Throws:
Exception -- exceptions are propagated from the MultiPublisher if
Expand All @@ -275,7 +276,8 @@ def register(self, client_id, topic, msg_type=None, latch=False):
"""
latched_client_id = client_id if latch else None
if not topic in self._publishers:
self._publishers[topic] = MultiPublisher(topic, msg_type, latched_client_id)
self._publishers[topic] = MultiPublisher(topic, msg_type, latched_client_id,
queue_size=queue_size)
elif latch and self._publishers[topic].latched_client_id != client_id:
logwarn("Client ID %s attempted to register topic [%s] as latched " +
"but this topic was previously registered." % (client_id, topic))
Expand Down Expand Up @@ -320,7 +322,7 @@ def unregister_all(self, client_id):
for topic in self._publishers.keys():
self.unregister(client_id, topic)

def publish(self, client_id, topic, msg, latch=False):
def publish(self, client_id, topic, msg, latch=False, queue_size=100):
""" Publish a message on the given topic.
Tries to create a publisher on the topic if one does not already exist.
Expand All @@ -330,14 +332,15 @@ def publish(self, client_id, topic, msg, latch=False):
topic -- the topic to publish the message on
msg -- a JSON-like dict of fields and values
latch -- (optional) whether to make this publisher latched
queue_size -- (optional) rospy publisher queue_size to use
Throws:
Exception -- a variety of exceptions are propagated. They can be
thrown if there is a problem setting up or getting the publisher,
or if the provided msg does not map to the msg class of the publisher.
"""
self.register(client_id, topic, latch=latch)
self.register(client_id, topic, latch=latch, queue_size=queue_size)

self._publishers[topic].publish(msg)

Expand Down

0 comments on commit 7e94663

Please # to comment.