Module amqp
Description
A low-level Chicken Scheme client for the Advanced Message Queuing Protocol (AMQP) v0.9.1.
Limitations
- Only AMQP 0.9.1 is supported, and 1.0 support is unlikely to be added.
- Only PLAIN authentication is supported.
- No SSL support.
Authors
Fredrik Appelberg (fredrik@appelberg.me)
Repository
https://github.com/fred-o/chicken-amqp
Requirements
Examples
Producer
    ;; Establish a connection and open a channel
    (define conn (amqp-connect "amqp://myuser:mypassword@myserver/myvhost"))
    (define chan (amqp-channel-open conn))

    ;; Declare a durable topic exchange
    (amqp-exchange-declare chan "my-exchange" "topic" durable: 1)

    ;; Publish a message with a plain text payload
    (amqp-publish-message chan "my-exchange" "my-routing-key" "hello, world"
                          '((content-type "text/plain")))
Consumer
    ;; Establish a connection and open a channel
    (define conn (amqp-connect "amqp://myuser:mypassword@myserver/myvhost"))
    (define chan (amqp-channel-open conn))

    ;; Declare the previously created exchange passively. Not strictly
    ;; necessary, but good practice. The exchange type is a required
    ;; positional argument of amqp-exchange-declare.
    (amqp-exchange-declare chan "my-exchange" "topic" passive: 1)

    ;; Declare an anonymous queue. The server will assign a random name, so
    ;; we need to check the return data to get it.
    (let ((q (alist-ref 'queue (amqp-queue-declare chan "" auto-delete: 1))))
      ;; Bind the queue to our exchange
      (amqp-queue-bind chan q "my-exchange" "#")
      ;; Tell the server that we want to start consuming messages
      (amqp-basic-consume chan q)
      ;; Loop forever, receiving messages and printing the payload
      (let loop ()
        (let ((msg (amqp-receive-message chan)))
          (print (blob->string (amqp-message-payload msg)))
          (loop))))
API
Concurrency
Each AMQP connection starts two SRFI-18 threads: one for reading and dispatching incoming frames, and one for handling heartbeats.
All operations use SRFI-18 mutexes to ensure thread safety, and it should be perfectly fine to have multiple threads accessing the same channel. The mailbox egg is used internally when data needs to be passed between threads.
Conditions
AMQP handles errors by simply closing down the offending channel and/or connection. When this happens an amqp condition with reply-text and reply-code properties is raised, and the channel/connection object becomes invalid.
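The error handling described above can be sketched with condition-case. This is a minimal sketch that assumes CHICKEN 5 import forms; the failing command is simulated with a hand-built condition so the snippet runs without a broker, and the names simulated-failing-command and call-with-amqp-error-handling, as well as the reply code and text, are hypothetical. Only the amqp condition kind and its reply-code and reply-text properties come from the description above.

```scheme
(import scheme (chicken base) (chicken condition))

;; Stand-in for an AMQP command that makes the server close the channel.
;; A real command would be something like a passive declare of a missing queue;
;; here we just signal a condition of kind amqp by hand (assumed values).
(define (simulated-failing-command)
  (signal (make-property-condition 'amqp
                                   'reply-code 404
                                   'reply-text "NOT_FOUND - no queue 'missing'")))

;; Run THUNK and turn an amqp condition into a plain list so the caller can
;; decide what to do (for example, open a fresh channel).
(define (call-with-amqp-error-handling thunk)
  (condition-case (thunk)
    (e (amqp)
       (list 'failed
             (get-condition-property e 'amqp 'reply-code)
             (get-condition-property e 'amqp 'reply-text)))))

(define result (call-with-amqp-error-handling simulated-failing-command))
(print result)
```

After such a condition the channel or connection should be treated as unusable, so a caller would normally open a new one rather than retry on the same object.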
Connections
[procedure] (amqp-connect URL)
Creates a new AMQP connection and returns a connection object. The URL parameter should have the format amqp://[user][:password@][host]/[vhost]
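As an illustration of the URL format, the snippet below assembles a connection URL from its parts. make-amqp-url is a hypothetical helper, not part of the egg, and it does no escaping of special characters in the user name, password, or vhost.

```scheme
(import scheme (chicken base))

;; Hypothetical helper: build amqp://user:password@host/vhost from its parts.
;; No percent-encoding is applied, so the parts must not contain ':', '@' or '/'.
(define (make-amqp-url user password host vhost)
  (string-append "amqp://" user ":" password "@" host "/" vhost))

(define url (make-amqp-url "myuser" "mypassword" "myserver" "myvhost"))
(print url)
;; The result would then be passed on: (amqp-connect url)
```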
[procedure] (amqp-disconnect CONNECTION)
Close an AMQP connection.
[constant] amqp-debug
- default: (make-parameter #f)
Set amqp-debug to #t, i.e. call (amqp-debug #t), to enable detailed debug logging of sent and received frames.
Receiving messages
[procedure] (amqp-receive-message CHANNEL)
Block until the next amqp-message can be read on the given CHANNEL.
The client will receive messages from any of the following AMQP methods:
- basic.deliver (after basic.consume on the same channel)
- basic.return (for messages sent with mandatory: 1 or immediate: 1 that could not be routed or delivered immediately by the server)
- basic.get-ok (in response to basic.get)
[record] amqp-message
- constructor: (make-amqp-message DELIVERY PROPERTIES PAYLOAD)
- predicate: amqp-message?
- implementation: define-record
field | getter | setter |
---|---|---|
delivery | amqp-message-delivery | amqp-message-delivery-set! |
properties | amqp-message-properties | amqp-message-properties-set! |
payload | amqp-message-payload | amqp-message-payload-set! |
Record that holds received messages. The delivery slot holds an alist of delivery information from the server. The properties slot is an alist of AMQP message properties.
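Since the delivery and properties slots are alists, their entries can be read with alist-ref. The sketch below works on a hand-written stand-in for the value of (amqp-message-delivery msg), so it runs without a broker. The keys delivery-tag, exchange, routing-key and redelivered, the dotted-pair layout, and the helper message-info-ref are all assumptions made for illustration; check what your server actually returns.

```scheme
(import scheme (chicken base))

;; Hand-written stand-in for (amqp-message-delivery msg).
;; Keys and layout are assumed, not taken from the egg.
(define sample-delivery
  '((delivery-tag . 1)
    (exchange . "my-exchange")
    (routing-key . "my-routing-key")))

;; Hypothetical helper: look up KEY in ALIST, returning DEFAULT when absent.
(define (message-info-ref key alist #!optional (default #f))
  (alist-ref key alist eqv? default))

(define tag (message-info-ref 'delivery-tag sample-delivery))
(define missing (message-info-ref 'redelivered sample-delivery 'unknown))

(print tag)
(print missing)
```

With a real message, a value extracted this way could be passed on to a command such as amqp-basic-ack.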
[constant] amqp-payload-conversion
- default: (make-parameter bitstring->blob)
The amqp egg uses the bitstring egg to handle binary data internally, but to avoid leaking that abstraction, message payloads returned by amqp-receive-message are converted to standard blobs. If you don't want this behaviour, set the parameter amqp-payload-conversion to either #f or a procedure that accepts a bitstring and returns the payload in the desired format.
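A custom conversion might look like the following sketch, which makes received payloads arrive as strings. It assumes CHICKEN 5 import forms and that bitstring->blob is imported from the bitstring egg; it needs both eggs installed to run. Setting the parameter before calling amqp-connect is the cautious choice, because CHICKEN parameters are thread-local and the connection's reader thread may capture the value when it is created.

```scheme
(import scheme (chicken blob) amqp bitstring)

;; Convert each payload bitstring to a blob (the default behaviour),
;; then to a string, so amqp-message-payload yields a string.
(amqp-payload-conversion
 (lambda (bits)
   (blob->string (bitstring->blob bits))))

;; Alternatively, turn conversion off and receive raw bitstrings:
;; (amqp-payload-conversion #f)
```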
Publishing messages
[procedure] (amqp-publish-message CHANNEL EXCHANGE ROUTING-KEY PAYLOAD PROPERTIES #!key (MANDATORY 0) (IMMEDIATE 0))
Publish a message. The PAYLOAD can be a u8vector, string, vector or blob. PROPERTIES should be an alist of AMQP message properties.
AMQP Commands
The AMQP command functions map 1:1 to the commands specified by the AMQP specification.
- The first argument to any command function except amqp-channel-open should be a channel object. For other arguments, consult the AMQP specification.
- All synchronous command functions except amqp-channel-open return an alist of values sent by the server in the reply command. Consult the AMQP specification for details.
- Asynchronous command functions (amqp-basic-ack, amqp-basic-reject, etc.) or synchronous functions called with no-wait: 1 return (void).
- amqp-channel-open returns a new channel object.
The Channel Class
[procedure] (amqp-channel-open CONNECTION)
Returns a new channel object that is then passed as an argument to the other functions.
[procedure] (amqp-channel-flow CHANNEL ACTIVE)
[procedure] (amqp-channel-close CHANNEL #!key (REPLY-CODE 0) (REPLY-TEXT "") (METHOD-ID 0) (CLASS-ID 0))
The Exchange Class
[procedure] (amqp-exchange-declare CHANNEL EXCHANGE TYPE #!key (PASSIVE 0) (DURABLE 0) (NO-WAIT 0) (ARGUMENTS '()))
[procedure] (amqp-exchange-delete CHANNEL EXCHANGE #!key (IF-UNUSED 0) (NO-WAIT 0))
The Queue Class
[procedure] (amqp-queue-declare CHANNEL QUEUE #!key (PASSIVE 0) (DURABLE 0) (EXCLUSIVE 0) (AUTO-DELETE 0) (NO-WAIT 0) (ARGUMENTS '()))
[procedure] (amqp-queue-bind CHANNEL QUEUE EXCHANGE ROUTING-KEY #!key (NO-WAIT 0) (ARGUMENTS '()))
[procedure] (amqp-queue-purge CHANNEL QUEUE #!key (NO-WAIT 0))
[procedure] (amqp-queue-delete CHANNEL QUEUE #!key (IF-UNUSED 0) (IF-EMPTY 0) (NO-WAIT 0))
[procedure] (amqp-queue-unbind CHANNEL QUEUE EXCHANGE ROUTING-KEY #!key (ARGUMENTS '()))
The Basic Class
[procedure] (amqp-basic-qos CHANNEL PREFETCH-SIZE PREFETCH-COUNT GLOBAL)
[procedure] (amqp-basic-consume CHANNEL QUEUE #!key (CONSUMER-TAG "") (NO-LOCAL 0) (NO-ACK 0) (EXCLUSIVE 0) (NO-WAIT 0) (ARGUMENTS '()))
[procedure] (amqp-basic-cancel CHANNEL CONSUMER-TAG #!key (NO-WAIT 0))
[procedure] (amqp-basic-get CHANNEL QUEUE #!key (NO-ACK 0))
[procedure] (amqp-basic-ack CHANNEL DELIVERY-TAG #!key (MULTIPLE 0))
[procedure] (amqp-basic-reject CHANNEL DELIVERY-TAG #!key (REQUEUE 0))
[procedure] (amqp-basic-recover CHANNEL REQUEUE)
[procedure] (amqp-basic-recover-async CHANNEL REQUEUE)
The Transaction Class
[procedure] (amqp-tx-select CHANNEL)
[procedure] (amqp-tx-commit CHANNEL)
[procedure] (amqp-tx-rollback CHANNEL)
License
BSD
Version History
- 1.0.0 - First public release
- 0.9 - Preparing for a 1.0 release