From ca933de6cecfc0474f23984eb6d9308d428c7c10 Mon Sep 17 00:00:00 2001 From: Harvey Zuccon Date: Sun, 24 May 2026 15:40:21 +0200 Subject: [PATCH] Add Readme --- README.md | 385 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 385 insertions(+) create mode 100644 README.md diff --git a/README.md b/README.md new file mode 100644 index 0000000..45f5552 --- /dev/null +++ b/README.md @@ -0,0 +1,385 @@ +# Server-Sent Events (SSE) + +A fetch-based SSE client for browser and Node.js. It supports custom headers, POST bodies, exponential-backoff connect retries, automatic reconnect, optional tab-visibility handling, and `Last-Event-ID` resume semantics. + +Published from `@xo-cash/utils`: + +```ts +import { + SSESession, + SSEEventParser, + AsyncPushIterator, + type SSEvent, + type SSESessionOptions, +} from "@xo-cash/utils"; +``` + +## Module layout + +``` +sse-session/ +├── sse-session.ts # SSESession — main client +├── sse-event-parser.ts # Incremental SSE frame parser +├── async-push-iterator.ts # Push-based async iterable for messages +├── types.ts # Options, events, and SSEvent +└── index.ts # Public exports +``` + +`SSESession` is built on three internal utilities from the same package: + +| Utility | Role | +|---------|------| +| `EventEmitter` | Typed pub/sub for session lifecycle and message events | +| `ExponentialBackoff` | Retries the initial HTTP connection until it succeeds | +| `tryAsync` | Runs callbacks safely without letting handler throws break the session | + +--- + +## Quick start + +```ts +const session = await SSESession.create("https://api.example.com/events"); + +session.on("message", (event) => { + console.log(event.data); +}); + +for await (const event of session.messages) { + handle(event); +} +``` + +When finished: + +```ts +await session.disconnect(); +``` + +--- + +## How it works + +Each `SSESession` holds one HTTP streaming connection at a time. Bytes from the response body flow through `SSEEventParser`, which turns line-oriented SSE frames into `SSEvent` objects. Parsed events are delivered two ways: + +1. **Events** — listen with `session.on("message", …)` (and other lifecycle events). +2. **Messages** — consume with `for await (const event of session.messages)`. + +`connect()` resolves once the HTTP stream is open. Reading continues in the background; you do not need to await the full stream lifetime. + +--- + +## SSESession + +### Factory methods + +| Method | Description | +|--------|-------------| +| `SSESession.create(url, options?)` | Creates a session and connects immediately. | +| `SSESession.withBrowserVisibility(url, options?)` | Same as `create`, plus tab visibility handling. Defers the first connect when the document is hidden. | +| `SSESession.addBrowserVisibilityHandler(session)` | Attaches visibility handling to an existing session. | +| `SSESession.addLastEventIdReconnect(session)` | Tracks the latest event `id` and sends `Last-Event-ID` on reconnect. | + +### Instance methods + +| Method | Description | +|--------|-------------| +| `connect()` | Opens or reopens the transport. | +| `abort()` | Stops the in-flight fetch without ending the session. Used for tab visibility. | +| `disconnect()` | Closes the session, closes `messages`, and emits `"closed"`. | + +### Properties + +| Property | Description | +|----------|-------------| +| `messages` | `AsyncPushIterator` for `for await…of` consumption. | +| `onRequest` | Getter/setter for the pre-fetch hook (auth headers, `Last-Event-ID`, etc.). | + +--- + +## Lifecycle + +``` +connect() ──► stream open ──► "connected" + │ + ▼ + read & parse events ──► "message" + messages.push() + │ + ┌───────────┼───────────┐ + ▼ ▼ ▼ + abort() server done read error + │ │ │ + ▼ ▼ ▼ + "disconnected" "disconnected" "disconnected" + (messages open) │ │ + │ "error" (unless aborted) + │ │ + persistent? attemptReconnect? + │ │ + └──── connect() again + │ + disconnect() ──► close messages ──► "closed" +``` + +### `connect` + +- Resets the event parser and reopens `messages` if it was previously closed. +- Retries the fetch via `ExponentialBackoff` (unlimited attempts by default). +- Emits `"connected"` once the stream is established. +- Starts background reading; the returned promise does not wait for the stream to end. + +### `abort` + +- Aborts the active fetch and invalidates the current read loop. +- Emits `"disconnected"` but **not** `"closed"`. +- Keeps `messages` open so an existing `for await` consumer resumes after reconnect. +- Resets the parser so partial frames from the abandoned transport are discarded. + +### `disconnect` + +- Closes `messages` and emits `"closed"`. +- Aborts any active transport. +- Detaches browser visibility handling until the next manual `connect()`. + +### Automatic reconnect + +Two independent flags control reconnect behaviour: + +| Option | Triggers reconnect when… | +|--------|--------------------------| +| `persistent: true` | The **server** closes the stream normally. | +| `attemptReconnect: true` | A **transport error** occurs (not an intentional `abort()`). | + +Both default to `true` and `false` respectively. + +### Connection supersession + +Each `connect()` or `abort()` bumps an internal `connectionId`. Background read loops capture their id at start and exit quietly when superseded, so stale transports never emit duplicate events or spurious errors. + +--- + +## Events + +| Event | Payload | When | +|-------|---------|------| +| `"connected"` | — | HTTP stream established. | +| `"message"` | `SSEvent` | A complete SSE frame was parsed. | +| `"disconnected"` | — | Active transport ended (including `abort()`). | +| `"error"` | `Error` | Fetch or read failure (not intentional abort). | +| `"closed"` | — | `disconnect()` was called. | + +Listen with the standard `EventEmitter` API: + +```ts +const off = session.on("message", (event) => { … }); +off(); // unsubscribe + +session.once("connected", () => { … }); +``` + +--- + +## Messages iterator + +`session.messages` is an `AsyncPushIterator` — a push-based queue you consume with async iteration: + +```ts +for await (const event of session.messages) { + process(event); +} +``` + +### When `messages` stays open + +- `abort()` (tab hidden) +- Automatic reconnect (`persistent` / `attemptReconnect`) + +### When `messages` closes + +- `disconnect()` +- Server ends the stream and `persistent` is `false` +- Transport error with `attemptReconnect: false` + +After a terminal close, call `connect()` again to open a fresh iterator. Read from `session.messages` directly rather than caching a reference across disconnects. + +--- + +## Configuration + +Pass a partial `SSESessionOptions` to `create()` or `withBrowserVisibility()`: + +```ts +const session = await SSESession.create("/events", { + method: "POST", + headers: { Authorization: "Bearer …" }, + body: JSON.stringify({ filter: "all" }), + + onRequest: async (request) => { + // Mutate headers before each connect/reconnect + return request; + }, + + onConnected: () => console.log("connected"), + onDisconnected: () => console.log("disconnected"), + onError: (error) => console.error(error), + + attemptReconnect: true, + persistent: false, + + // Custom fetch (LibP2P, test doubles, etc.) + fetch: myFetch, + + // Custom retry policy for the initial connection + retry: ExponentialBackoff.from({ baseDelay: 500, maxAttempts: 5 }), + + // Custom parser (must implement parseEvents + reset) + eventParser: new SSEEventParser(), +}); +``` + +### Callbacks + +| Callback | Purpose | +|----------|---------| +| `onRequest` | Transform `RequestInit` before each fetch (auth, `Last-Event-ID`). | +| `onConnected` | Stream is open; reading is about to begin. | +| `onDisconnected` | Transport ended (including `abort`). | +| `onError` | Unexpected failure. Not called for intentional aborts. | + +### Default connect retry + +The default `retry` is an `ExponentialBackoff` with unlimited attempts (`maxAttempts: 0`), `baseDelay: 1000`, `maxDelay: 10000`, `growthRate: 1.3`, and `jitter: 0.3`. This means `SSESession.create()` blocks until the first connection succeeds or the caller aborts/disconnects. + +--- + +## Browser tab visibility + +For browser clients that should pause SSE while a tab is in the background: + +```ts +const session = await SSESession.withBrowserVisibility("/events"); +``` + +Behaviour: + +- **Tab hidden** → `abort()` stops the fetch; `messages` stays open. +- **Tab visible** → `connect()` re-establishes the stream. +- **`disconnect()`** → removes the visibility listener until the next manual `connect()`. + +In Node.js (no `document`), visibility handling is a no-op. + +--- + +## Last-Event-ID resume + +To follow SSE resume semantics: + +```ts +const session = await SSESession.create("/events"); +await SSESession.addLastEventIdReconnect(session); +``` + +This tracks the most recent `event.id` from incoming messages and adds a `Last-Event-ID` header on every subsequent connect/reconnect. The existing `onRequest` callback is preserved and runs afterward. + +The header is omitted until at least one event with an `id` field has been received. + +--- + +## SSEvent + +Parsed events match the SSE wire format: + +```ts +interface SSEvent { + data: string; // required — event payload + event?: string; // event type (default: "message") + id?: string; // last event id for resume + retry?: number; // server-suggested reconnect delay (ms) +} +``` + +Multi-line `data:` fields are joined with `\n`. Trailing newlines on the payload are trimmed. + +--- + +## SSEEventParser + +Incremental parser for raw stream bytes. Used internally by `SSESession` but exported for testing or custom integrations. + +```ts +const parser = new SSEEventParser(); + +for (const chunk of chunks) { + for (const event of parser.parseEvents(chunk)) { + console.log(event); + } +} + +// Clear partial state when abandoning a stream +parser.reset(); +``` + +- Accepts arbitrary chunk boundaries; incomplete frames stay buffered. +- Handles `\r\n`, `\r`, and `\n` line endings. +- Supports `data`, `event`, `id`, and `retry` fields. + +--- + +## AsyncPushIterator + +A push-based async iterable: producers call `push()`, consumers use `for await…of`. + +```ts +const stream = new AsyncPushIterator(); + +stream.push({ data: "hello" }); +stream.close(); // end iteration +``` + +Used by `SSESession.messages` to bridge callback-driven stream reading with pull-based async consumers. + +--- + +## Supporting utilities + +### EventEmitter + +Lightweight typed event emitter used as the base class for `SSESession`. Provides `on`, `once`, `off`, and `emit` with optional debouncing. + +### ExponentialBackoff + +Retries a function with increasing delays and jitter. Used for the initial HTTP connection in `SSESession.connect()`. + +```ts +const backoff = ExponentialBackoff.from({ + baseDelay: 1000, + maxDelay: 10000, + maxAttempts: 0, // 0 = unlimited +}); + +await backoff.run(() => fetch(url)); +``` + +### tryAsync + +Executes an async function and routes failures to an optional error handler without rethrowing. Used internally when invoking user callbacks (`onConnected`, `onDisconnected`, `onError`) so a throwing handler does not crash the session. + +--- + +## Custom fetch adapters + +`fetch` is typed as `(url: string, options: RequestInit) => Promise` rather than the native `fetch` signature. This allows non-standard URL formats (for example LibP2P paths): + +```ts +const session = await SSESession.create("libp2p://peer/events", { + fetch: async (url, options) => libp2pFetch(url, options), +}); +``` + +--- + +## Notes + +- Use `create()` or `withBrowserVisibility()` to construct sessions; the constructor is private. +- `connect()` is idempotent while already connected. +- Intentional aborts (tab visibility) do not emit `"error"` and do not trigger `attemptReconnect`. +- Each session instance owns its own `SSEEventParser` and `ExponentialBackoff` — instances do not share parser buffers.