Tandem
Jan Dvořák <mordae@anilinux.org>
Cooperative Communication Framework
When communicating with a remote party, one often needs to run multiple parallel queries from different threads while monitoring the communication for asynchronous notifications. The simple way to implement this pattern is to spawn a dedicated background thread that takes care of the communication and acts as a request-response server for other local threads.
Tandem realizes this pattern using events and channels within the threads participating in the communication, thus eliminating the background thread.
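For contrast, the conventional approach described above might look roughly like the following. This is only a minimal sketch using plain racket/base channels, not code from tandem: remote-echo is a hypothetical stand-in for the remote party, the names requests, server and call are made up for illustration, and asynchronous notifications are left out entirely.

```racket
#lang racket/base
;; Sketch of the conventional pattern: one background thread owns the
;; connection and serves requests coming from other local threads.

;; Hypothetical stand-in for the remote party: echoes the payload back.
(define (remote-echo payload) payload)

;; Requests arrive as (cons payload reply-channel).
(define requests (make-channel))

;; The dedicated background thread that tandem is designed to eliminate.
(define server
  (thread
   (λ ()
     (let loop ()
       (define request (channel-get requests))
       (channel-put (cdr request) (remote-echo (car request)))
       (loop)))))

;; Callable from any thread; blocks until the background thread replies.
(define (call payload)
  (define reply (make-channel))
  (channel-put requests (cons payload reply))
  (channel-get reply))

(define answer (call "Hello World!"))
```

Every request here takes two extra thread hand-offs through the server thread, which is the overhead tandem avoids by letting the participating threads drive the communication themselves.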
(require tandem)    package: tandem
Tandem works as a middleware of sorts. You need to supply a backend event that produces tagged messages from the remote party and a procedure that sends our tagged messages.
Note that even though tandem itself is thread-safe, and in fact designed to be shared among multiple threads, it does not ensure that the actual transmitting and receiving code is called from one thread only. It is up to you to ensure that this code operates correctly when called from several threads.
The first argument of the transmit procedure is a message tag that either uniquely identifies a call/return pair or non-uniquely identifies an out-of-band notification. The second argument is the payload.
The same scheme applies to receive-any-evt: its first result value is the tag and its second is the payload.
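To make the two shapes concrete, here is a rough sketch of one possible backend pair built only from racket/base. Everything in it is an assumption made for illustration: the pipe stands in for a real connection, and the wire format, an S-expression (tag payload) per line, is invented rather than prescribed by tandem.

```racket
#lang racket/base
;; Hypothetical backend: messages travel as S-expressions of the form
;; (tag payload), one per line, over a pipe standing in for a socket.
(define-values (in out) (make-pipe))

;; Transmit procedure: the first argument is the tag, the second the payload.
(define (transmit tag value)
  (write (list tag value) out)
  (newline out)
  (flush-output out))

;; Backend event: becomes ready when input is available and produces two
;; values, the tag first and the payload second. Caveat: `read` may block
;; if only part of a message has arrived so far.
(define receive-any-evt
  (wrap-evt in
            (λ (port)
              (define message (read port))
              (read-line port) ; consume the trailing newline
              (values (car message) (cadr message)))))

;; Round trip without tandem, just to show the two result values.
(transmit 'a-tag "something")
(define-values (tag payload) (sync receive-any-evt))
```

A pair like this would presumably be handed to the constructor as (tandem receive-any-evt transmit), in the same order the echo server example in this section uses.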
As an example, we create a simple FIFO echo server that does not really communicate with anything external, but imagine that we used a TCP socket instead of the asynchronous channel.
> (define echo-server
    (let ((echoes (make-fast-channel)))
      (tandem echoes
              (λ (tag value)
                (fast-channel-put echoes tag value)))))
> (tandem? echo-server)
#t
procedure
(tandem-transmit tandem tag value) → void?
  tandem : tandem?
  tag : any/c
  value : any/c
> (tandem-transmit echo-server 'a-tag "something")
procedure
(tandem-receive-evt tandem tag) → (evt/c any/c)
  tandem : tandem?
  tag : any/c
Please note that creating such an event registers a new channel that gets removed only after a garbage collection cycle. Creating many such events in a tight loop is therefore very inefficient.
Since we are using an asynchronous queue and nobody else tries to outrun us, we can retrieve the value we sent with tandem-transmit above.
> (sync (tandem-receive-evt echo-server 'a-tag))
"something"
procedure
(tandem-call-evt tandem tag value) → (evt/c any/c)
  tandem : tandem?
  tag : any/c
  value : any/c
> (sync (tandem-call-evt echo-server 'hello "Hello World!"))
"Hello World!"
> (sync (tandem-call-evt echo-server 'bye "It's time to wrap this up."))
"It's time to wrap this up."
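Because a tandem is meant to be shared among threads, the same echo server can presumably serve several concurrent callers. The sketch below illustrates that usage. It assumes that make-fast-channel and fast-channel-put come from the misc1/fast-channel module of the misc1 package, which this section does not state, and the replies vector and workers list are names invented for the example.

```racket
#lang racket/base
;; Assumption: make-fast-channel and fast-channel-put are provided by
;; misc1/fast-channel (misc1 package); the text above does not say so.
(require tandem misc1/fast-channel)

;; Same echo server as in the example above.
(define echo-server
  (let ((echoes (make-fast-channel)))
    (tandem echoes
            (λ (tag value)
              (fast-channel-put echoes tag value)))))

;; Slot i will hold the reply to the call tagged i.
(define replies (make-vector 3 #f))

;; Each worker issues its own call under a distinct tag, so replies
;; cannot be confused even if another thread receives them first.
(define workers
  (for/list ((i (in-range 3)))
    (thread
     (λ ()
       (vector-set! replies i
                    (sync (tandem-call-evt echo-server i (* i i))))))))

(for-each thread-wait workers)
```

Distinct tags matter here: as noted earlier, a tag must uniquely identify a call/return pair, so two threads calling under the same tag at the same time could not be told apart.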