Skip to content

Commit

Permalink
Wrap phoenix.js transports
Browse files Browse the repository at this point in the history
Phoenix expected a transport to be API compatible with the JS
WebSocket API. This is a little bit unflexible and configuring
the authToken that is handled differently depending on the transport
was a bit awkward. Thus, we introduce a new Transport class that
should be implemented by the given transport, falling back to a
a WebSocket compatible wrapper, keeping API compatibility.

The websocket specific handling (readyState, onopen, etc.) could
be further abstracted in the future, to make integrating new
transports like WebTransport easier. Then, it would also make
sense to document the Transport API for others to implement
their own transports.
  • Loading branch information
SteffenDE committed Feb 15, 2025
1 parent cbb832f commit 8e0b2a6
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 45 deletions.
42 changes: 16 additions & 26 deletions assets/js/phoenix/longpoll.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import {
SOCKET_STATES,
TRANSPORTS,
AUTH_TOKEN_PREFIX
TRANSPORTS
} from "./constants"

import Transport from "./transport"
import Ajax from "./ajax"

let arrayBufferToBase64 = (buffer) => {
Expand All @@ -14,14 +14,9 @@ let arrayBufferToBase64 = (buffer) => {
return btoa(binary)
}

export default class LongPoll {

constructor(endPoint, protocols){
// we only support subprotocols for authToken
// ["phoenix", "base64url.bearer.phx.BASE64_ENCODED_TOKEN"]
if (protocols.length === 2 && protocols[1].startsWith(AUTH_TOKEN_PREFIX)) {
this.authToken = atob(protocols[1].slice(AUTH_TOKEN_PREFIX.length))
}
export default class LongPoll extends Transport {
constructor(endPoint, options){
super(endPoint, options)
this.endPoint = null
this.token = null
this.skipHeartbeat = true
Expand All @@ -30,12 +25,9 @@ export default class LongPoll {
this.currentBatch = null
this.currentBatchTimer = null
this.batchBuffer = []
this.onopen = function (){ } // noop
this.onerror = function (){ } // noop
this.onmessage = function (){ } // noop
this.onclose = function (){ } // noop
this.pollEndpoint = this.normalizeEndpoint(endPoint)
this.readyState = SOCKET_STATES.connecting
this.authToken = options.authToken
this.timeout = options.timeout
// we must wait for the caller to finish setting up our callbacks and timeout properties
setTimeout(() => this.poll(), 0)
}
Expand All @@ -57,7 +49,7 @@ export default class LongPoll {
}

ontimeout(){
this.onerror("timeout")
this.triggerError("timeout")
this.closeAndRetry(1005, "timeout", false)
}

Expand Down Expand Up @@ -97,25 +89,24 @@ export default class LongPoll {
//
// In order to emulate this behaviour, we need to make sure each
// onmessage handler is run within its own macrotask.
setTimeout(() => this.onmessage({data: msg}), 0)
setTimeout(() => this.triggerMessage({data: msg}), 0)
})
this.poll()
break
case 204:
this.poll()
break
case 410:
this.readyState = SOCKET_STATES.open
this.onopen({})
this.triggerOpen({})
this.poll()
break
case 403:
this.onerror(403)
this.triggerError(403)
this.close(1008, "forbidden", false)
break
case 0:
case 500:
this.onerror(500)
this.triggerError(500)
this.closeAndRetry(1011, "internal server error", 500)
break
default: throw new Error(`unhandled poll status ${status}`)
Expand Down Expand Up @@ -144,10 +135,10 @@ export default class LongPoll {

batchSend(messages){
this.awaitingBatchAck = true
this.ajax("POST", "application/x-ndjson", messages.join("\n"), () => this.onerror("timeout"), resp => {
this.ajax("POST", "application/x-ndjson", messages.join("\n"), () => this.triggerError("timeout"), resp => {
this.awaitingBatchAck = false
if(!resp || resp.status !== 200){
this.onerror(resp && resp.status)
this.triggerError(resp && resp.status)
this.closeAndRetry(1011, "internal server error", false)
} else if(this.batchBuffer.length > 0){
this.batchSend(this.batchBuffer)
Expand All @@ -158,15 +149,14 @@ export default class LongPoll {

close(code, reason, wasClean){
for(let req of this.reqs){ req.abort() }
this.readyState = SOCKET_STATES.closed
let opts = Object.assign({code: 1000, reason: undefined, wasClean: true}, {code, reason, wasClean})
this.batchBuffer = []
clearTimeout(this.currentBatchTimer)
this.currentBatchTimer = null
if(typeof(CloseEvent) !== "undefined"){
this.onclose(new CloseEvent("close", opts))
this.triggerClose(new CloseEvent("close", opts))
} else {
this.onclose(opts)
this.triggerClose(opts)
}
}

Expand Down
43 changes: 31 additions & 12 deletions assets/js/phoenix/socket.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ import {
DEFAULT_VSN,
SOCKET_STATES,
TRANSPORTS,
WS_CLOSE_NORMAL,
AUTH_TOKEN_PREFIX
WS_CLOSE_NORMAL
} from "./constants"

import {
Expand All @@ -17,6 +16,7 @@ import {
import Ajax from "./ajax"
import Channel from "./channel"
import LongPoll from "./longpoll"
import Transport, { WebSocketTransport, WrapperTransport } from "./transport"
import Serializer from "./serializer"
import Timer from "./timer"

Expand All @@ -28,11 +28,12 @@ import Timer from "./timer"
* `"wss://example.com"`
* `"/socket"` (inherited host & protocol)
* @param {Object} [opts] - Optional configuration
* @param {Function} [opts.transport] - The Websocket Transport, for example WebSocket or Phoenix.LongPoll.
* @param {Function} [opts.transport] - The Transport, for example WebSocket or Phoenix.LongPoll.
*
* Defaults to WebSocket with automatic LongPoll fallback if WebSocket is not defined.
* To fallback to LongPoll when WebSocket attempts fail, use `longPollFallbackMs: 2500`.
*
* @param {Object} [opts.transportOpts] - Extra options to pass to the transport. Useful for custom transport implementations.
* @param {number} [opts.longPollFallbackMs] - The millisecond time to attempt the primary transport
* before falling back to the LongPoll transport. Disabled by default.
*
Expand Down Expand Up @@ -117,7 +118,8 @@ export default class Socket {
this.sendBuffer = []
this.ref = 0
this.timeout = opts.timeout || DEFAULT_TIMEOUT
this.transport = opts.transport || global.WebSocket || LongPoll
this.transport = this.initializeTransport(opts)
this.transportOpts = opts.transportOpts || {}
this.primaryPassedHealthCheck = false
this.longPollFallbackMs = opts.longPollFallbackMs
this.fallbackTimer = null
Expand Down Expand Up @@ -346,18 +348,35 @@ export default class Socket {
* @private
*/

initializeTransport(opts) {
if (opts.transport && Transport.isTransport(opts.transport)) {
return opts.transport
} else if (opts.transport) {
// legacy transport (WebSocket or WebSocket compatible class)
return new WrapperTransport(opts.transport)
} else {
// no transport specified, use WebSocket if available, otherwise LongPoll
return global.WebSocket ? WebSocketTransport : LongPoll
}
}

transportConnect(){
this.connectClock++
this.closeWasClean = false
let protocols = ["phoenix"]
// Sec-WebSocket-Protocol based token
// (longpoll uses Authorization header instead)
if (this.authToken) {
protocols.push(`${AUTH_TOKEN_PREFIX}${btoa(this.authToken).replace(/=/g, "")}`)

const options = {
authToken: this.authToken,
...this.transportOpts
}

if (this.transport === LongPoll) {
// special options for longpoll
options.timeout = this.longpollerTimeout
} else {
options.binaryType = this.binaryType
}
this.conn = new this.transport(this.endPointURL(), protocols)
this.conn.binaryType = this.binaryType
this.conn.timeout = this.longpollerTimeout

this.conn = new this.transport(this.endPointURL(), options)
this.conn.onopen = () => this.onConnOpen()
this.conn.onerror = error => this.onConnError(error)
this.conn.onmessage = event => this.onConnMessage(event)
Expand Down
86 changes: 86 additions & 0 deletions assets/js/phoenix/transport.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import { global, SOCKET_STATES, AUTH_TOKEN_PREFIX } from "./constants"

export default class Transport {
constructor(url, options = {}) {
this.url = url
this.options = options
// TODO: abstract websocket specifics (e.g. readyState)?
this.readyState = SOCKET_STATES.connecting
this.onopen = null
this.onerror = null
this.onmessage = null
this.onclose = null
this.authToken = options.authToken
}

static isTransport(transport) {
return transport.prototype instanceof Transport
}

send(_data) {
throw new Error("send() must be implemented by subclass")
}

close(_code, _reason) {
throw new Error("close() must be implemented by subclass")
}

// Helper methods for subclasses to trigger events
triggerOpen() {
this.readyState = SOCKET_STATES.open
if (this.onopen) this.onopen()
}

triggerError(error) {
if (this.onerror) this.onerror(error)
}

triggerMessage(message) {
if (this.onmessage) this.onmessage(message)
}

triggerClose(event) {
this.readyState = SOCKET_STATES.closed
if (this.onclose) this.onclose(event)
}
}

export class WebSocketTransport extends Transport {
constructor(url, options = {}) {
super(url, options)

// Handle WebSocket-specific protocol setup
const subprotocols = ["phoenix"]
if (this.authToken) {
subprotocols.push(`${AUTH_TOKEN_PREFIX}${btoa(this.authToken).replace(/=/g, "")}`)
}

const WebSocket = options.WebSocket || global.WebSocket
this.ws = new WebSocket(url, subprotocols)
this.ws.binaryType = options.binaryType

this.ws.onopen = () => this.triggerOpen()
this.ws.onerror = (error) => this.triggerError(error)
this.ws.onmessage = (event) => this.triggerMessage(event)
this.ws.onclose = (event) => this.triggerClose(event)
}

send(data) {
this.ws.send(data)
}

close(code, reason) {
this.ws.close(code, reason)
}
}

export class WrapperTransport {
constructor(transport) {
return class extends WebSocketTransport {
constructor(url, options) {
options.WebSocket = transport
super(url, options)
}
}
}
}
2 changes: 1 addition & 1 deletion assets/test/channel_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ describe("with transport", function (){
const socket = new Socket("/socket", {authToken})

socket.connect()
expect(socket.conn.protocols).toEqual(["phoenix", "base64url.bearer.phx.MTIzNA"])
expect(socket.conn.ws.protocols).toEqual(["phoenix", "base64url.bearer.phx.MTIzNA"])
})
})

Expand Down
Loading

0 comments on commit 8e0b2a6

Please # to comment.