;;; -*- Mode: Scheme -*- ;;;; Rendezvous Concurrency Abstraction ;;;; (Concurrent ML for Scheme) ;;; Copyright (c) 2005-2009, Taylor R. Campbell ;;; ;;; Redistribution and use in source and binary forms, with or without ;;; modification, are permitted provided that the following conditions ;;; are met: ;;; ;;; * Redistributions of source code must retain the above copyright ;;; notice, this list of conditions and the following disclaimer. ;;; ;;; * Redistributions in binary form must reproduce the above copyright ;;; notice, this list of conditions and the following disclaimer in ;;; the documentation and/or other materials provided with the ;;; distribution. ;;; ;;; * Neither the names of the authors nor the names of contributors ;;; may be used to endorse or promote products derived from this ;;; software without specific prior written permission. ;;; ;;; THIS SOFTWARE IS PROVIDED BY THE AUTHORS ``AS IS'' AND ANY EXPRESS ;;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED ;;; WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ;;; ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY ;;; DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL ;;; DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE ;;; GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS ;;; INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, ;;; WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING ;;; NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS ;;; SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. ;;; This is based on TRC's understanding of Reppy's original code, ;;; aided by Mike Sperber's implementation of it in scsh and by TRC's ;;; older implementation which was wrong but which helped him to learn ;;; how it really works. Most of the terms came from Reppy or Sperber; ;;; the naming conventions, however, built from these base terms, are ;;; generally unique, as far as I know. I, TRC, apologize for that ;;; ungraceful transition from the third person to the first person and ;;; hope that he has not caused damage to your brain's parser. ;;;; Pollers ;;; Pollers are the basis of all rendezvous operation. A poller ;;; consists of two procedures: a thunk, which actually polls; and a ;;; composed sequence of procedures to be applied to the result of the ;;; poller if/when enabled and selected. Polling rendezvous consist ;;; simply of sets of pollers. Mapping a polling rendezvous adds to ;;; its composition. (define-record-type :poller (%make-poller composition thunk) poller? (composition poller-composition) (thunk poller-thunk)) (define (make-poller thunk) (%make-poller values thunk)) (define (map-poller poller procedure) (%make-poller (compose procedure (poller-composition poller)) (poller-thunk poller))) ;;;; Rendezvous Objects (define-variant-type rendezvous :rendezvous rendezvous? rendezvous-type-case ; Not RENDEZVOUS-CASE for obvious reasons. (variant POLLING (make-polling-rendezvous pollers)) (variant CHOOSING (make-choosing-rendezvous choices)) (variant DELAYED (delayed-rendezvous generator)) (variant DELAYED-WITH-NACK (delayed-rendezvous-with-nack generator))) (define (polling-rendezvous thunk) (make-polling-rendezvous (list (make-poller thunk)))) (define (choosing-rendezvous . choices) (let loop ((choices choices) (pollers '())) (if (null-list? choices) ;; Only pollers. Simplify. (make-polling-rendezvous (reverse! pollers)) (rendezvous-type-case (car choices) ((POLLING pollers*) (loop (cdr choices) (append-reverse pollers* pollers))) (else ;; Not a simple polling rendezvous, so start collecting the ;; choices of a full choosing rendezvous. (*choosing-rendezvous choices (if (null-list? pollers) '() (list (make-polling-rendezvous (reverse! pollers)))))))))) (define (*choosing-rendezvous in-choices out-choices) (let loop ((in-choices in-choices) (out-choices out-choices)) (if (null-list? in-choices) (cond ((null-list? out-choices) (quiescent-rendezvous)) ((null-list? (cdr out-choices)) (car out-choices)) (else (make-choosing-rendezvous (reverse! out-choices)))) (loop (cdr in-choices) (adjoin-choice (car in-choices) out-choices))))) (define (adjoin-choice choice choices) (rendezvous-type-case choice ((POLLING pollers) (if (null-list? choices) (cons choice '()) (rendezvous-type-case (car choices) ((POLLING pollers*) ;; Coalesce adjacent pollers. (cons (make-polling-rendezvous (append pollers pollers*)) choices)) (else (cons choice choices))))) ((CHOOSING choices*) ;; Flatten nested choices. (append-reverse choices* choices)) ((DELAYED generator) (cons choice choices)) ((DELAYED-WITH-NACK generator) (cons choice choices)))) ;;;; Miscellaneous Rendezvous Combinators (define (map-rendezvous rendezvous procedure) (if (eq? procedure values) rendezvous (rendezvous-type-case rendezvous ((POLLING pollers) (make-polling-rendezvous (map (lambda (poller) (map-poller poller procedure)) pollers))) ((CHOOSING choices) (make-choosing-rendezvous (map (lambda (choice) (map-rendezvous choice procedure)) choices))) ((DELAYED generator) (delayed-rendezvous (lambda () (map-rendezvous (generator) procedure)))) ((DELAYED-WITH-NACK generator) (delayed-rendezvous-with-nack (lambda (nack-rv) (map-rendezvous (generator nack-rv) procedure))))))) (define quiescent-rendezvous (let ((rv (make-polling-rendezvous '()))) (lambda () rv))) (define (values-rendezvous . vals) (polling-rendezvous (lambda () (values -1 (lambda (prepare-revival) prepare-revival ;ignore (apply values vals)))))) (define (thunk-rendezvous thunk) (delayed-rendezvous (lambda () (call-with-values thunk values-rendezvous)))) ;;;; Forcing Delayed Rendezvous and Initializing Nacks ;;; Rendezvous groups represent simplified rendezvous after forcing all ;;; delayed rendezvous. (define-variant-type rendezvous-group :rendezvous-group rendezvous-group? rendezvous-group-type-case (variant POLLING (make-polling-group pollers)) (variant CHOOSING (make-choosing-group choices)) (variant NACK (make-nack-group nack-token subgroup))) (define (force-rendezvous rendezvous) (rendezvous-type-case rendezvous ((POLLING pollers) (make-polling-group pollers)) ((CHOOSING choices) (force-rendezvous-choices choices)) ((DELAYED generator) (force-rendezvous (generator))) ((DELAYED-WITH-NACK generator) (force-delayed-rendezvous-with-nack generator)))) (define (force-delayed-rendezvous-with-nack generator) (let ((token (make-nack-token))) (make-nack-group token (force-rendezvous (generator (nack-token-rendezvous token)))))) (define (force-rendezvous-choices choices) (define (cons-pollers groups pollers) (if (null-list? pollers) groups (cons (make-polling-group (reverse! pollers)) groups))) (let loop ((choices choices) (groups '()) (pollers '())) (if (null-list? choices) (make-choosing-group (reverse! (cons-pollers groups pollers))) (let ((choice (car choices)) (choices (cdr choices))) (define (cons-delayed group) (cons group (cons-pollers groups pollers))) (rendezvous-type-case choice ((POLLING pollers*) (loop choices groups (append-reverse pollers* pollers))) ((CHOOSING choices*) (loop (append choices* choices) groups pollers)) ((DELAYED generator) (loop choices (cons-delayed (force-rendezvous (generator))) '())) ((DELAYED-WITH-NACK generator) (loop choices (cons-delayed (force-delayed-rendezvous-with-nack generator)) '()))))))) ;;;; Synchronization (define (synchronize rendezvous) (poll-group (force-rendezvous rendezvous) invoke-enabled suspend)) (define (synchronize/timeout rendezvous timeout if-timeout if-enabled) (synchronize-chosen-rendezvous (map-rendezvous rendezvous if-enabled) (map-rendezvous (after-time-rendezvous timeout) if-timeout))) (define (synchronize-chosen-rendezvous . rendezvous) (poll-group (force-rendezvous-choices rendezvous) invoke-enabled suspend)) (define (synchronize-chosen-rendezvous/timeout timeout if-timeout if-enabled . rendezvous) (synchronize-chosen-rendezvous (map-rendezvous (after-time-rendezvous timeout) if-timeout) (map-rendezvous (apply choosing-rendezvous rendezvous) if-enabled))) ;++ This takes arguments in the wrong order! (define (poll rendezvous if-blocked if-enabled) (poll-group (force-rendezvous rendezvous) (lambda (critical-token enabler composition finalizer) (invoke-enabled critical-token enabler (compose if-enabled composition) finalizer)) (lambda (critical-token setup-suspension) setup-suspension ;ignore (exit-critical-region critical-token if-blocked)))) (define (invoke-enabled critical-token enabler continuation finalizer) (revive-multiple (lambda (prepare-revival) (finalizer prepare-revival) (enabler prepare-revival)) critical-token continuation)) ;;;; Polling Machinery ;;; The following pages are sorted by ascending complexity. ;;; POLL-POLLER shows the basic idea of what a poller does. Next, ;;; POLL-POLLERS extends this to handling multiple pollers; finally, ;;; *POLL-GROUP includes all the negative acknowledgement machinery. (define (poll-group group if-enabled if-blocked) (rendezvous-group-type-case group ((POLLING pollers) (poll-pollers pollers if-enabled if-blocked)) (else (*poll-group group if-enabled if-blocked)))) ;; (put 'poll-group 'scheme-indent-function 1) (define (poll-poller poller if-enabled if-blocked) (let ((composition (poller-composition poller)) (thunk (poller-thunk poller))) (enter-critical-region (lambda (critical-token) (receive (priority enabler/blocker) (thunk) (if priority (if-enabled critical-token enabler/blocker composition null-finalizer) (if-blocked critical-token (lambda (make-suspension) (enabler/blocker (make-suspension null-finalizer composition)))))))))) (define null-finalizer (lambda (prepare-revival) prepare-revival ;ignore (values))) ;;;;; Polling without Nacks (define (poll-pollers pollers if-enabled if-blocked) (cond ((null-list? pollers) (block if-blocked)) ((null-list? (cdr pollers)) (poll-poller (car pollers) if-enabled if-blocked)) (else (poll-without-nacks pollers if-enabled if-blocked)))) (define (block if-blocked) (enter-critical-region (lambda (critical-token) (if-blocked critical-token (lambda (token) token ;ignore ;; Record no suspension; never revive. (values)))))) ;;; Accumulate a list of the blocked rendezvous' priorities and ;;; blocking procedures. (A `mumble' is a priority and an enabling or ;;; blocking procedure.) (define (poll-without-nacks pollers if-enabled if-blocked) (enter-critical-region (lambda (critical-token) (let loop ((pollers pollers) (mumbles '())) (if (null-list? pollers) (blocked-without-nacks critical-token mumbles if-blocked) (let ((poller (car pollers)) (pollers (cdr pollers))) (let ((thunk (poller-thunk poller)) (composition (poller-composition poller))) (receive (priority enabler/blocker) (thunk) (let ((mumble (cons priority enabler/blocker))) (if priority (enabled-without-nacks pollers (list (cons priority mumble)) critical-token if-enabled) (loop pollers (cons mumble mumbles)))))))))))) ;;; We have found one enabled rendezvous. Find the rest, and choose ;;; the one with the highest priority. (define (enabled-without-nacks pollers priority.mumble-list critical-token if-enabled) (let loop ((pollers pollers) (priority.mumble-list priority.mumble-list) (priority 1)) (if (null-list? pollers) (let ((mumble (select-by-priority priority.mumble-list priority))) (let ((enabler (car mumble)) (composition (cdr mumble))) (if-enabled critical-token enabler composition null-finalizer))) (let ((poller (car pollers)) (pollers (cdr pollers))) (receive (priority enabler/blocker) ((poller-thunk poller)) (if priority (loop (cons (cons priority (cons enabler/blocker (poller-composition poller))) priority.mumble-list) (+ priority 1)) (loop priority.mumble-list priority))))))) ;;; All the pollers answered `blocked'. Block. (define (blocked-without-nacks critical-token mumbles if-blocked) (if-blocked critical-token (lambda (make-suspension) (for-each (lambda (mumble) (let ((blocker (car mumble)) (composition (cdr mumble))) (blocker (make-suspension null-finalizer composition)))) mumbles)))) ;;;;; Polling with Nacks ;;; First collect the pollers into groups according to which ones need ;;; to know about negative acknowledgement. Then do the polling. (define (*poll-group group if-enabled if-blocked) (receive (poller.ack-list flag-sets) (collect-group-acks group) (poll-with-nacks poller.ack-list flag-sets if-enabled if-blocked))) ;;; Do like in POLL-WITHOUT-NACKS, but this time a `mumble' ;;; additionally has an ack cell associated with it. This lets us ;;; tell, when we finally choose a particular rendezvous, which one we ;;; chose, so that we can negatively acknowledge, or enable the nack ;;; rendezvous of, every other one. (define (poll-with-nacks poller.ack-list flag-sets if-enabled if-blocked) (enter-critical-region (lambda (critical-token) (let loop ((poller.ack-list poller.ack-list) (mumbles '())) (if (null-list? poller.ack-list) (blocked-with-nacks critical-token mumbles flag-sets if-blocked) (let ((poller (caar poller.ack-list)) (ack-cell (cdar poller.ack-list)) (poller.ack-list (cdr poller.ack-list))) (receive (priority enabler/blocker) ((poller-thunk poller)) (let ((mumble (vector enabler/blocker ack-cell (poller-composition poller)))) (if priority (enabled-with-nacks poller.ack-list (list (cons priority mumble)) flag-sets critical-token if-enabled) (loop poller.ack-list (cons mumble mumbles))))))))))) (define (enabled-with-nacks poller.ack-list priority.mumble-list flag-sets critical-token if-enabled) (let loop ((poller.ack-list poller.ack-list) (priority.mumble-list priority.mumble-list) (priority 1)) (if (null-list? poller.ack-list) (let ((mumble (select-by-priority priority.mumble-list priority))) (let ((enabler (vector-ref mumble 0)) (ack-cell (vector-ref mumble 1)) (composition (vector-ref mumble 2))) (if-enabled critical-token enabler composition (ack-finalizer ack-cell flag-sets)))) (let ((poller (caar poller.ack-list)) (poller.ack-list (cdr poller.ack-list))) (receive (priority enabler/blocker) ((poller-thunk poller)) (if priority (loop poller.ack-list (cons (cons priority (vector enabler/blocker (cdar poller.ack-list) (poller-composition poller))) priority.mumble-list) (+ priority 1)) (loop poller.ack-list priority.mumble-list priority))))))) (define (blocked-with-nacks critical-token mumbles flag-sets if-blocked) (if-blocked critical-token (lambda (make-suspension) (for-each (lambda (mumble) (let ((blocker (vector-ref mumble 0)) (ack-cell (vector-ref mumble 1)) (composition (vector-ref mumble 2))) (blocker (make-suspension (ack-finalizer ack-cell flag-sets) composition)))) mumbles)))) ;;;;;; Collecting Nacks (define-record-type :flag-set (make-flag-set nack-token ack-cells) flag-set? (nack-token flag-set-nack-token) (ack-cells flag-set-ack-cells)) (define (collect-group-acks group) (rendezvous-group-type-case group ;; The POLLING case is handled long before now. ((CHOOSING choices) (collect-choice-acks choices)) ((NACK nack-token subgroup) (collect-acks nack-token subgroup '() '())))) ;;; Associate a single common ack cell with all of the choices, until ;;; descent into a nack; then switch start associating ack cells ;;; separately in its subgroup. (define (collect-choice-acks choices) (let ((ack-cell (make-ack-cell))) (define (dispatch group poller.ack-list flag-sets) (rendezvous-group-type-case group ((POLLING pollers) (let loop ((pollers pollers) (poller.ack-list poller.ack-list)) (if (null-list? pollers) (values poller.ack-list flag-sets) (loop (cdr pollers) (cons (cons (car pollers) ack-cell) poller.ack-list))))) ((CHOOSING choices) (fold-choices choices poller.ack-list flag-sets)) ((NACK nack-token subgroup) (collect-acks nack-token subgroup poller.ack-list flag-sets)))) (define (fold-choices choices poller.ack-list flag-sets) (if (null-list? choices) (values poller.ack-list flag-sets) (receive (poller.ack-list flag-sets) (dispatch (car choices) poller.ack-list flag-sets) (fold-choices (cdr choices) poller.ack-list flag-sets)))) (fold-choices choices '() '()))) ;;; Associate a unique ack cell with each poller. Associate a unique ;;; flag set with each nack group composed of that group's nack token ;;; and the set of ack cells from its subgroup. (define (collect-acks nack-token group poller.ack-list flag-sets) (define (dispatch group poller.ack-list ack-cells flag-sets) (rendezvous-group-type-case group ;; Create an ack cell for each poller. Leave the list of flag ;; sets untouched. ((POLLING pollers) (let loop ((pollers pollers) (poller.ack-list poller.ack-list) (ack-cells ack-cells)) (if (null-list? pollers) (values poller.ack-list ack-cells flag-sets) (let ((poller (car pollers)) (ack-cell (make-ack-cell))) (loop (cdr pollers) (cons (cons poller ack-cell) poller.ack-list) (cons ack-cell ack-cells)))))) ;; Fold the choices together. ((CHOOSING choices) (let loop ((choices choices) (poller.ack-list poller.ack-list) (ack-cells ack-cells) (flag-sets flag-sets)) (if (null-list? choices) (values poller.ack-list ack-cells flag-sets) (receive (poller.ack-list ack-cells flag-sets) (dispatch (car choices) poller.ack-list ack-cells flag-sets) (loop (cdr choices) poller.ack-list ack-cells flag-sets))))) ;; Compute the ack cells of the subgroup; then add a new flag set ;; for this nack group's token with those ack cells. ((NACK nack-token subgroup) (receive (poller.ack-list ack-cells* flag-sets) (dispatch subgroup poller.ack-list ack-cells flag-sets) (values poller.ack-list (append ack-cells* ack-cells) (cons (make-flag-set nack-token ack-cells*) flag-sets)))))) (receive (poller.ack-list ack-cells flag-sets) (dispatch group poller.ack-list '() flag-sets) (values poller.ack-list (cons (make-flag-set nack-token ack-cells) flag-sets)))) ;;;; Negative Acknowledgement Rendezvous (define-synchronized-record-type :nack-token (%make-nack-token flagged? data) (flagged? data) nack-token? (flagged? nack-token-flagged? set-nack-token-flagged?!) (data nack-token-data set-nack-token-data!)) (define (make-nack-token) (%make-nack-token #f '())) (define-synchronized-record-type :ack-cell (%make-ack-cell set?) (set?) ack-cell? (set? ack-cell-set? set-ack-cell-set?!)) (define (make-ack-cell) (%make-ack-cell #f)) (define (set-ack-cell! ack-cell) (set-ack-cell-set?! ack-cell #t)) (define (nack-token-rendezvous nack-token) (polling-rendezvous (lambda () (if (nack-token-flagged? nack-token) (let ((priority (nack-token-data nack-token))) (set-nack-token-data! nack-token (+ priority 1)) (values priority (lambda (prepare-revival) prepare-revival ;ignore (set-nack-token-data! nack-token 1)))) (values #f (lambda (suspension) (set-nack-token-data! nack-token (cons suspension (nack-token-data nack-token))))))))) ;;; Make a finalizer that goes through each flag set in the list that ;;; has no ack cell set -- i.e. those of whose represented rendezvous ;;; none was selected --, and flags its nack token. (define (ack-finalizer ack-cell flag-sets) (lambda (prepare-revival) (set-ack-cell! ack-cell) (for-each (lambda (flag-set) (let loop ((ack-cells (flag-set-ack-cells flag-set))) (cond ((null-list? ack-cells) (flag-nack! (flag-set-nack-token flag-set) prepare-revival)) ((not (ack-cell-set? (car ack-cells))) (loop (cdr ack-cells)))))) flag-sets))) (define (flag-nack! nack-token prepare-revival) (cond ((nack-token-flagged? nack-token) ; Sanity check. (error "Already flagged nack:" nack-token prepare-revival)) (else (for-each prepare-revival (nack-token-data nack-token)) (set-nack-token-flagged?! nack-token #t) (set-nack-token-data! nack-token 1)))) ;;;; Priority Selection ;;; From a list of ( . ) items, select the datum with ;;; the highest priority, breaking ties at `random'. (define (select-by-priority items default-priority) (if (null-list? (cdr items)) (cdar items) (let () (define (continue items maximum count data) (let ((items* (cdr items))) (if (null-list? items*) (select-random-element data count) (loop items* maximum count data)))) (define (loop items maximum count data) (let ((item (car items))) (let ((priority (canonical-priority (car item) default-priority)) (datum (cdr item))) (cond ((> priority maximum) (continue items priority 1 (cons datum '()))) ((= priority maximum) (continue items maximum (+ count 1) (cons datum data))) (else (continue items maximum count data)))))) (let ((priority (canonical-priority (caar items) default-priority)) (datum (cdar items))) (loop items priority 1 (list datum)))))) (define (canonical-priority priority default) (if (negative? priority) default priority)) (define (select-random-element data limit) ;++ (list-ref data (random-integer limit)) (car data)) ;;;; Rendezvous Syntactic Sugar ;; (put 'rendezvous-case 'scheme-indent-function 0) (define-syntax rendezvous-case (syntax-rules () ((RENDEZVOUS-CASE clause1 clause2 ...) (SYNCHRONIZE-CHOSEN-RENDEZVOUS (RENDEZVOUS-CASE-CLAUSE clause1) (RENDEZVOUS-CASE-CLAUSE clause2) ...)))) (define-syntax rendezvous-case-clause (syntax-rules (=>) ((RENDEZVOUS-CASE-CLAUSE (rv)) rv) ((RENDEZVOUS-CASE-CLAUSE (rv => consumer)) (MAP-RENDEZVOUS rv consumer)) ((RENDEZVOUS-CASE-CLAUSE (rv body0 body1 ...)) (MAP-RENDEZVOUS rv (LAMBDA IGNORED IGNORED body0 body1 ...))))) ;;;; Miscellaneous Utilities (define (compose f g) (lambda arguments (call-with-values (lambda () (apply g arguments)) f))) ;;; These are identical to their counterparts in SRFI 1. They are ;;; defined here to minimize dependencies. (define (append-reverse list tail) (if (null-list? list) tail (append-reverse (cdr list) (cons (car list) tail)))) (define (reverse! list) (if (null-list? list) '() (let loop ((list list) (tail '())) (let ((d (cdr list))) (set-cdr! list tail) (if (null-list? d) list (loop d list)))))) (define (null-list? object) (if (pair? object) #f (begin (if (not (null? object)) (error "Non-list:" object)) #t)))