3/12/2020
This is Part 2 of a small series exploring a websocket server powered by µWebSockets.js.
Part 1 established a skeleton websocket server.
In this post we'll build up a client library for establishing a connection to our websocket server using standard websockets.
I encourage you to follow along in your own IDE. A GitHub project is also available with tags at each pinnacle commit.
We left off Part 1 with a fully functional websocket server that admittedly didn't do anything too exciting beyond echoing back messages.
Our client connection would also timeout after idling for a short time. A key capability of realtime communication over websockets is establishing and maintaining a connection to a server. This capability is often times provided out-of-the-box with many websocket libraries, but not so using standard websockets ourselves.
Let's fix that.
Create a new file. I called mine client.js
. In it we'll create a small library for establishing a connection to our websocket server, and implementing solutions for maintaining that connection and auto-reconnecting indefinitely should that connection ever be severed.
The only thing we need to establish a websocket connection is a URL. So let's export a function that takes a single argument - url
.
let ws
export function connect(url) {
if (!url) throw new Error('url is a required argument')
// Create websocket connection
ws = new WebSocket(url)
ws.binaryType = 'arraybuffer'
// Called when a new connection is established
ws.onopen = () => {}
// Called when a connection has been closed
ws.onclose = () => {}
// Called when a message is received from the server
ws.onmessage = (packet) => {}
}
Here we see some familiar things - a new Websocket
is created using the url
we pass in, and event listeners are assigned for each of the three different websocket actions: onopen
, onclose
, and onmessage
.
NOTE : Valid websocket binary types are "blob" and "ArrayBuffer". As we learned in Part 1, µWebSockets.js trasmits data using
ArrayBuffers
, so we explicitly configurews.binaryType = 'arraybuffer'
A socket connection will timeout if idling for too long. A common practice to combat this is to intermittently send an empty message to the websocket server for the sole purpose of keeping the connection alive. We'll configure our library to send a keep-alive
message every minute so our connection never idles for too long.
let ws,
keepAlive,
retries = 0
const MAX_WAIT_TIME = 4096
// Algorithm for throttling reconnecting clients
const exponentialBackoff = () => 2 ** retries++ + Math.random(0, 1000)
const emit = (action, data) => ws.send(JSON.stringify({ action, data }))
export function connect(url) {
// ...
ws.onopen = () => {
retries = 0
// Keeps the connection alive. An idle connection will timeout and disconnect.
keepAlive = setInterval(() => emit('keep-alive'), 60000)
}
ws.onclose = () => {
clearInterval(keepAlive)
// Auto-reconnect indefinitely
setTimeout(connect.bind(null, url), Math.min(exponentialBackoff(), MAX_WAIT_TIME))
}
// ...
}
We define a few new variables:
keepAlive
: tracks an interval Idretries
: counter for reconnection attemptsMAX_WAIT_TIME
: The maximum amount of time between reconnect attemptsAnd also a couple of arrow functions:
exponentialBackoff()
: throttles reconnecting clients to avoid flooding the serveremit(action, data)
: handles packaging and shipping a { action, data } payload to the websocket serverUpon successfully connecting to the websocket server, we'll reset our retries
counter back to 0
and assign keepAlive
to a new Interval
that will emit
up to our websocket server a keep-alive
action every 60,000ms (1 minute).
If and when the connection is severed for any reason (the server reboots, the client loses internet connectivity, etc) we'll clear the interval and set a timeout to automatically reconnect utilizing our exponentialBackoff
algorithm and MAX_WAIT_TIME
.
What's nice about this configuration is that if the client is unsuccessful in re-establishing its connection, the onclose
handler will fire, set a new timeout to reconnect, attempt to reconnect, if it can't, call onclose
, which will set a new timeout to reconnect... This process will continue until it finally establishes its connection and calls onopen
, or the User exits the client application.
NOTE : You could of course pass in an argument for, say,
maxReconnectAttempts
, and give up afterX
amount of retries, but in my experience whenever there's a websocket connection involved it needs to be a persistent connection, so we use an indefinite auto-reconnect strategy here. However, we will provide a means to permanentlyend
the connection - requiring a new connection to be made - should that be desired.
With our client library mostly constructed with auto-reconnect built-in, primarily what's left is handling messages we receive from the websocket server.
Thinking about how we'd like to use this library, we'd like consumers to be able to register and unregister their own handler methods
for any given action
. To facilitate this, part of our module export will be functions for adding and removing onmessage
event handlers.
// ...
let onMessages = []
const addMessage = (action, fn) => {
const exists = onMessages.find(m => m.action === action && m.fn === fn)
if (!exists)
onMessages.push({ action, fn })
}
const delMessage = (fn) => { onMessages = onMessages.filter(m => m.fn !== fn) }
// Helper for wrapping a function in a try/catch block
function tryCatch(fn) {
try {
fn()
} catch (e) {
console.error(e)
}
}
// Helper to escape any RegEx special characters in a received 'action' name
const sanitizeRegex = (action) => action.replace(/(\^|\$|\.|\?|\*|\+|\(|\)|\/)/g, '\\$1')
function connect(url) {
// ...
ws.onmessage = (packet) => {
const { action, data } = JSON.parse(packet.data)
const regex = new RegExp(`^${sanitizeRegex(action)}$`, 'gi')
const messages = onMessages.filter(m => m.action.match(regex))
messages.forEach(m => { tryCatch(m.fn.bind(null, data)) })
if (!messages.length)
console.warn(`No registered onMessage handlers for action '${action}'`)
}
}
export {
connect,
addMessage,
delMessage
}
We restructure our library a little bit by moving our export
from directly on the connect
function to a central named export
at the bottom. Along with connect
we export two new functions - addMessage
and delMessage
- for consumers to be able to register and unregister their event handlers:
// example
import { addMessage, delMessage } from './client.js'
function pong (data) { console.log(data) }
addMessage('pong', pong)
// Some time later...
delMessage(pong)
When a message is received from the websocket server we extract out the action
name (required) and the data
payload (optional), search through our onMessages
collection for any event handlers that match the given action
, and if we find any, call them, passing along the data
payload to each.
This same pattern can be applied to the onopen
and onclose
events of the websocket connection.
// ...
let onOpen = []
const addOnOpen = (fn) => {
const exists = onOpen.find(fn)
if (!exists)
onOpen.push(fn)
}
const delOnOpen = (fn) => { onOpen = onOpen.filter(o => o !== fn) }
let onClose = []
const addOnClose = (fn) => {
const exists = onClose.find(fn)
if (!exists)
onClose.push(fn)
}
const delOnClose = (fn) => { onClose = onClose.filter(o => o !== fn) }
// ...
function connect(url) {
// ...
ws.onopen = () => {
// ...
onOpen.forEach(fn => tryCatch(fn))
}
ws.onclose = () => {
// ...
onClose.forEach(fn => tryCatch(fn))
// ...
}
// ...
}
export {
connect,
addOnOpen,
delOnOpen,
addOnClose,
delOnClose,
addMessage,
delMessage
}
With these in place a consumer can register event handlers for when the websocket connection opens - perhaps fetching data to make sure everything's up-to-date - and closes down, performing any cleanup operations.
// example
import { addOnOpen, delOnOpen, addOnClose, delOnClose } from './client.js'
async function onOpen() { console.log('Fetching some data') }
function onClose() { console.log('Cleaning things up') }
addOnOpen(onOpen)
addOnClose(onClose)
// Some time later...
delOnOpen(onOpen)
delOnClose(onClose)
Finally we'll add the ability to permanently sever the websocket connection. This can be useful if you need to kill the connection for certain application states. For example, if you only want a live websocket connection when a User is logged in, you can start the connection on login and kill it on logout.
// ...
function connect(url) {
if (!url) throw new Error('url is a required argument')
// Ensure a single websocket connection exists
const existingOpenConnection = ws && ws.readyState === 1
if (existingOpenConnection)
disconnect()
// ...
ws.onclose = (event) => {
clearInterval(keepAlive)
// Auto-reconnect indefinitely
// "event.target" will be the closing websocket
if (!event.target.forceClose)
setTimeout(connect.bind(null, url), Math.min(exponentialBackoff(), MAX_WAIT_TIME))
onClose.forEach(fn => tryCatch(fn))
}
// ...
}
// ...
// Mark the websocket as force closed so it doesn't attempt to reconnect
function disconnect() {
ws.forceClose = true
ws.close()
}
export {
connect,
disconnect,
emit,
addOnOpen,
delOnOpen,
addOnClose,
delOnClose,
addMessage,
delMessage
}
By adding a simple forceClose
flag to the websocket we can bypass the setTimeout
in the onclose
handler that would otherwise attempt to reconnect the websocket.
NOTE : Neither in the
onclose
handler or in thedisconnect
function are we clearing theonMessages
,onOpen
, oronClose
handler collections. It is the responsiblity of the registering call sites to handle unregistering their registered event handlers when appropriate. For example, given a Vue or React component, during their render or mounted lifecycle hooks they could register websocket event handlers. Subsequently during their unmount or destroy lifecycle hooks could they then (and should) unregister the handlers. But in the event this is forgotten or missed by the developer, we guard against adding duplicate handlers within the library.
Below is the full client library example. I also added a close
function so you can manually close the connection and see its auto-reconnect capabilities in action.
If you would like to see a simple example of using this library feel free to clone the companion repo to this blog series. Follow its README and give it a run.
In Part 3 we'll flesh out the websocket server further to manage client connections, as well as make use of the http server capabilities it ships with.
let retries = 0,
keepAlive,
ws
const MAX_WAIT_TIME = 4096
/**
* Algorithm for throttling reconnecting clients
*/
const exponentialBackoff = () => 2 ** retries++ + Math.random(0, 1000)
/**
* Emit an action to the websocket server with a data payload
* @param {!string} action - the message label
* @param {any} data - data payload
*/
const emit = (action, data) => ws.send(JSON.stringify({ action, data }))
/**
* Functions to run when messages are received from the websocket server
*/
let onMessages = []
/**
* Register an onMessage function. Guard against duplicates.
* @param {!string} action - action name
* @param {!Function} fn - function to execute
*/
const addMessage = (action, fn) => {
const exists = onMessages.find(m => m.action === action && m.fn === fn)
if (!exists)
onMessages.push({ action, fn })
}
/**
* Unregister onMessage functions
*/
const delMessage = (fn) => { onMessages = onMessages.filter(m => m.fn !== fn) }
/**
* Functions to run when a connection is established with the websocket server
*/
let onOpen = []
/**
* Register an onOpen function. Guard against duplicates.
*/
const addOnOpen = (fn) => {
const exists = onOpen.find(fn)
if (!exists)
onOpen.push(fn)
}
/**
* Unregister an onOpen function
*/
const delOnOpen = (fn) => { onOpen = onOpen.filter(o => o !== fn) }
/**
* Functions to run when a connection is lost to the websocket server
*/
let onClose = []
/**
* Register an onClose function
*/
const addOnClose = (fn) => {
const exists = onClose.find(fn)
if (!exists)
onClose.push(fn)
}
/**
* Unregister an onClose function
*/
const delOnClose = (fn) => { onClose = onClose.filter(o => o !== fn) }
// Helper for wrapping a function in a try/catch block
function tryCatch(fn) {
try {
fn()
} catch (e) {
console.error(e)
}
}
// Helper to escape any RegEx special characters in a received 'action' name
const sanitizeRegex = (action) => action.replace(/(\^|\$|\.|\?|\*|\+|\(|\)|\/)/gi, '\\$1')
/**
* Creates a websocket connection with indefinite auto-reconnection.
* @param {!string} url - websocket endpoint to connect to.
*/
function connect(url) {
if (!url) throw new Error('url is a required argument')
// Ensure a single websocket connection exists
const existingOpenConnection = ws && ws.readyState === 1
if (existingOpenConnection)
disconnect()
ws = new WebSocket(url)
ws.binaryType = 'arraybuffer'
// Called when a new connection is established
ws.onopen = () => {
retries = 0
// Keeps the TCP connection alive. Otherwise it times out every 2 minutes
keepAlive = setInterval(() => { emit('keep-alive') }, 60000)
onOpen.forEach(fn => tryCatch(fn))
}
// Called when a connection has been closed
ws.onclose = (event) => {
clearInterval(keepAlive)
// Auto-reconnect indefinitely
// "event.target" will be the closing websocket, whereas "ws" could be a new websocket created in connect(url)
if (!event.target.forceClose)
setTimeout(connect.bind(null, url), Math.min(exponentialBackoff(), MAX_WAIT_TIME))
onClose.forEach(fn => tryCatch(fn))
}
// Called when a message is received from the server
ws.onmessage = (packet) => {
const { action, data } = JSON.parse(packet.data)
const regex = new RegExp(`^${sanitizeRegex(action)}$`, 'gi')
const messages = onMessages.filter(m => m.action.match(regex))
messages.forEach(m => { tryCatch(m.fn.bind(null, data)) })
if (!messages.length)
console.warn(`No registered onMessage handlers for action '${action}'`)
}
}
/**
* Manually close the websocket connection to test reconnection strategy
*/
function close () {
ws.close()
}
/**
* Force-terminate the websocket connection without auto-reconnecting
* Will require explicitly calling connect(url) again to re-open
*/
function disconnect() {
ws.forceClose = true
ws.close()
}
export {
connect,
disconnect,
close,
emit,
addOnOpen,
delOnOpen,
addOnClose,
delOnClose,
addMessage,
delMessage
}