diff --git a/examples/rj9a/websocket_upgrade.clj b/examples/rj9a/websocket_upgrade.clj new file mode 100644 index 0000000..442d679 --- /dev/null +++ b/examples/rj9a/websocket_upgrade.clj @@ -0,0 +1,59 @@ +(ns rj9a.websocket-upgrade + (:gen-class) + (:require [ring.adapter.jetty9 :as jetty] + [ring.adapter.jetty9.websocket :refer [ws-upgrade-request? ws-upgrade-response]])) + +(defn my-websocket-handler [_] + {:on-connect (fn on-connect [_] + (tap> [:ws :connect])) + :on-text (fn on-text [ws text-message] + (tap> [:ws :msg text-message]) + (jetty/send! ws (str "echo: " text-message))) + :on-bytes (fn on-bytes [_ _ _ _] + (tap> [:ws :bytes])) + :on-close (fn on-close [_ status-code reason] + (tap> [:ws :close status-code reason])) + :on-ping (fn on-ping [ws payload] + (tap> [:ws :ping]) + (jetty/send! ws payload)) + :on-pong (fn on-pong [_ _] + (tap> [:ws :pong])) + :on-error (fn on-error [_ e] + (tap> [:ws :error e]))}) + +(defn handler [req] + (if (ws-upgrade-request? req) + (ws-upgrade-response my-websocket-handler) + {:status 200 :body "hello"})) + +(defn async-handler [request send-response _] + (send-response + (if (ws-upgrade-request? request) + (ws-upgrade-response my-websocket-handler) + {:status 200 :body "hello"}))) + +(defonce server (atom nil)) + +(defn start! [async?] + (when-not @server + (reset! server (jetty/run-jetty + (if async? #'async-handler #'handler) + {:port 5000 + :join? false + :async? async? + :allow-null-path-info true + ;; The same ws can also be available via the old regular websocket endpoints. + ;; It's added here in this example just for regression testing purposes. + :websockets {"/mywebsocket" my-websocket-handler}})))) + +(defn stop! [] + (when @server + (jetty/stop-server @server) + (reset! server nil))) + +(comment + (start! false) + (stop!)) + +(defn -main [& _] + (start! false)) diff --git a/src/ring/adapter/jetty9.clj b/src/ring/adapter/jetty9.clj index ca5e6cf..17ed466 100644 --- a/src/ring/adapter/jetty9.clj +++ b/src/ring/adapter/jetty9.clj @@ -20,8 +20,9 @@ HTTP2CServerConnectionFactory HTTP2ServerConnectionFactory] [org.eclipse.jetty.alpn.server ALPNServerConnectionFactory] [java.security KeyStore]) - (:require [ring.util.servlet :as servlet] - [ring.adapter.jetty9.common :refer [RequestMapDecoder build-request-map]] + (:require [clojure.string :refer [lower-case]] + [ring.util.servlet :as servlet] + [ring.adapter.jetty9.common :refer [RequestMapDecoder build-request-map lower-case-keys]] [ring.adapter.jetty9.websocket :as ws])) (def send! ws/send!) @@ -44,39 +45,65 @@ (string? response) {:body response} :else response)) +(defn- websocket-upgrade-response? [{:keys [status headers]}] + ;; HTTP 101 Switching Protocols + ;; https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/101 + (and (= 101 status) + (let [headers (lower-case-keys headers)] + (and (= "websocket" (lower-case (get headers "upgrade"))) + (= "upgrade" (lower-case (get headers "connection"))))))) + +(defn ^:internal wrap-proxy-handler + "Wraps a Jetty handler in a ServletContextHandler. + + Websocket upgrades require a servlet context which makes it + necessary to wrap the handler in a servlet context handler." + [jetty-handler] + (doto (ServletContextHandler.) + (.setContextPath "/*") + (.setAllowNullPathInfo true) + (JettyWebSocketServletContainerInitializer/configure nil) + (.setHandler jetty-handler))) + (defn ^:internal proxy-handler "Returns an Jetty Handler implementation for the given Ring handler." [handler] - (proxy [AbstractHandler] [] - (handle [_ ^Request base-request ^HttpServletRequest request ^HttpServletResponse response] - (try - (let [request-map (build-request-map request) - response-map (-> (handler request-map) - normalize-response)] - (when response-map - (servlet/update-servlet-response response response-map))) - (catch Throwable e - (.sendError response 500 (.getMessage e))) - (finally - (.setHandled base-request true)))))) + (wrap-proxy-handler + (proxy [AbstractHandler] [] + (handle [_ ^Request base-request ^HttpServletRequest request ^HttpServletResponse response] + (try + (let [request-map (build-request-map request) + response-map (-> (handler request-map) + normalize-response)] + (when response-map + (if (websocket-upgrade-response? response-map) + (ws/upgrade-websocket request response (:ws response-map) {}) + (servlet/update-servlet-response response response-map)))) + (catch Throwable e + (.sendError response 500 (.getMessage e))) + (finally + (.setHandled base-request true))))))) (defn ^:internal proxy-async-handler "Returns an Jetty Handler implementation for the given Ring **async** handler." [handler] - (proxy [AbstractHandler] [] - (handle [_ ^Request base-request ^HttpServletRequest request ^HttpServletResponse response] - (try - (let [^AsyncContext context (.startAsync request)] - (handler - (servlet/build-request-map request) - (fn [response-map] - (let [response-map (normalize-response response-map)] - (servlet/update-servlet-response response context response-map))) - (fn [^Throwable exception] - (.sendError response 500 (.getMessage exception)) - (.complete context)))) - (finally - (.setHandled base-request true)))))) + (wrap-proxy-handler + (proxy [AbstractHandler] [] + (handle [_ ^Request base-request ^HttpServletRequest request ^HttpServletResponse response] + (try + (let [^AsyncContext context (.startAsync request)] + (handler + (servlet/build-request-map request) + (fn [response-map] + (let [response-map (normalize-response response-map)] + (if (websocket-upgrade-response? response-map) + (ws/upgrade-websocket request response context (:ws response-map) {}) + (servlet/update-servlet-response response context response-map)))) + (fn [^Throwable exception] + (.sendError response 500 (.getMessage exception)) + (.complete context)))) + (finally + (.setHandled base-request true))))))) (defn- http-config "Creates jetty http configurator" diff --git a/src/ring/adapter/jetty9/common.clj b/src/ring/adapter/jetty9/common.clj index 2fccc28..0b6539a 100644 --- a/src/ring/adapter/jetty9/common.clj +++ b/src/ring/adapter/jetty9/common.clj @@ -30,3 +30,10 @@ (string/join ",")))) {} (enumeration-seq (.getHeaderNames request)))) + +(defn lower-case-keys [m] + (->> m + (map #(if (string? (first %)) + (update % 0 string/lower-case) + %)) + (into {}))) diff --git a/src/ring/adapter/jetty9/websocket.clj b/src/ring/adapter/jetty9/websocket.clj index 3584b40..b0540f0 100644 --- a/src/ring/adapter/jetty9/websocket.clj +++ b/src/ring/adapter/jetty9/websocket.clj @@ -6,12 +6,14 @@ [org.eclipse.jetty.websocket.server JettyWebSocketServerContainer JettyWebSocketCreator JettyServerUpgradeRequest] [org.eclipse.jetty.websocket.common JettyExtensionConfig] - [javax.servlet.http HttpServlet] + [javax.servlet AsyncContext] + [javax.servlet.http HttpServlet HttpServletRequest HttpServletResponse] [clojure.lang IFn] [java.nio ByteBuffer] [java.util Locale] [java.time Duration]) - (:require [ring.adapter.jetty9.common :refer [RequestMapDecoder build-request-map get-headers set-headers]])) + (:require [clojure.string :refer [lower-case]] + [ring.adapter.jetty9.common :refer [RequestMapDecoder build-request-map get-headers set-headers]])) (defprotocol WebSocketProtocol (send! [this msg] [this msg callback]) @@ -185,18 +187,68 @@ (.setExtensions resp (mapv #(JettyExtensionConfig. ^String %) exts))) (proxy-ws-adapter ws-results))))))) -(defn proxy-ws-servlet [ws {:as _ - :keys [ws-max-idle-time - ws-max-text-message-size] - :or {ws-max-idle-time 500000 - ws-max-text-message-size 65536}}] +(defn upgrade-websocket + ([req res ws options] + (upgrade-websocket req res nil ws options)) + ([^HttpServletRequest req + ^HttpServletResponse res + ^AsyncContext async-context + ws + {:as _options + :keys [ws-max-idle-time + ws-max-text-message-size] + :or {ws-max-idle-time 500000 + ws-max-text-message-size 65536}}] + {:pre [(or (map? ws) (fn? ws))]} + (let [creator (if (map? ws) + (reify-default-ws-creator ws) + (reify-custom-ws-creator ws)) + container (JettyWebSocketServerContainer/getContainer (.getServletContext req))] + (.setIdleTimeout container (Duration/ofMillis ws-max-idle-time)) + (.setMaxTextMessageSize container ws-max-text-message-size) + (.upgrade container creator req res) + (when async-context + (.complete async-context))))) + +(defn proxy-ws-servlet [ws options] (ServletHolder. (proxy [HttpServlet] [] (doGet [req res] - (let [creator (if (map? ws) - (reify-default-ws-creator ws) - (reify-custom-ws-creator ws)) - container (JettyWebSocketServerContainer/getContainer (.getServletContext ^HttpServlet this))] - (.setIdleTimeout container (Duration/ofMillis ws-max-idle-time)) - (.setMaxTextMessageSize container ws-max-text-message-size) - (.upgrade container creator req res)))))) + (upgrade-websocket req res ws options))))) + +(defn ws-upgrade-request? + "Checks if a request is a websocket upgrade request. + + It is a websocket upgrade request when it contains the following headers: + - connection: upgrade + - upgrade: websocket + " + [{:keys [headers] :as _request-map}] + (let [upgrade (get headers "upgrade") + connection (get headers "connection")] + (and upgrade + connection + (= "websocket" (lower-case upgrade)) + (= "upgrade" (lower-case connection))))) + +(defn ws-upgrade-response + "Returns a websocket upgrade response. + + ws-handler must be a map of handler fns: + {:on-connect #(create-fn %) ; ^Session ws-session + :on-text #(text-fn % %2 %3 %4) ; ^Session ws-session message + :on-bytes #(binary-fn % %2 %3 %4 %5 %6) ; ^Session ws-session payload offset len + :on-close #(close-fn % %2 %3 %4) ; ^Session ws-session statusCode reason + :on-error #(error-fn % %2 %3)} ; ^Session ws-session e + or a custom creator function take upgrade request as parameter and returns a handler fns map (or error info). + + The response contains HTTP status 101 (Switching Protocols) + and the following headers: + - connection: upgrade + - upgrade: websocket + " + [ws-handler] + {:status 101 ;; http 101 switching protocols + :headers {"upgrade" "websocket" + "connection" "upgrade"} + :ws ws-handler})