;;; -*- Mode: Scheme -*- ;;;; Join Calculus Examples ;;; This code is written by Taylor R. Campbell and placed in the Public ;;; Domain. All warranties are disclaimed. ;;;; Jar Example (define-structure (jar (constructor %make-jar)) getter putter ) (define (make-jar) (with-process ((when (and (put k obj) (empty)) (parallel (k unspecific) (full obj))) (when (and (get k) (full obj)) (parallel (k obj) (empty)))) (empty) (%make-jar get put))) (define (jar/get jar) (call-with-continuation-channel (jar-getter jar))) (define (jar/put jar obj) (call-with-continuation-channel (lambda (k) ((jar-putter jar) k obj)))) ;;; manually-built structure of above for testing (define put-channel (%make-channel 'PUT 2 0 'PUT-QUEUE)) (define empty-channel (%make-channel 'EMPTY 0 1 'EMPTY-QUEUE)) (define put/empty-clause (make-clause 'PUT/EMPTY '(K OBJ) 'PUT/EMPTY-MASK #b0011 (list put-channel empty-channel))) (set-clause/body! put/empty-clause '((DEBUG-OUTPUT ";PUT/EMPTY ~S ~S~%" K OBJ) (PARALLEL (FULL OBJ) (K)))) (define get-channel (%make-channel 'GET 1 2 'GET-QUEUE)) (define full-channel (%make-channel 'FULL 1 3 'FULL-QUEUE)) (define get/full-clause (make-clause 'GET/FULL '(K OBJ) 'GET/FULL-MASK #b1100 (list get-channel full-channel))) (set-clause/body! get/full-clause '((DEBUG-OUTPUT ";GET/FULL ~S ~S~%" K OBJ) (PARALLEL (K OBJ) (EMPTY)))) (define jar-process (make-process 'THREAD 'QUEUE (list put/empty-clause get/full-clause) (list put-channel empty-channel get-channel full-channel))) ;;; The above definition expands to: (define (make-jar) (define queue (make-queue)) (define thread (create-thread #f (let ((put/empty-mask #b0011) (get/full-mask #b1100) (put-queue (make-queue)) (empty-queue (make-queue)) (get-queue (make-queue)) (full-queue (make-queue))) (define (put/empty put-item empty-item) (let ((k (vector-ref put-item 1)) (obj (vector-ref put-item 2))) (debug-output ";PUT/EMPTY ~S ~S~%" k obj) (parallel (full obj) (k unspecific)))) (define (get/full get-item full-item) (let ((k (vector-ref get-item 1)) (obj (vector-ref full-item 1))) (debug-output ";GET/FULL ~S ~S~%" k obj) (parallel (k obj) (empty)))) (define (loop) (dequeue-or-suspend queue dispatch) (loop)) (define (dispatch item) (debug-output ";Dispatching ~S~%" item) (let ((key (vector-ref item 0))) ;; This is hyper-bummed up the bloody wazoo. (let ((p/e-mask (fix:and put/empty-mask key))) (if (fix:zero? p/e-mask) (cond ((fix:= key #b1110) (put/empty item (let ((empty-item (dequeue!/unsafe empty-queue))) (if (queue-empty? empty-queue) (set! put/empty-mask (fix:or put/empty-mask #b-0010))) empty-item))) ((fix:= key #b1101) (put/empty (let ((put-item (dequeue!/unsafe put-queue))) (if (queue-empty? put-queue) (set! put/empty-mask (fix:or put/empty-mask #b-0001))) put-item) item))) (let ((g/f-mask (fix:and get/full-mask key))) (if (fix:zero? g/f-mask) (cond ((fix:= key #b1011) (get/full item (let ((full-item (dequeue!/unsafe full-queue))) (if (queue-empty? full-queue) (set! get/full-mask (fix:and get/full-mask #b-1000))) full-item))) ((fix:= key #b0111) (get/full (let ((get-item (dequeue!/unsafe get-queue))) (if (queue-empty? get-queue) (set! get/full-mask (fix:and get/full-mask #b-0100))) get-item) item))) (begin (set! put/empty-mask p/e-mask) (set! get/full-mask g/f-mask) (cond ((fix:= key #b1110) (enqueue!/unsafe put-queue item)) ((fix:= key #b1101) (enqueue!/unsafe empty-queue item)) ((fix:= key #b1011) (enqueue!/unsafe get-queue item)) ((fix:= key #b0111) (enqueue!/unsafe full-queue item)))))))))) ;; thunk argument to CREATE-THREAD loop))) (define-integrable (send message) (enqueue-and-signal queue message thread #t)) (define (put k obj) (send (vector #b1110 k obj))) (define (empty) (send (vector #b1101))) (define (get k) (send (vector #b1011 k))) (define (full obj) (send (vector #b0111 obj))) (yield-current-thread) (empty) (%make-jar get put)) ;;;; Counter Example (define-structure (counter (constructor %make-counter) (conc-name counter/)) incrementor accessor ) (define (make-counter initial-value) (with-process ((when (and (count n) (increment k)) (parallel (count (+ n 1)) (k unspecific))) (when (and (count n) (access k)) (parallel (count n) (k n)))) (count initial-value) (%make-counter increment access))) (define (counter-value counter) (call-with-continuation-channel (counter/accessor counter))) (define (increment-counter counter) (call-with-continuation-channel (counter/incrementor counter))) ;;;; Synchronous Channels (define (make-synchronous-channel) (let ((queue (make-queue))) (define (send obj reader writer queue-channel) (parallel (reader obj) (writer unspecific) (if (not (queue-empty? queue)) (queue-channel)))) (with-process ((when (and (readers) (write obj writer)) (send obj (dequeue! queue) writer readers)) (when (and (writers) (read reader)) (let ((obj.writer (dequeue! queue))) (send (car obj.writer) reader (cdr obj.writer) writers))) ;; This is a little annoying. We have to duplicate ;; this pair of clauses in order to consume any ;; WRITERS or READERS message if there is one. (when (and (writers) (write obj writer)) (enqueue! queue (cons obj writer)) (writers)) (when (and (readers) (read reader)) (enqueue! queue reader) (readers)) (when (write obj writer) (enqueue! queue (cons obj writer)) (writers)) (when (read reader) (enqueue! queue reader) (readers))) (values write read)))) (named-lambda (make-synchronous-channel) (let ((queue (make-queue))) (define (send obj reader writer queue-channel) (let ((.ignored.1-0 (reader obj)) (.ignored.1-1 (writer unspecific)) (ignored (if (not (queue-empty? queue)) (queue-channel)))) .ignored.1-0 .ignored.1-1 ignored unspecific)) (let () (define .queue.2-0 (make-queue)) (define thread (create-thread #f (let ((readers/write-mask 3) (writers/read-mask 12) (writers/write-mask 6) (readers/read-mask 9) (write-mask 2) (read-mask 8) (readers-count 0) (write-queue (make-queue)) (writers-count 0) (read-queue (make-queue))) (define (readers/write write-item) (let ((obj (vector-ref write-item 1)) (writer (vector-ref write-item 2))) (send obj (dequeue! queue) writer readers))) (define (writers/read read-item) (let ((reader read-item)) (let ((obj.writer (dequeue! queue))) (send (car obj.writer) reader (cdr obj.writer) writers)))) (define (writers/write write-item) (let ((obj (vector-ref write-item 1)) (writer (vector-ref write-item 2))) (enqueue! queue (cons obj writer)) (writers))) (define (readers/read read-item) (let ((reader read-item)) (enqueue! queue reader) (readers))) (define (write write-item) (let ((obj (vector-ref write-item 1)) (writer (vector-ref write-item 2))) (enqueue! queue (cons obj writer)) (writers))) (define (read read-item) (let ((reader read-item)) (enqueue! queue reader) (readers))) (define dispatcher (lambda (item) (let ((key (vector-ref item 0))) (let ((.readers/write-mask.3-0 (fix:and readers/write-mask key))) (if (fix:zero? .readers/write-mask.3-0) (cond ((fix:= key -2) (readers/write (let ((item (dequeue!/unsafe write-queue))) (if (queue-empty? write-queue) (begin (set! readers/write-mask (fix:or readers/write-mask 2)) (set! writers/write-mask (fix:or writers/write-mask 2)) (set! write-mask (fix:or write-mask 2)))) item))) ((fix:= key -3) (set! readers-count (fix:- readers-count 1)) (if (fix:zero? readers-count) (begin (set! readers/write-mask (fix:or readers/write-mask 1)) (set! readers/read-mask (fix:or readers/read-mask 1)))) (readers/write item))) (let ((.writers/read-mask.4-0 (fix:and writers/read-mask key))) (if (fix:zero? .writers/read-mask.4-0) (cond ((fix:= key -5) (writers/read (let ((item (dequeue!/unsafe read-queue))) (if (queue-empty? read-queue) (begin (set! writers/read-mask (fix:or writers/read-mask 8)) (set! readers/read-mask (fix:or readers/read-mask 8)) (set! read-mask (fix:or read-mask 8)))) item))) ((fix:= key -9) (set! writers-count (fix:- writers-count 1)) (if (fix:zero? writers-count) (begin (set! writers/read-mask (fix:or writers/read-mask 4)) (set! writers/write-mask (fix:or writers/write-mask 4)))) (writers/read (vector-ref item 1)))) (let ((.writers/write-mask.5-0 (fix:and writers/write-mask key))) (if (fix:zero? .writers/write-mask.5-0) (cond ((fix:= key -5) (writers/write (let ((item (dequeue!/unsafe write-queue))) (if (queue-empty? write-queue) (begin (set! readers/write-mask (fix:or readers/write-mask 2)) (set! writers/write-mask (fix:or writers/write-mask 2)) (set! write-mask (fix:or write-mask 2)))) item))) ((fix:= key -3) (set! writers-count (fix:- writers-count 1)) (if (fix:zero? writers-count) (begin (set! writers/read-mask (fix:or writers/read-mask 4)) (set! writers/write-mask (fix:or writers/write-mask 4)))) (writers/write item))) (let ((.readers/read-mask.6-0 (fix:and readers/read-mask key))) (if (fix:zero? .readers/read-mask.6-0) (cond ((fix:= key -2) (readers/read (let ((item (dequeue!/unsafe read-queue))) (if (queue-empty? read-queue) (begin (set! writers/read-mask (fix:or writers/read-mask 8)) (set! readers/read-mask (fix:or readers/read-mask 8)) (set! read-mask (fix:or read-mask 8)))) item))) ((fix:= key -9) (set! readers-count (fix:- readers-count 1)) (if (fix:zero? readers-count) (begin (set! readers/write-mask (fix:or readers/write-mask 1)) (set! readers/read-mask (fix:or readers/read-mask 1)))) (readers/read (vector-ref item 1)))) (let ((.write-mask.7-0 (fix:and write-mask key))) (if (fix:zero? .write-mask.7-0) (write item) (let ((.read-mask.8-0 (fix:and read-mask key))) (if (fix:zero? .read-mask.8-0) (read (vector-ref item 1)) (begin (set! read-mask .read-mask.8-0) (set! write-mask .write-mask.7-0) (set! readers/read-mask .readers/read-mask.6-0) (set! writers/write-mask .writers/write-mask.5-0) (set! writers/read-mask .writers/read-mask.4-0) (set! readers/write-mask .readers/write-mask.3-0) (cond ((fix:= key -2) (set! readers-count (fix:+ readers-count 1))) ((fix:= key -3) (enqueue!/unsafe write-queue item)) ((fix:= key -5) (set! writers-count (fix:+ writers-count 1))) ((fix:= key -9) (enqueue!/unsafe read-queue (vector-ref item 1)))))))))))))))))) unspecific)) (lambda () ((let () (define loop (lambda () (dequeue-or-suspend .queue.2-0 dispatcher) (loop))) loop)))))) (define (readers) (enqueue-and-signal .queue.2-0 (vector -2) thread #t)) (define (write parameter2 parameter1) (enqueue-and-signal .queue.2-0 (vector -3 parameter2 parameter1) thread #t)) (define (writers) (enqueue-and-signal .queue.2-0 (vector -5) thread #t)) (define (read parameter1) (enqueue-and-signal .queue.2-0 (vector -9 parameter1) thread #t)) (values write read)))) ;;;; Debug Output Utility for Testing (define debug-output-port) (define (debug-output fmt . args) (fresh-line debug-output-port) (fluid-let ((*unparser-radix* 2)) (apply format debug-output-port fmt args)))