off_broadway_websocket

An Off-Broadway producer enabling real-time ingestion of WebSocket data.


Keywords
broadway, elixir, websocket
License
Apache-2.0


OffBroadwayWebSocket

An Elixir library providing a Broadway producer for resilient WebSocket connections using gun. It supports unified gun_opts, idle-timeout detection (ping/pong and data frames), demand-based dispatch, and custom retry strategies.


Installation

Add to your mix.exs:

def deps do
  [
    {:off_broadway_websocket, "~> 1.0.2"}
  ]
end

Fetch & compile:

mix deps.get
mix deps.compile
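
The Quickstart below also calls Jason, CAStore, and :ssl_verify_hostname, which come from separate Hex packages. A deps list that covers them might look like the following sketch. The version requirements for the three extra packages are assumptions, not something this README specifies, so adjust them to what your project already uses.

```elixir
def deps do
  [
    {:off_broadway_websocket, "~> 1.0.2"},
    # Assumed extras, needed only by the Quickstart example:
    {:jason, "~> 1.4"},          # Jason.decode/1 in handle_message/3
    {:castore, "~> 1.0"},        # CAStore.file_path/0 for :cacertfile
    {:ssl_verify_fun, "~> 1.1"}  # provides :ssl_verify_hostname.verify_fun/3
  ]
end
```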

Quickstart

defmodule MyApp.Broadway do
  use Broadway
  
  require Logger
  
  alias Broadway.Message
  alias Broadway.NoopAcknowledger

  def start_link(_args) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {
          OffBroadwayWebSocket.Producer,
          # Your WebSocket endpoint:
          url: "wss://example.com:443",
          path: "/stream/updates",

          # Idle timeout (ms) for no ping/data before reconnect:
          ws_timeout: 15_000,
          # How long to wait (ms) for Gun to come up:
          await_timeout: 8_000,

          # Retry configuration – must include at least :retries_left and :delay:
          ws_retry_opts: %{
            max_retries:     5,
            retries_left:    5,
            delay:           1_000,   # initial backoff (ms)
            max_delay:       30_000,  # cap for backoff (ms)
            backoff_factor:  2,       # exponential factor
            jitter_fraction: 0.1      # ±10% random jitter
          },
          ws_retry_fun: &MyApp.Backoff.exponential_backoff_with_jitter/1,

          # Gun options (TCP/TLS, HTTP, WS):
          gun_opts: %{
            connect_timeout: 5_000,      # TCP/TLS handshake timeout
            protocols:       [:http],     # application protocols
            transport:       :tls,        # :tcp or :tls

            tls_opts: [
              verify:         :verify_peer,
              cacertfile:     CAStore.file_path(),
              depth:          10,
              reuse_sessions: false,
              verify_fun:     {
                &:ssl_verify_hostname.verify_fun/3,
                [check_hostname: String.to_charlist("example.com")]
              }
            ],

            ws_opts: %{
              keepalive:     10_000,  # send ping if silent
              silence_pings: false
            },

            http_opts: %{
              version:       :"HTTP/1.1"
            }
          },

          # Prefix for telemetry events:
          telemetry_id: :custom_telemetry,
          # Optional headers for the WebSocket upgrade request:
          headers: [
            {"X-ABC-APIKEY", "api-key"},
            {"X-ABC-PAYLOAD", "payload"},
            {"X-ABC-SIGNATURE", "signature"}
          ]
        },
        transformer: {__MODULE__, :transform, []},
        concurrency: 1
      ],
      processors: [
        default: [min_demand: 0, max_demand: 100, concurrency: 8]
      ],
      context: []
    )
  end

  @impl true
  def handle_message(_stage, %Message{data: raw} = msg, _ctx) do
    case Jason.decode(raw) do
      {:ok, data} ->
        Logger.debug(fn -> "Data: #{inspect(data)}" end)
        msg

      {:error, err} ->
        Logger.error("Decode error: #{inspect(err)}")
        Message.failed(msg, err)
    end
  end

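  # Wraps each event emitted by the producer in a Broadway message.
  # NoopAcknowledger is used because there is nothing to acknowledge upstream.
  def transform(event, _opts) do
    %Message{
      data: event,
      acknowledger: NoopAcknowledger.init()
    }
  end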
end
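
Because `use Broadway` defines a child spec, a pipeline like the one above is usually started under a supervisor rather than by hand. A minimal sketch, assuming a hypothetical MyApp.Application module (the module and supervisor names are illustrative, not part of this library):

```elixir
defmodule MyApp.Application do
  # Hypothetical application module; the name is an assumption for illustration.
  use Application

  # Children started at boot. MyApp.Broadway is the pipeline from the Quickstart;
  # the supervisor starts it through the child spec that `use Broadway` defines.
  def children, do: [MyApp.Broadway]

  @impl true
  def start(_type, _args) do
    Supervisor.start_link(children(), strategy: :one_for_one, name: MyApp.Supervisor)
  end
end
```

To have it start with the application, register the module in mix.exs under `application/0` as `mod: {MyApp.Application, []}`.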

Configuration Options

OffBroadwayWebSocket.Producer accepts the following options:

  • :url (string, required) — WebSocket base URL.
  • :path (string, required) — Upgrade path and querystring.
  • :ws_timeout (ms, optional) — Idle timeout for no ping/data.
  • :await_timeout (ms, optional) — Timeout for :gun.await_up/2.
  • :headers (list, optional) — HTTP headers for WS upgrade.
  • :min_demand / :max_demand (integer) — Broadway backpressure.
  • :telemetry_id (atom) — Prefix for telemetry events.
  • :gun_opts (map) — All options forwarded to :gun.open/3 and friends.
  • :ws_retry_opts (map) — Your initial retry state; must include:
    • :retries_left, :delay (ms).
    • Extra keys (e.g. :backoff_factor, :jitter_fraction) are carried through.
  • :ws_retry_fun (function) — A (retry_opts() -> retry_opts()) function. After each failed connection attempt it is called with the current retry state; the :delay of the returned map is used for the next attempt, and the returned map becomes the input of the next call. After a successful reconnection, the retry state is reset to the initial :ws_retry_opts.
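
The Quickstart references MyApp.Backoff.exponential_backoff_with_jitter/1 without showing it. One way such a function could satisfy the retry_opts() -> retry_opts() contract is sketched below. It reads the extra keys from the Quickstart's ws_retry_opts and assumes that the retry function itself is responsible for decrementing :retries_left; if the producer already does that, drop that part.

```elixir
defmodule MyApp.Backoff do
  @moduledoc """
  Hypothetical retry function for :ws_retry_fun. A sketch, not part of the library.
  """

  # Takes the current retry state and returns the next one.
  def exponential_backoff_with_jitter(%{retries_left: retries_left, delay: delay} = opts) do
    factor = Map.get(opts, :backoff_factor, 2)
    max_delay = Map.get(opts, :max_delay, 30_000)
    jitter_fraction = Map.get(opts, :jitter_fraction, 0.1)

    # Grow the previous delay, capped at :max_delay.
    base = min(delay * factor, max_delay)

    # Uniform random jitter within ±jitter_fraction of the base delay.
    jitter = base * jitter_fraction * (2 * :rand.uniform() - 1)

    next_delay =
      (base + jitter)
      |> round()
      |> max(0)
      |> min(max_delay)

    # Assumption: this function, not the producer, decrements :retries_left.
    # The jittered delay is stored, so jitter carries over into the next call.
    %{opts | retries_left: max(retries_left - 1, 0), delay: next_delay}
  end
end
```

Starting from delay: 1_000 with backoff_factor: 2 and jitter_fraction: 0.1, the first returned delay falls between 1_800 and 2_200 ms. All other keys in the map are passed through unchanged.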

Default Configuration

Out of the box, OffBroadwayWebSocket.Producer uses these defaults:

| Option | Default | Description |
| --- | --- | --- |
| :url | (none) | WebSocket URL (required) |
| :path | (none) | WebSocket path (required) |
| :ws_timeout | nil | Idle timeout (ms) for ping/data |
| :await_timeout | 10_000 | gun.await_up/2 timeout (ms) |
| :headers | [] | Upgrade HTTP headers |
| :min_demand | 10 | Broadway min_demand |
| :max_demand | 100 | Broadway max_demand |
| :telemetry_id | :websocket_producer | Prefix for telemetry events |
| :gun_opts | %{} | Direct options to :gun.open/3, etc. |
| :ws_retry_opts | see Default Backoff Function | Initial retry state |
| :ws_retry_fun | &OffBroadwayWebSocket.State.default_ws_retry_fun/1 | Backoff function contract |
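
Since only :url and :path are required, a producer definition that leans on these defaults can be quite short. The sketch below wraps the Broadway `producer:` options in a hypothetical MyApp.ProducerConfig helper; the helper name, URL, and path are placeholders.

```elixir
defmodule MyApp.ProducerConfig do
  # Hypothetical helper returning the `producer:` options for Broadway.start_link/2.
  # Only the two required options are set; every other option falls back to the
  # defaults listed above.
  def minimal do
    [
      module: {
        OffBroadwayWebSocket.Producer,
        url: "wss://example.com:443",
        path: "/stream/updates"
      },
      # Reuses the transform/2 function from the Quickstart module.
      transformer: {MyApp.Broadway, :transform, []},
      concurrency: 1
    ]
  end
end
```

It would be passed as `producer: MyApp.ProducerConfig.minimal()` in the Broadway.start_link/2 call.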

Default Backoff Function

By default, a constant backoff function is used with the following initial retry state:

%{
  max_retries:  5,     # total retry attempts
  retries_left: 5,     # decremented on each failure
  delay:        10_000 # constant delay in ms between retries
}
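
To see what a constant backoff means in practice, the sketch below walks a retry state through repeated failures and collects the delay used before each attempt. MyApp.RetrySchedule and its constant/1 function are hypothetical stand-ins, not the library's default_ws_retry_fun/1, and the simulation assumes that retrying stops once :retries_left reaches 0.

```elixir
defmodule MyApp.RetrySchedule do
  # Hypothetical constant backoff: the delay is unchanged, one retry is consumed.
  def constant(%{retries_left: left} = opts), do: %{opts | retries_left: max(left - 1, 0)}

  # Simulates consecutive failures: calls retry_fun once per failure, records the
  # :delay of the returned state, and stops when no retries are left (assumption).
  def delays(opts, retry_fun) do
    opts
    |> Stream.unfold(fn
      %{retries_left: left} when left <= 0 ->
        nil

      state ->
        next = retry_fun.(state)
        {next.delay, next}
    end)
    |> Enum.to_list()
  end
end
```

With the default state shown above, `MyApp.RetrySchedule.delays(%{max_retries: 5, retries_left: 5, delay: 10_000}, &MyApp.RetrySchedule.constant/1)` yields five delays of 10_000 ms each. Any function of the same shape can be passed as the second argument to preview its schedule.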

Telemetry Events

Fired under [:<telemetry_id>, :connection, <event>]:

| Event | Measurements | Metadata | Description |
| --- | --- | --- | --- |
| :success | %{count: 1} | %{url: String} | Handshake completed |
| :failure | %{count: 1} | %{reason: term} | Connect or upgrade failed |
| :disconnected | %{count: 1} | %{reason: term} | Underlying TCP connection dropped |
| :timeout | %{count: 1} | %{} | Idle ping/data timeout |
| :status | %{value: 0 or 1} | %{} | |

Attach as usual:

:telemetry.attach(
  "log-connection-success",
  [:websocket_producer, :connection, :success],
  fn event_name, measurements, metadata, _config ->
    IO.inspect({event_name, measurements, metadata}, label: "Telemetry Event")
  end,
  nil
)
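
To handle every connection event with one handler, :telemetry.attach_many/4 can be used instead of attaching each event separately. The module below is a sketch: MyApp.ConnectionTelemetry, the handler id, and the log format are assumptions, while the event names come from the table above.

```elixir
defmodule MyApp.ConnectionTelemetry do
  # Hypothetical helper module; names are assumptions for illustration.
  require Logger

  # Connection events listed in the Telemetry Events table.
  @events [:success, :failure, :disconnected, :timeout, :status]

  # Attaches one handler to every connection event under the given prefix.
  # The prefix must match the producer's :telemetry_id option.
  def attach(telemetry_id \\ :websocket_producer) do
    events = for event <- @events, do: [telemetry_id, :connection, event]
    :telemetry.attach_many("my-app-ws-connection", events, &__MODULE__.handle_event/4, nil)
  end

  # Telemetry handler: logs a single line per event.
  def handle_event(event_name, measurements, metadata, _config) do
    Logger.info(format(event_name, measurements, metadata))
  end

  # Builds the log line; kept separate so it can be checked without telemetry.
  def format([prefix, :connection, event], measurements, metadata) do
    "#{prefix} connection #{event}: #{inspect(measurements)} #{inspect(metadata)}"
  end
end
```

Call `MyApp.ConnectionTelemetry.attach(:custom_telemetry)` when using the Quickstart configuration. Passing a captured module function rather than an anonymous function follows the recommendation in the telemetry documentation.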

Running Tests

mix test

Dialyzer

mix dialyzer --plt
mix dialyzer

Contributing

PRs and issues welcome! Please follow Elixir conventions and include tests.


License

Apache License 2.0 © 2025