;;; -*- Mode: Scheme -*- ;;;; SRFI 18: Multithreading support ;;;; Scheme48 implementation ;;; Copyright (c) 2009, 2010, Taylor R. Campbell ;;; ;;; Redistribution and use in source and binary forms, with or without ;;; modification, are permitted provided that the following conditions ;;; are met: ;;; ;;; * Redistributions of source code must retain the above copyright ;;; notice, this list of conditions and the following disclaimer. ;;; ;;; * Redistributions in binary form must reproduce the above copyright ;;; notice, this list of conditions and the following disclaimer in ;;; the documentation and/or other materials provided with the ;;; distribution. ;;; ;;; * Neither the names of the authors nor the names of contributors ;;; may be used to endorse or promote products derived from this ;;; software without specific prior written permission. ;;; ;;; THIS SOFTWARE IS PROVIDED BY THE AUTHORS ``AS IS'' AND ANY EXPRESS ;;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED ;;; WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ;;; ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY ;;; DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL ;;; DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE ;;; GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS ;;; INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, ;;; WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING ;;; NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS ;;; SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. ;;; This is a partial implementation of Marc Feeley's SRFI 18, ;;; Multithreading support, for Scheme48. Currently only the mutex and ;;; condition variable parts are implemented; none of the thread or ;;; exception parts are implemented yet. This code is probably still ;;; buggy: it has been only extremely minimally tested. ;;; Randomness. 
;;; Used to discern what state to put a mutex in when
;;; locking it -- not very useful, though, because threads don't yet
;;; update the states of the mutices that they hold when they exit.

;;; Returns true if THREAD is neither running nor holding a
;;; continuation.

(define (thread-terminated? thread)
  (not (or (running? thread)
           (thread-continuation thread))))

;;;; Condition Variables

;;; A condition variable holds a queue of waiters (the procedures in
;;; this file enqueue thread cells on it), a mutable SPECIFIC field
;;; for the user, and an optional name.

(define-record-type condition-variable :condition-variable
  (%make-condition-variable waiters specific name)
  condition-variable?
  (waiters condition-variable.waiters)
  (specific condition-variable-specific condition-variable-specific-set!)
  (name condition-variable-name))

(define-record-discloser :condition-variable
  (lambda (condition-variable)
    (list 'CONDITION-VARIABLE
          (condition-variable-name condition-variable))))

;;; (MAKE-CONDITION-VARIABLE [name])
;;;   Creates a condition variable with an empty waiter queue and a
;;;   SPECIFIC field of #F.  NAME defaults to #F.

(define (make-condition-variable . name)
  (%make-condition-variable (make-queue)
                            #f
                            (if (pair? name) (car name) #f)))

;;; Wakes at most one thread waiting on CONDITION-VARIABLE.  Signals
;;; an error if called inside a transaction, since waking a thread
;;; cannot be rolled back.

(define (condition-variable-signal! condition-variable)
  (if (current-proposal)
      (error "I am about to perform an action I cannot roll back!"
             (current-thread)
             `(CONDITION-VARIABLE-SIGNAL ,condition-variable)))
  (with-new-proposal (retry)
    (if (not (maybe-commit-and-signal-condition-variable condition-variable))
        (retry))))

;;; Must be called inside a transaction.  Dequeues one waiter, if any,
;;; and commits; returns the result of the commit attempt.

(define (maybe-commit-and-signal-condition-variable condition-variable)
  (let ((waiter
         (maybe-dequeue-thread!
          (condition-variable.waiters condition-variable))))
    (if waiter
        (maybe-commit-and-make-ready waiter)
        (maybe-commit))))

;;; Must be called inside a transaction.  Hands the whole waiter queue
;;; to MAYBE-COMMIT-AND-MAKE-READY.
;;; NOTE(review): this relies on MAYBE-COMMIT-AND-MAKE-READY accepting
;;; a queue as well as a single thread -- confirm against the
;;; Scheme48 version in use.

(define (maybe-commit-and-broadcast-condition-variable condition-variable)
  (maybe-commit-and-make-ready
   (condition-variable.waiters condition-variable)))

;;; Wakes the threads waiting on CONDITION-VARIABLE.  Signals an error
;;; if called inside a transaction.

(define (condition-variable-broadcast! condition-variable)
  (if (current-proposal)
      (error "I am about to perform an action I cannot roll back!"
             (current-thread)
             ;; Name this operation, not CONDITION-VARIABLE-SIGNAL.
             `(CONDITION-VARIABLE-BROADCAST ,condition-variable)))
  (with-new-proposal (retry)
    (if (not (maybe-commit-and-broadcast-condition-variable
              condition-variable))
        (retry))))

;;;; Mutices

;;; The STATE field holds one of the SRFI 18 mutex states: a thread
;;; (locked and owned), NOT-OWNED (locked, no owner), ABANDONED, or
;;; NOT-ABANDONED (unlocked).  It is the only synchronized field.

(define-synchronized-record-type mutex :mutex
  (%make-mutex state waiters specific name)
  (state)
  mutex?
  (state mutex.state set-mutex.state!)
  (waiters mutex.waiters)
  (specific mutex-specific mutex-specific-set!)
  (name mutex-name))

(define-record-discloser :mutex
  (lambda (mutex)
    (list 'MUTEX (mutex-name mutex) (mutex.state mutex))))

;;; (MAKE-MUTEX [name])
;;;   Creates an unlocked mutex.  The initial state is NOT-ABANDONED,
;;;   the same state %MUTEX-UNLOCK! leaves a mutex in, so MUTEX-STATE
;;;   reports the same value for a fresh mutex as for an unlocked one.

(define (make-mutex . name)
  (%make-mutex 'NOT-ABANDONED
               (make-queue)
               #f
               (if (pair? name) (car name) #f)))

(define (mutex-state mutex)
  (mutex.state mutex))

;;; (MUTEX-LOCK! mutex [timeout [thread]])
;;;   TIMEOUT is #F (wait indefinitely, the default), a real number of
;;;   seconds from now, or a time object.  THREAD is the thread to
;;;   record as the owner (default: the current thread), or #F to lock
;;;   the mutex without an owner.  Returns #F if the timeout was
;;;   reached before the mutex could be locked.  Signals an error if
;;;   called inside a transaction or with invalid arguments.

(define (mutex-lock! mutex . arguments)
  (if (current-proposal)
      (error "I am about to perform an action I cannot roll back!"
             (current-thread)
             `(MUTEX-LOCK! ,mutex ,@arguments)))
  (if (not (mutex? mutex))
      (error "Invalid mutex for MUTEX-LOCK!:" mutex))
  (if (pair? arguments)
      (let ((timeout (car arguments)))
        (if (not (or (not timeout) (timeout? timeout)))
            (error "Invalid timeout for MUTEX-LOCK!:" timeout))
        (if (pair? (cdr arguments))
            (let ((thread (cadr arguments)))
              ;; #F is a legitimate owner: ATTEMPT-LOCK turns it into
              ;; the NOT-OWNED state.
              (if (not (or (not thread) (thread? thread)))
                  (error "Invalid thread for MUTEX-LOCK!:" thread))
              (if (null? (cddr arguments))
                  (%mutex-lock! mutex timeout thread)
                  (error "Too many arguments:"
                         `(MUTEX-LOCK! ,mutex ,@arguments))))
            (%mutex-lock! mutex timeout (current-thread))))
      (%mutex-lock! mutex #f (current-thread))))

;;; Runs one locking transaction on MUTEX.  The transaction yields a
;;; thunk, which is called after the transaction has been left; the
;;; thunk's value is the value of ATTEMPT-LOCK.
;;;
;;; - If the mutex is held by another thread or is NOT-OWNED,
;;;   (IF-BLOCKED retry) supplies the thunk.
;;; - If FIRST? is true and the current thread already holds the
;;;   mutex, the thunk signals an error.  (When FIRST? is false the
;;;   mutex may have been handed to us by the unlocking thread.)
;;; - Otherwise (IF-ENABLED) is called inside the transaction and the
;;;   mutex is given its new state according to THREAD.

(define (attempt-lock mutex thread first? if-enabled if-blocked)
  ((with-new-proposal (retry)
     (let ((state (mutex.state mutex)))
       (cond ((or (and (thread? state)
                       (not (eq? state (current-thread))))
                  (eq? state 'NOT-OWNED))
              (if-blocked retry))
             ((and first? (eq? state (current-thread)))
              (lambda ()
                (error "Locking against myself!" mutex)))
             (else
              (if-enabled)
              (set-mutex.state! mutex
                                (cond ((not thread) 'NOT-OWNED)
                                      ((thread-terminated? thread)
                                       'ABANDONED)
                                      (else thread)))
              (if (maybe-commit)
                  (lambda ()
                    (if (eq? state 'ABANDONED)
                        (signal-abandoned-mutex-exception mutex)
                        #t))
                  (retry))))))))

;;; Chooses the blocking strategy from TIMEOUT: fail immediately for
;;; an exact zero, block on the mutex's queue for #F, and otherwise
;;; defer to %MUTEX-LOCK!/TIMEOUT.

(define (%mutex-lock! mutex timeout thread)
  (attempt-lock mutex thread #t
    values                              ; Do nothing if enabled.
    (cond ((eqv? timeout 0)
           (lambda (retry)
             (if (maybe-commit)
                 (lambda () #f)
                 (retry))))
          ((eqv? timeout #f)
           (let ()
             (define (if-blocked retry)
               (if (maybe-commit-and-block-on-queue (mutex.waiters mutex))
                   (lambda ()
                     (attempt-lock mutex thread #f values if-blocked))
                   (retry)))
             if-blocked))
          (else
           (lambda (retry)
             ;; Exit this transaction so that %MUTEX-LOCK!/TIMEOUT can
             ;; call REGISTER-TIMEOUT! outside of a transaction before
             ;; starting a new one.  This causes us to poll the mutex
             ;; twice before actually blocking, but of course that may
             ;; happen anyway if any of these transactions fails, and
             ;; this way, the first poll doesn't register a timeout,
             ;; which saves resources if we optimistically assume that
             ;; the mutex will be unlocked.
             (if (maybe-commit)
                 (lambda ()
                   (%mutex-lock!/timeout mutex timeout thread))
                 (retry)))))))

;;; Locks MUTEX, giving up once TIMEOUT is reached.  DONE?-CELL starts
;;; out false; it becomes true either when the timeout fires (see
;;; REGISTER-TIMEOUT!) or when we obtain the mutex.  In the latter
;;; case the ALIVE? predicate passed to REGISTER-DOZER! turns false,
;;; which presumably lets the scheduler retire the timeout.

(define (%mutex-lock!/timeout mutex timeout thread)
  (let ((done?-cell (make-cell #f))
        (thread-cell (make-cell #f)))
    (register-timeout! timeout done?-cell thread-cell)
    (let loop ()
      (attempt-lock mutex thread #f
        (lambda ()
          ;; We have the mutex: the timeout is no longer of interest.
          (provisional-cell-set! done?-cell #t))
        (lambda (retry)
          ;; This must be provisional, because the timeout procedure
          ;; may set it between when we have read it and when we block.
          (if (provisional-cell-ref done?-cell)
              (if (maybe-commit)
                  (lambda () #f)
                  (retry))
              (begin
                ;; We set this here, rather than just creating the cell
                ;; above with the current thread, because this code
                ;; runs in a loop reusing the same cell.
                (provisional-cell-set! thread-cell (current-thread))
                ;; ENQUEUE! is provisional.
                (enqueue! (mutex.waiters mutex) thread-cell)
                (if (maybe-commit-and-block thread-cell)
                    loop
                    (retry)))))))))

;;; (MUTEX-UNLOCK! mutex [condition-variable [timeout]])
;;;   Unlocks MUTEX.  If CONDITION-VARIABLE is supplied, the current
;;;   thread also waits on it.  TIMEOUT is a real number of seconds
;;;   from now, a time object, or #F for no timeout.  Returns #F if
;;;   the timeout was reached, otherwise #T.  Signals an error if
;;;   called inside a transaction or with invalid arguments.

(define (mutex-unlock! mutex . arguments)
  (if (current-proposal)
      (error "I am about to perform an action I cannot roll back!"
             (current-thread)
             `(MUTEX-UNLOCK! ,mutex ,@arguments)))
  (if (not (mutex? mutex))
      (error "Invalid mutex for MUTEX-UNLOCK!:" mutex))
  (if (pair? arguments)
      (let ((condition-variable (car arguments)))
        (if (not (condition-variable? condition-variable))
            (error "Invalid condition variable for MUTEX-UNLOCK!:"
                   condition-variable))
        (if (pair? (cdr arguments))
            (let ((timeout (cadr arguments)))
              ;; As in MUTEX-LOCK!, #F means `no timeout'.
              (if (not (or (not timeout) (timeout? timeout)))
                  (error "Invalid timeout for MUTEX-UNLOCK!:" timeout))
              (cond ((not (null? (cddr arguments)))
                     (error "Too many arguments:"
                            `(MUTEX-UNLOCK! ,mutex ,@arguments)))
                    (timeout
                     (%mutex-unlock-and-wait!/timeout mutex
                                                      condition-variable
                                                      timeout))
                    (else
                     (%mutex-unlock-and-wait! mutex condition-variable))))
            (%mutex-unlock-and-wait! mutex condition-variable)))
      (%mutex-unlock! mutex)))

;;; Plain unlock.  If a thread is waiting for MUTEX, ownership is
;;; handed directly to it and it is made ready; otherwise the mutex
;;; becomes NOT-ABANDONED.  Returns #T.

(define (%mutex-unlock! mutex)
  (let ((waiters (mutex.waiters mutex)))
    (with-new-proposal (retry)
      (let ((waiter (maybe-dequeue-thread! waiters)))
        (if waiter
            (begin
              (set-mutex.state! mutex waiter)
              (if (maybe-commit-and-make-ready waiter)
                  #t
                  (retry)))
            (begin
              (set-mutex.state! mutex 'NOT-ABANDONED)
              (if (maybe-commit)
                  #t
                  (retry))))))))

;;; Utility for unlocking and waiting.  Remember in all this mess that
;;; MUTEX-UNLOCK! with a condition variable is allowed to return at any
;;; time, even if the condition variable has not been signalled or
;;; broadcast and the timeout has not been reached.  That's why there's
;;; no looping logic here or in the following procedures.
;;;
;;; THREAD-CELL is enqueued on CONDITION-VARIABLE's waiters.  If some
;;; thread is waiting for MUTEX, it receives the mutex and is made
;;; ready, we block in a second transaction, and the result is
;;; (IF-READY).  Otherwise the mutex becomes NOT-ABANDONED and
;;; (IF-NO-WAITERS retry) must commit the transaction, or retry it,
;;; and supply the result.

(define (%make-ready-and-block mutex condition-variable thread-cell
                               if-ready if-no-waiters)
  (let ((waiters (mutex.waiters mutex)))
    (with-new-proposal (retry)
      ;; ENQUEUE! is provisional.
      (enqueue! (condition-variable.waiters condition-variable)
                thread-cell)
      (let ((waiter (maybe-dequeue-thread! waiters)))
        (if waiter
            (begin
              (set-mutex.state! mutex waiter)
              (if (maybe-commit-and-make-ready waiter)
                  (with-new-proposal (retry-block)
                    ;; Someone may have signalled or broadcast the
                    ;; condition variable, or we may have timed out,
                    ;; while we were making the mutex's waiter ready.
                    ;; Either case causes #F to be stored in
                    ;; THREAD-CELL; if so, we're done -- don't block.
                    (if (if (provisional-cell-ref thread-cell)
                            (maybe-commit-and-block thread-cell)
                            (maybe-commit))
                        (if-ready)
                        (retry-block)))
                  (retry)))
            (begin
              (set-mutex.state! mutex 'NOT-ABANDONED)
              (if-no-waiters retry)))))))

;;; Unlocks MUTEX and waits on CONDITION-VARIABLE with no timeout.
;;; Returns #T.

(define (%mutex-unlock-and-wait! mutex condition-variable)
  (let ((thread-cell (make-cell (current-thread))))
    (%make-ready-and-block mutex condition-variable thread-cell
      (lambda () #t)
      (lambda (retry)
        (if (maybe-commit-and-block thread-cell)
            #t
            (retry))))))

;;; Unlocks MUTEX and waits on CONDITION-VARIABLE until woken or until
;;; TIMEOUT is reached.  Returns #F if the timeout was reached and #T
;;; otherwise.

(define (%mutex-unlock-and-wait!/timeout mutex condition-variable timeout)
  (let ((done?-cell (make-cell #f))
        (thread-cell (make-cell (current-thread))))
    (define (finish)
      ;; There is a minor race condition here: if we get signalled and
      ;; then soon after time out, we may say that we timed out rather
      ;; than being woken through the condition variable.  But programs
      ;; can't tell the difference between these anyway.  Avoiding this
      ;; race condition the obvious way, by not setting DONE? in
      ;; REGISTER-TIMEOUT! if the thread cell holds #F, causes a more
      ;; serious race condition in the MUTEX-LOCK! logic, because that
      ;; does need to loop and is not sensitive to this harmless race.
      ;; We could split REGISTER-TIMEOUT! into two different versions,
      ;; but that's not worth the trouble.
      ;;
      ;; The following cell operations are non-provisional because we
      ;; have just been unblocked and are no longer in a transaction.
      ;; They serve only to communicate with the timeout mechanism,
      ;; which doesn't need more atomicity than CELL-REF and CELL-SET!
      ;; have.
      ;;
      ;; FINISH is called in value position by its callers, so it
      ;; yields the boolean result itself rather than a thunk.
      (if (cell-ref done?-cell)
          #f
          (begin
            (cell-set! done?-cell #t)
            #t)))
    (register-timeout! timeout done?-cell thread-cell)
    (%make-ready-and-block mutex condition-variable thread-cell
      finish
      (lambda (retry)
        ;; This must be provisional, or else we might block even if the
        ;; timeout has been reached during this transaction.  We could
        ;; also do a provisional read from THREAD-CELL for the same
        ;; effect.
        (if (provisional-cell-ref done?-cell)
            (if (maybe-commit)
                #f
                (retry))
            (if (maybe-commit-and-block thread-cell)
                (finish)
                (retry)))))))

;;;; Time and Timeouts

;;; A time object wraps a number of seconds, measured on the same
;;; clock as (REAL-TIME), which CURRENT-TIME treats as milliseconds.

(define-record-type time :time
  (make-time seconds)
  time?
  (seconds time->seconds))

(define (seconds->time seconds)
  (if (not (real? seconds))
      (error "Invalid seconds:" seconds))
  (make-time seconds))

(define (current-time)
  (make-time (/ (real-time) 1000.0)))

;;; A timeout is a real number (seconds relative to now) or a time
;;; object (an absolute time).  #F, where it is permitted, is handled
;;; by the callers.

(define (timeout? object)
  (or (real? object)
      (time? object)))

;;; Converts TIMEOUT to an absolute, exact number of milliseconds on
;;; the (REAL-TIME) clock.

(define (timeout->real-milliseconds timeout)
  (define (seconds->exact-milliseconds seconds)
    (inexact->exact (round (* 1000 seconds))))
  (cond ((real? timeout)
         (+ (real-time) (seconds->exact-milliseconds timeout)))
        ((time? timeout)
         (seconds->exact-milliseconds (time->seconds timeout)))
        (else
         (error "Invalid timeout:" timeout))))

;;; REGISTER-TIMEOUT! has global effects, and thus must not be invoked
;;; within a transaction.
;;;
;;; When TIMEOUT is reached and DONE?-CELL is still false, DONE?-CELL
;;; is set to true and the thread in THREAD-CELL, if any, is made
;;; ready.

(define (register-timeout! timeout done?-cell thread-cell)
  (register-dozer! (timeout->real-milliseconds timeout)
                   ;; Interrupts are disabled for the following two
                   ;; procedures; hence, no transactions.
                   (lambda ()
                     (not (cell-ref done?-cell)))
                   (lambda ()
                     (cell-set! done?-cell #t)
                     (let ((thread (cell-ref thread-cell)))
                       (if thread
                           (with-new-proposal (retry)
                             (if (not (maybe-commit-and-make-ready thread))
                                 (retry))))))))