Exchange class
The Exchange class defines an AMQP exchange. It is normally created only from within a connection, with declareExchange().
- constructor
- delete
- close
- bind
- unbind
- send
- rpc
- consumerQueueName
- activateConsumer
- stopConsumer
- publish - deprecated: use send instead!
- startConsumer - deprecated: use activateConsumer instead!
A detailed reference explaining the meaning and use of each method and property.
constructor ( connection: Connection,
name: string,
type?: string,
options?: Exchange.DeclarationOptions)
Creates an exchange for a connection. Normally only called from within a connection, with declareExchange().
connection: Connection
: connection this exchange is declared for.
name: string
: exchange name.
type?: string
: exchange type, a valid AMQP exchange type name.
options?: Exchange.DeclarationOptions
: exchange options as defined in amqplib. An extra exchange option, noCreate, was added in v1.3: it connects to an already existing AMQP exchange with the given name, ignoring the exchange type and all other declaration options.

    // normally not used directly, but from a connection
    connection.declareExchange("exchangeName", "topic", {durable: false});
    // which internally calls
    var exchange = new Exchange(connection, "exchangeName", "topic", {durable: false});

    // expect an existing exchange
    connection.declareExchange("existingExchangeName", "", {noCreate: true});
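
The type argument names an exchange type, not an exchange. As a minimal sketch, the guard below checks a string against the four standard AMQP 0-9-1 exchange types. The names StandardExchangeType and isStandardExchangeType are hypothetical and not part of amqp-ts, and a broker may accept additional plugin-provided types that this check would reject.

```typescript
// Hypothetical helper, not part of amqp-ts: the four exchange types
// defined by AMQP 0-9-1. Broker plugins may add others.
type StandardExchangeType = "direct" | "fanout" | "topic" | "headers";

const STANDARD_EXCHANGE_TYPES: StandardExchangeType[] = ["direct", "fanout", "topic", "headers"];

// Returns true when value is one of the standard exchange type names.
function isStandardExchangeType(value: string): value is StandardExchangeType {
    return (STANDARD_EXCHANGE_TYPES as string[]).indexOf(value) !== -1;
}

console.log(isStandardExchangeType("topic"));     // true
console.log(isStandardExchangeType("amq.topic")); // false: a predeclared exchange name, not a type
```

A check like this could run before declareExchange() to catch a mistyped type locally instead of waiting for the broker to reject the declaration.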
exchange.delete (): Promise<void>
Deletes the exchange.
Promise<void>
: promise that resolves when the exchange is deleted (or an error has occurred).

    exchange.delete().then(() => {
        // do things when the exchange is deleted
    });
exchange.close (): Promise<void>
Closes the exchange in amqp-ts only; a persistent exchange is not deleted from the broker.
Promise<void>
: promise that resolves when the exchange is closed (or an error has occurred).

    exchange.close().then(() => {
        // do things when the exchange is closed
    });
exchange.bind ( source: Exchange,
pattern?: string,
args?: any)
: Promise<Binding>
Bind this exchange to another exchange (RabbitMQ extension).
source: Exchange
: source exchange this exchange is bound to.
pattern?: string
: pattern that defines which messages will be received, defaults to "".
args?: any
: object containing extra arguments that may be required for the particular exchange type.
Promise<Binding>
: promise that resolves when the binding is initialized.

    // normal use
    destExchange.bind(sourceExchange);

    // less frequently used, but may be useful in certain situations
    destExchange.bind(sourceExchange).then((binding) => {
        // do things when the binding is initialized
    });
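
How pattern is interpreted depends on the type of the source exchange, and the matching is done by the broker, not by amqp-ts. For a topic source exchange, AMQP treats the pattern as dot-separated words in which `*` matches exactly one word and `#` matches zero or more words. The helper topicMatches below is a hypothetical illustration of those rules, not library code.

```typescript
// Hypothetical illustration of AMQP topic matching; not part of amqp-ts.
// "*" matches exactly one word, "#" matches zero or more words.
function topicMatches(pattern: string, routingKey: string): boolean {
    const p = pattern.split(".");
    const k = routingKey.split(".");

    // Does p[i..] match k[j..]?
    const match = (i: number, j: number): boolean => {
        if (i === p.length) {
            return j === k.length; // pattern used up: key must be used up too
        }
        if (p[i] === "#") {
            // let "#" absorb 0, 1, 2, ... of the remaining words
            for (let n = j; n <= k.length; n++) {
                if (match(i + 1, n)) {
                    return true;
                }
            }
            return false;
        }
        if (j === k.length) {
            return false; // pattern expects another word, key has none
        }
        return (p[i] === "*" || p[i] === k[j]) && match(i + 1, j + 1);
    };

    return match(0, 0);
}

console.log(topicMatches("stock.*", "stock.nyse"));     // true
console.log(topicMatches("stock.*", "stock.nyse.ibm")); // false
console.log(topicMatches("stock.#", "stock.nyse.ibm")); // true
```

With a direct source exchange the pattern must equal the routing key exactly, and a fanout source exchange ignores it.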
exchange.unbind ( source: Exchange,
pattern?: string,
args?: any)
: Promise<void>
Remove binding.
source: Exchange
: source exchange this exchange is bound to.
pattern?: string
: pattern that was used when the binding was created, defaults to "".
args?: any
: object containing extra arguments that may be required for the particular exchange type.
Promise<void>
: promise that resolves when the binding is removed.

    destExchange.unbind(sourceExchange).then(() => {
        // do things when the binding is removed
    });
exchange.send ( message: Message,
routingKey?: string)
: void
Sends a message to the exchange.
message: Message
: the [message](Message class) to be sent to the exchange.
routingKey?: string
: routing key for the message, defaults to "".

    import * as Amqp from "amqp-ts";

    var message = new Amqp.Message("ExampleMessageString");
    exchange.send(message);
exchange.rpc ( requestParameters: any,
routingKey = "")
: Promise<Message>
Execute a RabbitMQ 'direct reply-to' remote procedure call. The return type of this method has changed in version 0.14. It now returns the full message object instead of just the processed message content.
requestParameters: any
: the rpc parameters to be sent to the exchange. The following preprocessing takes place, depending on the type:
- Buffer : send the content as is (no preprocessing)
- string : create a Buffer from the string and send that buffer
- everything else : create a Buffer from the object converted to JSON and, if not defined, set the contentType option to "application/json"
routingKey?: string
: routing key for the message, defaults to "".
Promise<Message>
: promise that resolves when the result is received.

    exchange.rpc("Parameters").then((result) => {
        console.log("Rpc result: " + result.getContent());
    });
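
The three preprocessing rules can be modelled as a small function. This is a sketch of the rules as described above, not the amqp-ts implementation: prepareContent and Prepared are hypothetical names, and Uint8Array stands in for Buffer (a Node.js Buffer is a Uint8Array) so the example has no dependencies.

```typescript
// Hypothetical model of the preprocessing rules; not amqp-ts code.
interface Prepared {
    raw?: Uint8Array;     // set when the content is sent as is
    text?: string;        // set when a Buffer would be created from this string
    contentType?: string; // content type that would accompany the message
}

function prepareContent(content: unknown, contentType?: string): Prepared {
    if (content instanceof Uint8Array) {
        // Buffer: no preprocessing
        return { raw: content, contentType: contentType };
    }
    if (typeof content === "string") {
        // string: sent as its own characters, not JSON-encoded
        return { text: content, contentType: contentType };
    }
    // everything else: converted to JSON; the content type is only
    // filled in when the caller did not define one
    return {
        text: JSON.stringify(content),
        contentType: contentType !== undefined ? contentType : "application/json",
    };
}

console.log(prepareContent("hello").text);                 // hello
console.log(prepareContent({ name: "test" }).text);        // {"name":"test"}
console.log(prepareContent({ name: "test" }).contentType); // application/json
```

Two consequences follow from the rules: a plain string is not wrapped in JSON quotes, and an explicitly set contentType is never overwritten.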
exchange.consumerQueueName (): string
Returns a meaningful unique name for the default consumer queue of the exchange. The default unique names generated by RabbitMQ are rather cryptic for an administrator; this name is easier to recognize.
exchange.activateConsumer ( onMessage: (msg: Message) => any,
options?: Queue.ActivateConsumerOptions)
: Promise<void>
Defines the function that processes messages for this exchange. Only one consumer can be active per exchange. Under the hood it creates a consumer queue, named by consumerQueueName, that is bound to the exchange and from which the messages are read.
onMessage: (msg: Message) => any
: function that processes the messages.
options?: Queue.ActivateConsumerOptions
: consumer options as defined in amqplib.
Promise<any>
: promise that resolves when the consumer is started.

    // 'simple' consumer
    exchange.activateConsumer((msg) => {
        console.log(msg.getContent()); // the preprocessed content of the message received
    }, {noAck: true});

    // message consumer example
    function consumerFunction(msg) {
        console.log(msg.getContent());
        console.log(msg.fields);
        console.log(msg.properties);
        msg.ack();
    }
    exchange.activateConsumer(consumerFunction);

    // simple rpc server
    exchange.activateConsumer((msg) => {
        var rpcParameters = msg.getContent();
        return rpcParameters.value;
    });
    // rpc client
    var param = {
        name: "test",
        value: "This is a test!"
    };
    exchange.rpc(param).then((result) => {
        console.log(result.getContent()); // should result in 'This is a test!'
    });

    // rpc server that returns a Message
    exchange.activateConsumer((msg) => {
        var rpcParameters = msg.getContent();
        return new Amqp.Message(rpcParameters.value, {});
    });
    // rpc client
    var param = {
        name: "test",
        value: "This is a test!"
    };
    exchange.rpc(param).then((result) => {
        console.log(result.getContent()); // should result in 'This is a test!'
    });
exchange.stopConsumer (): Promise<void>
Stops the consumer function and deletes the queue and binding created in activateConsumer (or the deprecated startConsumer).
Promise<any>
: promise that resolves when the consumer is stopped.

    exchange.stopConsumer();
exchange.initialized: Promise<Exchange.InitializeResult>
Indicates whether the exchange initialization is resolved (or rejected).

    exchange.initialized.then((result) => {
        console.log("Exchange initialized: " + result.exchange);
        // stuff to do
    });
    exchange.initialized.catch((err) => {
        // something went wrong
    });
exchange.name: string
name of the exchange (read only)
exchange.type: string
type of the exchange (read only)
exchange.publish - deprecated: use send instead!
exchange.publish ( content: any,
routingKey?: string,
options?: any)
: void
Publish a message to an exchange
content: any
: the content to be sent to the exchange. The following preprocessing takes place, depending on the type:
- Buffer : send the content as is (no preprocessing)
- string : create a Buffer from the string and send that buffer
- everything else : create a Buffer from the object converted to JSON and, if not defined, set the contentType option to "application/json"
routingKey?: string
: routing key for the message, defaults to "".
options?: any
: publish options as defined in amqplib.

    exchange.publish("ExampleMessageString");
exchange.startConsumer - deprecated: use activateConsumer instead!
exchange.startConsumer ( onMessage: (msg: any, channel?: AmqpLib.Channel) => any,
options?: Queue.StartConsumerOptions)
: Promise<void>
Defines the function that processes messages for this exchange. Only one consumer can be active per exchange. Under the hood it creates a consumer queue, named by consumerQueueName, that is bound to the exchange and from which the messages are read.
onMessage: (msg: any, channel?: AmqpLib.Channel) => any
: function that processes the messages.
options?: Queue.StartConsumerOptions
: consumer options as defined in amqplib. An extra property, rawMessage, has been added to allow more low-level message processing; see [queue.startConsumer](Queue class#startConsumer) for more details.
Promise<any>
: promise that resolves when the consumer is started.

    exchange.startConsumer((msg) => {
        console.log(msg);
    });