off_broadway_websocket

An Off-Broadway producer enabling real-time ingestion of WebSocket data.


OffBroadwayWebSocket

An Elixir library that provides a Broadway producer for WebSocket connections, built on the gun library. It supports ping/pong timeout monitoring and demand-based message dispatching in an Off-Broadway setup.

Features

  • Manages WebSocket connections.
  • Monitors connections with ping/pong messages and triggers a timeout when no pong arrives in time.
  • Integrates with Broadway for demand-driven message processing.

Installation

Add off_broadway_websocket to your list of dependencies in mix.exs:

def deps do
  [
    {:off_broadway_websocket, "~> 0.0.7"}
  ]
end

Run the following to fetch and compile the dependency:

mix deps.get
mix deps.compile

Usage

Basic Setup with Broadway

In your project, create a Broadway module that uses OffBroadwayWebSocket.Producer as its producer.

defmodule MyApp.Broadway do
  use Broadway
  require Logger

  alias Broadway.Message

  def start_link(_args) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {
          OffBroadwayWebSocket.Producer,
          url: "wss://example.com",
          path: "/path_to_ws_endpoint",
          ws_timeout: 15_000,
          ws_opts: %{keepalive: 10_000, silence_pings: false},
          http_opts: %{version: :"HTTP/1.1"},
          telemetry_id: :custom_telemetry_id,
          headers: [  # Optional headers
            {"X-ABC-APIKEY", "api-key"},
            {"X-ABC-PAYLOAD", "payload"},
            {"X-ABC-SIGNATURE", "signature"}
          ]
        },
        transformer: {__MODULE__, :transform, []},
        concurrency: 1
      ],
      processors: [
        default: [min_demand: 0, max_demand: 100, concurrency: 8]
      ]
    )
  end

  @impl true
  def handle_message(_processor, %Message{data: raw_message} = message, _context) do
    case Jason.decode(raw_message) do
      {:ok, %{"type" => "heartbeat"}} ->
        Logger.debug("Heartbeat message received.")
        message

      {:ok, data} ->
        Logger.info("Data received: #{inspect(data)}")
        message

      {:error, error} ->
        Logger.error("Failed to decode message: #{inspect(error)}")
        message
    end
  end

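  # Wraps each event emitted by the producer in a Broadway.Message
  # before it reaches the processors.
  def transform(event, _opts) do
    %Message{
      data: event,
      acknowledger: {__MODULE__, :ack_id, :ack_data}
    }
  end

  # No-op acknowledger: nothing is reported back for processed messages.
  def ack(:ack_id, _successful, _failed) do
    :ok
  end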
end
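
To run the pipeline, it has to be started under a supervisor. The following is a minimal sketch, assuming the MyApp.Broadway module above and a hypothetical MyApp.Application module; the module and supervisor names are illustrative and not part of the library.

```elixir
defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # `use Broadway` defines child_spec/1, so the module can be listed directly.
      # The supervisor calls MyApp.Broadway.start_link/1 with an empty argument list.
      MyApp.Broadway
    ]

    # :one_for_one restarts only the pipeline if it crashes.
    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end
```

For this to take effect, the module would need to be registered under `mod: {MyApp.Application, []}` in the `application/0` function of mix.exs.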

Configuration Options

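  • url: The WebSocket server URL, including scheme and host (for example wss://example.com).
  • path: The WebSocket endpoint path on that server (for example /path_to_ws_endpoint).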
  • ws_timeout: Time in milliseconds to wait for a pong response before assuming the connection is lost.
  • ws_opts: WebSocket-specific options passed to the gun 2.1 library, such as keepalive and silence_pings.
  • http_opts: HTTP-specific options also compatible with gun 2.1, including version or custom headers.
  • headers: Optional headers to use when upgrading to WebSocket.
  • telemetry_id: Optional custom identifier for telemetry events. Defaults to :websocket_producer.

The complete list of options accepted by http_opts and ws_opts is available in the gun documentation.
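
The options above do not have to be hard-coded. As a sketch, they could be assembled from environment variables by a small helper. MyApp.WebSocketConfig and the WS_URL, WS_PATH, and WS_TIMEOUT_MS variable names are assumptions made for illustration; only the option keys come from the list above.

```elixir
defmodule MyApp.WebSocketConfig do
  @moduledoc false

  # Hypothetical helper: builds the keyword list handed to OffBroadwayWebSocket.Producer.
  # `env` defaults to the process environment (a map of string keys to string values),
  # and can be replaced with a plain map in tests.
  def producer_opts(env \\ System.get_env()) do
    [
      url: Map.get(env, "WS_URL", "wss://example.com"),
      path: Map.get(env, "WS_PATH", "/path_to_ws_endpoint"),
      # Environment values are strings, so the timeout is converted to an integer.
      ws_timeout: env |> Map.get("WS_TIMEOUT_MS", "15000") |> String.to_integer(),
      # Same values as the basic setup example.
      ws_opts: %{keepalive: 10_000, silence_pings: false},
      http_opts: %{version: :"HTTP/1.1"}
    ]
  end
end
```

The resulting list could then be passed as `module: {OffBroadwayWebSocket.Producer, MyApp.WebSocketConfig.producer_opts()}` in the producer configuration.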

Telemetry Events

OffBroadwayWebSocket emits telemetry events for key WebSocket operations. These events can be used for monitoring and integration with tools like Prometheus, Datadog, or other observability platforms.

Event Table

| Event Name | Measurements | Metadata | Description |
| --- | --- | --- | --- |
| [:websocket_producer, :connection, :success] | count: 1 | url: String | Emitted when a connection is successfully established. |
| [:websocket_producer, :connection, :failure] | count: 1 | reason: term() | Emitted when a connection attempt fails. |
| [:websocket_producer, :connection, :disconnected] | count: 1 | reason: term() | Emitted when the WebSocket connection is disconnected. |
| [:websocket_producer, :connection, :timeout] | count: 1 | (none) | Emitted when a ping/pong timeout occurs. |
| [:websocket_producer, :connection, :status] | value: 0 or 1 | (none) | Emitted to indicate the current WebSocket connection status (0 = down, 1 = up). |
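
For metrics-based monitoring, the events in this table can be described as metric definitions. The sketch below assumes the separate telemetry_metrics package is added as a dependency and that the default :websocket_producer identifier is in use. MyApp.WebSocketMetrics is a hypothetical module name.

```elixir
defmodule MyApp.WebSocketMetrics do
  import Telemetry.Metrics

  # Metric names follow the Telemetry.Metrics convention:
  # the event name segments joined by dots, followed by the measurement key.
  def metrics do
    [
      counter("websocket_producer.connection.success.count"),
      counter("websocket_producer.connection.failure.count"),
      counter("websocket_producer.connection.disconnected.count"),
      counter("websocket_producer.connection.timeout.count"),
      # Keeps the most recent status value (0 = down, 1 = up).
      last_value("websocket_producer.connection.status.value")
    ]
  end
end
```

The list returned by `metrics/0` is only a description. A reporter of your choice has to be started with it before anything is exported.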

Example Usage

You can attach custom handlers to these telemetry events for logging or monitoring purposes. Here's an example:

:telemetry.attach(
  "log-connection-success",
  [:websocket_producer, :connection, :success],
  fn event_name, measurements, metadata, _config ->
    IO.inspect({event_name, measurements, metadata}, label: "Telemetry Event")
  end,
  nil
)

This allows you to customize behavior or integrate the events into observability tools.
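
To cover all five events with one handler, `:telemetry.attach_many/4` can be used with a named function instead of an anonymous one. This is a sketch under two assumptions: MyApp.WebSocketTelemetry is a hypothetical module, and the first element of each event name is taken to be the configured telemetry_id (here the default :websocket_producer).

```elixir
defmodule MyApp.WebSocketTelemetry do
  require Logger

  # Assumed event prefix: the default telemetry_id.
  @prefix :websocket_producer

  @events [
    [@prefix, :connection, :success],
    [@prefix, :connection, :failure],
    [@prefix, :connection, :disconnected],
    [@prefix, :connection, :timeout],
    [@prefix, :connection, :status]
  ]

  # Attaches one handler to every event listed above.
  def attach do
    :telemetry.attach_many(
      "my-app-websocket-logger",
      @events,
      &__MODULE__.handle_event/4,
      nil
    )
  end

  # Telemetry handler callback: logs a one-line description of the event.
  def handle_event(event, measurements, metadata, _config) do
    Logger.info(describe(event, measurements, metadata))
  end

  # Pure formatting helpers, kept separate so they can be tested without telemetry.
  def describe([_prefix, :connection, :status], %{value: 1}, _metadata), do: "connection up"
  def describe([_prefix, :connection, :status], %{value: 0}, _metadata), do: "connection down"

  def describe([_prefix, :connection, kind], _measurements, metadata) do
    "connection #{kind}: #{inspect(metadata)}"
  end
end
```

Calling `MyApp.WebSocketTelemetry.attach()` once during application start, before the Broadway pipeline starts, would register the handler.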

Running Tests

To run tests:

mix test

Ensure your WebSocket endpoint is reachable and configured properly for end-to-end tests.

Running Dialyzer

For static analysis with Dialyzer, make sure PLTs are built:

mix dialyzer --plt
mix dialyzer

Contributing

Feel free to open issues or submit PRs to enhance the functionality of OffBroadwayWebSocket. Contributions are welcome!

License

This project is licensed under the Apache License, Version 2.0.