Reformat files. Make thenable iterator generic

This commit is contained in:
2026-02-04 04:32:50 +00:00
parent af440dcbc7
commit e68427e53d
8 changed files with 202 additions and 111 deletions

440
src/utils/sse-session.ts Normal file
View File

@@ -0,0 +1,440 @@
import { ExponentialBackoff } from './exponential-backoff.js';
import { AsyncPushIterator } from './async-push-iterator.js';
/**
* A Server-Sent Events client implementation using fetch API.
* Supports custom headers, POST requests, and is non-blocking.
*/
export class SSESession {
/**
* Creates and connects a new SSESession instance.
* @param url The URL to connect to
* @param options Configuration options
* @returns A new connected SSESession instance
*/
public static async from(
url: string,
options: Partial<SSESessionOptions> = {},
): Promise<SSESession> {
const client = new SSESession(url, options);
await client.connect();
return client;
}
// State.
private url: string;
private controller: AbortController;
private connected: boolean = false;
protected options: SSESessionOptions;
protected messageBuffer: Uint8Array = new Uint8Array();
public messages: AsyncPushIterator<SSEvent> = new AsyncPushIterator<SSEvent>();
// Listener for when the tab is hidden or shown.
private visibilityChangeHandler: ((event: Event) => void) | null = null;
// Text decoders and encoders for parsing the message buffer.
private textDecoder: TextDecoder = new TextDecoder();
private textEncoder: TextEncoder = new TextEncoder();
/**
* Creates a new SSESession instance.
* @param url The URL to connect to
* @param options Configuration options
*/
constructor(url: string, options: Partial<SSESessionOptions> = {}) {
this.url = url;
this.options = {
// Use default fetch function.
fetch: (...args) => fetch(...args),
method: 'GET',
headers: {
Accept: 'text/event-stream',
'Cache-Control': 'no-cache',
},
onConnected: () => {},
onMessage: () => {},
onError: (error) => console.error('SSESession error:', error),
onDisconnected: () => {},
onReconnect: (options) => Promise.resolve(options),
// Reconnection options
attemptReconnect: true,
retryDelay: 1000,
persistent: false,
...options,
};
this.controller = new AbortController();
// Set up visibility change handling if in mobile browser environment
if (typeof document !== 'undefined') {
this.visibilityChangeHandler = this.handleVisibilityChange.bind(this);
document.addEventListener(
'visibilitychange',
this.visibilityChangeHandler as (this: Document, ev: Event) => any,
);
}
}
/**
* Handles visibility change events in the browser.
*/
private async handleVisibilityChange(): Promise<void> {
// When going to background, close the current connection cleanly
// This allows us to reconnect mobile devices when they come back after leaving the tab or browser app.
if (document.visibilityState === 'hidden') {
this.controller.abort();
}
// When coming back to foreground, attempt to reconnect if not connected
if (document.visibilityState === 'visible' && !this.connected) {
await this.connect();
}
}
/**
* Connects to the SSE endpoint.
*/
public async connect(): Promise<void> {
if (this.connected) return;
this.connected = true;
this.controller = new AbortController();
const { method, headers, body } = this.options;
const fetchOptions: RequestInit = {
method,
headers,
body,
signal: this.controller.signal,
cache: 'no-store',
};
const exponentialBackoff = ExponentialBackoff.from({
baseDelay: this.options.retryDelay,
maxDelay: 10000,
maxAttempts: 0,
growthRate: 1.3,
jitter: 0.3,
});
// Establish the connection and get the reader using the exponential backoff
const reader = await exponentialBackoff.run(async () => {
const reconnectOptions = await this.handleCallback(
this.options.onReconnect,
fetchOptions,
);
// Extract URL override if provided, use remaining options for fetch
const { url: urlOverride, ...restOptions } = reconnectOptions ?? {};
const updatedFetchOptions = {
...fetchOptions,
...restOptions,
};
// Use URL override if provided, otherwise use the original URL
const targetUrl = urlOverride ?? this.url;
const res = await this.options.fetch(targetUrl, updatedFetchOptions);
if (!res.ok) {
throw new Error(`HTTP error! Status: ${res.status}`);
}
if (!res.body) {
throw new Error('Response body is null');
}
return res.body.getReader();
});
// Call the onConnected callback
this.handleCallback(this.options.onConnected);
const readStream = async () => {
try {
while (true) {
const { done, value } = await reader.read();
if (done) {
this.connected = false;
// Call the onDisconnected callback.
this.handleCallback(this.options.onDisconnected, undefined);
// If the connection was closed by the server, we want to attempt a reconnect if the connection should be persistent.
if (this.options.persistent) {
await this.connect();
}
break;
}
const events = this.parseEvents(value);
for (const event of events) {
this.messages?.push(event);
if (this.options.onMessage) {
this.handleCallback(this.options.onMessage, event);
}
}
}
this.messages?.close();
} catch (error) {
this.connected = false;
// Call the onDisconnected callback.
this.handleCallback(this.options.onDisconnected, error);
// If the connection was aborted using the controller, we don't need to call onError.
if (this.controller.signal.aborted) {
return;
}
// Call the onError callback.
// NOTE: we dont use the handleCallback here because it would result in 2 error callbacks.
try {
this.options.onError(error);
} catch (error) {
console.log(`SSE Session: onError callback error:`, error);
}
// Attempt to reconnect if enabled
if (this.options.attemptReconnect) {
await this.connect();
}
}
};
readStream();
return;
}
protected parseEvents(chunk: Uint8Array): SSEvent[] {
// Append new chunk to existing buffer
this.messageBuffer = new Uint8Array([...this.messageBuffer, ...chunk]);
const events: SSEvent[] = [];
const lines = this.textDecoder
.decode(this.messageBuffer)
.split(/\r\n|\r|\n/);
let currentEvent: Partial<SSEvent> = {};
let completeEventCount = 0;
// Iterate over the lines to find complete events
for (let i = 0; i < lines.length; i++) {
const line = lines[i];
// Empty line signals the end of an event
if (line === '') {
if (currentEvent.data) {
// Remove trailing newline if present
currentEvent.data = currentEvent.data.replace(/\n$/, '');
events.push(currentEvent as SSEvent);
currentEvent = {};
completeEventCount = i + 1;
}
continue;
}
if (!line) continue;
// Parse field: value format
const colonIndex = line.indexOf(':');
if (colonIndex === -1) continue;
const field = line.slice(0, colonIndex);
// Skip initial space after colon if present
const valueStartIndex =
colonIndex + 1 + (line[colonIndex + 1] === ' ' ? 1 : 0);
const value = line.slice(valueStartIndex);
if (field === 'data') {
currentEvent.data = currentEvent.data
? currentEvent.data + '\n' + value
: value;
} else if (field === 'event') {
currentEvent.event = value;
} else if (field === 'id') {
currentEvent.id = value;
} else if (field === 'retry') {
const retryMs = parseInt(value, 10);
if (!isNaN(retryMs)) {
currentEvent.retry = retryMs;
}
}
}
// Store the remainder of the buffer for the next chunk
const remainder = lines.slice(completeEventCount).join('\n');
this.messageBuffer = this.textEncoder.encode(remainder);
return events;
}
/**
* Override the onMessage callback.
*
* @param onMessage The callback to set.
*/
public setOnMessage(onMessage: (event: SSEvent) => void): void {
this.options.onMessage = onMessage;
}
/**
* Closes the SSE connection and cleans up event listeners.
*/
public close(): void {
// Clean up everything including the visibility handler
this.controller.abort();
// Remove the visibility handler (This is only required on browsers)
if (this.visibilityChangeHandler && typeof document !== 'undefined') {
document.removeEventListener(
'visibilitychange',
this.visibilityChangeHandler,
);
this.visibilityChangeHandler = null;
}
}
/**
* Checks if the client is currently connected.
* @returns Whether the client is connected
*/
public isConnected(): boolean {
return this.connected;
}
/**
* Will handle thrown errors from the callback and call the onError callback.
* This is to avoid the sse-session from disconnecting from errors that are not a result of the sse-session itself.
*
* @param callback The callback to handle.
* @param args The arguments to pass to the callback.
*/
private handleCallback<T extends (...args: Parameters<T>) => ReturnType<T>>(
callback: T,
...args: Parameters<T>
): ReturnType<T> | undefined {
try {
return callback(...args);
} catch (error) {
try {
this.options.onError(error);
} catch (error) {
console.log(`SSE Session: onError callback error:`, error);
}
}
}
}
/**
* Options returned from the onReconnect callback.
* Extends RequestInit with an optional URL override for reconnection.
*/
export interface ReconnectOptions extends RequestInit {
/**
* Optional URL override for the reconnection.
* If provided, the SSE session will connect to this URL instead of the original.
*/
url?: string;
}
/**
* Configuration options for the SSESession.
*/
export interface SSESessionOptions {
/**
* The fetch function to use.
*
* NOTE: This is compatible with Browser/Node's native "fetcH" function.
* We use this in place of "typeof fetch" so that we can accept non-standard URLs ("url" is a "string" here).
* For example, a LibP2P adapter might not use a standardized URL format (and might only include "path").
* This would cause a type error as native fetch expects type "URL".
*/
fetch: (url: string, options: RequestInit) => Promise<Response>;
/**
* HTTP method to use (GET or POST).
*/
method: 'GET' | 'POST';
/**
* HTTP headers to send with the request.
*/
headers?: Record<string, string>;
/**
* Body to send with POST requests.
*/
body?: string | FormData | URLSearchParams;
/**
* Called when the connection is established.
*/
onConnected: () => void;
/**
* Called when a message is received.
*/
onMessage: (event: SSEvent) => void;
/**
* Called when an error occurs.
*/
onError: (error: unknown) => void;
/**
* Called when the connection is closed.
*/
onDisconnected: (error: unknown) => void;
/**
* Called when the connection is going to try to reconnect.
* Can return modified request options including an optional URL override.
*/
onReconnect: (options: RequestInit) => Promise<ReconnectOptions>;
/**
* Whether to attempt to reconnect.
*/
attemptReconnect: boolean;
/**
* The delay in milliseconds between reconnection attempts.
*/
retryDelay: number;
/**
* Whether to reconnect when the session is terminated by the server.
*/
persistent: boolean;
}
/**
* Represents a Server-Sent Event.
*/
export interface SSEvent {
/**
* Event data.
*/
data: string;
/**
* Event type.
*/
event?: string;
/**
* Event ID.
*/
id?: string;
/**
* Reconnection time in milliseconds.
*/
retry?: number;
}