Websockets with NodeJS and µWebSockets.js — Part 2

3/12/2020

This is Part 2 of a small series exploring a websocket server powered by µWebSockets.js.

Part 1 established a skeleton websocket server.

In this post we'll build up a client library for establishing a connection to our websocket server using standard websockets.

I encourage you to follow along in your own IDE. A GitHub project is also available with tags at each pinnacle commit.

Websocket Client

We left off Part 1 with a fully functional websocket server that admittedly didn't do anything too exciting beyond echoing back messages.

Our client connection would also timeout after idling for a short time. A key capability of realtime communication over websockets is establishing and maintaining a connection to a server. This capability is often times provided out-of-the-box with many websocket libraries, but not so using standard websockets ourselves.

Let's fix that.

Client Library

Create a new file. I called mine client.js. In it we'll create a small library for establishing a connection to our websocket server, and implementing solutions for maintaining that connection and auto-reconnecting indefinitely should that connection ever be severed.

The only thing we need to establish a websocket connection is a URL. So let's export a function that takes a single argument - url.

let ws

export function connect(url) {

  if (!url) throw new Error('url is a required argument')

  // Create websocket connection
  ws = new WebSocket(url)

  ws.binaryType = 'arraybuffer'

  // Called when a new connection is established
  ws.onopen = () => {}

  // Called when a connection has been closed
  ws.onclose = () => {}

  // Called when a message is received from the server
  ws.onmessage = (packet) => {}
}

Here we see some familiar things - a new Websocket is created using the url we pass in, and event listeners are assigned for each of the three different websocket actions: onopen, onclose, and onmessage.

NOTE : Valid websocket binary types are "blob" and "ArrayBuffer". As we learned in Part 1, µWebSockets.js trasmits data using ArrayBuffers, so we explicitly configure ws.binaryType = 'arraybuffer'

Reconnection Strategy

A socket connection will timeout if idling for too long. A common practice to combat this is to intermittently send an empty message to the websocket server for the sole purpose of keeping the connection alive. We'll configure our library to send a keep-alive message every minute so our connection never idles for too long.

let ws,
  keepAlive,
  retries = 0

const MAX_WAIT_TIME = 4096

// Algorithm for throttling reconnecting clients
const exponentialBackoff = () => 2 ** retries++ + Math.random(0, 1000)

const emit = (action, data) => ws.send(JSON.stringify({ action, data }))

export function connect(url) {

  // ...

  ws.onopen = () => {
    retries = 0

    // Keeps the connection alive. An idle connection will timeout and disconnect.
    keepAlive = setInterval(() => emit('keep-alive'), 60000)
  }

  ws.onclose = () => {
    clearInterval(keepAlive)

    // Auto-reconnect indefinitely
    setTimeout(connect.bind(null, url), Math.min(exponentialBackoff(), MAX_WAIT_TIME))
  }

  // ...

}

We define a few new variables:

  • keepAlive: tracks an interval Id
  • retries: counter for reconnection attempts
  • MAX_WAIT_TIME: The maximum amount of time between reconnect attempts

And also a couple of arrow functions:

  • exponentialBackoff(): throttles reconnecting clients to avoid flooding the server
  • emit(action, data): handles packaging and shipping a { action, data } payload to the websocket server

Upon successfully connecting to the websocket server, we'll reset our retries counter back to 0 and assign keepAlive to a new Interval that will emit up to our websocket server a keep-alive action every 60,000ms (1 minute).

If and when the connection is severed for any reason (the server reboots, the client loses internet connectivity, etc) we'll clear the interval and set a timeout to automatically reconnect utilizing our exponentialBackoff algorithm and MAX_WAIT_TIME.

What's nice about this configuration is that if the client is unsuccessful in re-establishing its connection, the onclose handler will fire, set a new timeout to reconnect, attempt to reconnect, if it can't, call onclose, which will set a new timeout to reconnect... This process will continue until it finally establishes its connection and calls onopen, or the User exits the client application.

NOTE : You could of course pass in an argument for, say, maxReconnectAttempts, and give up after X amount of retries, but in my experience whenever there's a websocket connection involved it needs to be a persistent connection, so we use an indefinite auto-reconnect strategy here. However, we will provide a means to permanently end the connection - requiring a new connection to be made - should that be desired.

Handling Messages

With our client library mostly constructed with auto-reconnect built-in, primarily what's left is handling messages we receive from the websocket server.

Thinking about how we'd like to use this library, we'd like consumers to be able to register and unregister their own handler methods for any given action. To facilitate this, part of our module export will be functions for adding and removing onmessage event handlers.

// ...

let onMessages = []
const addMessage = (action, fn) => {
  const exists = onMessages.find(m => m.action === action && m.fn === fn)
  if (!exists)
    onMessages.push({ action, fn })
}
const delMessage = (fn) => { onMessages = onMessages.filter(m => m.fn !== fn) }

// Helper for wrapping a function in a try/catch block
function tryCatch(fn) {
  try {
    fn()
  } catch (e) {
    console.error(e)
  }
}

// Helper to escape any RegEx special characters in a received 'action' name
const sanitizeRegex = (action) => action.replace(/(\^|\$|\.|\?|\*|\+|\(|\)|\/)/g, '\\$1')

function connect(url) {
  // ...

  ws.onmessage = (packet) => {
    const { action, data } = JSON.parse(packet.data)
    const regex = new RegExp(`^${sanitizeRegex(action)}$`, 'gi')
    const messages = onMessages.filter(m => m.action.match(regex))
    messages.forEach(m => { tryCatch(m.fn.bind(null, data)) })
    if (!messages.length)
      console.warn(`No registered onMessage handlers for action '${action}'`)
  }
}

export {
  connect,
  addMessage,
  delMessage
}

We restructure our library a little bit by moving our export from directly on the connect function to a central named export at the bottom. Along with connect we export two new functions - addMessage and delMessage - for consumers to be able to register and unregister their event handlers:

// example

import { addMessage, delMessage } from './client.js'

function pong (data) { console.log(data) }

addMessage('pong', pong)
// Some time later...
delMessage(pong)

When a message is received from the websocket server we extract out the action name (required) and the data payload (optional), search through our onMessages collection for any event handlers that match the given action, and if we find any, call them, passing along the data payload to each.

This same pattern can be applied to the onopen and onclose events of the websocket connection.

// ...

let onOpen = []
const addOnOpen = (fn) => {
  const exists = onOpen.find(fn)
  if (!exists)
    onOpen.push(fn)
}
const delOnOpen = (fn) => { onOpen = onOpen.filter(o => o !== fn) }

let onClose = []
const addOnClose = (fn) => {
  const exists = onClose.find(fn)
  if (!exists)
    onClose.push(fn)
}
const delOnClose = (fn) => { onClose = onClose.filter(o => o !== fn) }

// ...

function connect(url) {
  // ...

  ws.onopen = () => {
    // ...
    onOpen.forEach(fn => tryCatch(fn))
  }

  ws.onclose = () => {
    // ...
    onClose.forEach(fn => tryCatch(fn))
    // ...
  }

  // ...
}

export {
  connect,
  addOnOpen,
  delOnOpen,
  addOnClose,
  delOnClose,
  addMessage,
  delMessage
}

With these in place a consumer can register event handlers for when the websocket connection opens - perhaps fetching data to make sure everything's up-to-date - and closes down, performing any cleanup operations.

// example

import { addOnOpen, delOnOpen, addOnClose, delOnClose } from './client.js'

async function onOpen() { console.log('Fetching some data') }
function onClose() { console.log('Cleaning things up') }

addOnOpen(onOpen)
addOnClose(onClose)
// Some time later...
delOnOpen(onOpen)
delOnClose(onClose)

Finally we'll add the ability to permanently sever the websocket connection. This can be useful if you need to kill the connection for certain application states. For example, if you only want a live websocket connection when a User is logged in, you can start the connection on login and kill it on logout.

// ...
function connect(url) {
  if (!url) throw new Error('url is a required argument')

  // Ensure a single websocket connection exists
  const existingOpenConnection = ws && ws.readyState === 1
  if (existingOpenConnection)
    disconnect()

  // ...

  ws.onclose = (event) => {
    clearInterval(keepAlive)

    // Auto-reconnect indefinitely
    // "event.target" will be the closing websocket
    if (!event.target.forceClose)
      setTimeout(connect.bind(null, url), Math.min(exponentialBackoff(), MAX_WAIT_TIME))

    onClose.forEach(fn => tryCatch(fn))
  }
  // ...
}
// ...

// Mark the websocket as force closed so it doesn't attempt to reconnect
function disconnect() {
  ws.forceClose = true
  ws.close()
}

export {
  connect,
  disconnect,
  emit,
  addOnOpen,
  delOnOpen,
  addOnClose,
  delOnClose,
  addMessage,
  delMessage
}

By adding a simple forceClose flag to the websocket we can bypass the setTimeout in the onclose handler that would otherwise attempt to reconnect the websocket.

NOTE : Neither in the onclose handler or in the disconnect function are we clearing the onMessages, onOpen, or onClose handler collections. It is the responsiblity of the registering call sites to handle unregistering their registered event handlers when appropriate. For example, given a Vue or React component, during their render or mounted lifecycle hooks they could register websocket event handlers. Subsequently during their unmount or destroy lifecycle hooks could they then (and should) unregister the handlers. But in the event this is forgotten or missed by the developer, we guard against adding duplicate handlers within the library.

Below is the full client library example. I also added a close function so you can manually close the connection and see its auto-reconnect capabilities in action.

If you would like to see a simple example of using this library feel free to clone the companion repo to this blog series. Follow its README and give it a run.

In Part 3 we'll flesh out the websocket server further to manage client connections, as well as make use of the http server capabilities it ships with.

let retries = 0,
  keepAlive,
  ws

const MAX_WAIT_TIME = 4096

/**
 * Algorithm for throttling reconnecting clients
*/
const exponentialBackoff = () => 2 ** retries++ + Math.random(0, 1000)

/**
 * Emit an action to the websocket server with a data payload
 * @param {!string} action - the message label
 * @param {any} data - data payload
*/
const emit = (action, data) => ws.send(JSON.stringify({ action, data }))

/**
 * Functions to run when messages are received from the websocket server
*/
let onMessages = []

/**
 * Register an onMessage function. Guard against duplicates.
 * @param {!string} action - action name
 * @param {!Function} fn - function to execute
 */
const addMessage = (action, fn) => {
  const exists = onMessages.find(m => m.action === action && m.fn === fn)
  if (!exists)
    onMessages.push({ action, fn })
}

/**
 * Unregister onMessage functions
*/
const delMessage = (fn) => { onMessages = onMessages.filter(m => m.fn !== fn) }

/**
 * Functions to run when a connection is established with the websocket server
*/
let onOpen = []

/**
 * Register an onOpen function. Guard against duplicates.
*/
const addOnOpen = (fn) => {
  const exists = onOpen.find(fn)
  if (!exists)
    onOpen.push(fn)
}

/**
 * Unregister an onOpen function
*/
const delOnOpen = (fn) => { onOpen = onOpen.filter(o => o !== fn) }

/**
 * Functions to run when a connection is lost to the websocket server
*/
let onClose = []
/**
 * Register an onClose function
*/
const addOnClose = (fn) => {
  const exists = onClose.find(fn)
  if (!exists)
    onClose.push(fn)
}

/**
 * Unregister an onClose function
*/
const delOnClose = (fn) => { onClose = onClose.filter(o => o !== fn) }

// Helper for wrapping a function in a try/catch block
function tryCatch(fn) {
  try {
    fn()
  } catch (e) {
    console.error(e)
  }
}

// Helper to escape any RegEx special characters in a received 'action' name
const sanitizeRegex = (action) => action.replace(/(\^|\$|\.|\?|\*|\+|\(|\)|\/)/gi, '\\$1')

/**
 * Creates a websocket connection with indefinite auto-reconnection.
 * @param {!string} url - websocket endpoint to connect to.
 */
function connect(url) {
  if (!url) throw new Error('url is a required argument')

  // Ensure a single websocket connection exists
  const existingOpenConnection = ws && ws.readyState === 1
  if (existingOpenConnection)
    disconnect()

  ws = new WebSocket(url)

  ws.binaryType = 'arraybuffer'

  // Called when a new connection is established
  ws.onopen = () => {
    retries = 0

    // Keeps the TCP connection alive. Otherwise it times out every 2 minutes
    keepAlive = setInterval(() => { emit('keep-alive') }, 60000)

    onOpen.forEach(fn => tryCatch(fn))
  }

  // Called when a connection has been closed
  ws.onclose = (event) => {
    clearInterval(keepAlive)

    // Auto-reconnect indefinitely
    // "event.target" will be the closing websocket, whereas "ws" could be a new websocket created in connect(url)
    if (!event.target.forceClose)
      setTimeout(connect.bind(null, url), Math.min(exponentialBackoff(), MAX_WAIT_TIME))

    onClose.forEach(fn => tryCatch(fn))
  }

  // Called when a message is received from the server
  ws.onmessage = (packet) => {
    const { action, data } = JSON.parse(packet.data)
    const regex = new RegExp(`^${sanitizeRegex(action)}$`, 'gi')
    const messages = onMessages.filter(m => m.action.match(regex))
    messages.forEach(m => { tryCatch(m.fn.bind(null, data)) })
    if (!messages.length)
      console.warn(`No registered onMessage handlers for action '${action}'`)
  }
}

/**
 * Manually close the websocket connection to test reconnection strategy
*/
function close () {
  ws.close()
}

/**
 * Force-terminate the websocket connection without auto-reconnecting
 * Will require explicitly calling connect(url) again to re-open
 */
function disconnect() {
  ws.forceClose = true
  ws.close()
}

export {
  connect,
  disconnect,
  close,
  emit,
  addOnOpen,
  delOnOpen,
  addOnClose,
  delOnClose,
  addMessage,
  delMessage
}