RPC

Description

A flexible peer-to-peer RPC system.

Author

Thomas Chust

Requires

Documentation

This egg is a thin but flexible layer on top of tcp-server and s11n providing remote-procedure-call based communications. Special support for callbacks is provided, which makes the interface more a peer-to-peer than a client-server solution.

Managing published procedures

[procedure] (rpc:publish-procedure! name procedure #!optional (callback-outgoing? #t)) => <void>

Registers procedure to be callable by incoming RPC requests under the name name.

Names of procedures are matched using equal?.

If callback-outgoing? is true, a reverse lookup entry associating the procedure with its name is also created. The table of reverse lookup entries is used by rpc:procedure to send a callback stub to the remote machine instead of the procedure itself, should the procedure be one of the parameters of a RPC call.

[procedure] (rpc:withdraw-procedure! name-or-procedure) => <void>

Unregisters the given name-or-procedure as an externally callable object. If a procedure is passed for the name-or-procedure parameter, it can only successfully be removed if a reverse lookup entry for this procedure exists.

As mutex lock intervals are kept as short as possible, it may happen, that a currently active server thread calls the procedure once more immediately after its removal before it becomes completely unavailable to the outside world.

Managing connections

[parameter] rpc:default-server-port

The standard port number to establish RPC connections to. The default value is 29296.

[parameter] rpc:connect-procedure

The procedure used to establish network connections for RPC. Defaults to tcp-connect and must be signature-compatible with it.

[procedure] (rpc:is-connected? host #!optional (port (rpc:default-server-port))) => <boolean>

Determines whether an RPC connection to the given host and port is active. The table of active connections is thread-local.

[procedure] (rpc:get-connection host #!optional (port (rpc:default-server-port))) => <input-port>, <output-port>

Retrieves an existing RPC connection to the given host and port or creates a new one. The table of active connections is thread-local.

[procedure] (rpc:close-connection! host #!optional (port (rpc:default-server-port))) => <void>

Closes an existing RPC connection to the given host and port. Fails if no such connection exists. The table of active connections is thread-local.

[procedure] (rpc:close-all-connections!) => <void>

Closes all existing RPC connections of the current thread.

Client and server frontend

[parameter] rpc:current-peer

Inside the server threads processing RPC requests, this parameter is set to the address (as a string) of the peer on behalf of which the thread is executing.

Consider this parameter read-only unless you really know what you are doing. You may seriously mess up communications otherwise.

[procedure] (rpc:procedure name host #!optional (port (rpc:default-server-port))) => <procedure>

Creates a procedure that can be called with any number of parameters to invoke the externally callable procedure published as name on the server at host:port with the given arguments.

The arguments are scanned for procedures and if any such are found, those in the reverse lookup table are replaced with callback stubs before all the parameters are serialized over the network connection. Callback stubs are small procedures that use the value of the rpc:current-peer and rpc:default-server-port parameters in the remote server thread in order to determine where they came from and to use rpc:procedure again to connect back to their home and execute their real counterpart.

Some care has been taken to isolate code executing in an RPC server thread properly:

[procedure] (rpc:make-server listener) => <procedure>

Uses make-tcp-server to create a server procedure. The server threads spawned by this procedure are continuously processing RPC requests from their clients until the connection is closed.

Examples

This first example is a very simple server/client example:

;;;; server.scm
(use rpc)

(rpc:publish-procedure!
 'foo
 (lambda (x)
   (print "foo: " x)
   #f))

(rpc:publish-procedure!
 'fini
 (lambda () (print "fini") (thread-start! (lambda () (thread-sleep! 3) (print "terminate") (exit))) #f))

((rpc:make-server (tcp-listen (rpc:default-server-port))) #t)
;;;; client.scm
(use rpc posix)

(define call (rpc:procedure 'foo "localhost"))

(do ((i 10 (sub1 i)))
    ((zero? i))
  (print "-> " (call (random 100))))

This next example is a simple database server and client using sqlite3. As sqlite3 is not perfectly thread-safe and as there are of course better database servers around the example is perhaps a little academic, but it illustrates the use of this extension quite nicely.

Note that in this example the rpc:default-server-port parameter in the server can be set by the client because the server does not know where the client is listening. In a similar way, rpc:current-peer may be reset if the client knows its public IP better than the server.

;;;; rpc-demo.scm
;;;; Simple database server / client

(require-extension (srfi 18) extras tcp rpc sqlite3)

;;; Common things

(define operation (string->symbol (car (command-line-arguments))))
(define param (cadr (command-line-arguments)))

(define rpc:listener
  (if (eq? operation 'server)
      (tcp-listen (rpc:default-server-port))
      (tcp-listen 0)))

;; Start server thread
(define rpc:server
  (make-thread
   (cute (rpc:make-server rpc:listener) "rpc:server")
   'rpc:server))

(thread-start! rpc:server)

;;; Server side

(define (server)
  (rpc:publish-procedure!
   'change-response-port
   (lambda (port)
     (rpc:default-server-port port))
   #f)
  (let ((db (sqlite3:open param)))
    (set-finalizer! db sqlite3:finalize!)
    (rpc:publish-procedure!
     'query
     (lambda (sql callback)
       (print "Executing query '" sql "' ...")
       (sqlite3:for-each-row
	callback
	db sql))))
  (thread-join! rpc:server))

;;; Client side

(define (callback1 . columns)
  (let loop ((c columns) (i 0))
    (unless (null? c)
      (printf "~a=~s " i (car c))
      (loop (cdr c) (+ i 1))))
  (newline))

(define callback2-results '())

(define (callback2 . columns)
  (set! callback2-results (cons columns callback2-results)))

(define (client)
  ((rpc:procedure 'change-response-port "localhost")
   (tcp-listener-port rpc:listener))
  ((rpc:procedure 'query "localhost") param callback1)
  (rpc:publish-procedure! 'callback2 callback2)
  ((rpc:procedure 'query "localhost") param callback2)
  (pp callback2-results))

;;; Run it

(if (eq? operation 'server)
    (server)
    (client))

Changelog

License

 Copyright (c) 2005, Thomas Chust <chust@web.de>.  All rights reserved.
 
 Redistribution and use in source and binary forms, with or without
 modification, are permitted provided that the following conditions are met:
 
   Redistributions of source code must retain the above copyright notice,
   this list of conditions and the following disclaimer. Redistributions in
   binary form must reproduce the above copyright notice, this list of
   conditions and the following disclaimer in the documentation and/or
   other materials provided with the distribution. Neither the name of the
   author nor the names of its contributors may be used to endorse or
   promote products derived from this software without specific prior
   written permission.
 
 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS \"AS
 IS\" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
 THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR
 CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.