(import (srfi 1) (srfi 18) (srfi 111)) (define-record chan mutex cv data end) (define (new-chan) (let ((new-data (box (list)))) (make-chan (make-mutex) (make-condition-variable) new-data new-data))) (define (chan-get! chan) (let loop () (let ((data (unbox (chan-data chan))) (m (chan-mutex chan))) (mutex-lock! m) (if (null? data) (begin (mutex-unlock! m (chan-cv chan)) (loop)) (begin (chan-data-set! chan (cdr data)) (mutex-unlock! m) (car data)))))) (define (chan-put! chan x) (if (not (chan-end chan)) (error "Can't put items into a dupped channel.")) (let ((new-end (box (list)))) (set-box! (chan-end chan) (cons x new-end)) (chan-end-set! chan new-end) (condition-variable-broadcast! (chan-cv chan)))) (define (chan-dup chan) (make-chan (make-mutex) (chan-cv chan) (chan-data chan) #f)) (define out-mutex (make-mutex)) (define (consumer chan name) (let loop () (let ((data (chan-get! chan))) (mutex-lock! out-mutex) (display name) (display ": ") (display data) (newline) (mutex-unlock! out-mutex) (if (not (null? data)) (loop))))) (define chan (new-chan)) (define threads (list (thread-start! (make-thread (lambda () (consumer (chan-dup chan) "Alice")))) (thread-start! (make-thread (lambda () (consumer (chan-dup chan) "Bob")))) (thread-start! (make-thread (lambda () (consumer (chan-dup chan) "Charlie")))) (thread-start! (make-thread (lambda () (consumer (chan-dup chan) "Dave")))) (thread-start! (make-thread (lambda () (consumer (chan-dup chan) "Eve")))))) (map (lambda (n) (chan-put! chan n) (thread-sleep! 0.1)) (iota 10)) (chan-put! chan (list)) ; nil signals the consumers to quit (map thread-join! threads)