Module amqp

Description

A low-level Chicken Scheme client for the Advanced Message Queueing Protocol v0.9.1.

Limitations

Authors

Fredrik Appelberg (fredrik@appelberg.me)

Repository

https://github.com/fred-o/chicken-amqp

Requirements

Examples

Producer

;; Establish a connection and open a channel
(define conn (amqp-connect "amqp://myuser:mypassword@myserver/myvhost"))
(define chan (amqp-channel-open conn))

;; Declare a durable topic exchange
(amqp-exchange-declare chan "my-exchange" "topic" durable: 1)

;; Publish a message with a plain text payload
(amqp-publish-message chan "my-exchange" "my-routing-key" "hello, world" '((content-type "text/plain")))

Consumer

;; Establish a connection and open a channel
(define conn (amqp-connect "amqp://myuser:mypassword@myserver/myvhost"))
(define chan (amqp-channel-open conn))

;; Declare the previously created exchange passively. Not strictly
;; neccessary, but good practice.
(amqp-exchange-declare chan "my-exchange" passive: 1)

;; Declare an anymous queue. The server will assign a random name, so
;; we need to check the return data to get it. 
(let ((q (alist-ref 'queue (amqp-declare-queue chan "" auto-delete: 1))))
  ;; Bind the queue to our exchange
  (amqp-queue-bind chan q "my-exchange" "#")

  ;; Tell the server that we want to start consuming messages
  (amqp-basic-consume chan q)

  ;; Loop forever, recieving messages and printing the payload
  (let loop ()
    (let ((msg (amqp-receive-message chan))
      (print (blob->string (amqp-message-payload msg)))
      (loop)))))

API

Concurrency

Each AMQP connection starts two SRFI-18 threads; one for reading and dispatching incoming frames, and one for handling heartbeats.

All operations use SRFI-18 mutexes to ensure thread safety, and it should be perfectly fine to have multiple threads accessing the same channel. The mailbox egg is used internally when data needs to be passed between threads.

Conditions

AMQP handles errors by simply closing down the offending channel and/or connection. When this happens an amqp condition with reply-text and reply-code properties is raised, and the channel/connection object becomes invalid.

Connections

[procedure] (amqp-connect URL)

Create a new AMQP connection and returns a connection object. The url parameter should have the format amqp://[user][:password@][host]/[vhost]

[procedure] (amqp-disconnect CONNECTION)

Close an AMQP connection.

[constant] amqp-debug
default
(make-parameter #f)

Set amqp-debug to #t to enable detailed debug logging of sent and received frames.

Receiving messages

[procedure] (amqp-receive-message CHANNEL)

Block until the next amqp-message can be read on the given CHANNEL.

The client will receive messages from any of the following AMQP methods:

[record] amqp-message
constructor
(make-amqp-message DELIVERY PROPERTIES PAYLOAD)
predicate
amqp-message?
implementation
define-record
field getter setter
delivery amqp-message-delivery amqp-message-delivery-set!
properties amqp-message-properties amqp-message-properties-set!
payload amqp-message-payload amqp-message-payload-set!

Record that holds received messages. The delivery slot holds an alist of delivery information from the server. The properties slot is an alist of AMQP message properties.

[constant] amqp-payload-conversion
default
(make-parameter bitstring->blob)

The ampq egg uses bitstring to handle binary data internally, but in an attempt to avoid abstraction leakage message payloads returned by amqp-receive-message are converted to standard blobs. If you don't want this behaviour, set the parameter amqp-payload-conversion to either #f or a function that accepts a bitstring and returns the desired payload format.

Publishing messages

[procedure] (amqp-publish-message CHANNEL EXCHANGE ROUTING-KEY PAYLOAD PROPERTIES #!key (MANDATORY 0) (IMMEDIATE 0))

Publish a message. The PAYLOAD can be an u8vector, string, vector or blob. PROPERTIES should be an alist of AMQP message properties.

AMQP Commands

The AMQP command functions map 1:1 to the commands specified by the AMQP specification.

The Channel Class
[procedure] (amqp-channel-open CONNECTION)

Returns a new channel object that is then passed as an argument to the other functions.

[procedure] (amqp-channel-flow CHANNEL ACTIVE)
[procedure] (amqp-channel-close CHANNEL #!key (REPLY-CODE 0) (REPLY-TEXT "") (METHOD-ID 0) (CLASS-ID 0))
[procedure] (amqp-exchange-declare CHANNEL EXCHANGE TYPE #!key (PASSIVE 0) (DURABLE 0) (NO-WAIT 0) (ARGUMENTS '()))
The Exchange Class
[procedure] (amqp-exchange-delete CHANNEL EXCHANGE #!key (IF-UNUSED 0) (NO-WAIT 0))
[procedure] (amqp-queue-declare CHANNEL QUEUE #!key (PASSIVE 0) (DURABLE 0) (EXCLUSIVE 0) (AUTO-DELETE 0) (NO-WAIT 0) (ARGUMENTS '()))
The Queue Class
[procedure] (amqp-queue-bind CHANNEL QUEUE EXCHANGE ROUTING-KEY #!key (NO-WAIT 0) (ARGUMENTS '()))
[procedure] (amqp-queue-purge CHANNEL QUEUE #!key (NO-WAIT 0))
[procedure] (amqp-queue-delete CHANNEL QUEUE #!key (IF-UNUSED 0) (IF-EMPTY 0) (NO-WAIT 0))
[procedure] (amqp-queue-unbind CHANNEL QUEUE EXCHANGE ROUTING-KEY #!key (ARGUMENTS '()))
[procedure] (amqp-basic-qos CHANNEL PREFETCH-SIZE PREFETCH-COUNT GLOBAL)
The Basic Class
[procedure] (amqp-basic-consume CHANNEL QUEUE #!key (CONSUMER-TAG "") (NO-LOCAL 0) (NO-ACK 0) (EXCLUSIVE 0) (NO-WAIT 0) (ARGUMENTS '()))
[procedure] (amqp-basic-cancel CHANNEL CONSUMER-TAG #!key (NO-WAIT 0))
[procedure] (amqp-basic-get CHANNEL QUEUE #!key (NO-ACK 0))
[procedure] (amqp-basic-ack CHANNEL DELIVERY-TAG #!key (MULTIPLE 0))
[procedure] (amqp-basic-reject CHANNEL DELIVERY-TAG #!key (REQUEUE 0))
[procedure] (amqp-basic-recover CHANNEL REQUEUE)
[procedure] (amqp-basic-recover-async CHANNEL REQUEUE)
[procedure] (amqp-tx-select CHANNEL)
The Transaction Class
[procedure] (amqp-tx-commit CHANNEL)
[procedure] (amqp-tx-rollback CHANNEL)

License

BSD

Version History