Skip to content

Commit

Permalink
Merge pull request #59 from celiocidral/websocket-protocol-upgrade-su…
Browse files Browse the repository at this point in the history
…pport

Add support for websocket protocol upgrade.
  • Loading branch information
sunng87 authored Nov 1, 2021
2 parents 9b91eda + 81d2b0b commit 152d012
Show file tree
Hide file tree
Showing 4 changed files with 187 additions and 42 deletions.
59 changes: 59 additions & 0 deletions examples/rj9a/websocket_upgrade.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
(ns rj9a.websocket-upgrade
(:gen-class)
(:require [ring.adapter.jetty9 :as jetty]
[ring.adapter.jetty9.websocket :refer [ws-upgrade-request? ws-upgrade-response]]))

(defn my-websocket-handler [_]
{:on-connect (fn on-connect [_]
(tap> [:ws :connect]))
:on-text (fn on-text [ws text-message]
(tap> [:ws :msg text-message])
(jetty/send! ws (str "echo: " text-message)))
:on-bytes (fn on-bytes [_ _ _ _]
(tap> [:ws :bytes]))
:on-close (fn on-close [_ status-code reason]
(tap> [:ws :close status-code reason]))
:on-ping (fn on-ping [ws payload]
(tap> [:ws :ping])
(jetty/send! ws payload))
:on-pong (fn on-pong [_ _]
(tap> [:ws :pong]))
:on-error (fn on-error [_ e]
(tap> [:ws :error e]))})

(defn handler [req]
(if (ws-upgrade-request? req)
(ws-upgrade-response my-websocket-handler)
{:status 200 :body "hello"}))

(defn async-handler [request send-response _]
(send-response
(if (ws-upgrade-request? request)
(ws-upgrade-response my-websocket-handler)
{:status 200 :body "hello"})))

(defonce server (atom nil))

(defn start! [async?]
(when-not @server
(reset! server (jetty/run-jetty
(if async? #'async-handler #'handler)
{:port 5000
:join? false
:async? async?
:allow-null-path-info true
;; The same ws can also be available via the old regular websocket endpoints.
;; It's added here in this example just for regression testing purposes.
:websockets {"/mywebsocket" my-websocket-handler}}))))

(defn stop! []
(when @server
(jetty/stop-server @server)
(reset! server nil)))

(comment
(start! false)
(stop!))

(defn -main [& _]
(start! false))
83 changes: 55 additions & 28 deletions src/ring/adapter/jetty9.clj
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@
HTTP2CServerConnectionFactory HTTP2ServerConnectionFactory]
[org.eclipse.jetty.alpn.server ALPNServerConnectionFactory]
[java.security KeyStore])
(:require [ring.util.servlet :as servlet]
[ring.adapter.jetty9.common :refer [RequestMapDecoder build-request-map]]
(:require [clojure.string :refer [lower-case]]
[ring.util.servlet :as servlet]
[ring.adapter.jetty9.common :refer [RequestMapDecoder build-request-map lower-case-keys]]
[ring.adapter.jetty9.websocket :as ws]))

(def send! ws/send!)
Expand All @@ -44,39 +45,65 @@
(string? response) {:body response}
:else response))

(defn- websocket-upgrade-response? [{:keys [status headers]}]
;; HTTP 101 Switching Protocols
;; https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/101
(and (= 101 status)
(let [headers (lower-case-keys headers)]
(and (= "websocket" (lower-case (get headers "upgrade")))
(= "upgrade" (lower-case (get headers "connection")))))))

(defn ^:internal wrap-proxy-handler
"Wraps a Jetty handler in a ServletContextHandler.
Websocket upgrades require a servlet context which makes it
necessary to wrap the handler in a servlet context handler."
[jetty-handler]
(doto (ServletContextHandler.)
(.setContextPath "/*")
(.setAllowNullPathInfo true)
(JettyWebSocketServletContainerInitializer/configure nil)
(.setHandler jetty-handler)))

(defn ^:internal proxy-handler
"Returns an Jetty Handler implementation for the given Ring handler."
[handler]
(proxy [AbstractHandler] []
(handle [_ ^Request base-request ^HttpServletRequest request ^HttpServletResponse response]
(try
(let [request-map (build-request-map request)
response-map (-> (handler request-map)
normalize-response)]
(when response-map
(servlet/update-servlet-response response response-map)))
(catch Throwable e
(.sendError response 500 (.getMessage e)))
(finally
(.setHandled base-request true))))))
(wrap-proxy-handler
(proxy [AbstractHandler] []
(handle [_ ^Request base-request ^HttpServletRequest request ^HttpServletResponse response]
(try
(let [request-map (build-request-map request)
response-map (-> (handler request-map)
normalize-response)]
(when response-map
(if (websocket-upgrade-response? response-map)
(ws/upgrade-websocket request response (:ws response-map) {})
(servlet/update-servlet-response response response-map))))
(catch Throwable e
(.sendError response 500 (.getMessage e)))
(finally
(.setHandled base-request true)))))))

(defn ^:internal proxy-async-handler
"Returns an Jetty Handler implementation for the given Ring **async** handler."
[handler]
(proxy [AbstractHandler] []
(handle [_ ^Request base-request ^HttpServletRequest request ^HttpServletResponse response]
(try
(let [^AsyncContext context (.startAsync request)]
(handler
(servlet/build-request-map request)
(fn [response-map]
(let [response-map (normalize-response response-map)]
(servlet/update-servlet-response response context response-map)))
(fn [^Throwable exception]
(.sendError response 500 (.getMessage exception))
(.complete context))))
(finally
(.setHandled base-request true))))))
(wrap-proxy-handler
(proxy [AbstractHandler] []
(handle [_ ^Request base-request ^HttpServletRequest request ^HttpServletResponse response]
(try
(let [^AsyncContext context (.startAsync request)]
(handler
(servlet/build-request-map request)
(fn [response-map]
(let [response-map (normalize-response response-map)]
(if (websocket-upgrade-response? response-map)
(ws/upgrade-websocket request response context (:ws response-map) {})
(servlet/update-servlet-response response context response-map))))
(fn [^Throwable exception]
(.sendError response 500 (.getMessage exception))
(.complete context))))
(finally
(.setHandled base-request true)))))))

(defn- http-config
"Creates jetty http configurator"
Expand Down
7 changes: 7 additions & 0 deletions src/ring/adapter/jetty9/common.clj
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,10 @@
(string/join ","))))
{}
(enumeration-seq (.getHeaderNames request))))

(defn lower-case-keys [m]
(->> m
(map #(if (string? (first %))
(update % 0 string/lower-case)
%))
(into {})))
80 changes: 66 additions & 14 deletions src/ring/adapter/jetty9/websocket.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
[org.eclipse.jetty.websocket.server JettyWebSocketServerContainer
JettyWebSocketCreator JettyServerUpgradeRequest]
[org.eclipse.jetty.websocket.common JettyExtensionConfig]
[javax.servlet.http HttpServlet]
[javax.servlet AsyncContext]
[javax.servlet.http HttpServlet HttpServletRequest HttpServletResponse]
[clojure.lang IFn]
[java.nio ByteBuffer]
[java.util Locale]
[java.time Duration])
(:require [ring.adapter.jetty9.common :refer [RequestMapDecoder build-request-map get-headers set-headers]]))
(:require [clojure.string :refer [lower-case]]
[ring.adapter.jetty9.common :refer [RequestMapDecoder build-request-map get-headers set-headers]]))

(defprotocol WebSocketProtocol
(send! [this msg] [this msg callback])
Expand Down Expand Up @@ -185,18 +187,68 @@
(.setExtensions resp (mapv #(JettyExtensionConfig. ^String %) exts)))
(proxy-ws-adapter ws-results)))))))

(defn proxy-ws-servlet [ws {:as _
:keys [ws-max-idle-time
ws-max-text-message-size]
:or {ws-max-idle-time 500000
ws-max-text-message-size 65536}}]
(defn upgrade-websocket
([req res ws options]
(upgrade-websocket req res nil ws options))
([^HttpServletRequest req
^HttpServletResponse res
^AsyncContext async-context
ws
{:as _options
:keys [ws-max-idle-time
ws-max-text-message-size]
:or {ws-max-idle-time 500000
ws-max-text-message-size 65536}}]
{:pre [(or (map? ws) (fn? ws))]}
(let [creator (if (map? ws)
(reify-default-ws-creator ws)
(reify-custom-ws-creator ws))
container (JettyWebSocketServerContainer/getContainer (.getServletContext req))]
(.setIdleTimeout container (Duration/ofMillis ws-max-idle-time))
(.setMaxTextMessageSize container ws-max-text-message-size)
(.upgrade container creator req res)
(when async-context
(.complete async-context)))))

(defn proxy-ws-servlet [ws options]
(ServletHolder.
(proxy [HttpServlet] []
(doGet [req res]
(let [creator (if (map? ws)
(reify-default-ws-creator ws)
(reify-custom-ws-creator ws))
container (JettyWebSocketServerContainer/getContainer (.getServletContext ^HttpServlet this))]
(.setIdleTimeout container (Duration/ofMillis ws-max-idle-time))
(.setMaxTextMessageSize container ws-max-text-message-size)
(.upgrade container creator req res))))))
(upgrade-websocket req res ws options)))))

(defn ws-upgrade-request?
"Checks if a request is a websocket upgrade request.
It is a websocket upgrade request when it contains the following headers:
- connection: upgrade
- upgrade: websocket
"
[{:keys [headers] :as _request-map}]
(let [upgrade (get headers "upgrade")
connection (get headers "connection")]
(and upgrade
connection
(= "websocket" (lower-case upgrade))
(= "upgrade" (lower-case connection)))))

(defn ws-upgrade-response
"Returns a websocket upgrade response.
ws-handler must be a map of handler fns:
{:on-connect #(create-fn %) ; ^Session ws-session
:on-text #(text-fn % %2 %3 %4) ; ^Session ws-session message
:on-bytes #(binary-fn % %2 %3 %4 %5 %6) ; ^Session ws-session payload offset len
:on-close #(close-fn % %2 %3 %4) ; ^Session ws-session statusCode reason
:on-error #(error-fn % %2 %3)} ; ^Session ws-session e
or a custom creator function take upgrade request as parameter and returns a handler fns map (or error info).
The response contains HTTP status 101 (Switching Protocols)
and the following headers:
- connection: upgrade
- upgrade: websocket
"
[ws-handler]
{:status 101 ;; http 101 switching protocols
:headers {"upgrade" "websocket"
"connection" "upgrade"}
:ws ws-handler})

0 comments on commit 152d012

Please # to comment.