Server-Sent Events (SSE)
A fetch-based SSE client for browser and Node.js. It supports custom headers, POST bodies, exponential-backoff connect retries, automatic reconnect, optional tab-visibility handling, and Last-Event-ID resume semantics.
Published from @xo-cash/utils:
import {
  SSESession,
  SSEEventParser,
  AsyncPushIterator,
  type SSEvent,
  type SSESessionOptions,
} from "@xo-cash/utils";
Module layout
sse-session/
├── sse-session.ts # SSESession — main client
├── sse-event-parser.ts # Incremental SSE frame parser
├── async-push-iterator.ts # Push-based async iterable for messages
├── types.ts # Options, events, and SSEvent
└── index.ts # Public exports
SSESession is built on three internal utilities from the same package:
| Utility | Role |
|---|---|
| EventEmitter | Typed pub/sub for session lifecycle and message events |
| ExponentialBackoff | Retries the initial HTTP connection until it succeeds |
| tryAsync | Runs callbacks safely without letting handler throws break the session |
Quick start
const session = await SSESession.create("https://api.example.com/events");
session.on("message", (event) => {
  console.log(event.data);
});
for await (const event of session.messages) {
  handle(event);
}
When finished:
await session.disconnect();
How it works
Each SSESession holds one HTTP streaming connection at a time. Bytes from the response body flow through SSEEventParser, which turns line-oriented SSE frames into SSEvent objects. Parsed events are delivered two ways:
- Events: listen with session.on("message", …) (and other lifecycle events).
- Messages: consume with for await (const event of session.messages).
connect() resolves once the HTTP stream is open. Reading continues in the background; you do not need to await the full stream lifetime.
SSESession
Factory methods
| Method | Description |
|---|---|
| SSESession.create(url, options?) | Creates a session and connects immediately. |
| SSESession.withBrowserVisibility(url, options?) | Same as create, plus tab visibility handling. Defers the first connect when the document is hidden. |
| SSESession.addBrowserVisibilityHandler(session) | Attaches visibility handling to an existing session. |
| SSESession.addLastEventIdReconnect(session) | Tracks the latest event id and sends Last-Event-ID on reconnect. |
Instance methods
| Method | Description |
|---|---|
| connect() | Opens or reopens the transport. |
| abort() | Stops the in-flight fetch without ending the session. Used for tab visibility. |
| disconnect() | Closes the session, closes messages, and emits "closed". |
Properties
| Property | Description |
|---|---|
| messages | AsyncPushIterator<SSEvent>, consumed with for await…of. |
| onRequest | Getter/setter for the pre-fetch hook (auth headers, Last-Event-ID, etc.). |
Lifecycle
connect() ──► stream open ──► "connected"
│
▼
read & parse events ──► "message" + messages.push()
│
┌───────────┼───────────┐
▼ ▼ ▼
abort() server done read error
│ │ │
▼ ▼ ▼
"disconnected" "disconnected" "disconnected"
(messages open) │ │
│ "error" (unless aborted)
│ │
persistent? attemptReconnect?
│ │
└──── connect() again
│
disconnect() ──► close messages ──► "closed"
connect
- Resets the event parser and reopens messages if it was previously closed.
- Retries the fetch via ExponentialBackoff (unlimited attempts by default).
- Emits "connected" once the stream is established.
- Starts background reading; the returned promise does not wait for the stream to end.
abort
- Aborts the active fetch and invalidates the current read loop.
- Emits "disconnected" but not "closed".
- Keeps messages open so an existing for await consumer resumes after reconnect.
- Resets the parser so partial frames from the abandoned transport are discarded.
disconnect
- Closes messages and emits "closed".
- Aborts any active transport.
- Detaches browser visibility handling until the next manual connect().
Automatic reconnect
Two independent flags control reconnect behaviour:
| Option | Triggers reconnect when… |
|---|---|
| persistent: true | The server closes the stream normally. |
| attemptReconnect: true | A transport error occurs (not an intentional abort()). |
persistent defaults to true; attemptReconnect defaults to false.
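The interaction of the two flags can be sketched as a small decision function. This is only an illustration of the rules described in this document, not the package's code: planAfterEnd, EndReason, and ReconnectFlags are hypothetical names introduced here.

```typescript
// Hypothetical names for illustration; none of these are exported by the package.
type EndReason = "abort" | "server-closed" | "transport-error";

interface ReconnectFlags {
  persistent: boolean;
  attemptReconnect: boolean;
}

// Decide what happens after the active transport ends.
function planAfterEnd(
  reason: EndReason,
  flags: ReconnectFlags,
): { reconnect: boolean; closeMessages: boolean } {
  // An intentional abort never reconnects on its own and keeps messages open.
  if (reason === "abort") return { reconnect: false, closeMessages: false };
  // persistent covers a normal server close; attemptReconnect covers errors.
  const reconnect =
    reason === "server-closed" ? flags.persistent : flags.attemptReconnect;
  // When no reconnect follows, the end is terminal and messages closes.
  return { reconnect, closeMessages: !reconnect };
}
```

With the defaults, a normal server close leads to a reconnect, while a transport error is terminal.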
Connection supersession
Each connect() or abort() bumps an internal connectionId. Background read loops capture their id at start and exit quietly when superseded, so stale transports never emit duplicate events or spurious errors.
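The supersession idea can be shown with a generation counter. The following is a minimal sketch under simplifying assumptions (chunks are plain strings and a resolved promise stands in for a stream read); SketchSession is a hypothetical class, not the real SSESession.

```typescript
// Hypothetical stand-in that only models the connectionId bookkeeping.
class SketchSession {
  private connectionId = 0;
  readonly log: string[] = [];

  // Start a read loop; the loop captures its own id when it begins.
  async connect(chunks: string[]): Promise<void> {
    const myId = ++this.connectionId;
    for (const chunk of chunks) {
      await Promise.resolve(); // yield, as a real stream read would
      if (myId !== this.connectionId) return; // superseded: exit quietly
      this.log.push(`${myId}:${chunk}`);
    }
  }

  // Invalidate whichever loop is currently running.
  abort(): void {
    this.connectionId++;
  }
}
```

A loop that was started before a later connect() or abort() sees a changed id on its next iteration and stops without emitting anything.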
Events
| Event | Payload | When |
|---|---|---|
| "connected" | none | HTTP stream established. |
| "message" | SSEvent | A complete SSE frame was parsed. |
| "disconnected" | none | Active transport ended (including abort()). |
| "error" | Error | Fetch or read failure (not intentional abort). |
| "closed" | none | disconnect() was called. |
Listen with the standard EventEmitter API:
const off = session.on("message", (event) => { … });
off(); // unsubscribe
session.once("connected", () => { … });
Messages iterator
session.messages is an AsyncPushIterator<SSEvent> — a push-based queue you consume with async iteration:
for await (const event of session.messages) {
  process(event);
}
When messages stays open
- abort() (tab hidden)
- Automatic reconnect (persistent / attemptReconnect)
When messages closes
- disconnect()
- Server ends the stream and persistent is false
- Transport error with attemptReconnect: false
After a terminal close, call connect() again to open a fresh iterator. Read from session.messages directly rather than caching a reference across disconnects.
Configuration
Pass a partial SSESessionOptions to create() or withBrowserVisibility():
const session = await SSESession.create("/events", {
  method: "POST",
  headers: { Authorization: "Bearer …" },
  body: JSON.stringify({ filter: "all" }),
  onRequest: async (request) => {
    // Mutate headers before each connect/reconnect
    return request;
  },
  onConnected: () => console.log("connected"),
  onDisconnected: () => console.log("disconnected"),
  onError: (error) => console.error(error),
  attemptReconnect: true,
  persistent: false,
  // Custom fetch (LibP2P, test doubles, etc.)
  fetch: myFetch,
  // Custom retry policy for the initial connection
  retry: ExponentialBackoff.from({ baseDelay: 500, maxAttempts: 5 }),
  // Custom parser (must implement parseEvents + reset)
  eventParser: new SSEEventParser(),
});
Callbacks
| Callback | Purpose |
|---|---|
| onRequest | Transform RequestInit before each fetch (auth, Last-Event-ID). |
| onConnected | Stream is open; reading is about to begin. |
| onDisconnected | Transport ended (including abort). |
| onError | Unexpected failure. Not called for intentional aborts. |
Default connect retry
The default retry is an ExponentialBackoff with unlimited attempts (maxAttempts: 0), baseDelay: 1000, maxDelay: 10000, growthRate: 1.3, and jitter: 0.3. This means SSESession.create() blocks until the first connection succeeds or the caller aborts/disconnects.
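To get a feel for the default schedule, the sketch below computes delays from the documented values. The formula is an assumption (geometric growth capped at maxDelay, with jitter applied as a symmetric fraction of the delay); how ExponentialBackoff actually combines these options is not stated here. sketchDelay and SketchBackoffOptions are hypothetical names.

```typescript
// Hypothetical option shape mirroring the documented defaults.
interface SketchBackoffOptions {
  baseDelay: number;
  maxDelay: number;
  growthRate: number;
  jitter: number;
}

const sketchDefaults: SketchBackoffOptions = {
  baseDelay: 1000,
  maxDelay: 10000,
  growthRate: 1.3,
  jitter: 0.3,
};

// Assumed formula: min(maxDelay, baseDelay * growthRate^attempt), then
// spread by up to +/- jitter of that value. attempt starts at 0.
function sketchDelay(
  attempt: number,
  options: SketchBackoffOptions = sketchDefaults,
  random: () => number = Math.random,
): number {
  const raw = Math.min(
    options.maxDelay,
    options.baseDelay * Math.pow(options.growthRate, attempt),
  );
  const spread = raw * options.jitter * (random() * 2 - 1);
  return Math.round(raw + spread);
}
```

Under this assumed formula and ignoring jitter, the delay grows from 1000 ms and reaches the 10000 ms cap at the tenth attempt.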
Browser tab visibility
For browser clients that should pause SSE while a tab is in the background:
const session = await SSESession.withBrowserVisibility("/events");
Behaviour:
- Tab hidden → abort() stops the fetch; messages stays open.
- Tab visible → connect() re-establishes the stream.
- disconnect() → removes the visibility listener until the next manual connect().
In Node.js (no document), visibility handling is a no-op.
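The behaviour above maps onto the browser's visibilitychange event and document.hidden. The sketch below shows one way such a handler could be wired, with the document passed in so the Node.js no-op case is explicit. attachVisibility, VisibilityTarget, and Pausable are hypothetical names, and the package's own handler may differ.

```typescript
// Minimal slice of the DOM Document that the sketch relies on.
interface VisibilityTarget {
  hidden: boolean;
  addEventListener(type: "visibilitychange", listener: () => void): void;
  removeEventListener(type: "visibilitychange", listener: () => void): void;
}

// Minimal slice of a session: something that can pause and resume.
interface Pausable {
  connect(): unknown;
  abort(): void;
}

// Returns a detach function. Without a document this is a no-op.
function attachVisibility(
  session: Pausable,
  doc: VisibilityTarget | undefined,
): () => void {
  if (!doc) return () => {};
  const listener = () => {
    if (doc.hidden) session.abort(); // tab hidden: stop the fetch
    else void session.connect(); // tab visible: reopen the stream
  };
  doc.addEventListener("visibilitychange", listener);
  return () => doc.removeEventListener("visibilitychange", listener);
}
```

In a browser the second argument would be the global document; in Node.js, passing undefined leaves the session untouched.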
Last-Event-ID resume
To follow SSE resume semantics:
const session = await SSESession.create("/events");
await SSESession.addLastEventIdReconnect(session);
This tracks the most recent event.id from incoming messages and adds a Last-Event-ID header on every subsequent connect/reconnect. The existing onRequest callback is preserved and runs afterward.
The header is omitted until at least one event with an id field has been received.
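The tracking and header injection described above can be sketched as a wrapper around an existing request hook. This is an illustration only: withLastEventId, SketchRequest, and SketchHook are hypothetical, and the request is simplified to a plain headers record rather than RequestInit.

```typescript
// Simplified request shape; the real hook works with RequestInit.
interface SketchRequest {
  headers: Record<string, string>;
}

type SketchHook = (
  request: SketchRequest,
) => SketchRequest | Promise<SketchRequest>;

function withLastEventId(existing?: SketchHook) {
  let lastId: string | undefined;
  return {
    // Call for every incoming event; events without an id are ignored.
    track(event: { id?: string }): void {
      if (event.id !== undefined) lastId = event.id;
    },
    // Adds Last-Event-ID when known, then runs the existing hook afterward.
    onRequest: async (request: SketchRequest): Promise<SketchRequest> => {
      const next =
        lastId === undefined
          ? request
          : {
              ...request,
              headers: { ...request.headers, "Last-Event-ID": lastId },
            };
      return existing ? existing(next) : next;
    },
  };
}
```

Until track() has seen an event with an id, the request passes through without the header, matching the rule stated above.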
SSEvent
Parsed events match the SSE wire format:
interface SSEvent {
  data: string; // required: event payload
  event?: string; // event type (default: "message")
  id?: string; // last event id for resume
  retry?: number; // server-suggested reconnect delay (ms)
}
Multi-line data: fields are joined with \n. Trailing newlines on the payload are trimmed.
SSEEventParser
Incremental parser for raw stream bytes. Used internally by SSESession but exported for testing or custom integrations.
const parser = new SSEEventParser();
for (const chunk of chunks) {
  for (const event of parser.parseEvents(chunk)) {
    console.log(event);
  }
}
// Clear partial state when abandoning a stream
parser.reset();
- Accepts arbitrary chunk boundaries; incomplete frames stay buffered.
- Handles \r\n, \r, and \n line endings.
- Supports data, event, id, and retry fields.
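To make the buffering behaviour concrete, here is a minimal incremental parser sketch. It is not the package's SSEEventParser: it works on already-decoded text rather than raw bytes, and SketchParser, SketchEvent, and parseFrame are hypothetical names.

```typescript
// Hypothetical stand-in for the SSEvent shape.
interface SketchEvent {
  data: string;
  event?: string;
  id?: string;
  retry?: number;
}

// Turn one complete frame (the lines between blank lines) into an event.
function parseFrame(frame: string): SketchEvent | null {
  const data: string[] = [];
  const event: SketchEvent = { data: "" };
  for (const line of frame.split("\n")) {
    if (line === "" || line.startsWith(":")) continue; // blank or comment
    const colon = line.indexOf(":");
    const field = colon === -1 ? line : line.slice(0, colon);
    let value = colon === -1 ? "" : line.slice(colon + 1);
    if (value.startsWith(" ")) value = value.slice(1); // one optional space
    if (field === "data") data.push(value);
    else if (field === "event") event.event = value;
    else if (field === "id") event.id = value;
    else if (field === "retry" && /^\d+$/.test(value)) event.retry = Number(value);
  }
  if (data.length === 0) return null; // frames without data yield nothing
  event.data = data.join("\n");
  return event;
}

class SketchParser {
  private buffer = "";

  // Feed one chunk; returns the events completed by this chunk.
  parseEvents(chunk: string): SketchEvent[] {
    let text = this.buffer + chunk;
    // Hold back a trailing \r: its matching \n may be in the next chunk.
    let held = "";
    if (text.endsWith("\r")) {
      held = "\r";
      text = text.slice(0, -1);
    }
    text = text.replace(/\r\n?/g, "\n"); // normalise \r\n and lone \r
    const events: SketchEvent[] = [];
    let end = text.indexOf("\n\n");
    while (end !== -1) {
      const event = parseFrame(text.slice(0, end));
      if (event) events.push(event);
      text = text.slice(end + 2);
      end = text.indexOf("\n\n");
    }
    this.buffer = text + held; // incomplete frame stays buffered
    return events;
  }

  // Drop partial state when abandoning a stream.
  reset(): void {
    this.buffer = "";
  }
}
```

A frame split across several chunks, even in the middle of a \r\n pair, is emitted once its terminating blank line arrives.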
AsyncPushIterator
A push-based async iterable: producers call push(), consumers use for await…of.
const stream = new AsyncPushIterator<SSEvent>();
stream.push({ data: "hello" });
stream.close(); // end iteration
Used by SSESession.messages to bridge callback-driven stream reading with pull-based async consumers.
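The bridging pattern can be sketched with a queue of buffered values and a queue of waiting consumers. This is a simplified illustration, not the package's AsyncPushIterator; SketchPushIterator is a hypothetical name and it omits features such as reopening after close.

```typescript
class SketchPushIterator<T> implements AsyncIterable<T> {
  private queue: T[] = [];
  private waiters: Array<(result: IteratorResult<T>) => void> = [];
  private closed = false;

  // Producer side: hand the value to a waiting consumer or buffer it.
  push(value: T): void {
    if (this.closed) return;
    const waiter = this.waiters.shift();
    if (waiter) waiter({ value, done: false });
    else this.queue.push(value);
  }

  // End iteration: pending consumers are released with done: true.
  close(): void {
    this.closed = true;
    for (const waiter of this.waiters.splice(0)) {
      waiter({ value: undefined, done: true });
    }
  }

  [Symbol.asyncIterator](): AsyncIterator<T> {
    return {
      next: async (): Promise<IteratorResult<T>> => {
        // Buffered values are drained first, even after close().
        if (this.queue.length > 0) {
          return { value: this.queue.shift() as T, done: false };
        }
        if (this.closed) return { value: undefined, done: true };
        // Nothing buffered yet: wait for the next push() or close().
        return new Promise<IteratorResult<T>>((resolve) => {
          this.waiters.push(resolve);
        });
      },
    };
  }
}
```

In this sketch, values pushed before a consumer starts iterating are buffered, and close() lets a for await loop finish after draining them.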
Supporting utilities
EventEmitter
Lightweight typed event emitter used as the base class for SSESession. Provides on, once, off, and emit with optional debouncing.
ExponentialBackoff
Retries a function with increasing delays and jitter. Used for the initial HTTP connection in SSESession.connect().
const backoff = ExponentialBackoff.from({
  baseDelay: 1000,
  maxDelay: 10000,
  maxAttempts: 0, // 0 = unlimited
});
await backoff.run(() => fetch(url));
tryAsync
Executes an async function and routes failures to an optional error handler without rethrowing. Used internally when invoking user callbacks (onConnected, onDisconnected, onError) so a throwing handler does not crash the session.
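A helper with this behaviour could look like the sketch below. The real tryAsync signature is not shown in this document, so the name sketchTryAsync, its parameters, and its return value are assumptions made for illustration.

```typescript
// Hypothetical helper: run fn, route failures to onError, never rethrow.
async function sketchTryAsync<T>(
  fn: () => T | Promise<T>,
  onError?: (error: unknown) => void,
): Promise<T | undefined> {
  try {
    return await fn();
  } catch (error) {
    try {
      onError?.(error);
    } catch {
      // A throwing error handler is swallowed as well.
    }
    return undefined;
  }
}
```

Wrapping a user callback this way means a throw inside it resolves to undefined instead of rejecting the caller's promise.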
Custom fetch adapters
fetch is typed as (url: string, options: RequestInit) => Promise<Response> rather than the native fetch signature. This allows non-standard URL formats (for example LibP2P paths):
const session = await SSESession.create("libp2p://peer/events", {
  fetch: async (url, options) => libp2pFetch(url, options),
});
Notes
- Use create() or withBrowserVisibility() to construct sessions; the constructor is private.
- connect() is idempotent while already connected.
- Intentional aborts (tab visibility) do not emit "error" and do not trigger attemptReconnect.
- Each session instance owns its own SSEEventParser and ExponentialBackoff; instances do not share parser buffers.