Initial Commit

This commit is contained in:
2026-01-29 07:13:33 +00:00
commit 399e93f714
34 changed files with 7663 additions and 0 deletions

View File

@@ -0,0 +1,155 @@
/**
* Exponential backoff is a technique used to retry a function after a delay.
*
* The delay increases exponentially with each attempt, up to a maximum delay.
*
* The jitter is a random amount of time added to the delay to prevent thundering herd problems.
*
* The growth rate is the factor by which the delay increases with each attempt.
*/
export class ExponentialBackoff {
/**
* Create a new ExponentialBackoff instance
*
* @param config - The configuration for the exponential backoff
* @returns The ExponentialBackoff instance
*/
static from(config?: Partial<ExponentialBackoffOptions>): ExponentialBackoff {
const backoff = new ExponentialBackoff(config);
return backoff;
}
/**
* Run the function with exponential backoff
*
* @param fn - The function to run
* @param onError - The callback to call when an error occurs
* @param options - The configuration for the exponential backoff
*
* @throws The last error if the function fails and we have hit the max attempts
*
* @returns The result of the function
*/
static run<T>(
fn: () => Promise<T>,
onError = (_error: Error) => {},
options?: Partial<ExponentialBackoffOptions>,
): Promise<T> {
const backoff = ExponentialBackoff.from(options);
return backoff.run(fn, onError);
}
private readonly options: ExponentialBackoffOptions;
constructor(options?: Partial<ExponentialBackoffOptions>) {
this.options = {
maxDelay: 10000,
maxAttempts: 10,
baseDelay: 1000,
growthRate: 2,
jitter: 0.1,
...options,
};
}
/**
* Run the function with exponential backoff
*
* If the function fails but we have not hit the max attempts, the error will be passed to the onError callback
* and the function will be retried with an exponential delay
*
* If the function fails and we have hit the max attempts, the last error will be thrown
*
* @param fn - The function to run
* @param onError - The callback to call when an error occurs
*
* @throws The last error if the function fails and we have hit the max attempts
*
* @returns The result of the function
*/
async run<T>(
fn: () => Promise<T>,
onError = (_error: Error) => {},
): Promise<T> {
let lastError: Error = new Error('Exponential backoff: Max retries hit');
let attempt = 0;
while (
attempt < this.options.maxAttempts ||
this.options.maxAttempts == 0
) {
try {
return await fn();
} catch (error) {
// Store the error in case we fail every attempt
lastError = error instanceof Error ? error : new Error(`${error}`);
onError(lastError);
// Wait before going to the next attempt
const delay = this.calculateDelay(attempt);
await new Promise((resolve) => setTimeout(resolve, delay));
}
attempt++;
}
// We completed the loop without ever succeeding. Throw the last error we got
throw lastError;
}
/**
* Calculate the delay before we should attempt to retry
*
* NOTE: The maximum delay is (maxDelay * (1 + jitter))
*
* @param attempt
* @returns The time in milliseconds before another attempt should be made
*/
private calculateDelay(attempt: number): number {
// Get the power of the growth rate
const power = Math.pow(this.options.growthRate, attempt);
// Get the delay before jitter or limit
const rawDelay = this.options.baseDelay * power;
// Cap the delay to the maximum. Do this before the jitter so jitter does not become larger than delay
const cappedDelay = Math.min(rawDelay, this.options.maxDelay);
// Get the jitter direction. This will be between -1 and 1
const jitterDirection = 2 * Math.random() - 1;
// Calculate the jitter
const jitter = jitterDirection * this.options.jitter * cappedDelay;
// Add the jitter to the delay
return cappedDelay + jitter;
}
}
export type ExponentialBackoffOptions = {
/**
* The maximum delay between attempts in milliseconds
*/
maxDelay: number;
/**
* The maximum number of attempts. Passing 0 will result in infinite attempts.
*/
maxAttempts: number;
/**
* The base delay between attempts in milliseconds
*/
baseDelay: number;
/**
* The growth rate of the delay
*/
growthRate: number;
/**
* The jitter of the delay as a percentage of growthRate
*/
jitter: number;
};

158
src/utils/ext-json.ts Normal file
View File

@@ -0,0 +1,158 @@
/**
* Extended JSON encoding/decoding utilities.
* Handles BigInt and Uint8Array serialization for communication with sync-server.
*
* TODO: These are intended as temporary stand-ins until this functionality has been implemented directly in LibAuth.
* We are doing this so that we may better standardize with the rest of the BCH eco-system in future.
* See: https://github.com/bitauth/libauth/pull/108
*/
import { binToHex, hexToBin } from '@bitauth/libauth';
/**
* Replaces BigInt and Uint8Array values with their ExtJSON string representations.
* @param value - The value to potentially replace
* @returns The replaced value as an ExtJSON string, or the original value
*/
export const extendedJsonReplacer = function (value: unknown): unknown {
if (typeof value === 'bigint') {
return `<bigint: ${value.toString()}n>`;
} else if (value instanceof Uint8Array) {
return `<Uint8Array: ${binToHex(value)}>`;
}
return value;
};
/**
* Revives ExtJSON string representations back to their original types.
* @param value - The value to potentially revive
* @returns The revived value (BigInt or Uint8Array), or the original value
*/
export const extendedJsonReviver = function (value: unknown): unknown {
// Define RegEx that matches our Extended JSON fields.
const bigIntRegex = /^<bigint: (?<bigint>[+-]?[0-9]*)n>$/;
const uint8ArrayRegex = /^<Uint8Array: (?<hex>[a-f0-9]*)>$/;
// Only perform a check if the value is a string.
// NOTE: We can skip all other values as all Extended JSON encoded fields WILL be a string.
if (typeof value === 'string') {
// Check if this value matches an Extended JSON encoded bigint.
const bigintMatch = value.match(bigIntRegex);
if (bigintMatch) {
// Access the named group directly instead of using array indices
const { bigint } = bigintMatch.groups!;
// Return the value casted to bigint.
return BigInt(bigint!);
}
const uint8ArrayMatch = value.match(uint8ArrayRegex);
if (uint8ArrayMatch) {
// Access the named group directly instead of using array indices
const { hex } = uint8ArrayMatch.groups!;
// Return the value casted to Uint8Array.
return hexToBin(hex!);
}
}
// Return the original value.
return value;
};
/**
* Recursively encodes an object to ExtJSON format.
* @param value - The value to encode
* @returns The ExtJSON encoded value
*/
export const encodeExtendedJsonObject = function (value: unknown): unknown {
// If this is an object type (and it is not null - which is technically an "object")...
// ... and it is not an ArrayBuffer (e.g. Uint8Array) which is also technically an "object...
if (
typeof value === 'object' &&
value !== null &&
!ArrayBuffer.isView(value)
) {
// If this is an array, recursively call this function on each value.
if (Array.isArray(value)) {
return value.map(encodeExtendedJsonObject);
}
// Declare object to store extended JSON entries.
const encodedObject: Record<string, unknown> = {};
// Iterate through each entry and encode it to extended JSON.
for (const [key, valueToEncode] of Object.entries(value as Record<string, unknown>)) {
encodedObject[key] = encodeExtendedJsonObject(valueToEncode);
}
// Return the extended JSON encoded object.
return encodedObject;
}
// Return the replaced value.
return extendedJsonReplacer(value);
};
/**
* Recursively decodes an ExtJSON object back to its original types.
* @param value - The ExtJSON value to decode
* @returns The decoded value with BigInt and Uint8Array restored
*/
export const decodeExtendedJsonObject = function (value: unknown): unknown {
// If this is an object type (and it is not null - which is technically an "object")...
// ... and it is not an ArrayBuffer (e.g. Uint8Array) which is also technically an "object...
if (
typeof value === 'object' &&
value !== null &&
!ArrayBuffer.isView(value)
) {
// If this is an array, recursively call this function on each value.
if (Array.isArray(value)) {
return value.map(decodeExtendedJsonObject);
}
// Declare object to store decoded JSON entries.
const decodedObject: Record<string, unknown> = {};
// Iterate through each entry and decode it from extended JSON.
for (const [key, valueToEncode] of Object.entries(value as Record<string, unknown>)) {
decodedObject[key] = decodeExtendedJsonObject(valueToEncode);
}
// Return the extended JSON encoded object.
return decodedObject;
}
// Return the revived value.
return extendedJsonReviver(value);
};
/**
* Encodes a value to an ExtJSON string.
* @param value - The value to encode
* @param space - Optional spacing for pretty printing
* @returns The ExtJSON encoded string
*/
export const encodeExtendedJson = function (
value: unknown,
space: number | undefined = undefined,
): string {
const replacedObject = encodeExtendedJsonObject(value);
const stringifiedObject = JSON.stringify(replacedObject, null, space);
return stringifiedObject;
};
/**
* Decodes an ExtJSON string back to its original value.
* @param json - The ExtJSON string to decode
* @returns The decoded value with BigInt and Uint8Array restored
*/
export const decodeExtendedJson = function (json: string): unknown {
const parsedObject = JSON.parse(json) as unknown;
const revivedObject = decodeExtendedJsonObject(parsedObject);
return revivedObject;
};

427
src/utils/sse-client.ts Normal file
View File

@@ -0,0 +1,427 @@
import { ExponentialBackoff } from './exponential-backoff.js';
// Type declarations for browser environment (not available in Node.js)
declare const document: {
visibilityState: 'visible' | 'hidden';
addEventListener: (event: string, handler: (event: Event) => void) => void;
removeEventListener: (event: string, handler: (event: Event) => void) => void;
} | undefined;
/**
* A Server-Sent Events client implementation using fetch API.
* Supports custom headers, POST requests, and is non-blocking.
*/
export class SSESession {
/**
* Creates and connects a new SSESession instance.
* @param url The URL to connect to
* @param options Configuration options
* @returns A new connected SSESession instance
*/
public static async from(
url: string,
options: Partial<SSESessionOptions> = {},
): Promise<SSESession> {
const client = new SSESession(url, options);
await client.connect();
return client;
}
// State.
private url: string;
private controller: AbortController;
private connected: boolean = false;
protected options: SSESessionOptions;
protected messageBuffer: Uint8Array = new Uint8Array();
// Listener for when the tab is hidden or shown.
private visibilityChangeHandler: ((event: Event) => void) | null = null;
// Text decoders and encoders for parsing the message buffer.
private textDecoder: TextDecoder = new TextDecoder();
private textEncoder: TextEncoder = new TextEncoder();
/**
* Creates a new SSESession instance.
* @param url The URL to connect to
* @param options Configuration options
*/
constructor(url: string, options: Partial<SSESessionOptions> = {}) {
this.url = url;
this.options = {
// Use default fetch function.
fetch: (...args) => fetch(...args),
method: 'GET',
headers: {
Accept: 'text/event-stream',
'Cache-Control': 'no-cache',
},
onConnected: () => {},
onMessage: () => {},
onError: (error) => console.error('SSESession error:', error),
onDisconnected: () => {},
onReconnect: (options) => Promise.resolve(options),
// Reconnection options
attemptReconnect: true,
retryDelay: 1000,
persistent: false,
...options,
};
this.controller = new AbortController();
// Set up visibility change handling if in mobile browser environment
if (typeof document !== 'undefined') {
this.visibilityChangeHandler = this.handleVisibilityChange.bind(this);
document.addEventListener(
'visibilitychange',
this.visibilityChangeHandler,
);
}
}
/**
* Handles visibility change events in the browser.
*/
private async handleVisibilityChange(): Promise<void> {
// Guard for Node.js environment where document is undefined
if (typeof document === 'undefined') return;
// When going to background, close the current connection cleanly
// This allows us to reconnect mobile devices when they come back after leaving the tab or browser app.
if (document.visibilityState === 'hidden') {
this.controller.abort();
}
// When coming back to foreground, attempt to reconnect if not connected
if (document.visibilityState === 'visible' && !this.connected) {
await this.connect();
}
}
/**
* Connects to the SSE endpoint.
*/
public async connect(): Promise<void> {
if (this.connected) return;
this.connected = true;
this.controller = new AbortController();
const { method, headers, body } = this.options;
const fetchOptions: RequestInit = {
method,
headers: headers || {},
body: body || null,
signal: this.controller.signal,
cache: 'no-store',
};
const exponentialBackoff = ExponentialBackoff.from({
baseDelay: this.options.retryDelay,
maxDelay: 10000,
maxAttempts: 0,
growthRate: 1.3,
jitter: 0.3,
});
// Establish the connection and get the reader using the exponential backoff
const reader = await exponentialBackoff.run(async () => {
const reconnectOptions = await this.handleCallback(
this.options.onReconnect,
fetchOptions,
);
const updatedFetchOptions = {
...fetchOptions,
...reconnectOptions,
};
const res = await this.options.fetch(this.url, updatedFetchOptions);
if (!res.ok) {
throw new Error(`HTTP error! Status: ${res.status}`);
}
if (!res.body) {
throw new Error('Response body is null');
}
return res.body.getReader();
});
// Call the onConnected callback
this.handleCallback(this.options.onConnected);
const readStream = async () => {
try {
while (true) {
const { done, value } = await reader.read();
if (done) {
this.connected = false;
// Call the onDisconnected callback.
this.handleCallback(this.options.onDisconnected, undefined);
// If the connection was closed by the server, we want to attempt a reconnect if the connection should be persistent.
if (this.options.persistent) {
await this.connect();
}
break;
}
const events = this.parseEvents(value);
for (const event of events) {
if (this.options.onMessage) {
this.handleCallback(this.options.onMessage, event);
}
}
}
} catch (error) {
this.connected = false;
// Call the onDisconnected callback.
this.handleCallback(this.options.onDisconnected, error);
// If the connection was aborted using the controller, we don't need to call onError.
if (this.controller.signal.aborted) {
return;
}
// Call the onError callback.
// NOTE: we dont use the handleCallback here because it would result in 2 error callbacks.
try {
this.options.onError(error);
} catch (error) {
console.log(`SSE Session: onError callback error:`, error);
}
// Attempt to reconnect if enabled
if (this.options.attemptReconnect) {
await this.connect();
}
}
};
readStream();
return;
}
protected parseEvents(chunk: Uint8Array): SSEvent[] {
// Append new chunk to existing buffer
this.messageBuffer = new Uint8Array([...this.messageBuffer, ...chunk]);
const events: SSEvent[] = [];
const lines = this.textDecoder
.decode(this.messageBuffer)
.split(/\r\n|\r|\n/);
let currentEvent: Partial<SSEvent> = {};
let completeEventCount = 0;
// Iterate over the lines to find complete events
for (let i = 0; i < lines.length; i++) {
const line = lines[i];
// Empty line signals the end of an event
if (line === '') {
if (currentEvent.data) {
// Remove trailing newline if present
currentEvent.data = currentEvent.data.replace(/\n$/, '');
events.push(currentEvent as SSEvent);
currentEvent = {};
completeEventCount = i + 1;
}
continue;
}
if (!line) continue;
// Parse field: value format
const colonIndex = line.indexOf(':');
if (colonIndex === -1) continue;
const field = line.slice(0, colonIndex);
// Skip initial space after colon if present
const valueStartIndex =
colonIndex + 1 + (line[colonIndex + 1] === ' ' ? 1 : 0);
const value = line.slice(valueStartIndex);
if (field === 'data') {
currentEvent.data = currentEvent.data
? currentEvent.data + '\n' + value
: value;
} else if (field === 'event') {
currentEvent.event = value;
} else if (field === 'id') {
currentEvent.id = value;
} else if (field === 'retry') {
const retryMs = parseInt(value, 10);
if (!isNaN(retryMs)) {
currentEvent.retry = retryMs;
}
}
}
// Store the remainder of the buffer for the next chunk
const remainder = lines.slice(completeEventCount).join('\n');
this.messageBuffer = this.textEncoder.encode(remainder);
return events;
}
/**
* Override the onMessage callback.
*
* @param onMessage The callback to set.
*/
public setOnMessage(onMessage: (event: SSEvent) => void): void {
this.options.onMessage = onMessage;
}
/**
* Closes the SSE connection and cleans up event listeners.
*/
public close(): void {
// Clean up everything including the visibility handler
this.controller.abort();
// Remove the visibility handler (This is only required on browsers)
if (this.visibilityChangeHandler && typeof document !== 'undefined') {
document.removeEventListener(
'visibilitychange',
this.visibilityChangeHandler,
);
this.visibilityChangeHandler = null;
}
}
/**
* Checks if the client is currently connected.
* @returns Whether the client is connected
*/
public isConnected(): boolean {
return this.connected;
}
/**
* Will handle thrown errors from the callback and call the onError callback.
* This is to avoid the sse-session from disconnecting from errors that are not a result of the sse-session itself.
*
* @param callback The callback to handle.
* @param args The arguments to pass to the callback.
*/
private handleCallback<T extends (...args: Parameters<T>) => ReturnType<T>>(
callback: T,
...args: Parameters<T>
): ReturnType<T> | undefined {
try {
return callback(...args);
} catch (error) {
try {
this.options.onError(error);
} catch (error) {
console.log(`SSE Session: onError callback error:`, error);
}
}
}
}
/**
* Configuration options for the SSESession.
*/
export interface SSESessionOptions {
/**
* The fetch function to use.
*
* NOTE: This is compatible with Browser/Node's native "fetcH" function.
* We use this in place of "typeof fetch" so that we can accept non-standard URLs ("url" is a "string" here).
* For example, a LibP2P adapter might not use a standardized URL format (and might only include "path").
* This would cause a type error as native fetch expects type "URL".
*/
fetch: (url: string, options: RequestInit) => Promise<Response>;
/**
* HTTP method to use (GET or POST).
*/
method: 'GET' | 'POST';
/**
* HTTP headers to send with the request.
*/
headers?: Record<string, string>;
/**
* Body to send with POST requests.
*/
body?: string | FormData;
/**
* Called when the connection is established.
*/
onConnected: () => void;
/**
* Called when a message is received.
*/
onMessage: (event: SSEvent) => void;
/**
* Called when an error occurs.
*/
onError: (error: unknown) => void;
/**
* Called when the connection is closed.
*/
onDisconnected: (error: unknown) => void;
/*
* Called when the connection is going to try to reconnect.
*/
onReconnect: (options: RequestInit) => Promise<RequestInit>;
/**
* Whether to attempt to reconnect.
*/
attemptReconnect: boolean;
/**
* The delay in milliseconds between reconnection attempts.
*/
retryDelay: number;
/**
* Whether to reconnect when the session is terminated by the server.
*/
persistent: boolean;
}
/**
* Represents a Server-Sent Event.
*/
export interface SSEvent {
/**
* Event data.
*/
data: string;
/**
* Event type.
*/
event?: string;
/**
* Event ID.
*/
id?: string;
/**
* Reconnection time in milliseconds.
*/
retry?: number;
}