threading issues? pasted by iterrogo on Fri Sep 12 02:46:32 2014
; websockets.scm
(module websockets
(websocket-close with-websocket upgrade-to-websocket
websocket-send-message websocket-receive-message
with-input-from-websocket with-output-to-websocket
make-websocket-read-port make-websocket-send-port)
(import chicken scheme data-structures extras ports)
(use srfi-1 srfi-4 spiffy intarweb uri-common base64 simple-sha1 srfi-18
srfi-13 mailbox)
;; Record describing one websocket connection.  It bundles the port
;; frames are read from (inbound-port), the port frames are written to
;; (outbound-port), a mailbox of messages waiting to be sent
;; (send-mailbox) and a mailbox of messages that have been received
;; (read-mailbox).
(define-record-type websocket
(make-websocket inbound-port outbound-port
send-mailbox read-mailbox)
websocket?
(inbound-port websocket-inbound-port)
(outbound-port websocket-outbound-port)
(send-mailbox websocket-send-mailbox)
(read-mailbox websocket-read-mailbox))
;; Parameter naming the websocket that the exported send/receive/close
;; procedures operate on.  Its initial value is #f; with-websocket binds
;; it for the extent of its thunk.
(define current-websocket (make-parameter #f))
(define (string->bytes str)
  ;; Return a u8vector with one element per character of STR, each
  ;; element holding that character's integer code.
  ;; XXX this wont work unless it's all ascii.
  (let* ((size (string-length str))
         (bytes (make-u8vector size)))
    (do ((i 0 (+ i 1)))
        ((= i size) bytes)
      (u8vector-set! bytes i (char->integer (string-ref str i))))))
(define (hex-string->string hexstr)
  ;; Decode a string of hexadecimal digit pairs such as "a745ff12" into
  ;; the string whose characters have those codes, one character per
  ;; pair.  A trailing unpaired digit is ignored.
  ;;
  ;; The result size uses `quotient' so it is always an exact integer,
  ;; and pairs are addressed by index so the work is linear in the
  ;; input length (no repeated `length' over the remaining characters).
  (let* ((pairs (quotient (string-length hexstr) 2))
         (result (make-string pairs)))
    (do ((i 0 (+ i 1)))
        ((= i pairs) result)
      (let* ((start (* 2 i))
             (code (string->number (substring hexstr start (+ start 2)) 16)))
        (string-set! result i (integer->char code))))))
(define (websocket-send-frame ws opcode data last-frame)
  ;; Write a single unmasked frame to the outbound port of WS.
  ;;   OPCODE      frame opcode, placed in the low bits of the first octet
  ;;   DATA        payload, a u8vector
  ;;   LAST-FRAME  true sets the FIN bit
  ;; The three reserved bits and the mask bit are always 0.  Returns #t.
  (let* ((len (u8vector-length data))
         (port (websocket-outbound-port ws))
         (fin-bit (if last-frame 128 0))
         ;; 7-bit length field: the length itself when below 126,
         ;; otherwise 126 (2 length bytes follow) or 127 (8 follow).
         (len-code (cond ((< len 126) len)
                         ((< len 65536) 126)
                         (else 127)))
         ;; byte of LEN found SHIFT bits above the least significant one
         (byte-at (lambda (shift)
                    (bitwise-and (arithmetic-shift len (- shift)) 255))))
    ;; two header octets: FIN/reserved/opcode, then mask bit + length field
    (write-u8vector (u8vector (bitwise-ior fin-bit opcode) len-code) port)
    ;; extended length, most significant byte first
    (write-u8vector
     (case len-code
       ((126) (u8vector (byte-at 8) (byte-at 0)))
       ((127) (u8vector 0 0 0 0
                        (byte-at 24) (byte-at 16) (byte-at 8) (byte-at 0)))
       (else (u8vector)))
     port)
    (write-u8vector data port)
    #t))
(define (websocket-send ws data)
  ;; Send the string DATA over WS as one final frame with opcode 1.
  ;; TODO break up large data into multiple frames?
  (let ((payload (string->bytes data)))
    (websocket-send-frame ws 1 payload #t)))
(define (websocket-read-frame-payload inbound-port frame-payload-length
                                      frame-masked frame-masking-key)
  ;; Read FRAME-PAYLOAD-LENGTH bytes from INBOUND-PORT and return them
  ;; as a u8vector.  When FRAME-MASKED is true, a fresh u8vector is
  ;; returned in which every byte has been XORed with the element of
  ;; FRAME-MASKING-KEY (a 4-element vector) at its position modulo 4.
  (let ((raw (read-u8vector frame-payload-length inbound-port)))
    (if (not frame-masked)
        raw
        (let ((plain (make-u8vector frame-payload-length)))
          (do ((i 0 (+ i 1)))
              ((= i frame-payload-length) plain)
            (u8vector-set!
             plain i
             (bitwise-xor (u8vector-ref raw i)
                          (vector-ref frame-masking-key (modulo i 4)))))))))
(define (websocket-read-frame ws)
  ;; Read one frame from the inbound port of WS.
  ;; Returns
  ;;   - the unmasked payload as a u8vector for a text frame (opcode 1),
  ;;   - #!eof when the port ends inside the two header bytes or a close
  ;;     frame (opcode 8) arrives,
  ;;   - #t for a pong frame (opcode 10).
  ;; Signals an error for every other opcode and for frames that
  ;; announce an 8 byte payload length.  The FIN bit is ignored, so
  ;; fragmented messages are not reassembled.
  (let* ((inbound-port (websocket-inbound-port ws))
         ;; first byte: FIN, reserved bits, opcode
         (b0 (read-byte inbound-port))
         ;; second byte: mask bit and 7-bit length field
         (b1 (if (eof-object? b0) b0 (read-byte inbound-port))))
    (if (eof-object? b1)
        b1
        (let* ((frame-opcode (bitwise-and b0 15))
               (frame-masked (> (bitwise-and b1 128) 0))
               (length-code (bitwise-and b1 127))
               (frame-payload-length
                (cond ((= length-code 126)
                       ;; 16 bit length follows, high byte first; let*
                       ;; guarantees the two reads happen in that order
                       (let* ((bl0 (read-byte inbound-port))
                              (bl1 (read-byte inbound-port)))
                         (+ (arithmetic-shift bl0 8) bl1)))
                      ((= length-code 127)
                       (error "8 byte payload length unsupported"))
                      (else length-code)))
               (frame-masking-key
                (and frame-masked
                     (let* ((fm0 (read-byte inbound-port))
                            (fm1 (read-byte inbound-port))
                            (fm2 (read-byte inbound-port))
                            (fm3 (read-byte inbound-port)))
                       (vector fm0 fm1 fm2 fm3)))))
          (cond
           ((= frame-opcode 1)
            (websocket-read-frame-payload inbound-port frame-payload-length
                                          frame-masked frame-masking-key))
           ((= frame-opcode 8)
            ;; eof frame
            #!eof)
           ((= frame-opcode 10)
            ;; pong frame
            ;; we aren't required to respond to an unsolicited pong, but
            ;; its payload has to be consumed so that the next read
            ;; starts on a frame boundary
            (websocket-read-frame-payload inbound-port frame-payload-length
                                          frame-masked frame-masking-key)
            #t)
           (else
            (error "websocket got unhandled opcode: " frame-opcode "\n")))))))
(define (sha1-sum in-bv)
  ;; Pass IN-BV to string->sha1sum and decode the hexadecimal text it
  ;; returns into the characters those digit pairs encode.
  (let ((hex-digest (string->sha1sum in-bv)))
    (hex-string->string hex-digest)))
(define (websocket-compute-handshake client-key)
  ;; Compute the value for the sec-websocket-accept response header:
  ;; append the fixed GUID to CLIENT-KEY, take sha1-sum of the result
  ;; and base64-encode it.
  (base64-encode
   (sha1-sum
    (string-append client-key "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"))))
(define (sec-websocket-accept-unparser header-contents)
  ;; For every entry of HEADER-CONTENTS (a list of vectors) take the
  ;; pair stored in slot 0 and collect its car, preserving order.
  (let collect ((remaining header-contents)
                (acc '()))
    (if (null? remaining)
        (reverse acc)
        (collect (cdr remaining)
                 (cons (car (vector-ref (car remaining) 0)) acc)))))
;; Register sec-websocket-accept-unparser as the unparser used for the
;; sec-websocket-accept header by updating the header-unparsers alist.
(let ((unparsers (header-unparsers)))
  (header-unparsers
   (alist-update! 'sec-websocket-accept
                  sec-websocket-accept-unparser
                  unparsers)))
; handle multiple frames together in the future?
(define (websocket-read-message ws)
  ;; Read the next text message from WS and return it as a string with
  ;; one character per payload byte.
  ;;
  ;; websocket-read-frame does not always return a payload: it yields #t
  ;; for a pong frame and #!eof on close/end of stream.  Pongs are
  ;; skipped by reading the next frame; any other non-payload value is
  ;; returned unchanged so that callers can test it with eof-object?.
  (let loop ((frame (websocket-read-frame ws)))
    (cond ((u8vector? frame)
           ;; list->string avoids passing one argument per byte to `string'
           (list->string (map integer->char (u8vector->list frame))))
          ((eq? frame #t)
           (loop (websocket-read-frame ws)))
          (else frame))))
(define (make-websocket-send-port)
  ;; Return an output port built with make-output-port.  Its write
  ;; procedure appends each chunk to an internal buffer; its close
  ;; procedure posts the buffered text to the send mailbox of the
  ;; websocket that is current at that moment.
  (let* ((buffer "")
         (collect! (lambda (chunk)
                     (set! buffer (string-append buffer chunk))))
         (deliver! (lambda ()
                     (mailbox-send!
                      (websocket-send-mailbox (current-websocket))
                      buffer))))
    (make-output-port collect! deliver!)))
; TODO how to close with close-port ?
(define (make-websocket-read-port)
  ;; Take the next entry from the read mailbox of the current websocket
  ;; and return a string input port over it.
  (let* ((inbox (websocket-read-mailbox (current-websocket)))
         (message (mailbox-receive! inbox)))
    (open-input-string message)))
(define (with-input-from-websocket proc)
  ;; Take the next entry from the read mailbox of the current websocket
  ;; and call PROC through with-input-from-string on it.
  (let ((message (mailbox-receive!
                  (websocket-read-mailbox (current-websocket)))))
    (with-input-from-string message proc)))
(define (with-output-to-websocket proc)
  ;; Run PROC through with-output-to-string and post the captured text
  ;; to the send mailbox of the current websocket.
  (let* ((text (with-output-to-string proc))
         (outbox (websocket-send-mailbox (current-websocket))))
    (mailbox-send! outbox text)))
(define (websocket-send-message msg)
  ;; Post MSG to the send mailbox of the current websocket.
  (let ((outbox (websocket-send-mailbox (current-websocket))))
    (mailbox-send! outbox msg)))
(define (websocket-receive-message)
  ;; Return the next entry of the read mailbox of the current websocket.
  (let ((inbox (websocket-read-mailbox (current-websocket))))
    (mailbox-receive! inbox)))
; TODO respond to browsers close ACK
; or is that only for browser initiated closes?
; TODO threading issue with send-frame?
; TODO will drain send/receive mailboxes before closing
; do we want that?
(define (websocket-close #!optional (ws (current-websocket)))
  ;; Close WS, which defaults to the current websocket: write a frame
  ;; with opcode 8 and an empty payload, then post #!eof to the read and
  ;; send mailboxes.
  ;;
  ;; The optional argument exists because the read thread started by
  ;; websocket-accept calls (websocket-close ws); calling with no
  ;; argument keeps working as before.
  (websocket-send-frame ws 8 (make-u8vector 0) #t)
  ; wake up threads so they can exit
  (mailbox-send! (websocket-read-mailbox ws) #!eof)
  (mailbox-send! (websocket-send-mailbox ws) #!eof))
(define (websocket-accept)
  ;; Answer the current request with a websocket upgrade response and
  ;; return a new websocket record built from the request port, the
  ;; response port and two fresh mailboxes.  Two threads are started:
  ;; one moves incoming messages into the read mailbox, the other sends
  ;; whatever is posted to the send mailbox.
  (let* ((headers (request-headers (current-request)))
         (client-key (header-value 'sec-websocket-key headers))
         (ws-handshake (websocket-compute-handshake client-key))
         (ws (make-websocket
              (request-port (current-request))
              (response-port (current-response))
              (make-mailbox)
              (make-mailbox))))
    (with-headers
     `((upgrade ("WebSocket" . #f))
       (connection (upgrade . #t))
       (sec-websocket-accept (,ws-handshake . #t)))
     (lambda ()
       (send-response status: 'switching-protocols)))
    ; read thread, close on #!eof
    (thread-start!
     (lambda ()
       (let loop ()
         (let ((msg (websocket-read-message ws)))
           (if (eof-object? msg)
               ;; websocket-close works on current-websocket; bind it
               ;; to WS here because this thread is created before
               ;; with-websocket's parameterize takes effect
               (parameterize ((current-websocket ws))
                 (websocket-close))
               ;; hand the received message (not just the mailbox) over
               (begin (mailbox-send! (websocket-read-mailbox ws) msg)
                      (loop)))))))
    ; send thread
    (thread-start!
     (lambda ()
       (let loop ()
         (let ((msg (mailbox-receive! (websocket-send-mailbox ws))))
           (if (eof-object? msg)
               (void) ; websocket closed
               (begin (websocket-send ws msg)
                      (loop)))))))
    ws))
(define (with-websocket proc)
  ;; Accept a websocket for the current request, bind it to
  ;; current-websocket and call the thunk PROC.  websocket-close is the
  ;; exit handler of the dynamic-wind, so it runs when control leaves
  ;; PROC.
  (let ((ws (websocket-accept)))
    (parameterize ((current-websocket ws))
      (dynamic-wind
        void          ; nothing to do on entry
        proc
        websocket-close))))
(define (upgrade-to-websocket)
  ;; Exported wrapper around websocket-accept: performs the upgrade and
  ;; returns the new websocket record.
  (let ((ws (websocket-accept)))
    ws))
)
; test.scm
;; Test application for the websockets module.
(use spiffy awful)
;; load websockets.scm and make its exported bindings visible
(load "websockets")
(import websockets)
;; NOTE(review): presumably enabled so that pages pull in jQuery, which
;; test.js relies on ($) -- confirm against the awful documentation
(enable-ajax #t)
;; Page at "/" whose handler returns the string "hi".
(define-page "/"
(lambda ()
"hi")
headers: "")
;; Page at "/web-socket": upgrades the request via with-websocket and
;; queues the message "SYN" for sending.
(define-page "/web-socket"
(lambda ()
(with-websocket
(lambda ()
(websocket-send-message "SYN")))))
; test.js
// When the document is ready, open a websocket to the local test
// server, send 'SYN' once the connection is open and log every message
// that arrives.  Before the page unloads the socket is closed with its
// onclose handler replaced by a no-op.
$(document).ready(function() {
  // declared with var so the socket is not leaked as an implicit global
  var ws = new WebSocket("ws://localhost:8080/web-socket");
  ws.onmessage = function (evt) { console.log("got ws message: ", evt.data); };
  ws.onopen = function() { ws.send('SYN'); };
  window.onbeforeunload = function() {
    ws.onclose = function () {}; // disable onclose
    ws.close();
  };
});
; $ awful --development-mode test.scm
fix to code example for websocket (threading?) issues added by iterrogo on Fri Sep 12 17:50:01 2014
; websockets.scm
(module websockets
(websocket-close with-websocket upgrade-to-websocket
websocket-send-message websocket-receive-message
with-input-from-websocket with-output-to-websocket
make-websocket-read-port make-websocket-send-port)
(import chicken scheme data-structures extras ports)
(use srfi-1 srfi-4 spiffy intarweb uri-common base64 simple-sha1 srfi-18
srfi-13 mailbox)
;; Record describing one websocket connection.  It bundles the port
;; frames are read from (inbound-port), the port frames are written to
;; (outbound-port), a mailbox of messages waiting to be sent
;; (send-mailbox) and a mailbox of messages that have been received
;; (read-mailbox).
(define-record-type websocket
(make-websocket inbound-port outbound-port
send-mailbox read-mailbox)
websocket?
(inbound-port websocket-inbound-port)
(outbound-port websocket-outbound-port)
(send-mailbox websocket-send-mailbox)
(read-mailbox websocket-read-mailbox))
;; Parameter naming the websocket that the exported send/receive/close
;; procedures operate on.  Its initial value is #f; with-websocket binds
;; it for the extent of its thunk.
(define current-websocket (make-parameter #f))
(define (string->bytes str)
  ;; Return a u8vector with one element per character of STR, each
  ;; element holding that character's integer code.
  ;; XXX this wont work unless it's all ascii.
  (let* ((size (string-length str))
         (bytes (make-u8vector size)))
    (do ((i 0 (+ i 1)))
        ((= i size) bytes)
      (u8vector-set! bytes i (char->integer (string-ref str i))))))
(define (hex-string->string hexstr)
  ;; Decode a string of hexadecimal digit pairs such as "a745ff12" into
  ;; the string whose characters have those codes, one character per
  ;; pair.  A trailing unpaired digit is ignored.
  ;;
  ;; The result size uses `quotient' so it is always an exact integer,
  ;; and pairs are addressed by index so the work is linear in the
  ;; input length (no repeated `length' over the remaining characters).
  (let* ((pairs (quotient (string-length hexstr) 2))
         (result (make-string pairs)))
    (do ((i 0 (+ i 1)))
        ((= i pairs) result)
      (let* ((start (* 2 i))
             (code (string->number (substring hexstr start (+ start 2)) 16)))
        (string-set! result i (integer->char code))))))
(define (websocket-send-frame ws opcode data last-frame)
  ;; Write a single unmasked frame to the outbound port of WS.
  ;;   OPCODE      frame opcode, placed in the low bits of the first octet
  ;;   DATA        payload, a u8vector
  ;;   LAST-FRAME  true sets the FIN bit
  ;; The three reserved bits and the mask bit are always 0.  Returns #t.
  (let* ((len (u8vector-length data))
         (port (websocket-outbound-port ws))
         (fin-bit (if last-frame 128 0))
         ;; 7-bit length field: the length itself when below 126,
         ;; otherwise 126 (2 length bytes follow) or 127 (8 follow).
         (len-code (cond ((< len 126) len)
                         ((< len 65536) 126)
                         (else 127)))
         ;; byte of LEN found SHIFT bits above the least significant one
         (byte-at (lambda (shift)
                    (bitwise-and (arithmetic-shift len (- shift)) 255))))
    ;; two header octets: FIN/reserved/opcode, then mask bit + length field
    (write-u8vector (u8vector (bitwise-ior fin-bit opcode) len-code) port)
    ;; extended length, most significant byte first
    (write-u8vector
     (case len-code
       ((126) (u8vector (byte-at 8) (byte-at 0)))
       ((127) (u8vector 0 0 0 0
                        (byte-at 24) (byte-at 16) (byte-at 8) (byte-at 0)))
       (else (u8vector)))
     port)
    (write-u8vector data port)
    #t))
(define (websocket-send ws data)
  ;; Send the string DATA over WS as one final frame with opcode 1.
  ;; TODO break up large data into multiple frames?
  (let ((payload (string->bytes data)))
    (websocket-send-frame ws 1 payload #t)))
(define (websocket-read-frame-payload inbound-port frame-payload-length
                                      frame-masked frame-masking-key)
  ;; Read FRAME-PAYLOAD-LENGTH bytes from INBOUND-PORT and return them
  ;; as a u8vector.  When FRAME-MASKED is true, a fresh u8vector is
  ;; returned in which every byte has been XORed with the element of
  ;; FRAME-MASKING-KEY (a 4-element vector) at its position modulo 4.
  (let ((raw (read-u8vector frame-payload-length inbound-port)))
    (if (not frame-masked)
        raw
        (let ((plain (make-u8vector frame-payload-length)))
          (do ((i 0 (+ i 1)))
              ((= i frame-payload-length) plain)
            (u8vector-set!
             plain i
             (bitwise-xor (u8vector-ref raw i)
                          (vector-ref frame-masking-key (modulo i 4)))))))))
(define (websocket-read-frame ws)
  ;; Read one frame from the inbound port of WS.
  ;; Returns
  ;;   - the unmasked payload as a u8vector for a text frame (opcode 1),
  ;;   - #!eof when the port ends inside the two header bytes or a close
  ;;     frame (opcode 8) arrives,
  ;;   - #t for a pong frame (opcode 10).
  ;; Signals an error for every other opcode and for frames that
  ;; announce an 8 byte payload length.  The FIN bit is ignored, so
  ;; fragmented messages are not reassembled.
  (let* ((inbound-port (websocket-inbound-port ws))
         ;; first byte: FIN, reserved bits, opcode
         (b0 (read-byte inbound-port))
         ;; second byte: mask bit and 7-bit length field
         (b1 (if (eof-object? b0) b0 (read-byte inbound-port))))
    (if (eof-object? b1)
        b1
        (let* ((frame-opcode (bitwise-and b0 15))
               (frame-masked (> (bitwise-and b1 128) 0))
               (length-code (bitwise-and b1 127))
               (frame-payload-length
                (cond ((= length-code 126)
                       ;; 16 bit length follows, high byte first; let*
                       ;; guarantees the two reads happen in that order
                       (let* ((bl0 (read-byte inbound-port))
                              (bl1 (read-byte inbound-port)))
                         (+ (arithmetic-shift bl0 8) bl1)))
                      ((= length-code 127)
                       (error "8 byte payload length unsupported"))
                      (else length-code)))
               (frame-masking-key
                (and frame-masked
                     (let* ((fm0 (read-byte inbound-port))
                            (fm1 (read-byte inbound-port))
                            (fm2 (read-byte inbound-port))
                            (fm3 (read-byte inbound-port)))
                       (vector fm0 fm1 fm2 fm3)))))
          (cond
           ((= frame-opcode 1)
            (websocket-read-frame-payload inbound-port frame-payload-length
                                          frame-masked frame-masking-key))
           ((= frame-opcode 8)
            ;; eof frame
            #!eof)
           ((= frame-opcode 10)
            ;; pong frame
            ;; we aren't required to respond to an unsolicited pong, but
            ;; its payload has to be consumed so that the next read
            ;; starts on a frame boundary
            (websocket-read-frame-payload inbound-port frame-payload-length
                                          frame-masked frame-masking-key)
            #t)
           (else
            (error "websocket got unhandled opcode: " frame-opcode "\n")))))))
(define (sha1-sum in-bv)
  ;; Pass IN-BV to string->sha1sum and decode the hexadecimal text it
  ;; returns into the characters those digit pairs encode.
  (let ((hex-digest (string->sha1sum in-bv)))
    (hex-string->string hex-digest)))
(define (websocket-compute-handshake client-key)
  ;; Compute the value for the sec-websocket-accept response header:
  ;; append the fixed GUID to CLIENT-KEY, take sha1-sum of the result
  ;; and base64-encode it.
  (base64-encode
   (sha1-sum
    (string-append client-key "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"))))
(define (sec-websocket-accept-unparser header-contents)
  ;; For every entry of HEADER-CONTENTS (a list of vectors) take the
  ;; pair stored in slot 0 and collect its car, preserving order.
  (let collect ((remaining header-contents)
                (acc '()))
    (if (null? remaining)
        (reverse acc)
        (collect (cdr remaining)
                 (cons (car (vector-ref (car remaining) 0)) acc)))))
;; Register sec-websocket-accept-unparser as the unparser used for the
;; sec-websocket-accept header by updating the header-unparsers alist.
(let ((unparsers (header-unparsers)))
  (header-unparsers
   (alist-update! 'sec-websocket-accept
                  sec-websocket-accept-unparser
                  unparsers)))
; handle multiple frames together in the future?
(define (websocket-read-message ws)
  ;; Read the next text message from WS and return it as a string with
  ;; one character per payload byte.
  ;;
  ;; websocket-read-frame does not always return a payload: it yields #t
  ;; for a pong frame and #!eof on close/end of stream.  Pongs are
  ;; skipped by reading the next frame; any other non-payload value is
  ;; returned unchanged so that callers can test it with eof-object?.
  (let loop ((frame (websocket-read-frame ws)))
    (cond ((u8vector? frame)
           ;; list->string avoids passing one argument per byte to `string'
           (list->string (map integer->char (u8vector->list frame))))
          ((eq? frame #t)
           (loop (websocket-read-frame ws)))
          (else frame))))
(define (make-websocket-send-port)
  ;; Return an output port built with make-output-port.  Its write
  ;; procedure appends each chunk to an internal buffer; its close
  ;; procedure posts the buffered text to the send mailbox of the
  ;; websocket that is current at that moment.
  (let* ((buffer "")
         (collect! (lambda (chunk)
                     (set! buffer (string-append buffer chunk))))
         (deliver! (lambda ()
                     (mailbox-send!
                      (websocket-send-mailbox (current-websocket))
                      buffer))))
    (make-output-port collect! deliver!)))
; TODO how to close with close-port ?
(define (make-websocket-read-port)
  ;; Take the next entry from the read mailbox of the current websocket
  ;; and return a string input port over it.
  (let* ((inbox (websocket-read-mailbox (current-websocket)))
         (message (mailbox-receive! inbox)))
    (open-input-string message)))
(define (with-input-from-websocket proc)
  ;; Take the next entry from the read mailbox of the current websocket
  ;; and call PROC through with-input-from-string on it.
  (let ((message (mailbox-receive!
                  (websocket-read-mailbox (current-websocket)))))
    (with-input-from-string message proc)))
(define (with-output-to-websocket proc)
  ;; Run PROC through with-output-to-string and post the captured text
  ;; to the send mailbox of the current websocket.
  (let* ((text (with-output-to-string proc))
         (outbox (websocket-send-mailbox (current-websocket))))
    (mailbox-send! outbox text)))
(define (websocket-send-message msg)
  ;; Post MSG to the send mailbox of the current websocket.
  (let ((outbox (websocket-send-mailbox (current-websocket))))
    (mailbox-send! outbox msg)))
(define (websocket-receive-message)
  ;; Return the next entry of the read mailbox of the current websocket.
  (let ((inbox (websocket-read-mailbox (current-websocket))))
    (mailbox-receive! inbox)))
; TODO respond to browsers close ACK
; or is that only for browser initiated closes?
; TODO threading issue with send-frame?
; TODO will drain send/receive mailboxes before closing
; do we want that?
(define (websocket-close #!optional (ws (current-websocket)))
  ;; Close WS, which defaults to the current websocket: write a frame
  ;; with opcode 8 and an empty payload, then post #!eof to the read and
  ;; send mailboxes.
  ;;
  ;; The optional argument exists because the read thread started by
  ;; websocket-accept calls (websocket-close ws); calling with no
  ;; argument keeps working as before.
  (websocket-send-frame ws 8 (make-u8vector 0) #t)
  ; wake up threads so they can exit
  (mailbox-send! (websocket-read-mailbox ws) #!eof)
  (mailbox-send! (websocket-send-mailbox ws) #!eof))
(define (websocket-accept)
  ;; Answer the current request with a websocket upgrade response and
  ;; return a new websocket record built from the request port, the
  ;; response port and two fresh mailboxes.  Two threads are started:
  ;; one moves incoming messages into the read mailbox, the other sends
  ;; whatever is posted to the send mailbox.
  (let* ((headers (request-headers (current-request)))
         (client-key (header-value 'sec-websocket-key headers))
         (ws-handshake (websocket-compute-handshake client-key))
         (ws (make-websocket
              (request-port (current-request))
              (response-port (current-response))
              (make-mailbox)
              (make-mailbox))))
    (with-headers
     `((upgrade ("WebSocket" . #f))
       (connection (upgrade . #t))
       (sec-websocket-accept (,ws-handshake . #t)))
     (lambda ()
       (send-response status: 'switching-protocols)))
    ; read thread, close on #!eof
    (thread-start!
     (lambda ()
       (let loop ()
         (let ((msg (websocket-read-message ws)))
           (if (eof-object? msg)
               ;; websocket-close works on current-websocket; bind it
               ;; to WS here because this thread is created before
               ;; with-websocket's parameterize takes effect
               (parameterize ((current-websocket ws))
                 (websocket-close))
               ;; hand the received message (not just the mailbox) over
               (begin (mailbox-send! (websocket-read-mailbox ws) msg)
                      (loop)))))))
    ; send thread
    (thread-start!
     (lambda ()
       (let loop ()
         (let ((msg (mailbox-receive! (websocket-send-mailbox ws))))
           (if (eof-object? msg)
               (void) ; websocket closed
               (begin (websocket-send ws msg)
                      (loop)))))))
    ws))
(define (with-websocket proc)
  ;; Accept a websocket for the current request, bind it to
  ;; current-websocket and call the thunk PROC.  websocket-close is the
  ;; exit handler of the dynamic-wind, so it runs when control leaves
  ;; PROC.
  (let ((ws (websocket-accept)))
    (parameterize ((current-websocket ws))
      (dynamic-wind
        void          ; nothing to do on entry
        proc
        websocket-close))))
(define (upgrade-to-websocket)
  ;; Exported wrapper around websocket-accept: performs the upgrade and
  ;; returns the new websocket record.
  (let ((ws (websocket-accept)))
    ws))
)
; test.scm
;; Test application for the websockets module.
(use spiffy awful)
;; load websockets.scm and make its exported bindings visible
(load "websockets")
(import websockets)
;; NOTE(review): presumably enabled so that pages pull in jQuery, which
;; test.js relies on ($) -- confirm against the awful documentation
(enable-ajax #t)
;; Page at "/" whose handler returns the string "hi".
(define-page "/"
(lambda ()
"hi")
headers: "")
;; Page at "/web-socket": upgrades the request via with-websocket and
;; queues the message "SYN" for sending.
(define-page "/web-socket"
(lambda ()
(with-websocket
(lambda ()
(websocket-send-message "SYN")))))
; test.js
// When the document is ready, open a websocket to the local test
// server, send 'SYN' once the connection is open and log every message
// that arrives.  Before the page unloads the socket is closed with its
// onclose handler replaced by a no-op.
$(document).ready(function() {
  // declared with var so the socket is not leaked as an implicit global
  var ws = new WebSocket("ws://localhost:8080/web-socket");
  ws.onmessage = function (evt) { console.log("got ws message: ", evt.data); };
  ws.onopen = function() { ws.send('SYN'); };
  window.onbeforeunload = function() {
    ws.onclose = function () {}; // disable onclose
    ws.close();
  };
});
; $ awful --development-mode test.scm