# Server-Sent Events (SSE)
A fetch-based SSE client for browsers and Node.js. It supports custom headers, POST bodies, exponential-backoff connect retries, automatic reconnect, optional tab-visibility handling, and `Last-Event-ID` resume semantics.
Published from `@xo-cash/utils`:
```ts
import {
  SSESession,
  SSEEventParser,
  AsyncPushIterator,
  type SSEvent,
  type SSESessionOptions,
} from "@xo-cash/utils";
```
## Module layout
```
sse-session/
├── sse-session.ts # SSESession — main client
├── sse-event-parser.ts # Incremental SSE frame parser
├── async-push-iterator.ts # Push-based async iterable for messages
├── types.ts # Options, events, and SSEvent
└── index.ts # Public exports
```
`SSESession` is built on three internal utilities from the same package:
| Utility | Role |
|---------|------|
| `EventEmitter` | Typed pub/sub for session lifecycle and message events |
| `ExponentialBackoff` | Retries the initial HTTP connection until it succeeds |
| `tryAsync` | Runs callbacks safely without letting handler throws break the session |
---
## Quick start
```ts
const session = await SSESession.create("https://api.example.com/events");

// Option 1: event listener
session.on("message", (event) => {
  console.log(event.data);
});

// Option 2: async iteration
for await (const event of session.messages) {
  handle(event);
}
```
When finished:
```ts
await session.disconnect();
```
---
## How it works
Each `SSESession` holds one HTTP streaming connection at a time. Bytes from the response body flow through `SSEEventParser`, which turns line-oriented SSE frames into `SSEvent` objects. Parsed events are delivered two ways:
1. **Events** — listen with `session.on("message", …)` (and other lifecycle events).
2. **Messages** — consume with `for await (const event of session.messages)`.
`connect()` resolves once the HTTP stream is open. Reading continues in the background; you do not need to await the full stream lifetime.
---
## SSESession
### Factory methods
| Method | Description |
|--------|-------------|
| `SSESession.create(url, options?)` | Creates a session and connects immediately. |
| `SSESession.withBrowserVisibility(url, options?)` | Same as `create`, plus tab visibility handling. Defers the first connect when the document is hidden. |
| `SSESession.addBrowserVisibilityHandler(session)` | Attaches visibility handling to an existing session. |
| `SSESession.addLastEventIdReconnect(session)` | Tracks the latest event `id` and sends `Last-Event-ID` on reconnect. |
### Instance methods
| Method | Description |
|--------|-------------|
| `connect()` | Opens or reopens the transport. |
| `abort()` | Stops the in-flight fetch without ending the session. Used for tab visibility. |
| `disconnect()` | Closes the session, closes `messages`, and emits `"closed"`. |
### Properties
| Property | Description |
|----------|-------------|
| `messages` | `AsyncPushIterator<SSEvent>` for `for await…of` consumption. |
| `onRequest` | Getter/setter for the pre-fetch hook (auth headers, `Last-Event-ID`, etc.). |
---
## Lifecycle
```
connect() ──► stream open ──► "connected"
               read & parse events ──► "message" + messages.push()
       ┌────────────────┼────────────────┐
       ▼                ▼                ▼
    abort()        server done      read error
       │                │                │
       ▼                ▼                ▼
"disconnected"   "disconnected"   "disconnected"
(messages open)         │                │
                        │             "error" (unless aborted)
                        │                │
                   persistent?   attemptReconnect?
                        │                │
                        └──── connect() again
disconnect() ──► close messages ──► "closed"
```
### `connect`
- Resets the event parser and reopens `messages` if it was previously closed.
- Retries the fetch via `ExponentialBackoff` (unlimited attempts by default).
- Emits `"connected"` once the stream is established.
- Starts background reading; the returned promise does not wait for the stream to end.
### `abort`
- Aborts the active fetch and invalidates the current read loop.
- Emits `"disconnected"` but **not** `"closed"`.
- Keeps `messages` open so an existing `for await` consumer resumes after reconnect.
- Resets the parser so partial frames from the abandoned transport are discarded.
### `disconnect`
- Closes `messages` and emits `"closed"`.
- Aborts any active transport.
- Detaches browser visibility handling until the next manual `connect()`.
### Automatic reconnect
Two independent flags control reconnect behaviour:
| Option | Triggers reconnect when… |
|--------|--------------------------|
| `persistent: true` | The **server** closes the stream normally. |
| `attemptReconnect: true` | A **transport error** occurs (not an intentional `abort()`). |
`persistent` defaults to `true`; `attemptReconnect` defaults to `false`.
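
The two flags can be read as a small decision table. The sketch below is illustrative only: `EndReason`, `ReconnectFlags`, and `shouldReconnect` are hypothetical names, not part of the package, and the defaults assume `persistent: true` and `attemptReconnect: false`.

```typescript
// Hypothetical names for illustration; not the library's internals.
type EndReason = "server-closed" | "transport-error" | "aborted";

interface ReconnectFlags {
  persistent: boolean; // reconnect after the server closes the stream normally
  attemptReconnect: boolean; // reconnect after a transport error
}

function shouldReconnect(reason: EndReason, flags: ReconnectFlags): boolean {
  switch (reason) {
    case "server-closed":
      return flags.persistent;
    case "transport-error":
      return flags.attemptReconnect;
    case "aborted":
      // An intentional abort() never triggers an automatic reconnect.
      return false;
  }
}

// Assumed defaults, read from the option table in order.
const defaultFlags: ReconnectFlags = { persistent: true, attemptReconnect: false };

console.log(shouldReconnect("server-closed", defaultFlags));
console.log(shouldReconnect("transport-error", defaultFlags));
console.log(shouldReconnect("aborted", defaultFlags));
```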
### Connection supersession
Each `connect()` or `abort()` bumps an internal `connectionId`. Background read loops capture their id at start and exit quietly when superseded, so stale transports never emit duplicate events or spurious errors.
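
One way to picture the supersession check is a generation counter. This is a minimal sketch of the pattern, not the session's actual code; `ConnectionGuard` and `readLoop` are hypothetical names.

```typescript
// Sketch of a generation counter; names are illustrative only.
class ConnectionGuard {
  private connectionId = 0;

  // Called by connect() or abort(); returns the id owned by the new attempt.
  bump(): number {
    this.connectionId += 1;
    return this.connectionId;
  }

  // A read loop passes the id it captured when it started.
  isCurrent(id: number): boolean {
    return id === this.connectionId;
  }
}

// A read loop that exits quietly once it has been superseded.
function readLoop(
  guard: ConnectionGuard,
  id: number,
  chunks: string[],
  emit: (chunk: string) => void,
): void {
  for (const chunk of chunks) {
    if (!guard.isCurrent(id)) return; // stale transport: emit nothing further
    emit(chunk);
  }
}

const guard = new ConnectionGuard();
const emitted: string[] = [];

const staleId = guard.bump(); // first connect()
const liveId = guard.bump(); // a reconnect supersedes the first loop

readLoop(guard, staleId, ["old-1", "old-2"], (chunk) => emitted.push(chunk));
readLoop(guard, liveId, ["new-1"], (chunk) => emitted.push(chunk));

console.log(emitted);
```

Only the loop holding the current id emits, which is why a stale transport cannot produce duplicate events.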
---
## Events
| Event | Payload | When |
|-------|---------|------|
| `"connected"` | — | HTTP stream established. |
| `"message"` | `SSEvent` | A complete SSE frame was parsed. |
| `"disconnected"` | — | Active transport ended (including `abort()`). |
| `"error"` | `Error` | Fetch or read failure (not intentional abort). |
| `"closed"` | — | `disconnect()` was called. |
Listen with the standard `EventEmitter` API:
```ts
const off = session.on("message", (event) => { });
off(); // unsubscribe
session.once("connected", () => { });
```
---
## Messages iterator
`session.messages` is an `AsyncPushIterator<SSEvent>` — a push-based queue you consume with async iteration:
```ts
for await (const event of session.messages) {
  process(event);
}
```
### When `messages` stays open
- `abort()` (tab hidden)
- Automatic reconnect (`persistent` / `attemptReconnect`)
### When `messages` closes
- `disconnect()`
- Server ends the stream and `persistent` is `false`
- Transport error with `attemptReconnect: false`
After a terminal close, call `connect()` again to open a fresh iterator. Read from `session.messages` directly rather than caching a reference across disconnects.
---
## Configuration
Pass a partial `SSESessionOptions` to `create()` or `withBrowserVisibility()`:
```ts
const session = await SSESession.create("/events", {
  method: "POST",
  headers: { Authorization: "Bearer …" },
  body: JSON.stringify({ filter: "all" }),
  onRequest: async (request) => {
    // Mutate headers before each connect/reconnect
    return request;
  },
  onConnected: () => console.log("connected"),
  onDisconnected: () => console.log("disconnected"),
  onError: (error) => console.error(error),
  attemptReconnect: true,
  persistent: false,
  // Custom fetch (LibP2P, test doubles, etc.)
  fetch: myFetch,
  // Custom retry policy for the initial connection
  retry: ExponentialBackoff.from({ baseDelay: 500, maxAttempts: 5 }),
  // Custom parser (must implement parseEvents + reset)
  eventParser: new SSEEventParser(),
});
```
### Callbacks
| Callback | Purpose |
|----------|---------|
| `onRequest` | Transform `RequestInit` before each fetch (auth, `Last-Event-ID`). |
| `onConnected` | Stream is open; reading is about to begin. |
| `onDisconnected` | Transport ended (including `abort`). |
| `onError` | Unexpected failure. Not called for intentional aborts. |
### Default connect retry
The default `retry` is an `ExponentialBackoff` with unlimited attempts (`maxAttempts: 0`), `baseDelay: 1000`, `maxDelay: 10000`, `growthRate: 1.3`, and `jitter: 0.3`. As a result, the promise returned by `SSESession.create()` does not resolve until the first connection succeeds or the caller aborts or disconnects.
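
To get a feel for what these defaults imply, the sketch below computes a delay schedule. The formula is an assumption (geometric growth capped at `maxDelay`, with jitter applied as a symmetric fraction of the delay); the real `ExponentialBackoff` may compute delays differently. `BackoffConfig`, `nominalDelay`, and `jitteredDelay` are hypothetical names.

```typescript
// Assumed formula, not confirmed by the library:
//   delay(attempt) = min(maxDelay, baseDelay * growthRate^attempt) * (1 ± jitter)
interface BackoffConfig {
  baseDelay: number; // ms
  maxDelay: number; // ms
  growthRate: number;
  jitter: number; // fraction of the delay, e.g. 0.3 = ±30%
}

const defaultBackoff: BackoffConfig = {
  baseDelay: 1000,
  maxDelay: 10000,
  growthRate: 1.3,
  jitter: 0.3,
};

// Delay before the given retry attempt (0-based), without jitter.
function nominalDelay(attempt: number, cfg: BackoffConfig): number {
  return Math.min(cfg.maxDelay, cfg.baseDelay * Math.pow(cfg.growthRate, attempt));
}

// Same delay with a random offset in [-jitter, +jitter] applied.
function jitteredDelay(
  attempt: number,
  cfg: BackoffConfig,
  random: () => number = Math.random,
): number {
  const spread = (random() * 2 - 1) * cfg.jitter;
  return nominalDelay(attempt, cfg) * (1 + spread);
}

for (let attempt = 0; attempt < 10; attempt++) {
  console.log(attempt, Math.round(nominalDelay(attempt, defaultBackoff)));
}
```

Under this assumed formula the nominal delay reaches the 10 second cap at the tenth attempt, so an unreachable server is retried roughly every 7 to 13 seconds from then on.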
---
## Browser tab visibility
For browser clients that should pause SSE while a tab is in the background:
```ts
const session = await SSESession.withBrowserVisibility("/events");
```
Behaviour:
- **Tab hidden** → `abort()` stops the fetch; `messages` stays open.
- **Tab visible** → `connect()` re-establishes the stream.
- **`disconnect()`** → removes the visibility listener until the next manual `connect()`.
In Node.js (no `document`), visibility handling is a no-op.
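
The behaviour above can be sketched with a `visibilitychange` listener. This is a simplified illustration, not the package's implementation: `PausableSession`, `VisibilitySource`, and `attachVisibility` are hypothetical names, and a fake document stands in for the browser so the example runs anywhere.

```typescript
// Minimal shapes so the sketch runs without the library or a real DOM.
interface PausableSession {
  connect(): Promise<void> | void;
  abort(): void;
}

interface VisibilitySource {
  visibilityState: string;
  addEventListener(type: "visibilitychange", listener: () => void): void;
  removeEventListener(type: "visibilitychange", listener: () => void): void;
}

// Returns a detach function; does nothing when there is no document (Node.js).
function attachVisibility(
  session: PausableSession,
  doc: VisibilitySource | undefined,
): () => void {
  if (!doc) return () => {};
  const listener = (): void => {
    if (doc.visibilityState === "hidden") session.abort();
    else void session.connect();
  };
  doc.addEventListener("visibilitychange", listener);
  return () => doc.removeEventListener("visibilitychange", listener);
}

// Fake document and session used to exercise the handler.
const listeners: Array<() => void> = [];
const fakeDoc: VisibilitySource = {
  visibilityState: "visible",
  addEventListener: (_type, listener) => {
    listeners.push(listener);
  },
  removeEventListener: (_type, listener) => {
    const index = listeners.indexOf(listener);
    if (index !== -1) listeners.splice(index, 1);
  },
};
const calls: string[] = [];
const fakeSession: PausableSession = {
  connect: () => {
    calls.push("connect");
  },
  abort: () => {
    calls.push("abort");
  },
};

const detach = attachVisibility(fakeSession, fakeDoc);
fakeDoc.visibilityState = "hidden";
listeners.forEach((listener) => listener()); // tab hidden: abort()
fakeDoc.visibilityState = "visible";
listeners.forEach((listener) => listener()); // tab visible: connect()
detach();

console.log(calls);
```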
---
## Last-Event-ID resume
To follow SSE resume semantics:
```ts
const session = await SSESession.create("/events");
await SSESession.addLastEventIdReconnect(session);
```
This tracks the most recent `event.id` from incoming messages and adds a `Last-Event-ID` header on every subsequent connect/reconnect. The existing `onRequest` callback is preserved and runs afterward.
The header is omitted until at least one event with an `id` field has been received.
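
The wrapping described above might look like the following sketch. It is not the library's code: `SketchRequest` is a simplified stand-in for `RequestInit`, and `withLastEventId` is a hypothetical helper name.

```typescript
// Simplified stand-in for RequestInit so the sketch has no DOM dependency.
interface SketchRequest {
  method?: string;
  headers: Record<string, string>;
}

type OnRequestHook = (
  request: SketchRequest,
) => SketchRequest | Promise<SketchRequest>;

// Hypothetical helper: adds Last-Event-ID, then hands off to the existing hook.
function withLastEventId(
  getLastId: () => string | undefined,
  existing?: OnRequestHook,
): OnRequestHook {
  return async (request) => {
    const lastId = getLastId();
    const headers = { ...request.headers };
    // Omit the header until an event with an id has been seen.
    if (lastId !== undefined) headers["Last-Event-ID"] = lastId;
    const next: SketchRequest = { ...request, headers };
    return existing ? existing(next) : next;
  };
}

// A session would update this from each incoming event that carries an id.
let lastEventId: string | undefined;

const hook = withLastEventId(
  () => lastEventId,
  // The pre-existing hook (here: a placeholder auth header) still runs, afterward.
  (request) => ({
    ...request,
    headers: { ...request.headers, Authorization: "Bearer example-token" },
  }),
);
```

Reading the id through a getter rather than capturing a value means every reconnect picks up the most recent id.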
---
## SSEvent
Parsed events match the SSE wire format:
```ts
interface SSEvent {
  data: string;    // required event payload
  event?: string;  // event type (default: "message")
  id?: string;     // last event id for resume
  retry?: number;  // server-suggested reconnect delay (ms)
}
```
Multi-line `data:` fields are joined with `\n`. Trailing newlines on the payload are trimmed.
---
## SSEEventParser
Incremental parser for raw stream bytes. Used internally by `SSESession` but exported for testing or custom integrations.
```ts
const parser = new SSEEventParser();
for (const chunk of chunks) {
  for (const event of parser.parseEvents(chunk)) {
    console.log(event);
  }
}
// Clear partial state when abandoning a stream
parser.reset();
```
- Accepts arbitrary chunk boundaries; incomplete frames stay buffered.
- Handles `\r\n`, `\r`, and `\n` line endings.
- Supports `data`, `event`, `id`, and `retry` fields.
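
To illustrate what incremental parsing involves, here is a minimal sketch that works on already-decoded strings. It is not the `SSEEventParser` source: `SketchParser` and `SketchEvent` are hypothetical names, and details such as resetting `id` per frame are simplifying assumptions.

```typescript
interface SketchEvent {
  data: string;
  event?: string;
  id?: string;
  retry?: number;
}

class SketchParser {
  private buffer = "";
  private dataLines: string[] = [];
  private fields: { event?: string; id?: string; retry?: number } = {};

  // Feed one chunk; returns every frame completed by it.
  parseEvents(chunk: string): SketchEvent[] {
    this.buffer += chunk;
    const events: SketchEvent[] = [];
    for (;;) {
      const match = /\r\n|\r|\n/.exec(this.buffer);
      if (!match) break; // incomplete line stays buffered
      // A trailing "\r" may be the first half of "\r\n"; wait for more input.
      if (match[0] === "\r" && match.index === this.buffer.length - 1) break;
      const line = this.buffer.slice(0, match.index);
      this.buffer = this.buffer.slice(match.index + match[0].length);
      const event = this.processLine(line);
      if (event) events.push(event);
    }
    return events;
  }

  // Drop partial state, e.g. when a transport is abandoned.
  reset(): void {
    this.buffer = "";
    this.dataLines = [];
    this.fields = {};
  }

  private processLine(line: string): SketchEvent | undefined {
    if (line === "") return this.dispatch(); // blank line ends a frame
    if (line.startsWith(":")) return undefined; // comment line
    const colon = line.indexOf(":");
    const field = colon === -1 ? line : line.slice(0, colon);
    let value = colon === -1 ? "" : line.slice(colon + 1);
    if (value.startsWith(" ")) value = value.slice(1);
    if (field === "data") this.dataLines.push(value);
    else if (field === "event") this.fields.event = value;
    else if (field === "id") this.fields.id = value;
    else if (field === "retry" && /^\d+$/.test(value)) this.fields.retry = Number(value);
    return undefined;
  }

  private dispatch(): SketchEvent | undefined {
    const lines = this.dataLines;
    const fields = this.fields;
    this.dataLines = [];
    this.fields = {};
    if (lines.length === 0) return undefined; // no data: nothing to deliver
    return { data: lines.join("\n"), ...fields };
  }
}

const sketchParser = new SketchParser();
const parsed: SketchEvent[] = [
  ...sketchParser.parseEvents("id: 7\ndata: first\nda"), // frame split mid-line
  ...sketchParser.parseEvents("ta: second\r\n\r"), // ends on a bare "\r"
  ...sketchParser.parseEvents("\n"), // completes the "\r\n" blank line
];
console.log(parsed);
```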
---
## AsyncPushIterator
A push-based async iterable: producers call `push()`, consumers use `for await…of`.
```ts
const stream = new AsyncPushIterator<SSEvent>();
stream.push({ data: "hello" });
stream.close(); // end iteration
```
Used by `SSESession.messages` to bridge callback-driven stream reading with pull-based async consumers.
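
The bridge between push and pull can be sketched in a few lines. This is an illustration of the general technique, not the `AsyncPushIterator` source; `PushQueue` and `drainDemo` are hypothetical names.

```typescript
// Sketch of a push-based async iterable: values are queued until a consumer
// asks for them, and consumers wait when the queue is empty.
class PushQueue<T> implements AsyncIterable<T> {
  private queue: T[] = [];
  private waiters: Array<(result: IteratorResult<T>) => void> = [];
  private closed = false;

  push(value: T): void {
    if (this.closed) return; // ignore pushes after close
    const waiter = this.waiters.shift();
    if (waiter) waiter({ value, done: false });
    else this.queue.push(value);
  }

  close(): void {
    this.closed = true;
    // Release every consumer that is still waiting.
    for (const waiter of this.waiters.splice(0)) {
      waiter({ value: undefined, done: true });
    }
  }

  [Symbol.asyncIterator](): AsyncIterator<T> {
    return {
      next: async (): Promise<IteratorResult<T>> => {
        if (this.queue.length > 0) {
          return { value: this.queue.shift() as T, done: false };
        }
        if (this.closed) return { value: undefined, done: true };
        return new Promise<IteratorResult<T>>((resolve) => {
          this.waiters.push(resolve);
        });
      },
    };
  }
}

async function drainDemo(): Promise<string[]> {
  const stream = new PushQueue<string>();
  const seen: string[] = [];
  stream.push("queued before the consumer starts");
  // Push more values and close while the consumer is already waiting.
  setTimeout(() => {
    stream.push("pushed later");
    stream.close();
  }, 0);
  for await (const item of stream) seen.push(item);
  return seen;
}

const drained = drainDemo();
drained.then((seen) => console.log(seen));
```

In this sketch, values queued before `close()` are still delivered; only then does iteration end.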
---
## Supporting utilities
### EventEmitter
Lightweight typed event emitter used as the base class for `SSESession`. Provides `on`, `once`, `off`, and `emit` with optional debouncing.
### ExponentialBackoff
Retries a function with increasing delays and jitter. Used for the initial HTTP connection in `SSESession.connect()`.
```ts
const backoff = ExponentialBackoff.from({
  baseDelay: 1000,
  maxDelay: 10000,
  maxAttempts: 0, // 0 = unlimited
});
await backoff.run(() => fetch(url));
```
### tryAsync
Executes an async function and routes failures to an optional error handler without rethrowing. Used internally when invoking user callbacks (`onConnected`, `onDisconnected`, `onError`) so a throwing handler does not crash the session.
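
A helper with this behaviour could look like the sketch below. The signature is an assumption, not the exported `tryAsync` API; `tryAsyncSketch` is a hypothetical name.

```typescript
// Assumed shape: run fn, route any failure to onError, never rethrow.
async function tryAsyncSketch<T>(
  fn: () => T | Promise<T>,
  onError?: (error: unknown) => void,
): Promise<T | undefined> {
  try {
    return await fn();
  } catch (error) {
    onError?.(error);
    return undefined;
  }
}

// A throwing user callback is contained instead of crashing the caller.
const handledErrors: unknown[] = [];
const outcome = tryAsyncSketch(
  async (): Promise<string> => {
    throw new Error("handler failed");
  },
  (error) => {
    handledErrors.push(error);
  },
);

outcome.then((value) => console.log(value, handledErrors.length));
```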
---
## Custom fetch adapters
`fetch` is typed as `(url: string, options: RequestInit) => Promise<Response>` rather than the native `fetch` signature. This allows non-standard URL formats (for example LibP2P paths):
```ts
const session = await SSESession.create("libp2p://peer/events", {
  fetch: async (url, options) => libp2pFetch(url, options),
});
```
---
## Notes
- Use `create()` or `withBrowserVisibility()` to construct sessions; the constructor is private.
- `connect()` is idempotent while already connected.
- Intentional aborts (tab visibility) do not emit `"error"` and do not trigger `attemptReconnect`.
- Each session instance owns its own `SSEEventParser` and `ExponentialBackoff` — instances do not share parser buffers.