Files
sse-session/src/sse-session.ts
2026-05-24 14:26:34 +02:00

543 lines
17 KiB
TypeScript

import type {
SSESessionOptions,
SSESessionEventMap,
SSEvent,
} from "./types.js";
import { tryAsync } from "./utils/misc.js";
import { EventEmitter } from "./utils/event-emitter.js";
import { AsyncPushIterator } from "./utils/async-push-iterator.js";
import { ExponentialBackoff } from "./utils/exponential-backoff.js";
import { SSEEventParser } from "./sse-event-parser.js";
/**
* A fetch-based Server-Sent Events (SSE) client with reconnect and optional
* browser tab visibility handling.
*
* Each session maintains one HTTP streaming connection at a time. Incoming
* bytes are parsed into {@link SSEvent} objects and delivered through two
* surfaces:
*
* - **Events** — `"connected"`, `"message"`, `"disconnected"`, `"error"`,
* and `"closed"` on the session itself (extends {@link EventEmitter}).
* - **Messages** — {@link messages}, an async iterable for `for await...of`
* consumers.
*
* Typical usage:
*
* ```ts
* const session = await SSESession.create("/events");
*
* session.on("message", (event) => console.log(event.data));
*
* for await (const event of session.messages) {
* handle(event);
* }
* ```
*
* ## Lifecycle
*
* - {@link connect} opens (or reopens) the transport. It resolves once the
* HTTP stream is established; reading continues in the background.
* - {@link abort} stops the in-flight fetch without ending the session.
* Used internally for tab visibility. The {@link messages} iterator stays
* open so an existing consumer resumes when the tab becomes visible again.
* - {@link disconnect} aborts the transport, closes {@link messages}, emits
* `"closed"`, and disables attached visibility handlers until the next
* manual {@link connect}.
*
* Automatic reconnect is controlled by {@link SSESessionOptions.persistent}
* (server closed the stream) and
* {@link SSESessionOptions.attemptReconnect} (transport error).
*
* ## Connection supersession
*
* Each {@link connect} or {@link abort} bumps an internal `connectionId`.
* Background read loops capture their id at start and exit quietly when a
* newer connection supersedes them, avoiding duplicate events or errors from
* stale transports.
*/
export class SSESession extends EventEmitter<SSESessionEventMap> {
/**
 * Builds a session and resolves once its first connection is established.
 *
 * @param url - The SSE endpoint URL.
 * @param options - Overrides merged on top of the instance defaults.
 * @returns The session, after {@link connect} has resolved.
 * @throws Whatever {@link connect} throws when the initial connection
 * cannot be established.
 */
static async create(
  url: string,
  options: Partial<SSESessionOptions> = {},
): Promise<SSESession> {
  const session = new SSESession(url, options);
  // Hand the session out only after the initial connect has settled.
  await session.connect();
  return session;
}
/**
 * Creates a session with tab visibility handling for browser clients.
 *
 * Registers {@link addBrowserVisibilityHandler} before connecting. If the
 * document is hidden at creation time (for example a background tab), the
 * initial connect is deferred until the tab becomes visible. Outside a
 * browser (`document` undefined) the session connects immediately.
 *
 * If the initial connect fails, the session is disconnected before the error
 * is rethrown. That emits `"closed"`, which removes the `visibilitychange`
 * listener registered above; otherwise the listener would stay attached to
 * `document` for a session the caller never received.
 *
 * @param url - The SSE endpoint URL.
 * @param options - Session configuration.
 * @returns A session with visibility handling attached. May not yet be
 * connected when the tab is hidden.
 * @throws When the initial connection is attempted and cannot be
 * established.
 */
static async withBrowserVisibility(
  url: string,
  options: Partial<SSESessionOptions> = {},
): Promise<SSESession> {
  const session = new SSESession(url, options);
  SSESession.addBrowserVisibilityHandler(session);
  // Avoid opening a connection while the tab is in the background.
  const startsHidden =
    typeof document !== "undefined" && document.visibilityState !== "visible";
  if (startsHidden) return session;
  try {
    await session.connect();
  } catch (error) {
    // The caller will never see this session, so tear it down here;
    // disconnect() emits "closed", which detaches the visibility listener.
    try {
      await session.disconnect();
    } catch {
      // Cleanup is best-effort; the connect failure is the error to report.
    }
    throw error;
  }
  return session;
}
/**
 * Enables SSE resume semantics by sending `Last-Event-ID` on reconnect.
 *
 * Listens for incoming `"message"` events and remembers the most recent
 * {@link SSEvent.id}. Events that carry no id leave the remembered value
 * untouched, so a run of id-less events does not make the session forget
 * where to resume from. An empty-string id clears the remembered value.
 *
 * The session's {@link onRequest} hook is wrapped so the header is attached
 * whenever an id is known, allowing the server to replay only events the
 * client has not yet received. The request passed in is not mutated; the
 * wrapped hook receives a copy with the extra header.
 *
 * The existing {@link onRequest} callback is preserved and runs after the
 * header is applied, so auth or other header mutations continue to work.
 *
 * Attach as early in the session lifetime as possible. When added after
 * {@link create}, the initial connection omits the header (no id yet);
 * all later reconnects include it. To instrument before the first connect,
 * call this on the session returned from {@link withBrowserVisibility}
 * before awaiting a separate {@link connect} when the tab starts hidden.
 *
 * Each call adds another listener and another wrapper, so call it once per
 * session.
 *
 * ```ts
 * const session = await SSESession.create(url);
 * await SSESession.addLastEventIdReconnect(session);
 * // Reconnects send Last-Event-ID once an event with an id is received.
 * ```
 *
 * @param client - The session to instrument.
 * @returns The same session, for chaining.
 */
static async addLastEventIdReconnect(client: SSESession): Promise<SSESession> {
  let lastEventId: string | undefined;
  client.on("message", (event) => {
    // Only an event that actually carries an id updates the resume point.
    if (typeof event.id === "string") {
      lastEventId = event.id;
    }
  });
  const originalOnRequest = client.onRequest;
  client.onRequest = async (request) => {
    // Nothing to resume from yet (or the id was cleared): pass through.
    if (!lastEventId) return originalOnRequest(request);
    return originalOnRequest({
      ...request,
      headers: { ...request.headers, "Last-Event-ID": lastEventId },
    });
  };
  return client;
}
/**
 * Ties a session's transport to browser tab visibility.
 *
 * Listens to `visibilitychange` on `document` (Page Visibility API):
 *
 * - **hidden** — calls {@link abort}, which stops the active fetch while
 *   leaving {@link messages} open; `"disconnected"` fires, `"closed"` does
 *   not.
 * - **any other state** — calls {@link connect}, which is a no-op when the
 *   session is already connected.
 *
 * The DOM listener is removed the first time the session emits `"closed"`
 * and the handler is installed again on the next `"connected"` event.
 *
 * Returns immediately, without registering anything, when `document` is
 * undefined (non-browser environments).
 *
 * @param client - The session to manage.
 * @returns The same session, for chaining.
 */
static addBrowserVisibilityHandler(client: SSESession): SSESession {
  if (typeof document === "undefined") return client;

  const onVisibilityChange = (): void => {
    if (document.visibilityState !== "hidden") {
      client.connect().catch(() => {
        // connect() reports failures via onError and the "error" event.
      });
      return;
    }
    void client.abort();
  };

  // Installed once the session reconnects after an explicit disconnect.
  const reattachOnConnected = (): void => {
    SSESession.addBrowserVisibilityHandler(client);
  };

  // Stop managing visibility after an explicit disconnect; arm the
  // re-registration for the next manual connect of the same instance.
  const detachOnClosed = (): void => {
    document.removeEventListener("visibilitychange", onVisibilityChange);
    client.once("connected", reattachOnConnected);
  };

  document.addEventListener("visibilitychange", onVisibilityChange);
  client.once("closed", detachOnClosed);
  return client;
}
/** SSE endpoint URL for this session; passed to `options.fetch` on every request. */
private readonly url: string;
/**
* Per-instance configuration.
*
* Defaults live on the instance field (not a shared static) so each session
* gets its own {@link SSEEventParser} and {@link ExponentialBackoff}.
* The constructor spreads caller-supplied options over these defaults and
* merges `headers` key by key.
*/
private options: SSESessionOptions = {
// Arrow wrapper: the global `fetch` is looked up when a request is made,
// not when the session is constructed.
fetch: (...args) => fetch(...args),
method: "GET",
// Default request headers; caller-supplied headers are merged over these.
headers: {
Accept: "text/event-stream",
"Cache-Control": "no-cache",
},
// NOTE(review): the Fetch standard rejects a non-null body on GET/HEAD
// requests — confirm how this default is meant to combine with the
// default `method` above.
body: new FormData(),
// Identity hook: hands the request back unchanged.
onRequest: (request) => Promise.resolve(request),
onConnected: () => {},
onDisconnected: () => {},
// Default error sink: log to the console.
onError: (error) => console.error("SSEClient error:", error),
// Retry the initial fetch until it succeeds (maxAttempts: 0 = unlimited).
// Delays are presumably in milliseconds — confirm against ExponentialBackoff.
retry: new ExponentialBackoff({
baseDelay: 1000,
maxDelay: 10000,
maxAttempts: 0,
growthRate: 1.3,
jitter: 0.3,
}),
// Reconnect after a read error on an established stream.
attemptReconnect: true,
// When false, a server-side end of stream closes `messages` instead of
// reconnecting.
persistent: false,
eventParser: new SSEEventParser(),
};
/**
* AbortController of the most recent transport. {@link connect} replaces it
* with a fresh controller; {@link abort} aborts it.
*/
private controller: AbortController = new AbortController();
/**
* Whether a transport is currently established or connecting. Set at the
* start of {@link connect}; cleared by {@link abort}, on stream end, and on
* connection or read failure.
*/
private connected = false;
/**
* Monotonic id bumped on each {@link connect} and {@link abort}.
*
* Background read loops compare against this to detect superseded transports.
*/
private connectionId = 0;
/**
* Asynchronous stream of parsed SSE events for the active connection.
*
* Stays open across {@link abort} and automatic reconnects so an existing
* `for await` consumer keeps receiving events after visibility resumes.
*
* Closes when:
* - the server ends the stream and {@link SSESessionOptions.persistent}
* is false,
* - {@link disconnect} is called,
* - the initial fetch fails after the retry policy gives up, or
* - a transport error occurs with
* {@link SSESessionOptions.attemptReconnect} disabled.
*
* A later {@link connect} replaces this with a new iterator when the
* previous one was closed. Consumers should read from `session.messages`
* rather than caching a reference across terminal disconnects.
*/
public messages: AsyncPushIterator<SSEvent> = new AsyncPushIterator<SSEvent>();
/**
 * Private: sessions are obtained through the static factory methods.
 *
 * @param url - The SSE endpoint URL.
 * @param options - Overrides layered on top of the per-instance defaults.
 */
private constructor(url: string, options: Partial<SSESessionOptions>) {
  super();
  this.url = url;
  const defaults = this.options;
  // A plain spread of `options` would replace the default headers wholesale
  // whenever options.headers is set, so headers are merged key by key.
  const headers = { ...defaults.headers, ...options.headers };
  this.options = { ...defaults, ...options, headers };
}
/**
 * The hook invoked with the `RequestInit` before each fetch. Whatever it
 * resolves to is what gets passed to `fetch`.
 */
get onRequest(): (request: RequestInit) => Promise<RequestInit> {
  const { onRequest } = this.options;
  return onRequest;
}
/** Replaces the request hook; takes effect from the next fetch attempt. */
set onRequest(callback: (request: RequestInit) => Promise<RequestInit>) {
  this.options.onRequest = callback;
}
/**
 * Connects or reconnects to the SSE endpoint.
 *
 * No-op when a transport is already established or being established.
 * Otherwise the parser is reset, {@link messages} is reopened if a previous
 * terminal close ended it, and the fetch is run through the retry policy.
 * Once the HTTP stream is established, `onConnected` runs, `"connected"` is
 * emitted, and body reading continues in the background via
 * {@link readStream}.
 *
 * If {@link abort} supersedes this attempt at any point before the reader
 * is handed off, the promise resolves quietly and `"connected"` is not
 * emitted.
 *
 * @throws When the retry policy gives up while this attempt is still the
 * current one. Before rethrowing, `"disconnected"` and `"error"` are emitted
 * and {@link messages} is closed.
 */
public async connect(): Promise<void> {
  if (this.connected) return;
  // Prepare for a fresh transport. Parser state from an abandoned connection
  // must not bleed into the next one; reopen messages if a prior terminal
  // close ended the consumer's iteration loop.
  this.resetEventParser();
  this.ensureMessageStreamOpen();
  const connectionId = ++this.connectionId;
  const controller = new AbortController();
  this.connected = true;
  this.controller = controller;
  const { method, headers, body } = this.options;
  // The Fetch standard throws a TypeError for GET/HEAD requests that carry a
  // non-null body, so the configured body is only sent with other methods.
  const verb = String(method ?? "GET").toUpperCase();
  const mayCarryBody = verb !== "GET" && verb !== "HEAD";
  const fetchOptions: RequestInit = {
    method,
    headers: headers || {},
    body: mayCarryBody ? body || null : null,
    signal: controller.signal,
    cache: "no-store",
  };
  let reader: ReadableStreamDefaultReader<Uint8Array>;
  try {
    reader = await this.options.retry.run(() =>
      this.createReader(fetchOptions),
    );
  } catch (error) {
    // A newer abort/connect superseded this attempt — leave state to the winner.
    if (!this.isCurrentConnection(connectionId, controller)) return;
    this.connected = false;
    await this.notifyDisconnected();
    await this.notifyError(error);
    this.closeMessageStream();
    throw error;
  }
  // Only announce the connection while it is still the current one.
  if (this.isCurrentConnection(connectionId, controller)) {
    await tryAsync(
      () => this.options.onConnected(),
      (error) => this.options.onError(error),
    );
  }
  // Superseded during the fetch or while onConnected was running (for
  // example abort() on a visibility change): drop the reader quietly.
  if (!this.isCurrentConnection(connectionId, controller)) {
    try {
      await reader.cancel();
    } catch {
      // The stream was already errored by the abort; nothing left to release.
    }
    return;
  }
  this.emit("connected", undefined);
  // Fire-and-forget: connect() resolves while the stream is consumed.
  void this.readStream(reader, connectionId, controller).catch(() => {
    // The read loop reports transport errors itself. A rejection that
    // reaches this point typically comes from a failed automatic reconnect,
    // which the nested connect() has already reported; swallow it so it
    // does not surface as an unhandled promise rejection.
  });
}
/**
 * Stops the active transport without ending the session.
 *
 * Does nothing when no transport is established or connecting. Otherwise the
 * session is marked disconnected, the connection id is bumped so in-flight
 * read loops and fetches recognise themselves as stale, the fetch is
 * aborted, and partial parser state is discarded.
 *
 * {@link messages} is left open and {@link connect} can reopen the stream.
 * Emits `"disconnected"` but not `"closed"`.
 */
public async abort(): Promise<void> {
  if (this.connected) {
    this.connected = false;
    // Invalidate any in-flight read loop and fetch for this transport.
    this.connectionId += 1;
    this.controller.abort();
    this.resetEventParser();
    await this.notifyDisconnected();
  }
}
/**
 * Ends the session.
 *
 * Closes {@link messages} and emits `"closed"` first, then aborts the
 * transport if one is active (which emits `"disconnected"`). When nothing is
 * connected, only the parser state is cleared.
 *
 * A later manual {@link connect} makes the same instance usable again.
 */
public async disconnect(): Promise<void> {
  this.closeMessageStream();
  this.emit("closed", undefined);
  if (!this.connected) {
    this.resetEventParser();
    return;
  }
  await this.abort();
}
/**
 * Performs the HTTP request and returns a reader for the response body.
 *
 * {@link SSESessionOptions.onRequest} runs first and may replace or mutate
 * the request (for example auth tokens or `Last-Event-ID`); its result is
 * what gets passed to `fetch`.
 *
 * @param fetchOptions - Base request options built by {@link connect}.
 * @returns A reader over the response body stream.
 * @throws When the response status is not OK or the response has no body.
 * On a non-OK status the unread body is cancelled before throwing so a
 * rejected attempt does not keep its response stream open.
 */
private async createReader(
  fetchOptions: RequestInit,
): Promise<ReadableStreamDefaultReader<Uint8Array>> {
  const requestOptions = await this.options.onRequest(fetchOptions);
  const response = await this.options.fetch(this.url, requestOptions);
  if (!response.ok) {
    // Best-effort release of the error response; never let a cancel failure
    // replace the status error below.
    void response.body?.cancel().catch(() => undefined);
    throw new Error(`HTTP error! Status: ${response.status}`);
  }
  if (!response.body) {
    throw new Error("Response body is null");
  }
  return response.body.getReader();
}
/**
 * Reads bytes from an established stream until it ends, errors, or is
 * superseded by a newer connection.
 *
 * Each chunk is fed to the event parser; every parsed event is emitted as
 * `"message"` and pushed onto {@link messages}.
 *
 * - **Stream end** — emits `"disconnected"`, then reconnects when
 *   {@link SSESessionOptions.persistent} is set, otherwise closes
 *   {@link messages}.
 * - **Error** (from the transport, the parser, or a `"message"` listener) —
 *   cancels the reader, emits `"disconnected"` and `"error"`, then reconnects
 *   when {@link SSESessionOptions.attemptReconnect} is set, otherwise closes
 *   {@link messages}.
 * - **Superseded** — returns quietly without emitting anything.
 *
 * A failed automatic reconnect does not reject this method's promise;
 * {@link connect} has already reported it.
 *
 * @param reader - Reader over the response body of this transport.
 * @param connectionId - The connection id this loop was started for.
 * @param controller - The AbortController of this transport.
 */
private async readStream(
  reader: ReadableStreamDefaultReader<Uint8Array>,
  connectionId: number,
  controller: AbortController,
): Promise<void> {
  try {
    while (this.isCurrentConnection(connectionId, controller)) {
      const { done, value } = await reader.read();
      // abort() or a newer connect() may have landed while we were awaiting.
      if (!this.isCurrentConnection(connectionId, controller)) return;
      if (done) {
        this.connected = false;
        await this.notifyDisconnected();
        if (this.options.persistent) {
          // Server closed gracefully — reopen unless the consumer opted out.
          await this.connect();
        } else {
          this.closeMessageStream();
        }
        return;
      }
      // Some environments yield `{ done: false, value: undefined }`.
      if (!value) continue;
      for (const event of this.options.eventParser.parseEvents(value)) {
        this.emit("message", event);
        this.messages.push(event);
      }
    }
  } catch (error) {
    if (!this.isCurrentConnection(connectionId, controller)) return;
    this.connected = false;
    // The failure may have come from the parser or a listener rather than
    // the transport, in which case the HTTP stream is still open. Release it
    // so a reconnect does not leave a second live connection behind.
    void reader.cancel().catch(() => undefined);
    await this.notifyDisconnected();
    // Expected path for abort() — do not treat as an error or reconnect.
    if (controller.signal.aborted) return;
    await this.notifyError(error);
    if (!this.options.attemptReconnect) {
      this.closeMessageStream();
      return;
    }
    try {
      await this.connect();
    } catch {
      // connect() already emitted "disconnected"/"error" and closed messages;
      // rethrowing would only produce an unhandled rejection on this
      // background promise.
    }
  }
}
/**
 * Resets the configured event parser so partial SSE frames from an abandoned
 * transport are not carried into the next one.
 */
private resetEventParser(): void {
  const { eventParser } = this.options;
  eventParser.reset();
}
/**
 * Swaps in a fresh {@link messages} iterator if the current one has been
 * closed; an iterator that is still open is left untouched.
 */
private ensureMessageStreamOpen(): void {
  if (this.messages.closed) {
    this.messages = new AsyncPushIterator<SSEvent>();
  }
}
/**
 * Closes {@link messages}, ending any consumer's iteration loop. Does
 * nothing when the iterator is already closed.
 */
private closeMessageStream(): void {
  if (!this.messages.closed) {
    this.messages.close();
  }
}
/**
 * Tells a read loop or connect attempt whether it still owns the active
 * transport.
 *
 * It is stale as soon as the session is no longer marked connected, the
 * session's connection id has moved on, or its own controller was aborted.
 *
 * @param connectionId - The id captured when the attempt started.
 * @param controller - The AbortController created for that attempt.
 * @returns `true` only while all three conditions still hold.
 */
private isCurrentConnection(
  connectionId: number,
  controller: AbortController,
): boolean {
  if (!this.connected) return false;
  if (this.connectionId !== connectionId) return false;
  return !controller.signal.aborted;
}
/**
 * Runs {@link SSESessionOptions.onDisconnected} through `tryAsync`, with
 * {@link SSESessionOptions.onError} as the failure handler, and then emits
 * `"disconnected"`.
 */
private async notifyDisconnected(): Promise<void> {
  const runCallback = () => this.options.onDisconnected();
  await tryAsync(runCallback, (failure) => this.options.onError(failure));
  this.emit("disconnected", undefined);
}
/**
 * Reports an error through {@link SSESessionOptions.onError} and then emits
 * `"error"`.
 *
 * Values that are not `Error` instances are wrapped in a new `Error` whose
 * message is their string form. The callback runs through `tryAsync`, with a
 * console logger as the failure handler.
 *
 * @param error - The value that was thrown or rejected.
 */
private async notifyError(error: unknown): Promise<void> {
  let normalized: Error;
  if (error instanceof Error) {
    normalized = error;
  } else {
    normalized = new Error(String(error));
  }
  await tryAsync(
    () => this.options.onError(normalized),
    (callbackError) => console.error("SSESession error:", callbackError),
  );
  this.emit("error", normalized);
}
}