;;; -*- Mode: Scheme -*-

;;;; Rendezvous Concurrency Abstraction
;;;; Mailboxes: Asynchronous Channels

;;; Copyright (c) 2005-2009, Taylor R. Campbell
;;;
;;; Redistribution and use in source and binary forms, with or without
;;; modification, are permitted provided that the following conditions
;;; are met:
;;;
;;; * Redistributions of source code must retain the above copyright
;;;   notice, this list of conditions and the following disclaimer.
;;;
;;; * Redistributions in binary form must reproduce the above copyright
;;;   notice, this list of conditions and the following disclaimer in
;;;   the documentation and/or other materials provided with the
;;;   distribution.
;;;
;;; * Neither the names of the authors nor the names of contributors
;;;   may be used to endorse or promote products derived from this
;;;   software without specific prior written permission.
;;;
;;; THIS SOFTWARE IS PROVIDED BY THE AUTHORS ``AS IS'' AND ANY EXPRESS
;;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
;;; WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
;;; ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY
;;; DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
;;; DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
;;; GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
;;; INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
;;; WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
;;; NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
;;; SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

;;;; Mailbox Record

;;; A mailbox pairs a PRIORITY field with a QUEUE.  The one queue plays
;;; two roles, selected by the priority field:
;;;
;;;   priority = #F       QUEUE holds the suspensions of waiting readers
;;;                       (possibly none).
;;;   priority = integer  QUEUE holds messages that have been sent but
;;;                       not yet received.
;;;
;;; NOTE(review): Only one name (:MAILBOX) precedes the constructor
;;; spec.  Some DEFINE-SYNCHRONIZED-RECORD-TYPE implementations expect
;;; both a tag and a type name here; confirm against the macro this
;;; file is actually compiled with.

(define-synchronized-record-type :mailbox
  (%make-mailbox priority queue)
  (priority)                    ; Only the priority field is synchronized.
  mailbox?
  ;; The priority may be #F, in which case the queue is a queue of
  ;; waiting readers' suspensions, or an integer, in which case the
  ;; queue is a queue of pending messages.
  ;;
  ;; NOTE(review): The integer was previously described as `the number
  ;; of messages in the queue', but it is not an exact count: it is
  ;; incremented by every send to a non-empty mailbox *and* by every
  ;; poll in MAILBOX-RECEIVE-RENDEZVOUS, and it is reset to 1 after a
  ;; dequeue that leaves messages behind.  Confirm the intended
  ;; meaning before relying on its value.
  (priority mailbox-priority set-mailbox-priority!)
  (queue mailbox-queue))

;;; Construct a mailbox whose priority is #F and whose queue is a new
;;; queue obtained from MAKE-QUEUE.

(define (make-mailbox)
  (%make-mailbox #f (make-queue)))

;;;; Sending

;;; Send MESSAGE to MAILBOX.  This enters a critical region and hands
;;; the resulting token to MAILBOX-SEND/CRITICAL, with VALUES as the
;;; BODY argument.

(define (mailbox-send mailbox message)
  (enter-critical-region
    (lambda (critical-token)
      (mailbox-send/critical mailbox message critical-token values))))

;;; Send MESSAGE to MAILBOX using an already-obtained CRITICAL-TOKEN.
;;; BODY is not called here directly; it is passed along to
;;; EXIT-CRITICAL-REGION or to MAYBE-REVIVE.  There are three cases:
;;;
;;;   1. The priority is an integer (messages are already pending):
;;;      bump the priority, enqueue MESSAGE, and exit the critical
;;;      region.
;;;
;;;   2. The priority is #F and the queue is empty (no reader is
;;;      waiting): set the priority to 1, enqueue MESSAGE, and exit
;;;      the critical region.
;;;
;;;   3. The priority is #F and the queue holds suspensions: dequeue
;;;      one and offer it MESSAGE through MAYBE-REVIVE.  BODY is
;;;      supplied as the if-revived argument and LOOP as the
;;;      if-not-revived argument, so a suspension that is not revived
;;;      causes the next one to be tried, falling back to case 2 once
;;;      the queue has been drained.

(define (mailbox-send/critical mailbox message critical-token body)
  (let ((queue (mailbox-queue mailbox)))
    (cond ((mailbox-priority mailbox)
           => (lambda (priority)
                (set-mailbox-priority! mailbox (+ priority 1))
                (enqueue! queue message)
                (exit-critical-region critical-token body)))
          (else
           (let loop ()
             (cond ((queue-empty? queue)
                    (set-mailbox-priority! mailbox 1)
                    (enqueue! queue message)
                    (exit-critical-region critical-token body))
                   (else
                    (maybe-revive (dequeue! queue)
                                  critical-token
                                  body   ; if-revived
                                  loop   ; if-not-revived
                                  message))))))))

;;;; Receiving

;;; Receive a message from MAILBOX by synchronizing on the rendezvous
;;; returned by MAILBOX-RECEIVE-RENDEZVOUS.

(define (mailbox-receive mailbox)
  (synchronize (mailbox-receive-rendezvous mailbox)))

;;; Return a rendezvous for receiving from MAILBOX, built by passing a
;;; polling thunk to POLLING-RENDEZVOUS.  Each call of the thunk
;;; returns two values:
;;;
;;;   - If the mailbox priority is an integer (messages are pending):
;;;     the priority as it was before this poll (the stored priority
;;;     is incremented as a side effect), and a procedure that
;;;     dequeues the next message, resets the stored priority to #F if
;;;     the queue is now empty or to 1 otherwise, and returns the
;;;     message.
;;;
;;;   - If the mailbox priority is #F (no messages are pending): #F,
;;;     and a procedure that takes a suspension and enqueues it on the
;;;     mailbox's queue, where MAILBOX-SEND/CRITICAL will find it.

(define (mailbox-receive-rendezvous mailbox)
  (polling-rendezvous
   (lambda ()
     (let ((queue (mailbox-queue mailbox)))
       (cond ((mailbox-priority mailbox)
              => (lambda (priority)
                   (set-mailbox-priority! mailbox (+ priority 1))
                   (values priority
                           ;; NOTE(review): PREPARE-REVIVAL is accepted
                           ;; but never used in this procedure; confirm
                           ;; that matches what POLLING-RENDEZVOUS
                           ;; expects.
                           (lambda (prepare-revival)
                             (let ((message (dequeue! queue)))
                               (set-mailbox-priority!
                                mailbox
                                (if (queue-empty? queue) #f 1))
                               message)))))
             (else
              (values #f
                      (lambda (suspension)
                        (enqueue! queue suspension)))))))))