;;; -*- Mode: Scheme -*- ;;;; Rendezvous Concurrency Abstraction ;;;; Synchronous Channels ;;; Copyright (c) 2005-2009, Taylor R. Campbell ;;; ;;; Redistribution and use in source and binary forms, with or without ;;; modification, are permitted provided that the following conditions ;;; are met: ;;; ;;; * Redistributions of source code must retain the above copyright ;;; notice, this list of conditions and the following disclaimer. ;;; ;;; * Redistributions in binary form must reproduce the above copyright ;;; notice, this list of conditions and the following disclaimer in ;;; the documentation and/or other materials provided with the ;;; distribution. ;;; ;;; * Neither the names of the authors nor the names of contributors ;;; may be used to endorse or promote products derived from this ;;; software without specific prior written permission. ;;; ;;; THIS SOFTWARE IS PROVIDED BY THE AUTHORS ``AS IS'' AND ANY EXPRESS ;;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED ;;; WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ;;; ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY ;;; DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL ;;; DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE ;;; GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS ;;; INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, ;;; WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING ;;; NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS ;;; SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. (define-synchronized-record-type :channel (%make-channel priority readers writers) (priority) ; PRIORITY is the only synchronized field channel? (priority channel-priority set-channel-priority!) (readers channel-readers) (writers channel-writers)) (define (make-channel) (%make-channel 1 (make-queue) (make-queue))) (define (channel-send channel message) (synchronize (channel-send-rendezvous channel message))) (define (channel-send-rendezvous channel message) (polling-rendezvous (lambda () (if (reader-waiting? channel) (values (let ((priority (channel-priority channel))) (set-channel-priority! channel (+ priority 1)) priority) (lambda (prepare-revival) (set-channel-priority! channel 1) (prepare-revival (dequeue! (channel-readers channel)) message) (values))) (values #f (lambda (suspension) (enqueue! (channel-writers channel) (cons suspension message)))))))) (define (reader-waiting? channel) (let ((readers (channel-readers channel))) (let loop () (if (queue-empty? readers) #f (let ((reader (queue-head readers))) (if (suspension-revived? reader) (begin (dequeue! readers) (loop)) #t)))))) (define (channel-receive channel) (synchronize (channel-receive-rendezvous channel))) (define (channel-receive-rendezvous channel) (polling-rendezvous (lambda () (if (writer-waiting? channel) (values (let ((priority (channel-priority channel))) (set-channel-priority! channel (+ priority 1)) priority) (lambda (prepare-revival) (set-channel-priority! channel 1) (let ((writer.message (dequeue! (channel-writers channel)))) (prepare-revival (car writer.message)) (cdr writer.message)))) (values #f (lambda (suspension) (enqueue! (channel-readers channel) suspension))))))) (define (writer-waiting? channel) (let ((writers (channel-writers channel))) (let loop () (if (queue-empty? writers) #f (let ((writer.message (queue-head writers))) (if (suspension-revived? (car writer.message)) (begin (dequeue! writers) (loop)) #t))))))