Files
sse-session/README.md
2026-05-24 15:40:21 +02:00

12 KiB

Server-Sent Events (SSE)

A fetch-based SSE client for browser and Node.js. It supports custom headers, POST bodies, exponential-backoff connect retries, automatic reconnect, optional tab-visibility handling, and Last-Event-ID resume semantics.

Published from @xo-cash/utils:

import {
  SSESession,
  SSEEventParser,
  AsyncPushIterator,
  type SSEvent,
  type SSESessionOptions,
} from "@xo-cash/utils";

Module layout

sse-session/
├── sse-session.ts        # SSESession — main client
├── sse-event-parser.ts   # Incremental SSE frame parser
├── async-push-iterator.ts # Push-based async iterable for messages
├── types.ts              # Options, events, and SSEvent
└── index.ts              # Public exports

SSESession is built on three internal utilities from the same package:

Utility Role
EventEmitter Typed pub/sub for session lifecycle and message events
ExponentialBackoff Retries the initial HTTP connection until it succeeds
tryAsync Runs callbacks safely without letting handler throws break the session

Quick start

const session = await SSESession.create("https://api.example.com/events");

session.on("message", (event) => {
  console.log(event.data);
});

for await (const event of session.messages) {
  handle(event);
}

When finished:

await session.disconnect();

How it works

Each SSESession holds one HTTP streaming connection at a time. Bytes from the response body flow through SSEEventParser, which turns line-oriented SSE frames into SSEvent objects. Parsed events are delivered two ways:

  1. Events — listen with session.on("message", …) (and other lifecycle events).
  2. Messages — consume with for await (const event of session.messages).

connect() resolves once the HTTP stream is open. Reading continues in the background; you do not need to await the full stream lifetime.


SSESession

Factory methods

Method Description
SSESession.create(url, options?) Creates a session and connects immediately.
SSESession.withBrowserVisibility(url, options?) Same as create, plus tab visibility handling. Defers the first connect when the document is hidden.
SSESession.addBrowserVisibilityHandler(session) Attaches visibility handling to an existing session.
SSESession.addLastEventIdReconnect(session) Tracks the latest event id and sends Last-Event-ID on reconnect.

Instance methods

Method Description
connect() Opens or reopens the transport.
abort() Stops the in-flight fetch without ending the session. Used for tab visibility.
disconnect() Closes the session, closes messages, and emits "closed".

Properties

Property Description
messages AsyncPushIterator<SSEvent> for for await…of consumption.
onRequest Getter/setter for the pre-fetch hook (auth headers, Last-Event-ID, etc.).

Lifecycle

connect()  ──►  stream open  ──►  "connected"
                    │
                    ▼
              read & parse events  ──►  "message" + messages.push()
                    │
        ┌───────────┼───────────┐
        ▼           ▼           ▼
     abort()   server done   read error
        │           │           │
        ▼           ▼           ▼
 "disconnected"  "disconnected"  "disconnected"
 (messages open)     │              │
                     │         "error" (unless aborted)
                     │              │
              persistent?     attemptReconnect?
                     │              │
                     └──── connect() again
                                   │
 disconnect()  ──►  close messages  ──►  "closed"

connect

  • Resets the event parser and reopens messages if it was previously closed.
  • Retries the fetch via ExponentialBackoff (unlimited attempts by default).
  • Emits "connected" once the stream is established.
  • Starts background reading; the returned promise does not wait for the stream to end.

abort

  • Aborts the active fetch and invalidates the current read loop.
  • Emits "disconnected" but not "closed".
  • Keeps messages open so an existing for await consumer resumes after reconnect.
  • Resets the parser so partial frames from the abandoned transport are discarded.

disconnect

  • Closes messages and emits "closed".
  • Aborts any active transport.
  • Detaches browser visibility handling until the next manual connect().

Automatic reconnect

Two independent flags control reconnect behaviour:

Option Triggers reconnect when…
persistent: true The server closes the stream normally.
attemptReconnect: true A transport error occurs (not an intentional abort()).

Both default to true and false respectively.

Connection supersession

Each connect() or abort() bumps an internal connectionId. Background read loops capture their id at start and exit quietly when superseded, so stale transports never emit duplicate events or spurious errors.


Events

Event Payload When
"connected" HTTP stream established.
"message" SSEvent A complete SSE frame was parsed.
"disconnected" Active transport ended (including abort()).
"error" Error Fetch or read failure (not intentional abort).
"closed" disconnect() was called.

Listen with the standard EventEmitter API:

const off = session.on("message", (event) => {  });
off(); // unsubscribe

session.once("connected", () => {  });

Messages iterator

session.messages is an AsyncPushIterator<SSEvent> — a push-based queue you consume with async iteration:

for await (const event of session.messages) {
  process(event);
}

When messages stays open

  • abort() (tab hidden)
  • Automatic reconnect (persistent / attemptReconnect)

When messages closes

  • disconnect()
  • Server ends the stream and persistent is false
  • Transport error with attemptReconnect: false

After a terminal close, call connect() again to open a fresh iterator. Read from session.messages directly rather than caching a reference across disconnects.


Configuration

Pass a partial SSESessionOptions to create() or withBrowserVisibility():

const session = await SSESession.create("/events", {
  method: "POST",
  headers: { Authorization: "Bearer …" },
  body: JSON.stringify({ filter: "all" }),

  onRequest: async (request) => {
    // Mutate headers before each connect/reconnect
    return request;
  },

  onConnected: () => console.log("connected"),
  onDisconnected: () => console.log("disconnected"),
  onError: (error) => console.error(error),

  attemptReconnect: true,
  persistent: false,

  // Custom fetch (LibP2P, test doubles, etc.)
  fetch: myFetch,

  // Custom retry policy for the initial connection
  retry: ExponentialBackoff.from({ baseDelay: 500, maxAttempts: 5 }),

  // Custom parser (must implement parseEvents + reset)
  eventParser: new SSEEventParser(),
});

Callbacks

Callback Purpose
onRequest Transform RequestInit before each fetch (auth, Last-Event-ID).
onConnected Stream is open; reading is about to begin.
onDisconnected Transport ended (including abort).
onError Unexpected failure. Not called for intentional aborts.

Default connect retry

The default retry is an ExponentialBackoff with unlimited attempts (maxAttempts: 0), baseDelay: 1000, maxDelay: 10000, growthRate: 1.3, and jitter: 0.3. This means SSESession.create() blocks until the first connection succeeds or the caller aborts/disconnects.


Browser tab visibility

For browser clients that should pause SSE while a tab is in the background:

const session = await SSESession.withBrowserVisibility("/events");

Behaviour:

  • Tab hiddenabort() stops the fetch; messages stays open.
  • Tab visibleconnect() re-establishes the stream.
  • disconnect() → removes the visibility listener until the next manual connect().

In Node.js (no document), visibility handling is a no-op.


Last-Event-ID resume

To follow SSE resume semantics:

const session = await SSESession.create("/events");
await SSESession.addLastEventIdReconnect(session);

This tracks the most recent event.id from incoming messages and adds a Last-Event-ID header on every subsequent connect/reconnect. The existing onRequest callback is preserved and runs afterward.

The header is omitted until at least one event with an id field has been received.


SSEvent

Parsed events match the SSE wire format:

interface SSEvent {
  data: string;       // required — event payload
  event?: string;   // event type (default: "message")
  id?: string;      // last event id for resume
  retry?: number;   // server-suggested reconnect delay (ms)
}

Multi-line data: fields are joined with \n. Trailing newlines on the payload are trimmed.


SSEEventParser

Incremental parser for raw stream bytes. Used internally by SSESession but exported for testing or custom integrations.

const parser = new SSEEventParser();

for (const chunk of chunks) {
  for (const event of parser.parseEvents(chunk)) {
    console.log(event);
  }
}

// Clear partial state when abandoning a stream
parser.reset();
  • Accepts arbitrary chunk boundaries; incomplete frames stay buffered.
  • Handles \r\n, \r, and \n line endings.
  • Supports data, event, id, and retry fields.

AsyncPushIterator

A push-based async iterable: producers call push(), consumers use for await…of.

const stream = new AsyncPushIterator<SSEvent>();

stream.push({ data: "hello" });
stream.close(); // end iteration

Used by SSESession.messages to bridge callback-driven stream reading with pull-based async consumers.


Supporting utilities

EventEmitter

Lightweight typed event emitter used as the base class for SSESession. Provides on, once, off, and emit with optional debouncing.

ExponentialBackoff

Retries a function with increasing delays and jitter. Used for the initial HTTP connection in SSESession.connect().

const backoff = ExponentialBackoff.from({
  baseDelay: 1000,
  maxDelay: 10000,
  maxAttempts: 0, // 0 = unlimited
});

await backoff.run(() => fetch(url));

tryAsync

Executes an async function and routes failures to an optional error handler without rethrowing. Used internally when invoking user callbacks (onConnected, onDisconnected, onError) so a throwing handler does not crash the session.


Custom fetch adapters

fetch is typed as (url: string, options: RequestInit) => Promise<Response> rather than the native fetch signature. This allows non-standard URL formats (for example LibP2P paths):

const session = await SSESession.create("libp2p://peer/events", {
  fetch: async (url, options) => libp2pFetch(url, options),
});

Notes

  • Use create() or withBrowserVisibility() to construct sessions; the constructor is private.
  • connect() is idempotent while already connected.
  • Intentional aborts (tab visibility) do not emit "error" and do not trigger attemptReconnect.
  • Each session instance owns its own SSEEventParser and ExponentialBackoff — instances do not share parser buffers.