;;; -*- Mode: Scheme -*- ;;;; Join Calculus ;;; There are four basic primitives here: ;;; ;;; (WITH-PROCESS ((WHEN ;syntax ;;; )*) ;;; ) ;;; where is ( *) ;;; or (AND *) ;;; Creates a new process with the given join patterns. When the ;;; supplied channels are matched up, the respective body is run. ;;; The supplied channel names are bound to procedures. ;;; ;;; (CALL-WITH-CONTINUATION-CHANNEL ) ;procedure ;;; For synchronous calls -- passes to the receiver a channel ;;; suitable for returning a value to the continuation of the ;;; CALL-WITH-CONTINUATION-CHANNEL call. ;;; ;;; (PARALLEL *) ;syntax ;;; (PARALLELIZE *) ;syntax ;;; Two ways to parallelize: PARALLEL is a hint, while PARALLELIZE ;;; is a command. PARALLEL could be equivalent to ;;; (LET ((T0 subproblem0) (T1 subproblem1) ... (TN subproblemN)) ;;; UNSPECIFIC) ;;; in a single-processor system. (declare (usual-integrations)) ;;;; Lesser Primitives: CWCC(hannel), PARALLEL, PARALLELIZE (define (call-with-continuation-channel receiver) (let ((flag #f) (value #f)) (receiver (let ((thread (current-thread))) (define (continue val) ;++ This should be atomic. (set! continue (lambda (val) (error "Multiple return to continuation channel:" val))) (signal-thread-event thread (lambda () (set! flag #t) (set! value val)))) (lambda (value) (continue value)))) (let loop () (suspend-current-thread) (if flag value (loop))))) (define-syntax parallel (sc-macro-transformer (lambda (form environment) (let* ((subproblems (close-syntax* (cdr form) environment)) (identifiers (map (lambda (subproblem) subproblem (make-synthetic-identifier 'IGNORED)) subproblems))) `(LET ,(map list identifiers subproblems) ,@identifiers ;ignored UNSPECIFIC))))) ;;;; WITH-PROCESS Macro (define-syntax with-process (sc-macro-transformer (lambda (form environment) (call-with-syntax-error-procedure (lambda (syntax-error) (capture-syntactic-environment (lambda (closing-environment) (parse-process form environment (lambda (keyword-identifier user-identifier) (identifier=? closing-environment keyword-identifier environment user-identifier)) (lambda (identifier-a identifier-b) (identifier=? environment identifier-a environment identifier-b)) (lambda (process) `(LET () ,@(generate-process-definitions process) ,@(make-syntactic-closures environment (map channel/identifier (process/channels process)) (cddr form)))) syntax-error)))))))) ;;;;; Intermediate Transformer Structures (define-structure (process (conc-name process/)) thread ;Identifier of the variable for the process's thread queue ;Identifier of the variable for the queue to send to clauses ;All clauses involved in the process channels ;All channels involved in the process ) (define-structure (clause (conc-name clause/) (constructor make-clause (identifier parameters mask-variable mask-initializer channels))) identifier ;Identifier of the variable for the clause's procedure parameters ;List of all parameters' identifiers to be bound in body body ;Body expression, to be evaluated when pattern matches mask-variable ;Identifier naming variable bound to current pattern mask mask-initializer ;Integer bit string of with all its channels' indices set channels ;All channels matched in this clause's join pattern ) (define-structure (channel (conc-name channel/) (constructor %make-channel)) identifier ;Identifier supplied by the user, bound to procedure that ; sends to the channel parameters ;Number of parameters for the channel index ;Number assigned to the channel, unique to a process queue ;Variable bound to a queue of messages on the channel, or the ; number of messages if each message is empty. ) ;;;;; Process Form Parser (define (parse-process form environment keyword? user-identifier=? win lose) (if (and (pair? (cdr form)) ;(keyword (clause ...) body0 body1 ...) (list? (cadr form)) (pair? (cddr form)) (list? (cdddr form)) (list-of-type? (cadr form) (process-clause-form-predicate keyword?))) (parse-clauses (cadr form) environment keyword? user-identifier=? (lambda (clauses channels) (win (make-process (make-synthetic-identifier 'THREAD) (make-synthetic-identifier 'QUEUE) clauses channels))) lose) (lose "Ill-formed syntax:" form))) ;;; (WHEN ) (define (process-clause-form-predicate keyword?) (let ((join-pattern? (join-pattern-predicate keyword?))) (define (clause-form? form) (and (pair? form) (identifier? (car form)) (keyword? 'WHEN (car form)) (pair? (cdr form)) (join-pattern? (cadr form)) (pair? (cddr form)) ;Non-empty body (list? (cdddr form)))) clause-form?)) (define (join-pattern-predicate keyword?) (define (join-pattern? form) (and (pair? form) (identifier? (car form)) (list-of-type? (cdr form) (if (keyword? 'AND (car form)) join-pattern? identifier?)))) join-pattern?) (define (parse-clauses clause-forms environment keyword? user-identifier=? win lose) (*parse-clauses (map cadr clause-forms) keyword? user-identifier=? (lambda (clauses channels) (finalize-clauses clauses (map cddr clause-forms) (map channel/identifier channels) environment) (win clauses channels)) lose)) (define (finalize-clauses clauses body-forms channel-identifiers environment) (for-each (lambda (clause body-form) (set-clause/body! clause (make-syntactic-closures environment (append channel-identifiers (clause/parameters clause)) body-form))) clauses body-forms)) ;;; This procedure is too long and should be factored into several small ones. (define (*parse-clauses channel-form-lists keyword? user-identifier=? win lose) (let ((flatten-join-pattern (join-pattern-flattener keyword?)) (find-channel (association-procedure user-identifier=? channel/identifier)) (valid-channel-bvl? (valid-channel-bvl-predicate user-identifier=?))) (let outer-loop ((channel-form-lists channel-form-lists) (clauses '()) (channel-count 0) (channels '())) (if (null? channel-form-lists) (win (reverse! clauses) (reverse! channels)) (let inner-loop ((channel-forms (flatten-join-pattern (car channel-form-lists))) (all-channels channels) (channel-count channel-count) (channels '()) (parameters '())) (if (null? channel-forms) (outer-loop (cdr channel-form-lists) (cons (let ((name (make-clause-name channels)) (mask (make-mask-initializer channels))) (make-clause name (reverse! parameters) (make-mask-variable name) mask (reverse! channels))) clauses) channel-count all-channels) (let ((channel-form (car channel-forms))) (cond ((not (and (identifier? (car channel-form)) (valid-channel-bvl? (cdr channel-form)) (not (find-channel (car channel-form) channels)))) (lose "Ill-formed channel:" channel-form)) ((find-channel (car channel-form) all-channels) => (lambda (channel) (if (= (channel/parameters channel) (length (cdr channel-form))) (inner-loop (cdr channel-forms) all-channels channel-count (cons channel channels) (append-reverse (cdr channel-form) parameters)) (lose "Mismatching channel arity:" channel-form)))) (else (let ((channel (make-channel channel-form channel-count))) (inner-loop (cdr channel-forms) (cons channel all-channels) (+ channel-count 1) (cons channel channels) (append-reverse (cdr channel-form) parameters)))))))))))) (define (join-pattern-flattener keyword?) (define (flatten-join-pattern form) (if (keyword? 'AND (car form)) (append-map flatten-join-pattern (cdr form)) (list form))) flatten-join-pattern) (define (valid-channel-bvl-predicate user-identifier=?) (let ((seen? (member-procedure user-identifier=?))) (define (valid-channel-bvl? form) (let loop ((form form) (seen '())) (cond ((pair? form) (let ((item (car form))) (and (identifier? item) (not (seen? item seen)) (loop (cdr form) (cons item seen))))) ((null? form) #t) (else #f)))) valid-channel-bvl?)) (define (make-channel channel-form index) (let ((parameters (length (cdr channel-form)))) (%make-channel (car channel-form) (length (cdr channel-form)) index ((if (zero? parameters) make-count-variable make-queue-variable) (car channel-form))))) (define (make-count-variable channel) (make-synthetic-identifier (symbol-append (identifier->symbol channel) '- 'COUNT))) (define (make-queue-variable channel) (make-synthetic-identifier (symbol-append (identifier->symbol channel) '- 'QUEUE))) (define (make-clause-name channels) (make-synthetic-identifier (string->symbol (reduce-map (lambda (channel) (symbol-name (identifier->symbol (channel/identifier channel)))) (lambda (left right) (string-append left "/" right)) "" channels)))) (define (make-mask-variable name) (make-synthetic-identifier (symbol-append (identifier->symbol name) '- 'MASK))) (define (make-mask-initializer channels) (reduce-map channel/mask fix:or 0 channels)) (define (channel/mask channel) (fix:lsh 1 (channel/index channel))) (define (channel/key channel) (fix:not (channel/mask channel))) ;;;;; Top-Level Output Generator (define (generate-process-definitions process) `((DEFINE ,(process/queue process) (MAKE-QUEUE)) (DEFINE ,(process/thread process) (CREATE-THREAD #F (LET ,(generate-local-bindings process) ,@(map generate-clause (process/clauses process)) (DEFINE DISPATCHER ,(generate-dispatcher process)) (LAMBDA () ;Thunk argument to CREATE-THREAD (LET LOOP () (DEQUEUE-OR-SUSPEND ,(process/queue process) DISPATCHER) (LOOP)))))) ,@(generate-channel-procedures process))) (define (generate-local-bindings process) (if (null? (cdr (process/channels process))) '() `(,@(map (lambda (clause) `(,(clause/mask-variable clause) ,(clause/mask-initializer clause))) (process/clauses process)) ,@(map (lambda (channel) `(,(channel/queue channel) ,(if (zero? (channel/parameters channel)) '0 '(MAKE-QUEUE)))) (process/channels process))))) (define (generate-clause clause) (let ((identifier (clause/identifier clause)) (parameters (clause/parameters clause)) (channels (clause/channels clause)) (body (clause/body clause))) (receive (channels-with-items item-variables) (generate-item-parameters channels) `(DEFINE (,identifier ,@item-variables) (LET ,(generate-clause-bindings parameters channels-with-items item-variables) ,@body))))) (define (generate-item-parameters channels) (let loop ((channels channels) (channels-with-items '()) (item-variables '())) (if (null? channels) (values (reverse! channels-with-items) (reverse! item-variables)) (let ((channel (car channels)) (channels (cdr channels))) (if (zero? (channel/parameters channel)) (loop channels channels-with-items item-variables) (loop channels (cons channel channels-with-items) (cons (generate-item-variable channel) item-variables))))))) (define (generate-item-variable channel) (make-synthetic-identifier (symbol-append (identifier->symbol (channel/identifier channel)) '- 'ITEM))) (define (generate-clause-bindings parameters channels items) (let outer ((parameters parameters) (channels channels) (items items)) (if (null? channels) (if (null? parameters) '() (error "mismatch" parameters)) (let ((channel (car channels)) (item (car items))) (if (= 1 (channel/parameters channel)) `((,(car parameters) ,item) ,@(outer (cdr parameters) (cdr channels) (cdr items))) (let inner ((parameters parameters) (index 0)) (if (= index (channel/parameters channel)) (outer parameters (cdr channels) (cdr items)) (let ((index (+ index 1))) `((,(car parameters) (VECTOR-REF ,item ,index)) ,@(inner (cdr parameters) index)))))))))) ;;;;; Dispatcher (define (generate-dispatcher process) `(LAMBDA (ITEM) ,(if (null? (cdr (process/channels process))) ;** This requires a check earlier on of whether there are any ;** equivalent clauses. `(,(clause/identifier (car (process/clauses process))) ,@(case (channel/parameters (car (process/channels process))) ((0) '()) ((1) '((VECTOR-REF ITEM 1))) (else '(ITEM)))) (*generate-dispatcher process)) UNSPECIFIC)) (define (*generate-dispatcher process) `(LET ((KEY (VECTOR-REF ITEM 0))) ,(let recur ((clauses (process/clauses process)) (mask-variables '()) (mask-locals '())) (if (null? clauses) (generate-dispatcher-default mask-variables mask-locals process) (let* ((clause (car clauses)) (mask-variable (clause/mask-variable clause)) (mask-local (make-synthetic-identifier mask-variable))) `(LET ((,mask-local (FIX:AND ,mask-variable KEY))) (IF (FIX:ZERO? ,mask-local) ,(generate-calls clause process) ,(recur (cdr clauses) (cons mask-variable mask-variables) (cons mask-local mask-locals))))))))) (define (generate-dispatcher-default mask-variables mask-locals process) `(BEGIN ,@(map (lambda (variable local) `(SET! ,variable ,local)) mask-variables mask-locals) (COND ,@(map (lambda (channel) `((FIX:= KEY ,(channel/key channel)) ,(generate-channel-enqueue channel))) (process/channels process))))) (define (generate-calls clause process) (let ((operator (clause/identifier clause)) (channels (clause/channels clause))) (if (null? (cdr channels)) ;+++ `(,operator ,@(generate-channel-arguments channels (car channels) process)) `(COND ,@(map (let ((operator (clause/identifier clause))) (lambda (channel) `((FIX:= KEY ,(channel/key channel)) ,@(generate-count-decrements channels channel process) (,operator ,@(generate-channel-arguments channels channel process))))) channels))))) (define (generate-channel-enqueue channel) (let ((variable (channel/queue channel)) (parameters (channel/parameters channel))) (if (zero? parameters) `(SET! ,variable (FIX:+ ,variable 1)) `(ENQUEUE!/UNSAFE ,variable ,(if (= parameters 1) '(VECTOR-REF ITEM 1) 'ITEM))))) (define (generate-count-decrements channels channel process) (append-map (lambda (*channel) (if (and (not (eq? *channel channel)) (zero? (channel/parameters *channel))) (generate-count-decrement *channel process) '())) channels)) (define (generate-channel-arguments channels channel process) (append-map (lambda (*channel) (if (zero? (channel/parameters *channel)) '() (list (if (eq? *channel channel) (if (= 1 (channel/parameters channel)) '(VECTOR-REF ITEM 1) 'ITEM) (generate-channel-dequeue *channel process))))) channels)) (define (generate-count-decrement channel process) (let ((variable (channel/queue channel))) `((SET! ,variable (FIX:- ,variable 1)) (IF (FIX:ZERO? ,variable) ,(generate-mask-updates channel process))))) (define (generate-channel-dequeue channel process) (let ((queue (channel/queue channel))) `(LET ((ITEM (DEQUEUE!/UNSAFE ,queue))) (IF (QUEUE-EMPTY? ,queue) ,(generate-mask-updates channel process)) ITEM))) (define (generate-mask-updates channel process) (let ((mask (channel/mask channel))) `(BEGIN ,@(append-map (lambda (clause) (if (memq channel (clause/channels clause)) (let ((mask-variable (clause/mask-variable clause))) `((SET! ,mask-variable (FIX:OR ,mask-variable ,mask)))) '())) (process/clauses process))))) ;;;;; Channel Procedures (define (generate-channel-procedures process) (map (let ((thread (process/thread process)) (queue (process/queue process))) (lambda (channel) (let ((parameters (unfold zero? make-parameter-variable -1+ (channel/parameters channel)))) `(DEFINE (,(channel/identifier channel) ,@parameters) (ENQUEUE-AND-SIGNAL ,queue (VECTOR ,(channel/key channel) ,@parameters) ,thread #T))))) (process/channels process))) (define (make-parameter-variable i) (make-synthetic-identifier (symbol-append 'PARAMETER (string->symbol (number->string i))))) ;;;; Utilities (define-integrable (enqueue-and-signal queue datum thread event) (with-interrupt-mask interrupt-mask/gc-ok (lambda (interrupt-mask) interrupt-mask (enqueue!/unsafe queue datum) (signal-thread-event thread event)))) (define-integrable (dequeue-or-suspend queue win) (let ((interrupt-mask (set-interrupt-enables! interrupt-mask/gc-ok))) (if (queue-empty? queue) (begin (set-interrupt-enables! interrupt-mask) (suspend-current-thread)) (win (let ((item (dequeue!/unsafe queue))) (set-interrupt-enables! interrupt-mask) item))))) (define (fold-map mapper combiner state list) (let loop ((state state) (list list)) (if (pair? list) (loop (combiner (mapper (car list)) state) (cdr list)) state))) (define (reduce-map mapper combiner right-identity list) (if (pair? list) (fold-map mapper combiner (mapper (car list)) (cdr list)) right-identity)) (define (unfold stop? generate next state) (if (stop? state) '() (cons (generate state) (unfold stop? generate next (next state))))) (define (append-reverse list tail) (let loop ((list list) (tail tail)) (if (pair? list) (loop (cdr list) (cons (car list) tail)) tail))) (define (make-syntactic-closures environment free-names forms) (map (lambda (form) (make-syntactic-closure environment free-names form)) forms)) (define (close-syntax* forms environment) (map (lambda (form) (close-syntax form environment)) forms))