;;; -*- Mode:LISP; Package:TCP; Readtable:CL; Base:10 -*- #| Copyright LISP Machine, Inc. 1987 See filename "Copyright.Text" for licensing and release information. |# (export '(tcp-buffered-stream tcp-auto-buffered-stream tcp-unbuffered-stream *tcp-stream-whostate* )) (defresource simple-art-8b-buffer (size) :constructor (make-array size :element-type '(unsigned-byte 8.) :fill-pointer 0) :matcher (= size (array-length object))) ;;;***TCP-STREAM-MIXIN (defflavor tcp-stream-mixin ((socket nil) ;The tcp-socket (open nil) ;T if user has issued open (closing nil) ;T if remote side has closed (auto-push nil) ;T if open with auto-push (bytes-read 0) ;total bytes read on this socket (bytes-written 0) ;total bytes written on this socket (timeout nil) ;T if send timed out (urgent-output nil) ;T if in Urgent mode for output (urgent-input nil) ;T if in Urgent mode for input (urgent-offset nil) ;offset of urgent data ) () (:method-combination (:daemon-with-or :base-flavor-last :listen)) (:gettable-instance-variables socket urgent-output urgent-input) (:settable-instance-variables urgent-output) (:inittable-instance-variables socket auto-push) ) (defmethod (tcp-stream-mixin :open) (&optional keyword &rest args) (setq args (copy-list args)) (let* ((host-list (member :remote-address args :test #'eq)) (host (second host-list)) (user-timeout (member :send-timeout args :test #'eq)) (current-timeout (* 30 60)) (open-socket (null args))) (setq args (append args `(:auto-push ,auto-push))) (if user-timeout (setq current-timeout (second user-timeout)) (setq args (append args `(:send-timeout ,current-timeout)))) (when (null socket) (setq socket (make-tcp-socket :keyword keyword))) (setq closing nil) (cond (open-socket ;Is socket already open? (setq open t) (send self :build-buffers) self) (t ;If user specified open keywords, open socket (flet ((host-name (host) (and (stringp host) (not (global:string-search-not-set "0123456789" host)) (setq host (global:parse-number host))) (cond ((numberp host) (let ((object (si:get-host-from-address host :internet))) (if object (send object :name) (canonical-ip host)))) ((typep host 'si:host) (send host :name)) (t host)))) (loop (setq open (apply socket :open args)) (if (tcp:tcp-user-active socket) (ecase (send self :handle-replies) (:open ;All is well! (return self)) (:unreachable (cerror "Try again." "Remote Host ~S is unreachable" (host-name host))) (:reset (cerror "Try again." "Connection refused by ~S" (host-name host))) (:timeout (setq open nil) (send socket :abort) (incf current-timeout current-timeout) (cerror (format nil "Try again with timeout of ~D ticks." current-timeout) "Connection timed out") (let ((user-timeout (member :send-timeout args :test #'eq))) (setf (second user-timeout) current-timeout)))) (return self)))))))) (defmethod (tcp-stream-mixin :remote-port) () (send socket :remote-port)) (defmethod (tcp-stream-mixin :remote-address) () (send socket :remote-address)) (defmethod (tcp-stream-mixin :local-port) () (send socket :local-port)) (defmethod (tcp-stream-mixin :local-address) () (send socket :local-address)) (defmethod (tcp-stream-mixin :accept) () (loop (ecase (send self :handle-replies) (:open ;All is well! (return self)) (:timeout (setq open nil) (send socket :abort) (error "Listen timed out") (return nil))))) (defmethod (tcp-stream-mixin :close) (&optional mode) (when open (send self :force-output) (setq open nil) (apply socket :close (ncons mode)) (unless (eq mode :normal) (loop (case (send self :handle-replies) ((:reset :unreachable :closed) (return)) (:timeout (send self :send-timeout))))))) (defmethod (tcp-stream-mixin :abort) () (setq open nil) (when (send socket :abort) (do () ((eq (send self :handle-replies) :reset))))) (defmethod (tcp-stream-mixin :handle-replies) (&optional no-hang-p) (loop (cond ((send socket :listen) ;Activity on the socket (let ((item (send socket :read-data))) (case (first item) (:open (send self :build-buffers) (return :open)) (:write-reply (incf bytes-written (fill-pointer (second item))) (send self :write-reply (second item)) (return :write-reply)) (:data (let ((length (fill-pointer (second item))) (offset (fourth item))) (when offset ;Remember last known offset of urgent data (setq urgent-offset (+ bytes-read offset 1)) (setq urgent-input t)) (incf bytes-read length) (when (eq (third item) :eof) (setq closing t)) (send self :read-reply (second item) urgent-offset)) (return :read-reply)) (:urgent ;should signal this somehow... (setq urgent-input t)) (:closing ;Remote side has closed (setq closing t) (dolist (x (second item)) (send self :discard-buffer x)) (return :remote-close)) (:reset (setq closing t) (setq open nil) (dolist (x (second item)) (send self :discard-buffer x)) (dolist (b (third item)) (send self :write-reply b)) ;;(cerror "Continue, treating as end-of-file" "Connection reset remotely") (return :reset)) (:close ;Socket closed out from under us (setq closing t) (setq open nil) (dolist (x (second item)) (send self :discard-buffer x)) ;;(cerror "Continue, treating as end-of-file" "Connection reset locally") (return :local-close)) ((:network-unreachable :host-unreachable :protocol-unreachable :port-unreachable) (setq closing t) (setq open nil) (send socket :abort) (return :unreachable)) (:timeout (setq timeout t) (return :timeout)) (:closed (return :closed)) (otherwise ;;Ignore it )))) (no-hang-p ;No activity and no-hang (return nil)) (t ;No activity -- wait (send self :wait-for-reply))))) (defvar *tcp-stream-whostate* "TCP socket I/O" "The wait state for the wholine") (defmethod (tcp-stream-mixin :wait-for-reply) (&optional function &rest args) (process-wait *tcp-stream-whostate* #'(lambda (tcp-socket func arg-list) (or (send tcp-socket :listen) (and func (apply func arg-list)))) socket function args)) (defmethod (tcp-stream-mixin :send-timeout) (&aux ok) (unwind-protect (progn (cerror "Reset timeout and continue" "Send timed out") (send socket :reset-timeout) (setq timeout nil) (setq ok t)) (unless ok (send self :abort)))) (defmethod (tcp-stream-mixin :handle-all-replies) () (do ((count 0 (1+ count)) (event (send self :handle-replies t) (send self :handle-replies t))) ((null event) (plusp count)))) (defmethod (tcp-stream-mixin :or :listen) () (or timeout closing)) ;;;***TCP-BUFFERED-STREAM (defflavor tcp-buffered-stream ((input-buffer-size 0) ;Local MSS (input-buffer-fifo nil) ;List of full input buffers (input-buffer-limit 4) ;Maximum number of input buffers (output-buffer-size 0) ;Remote MSS (output-buffer-list nil) ;Free list of output buffers (output-buffer-limit 4) ;Maximum number of output buffers ) (tcp-stream-mixin si:buffered-stream) (:settable-instance-variables input-buffer-limit output-buffer-limit) (:inittable-instance-variables input-buffer-limit output-buffer-limit) ) (defmethod (tcp-buffered-stream :before :open) (&rest ignore) (setq input-buffer-fifo (make-fifo)) (setq output-buffer-list nil) ) (defmethod (tcp-buffered-stream :build-buffers) () (multiple-value-bind (send receive) (send socket :mss) (setq input-buffer-size receive) (setq output-buffer-size send)) (dotimes (i input-buffer-limit) (let ((buffer (allocate-resource 'simple-art-8b-buffer input-buffer-size))) (unless (send socket :receive buffer) (deallocate-resource 'simple-art-8b-buffer buffer)))) (dotimes (i output-buffer-limit) (let ((buffer (allocate-resource 'simple-art-8b-buffer output-buffer-size))) (push buffer output-buffer-list)))) (defmethod (tcp-buffered-stream :build-more-buffers) (&optional (new-input-limit input-buffer-limit) (new-output-limit output-buffer-limit)) (when (> new-input-limit input-buffer-limit) (dotimes (i (- new-input-limit input-buffer-limit)) (let ((buffer (allocate-resource 'simple-art-8b-buffer input-buffer-size))) (unless (send socket :receive buffer) (deallocate-resource 'simple-art-8b-buffer buffer)))) (setq input-buffer-limit new-input-limit)) (when (> new-output-limit output-buffer-limit) (dotimes (i (- new-output-limit output-buffer-limit)) (let ((buffer (allocate-resource 'simple-art-8b-buffer output-buffer-size))) (push buffer output-buffer-list))) (setq output-buffer-limit new-output-limit))) (defmethod (tcp-buffered-stream :discard-buffer) (buffer) (deallocate-resource 'simple-art-8b-buffer buffer)) (defmethod (tcp-buffered-stream :write-reply) (buffer) (push buffer output-buffer-list)) (defmethod (tcp-buffered-stream :read-reply) (buffer offset) (push-fifo (cons buffer offset) input-buffer-fifo)) (defmethod (tcp-buffered-stream :next-input-buffer) (&optional no-hang-p) (declare (values buffer start end)) (loop (cond ((send self :handle-all-replies)) ((not (fifo-empty-p input-buffer-fifo)) (let* ((elt (pop-fifo input-buffer-fifo)) (buffer (car elt)) (offset (cdr elt))) (setq urgent-input (not (null offset))) (return (values buffer 0 (fill-pointer buffer))))) ((not open) (return nil)) (closing (return nil)) (no-hang-p (setq urgent-input (and urgent-offset (> urgent-offset bytes-read))) (return nil)) (timeout (send self :send-timeout)) (t (send self :wait-for-reply #'(lambda (b o c to) (or (not (fifo-empty-p (cdr b))) (not (cdr o)) (cdr c) (cdr to))) (locf input-buffer-fifo) (locf open) (locf closing) (locf timeout)))))) (defmethod (tcp-buffered-stream :discard-input-buffer) (buffer) (cond (closing (send self :discard-buffer buffer)) ((send socket :receive buffer)) (t (send self :discard-buffer buffer)))) (defmethod (tcp-buffered-stream :new-output-buffer) () (declare (values buffer start end)) (loop (cond ((send self :handle-all-replies)) ((not open) (global:ferror 'sys:connection-closed "Connection closed" socket)) (output-buffer-list (return (values (pop output-buffer-list) 0 output-buffer-size))) (timeout (send self :send-timeout)) (t (send self :wait-for-reply #'(lambda (x y) (or (cdr x) (cdr y))) (locf output-buffer-list) (locf timeout)))))) (defmethod (tcp-buffered-stream :send-output-buffer) (buffer count) (setf (fill-pointer buffer) count) (unless (send socket :write-data buffer :pushed (< count output-buffer-size) :urgent urgent-output) (send self :discard-buffer buffer) (global:ferror 'sys:connection-closed "Connection closed" socket))) (defmethod (tcp-buffered-stream :discard-output-buffer) (buffer) (push buffer output-buffer-list)) (defwrapper (tcp-buffered-stream :close) (ignore . body) `(unwind-protect (progn ,@body) (send self :discard-all-buffers))) (defwrapper (tcp-buffered-stream :abort) (ignore . body) `(unwind-protect (progn ,@body) (send self :discard-all-buffers))) (defmethod (tcp-buffered-stream :discard-all-buffers) () (let ((input-buffers (fifo-as-list input-buffer-fifo)) (output-buffers output-buffer-list)) (setq input-buffer-fifo nil) (setq output-buffer-list nil) (dolist (x input-buffers) (send self :discard-buffer (car x))) (dolist (x output-buffers) (send self :discard-buffer x)))) (compile-flavor-methods tcp-buffered-stream) ;;;***TCP-AUTO-BUFFERED-STREAM (defflavor tcp-auto-buffered-stream ((force-output-p t)) ;T if force-output after each write (tcp-buffered-stream) (:default-init-plist :auto-push t) (:settable-instance-variables force-output-p)) (defmethod (tcp-auto-buffered-stream :after :tyo) (&rest ignore) (when force-output-p (send self :force-output))) (defmethod (tcp-auto-buffered-stream :after :string-out) (&rest ignore) (when force-output-p (send self :force-output))) (compile-flavor-methods tcp-auto-buffered-stream) ;;;***TCP-UNBUFFERED-STREAM (defflavor tcp-unbuffered-stream ((tyi-buffer nil) (tyo-buffer nil) (untyi-char nil) ) (tcp-stream-mixin si:bidirectional-stream)) (defmethod (tcp-unbuffered-stream :build-buffers) () (setq tyi-buffer nil) (send socket :receive (make-array 1 :element-type '(unsigned-byte 8) :fill-pointer 0)) (setq tyo-buffer (make-array 1 :element-type '(unsigned-byte 8) :fill-pointer 1)) (setq untyi-char nil)) (defmethod (tcp-unbuffered-stream :discard-buffer) (ignore) ) (defmethod (tcp-unbuffered-stream :write-reply) (buffer) (setq tyo-buffer buffer)) (defmethod (tcp-unbuffered-stream :read-reply) (buffer offset) (declare (ignore offset)) (setq tyi-buffer buffer)) (defmethod (tcp-unbuffered-stream :tyi) (&optional eof) (loop (cond ((send self :handle-all-replies)) (untyi-char (return (prog1 untyi-char (setq untyi-char nil)))) (tyi-buffer (let* ((buffer tyi-buffer) (byte (aref buffer 0))) (setq tyi-buffer nil) (send socket :receive buffer) (setq urgent-input (and urgent-offset (>= urgent-offset bytes-read))) (return byte))) ((or closing (not open)) (if eof (global:signal 'sys:end-of-file :format-string "End of File on TCP stream") (return nil))) (timeout (send self :send-timeout)) (t (send self :wait-for-reply #'(lambda (u b o c to) (or (cdr u) (cdr b) (not (cdr o)) (cdr c) (cdr to))) (locf untyi-char) (locf tyi-buffer) (locf open) (locf closing) (locf timeout)))))) (defmethod (tcp-unbuffered-stream :untyi) (byte) (when untyi-char (error "Can't UNTYI more than once")) (setq untyi-char byte)) (defmethod (tcp-unbuffered-stream :listen) () (loop (unwind-protect (cond ((send self :handle-all-replies)) (untyi-char (return t)) (tyi-buffer (return t)) ((or closing (not open)) (return t)) (timeout (return t)) (t (return nil))) (setq urgent-input (and urgent-offset (>= urgent-offset bytes-read)))))) (defmethod (tcp-unbuffered-stream :tyi-no-hang) (&optional eof) (loop (cond ((send self :handle-all-replies)) (untyi-char (return (prog1 untyi-char (setq untyi-char nil)))) (tyi-buffer (let* ((buffer tyi-buffer) (byte (aref buffer 0))) (setq tyi-buffer nil) (send socket :receive buffer) (setq urgent-input (and urgent-offset (>= urgent-offset bytes-read))) (return byte))) ((or closing (not open)) (if eof (global:signal 'sys:end-of-file :format-string "End of File on TCP stream") (return nil))) (timeout (send self :send-timeout)) (t (return nil))))) (defmethod (tcp-unbuffered-stream :tyo) (byte) (loop (cond ((send self :handle-all-replies)) ((not open) (return nil)) (tyo-buffer (let ((buffer tyo-buffer)) (setq tyo-buffer nil) (setf (aref buffer 0) byte) (setf (fill-pointer buffer) 1) (send socket :write-data buffer :urgent urgent-output) (return t))) (timeout (send self :send-timeout)) (t (send self :wait-for-reply #'(lambda (b o to) (or (cdr b) (not (cdr o)) (cdr to))) (locf tyo-buffer) (locf open) (locf timeout)))))) (compile-flavor-methods tcp-unbuffered-stream)