diff --git a/src/taoensso/sente.cljc b/src/taoensso/sente.cljc index 87f185c..6da9ee9 100644 --- a/src/taoensso/sente.cljc +++ b/src/taoensso/sente.cljc @@ -376,8 +376,8 @@ :ch-recv ; core.async channel to receive `event-msg`s (internal or from clients). :send-fn ; (fn [user-id ev] for server>user push. - :ajax-post-fn ; (fn [ring-req]) for Ring CSRF-POST + chsk URL. - :ajax-get-or-ws-handshake-fn ; (fn [ring-req]) for Ring GET + chsk URL. + :ajax-post-fn ; Ring handler for CSRF-POST + chsk URL. + :ajax-get-or-ws-handshake-fn ; Ring handler for Ring GET + chsk URL. :connected-uids ; Watchable, read-only (atom {:ws #{_} :ajax #{_} :any #{_}}). :send-buffers ; Implementation detail, read-only (atom {:ws #{_} :ajax #{_} :any #{_}}). @@ -699,239 +699,249 @@ ;; Does not participate in `conns_` (has specific req->resp) :ajax-post-fn - (fn [ring-req] - (enc/cond - :if-let [resp (possible-rejection-resp ring-req)] resp - :else - (interfaces/ring-req->server-ch-resp web-server-ch-adapter ring-req - {:on-open - (fn [server-ch websocket?] - (assert (not websocket?)) - (let [params (get ring-req :params) - ppstr (get params :ppstr) - client-id (get params :client-id) - [clj has-cb?] (unpack packer ppstr) - reply-fn - (let [replied?_ (atom false)] - (fn [resp-clj] ; Any clj form - (when (compare-and-set! replied?_ false true) - (timbre/debugf "[ajax/on-open] Server will reply to message from %s: %s" - (lid (user-id-fn ring-req client-id) client-id) - resp-clj) - - (interfaces/sch-send! server-ch websocket? - (pack packer resp-clj)))))] - - (put-server-event-msg>ch-recv! ch-recv - (merge ev-msg-const - {;; Note that the client-id is provided here just for the - ;; user's convenience. non-lp-POSTs don't actually need a - ;; client-id for Sente's own implementation: - :client-id client-id #_"unnecessary-for-non-lp-POSTs" - :ring-req ring-req - :event clj - :uid (user-id-fn ring-req client-id) - :?reply-fn (when has-cb? reply-fn)})) - - (if has-cb? - (when-let [ms lp-timeout-ms] - (go - (server-ch-resp web-server-ch-adapter ring-req + {:ring-async-resp-fn ?ring-async-resp-fn + :ring-async-raise-fn ?ring-async-raise-fn + + :on-open + (fn [server-ch websocket?] + (assert (not websocket?)) + (let [params (get ring-req :params) + ppstr (get params :ppstr) + client-id (get params :client-id) + [clj has-cb?] (unpack packer ppstr) + reply-fn + (let [replied?_ (atom false)] + (fn [resp-clj] ; Any clj form + (when (compare-and-set! replied?_ false true) + (timbre/debugf "[ajax/on-open] Server will reply to message from %s: %s" + (lid (user-id-fn ring-req client-id) client-id) + resp-clj) + + (interfaces/sch-send! server-ch websocket? + (pack packer resp-clj)))))] + + (put-server-event-msg>ch-recv! ch-recv + (merge ev-msg-const + {;; Note that the client-id is provided here just for the + ;; user's convenience. non-lp-POSTs don't actually need a + ;; client-id for Sente's own implementation: + :client-id client-id #_"unnecessary-for-non-lp-POSTs" + :ring-req ring-req + :event clj + :uid (user-id-fn ring-req client-id) + :?reply-fn (when has-cb? reply-fn)})) + + (if has-cb? + (when-let [ms lp-timeout-ms] + (go + (ch-recv! ch-recv - (merge ev-msg-const - {:client-id client-id - :ring-req ring-req - :event event - :?reply-fn ?reply-fn - :uid uid})))) - - send-handshake! - (fn [server-ch websocket?] - - (timbre/infof "Server will send %s handshake to %s" - (if websocket? :ws :ajax) - (lid uid client-id)) - - (let [?handshake-data (handshake-data-fn ring-req) - handshake-ev - (if (nil? ?handshake-data) ; Micro optimization - [:chsk/handshake [uid nil]] - [:chsk/handshake [uid nil ?handshake-data]])] - (interfaces/sch-send! server-ch websocket? - (pack packer handshake-ev))))] - - (enc/cond - - (str/blank? client-id) - (let [err-msg "Client's Ring request doesn't have a client id. Does your server have the necessary keyword Ring middleware (`wrap-params` & `wrap-keyword-params`)?"] - (timbre/errorf (str err-msg ": %s") ring-req) ; Careful re: % in req - (throw (ex-info err-msg {:ring-req ring-req}))) - - :if-let [resp (possible-rejection-resp ring-req)] resp - :else - (interfaces/ring-req->server-ch-resp web-server-ch-adapter ring-req - {:on-open + (fn ring-handler + ([ring-req] (ring-handler ring-req nil nil)) + ([ring-req ?ring-async-resp-fn ?ring-async-raise-fn] + (let [;; sch-uuid (enc/uuid-str 6) + params (get ring-req :params) + client-id (get params :client-id) + uid (user-id-fn ring-req client-id) + ;; ?ws-key (get-in ring-req [:headers "sec-websocket-key"]) + + receive-event-msg! ; Partial + (fn self + ([event ] (self event nil)) + ([event ?reply-fn] + (put-server-event-msg>ch-recv! ch-recv + (merge ev-msg-const + {:client-id client-id + :ring-req ring-req + :event event + :?reply-fn ?reply-fn + :uid uid})))) + + send-handshake! (fn [server-ch websocket?] - (if websocket? - - ;; WebSocket handshake - (let [updated-conn (upd-conn! :ws uid client-id :any server-ch) - udt-open (:udt updated-conn) - send-handshake? true] - - (timbre/infof "[ws/on-open] New server WebSocket sch for %s: %s" - (lid uid client-id) - {:send-handshake? send-handshake?}) - - (when (connect-uid! :ws uid) - (receive-event-msg! [:chsk/uidport-open uid])) - - (when send-handshake? - (send-handshake! server-ch websocket?)) - - ;; Start server-side ws-kalive loop - ;; Also helps server detect broken conns earlier - (when-let [ms ws-kalive-ms] - (go-loop [udt-t0 udt-open] - (server-ch-resp web-server-ch-adapter ring-req + {:ring-async-resp-fn ?ring-async-resp-fn + :ring-async-raise-fn ?ring-async-raise-fn + + :on-open + (fn [server-ch websocket?] + (if websocket? + + ;; WebSocket handshake + (let [updated-conn (upd-conn! :ws uid client-id :any server-ch) + udt-open (:udt updated-conn) + send-handshake? true] + + (timbre/infof "[ws/on-open] New server WebSocket sch for %s: %s" + (lid uid client-id) + {:send-handshake? send-handshake?}) + + (when (connect-uid! :ws uid) + (receive-event-msg! [:chsk/uidport-open uid])) + + (when send-handshake? + (send-handshake! server-ch websocket?)) + + ;; Start server-side ws-kalive loop + ;; Also helps server detect broken conns earlier + (when-let [ms ws-kalive-ms] + (go-loop [udt-t0 udt-open] + (clients! "Actually pushes buffered events (as packed-str) to all uid's conns. diff --git a/src/taoensso/sente/interfaces.cljc b/src/taoensso/sente/interfaces.cljc index 9ac0a37..6f80af4 100644 --- a/src/taoensso/sente/interfaces.cljc +++ b/src/taoensso/sente/interfaces.cljc @@ -25,19 +25,32 @@ "Wraps a web server's own ring-request->ring-response interface to abstract away implementation differences." (ring-req->server-ch-resp [sch-adapter ring-req callbacks-map] - "Given a Ring request (WebSocket handshake or Ajax GET/POST), returns - a Ring response map with a web-server-specific channel :body that - implements Sente's IServerChan protocol. - - Configures channel callbacks with a callbacks map using keys: - :on-open - (fn [server-ch websocket?]) called exactly once after - channel is available for sending. - :on-close - (fn [server-ch websocket? status]) called exactly once - after channel is closed for any cause, incl. an explicit - call to `close!`. `status` type is currently undefined. - :on-msg - (fn [server-ch websocket? msg]) called for each String or - byte[] message received from client. - :on-error - (fn [server-ch websocket? error]) currently unused.")) + "Given a Ring request (WebSocket GET handshake or Ajax GET/POST), + returns a Ring response map appropriate for the underlying web server. + + `callbacks-map` contains the following functions that must be called as described: + + `:on-open` - (fn [server-ch websocket?]) + Call exactly once after `server-ch` is available for sending. + + `:on-close` - (fn [server-ch websocket? status]) + Call exactly once after `server-ch` is closed for any cause, incl. an + explicit call to `sch-close!`. `status` type is currently undefined. + + `:on-msg` - (fn [server-ch websocket? msg]) + Call for each `String` or byte[] message received from client. + + `:on-error` - (fn [server-ch websocket? error]) + Currently unused. + + The provided `server-ch` arguments must implement the `IServerChan` protocol. + + + `callbacks-map` contains the following functions IFF server is configured to + use async Ring v1.6+ handlers: + + `:ring-async-resp-fn` - ?(fn [ring-response]) + `:ring-async-raise-fn` - ?(fn [throwable])")) ;;;; Packers