# Server-Sent Events (SSE) A fetch-based SSE client for browser and Node.js. It supports custom headers, POST bodies, exponential-backoff connect retries, automatic reconnect, optional tab-visibility handling, and `Last-Event-ID` resume semantics. Published from `@xo-cash/utils`: ```ts import { SSESession, SSEEventParser, AsyncPushIterator, type SSEvent, type SSESessionOptions, } from "@xo-cash/utils"; ``` ## Module layout ``` sse-session/ ├── sse-session.ts # SSESession — main client ├── sse-event-parser.ts # Incremental SSE frame parser ├── async-push-iterator.ts # Push-based async iterable for messages ├── types.ts # Options, events, and SSEvent └── index.ts # Public exports ``` `SSESession` is built on three internal utilities from the same package: | Utility | Role | |---------|------| | `EventEmitter` | Typed pub/sub for session lifecycle and message events | | `ExponentialBackoff` | Retries the initial HTTP connection until it succeeds | | `tryAsync` | Runs callbacks safely without letting handler throws break the session | --- ## Quick start ```ts const session = await SSESession.create("https://api.example.com/events"); session.on("message", (event) => { console.log(event.data); }); for await (const event of session.messages) { handle(event); } ``` When finished: ```ts await session.disconnect(); ``` --- ## How it works Each `SSESession` holds one HTTP streaming connection at a time. Bytes from the response body flow through `SSEEventParser`, which turns line-oriented SSE frames into `SSEvent` objects. Parsed events are delivered two ways: 1. **Events** — listen with `session.on("message", …)` (and other lifecycle events). 2. **Messages** — consume with `for await (const event of session.messages)`. `connect()` resolves once the HTTP stream is open. Reading continues in the background; you do not need to await the full stream lifetime. --- ## SSESession ### Factory methods | Method | Description | |--------|-------------| | `SSESession.create(url, options?)` | Creates a session and connects immediately. | | `SSESession.withBrowserVisibility(url, options?)` | Same as `create`, plus tab visibility handling. Defers the first connect when the document is hidden. | | `SSESession.addBrowserVisibilityHandler(session)` | Attaches visibility handling to an existing session. | | `SSESession.addLastEventIdReconnect(session)` | Tracks the latest event `id` and sends `Last-Event-ID` on reconnect. | ### Instance methods | Method | Description | |--------|-------------| | `connect()` | Opens or reopens the transport. | | `abort()` | Stops the in-flight fetch without ending the session. Used for tab visibility. | | `disconnect()` | Closes the session, closes `messages`, and emits `"closed"`. | ### Properties | Property | Description | |----------|-------------| | `messages` | `AsyncPushIterator` for `for await…of` consumption. | | `onRequest` | Getter/setter for the pre-fetch hook (auth headers, `Last-Event-ID`, etc.). | --- ## Lifecycle ``` connect() ──► stream open ──► "connected" │ ▼ read & parse events ──► "message" + messages.push() │ ┌───────────┼───────────┐ ▼ ▼ ▼ abort() server done read error │ │ │ ▼ ▼ ▼ "disconnected" "disconnected" "disconnected" (messages open) │ │ │ "error" (unless aborted) │ │ persistent? attemptReconnect? │ │ └──── connect() again │ disconnect() ──► close messages ──► "closed" ``` ### `connect` - Resets the event parser and reopens `messages` if it was previously closed. - Retries the fetch via `ExponentialBackoff` (unlimited attempts by default). - Emits `"connected"` once the stream is established. - Starts background reading; the returned promise does not wait for the stream to end. ### `abort` - Aborts the active fetch and invalidates the current read loop. - Emits `"disconnected"` but **not** `"closed"`. - Keeps `messages` open so an existing `for await` consumer resumes after reconnect. - Resets the parser so partial frames from the abandoned transport are discarded. ### `disconnect` - Closes `messages` and emits `"closed"`. - Aborts any active transport. - Detaches browser visibility handling until the next manual `connect()`. ### Automatic reconnect Two independent flags control reconnect behaviour: | Option | Triggers reconnect when… | |--------|--------------------------| | `persistent: true` | The **server** closes the stream normally. | | `attemptReconnect: true` | A **transport error** occurs (not an intentional `abort()`). | Both default to `true` and `false` respectively. ### Connection supersession Each `connect()` or `abort()` bumps an internal `connectionId`. Background read loops capture their id at start and exit quietly when superseded, so stale transports never emit duplicate events or spurious errors. --- ## Events | Event | Payload | When | |-------|---------|------| | `"connected"` | — | HTTP stream established. | | `"message"` | `SSEvent` | A complete SSE frame was parsed. | | `"disconnected"` | — | Active transport ended (including `abort()`). | | `"error"` | `Error` | Fetch or read failure (not intentional abort). | | `"closed"` | — | `disconnect()` was called. | Listen with the standard `EventEmitter` API: ```ts const off = session.on("message", (event) => { … }); off(); // unsubscribe session.once("connected", () => { … }); ``` --- ## Messages iterator `session.messages` is an `AsyncPushIterator` — a push-based queue you consume with async iteration: ```ts for await (const event of session.messages) { process(event); } ``` ### When `messages` stays open - `abort()` (tab hidden) - Automatic reconnect (`persistent` / `attemptReconnect`) ### When `messages` closes - `disconnect()` - Server ends the stream and `persistent` is `false` - Transport error with `attemptReconnect: false` After a terminal close, call `connect()` again to open a fresh iterator. Read from `session.messages` directly rather than caching a reference across disconnects. --- ## Configuration Pass a partial `SSESessionOptions` to `create()` or `withBrowserVisibility()`: ```ts const session = await SSESession.create("/events", { method: "POST", headers: { Authorization: "Bearer …" }, body: JSON.stringify({ filter: "all" }), onRequest: async (request) => { // Mutate headers before each connect/reconnect return request; }, onConnected: () => console.log("connected"), onDisconnected: () => console.log("disconnected"), onError: (error) => console.error(error), attemptReconnect: true, persistent: false, // Custom fetch (LibP2P, test doubles, etc.) fetch: myFetch, // Custom retry policy for the initial connection retry: ExponentialBackoff.from({ baseDelay: 500, maxAttempts: 5 }), // Custom parser (must implement parseEvents + reset) eventParser: new SSEEventParser(), }); ``` ### Callbacks | Callback | Purpose | |----------|---------| | `onRequest` | Transform `RequestInit` before each fetch (auth, `Last-Event-ID`). | | `onConnected` | Stream is open; reading is about to begin. | | `onDisconnected` | Transport ended (including `abort`). | | `onError` | Unexpected failure. Not called for intentional aborts. | ### Default connect retry The default `retry` is an `ExponentialBackoff` with unlimited attempts (`maxAttempts: 0`), `baseDelay: 1000`, `maxDelay: 10000`, `growthRate: 1.3`, and `jitter: 0.3`. This means `SSESession.create()` blocks until the first connection succeeds or the caller aborts/disconnects. --- ## Browser tab visibility For browser clients that should pause SSE while a tab is in the background: ```ts const session = await SSESession.withBrowserVisibility("/events"); ``` Behaviour: - **Tab hidden** → `abort()` stops the fetch; `messages` stays open. - **Tab visible** → `connect()` re-establishes the stream. - **`disconnect()`** → removes the visibility listener until the next manual `connect()`. In Node.js (no `document`), visibility handling is a no-op. --- ## Last-Event-ID resume To follow SSE resume semantics: ```ts const session = await SSESession.create("/events"); await SSESession.addLastEventIdReconnect(session); ``` This tracks the most recent `event.id` from incoming messages and adds a `Last-Event-ID` header on every subsequent connect/reconnect. The existing `onRequest` callback is preserved and runs afterward. The header is omitted until at least one event with an `id` field has been received. --- ## SSEvent Parsed events match the SSE wire format: ```ts interface SSEvent { data: string; // required — event payload event?: string; // event type (default: "message") id?: string; // last event id for resume retry?: number; // server-suggested reconnect delay (ms) } ``` Multi-line `data:` fields are joined with `\n`. Trailing newlines on the payload are trimmed. --- ## SSEEventParser Incremental parser for raw stream bytes. Used internally by `SSESession` but exported for testing or custom integrations. ```ts const parser = new SSEEventParser(); for (const chunk of chunks) { for (const event of parser.parseEvents(chunk)) { console.log(event); } } // Clear partial state when abandoning a stream parser.reset(); ``` - Accepts arbitrary chunk boundaries; incomplete frames stay buffered. - Handles `\r\n`, `\r`, and `\n` line endings. - Supports `data`, `event`, `id`, and `retry` fields. --- ## AsyncPushIterator A push-based async iterable: producers call `push()`, consumers use `for await…of`. ```ts const stream = new AsyncPushIterator(); stream.push({ data: "hello" }); stream.close(); // end iteration ``` Used by `SSESession.messages` to bridge callback-driven stream reading with pull-based async consumers. --- ## Supporting utilities ### EventEmitter Lightweight typed event emitter used as the base class for `SSESession`. Provides `on`, `once`, `off`, and `emit` with optional debouncing. ### ExponentialBackoff Retries a function with increasing delays and jitter. Used for the initial HTTP connection in `SSESession.connect()`. ```ts const backoff = ExponentialBackoff.from({ baseDelay: 1000, maxDelay: 10000, maxAttempts: 0, // 0 = unlimited }); await backoff.run(() => fetch(url)); ``` ### tryAsync Executes an async function and routes failures to an optional error handler without rethrowing. Used internally when invoking user callbacks (`onConnected`, `onDisconnected`, `onError`) so a throwing handler does not crash the session. --- ## Custom fetch adapters `fetch` is typed as `(url: string, options: RequestInit) => Promise` rather than the native `fetch` signature. This allows non-standard URL formats (for example LibP2P paths): ```ts const session = await SSESession.create("libp2p://peer/events", { fetch: async (url, options) => libp2pFetch(url, options), }); ``` --- ## Notes - Use `create()` or `withBrowserVisibility()` to construct sessions; the constructor is private. - `connect()` is idempotent while already connected. - Intentional aborts (tab visibility) do not emit `"error"` and do not trigger `attemptReconnect`. - Each session instance owns its own `SSEEventParser` and `ExponentialBackoff` — instances do not share parser buffers.