An Elixir library providing a Broadway producer for resilient WebSocket connections using gun. It supports unified `gun_opts`, idle-timeout detection (ping/pong and data frames), demand-based dispatch, and custom retry strategies.
Add `off_broadway_websocket` to your dependencies in `mix.exs`:
def deps do
  [
    {:off_broadway_websocket, "~> 1.0.2"}
  ]
end
Fetch & compile:
mix deps.get
mix deps.compile
An example pipeline using the producer:
defmodule MyApp.Broadway do
  use Broadway

  require Logger

  alias Broadway.Message
  alias Broadway.NoopAcknowledger

  def start_link(_args) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {
          OffBroadwayWebSocket.Producer,
          # Your WebSocket endpoint:
          url: "wss://example.com:443",
          path: "/stream/updates",
          # Idle timeout (ms) with no ping/data before reconnecting:
          ws_timeout: 15_000,
          # How long to wait (ms) for the gun connection to come up:
          await_timeout: 8_000,
          # Retry configuration (must include at least :retries_left and :delay):
          ws_retry_opts: %{
            max_retries: 5,
            retries_left: 5,
            delay: 1_000,          # initial backoff (ms)
            max_delay: 30_000,     # cap for backoff (ms)
            backoff_factor: 2,     # exponential factor
            jitter_fraction: 0.1   # ±10% random jitter
          },
          ws_retry_fun: &MyApp.Backoff.exponential_backoff_with_jitter/1,
          # Gun options (TCP/TLS, HTTP, WS):
          gun_opts: %{
            connect_timeout: 5_000,  # TCP/TLS handshake timeout (ms)
            protocols: [:http],      # application protocols
            transport: :tls,         # :tcp or :tls
            # CAStore comes from the :castore package and
            # :ssl_verify_hostname from the :ssl_verify_fun package.
            tls_opts: [
              verify: :verify_peer,
              cacertfile: CAStore.file_path(),
              depth: 10,
              reuse_sessions: false,
              verify_fun: {
                &:ssl_verify_hostname.verify_fun/3,
                [check_hostname: String.to_charlist("example.com")]
              }
            ],
            ws_opts: %{
              keepalive: 10_000,     # send ping if silent (ms)
              silence_pings: false
            },
            http_opts: %{
              version: :"HTTP/1.1"
            }
          },
          # Prefix for telemetry events:
          telemetry_id: :custom_telemetry,
          # Optional headers sent with the WebSocket upgrade request.
          # Header values must be strings (iodata), not maps.
          headers: [
            {"X-ABC-APIKEY", "api-key"},
            {"X-ABC-PAYLOAD", Jason.encode!(%{})},
            {"X-ABC-SIGNATURE", "signature"}
          ]
        },
        transformer: {__MODULE__, :transform, []},
        concurrency: 1
      ],
      processors: [
        default: [min_demand: 0, max_demand: 100, concurrency: 8]
      ],
      context: []
    )
  end

  @impl true
  def handle_message(_stage, %Message{data: raw} = msg, _ctx) do
    case Jason.decode(raw) do
      {:ok, data} ->
        Logger.debug(fn -> "Data: #{inspect(data)}" end)
        msg

      {:error, err} ->
        Logger.error("Decode error: #{inspect(err)}")
        Message.failed(msg, err)
    end
  end

  # Wraps each frame received by the producer in a Broadway message.
  def transform(event, _opts) do
    %Message{
      data: event,
      acknowledger: NoopAcknowledger.init()
    }
  end
end
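`MyApp.Broadway` is an ordinary Broadway module, so it can be started under your application's supervision tree. The sketch below assumes your OTP application module is called `MyApp.Application` (a hypothetical name) and relies on the `child_spec/1` that `use Broadway` generates, which calls `MyApp.Broadway.start_link/1`.

```elixir
defmodule MyApp.Application do
  # Hypothetical application module; rename to match your project.
  @moduledoc false
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # `use Broadway` defines child_spec/1, which calls MyApp.Broadway.start_link/1.
      MyApp.Broadway
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end
```

Register it in `mix.exs` with `def application, do: [mod: {MyApp.Application, []}, extra_applications: [:logger]]`.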
When calling `OffBroadwayWebSocket.Producer`, you may pass:

- `:url` (string, required): WebSocket base URL.
- `:path` (string, required): upgrade path and query string.
- `:ws_timeout` (ms, optional): idle timeout with no ping/data.
- `:await_timeout` (ms, optional): timeout for `:gun.await_up/2`.
- `:headers` (list, optional): HTTP headers for the WebSocket upgrade.
- `:min_demand` / `:max_demand` (integer): Broadway backpressure.
- `:telemetry_id` (atom): prefix for telemetry events.
- `:gun_opts` (map): all options forwarded to `:gun.open/3` and friends.
- `:ws_retry_opts` (map): your initial retry state; it must include:
  - `:retries_left` and `:delay` (ms).
  - Extra keys (e.g. `:backoff_factor`, `:jitter_fraction`) are carried through.
- `:ws_retry_fun` (function): a `(retry_opts() -> retry_opts())` function. After each failed connect, the returned map's `:delay` is used and stored as the next call's input. After a successful reconnection, `:ws_retry_opts` are reset to their initial value.
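The example pipeline references `MyApp.Backoff.exponential_backoff_with_jitter/1`, which is not part of the library. Below is a sketch of such a function under the `(retry_opts() -> retry_opts())` contract, using the extra keys from that example (`:max_delay`, `:backoff_factor`, `:jitter_fraction`). The module name is hypothetical, and the sketch assumes the retry function itself is responsible for decrementing `:retries_left`; if your version of the producer already does that, drop the `retries_left` update.

```elixir
defmodule MyApp.Backoff do
  @moduledoc """
  Hypothetical :ws_retry_fun for OffBroadwayWebSocket.Producer (not part of
  the library): exponential backoff with random jitter.
  """

  @doc """
  Takes the current retry state and returns the next one.
  Assumes this function is responsible for decrementing :retries_left.
  """
  @spec exponential_backoff_with_jitter(map()) :: map()
  def exponential_backoff_with_jitter(%{delay: delay, retries_left: retries_left} = opts) do
    factor = Map.get(opts, :backoff_factor, 2)
    max_delay = Map.get(opts, :max_delay, 30_000)
    jitter_fraction = Map.get(opts, :jitter_fraction, 0.0)

    # Grow the delay geometrically, but never past :max_delay.
    base = min(delay * factor, max_delay)

    # Random offset in [-jitter_fraction, +jitter_fraction) of the base delay.
    jitter = base * jitter_fraction * (2 * :rand.uniform() - 1)

    # Round to whole milliseconds and keep the result within [0, max_delay].
    next_delay =
      (base + jitter)
      |> round()
      |> max(0)
      |> min(max_delay)

    # Keep any extra keys; only :delay and :retries_left change.
    %{opts | delay: next_delay, retries_left: max(retries_left - 1, 0)}
  end
end
```

Starting from the example's state (1_000 ms, factor 2, cap 30_000 ms), successive calls yield roughly 2 s, 4 s, 8 s and 16 s, then stay at or just under 30 s. Because the jittered delay is fed back as the next input, jitter compounds slightly; storing an un-jittered base under an extra key would avoid that.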
Out of the box, `OffBroadwayWebSocket.Producer` uses these defaults:

| Option | Default | Description |
|---|---|---|
| `:url` | none | WebSocket URL (required) |
| `:path` | none | WebSocket path (required) |
| `:ws_timeout` | `nil` | Idle timeout (ms) for ping/data |
| `:await_timeout` | `10_000` | `:gun.await_up/2` timeout (ms) |
| `:headers` | `[]` | HTTP headers for the upgrade request |
| `:min_demand` | `10` | Broadway `min_demand` |
| `:max_demand` | `100` | Broadway `max_demand` |
| `:telemetry_id` | `:websocket_producer` | Prefix for telemetry events |
| `:gun_opts` | `%{}` | Options passed directly to `:gun.open/3`, etc. |
| `:ws_retry_opts` | see below | Initial retry state |
| `:ws_retry_fun` | `&OffBroadwayWebSocket.State.default_ws_retry_fun/1` | Retry (backoff) function |
By default, a constant backoff function is used with the config shown below:
%{
  max_retries: 5,   # total retry attempts
  retries_left: 5,  # decremented on each failure
  delay: 10_000     # constant delay (ms) between retries
}
Connection telemetry events are fired under `[:<telemetry_id>, :connection, <event>]`:

| Event | Measurements | Metadata | Description |
|---|---|---|---|
| `:success` | `%{count: 1}` | `%{url: String.t()}` | Handshake completed |
| `:failure` | `%{count: 1}` | `%{reason: term}` | Connect or upgrade failed |
| `:disconnected` | `%{count: 1}` | `%{reason: term}` | Underlying TCP connection dropped |
| `:timeout` | `%{count: 1}` | `%{}` | Idle ping/data timeout |
| `:status` | `%{value: 0 \| 1}` | `%{}` | Connection status |
Attach as usual:
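:telemetry.attach(
  "log-connection-success",
  # :websocket_producer is the default :telemetry_id; use your own prefix
  # (e.g. :custom_telemetry in the pipeline example) if you set one.
  [:websocket_producer, :connection, :success],
  fn event_name, measurements, metadata, _config ->
    IO.inspect({event_name, measurements, metadata}, label: "Telemetry Event")
  end,
  nil
)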
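To observe every connection event rather than just `:success`, one handler can be attached to all five events from the table with `:telemetry.attach_many/4`. The module `MyApp.WebSocketTelemetry` and the handler ID format are hypothetical; the event names follow the `[:<telemetry_id>, :connection, <event>]` pattern described above. Using a named function capture instead of an anonymous function also avoids the performance warning `:telemetry` emits for local-function handlers.

```elixir
defmodule MyApp.WebSocketTelemetry do
  @moduledoc """
  Hypothetical helper (not part of off_broadway_websocket) that logs every
  connection event emitted by the producer.
  """
  require Logger

  @events [:success, :failure, :disconnected, :timeout, :status]

  @doc "Builds the full event names for a given :telemetry_id prefix."
  def event_names(prefix \\ :websocket_producer) do
    Enum.map(@events, &[prefix, :connection, &1])
  end

  @doc "Attaches one handler to all connection events for `prefix`."
  def attach(prefix \\ :websocket_producer) do
    :telemetry.attach_many(
      "#{prefix}-connection-logger",
      event_names(prefix),
      &__MODULE__.handle_event/4,
      nil
    )
  end

  @doc false
  def handle_event([_prefix, :connection, event], measurements, metadata, _config) do
    Logger.info("WebSocket #{event}: #{inspect(measurements)} #{inspect(metadata)}")
  end
end
```

Call `MyApp.WebSocketTelemetry.attach(:custom_telemetry)` once at startup (for example in `Application.start/2`, before the pipeline starts) so that no early connection events are missed.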
To run the tests:
mix test
To build the Dialyzer PLT and run Dialyzer:
mix dialyzer --plt
mix dialyzer
PRs and issues welcome! Please follow Elixir conventions and include tests.
Apache License 2.0 © 2025