You are looking at historical revision 26043 of this page. It may differ significantly from its current revision.

channel

A channel implementation inspired by lamina for Clojure.

Overview

Channels are queues with callbacks. Enqueuing a message to a channel will execute all callbacks (or receivers) listening on the channel in the same thread. If no receivers are listening at the time of enqueuing a message, newly registered receivers will be executed with all queued up messages in the registering thread.

It's also possible to fork channels. All messages enqueued to a forked channel's parent channel will also be enqueued to the forked channel but not the other way around. Receiving messages from a forked channel does not affect its parent not the other way around.

When no messages are queued up in a channel it is empty. Channels may be closed to prevent further messages from being enqueued to them. An empty closed channel is drained.

The API is loosely based on that of Clojure's lamina channels, see the lamina wiki for an introduction.

Examples

   #;1> (use channel)
   ; loading ...
   #;2> (define numbers (make-channel 1 2 3 4))
   #;3> (channel-receive! numbers)
   1
   #;4> (channel-receive! numbers print print)
   2
   2
   #t
   #;5> (on-channel-receive numbers (lambda (n) (print "got " n)))
   got 3
   got 4
   #;6> (channel-enqueue! numbers 5)
   got 5
   #t
   #;7> (define odd-successors (channel-filter! (channel-map! numbers add1) odd?))
   #;8> (on-channel-receive odd-successors (lambda (n) (print "odd " n)))
   #;9> (channel-enqueue! numbers 6 7 8 9)
   odd 7
   got 6
   got 7
   odd 9
   got 8
   got 9
   #t

API

[procedure] (make-channel . messages)

Returns a new channel, optionally prepopulated with messages.

[procedure] (channel-enqueue! channel message . messages)

Enqueues one or more message[s] to channel.

[procedure] (channel-receive! channel . receivers)

If no receivers are given receives and returns the next message from channel. If channel is empty the current thread will be blocked until a message is enqueued from another thread.

If one or more receivers are given they will be called in order with the next message as their sole argument. If channel is not empty, this happens immediately and #t is returned. If channel is empty, #f is returned and the receivers will be called when a new message is enqueued. Note that this might happen in a different thread.

[procedure] (on-channel-receive channel receiver)

Registers a permanent receiver on channel which will be called with each message enqueued to channel.

[procedure] (on-channel-close channel thunk)
[procedure] (on-channel-drain channel thunk)

Will call thunk once channel is closed or drained, respectively. If channel is already closed or drained, thunk will be called immediately.

[procedure] (channel-messages channel)

Returns all messages queued up in channel as a list.

[procedure] (channel-empty? channel)

Checks whether channel contains no messages, i.e. it is empty.

[procedure] (channel-close! channel)

Close channel to prevent further messages from being enqueued to it.

[procedure] (channel-closed?)

Checks whether channel is closed, i.e. whether it is possible to enqueue messages to it.

[procedure] (channel-drained? channel)

Checks whether channel is drained, i.e. whether it is both empty and closed.

[procedure] (channel-fork! channel)

Returns a new channel which is a fork of channel. Every message enqueued to channel will also be enqueued to the forked channel but not the other way around. Receiving messages from either channel does not affect the other one. Closing channel will also close the forked channel.

[procedure] (make-receiving-channel source-channel on-receive)

Registers a permanent receiver on source-channel which will call on-receive with two arguments: a channel (receiving-channel) and the received message. When receiving-channel is closed, source-channel will be closed as well. Returns receiving-channel.

[procedure] (channel-fold! channel proc seed)

Returns a channel which receives all messages enqueued to channel applying proc to each message and the previous result of proc or seed for the first message (cf. fold in SRFI 1).

[procedure] (channel-map! channel proc)

Returns a channel which receives all messages enqueued to channel applying proc to each of them.

[procedure] (channel-filter! channel pred?)

Returns a channel which receives all messages enqueued to channel which satisfy pred?.

Author

Moritz Heidkamp

Version history

0.1
Initial release

License

 Copyright (c) 2010-2011, Moritz Heidkamp
 All rights reserved.
 
 Redistribution and use in source and binary forms, with or without
 modification, are permitted provided that the following conditions are
 met:
 
 Redistributions of source code must retain the above copyright
 notice, this list of conditions and the following disclaimer.
 
 Redistributions in binary form must reproduce the above copyright
 notice, this list of conditions and the following disclaimer in the
 documentation and/or other materials provided with the distribution.
 
 Neither the name of the author nor the names of its contributors may
 be used to endorse or promote products derived from this software
 without specific prior written permission.
 
 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
 COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
 INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
 OF THE POSSIBILITY OF SUCH DAMAGE.