Wiki
Download
Manual
Eggs
API
Tests
Bugs
show
edit
history
You can edit this page using
wiki syntax
for markup.
Article contents:
== Module amqp === Description A low-level [[https://call-cc.org|Chicken Scheme]] client for the Advanced Message Queueing Protocol v0.9.1. ==== Limitations * Only AMQP 0.9.1 is supported, and 1.0 support is unlikely to be added. * Only PLAIN authentication is supported. * No SSL support. === Authors Fredrik Appelberg (fredrik@appelberg.me) === Repository https://github.com/fred-o/chicken-amqp === Requirements * [[srfi-18]] * [[mailbox]] * [[bitstring]] * [[uri-generic]] === Examples ==== Producer <enscript highlight=#"scheme#">;; Establish a connection and open a channel (define conn (amqp-connect "amqp://myuser:mypassword@myserver/myvhost")) (define chan (amqp-channel-open conn)) ;; Declare a durable topic exchange (amqp-exchange-declare chan "my-exchange" "topic" durable: 1) ;; Publish a message with a plain text payload (amqp-publish-message chan "my-exchange" "my-routing-key" "hello, world" '((content-type "text/plain")))</enscript> ==== Consumer <enscript highlight=#"scheme#">;; Establish a connection and open a channel (define conn (amqp-connect "amqp://myuser:mypassword@myserver/myvhost")) (define chan (amqp-channel-open conn)) ;; Declare the previously created exchange passively. Not strictly ;; neccessary, but good practice. (amqp-exchange-declare chan "my-exchange" passive: 1) ;; Declare an anymous queue. The server will assign a random name, so ;; we need to check the return data to get it. (let ((q (alist-ref 'queue (amqp-declare-queue chan "" auto-delete: 1)))) ;; Bind the queue to our exchange (amqp-queue-bind chan q "my-exchange" "#") ;; Tell the server that we want to start consuming messages (amqp-basic-consume chan q) ;; Loop forever, recieving messages and printing the payload (let loop () (let ((msg (amqp-receive-message chan)) (print (blob->string (amqp-message-payload msg))) (loop)))))</enscript> === API ==== Concurrency Each AMQP connection starts two SRFI-18 threads; one for reading and dispatching incoming frames, and one for handling heartbeats. All operations use SRFI-18 mutexes to ensure thread safety, and it should be perfectly fine to have multiple threads accessing the same channel. The {{mailbox}} egg is used internally when data needs to be passed between threads. ==== Conditions AMQP handles errors by simply closing down the offending channel and/or connection. When this happens an ''amqp'' condition with {{reply-text}} and {{reply-code}} properties is raised, and the channel/connection object becomes invalid. ==== Connections <procedure>(amqp-connect URL)</procedure> Create a new AMQP connection and returns a connection object. The {{url}} parameter should have the format {{amqp://[user][:password@][host]/[vhost]}} <procedure>(amqp-disconnect CONNECTION)</procedure> Close an AMQP connection. <constant>amqp-debug</constant> ; default : {{(make-parameter #f)}} Set {{amqp-debug}} to {{#t}} to enable detailed debug logging of sent and received frames. ==== Receiving messages <procedure>(amqp-receive-message CHANNEL)</procedure> Block until the next {{amqp-message}} can be read on the given CHANNEL. The client will receive messages from any of the following AMQP methods: * {{basic.deliver}} (after {{basic.consume}} on the same channel) * {{basic.return}} (for messages sent with {{mandatory: 1}} or {{immediate: 1}} that could not be routed or delivered immediately by the server) * {{basic.get-ok}} (in response to {{basic.get}}) <record>amqp-message</record> ; constructor : {{(make-amqp-message DELIVERY PROPERTIES PAYLOAD)}} ; predicate : {{amqp-message?}} ; implementation : {{define-record}} <table><tr><th>field</th> <th>getter</th> <th>setter</th></tr><tr><td>{{delivery}}</td> <td>{{amqp-message-delivery}}</td> <td>{{amqp-message-delivery-set!}}</td></tr> <tr><td>{{properties}}</td> <td>{{amqp-message-properties}}</td> <td>{{amqp-message-properties-set!}}</td></tr> <tr><td>{{payload}}</td> <td>{{amqp-message-payload}}</td> <td>{{amqp-message-payload-set!}}</td></tr></table> Record that holds received messages. The {{delivery}} slot holds an alist of delivery information from the server. The {{properties}} slot is an alist of AMQP message properties. <constant>amqp-payload-conversion</constant> ; default : {{(make-parameter bitstring->blob)}} The {{ampq}} egg uses {{bitstring}} to handle binary data internally, but in an attempt to avoid abstraction leakage message payloads returned by {{amqp-receive-message}} are converted to standard {{blob}}s. If you don't want this behaviour, set the parameter {{amqp-payload-conversion}} to either #f or a function that accepts a {{bitstring}} and returns the desired payload format. ==== Publishing messages <procedure>(amqp-publish-message CHANNEL EXCHANGE ROUTING-KEY PAYLOAD PROPERTIES #!key (MANDATORY 0) (IMMEDIATE 0))</procedure> Publish a message. The PAYLOAD can be an {{u8vector}}, {{string}}, {{vector}} or {{blob}}. PROPERTIES should be an alist of AMQP message properties. ==== AMQP Commands The AMQP command functions map 1:1 to the commands specified by the AMQP specification. * The first argument to any command function except {{amqp-channel-open}} should be a {{channel}} object. For other arguments, consult the AMQP specification. * All synchronous command functions except {{amqp-channel-open}} return an alist of values sent by the server in the reply command. Consult the AMQP specification for details. * Asynchronous command functions ({{amqp-basic-ack}}, {{amqp-basic-reject}}, etc.) or synchronous functions called with {{no-wait: 1}} return {{(void)}}. * {{amqp-channel-open}} returns a new {{channel}} object. ===== The Channel Class <procedure>(amqp-channel-open CONNECTION)</procedure> Returns a new {{channel}} object that is then passed as an argument to the other functions. <procedure>(amqp-channel-flow CHANNEL ACTIVE)</procedure> <procedure>(amqp-channel-close CHANNEL #!key (REPLY-CODE 0) (REPLY-TEXT "") (METHOD-ID 0) (CLASS-ID 0))</procedure> <procedure>(amqp-exchange-declare CHANNEL EXCHANGE TYPE #!key (PASSIVE 0) (DURABLE 0) (NO-WAIT 0) (ARGUMENTS '()))</procedure> ===== The Exchange Class <procedure>(amqp-exchange-delete CHANNEL EXCHANGE #!key (IF-UNUSED 0) (NO-WAIT 0))</procedure> <procedure>(amqp-queue-declare CHANNEL QUEUE #!key (PASSIVE 0) (DURABLE 0) (EXCLUSIVE 0) (AUTO-DELETE 0) (NO-WAIT 0) (ARGUMENTS '()))</procedure> ===== The Queue Class <procedure>(amqp-queue-bind CHANNEL QUEUE EXCHANGE ROUTING-KEY #!key (NO-WAIT 0) (ARGUMENTS '()))</procedure> <procedure>(amqp-queue-purge CHANNEL QUEUE #!key (NO-WAIT 0))</procedure> <procedure>(amqp-queue-delete CHANNEL QUEUE #!key (IF-UNUSED 0) (IF-EMPTY 0) (NO-WAIT 0))</procedure> <procedure>(amqp-queue-unbind CHANNEL QUEUE EXCHANGE ROUTING-KEY #!key (ARGUMENTS '()))</procedure> <procedure>(amqp-basic-qos CHANNEL PREFETCH-SIZE PREFETCH-COUNT GLOBAL)</procedure> ===== The Basic Class <procedure>(amqp-basic-consume CHANNEL QUEUE #!key (CONSUMER-TAG "") (NO-LOCAL 0) (NO-ACK 0) (EXCLUSIVE 0) (NO-WAIT 0) (ARGUMENTS '()))</procedure> <procedure>(amqp-basic-cancel CHANNEL CONSUMER-TAG #!key (NO-WAIT 0))</procedure> <procedure>(amqp-basic-get CHANNEL QUEUE #!key (NO-ACK 0))</procedure> <procedure>(amqp-basic-ack CHANNEL DELIVERY-TAG #!key (MULTIPLE 0))</procedure> <procedure>(amqp-basic-reject CHANNEL DELIVERY-TAG #!key (REQUEUE 0))</procedure> <procedure>(amqp-basic-recover CHANNEL REQUEUE)</procedure> <procedure>(amqp-basic-recover-async CHANNEL REQUEUE)</procedure> <procedure>(amqp-tx-select CHANNEL)</procedure> ===== The Transaction Class <procedure>(amqp-tx-commit CHANNEL)</procedure> <procedure>(amqp-tx-rollback CHANNEL)</procedure> === License BSD === Version History * 1.0.0 - First public release * 0.9 - Preparing for an 1.0 release
Description of your changes:
I would like to authenticate
Authentication
Username:
Password:
Spam control
What do you get when you multiply 3 by 9?