A channel implementation inspired by Lamina for Clojure.
Channels are queues with callbacks. Enqueuing a message to a channel executes all callbacks (or receivers) listening on the channel in the enqueuing thread. If no receivers are listening when a message is enqueued, the message is queued up; receivers registered later are executed with all queued up messages in the registering thread.
It's also possible to fork channels. All messages enqueued to a forked channel's parent channel will also be enqueued to the forked channel but not the other way around. Receiving messages from a forked channel does not affect its parent nor the other way around.
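The one-way relationship between a channel and its fork can be sketched as follows. This is a minimal illustration that uses only procedures documented in the API section; the names parent and child are made up for the example, and the comments describe what the paragraph above implies rather than verified output.

```scheme
(use channel)

(define parent (make-channel))
(define child (fork-channel parent))

;; Enqueued to the parent: should also show up in the fork.
(channel-enqueue parent 'from-parent)

;; Enqueued to the fork: should stay in the fork only.
(channel-enqueue child 'from-child)

;; No receivers are registered, so the messages stay queued up
;; and can be inspected with channel-messages.
(print "parent: " (channel-messages parent))
(print "child:  " (channel-messages child))
```

Inspecting the queues with channel-messages avoids registering receivers, so the example does not depend on the order in which receivers are called.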
When no messages are queued up in a channel it is empty. Channels may be closed to prevent further messages from being enqueued to them. An empty closed channel is drained.
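The three states can be told apart with the predicates from the API section. The following is a small sketch; it assumes that a message queued before closing can still be received afterwards, which the definition of a drained channel suggests but does not state outright. The name ch is arbitrary.

```scheme
(use channel)

(define ch (make-channel 'pending))

(print "empty?   " (channel-empty? ch))    ; one message is still queued
(close-channel ch)
(print "closed?  " (channel-closed? ch))
(print "drained? " (channel-drained? ch))  ; closed, but not yet empty

;; Receiving the queued message empties the closed channel
;; (assumption: receiving from a closed channel is permitted).
(channel-receive ch)
(print "drained? " (channel-drained? ch))
```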
The API is loosely based on that of Clojure's Lamina channels; see the Lamina wiki for an introduction.
#;1> (use channel)
; loading ...
#;2> (define numbers (make-channel 1 2 3 4))
#;3> (channel-receive numbers)
1
#;4> (channel-receive numbers print print)
2
2
#t
#;5> (on-channel-receive numbers (lambda (n) (print "got " n)))
got 3
got 4
#;6> (channel-enqueue numbers 5)
got 5
#t
#;7> (define odd-successors (filter-channel (map-channel numbers add1) odd?))
#;8> (on-channel-receive odd-successors (lambda (n) (print "odd " n)))
#;9> (channel-enqueue numbers 6 7 8 9)
odd 7
got 6
got 7
odd 9
got 8
got 9
#t
For a more elaborate example see this basic implementation of a chat server and client.
API
[procedure] (make-channel . messages)
Returns a new channel, optionally prepopulated with messages.
[procedure] (channel-enqueue channel message . messages)
Enqueues one or more messages to channel.
[procedure] (channel-receive channel . receivers)
[procedure] (channel-receive channel #!optional timeout default)
If no receivers are given, receives and returns the next message from channel. If channel is empty, the current thread will be blocked until a message is enqueued from another thread. If a timeout is given and no message has been received after timeout seconds (may be a flonum), the result of applying the default thunk is returned (#f by default).
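A blocking receive with a timeout might look like the sketch below. It uses two separate, hypothetical channels (quiet and inbox) so that the timed-out call cannot interfere with the second one, and it relies on SRFI 18 threads for the producer. The argument order follows the signature above; everything else is an assumption made for illustration.

```scheme
(use channel srfi-18)

(define quiet (make-channel))
(define inbox (make-channel))

;; Nothing is ever enqueued to quiet, so after roughly half a second
;; the default thunk is applied and its result is returned.
(print (channel-receive quiet 0.5 (lambda () 'timed-out)))

;; A producer thread enqueues a message shortly after we start waiting.
(thread-start!
 (make-thread
  (lambda ()
    (thread-sleep! 0.1)
    (channel-enqueue inbox 'hello))))

;; Blocks until the producer delivers, or gives up after two seconds.
(print (channel-receive inbox 2 (lambda () 'timed-out)))
```

Passing a thunk rather than a plain value as default keeps the fallback lazy: it is only applied when the timeout actually expires.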
If one or more receivers are given, they will be called in order with the next message as their sole argument. If channel is not empty, this happens immediately and #t is returned. If channel is empty, #f is returned and the receivers will be called when a new message is enqueued. Note that this might happen in a different thread.
[procedure] (on-channel-receive channel receiver)
Registers a permanent receiver on channel which will be called with each message enqueued to channel.
[procedure] (channel-receive/delay channel #!optional timeout default)
Starts a blocking channel-receive in a thread and returns a promise which, when forced, will either return the received message or block until a message is received. timeout and default have the same meaning as for channel-receive.
[procedure] (channel-remove-receiver channel receiver)
Removes receiver (which has to be the same procedure as passed to on-channel-receive) from channel.
[procedure] (on-channel-error channel proc)
Registers proc as an error handler on channel. Whenever a receiver raises an exception, proc will be called with two arguments: the condition object and the message which made the receiver raise the exception. Note that all error handlers registered on a channel are called for every exception raised. When a message could not be handled successfully by any receiver (i.e. all receivers raised an exception), it remains queued up on the channel.
[procedure] (on-channel-close channel thunk)
[procedure] (on-channel-drain channel thunk)
Calls thunk once channel is closed or drained, respectively. If channel is already closed or drained, thunk will be called immediately.
[procedure] (channel-messages channel)
Returns all messages queued up in channel as a list.
[procedure] (channel-forks channel)
Returns all forks of channel as a list.
[procedure] (channel-empty? channel)
Checks whether channel is empty, i.e. contains no messages.
[procedure] (close-channel channel)
Closes channel to prevent further messages from being enqueued to it.
[procedure] (channel-closed? channel)
Checks whether channel is closed, i.e. whether it is no longer possible to enqueue messages to it.
[procedure] (channel-drained? channel)
Checks whether channel is drained, i.e. whether it is both empty and closed.
[procedure] (fork-channel channel)
Returns a new channel which is a fork of channel. Every message enqueued to channel will also be enqueued to the forked channel but not the other way around. Receiving messages from either channel does not affect the other one. Closing channel will also close the forked channel.
[procedure] (siphon-channel source-channel #!optional destination-channel on-receive)
Registers a permanent receiver on source-channel which will call on-receive with two arguments: destination-channel (the default value is a new channel) and the received message. When destination-channel is closed, source-channel will be closed as well unless there are still other receivers registered on it. The default on-receive function is channel-enqueue. Returns destination-channel.
[procedure] (fold-channel channel proc seed)
Returns a channel which receives the result of applying proc to each message enqueued to channel and the previous result of proc, or seed for the first message (cf. fold in SRFI 1).
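As a rough illustration, folding with + and a seed of 0 should yield a channel of running totals. The names numbers and totals are hypothetical; + is used because it is commutative, so the example does not depend on the order in which proc receives the message and the accumulated value.

```scheme
(use channel)

(define numbers (make-channel))

;; totals receives one accumulated value per message enqueued to numbers.
(define totals (fold-channel numbers + 0))

(on-channel-receive totals
  (lambda (total) (print "total so far: " total)))

(channel-enqueue numbers 1 2 3)
```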
The returned channel is a siphoning channel (see siphon-channel).
[procedure] (map-channel channel proc)
Returns a channel which receives the result of applying proc to each message enqueued to channel.
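A short sketch of a mapped channel, assuming string messages; words and lengths are names invented for the example.

```scheme
(use channel)

(define words (make-channel))

;; Each string enqueued to words arrives in lengths as its length.
(define lengths (map-channel words string-length))

(on-channel-receive lengths
  (lambda (n) (print "length: " n)))

(channel-enqueue words "queue" "callback")
```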
The returned channel is a siphoning channel (see siphon-channel).
[procedure] (filter-channel channel pred?)
Returns a channel which receives all messages enqueued to channel which satisfy pred?.
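For instance, a filtered channel can single out the messages worth reacting to. In this sketch readings, alarms and the threshold of 100 are all made up for illustration.

```scheme
(use channel)

(define readings (make-channel))

;; Only readings above the (hypothetical) limit of 100 reach alarms.
(define alarms
  (filter-channel readings (lambda (r) (> r 100))))

(on-channel-receive alarms
  (lambda (r) (print "over limit: " r)))

(channel-enqueue readings 42 250 99 101)
```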
The returned channel is a siphoning channel (see siphon-channel).
[procedure] (siphon-input-port port read #!optional channel)
Siphons each message read from port into channel. Returns two values:
- a procedure which can be called to read the next message from port and enqueue it to channel; it will return #t on success and #f when port is at its end.
- channel
The latter is useful because by default channel will be a new channel as returned by make-channel.
[procedure] (flush-channel-to-output-port channel port write)
Registers a receiver on channel which writes each received message to port using write.
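The sketch below flushes a channel into a string port so the result is easy to inspect. It assumes that the write argument is called like the standard write procedure, i.e. with the message first and the port second; the entry above does not spell out the argument order, so treat that as a guess. The names events and out are hypothetical.

```scheme
(use channel)

(define events (make-channel))
(define out (open-output-string))

;; Assumption: the third argument is invoked as (write message port),
;; which is why the standard write procedure can be passed directly.
(flush-channel-to-output-port events out write)

(channel-enqueue events "started" 42)

;; Show whatever has been written to the string port so far.
(print (get-output-string out))
```

Passing (current-output-port) instead of a string port would send the messages straight to standard output.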
About this egg
The source code is hosted at Bitbucket. Feel free to fork it and send pull requests there.
- Initial release
Copyright (c) 2012, Moritz Heidkamp
All rights reserved.
Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
- Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
- Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
- Neither the name of the author nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.