import { ExponentialBackoff } from "./exponential-backoff.js"; // Type declarations for browser environment (not available in Node.js) declare const document: | { visibilityState: "visible" | "hidden"; addEventListener: ( event: string, handler: (event: Event) => void, ) => void; removeEventListener: ( event: string, handler: (event: Event) => void, ) => void; } | undefined; /** * A Server-Sent Events client implementation using fetch API. * Supports custom headers, POST requests, and is non-blocking. */ export class SSESession { /** * Creates and connects a new SSESession instance. * @param url The URL to connect to * @param options Configuration options * @returns A new connected SSESession instance */ public static async from( url: string, options: Partial = {}, ): Promise { const client = new SSESession(url, options); await client.connect(); return client; } // State. private url: string; private controller: AbortController; private connected: boolean = false; protected options: SSESessionOptions; protected messageBuffer: Uint8Array = new Uint8Array(); // Listener for when the tab is hidden or shown. private visibilityChangeHandler: ((event: Event) => void) | null = null; // Text decoders and encoders for parsing the message buffer. private textDecoder: TextDecoder = new TextDecoder(); private textEncoder: TextEncoder = new TextEncoder(); /** * Creates a new SSESession instance. * @param url The URL to connect to * @param options Configuration options */ constructor(url: string, options: Partial = {}) { this.url = url; this.options = { // Use default fetch function. fetch: (...args) => fetch(...args), method: "GET", headers: { Accept: "text/event-stream", "Cache-Control": "no-cache", }, onConnected: () => {}, onMessage: () => {}, onError: (error) => console.error("SSESession error:", error), onDisconnected: () => {}, onReconnect: (options) => Promise.resolve(options), // Reconnection options attemptReconnect: true, retryDelay: 1000, persistent: false, ...options, }; this.controller = new AbortController(); // Set up visibility change handling if in mobile browser environment if (typeof document !== "undefined") { this.visibilityChangeHandler = this.handleVisibilityChange.bind(this); document.addEventListener( "visibilitychange", this.visibilityChangeHandler, ); } } /** * Handles visibility change events in the browser. */ private async handleVisibilityChange(): Promise { // Guard for Node.js environment where document is undefined if (typeof document === "undefined") return; // When going to background, close the current connection cleanly // This allows us to reconnect mobile devices when they come back after leaving the tab or browser app. if (document.visibilityState === "hidden") { this.controller.abort(); } // When coming back to foreground, attempt to reconnect if not connected if (document.visibilityState === "visible" && !this.connected) { await this.connect(); } } /** * Connects to the SSE endpoint. */ public async connect(): Promise { if (this.connected) return; this.connected = true; this.controller = new AbortController(); const { method, headers, body } = this.options; const fetchOptions: RequestInit = { method, headers: headers || {}, body: body || null, signal: this.controller.signal, cache: "no-store", }; const exponentialBackoff = ExponentialBackoff.from({ baseDelay: this.options.retryDelay, maxDelay: 10000, maxAttempts: 0, growthRate: 1.3, jitter: 0.3, }); // Establish the connection and get the reader using the exponential backoff const reader = await exponentialBackoff.run(async () => { const reconnectOptions = await this.handleCallback( this.options.onReconnect, fetchOptions, ); const updatedFetchOptions = { ...fetchOptions, ...reconnectOptions, }; const res = await this.options.fetch(this.url, updatedFetchOptions); if (!res.ok) { throw new Error(`HTTP error! Status: ${res.status}`); } if (!res.body) { throw new Error("Response body is null"); } return res.body.getReader(); }); // Call the onConnected callback this.handleCallback(this.options.onConnected); const readStream = async () => { try { while (true) { const { done, value } = await reader.read(); if (done) { this.connected = false; // Call the onDisconnected callback. this.handleCallback(this.options.onDisconnected, undefined); // If the connection was closed by the server, we want to attempt a reconnect if the connection should be persistent. if (this.options.persistent) { await this.connect(); } break; } const events = this.parseEvents(value); for (const event of events) { if (this.options.onMessage) { this.handleCallback(this.options.onMessage, event); } } } } catch (error) { this.connected = false; // Call the onDisconnected callback. this.handleCallback(this.options.onDisconnected, error); // If the connection was aborted using the controller, we don't need to call onError. if (this.controller.signal.aborted) { return; } // Call the onError callback. // NOTE: we dont use the handleCallback here because it would result in 2 error callbacks. try { this.options.onError(error); } catch (error) { console.log(`SSE Session: onError callback error:`, error); } // Attempt to reconnect if enabled if (this.options.attemptReconnect) { await this.connect(); } } }; readStream(); return; } protected parseEvents(chunk: Uint8Array): SSEvent[] { // Append new chunk to existing buffer this.messageBuffer = new Uint8Array([...this.messageBuffer, ...chunk]); const events: SSEvent[] = []; const lines = this.textDecoder .decode(this.messageBuffer) .split(/\r\n|\r|\n/); let currentEvent: Partial = {}; let completeEventCount = 0; // Iterate over the lines to find complete events for (let i = 0; i < lines.length; i++) { const line = lines[i]; // Empty line signals the end of an event if (line === "") { if (currentEvent.data) { // Remove trailing newline if present currentEvent.data = currentEvent.data.replace(/\n$/, ""); events.push(currentEvent as SSEvent); currentEvent = {}; completeEventCount = i + 1; } continue; } if (!line) continue; // Parse field: value format const colonIndex = line.indexOf(":"); if (colonIndex === -1) continue; const field = line.slice(0, colonIndex); // Skip initial space after colon if present const valueStartIndex = colonIndex + 1 + (line[colonIndex + 1] === " " ? 1 : 0); const value = line.slice(valueStartIndex); if (field === "data") { currentEvent.data = currentEvent.data ? currentEvent.data + "\n" + value : value; } else if (field === "event") { currentEvent.event = value; } else if (field === "id") { currentEvent.id = value; } else if (field === "retry") { const retryMs = parseInt(value, 10); if (!isNaN(retryMs)) { currentEvent.retry = retryMs; } } } // Store the remainder of the buffer for the next chunk const remainder = lines.slice(completeEventCount).join("\n"); this.messageBuffer = this.textEncoder.encode(remainder); return events; } /** * Override the onMessage callback. * * @param onMessage The callback to set. */ public setOnMessage(onMessage: (event: SSEvent) => void): void { this.options.onMessage = onMessage; } /** * Closes the SSE connection and cleans up event listeners. */ public close(): void { // Clean up everything including the visibility handler this.controller.abort(); // Remove the visibility handler (This is only required on browsers) if (this.visibilityChangeHandler && typeof document !== "undefined") { document.removeEventListener( "visibilitychange", this.visibilityChangeHandler, ); this.visibilityChangeHandler = null; } } /** * Checks if the client is currently connected. * @returns Whether the client is connected */ public isConnected(): boolean { return this.connected; } /** * Will handle thrown errors from the callback and call the onError callback. * This is to avoid the sse-session from disconnecting from errors that are not a result of the sse-session itself. * * @param callback The callback to handle. * @param args The arguments to pass to the callback. */ private handleCallback) => ReturnType>( callback: T, ...args: Parameters ): ReturnType | undefined { try { return callback(...args); } catch (error) { try { this.options.onError(error); } catch (error) { console.log(`SSE Session: onError callback error:`, error); } } } } /** * Configuration options for the SSESession. */ export interface SSESessionOptions { /** * The fetch function to use. * * NOTE: This is compatible with Browser/Node's native "fetcH" function. * We use this in place of "typeof fetch" so that we can accept non-standard URLs ("url" is a "string" here). * For example, a LibP2P adapter might not use a standardized URL format (and might only include "path"). * This would cause a type error as native fetch expects type "URL". */ fetch: (url: string, options: RequestInit) => Promise; /** * HTTP method to use (GET or POST). */ method: "GET" | "POST"; /** * HTTP headers to send with the request. */ headers?: Record; /** * Body to send with POST requests. */ body?: string | FormData; /** * Called when the connection is established. */ onConnected: () => void; /** * Called when a message is received. */ onMessage: (event: SSEvent) => void; /** * Called when an error occurs. */ onError: (error: unknown) => void; /** * Called when the connection is closed. */ onDisconnected: (error: unknown) => void; /* * Called when the connection is going to try to reconnect. */ onReconnect: (options: RequestInit) => Promise; /** * Whether to attempt to reconnect. */ attemptReconnect: boolean; /** * The delay in milliseconds between reconnection attempts. */ retryDelay: number; /** * Whether to reconnect when the session is terminated by the server. */ persistent: boolean; } /** * Represents a Server-Sent Event. */ export interface SSEvent { /** * Event data. */ data: string; /** * Event type. */ event?: string; /** * Event ID. */ id?: string; /** * Reconnection time in milliseconds. */ retry?: number; }