Skip to content

Latest commit

 

History

History
123 lines (96 loc) · 3.87 KB

amqp.rst

File metadata and controls

123 lines (96 loc) · 3.87 KB

Shrapnel/AMQP

The AMQP protocol is somewhat different from most protocols in that it uses a half-synchronous, half-asynchronous design that is difficult to provide an interface for: most systems provide concurrency through either blocking code, or non-blocking code; but AMQP requires a bit of each.

Because of this, Shrapnel is a perfect solution: it handles both models easily - underneath its blocking/synchronous exterior beats a heart of pure event-driven high-performance mayhem.

Shrapnel makes it easy to juggle multiple connections, multiple channels, multiple channels per connection, and unravels the hellish complexity of managing RPCs, correlation ids, asynchronous notifications, etc..

Using

Check out the examples in the examples/amqp directory, they show how the client is used within the shrapnel event loop paradigm. Remember to always run within a thread/coroutine -i.e., you can't use this library from the command line! [an exception to this is to use the 'back door' facility which lets you telnet into a python command line, very useful for debugging].

AMQP

.. automodule:: coro.amqp

.. autoclass:: coro.amqp.client
    :members:

.. autoclass:: coro.amqp.channel
    :members:

.. autoclass:: coro.amqp.consumer
    :members:

Example:

c = coro.amqp.client (('guest', 'guest'), '127.0.0.1')
print 'connecting...'
c.go() # i.e., connect...
print 'channel...'
ch = c.channel()
print 'confirm_select...'
ch.confirm_select()
print 'entering send loop'
for i in range (10):
    props = {'content-type':'raw goodness', 'message-id' : 'msg_%d' % (i,)}
    ch.basic_publish ('howdy there!', exchange='ething', routing_key='notification', properties=props)
    print 'sent/confirmed'
    coro.sleep_relative (1)
coro.set_exit()

RPC

.. automodule:: coro.amqp.rpc

.. autoclass:: coro.amqp.rpc.client
    :members:

.. autoclass:: coro.amqp.rpc.server
    :members:

Making an RPC call is easy:

ch = c.channel()
rpc = coro.amqp.rpc.client (ch)
frame, props, reply = rpc.call ({}, '19', '', 'rpc_queue')

In the above example, the properties are empty, the payload is the string '19', the exchange is set to the empty string (indicating to use the default exchange which sends message directly to a particular queue via...) and the routing_key is set to that queue.

Various other rpc architectures are accommodated by passing in an already-created queue object:

ch = c.channel()
queue = ch.queue_declare (exclusive=True).queue
rpc = coro.amqp.rpc.client (ch, queue)
ch.queue_bind (queue=rpc.queue, exchange='pumpkin.exchange', routing_key=rpc.queue)
frame, props, reply = rpc.call (
    {'content-type':'application/json', 'delivery-mode':2},
    json.dumps ({'GetPumpkinData' :{}}),
    exchange='pumpkin.exchange',
    routing_key='request'
    )

Internals

Taking a cue from Pika, the majority of the wire protocol is handled by code that is autogenerated from the RabbitMQ machine-readable JSON specification. The util/codegen.py script generates the file spec.py, which uses the wire.py module to encode and decode AMQP frame data.

I plan to eventually rewrite the wire module and the code generator to emit Cython rather than Python, which should make AMQP/Shrapnel run at near-C speeds.

Indices and tables