import type { FastifyReply, FastifyRequest } from "fastify"; import debug, { type Debugger } from "debug"; /** * Represents an event stored in the history buffer. * Used for replaying missed events to reconnecting clients. */ interface HistoricalEvent { /** The event topic/type (e.g., 'invitation-created', 'invitation-updated') */ topic: string; /** The event payload data */ data: unknown; /** Unix timestamp in milliseconds when the event was created */ timestamp: number; } /** * Options for configuring the SSE service. */ interface SSEOptions { /** Maximum age of events to keep in history (in milliseconds). Default: 5 minutes */ maxHistoryAge?: number; /** Maximum number of events to keep per user. Default: 1000 */ maxHistorySize?: number; } /** * Server-Sent Events broadcaster with event history support. * * Maintains a per-user event history buffer that allows clients to replay * missed events when reconnecting. This makes reconnections robust against * network interruptions. */ export class SSEBroadcaster { /** * Factory method to create and start an SSE broadcaster. * @param options - Configuration options for the SSE service * @returns A started SSE instance */ static async from(options?: SSEOptions) { const broadcaster = new SSEBroadcaster(options); await broadcaster.start(); return broadcaster; } /** Map of Invitation IDs to their connected SSE response streams */ private clients: Map> = new Map(); /** Map of Invitation IDs to their event history buffers */ private eventHistory: Map = new Map(); /** Maximum age of events to keep in history (in milliseconds) */ private maxHistoryAge: number; /** Maximum number of events to keep per user */ private maxHistorySize: number; private debug: Debugger; constructor(options?: SSEOptions) { this.clients = new Map(); this.eventHistory = new Map(); this.maxHistoryAge = options?.maxHistoryAge ?? 20 * 60 * 1000; // 20 minutes default this.maxHistorySize = options?.maxHistorySize ?? 1000; // 1000 events default this.debug = debug("xo:sse"); } /** * Starts the SSE broadcaster. * @returns The SSE instance for chaining */ start() { this.debug( "SSE broadcaster is running (maxHistoryAge: %dms, maxHistorySize: %d)", this.maxHistoryAge, this.maxHistorySize, ); return this; } /** * Sends an event to a client. * * @param client - The client to send the event to * @param topic - The event topic/type * @param data - The event payload data */ static sendEvent(client: FastifyReply, topic: string, data: unknown) { const timestamp = Date.now(); client.raw.write(`id: ${timestamp}\n`); client.raw.write(`event: ${topic}\n`); client.raw.write(`data: ${JSON.stringify(data)}\n\n`); } /** * Sends an event to a client. * * @param client - The client to send the event to * @param topic - The event topic/type * @param data - The event payload data */ sendEvent(client: FastifyReply, topic: string, data: unknown) { try { SSEBroadcaster.sendEvent(client, topic, data); } catch (error) { this.debug("Error sending event to client", error); } } /** * Broadcasts an event to all connected clients for a user and stores it in history. * * @param clientId - The user ID to broadcast to * @param topic - The event topic/type * @param data - The event payload data */ async broadcast(clientId: string, topic: string, data: unknown) { const timestamp = Date.now(); // Store the event in history for potential replay this.storeEvent(clientId, topic, data, timestamp); // Broadcast to all connected clients this.clients.get(clientId)?.forEach((client: FastifyReply) => { try { this.sendEvent(client, topic, data); } catch (error) { this.debug("Error sending event to client", error); } }); this.debug("SSE broadcasted message", topic, data); } /** * Subscribes a client to receive SSE events. * * If lastEventTime is provided, all events that occurred after that timestamp * will be replayed to the client before starting the live stream. * * @param req - The authenticated request containing the user ID * @param res - The Express response object to use for SSE streaming * @param lastEventTime - Optional timestamp to replay events from (in milliseconds) */ async subscribe( req: FastifyRequest, res: FastifyReply, lastEventTime?: number, ) { // Get the invitation ID from the request const { invitationIdentifier } = req.query as { invitationIdentifier?: string; }; if (!invitationIdentifier) { throw new Error("Invitation Identifier is required"); } // Initialize client set for this user if needed if (!this.clients.has(invitationIdentifier)) { this.clients.set(invitationIdentifier, new Set()); } // Manually include the CORS header since `writeHead` bypasses the auto-injection by the @fastify/cors plugin. // This statement grabs the CORS header that the CORS plugin would have added to the response, configured in the HTTP service. const corsHeader = res.getHeader("access-control-allow-origin"); // Disable timeout for the connection. Without this, the connection will drop and the client will have to reconnect. req.raw.socket.setTimeout(0); // Set up SSE headers // Set headers for SSE res.raw.writeHead(200, { "Content-Type": "text/event-stream", "Cache-Control": "no-cache", Connection: "keep-alive", "Access-Control-Allow-Origin": corsHeader, }); // Force the headers to be dispatched to the client. // NOTE: This is very important: A `fetch` call will NOT resolve until it has received the headers. // And Fastify, unless otherwise specified, will not send the headers until it sends the body. res.raw.flushHeaders(); // Set retry interval for automatic reconnection res.raw.write("retry: 3000\n\n"); // Replay missed events if a lastEventTime was provided if (lastEventTime !== undefined) { const missedEvents = this.getEventsAfter( invitationIdentifier, lastEventTime, ); this.debug( "SSE replaying %d missed events for invitation %s (since %d)", missedEvents.length, invitationIdentifier, lastEventTime, ); for (const event of missedEvents) { try { await this.sendEvent(res, event.topic, event.data); } catch (error) { this.debug("Error sending event to client", error); } } } // Add client to the set for live updates this.clients.get(invitationIdentifier)?.add(res); this.debug( "SSE subscribed to client (invitationId: %s, lastEventTime: %s)", invitationIdentifier, lastEventTime ?? "none", ); // Clean up when client disconnects res.raw.on("close", () => { this.clients.get(invitationIdentifier)?.delete(res); this.debug( "SSE client disconnected (invitationIdentifier: %s)", invitationIdentifier, ); }); } /** * Stores an event in the user's history buffer. * Automatically prunes old events based on age and size limits. * * @param invitationId - The invitation ID to store the event for * @param topic - The event topic/type * @param data - The event payload data * @param timestamp - The event timestamp */ private storeEvent( invitationId: string, topic: string, data: unknown, timestamp: number, ) { // Initialize history array for this user if needed if (!this.eventHistory.has(invitationId)) { this.eventHistory.set(invitationId, []); } const history = this.eventHistory.get(invitationId)!; // Add the new event history.push({ topic, data, timestamp }); // Prune old events this.pruneHistory(invitationId, timestamp); } /** * Removes old events from a invitation's history based on age and size limits. * * @param invitationId - The invitation ID whose history to prune * @param currentTime - The current timestamp for age calculations */ private pruneHistory(invitationId: string, currentTime: number) { const history = this.eventHistory.get(invitationId); if (!history) return; const cutoffTime = currentTime - this.maxHistoryAge; // Remove events older than maxHistoryAge const prunedByAge = history.filter((event) => event.timestamp > cutoffTime); // If still over size limit, remove oldest events const prunedBySize = prunedByAge.length > this.maxHistorySize ? prunedByAge.slice(-this.maxHistorySize) : prunedByAge; this.eventHistory.set(invitationId, prunedBySize); } /** * Retrieves all events for a user that occurred after a given timestamp. * * @param invitationId - The invitation ID to get events for * @param afterTimestamp - The timestamp to get events after (exclusive) * @returns Array of events that occurred after the timestamp */ private getEventsAfter( invitationId: string, afterTimestamp: number, ): HistoricalEvent[] { const history = this.eventHistory.get(invitationId); if (!history) return []; // First prune old events to ensure we don't return stale data this.pruneHistory(invitationId, Date.now()); return history.filter((event) => event.timestamp > afterTimestamp); } }