;;; -*- Mode: Scheme -*- ;;;; Rendezvous Concurrency Abstraction ;;;; (Concurrent ML for Scheme) ;;; This code is written by Taylor Campbell and placed in the Public ;;; Domain. All warranties are disclaimed. ;;; This is based on TRC's understanding of Reppy's original code, ;;; aided by Mike Sperber's implementation of it in scsh and by TRC's ;;; older implementation which was wrong but which helped him to learn ;;; how it really works. Most of the terms came from Reppy or Sperber; ;;; the naming conventions, however, built from these base terms, are ;;; generally unique, as far as I know. I, TRC, apologize for that ;;; ungraceful transition from the third person to the first person and ;;; hope that he has not caused damage to your brain's parser. ;;;; Pollers ;;; Pollers are the basis of all rendezvous operation. A poller ;;; consists of two procedures: a thunk, which actually polls; and a ;;; composed sequence of procedures to be applied to the result of the ;;; poller if/when enabled and selected. Polling rendezvous consist ;;; simply of sets of pollers. Mapping a polling rendezvous adds to ;;; its composition. (define-record-type :poller (%make-poller composition thunk) poller? (composition poller-composition) (thunk poller-thunk)) (define (make-poller thunk) (%make-poller values thunk)) (define (map-poller poller proc) (%make-poller (compose proc (poller-composition poller)) (poller-thunk poller))) ;;;; Rendezvous Objects (define-variant-type rendezvous :rendezvous rendezvous? rendezvous-type-case ; Not RENDEZVOUS-CASE for obvious reasons. (variant POLLING (make-polling-rendezvous pollers)) (variant CHOOSING (make-choosing-rendezvous choices)) (variant DELAYED (delayed-rendezvous generator)) (variant DELAYED-WITH-NACK (delayed-rendezvous-with-nack generator))) (define (polling-rendezvous . poller-thunks) (make-polling-rendezvous (map make-poller poller-thunks))) (define (choosing-rendezvous . choices) (let loop ((choices choices) (pollers '())) (if (null? choices) ; Only pollers. Simplify. (make-polling-rendezvous (reverse! pollers)) (rendezvous-type-case (car choices) ((POLLING new-pollers) (loop (cdr choices) (append-reverse new-pollers pollers))) (else ;; Not a simple polling rendezvous, so start collecting the ;; choices of a full choosing rendezvous. (*choosing-rendezvous choices (if (null? pollers) '() (list (make-polling-rendezvous (reverse! pollers)))))))))) (define (*choosing-rendezvous in-choices out-choices) (let loop ((in-choices in-choices) (out-choices out-choices)) (if (null? in-choices) (cond ((null? out-choices) (quiescent-rendezvous)) ((null? (cdr out-choices)) (car out-choices)) (else (make-choosing-rendezvous (reverse! out-choices)))) (loop (cdr in-choices) (let ((choice (car in-choices))) (rendezvous-type-case choice ((POLLING pollers) (if (null? out-choices) (cons choice '()) (rendezvous-type-case (car out-choices) ((POLLING more-pollers) ;; Coalesce adjacent pollers. (cons (make-polling-rendezvous (append pollers more-pollers)) out-choices)) (else (cons choice out-choices))))) ((CHOOSING subchoices) ;; Flatten nested choices. (append-reverse subchoices out-choices)) ((DELAYED generator) (cons choice out-choices)) ((DELAYED-WITH-NACK generator) (cons choice out-choices)))))))) ;;;; Miscellaneous Rendezvous Combinators (define (map-rendezvous rendezvous proc) (if (eq? proc values) rendezvous (rendezvous-type-case rendezvous ((POLLING pollers) (make-polling-rendezvous (map (lambda (poller) (map-poller poller proc)) pollers))) ((CHOOSING choices) (make-choosing-rendezvous (map (lambda (choice) (map-rendezvous choice proc)) choices))) ((DELAYED generator) (delayed-rendezvous (lambda () (map-rendezvous (generator) proc)))) ((DELAYED-WITH-NACK generator) (delayed-rendezvous-with-nack (lambda (nack-rv) (map-rendezvous (generator nack-rv) proc))))))) (define quiescent-rendezvous (let ((rv (polling-rendezvous))) (lambda () rv))) (define (values-rendezvous . vals) (polling-rendezvous (lambda () (values -1 (lambda (prepare-revival) (apply values vals)))))) (define (thunk-rendezvous thunk) (delayed-rendezvous (lambda () (call-with-values thunk values-rendezvous)))) ;;;; Forcing Delayed Rendezvous and Initializing Nacks ;;; Rendezvous groups represent simplified rendezvous after forcing all ;;; delayed rendezvous. (define-variant-type rendezvous-group :rendezvous-group rendezvous-group? rendezvous-group-type-case (variant POLLING (make-polling-group pollers)) (variant CHOOSING (make-choosing-group choices)) (variant NACK (make-nack-group nack-token subgroup))) (define (force-rendezvous rendezvous) (rendezvous-type-case rendezvous ((POLLING pollers) (make-polling-group pollers)) ((CHOOSING choices) (force-rendezvous-choices choices)) ((DELAYED generator) (force-rendezvous (generator))) ((DELAYED-WITH-NACK generator) (force-delayed-rendezvous-with-nack generator)))) (define (force-delayed-rendezvous-with-nack generator) (let ((token (make-nack-token))) (make-nack-group token (force-rendezvous (generator (nack-token-rendezvous token)))))) (define (force-rendezvous-choices choices) (define (cons-pollers groups pollers) (if (pair? pollers) (cons (make-polling-group (reverse! pollers)) groups) groups)) (let loop ((choices choices) (groups '()) (pollers '())) (if (not (pair? choices)) (make-choosing-group (reverse! (cons-pollers groups pollers))) (let ((more-choices (cdr choices))) (define (cons-delayed group) (cons group (cons-pollers groups pollers))) (rendezvous-type-case (car choices) ((POLLING new-pollers) (loop more-choices groups (append-reverse new-pollers pollers))) ((CHOOSING new-choices) (loop (append new-choices more-choices) groups pollers)) ((DELAYED generator) (loop more-choices (cons-delayed (force-rendezvous (generator))) '())) ((DELAYED-WITH-NACK generator) (loop more-choices (cons-delayed (force-delayed-rendezvous-with-nack generator)) '()))))))) ;;;; Synchronization (define (synchronize rendezvous) (poll-group (force-rendezvous rendezvous) invoke-enabled suspend)) (define (synchronize/timeout rendezvous timeout timeout-continuation enabled-continuation) (synchronize-chosen-rendezvous (map-rendezvous rendezvous enabled-continuation) (map-rendezvous (after-time-rendezvous timeout) timeout-continuation))) (define (synchronize-chosen-rendezvous . rendezvous) (poll-group (force-rendezvous-choices rendezvous) invoke-enabled suspend)) (define (synchronize-chosen-rendezvous/timeout timeout timeout-continuation enabled-continuation . rendezvous) (synchronize-chosen-rendezvous (map-rendezvous (after-time-rendezvous timeout) timeout-continuation) (map-rendezvous (apply choosing-rendezvous rendezvous) enabled-continuation))) (define (poll rendezvous absence-continuation enabled-continuation) (poll-group (force-rendezvous rendezvous) (lambda (critical-token enabler composition finalizer) (invoke-enabled critical-token enabler (compose enabled-continuation composition) finalizer)) (lambda (critical-token setup-suspension) (exit-critical-region critical-token absence-continuation)))) (define (invoke-enabled critical-token enabler continuation finalizer) (revive-multiple (lambda (prepare-revival) (finalizer prepare-revival) (enabler prepare-revival)) critical-token continuation)) ;;;; Polling Machinery (define (poll-group group enabled-continuation blocked-continuation) (rendezvous-group-type-case group ((POLLING pollers) (simple-poll pollers enabled-continuation blocked-continuation)) (else (poll-group/nacks group enabled-continuation blocked-continuation)))) (define (poll-singular poller enabled-continuation blocked-continuation) (let ((composition (poller-composition poller)) (thunk (poller-thunk poller))) (enter-critical-region (lambda (critical-token) (receive (priority enabler/blocker) (thunk) (if priority (enabled-continuation critical-token enabler/blocker composition (lambda (prepare-revival) ;; Null finalizer. (values))) (blocked-continuation critical-token (lambda (make-suspension) (enabler/blocker (make-suspension (lambda (prepare-revival) ;; Null finalizer. (values)) composition)))))))))) ;;; Plain poller groups -- no nacks involved (define (simple-poll pollers enabled-continuation blocked-continuation) (if (pair? pollers) (if (pair? (cdr pollers)) (*simple-poll pollers enabled-continuation blocked-continuation) (poll-singular (car pollers) enabled-continuation blocked-continuation)) (enter-critical-region (lambda (critical-token) (blocked-continuation critical-token (lambda (token) ;; Record no suspension; never revive. (values))))))) (define (*simple-poll pollers enabled-continuation blocked-continuation) (enter-critical-region (lambda (critical-token) (let loop ((pollers pollers) (blocker.composition-list '())) (if (pair? pollers) (let ((poller (car pollers)) (more-pollers (cdr pollers))) (let ((thunk (poller-thunk poller)) (composition (poller-composition poller))) (receive (maybe-priority enabler/blocker) (thunk) (if (not maybe-priority) (loop more-pollers (cons (cons enabler/blocker composition) blocker.composition-list)) (immediately-enabled-without-nacks more-pollers (list (cons maybe-priority (cons enabler/blocker composition))) critical-token enabled-continuation))))) (blocked-continuation critical-token (lambda (make-suspension) (for-each (lambda (blocker.composition) ((car blocker.composition) (make-suspension (lambda (prepare-revival) ;; Null finalizer. (values)) (cdr blocker.composition)))) blocker.composition-list)))))))) (define (immediately-enabled-without-nacks pollers enabled critical-token enabled-continuation) (let loop ((pollers pollers) (enabled enabled) (priority 1)) (if (null? pollers) (let ((enabler.composition (select-by-priority enabled priority))) (enabled-continuation critical-token (car enabler.composition) (cdr enabler.composition) (lambda (prepare-revival) ;; Null finalizer. (values)))) (let ((poller (car pollers)) (more (cdr pollers))) (receive (maybe-priority enabler/blocker) ((poller-thunk poller)) (if (not maybe-priority) ; Filter out blocked rendezvous. (loop enabled priority) (loop (cons (cons maybe-priority (cons enabler/blocker (poller-composition poller))) enabled) (+ priority 1)))))))) ;;; Same as above, but with negative acknowledgements involved (define (poll-group/nacks group enabled-continuation blocked-continuation) (receive (poller.ack-list flag-sets) (collect-group-acks group) (*poll-group poller.ack-list flag-sets enabled-continuation blocked-continuation))) (define (*poll-group poller.ack-list flag-sets enabled-continuation blocked-continuation) (enter-critical-region (lambda (critical-token) (let loop ((poller.ack-list poller.ack-list) (blockers '())) (if (pair? poller.ack-list) (let ((poller (caar poller.ack-list)) (ack-cell (cdar poller.ack-list)) (more (cdr poller.ack-list))) (receive (maybe-priority enabler/blocker) ((poller-thunk poller)) (let ((mumble (vector enabler/blocker ack-cell (poller-composition poller)))) (if (not maybe-priority) (loop more (cons mumble blockers)) (immediately-enabled-with-nacks more (list (cons maybe-priority mumble)) flag-sets critical-token enabled-continuation))))) (blocked-continuation critical-token (lambda (make-suspension) (for-each (lambda (blocker) ((vector-ref blocker 0) (make-suspension (ack-finalizer (vector-ref blocker 1) flag-sets) (vector-ref blocker 2)))) blockers)))))))) (define (immediately-enabled-with-nacks poller.ack-list enabled flag-sets critical-token enabled-continuation) (let loop ((poller.ack-list poller.ack-list) (enabled enabled) (priority 1)) (if (pair? poller.ack-list) (let ((poller (caar poller.ack-list)) (more (cdr poller.ack-list))) (receive (maybe-priority enabler/blocker) ((poller-thunk poller)) (if (not maybe-priority) (loop more enabled priority) (loop more (cons (cons maybe-priority (vector enabler/blocker (cdar poller.ack-list) (poller-composition poller))) enabled) (+ priority 1))))) (let ((enabler (select-by-priority enabled priority))) (enabled-continuation critical-token (vector-ref enabler 0) ; enabler procedure (vector-ref enabler 2) ; enabled continuation (ack-finalizer (vector-ref enabler 1) flag-sets)))))) ;;;; Collecting Nacks (define-record-type :flag-set (make-flag-set nack-token ack-cells) flag-set? (nack-token flag-set-nack-token) (ack-cells flag-set-ack-cells)) (define (collect-group-acks group) (rendezvous-group-type-case group ;; The POLLING case is handled long before now. ((CHOOSING choices) (collect-choice-acks choices)) ((NACK nack-token subgroup) (collect-acks nack-token subgroup '() '())))) ;;; Associate a single common ack cell with all of the choices, until ;;; descent into a nack; then switch start associating ack cells ;;; separately in its subgroup. (define (collect-choice-acks choices) (let ((ack-cell (make-ack-cell))) (define (dispatch group poller.ack-list flag-sets) (rendezvous-group-type-case group ((POLLING pollers) (let loop ((pollers pollers) (poller.ack-list poller.ack-list)) (if (null? pollers) (values poller.ack-list flag-sets) (loop (cdr pollers) (cons (cons (car pollers) ack-cell) poller.ack-list))))) ((CHOOSING choices) (fold-choices choices poller.ack-list flag-sets)) ((NACK nack-token subgroup) (collect-acks nack-token subgroup poller.ack-list flag-sets)))) (define (fold-choices choices poller.ack-list flag-sets) (if (null? choices) (values poller.ack-list flag-sets) (receive (poller.ack-list flag-sets) (dispatch (car choices) poller.ack-list flag-sets) (fold-choices (cdr choices) poller.ack-list flag-sets)))) (fold-choices choices '() '()))) ;;; Associate a unique ack cell with each poller. Associate a unique ;;; flag set with each nack group composed of that group's nack token ;;; and the set of ack cells from its subgroup. (define (collect-acks nack-token group poller.ack-list flag-sets) (define (dispatch group poller.ack-list ack-cells flag-sets) (rendezvous-group-type-case group ;; Create an ack cell for each poller. Leave the list of flag ;; sets untouched. ((POLLING pollers) (let loop ((pollers pollers) (poller.ack-list poller.ack-list) (ack-cells ack-cells)) (if (null? pollers) (values poller.ack-list ack-cells flag-sets) (let ((poller (car pollers)) (ack-cell (make-ack-cell))) (loop (cdr pollers) (cons (cons poller ack-cell) poller.ack-list) (cons ack-cell ack-cells)))))) ;; Fold the choices together. ((CHOOSING choices) (let loop ((choices choices) (poller.ack-list poller.ack-list) (ack-cells ack-cells) (flag-sets flag-sets)) (if (null? choices) (values poller.ack-list ack-cells flag-sets) (receive (poller.ack-list ack-cells flag-sets) (dispatch (car choices) poller.ack-list ack-cells flag-sets) (loop (cdr choices) poller.ack-list ack-cells flag-sets))))) ;; Compute the ack cells of the subgroup; then add a new flag set ;; for this nack group's token with those ack cells. ((NACK nack-token subgroup) (receive (poller.ack-list new-ack-cells flag-sets) (dispatch subgroup poller.ack-list ack-cells flag-sets) (values poller.ack-list (append new-ack-cells ack-cells) (cons (make-flag-set nack-token new-ack-cells) flag-sets)))))) (receive (poller.ack-list ack-cells flag-sets) (dispatch group poller.ack-list '() flag-sets) (values poller.ack-list (cons (make-flag-set nack-token ack-cells) flag-sets)))) ;;;; Negative Acknowledgement Rendezvous (define-synchronized-record-type :nack-token (%make-nack-token flagged? data) (flagged? data) nack-token? (flagged? nack-token-flagged? set-nack-token-flagged?!) (data nack-token-data set-nack-token-data!)) (define (make-nack-token) (%make-nack-token #f '())) (define-synchronized-record-type :ack-cell (%make-ack-cell set?) (set?) ack-cell? (set? ack-cell-set? set-ack-cell-set?!)) (define (make-ack-cell) (%make-ack-cell #f)) (define (set-ack-cell! ack-cell) (set-ack-cell-set?! ack-cell #t)) (define (nack-token-rendezvous nack-token) (polling-rendezvous (lambda () (if (nack-token-flagged? nack-token) (let ((priority (nack-token-data nack-token))) (set-nack-token-data! nack-token (+ priority 1)) (values priority (lambda (prepare-revival) (set-nack-token-data! nack-token 1)))) (values #f (lambda (suspension) (set-nack-token-data! nack-token (cons suspension (nack-token-data nack-token))))))))) (define (ack-finalizer ack-cell flag-sets) (lambda (prepare-revival) (set-ack-cell! ack-cell) ;; Go through each flag set in the list. If the flag set has no ;; ack cell set -- i.e., none of the rendezvous it represents was ;; selected --, flag its nack token. (for-each (lambda (flag-set) (let loop ((ack-cells (flag-set-ack-cells flag-set))) (cond ((null? ack-cells) (flag-nack! (flag-set-nack-token flag-set) prepare-revival)) ((not (ack-cell-set? (car ack-cells))) (loop (cdr ack-cells)))))) flag-sets))) (define (flag-nack! nack-token prepare-revival) (cond ((nack-token-flagged? nack-token) ; Sanity check. (error "already flagged nack" nack-token prepare-revival)) (else (for-each prepare-revival (nack-token-data nack-token)) (set-nack-token-flagged?! nack-token #t) (set-nack-token-data! nack-token 1)))) ;;;; Priority Selection (define (select-by-priority items default-priority) (if (null? (cdr items)) (cdar items) ;+++ (let loop ((items (cdr items)) (maximum (canonical-priority (caar items) default-priority)) (count 1) (data (cons (cdar items) '()))) (if (null? items) (if (pair? (cdr data)) (select-random-element data count) (car data)) (let* ((item (car items)) (priority (canonical-priority (car item) default-priority)) (datum (cdr item))) (cond ((> priority maximum) ;; Greater than all those we have, so toss those. (loop priority 1 (cons datum '()))) ((= priority maximum) ;; Lump it in with the rest of the same priority. (loop maximum (+ count 1) (cons datum data))) (else ;; Ignore that with a lesser priority. (loop maximum count data)))))))) (define (canonical-priority priority default) (if (negative? priority) default priority)) (define (select-random-element data limit) ;++ (list-ref data (random-integer limit)) (car data)) ;;;; Rendezvous Syntactic Sugar ;; (put 'rendezvous-case 'scheme-indent-function 0) (define-syntax rendezvous-case (syntax-rules () ((RENDEZVOUS-CASE clause1 clause2 ...) (SYNCHRONIZE-CHOSEN-RENDEZVOUS (RENDEZVOUS-CASE-CLAUSE clause1) (RENDEZVOUS-CASE-CLAUSE clause2) ...)))) (define-syntax rendezvous-case-clause (syntax-rules (=>) ((RENDEZVOUS-CASE-CLAUSE (rv)) rv) ((RENDEZVOUS-CASE-CLAUSE (rv => consumer)) (MAP-RENDEZVOUS rv consumer)) ((RENDEZVOUS-CASE-CLAUSE (rv body0 body1 ...)) (MAP-RENDEZVOUS rv (LAMBDA IGNORED body0 body1 ...))))) ;;;; Miscellaneous Utilities (define (compose f g) (lambda args (call-with-values (lambda () (apply g args)) f))) ;;; These are identical to their counterparts in SRFI 1. They are ;;; defined here to minimize dependencies. (define (append-reverse list tail) (if (pair? list) (append-reverse (cdr list) (cons (car list) tail)) tail)) (define (reverse! list) (if (pair? list) (let loop ((list list) (tail '())) (let ((d (cdr list))) (set-cdr! list tail) (if (pair? d) (loop d list) list))) '()))