Add Readme
This commit is contained in:
385
README.md
Normal file
385
README.md
Normal file
@@ -0,0 +1,385 @@
|
|||||||
|
# Server-Sent Events (SSE)
|
||||||
|
|
||||||
|
A fetch-based SSE client for browser and Node.js. It supports custom headers, POST bodies, exponential-backoff connect retries, automatic reconnect, optional tab-visibility handling, and `Last-Event-ID` resume semantics.
|
||||||
|
|
||||||
|
Published from `@xo-cash/utils`:
|
||||||
|
|
||||||
|
```ts
|
||||||
|
import {
|
||||||
|
SSESession,
|
||||||
|
SSEEventParser,
|
||||||
|
AsyncPushIterator,
|
||||||
|
type SSEvent,
|
||||||
|
type SSESessionOptions,
|
||||||
|
} from "@xo-cash/utils";
|
||||||
|
```
|
||||||
|
|
||||||
|
## Module layout
|
||||||
|
|
||||||
|
```
|
||||||
|
sse-session/
|
||||||
|
├── sse-session.ts # SSESession — main client
|
||||||
|
├── sse-event-parser.ts # Incremental SSE frame parser
|
||||||
|
├── async-push-iterator.ts # Push-based async iterable for messages
|
||||||
|
├── types.ts # Options, events, and SSEvent
|
||||||
|
└── index.ts # Public exports
|
||||||
|
```
|
||||||
|
|
||||||
|
`SSESession` is built on three internal utilities from the same package:
|
||||||
|
|
||||||
|
| Utility | Role |
|
||||||
|
|---------|------|
|
||||||
|
| `EventEmitter` | Typed pub/sub for session lifecycle and message events |
|
||||||
|
| `ExponentialBackoff` | Retries the initial HTTP connection until it succeeds |
|
||||||
|
| `tryAsync` | Runs callbacks safely without letting handler throws break the session |
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Quick start
|
||||||
|
|
||||||
|
```ts
|
||||||
|
const session = await SSESession.create("https://api.example.com/events");
|
||||||
|
|
||||||
|
session.on("message", (event) => {
|
||||||
|
console.log(event.data);
|
||||||
|
});
|
||||||
|
|
||||||
|
for await (const event of session.messages) {
|
||||||
|
handle(event);
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
When finished:
|
||||||
|
|
||||||
|
```ts
|
||||||
|
await session.disconnect();
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## How it works
|
||||||
|
|
||||||
|
Each `SSESession` holds one HTTP streaming connection at a time. Bytes from the response body flow through `SSEEventParser`, which turns line-oriented SSE frames into `SSEvent` objects. Parsed events are delivered two ways:
|
||||||
|
|
||||||
|
1. **Events** — listen with `session.on("message", …)` (and other lifecycle events).
|
||||||
|
2. **Messages** — consume with `for await (const event of session.messages)`.
|
||||||
|
|
||||||
|
`connect()` resolves once the HTTP stream is open. Reading continues in the background; you do not need to await the full stream lifetime.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## SSESession
|
||||||
|
|
||||||
|
### Factory methods
|
||||||
|
|
||||||
|
| Method | Description |
|
||||||
|
|--------|-------------|
|
||||||
|
| `SSESession.create(url, options?)` | Creates a session and connects immediately. |
|
||||||
|
| `SSESession.withBrowserVisibility(url, options?)` | Same as `create`, plus tab visibility handling. Defers the first connect when the document is hidden. |
|
||||||
|
| `SSESession.addBrowserVisibilityHandler(session)` | Attaches visibility handling to an existing session. |
|
||||||
|
| `SSESession.addLastEventIdReconnect(session)` | Tracks the latest event `id` and sends `Last-Event-ID` on reconnect. |
|
||||||
|
|
||||||
|
### Instance methods
|
||||||
|
|
||||||
|
| Method | Description |
|
||||||
|
|--------|-------------|
|
||||||
|
| `connect()` | Opens or reopens the transport. |
|
||||||
|
| `abort()` | Stops the in-flight fetch without ending the session. Used for tab visibility. |
|
||||||
|
| `disconnect()` | Closes the session, closes `messages`, and emits `"closed"`. |
|
||||||
|
|
||||||
|
### Properties
|
||||||
|
|
||||||
|
| Property | Description |
|
||||||
|
|----------|-------------|
|
||||||
|
| `messages` | `AsyncPushIterator<SSEvent>` for `for await…of` consumption. |
|
||||||
|
| `onRequest` | Getter/setter for the pre-fetch hook (auth headers, `Last-Event-ID`, etc.). |
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Lifecycle
|
||||||
|
|
||||||
|
```
|
||||||
|
connect() ──► stream open ──► "connected"
|
||||||
|
│
|
||||||
|
▼
|
||||||
|
read & parse events ──► "message" + messages.push()
|
||||||
|
│
|
||||||
|
┌───────────┼───────────┐
|
||||||
|
▼ ▼ ▼
|
||||||
|
abort() server done read error
|
||||||
|
│ │ │
|
||||||
|
▼ ▼ ▼
|
||||||
|
"disconnected" "disconnected" "disconnected"
|
||||||
|
(messages open) │ │
|
||||||
|
│ "error" (unless aborted)
|
||||||
|
│ │
|
||||||
|
persistent? attemptReconnect?
|
||||||
|
│ │
|
||||||
|
└──── connect() again
|
||||||
|
│
|
||||||
|
disconnect() ──► close messages ──► "closed"
|
||||||
|
```
|
||||||
|
|
||||||
|
### `connect`
|
||||||
|
|
||||||
|
- Resets the event parser and reopens `messages` if it was previously closed.
|
||||||
|
- Retries the fetch via `ExponentialBackoff` (unlimited attempts by default).
|
||||||
|
- Emits `"connected"` once the stream is established.
|
||||||
|
- Starts background reading; the returned promise does not wait for the stream to end.
|
||||||
|
|
||||||
|
### `abort`
|
||||||
|
|
||||||
|
- Aborts the active fetch and invalidates the current read loop.
|
||||||
|
- Emits `"disconnected"` but **not** `"closed"`.
|
||||||
|
- Keeps `messages` open so an existing `for await` consumer resumes after reconnect.
|
||||||
|
- Resets the parser so partial frames from the abandoned transport are discarded.
|
||||||
|
|
||||||
|
### `disconnect`
|
||||||
|
|
||||||
|
- Closes `messages` and emits `"closed"`.
|
||||||
|
- Aborts any active transport.
|
||||||
|
- Detaches browser visibility handling until the next manual `connect()`.
|
||||||
|
|
||||||
|
### Automatic reconnect
|
||||||
|
|
||||||
|
Two independent flags control reconnect behaviour:
|
||||||
|
|
||||||
|
| Option | Triggers reconnect when… |
|
||||||
|
|--------|--------------------------|
|
||||||
|
| `persistent: true` | The **server** closes the stream normally. |
|
||||||
|
| `attemptReconnect: true` | A **transport error** occurs (not an intentional `abort()`). |
|
||||||
|
|
||||||
|
Both default to `true` and `false` respectively.
|
||||||
|
|
||||||
|
### Connection supersession
|
||||||
|
|
||||||
|
Each `connect()` or `abort()` bumps an internal `connectionId`. Background read loops capture their id at start and exit quietly when superseded, so stale transports never emit duplicate events or spurious errors.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Events
|
||||||
|
|
||||||
|
| Event | Payload | When |
|
||||||
|
|-------|---------|------|
|
||||||
|
| `"connected"` | — | HTTP stream established. |
|
||||||
|
| `"message"` | `SSEvent` | A complete SSE frame was parsed. |
|
||||||
|
| `"disconnected"` | — | Active transport ended (including `abort()`). |
|
||||||
|
| `"error"` | `Error` | Fetch or read failure (not intentional abort). |
|
||||||
|
| `"closed"` | — | `disconnect()` was called. |
|
||||||
|
|
||||||
|
Listen with the standard `EventEmitter` API:
|
||||||
|
|
||||||
|
```ts
|
||||||
|
const off = session.on("message", (event) => { … });
|
||||||
|
off(); // unsubscribe
|
||||||
|
|
||||||
|
session.once("connected", () => { … });
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Messages iterator
|
||||||
|
|
||||||
|
`session.messages` is an `AsyncPushIterator<SSEvent>` — a push-based queue you consume with async iteration:
|
||||||
|
|
||||||
|
```ts
|
||||||
|
for await (const event of session.messages) {
|
||||||
|
process(event);
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### When `messages` stays open
|
||||||
|
|
||||||
|
- `abort()` (tab hidden)
|
||||||
|
- Automatic reconnect (`persistent` / `attemptReconnect`)
|
||||||
|
|
||||||
|
### When `messages` closes
|
||||||
|
|
||||||
|
- `disconnect()`
|
||||||
|
- Server ends the stream and `persistent` is `false`
|
||||||
|
- Transport error with `attemptReconnect: false`
|
||||||
|
|
||||||
|
After a terminal close, call `connect()` again to open a fresh iterator. Read from `session.messages` directly rather than caching a reference across disconnects.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Configuration
|
||||||
|
|
||||||
|
Pass a partial `SSESessionOptions` to `create()` or `withBrowserVisibility()`:
|
||||||
|
|
||||||
|
```ts
|
||||||
|
const session = await SSESession.create("/events", {
|
||||||
|
method: "POST",
|
||||||
|
headers: { Authorization: "Bearer …" },
|
||||||
|
body: JSON.stringify({ filter: "all" }),
|
||||||
|
|
||||||
|
onRequest: async (request) => {
|
||||||
|
// Mutate headers before each connect/reconnect
|
||||||
|
return request;
|
||||||
|
},
|
||||||
|
|
||||||
|
onConnected: () => console.log("connected"),
|
||||||
|
onDisconnected: () => console.log("disconnected"),
|
||||||
|
onError: (error) => console.error(error),
|
||||||
|
|
||||||
|
attemptReconnect: true,
|
||||||
|
persistent: false,
|
||||||
|
|
||||||
|
// Custom fetch (LibP2P, test doubles, etc.)
|
||||||
|
fetch: myFetch,
|
||||||
|
|
||||||
|
// Custom retry policy for the initial connection
|
||||||
|
retry: ExponentialBackoff.from({ baseDelay: 500, maxAttempts: 5 }),
|
||||||
|
|
||||||
|
// Custom parser (must implement parseEvents + reset)
|
||||||
|
eventParser: new SSEEventParser(),
|
||||||
|
});
|
||||||
|
```
|
||||||
|
|
||||||
|
### Callbacks
|
||||||
|
|
||||||
|
| Callback | Purpose |
|
||||||
|
|----------|---------|
|
||||||
|
| `onRequest` | Transform `RequestInit` before each fetch (auth, `Last-Event-ID`). |
|
||||||
|
| `onConnected` | Stream is open; reading is about to begin. |
|
||||||
|
| `onDisconnected` | Transport ended (including `abort`). |
|
||||||
|
| `onError` | Unexpected failure. Not called for intentional aborts. |
|
||||||
|
|
||||||
|
### Default connect retry
|
||||||
|
|
||||||
|
The default `retry` is an `ExponentialBackoff` with unlimited attempts (`maxAttempts: 0`), `baseDelay: 1000`, `maxDelay: 10000`, `growthRate: 1.3`, and `jitter: 0.3`. This means `SSESession.create()` blocks until the first connection succeeds or the caller aborts/disconnects.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Browser tab visibility
|
||||||
|
|
||||||
|
For browser clients that should pause SSE while a tab is in the background:
|
||||||
|
|
||||||
|
```ts
|
||||||
|
const session = await SSESession.withBrowserVisibility("/events");
|
||||||
|
```
|
||||||
|
|
||||||
|
Behaviour:
|
||||||
|
|
||||||
|
- **Tab hidden** → `abort()` stops the fetch; `messages` stays open.
|
||||||
|
- **Tab visible** → `connect()` re-establishes the stream.
|
||||||
|
- **`disconnect()`** → removes the visibility listener until the next manual `connect()`.
|
||||||
|
|
||||||
|
In Node.js (no `document`), visibility handling is a no-op.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Last-Event-ID resume
|
||||||
|
|
||||||
|
To follow SSE resume semantics:
|
||||||
|
|
||||||
|
```ts
|
||||||
|
const session = await SSESession.create("/events");
|
||||||
|
await SSESession.addLastEventIdReconnect(session);
|
||||||
|
```
|
||||||
|
|
||||||
|
This tracks the most recent `event.id` from incoming messages and adds a `Last-Event-ID` header on every subsequent connect/reconnect. The existing `onRequest` callback is preserved and runs afterward.
|
||||||
|
|
||||||
|
The header is omitted until at least one event with an `id` field has been received.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## SSEvent
|
||||||
|
|
||||||
|
Parsed events match the SSE wire format:
|
||||||
|
|
||||||
|
```ts
|
||||||
|
interface SSEvent {
|
||||||
|
data: string; // required — event payload
|
||||||
|
event?: string; // event type (default: "message")
|
||||||
|
id?: string; // last event id for resume
|
||||||
|
retry?: number; // server-suggested reconnect delay (ms)
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
Multi-line `data:` fields are joined with `\n`. Trailing newlines on the payload are trimmed.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## SSEEventParser
|
||||||
|
|
||||||
|
Incremental parser for raw stream bytes. Used internally by `SSESession` but exported for testing or custom integrations.
|
||||||
|
|
||||||
|
```ts
|
||||||
|
const parser = new SSEEventParser();
|
||||||
|
|
||||||
|
for (const chunk of chunks) {
|
||||||
|
for (const event of parser.parseEvents(chunk)) {
|
||||||
|
console.log(event);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clear partial state when abandoning a stream
|
||||||
|
parser.reset();
|
||||||
|
```
|
||||||
|
|
||||||
|
- Accepts arbitrary chunk boundaries; incomplete frames stay buffered.
|
||||||
|
- Handles `\r\n`, `\r`, and `\n` line endings.
|
||||||
|
- Supports `data`, `event`, `id`, and `retry` fields.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## AsyncPushIterator
|
||||||
|
|
||||||
|
A push-based async iterable: producers call `push()`, consumers use `for await…of`.
|
||||||
|
|
||||||
|
```ts
|
||||||
|
const stream = new AsyncPushIterator<SSEvent>();
|
||||||
|
|
||||||
|
stream.push({ data: "hello" });
|
||||||
|
stream.close(); // end iteration
|
||||||
|
```
|
||||||
|
|
||||||
|
Used by `SSESession.messages` to bridge callback-driven stream reading with pull-based async consumers.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Supporting utilities
|
||||||
|
|
||||||
|
### EventEmitter
|
||||||
|
|
||||||
|
Lightweight typed event emitter used as the base class for `SSESession`. Provides `on`, `once`, `off`, and `emit` with optional debouncing.
|
||||||
|
|
||||||
|
### ExponentialBackoff
|
||||||
|
|
||||||
|
Retries a function with increasing delays and jitter. Used for the initial HTTP connection in `SSESession.connect()`.
|
||||||
|
|
||||||
|
```ts
|
||||||
|
const backoff = ExponentialBackoff.from({
|
||||||
|
baseDelay: 1000,
|
||||||
|
maxDelay: 10000,
|
||||||
|
maxAttempts: 0, // 0 = unlimited
|
||||||
|
});
|
||||||
|
|
||||||
|
await backoff.run(() => fetch(url));
|
||||||
|
```
|
||||||
|
|
||||||
|
### tryAsync
|
||||||
|
|
||||||
|
Executes an async function and routes failures to an optional error handler without rethrowing. Used internally when invoking user callbacks (`onConnected`, `onDisconnected`, `onError`) so a throwing handler does not crash the session.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Custom fetch adapters
|
||||||
|
|
||||||
|
`fetch` is typed as `(url: string, options: RequestInit) => Promise<Response>` rather than the native `fetch` signature. This allows non-standard URL formats (for example LibP2P paths):
|
||||||
|
|
||||||
|
```ts
|
||||||
|
const session = await SSESession.create("libp2p://peer/events", {
|
||||||
|
fetch: async (url, options) => libp2pFetch(url, options),
|
||||||
|
});
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Notes
|
||||||
|
|
||||||
|
- Use `create()` or `withBrowserVisibility()` to construct sessions; the constructor is private.
|
||||||
|
- `connect()` is idempotent while already connected.
|
||||||
|
- Intentional aborts (tab visibility) do not emit `"error"` and do not trigger `attemptReconnect`.
|
||||||
|
- Each session instance owns its own `SSEEventParser` and `ExponentialBackoff` — instances do not share parser buffers.
|
||||||
Reference in New Issue
Block a user