Initial Commit

This commit is contained in:
2026-05-11 13:36:12 +00:00
commit 036512d580
11 changed files with 2112 additions and 0 deletions

138
src/services/http-router.ts Normal file
View File

@@ -0,0 +1,138 @@
import debug from "debug";
import fastify, { type FastifyInstance, type RouteOptions } from "fastify";
import cors from "@fastify/cors";
import { z } from "zod";
import {
decodeExtendedJsonObject,
encodeExtendedJsonObject,
} from "../utils/ext-json";
// Interface to add to our route classes so that we can register them.
// NOTE: I hate this pattern. But ExpressJS is odd in that it is structured as a singleton that still needs registration.
export interface APIRoutes {
getRoutes(): Promise<Array<RouteOptions>>;
}
export class HTTPService {
private debug: debug.Debugger;
private server: FastifyInstance;
constructor(
private routes: Array<APIRoutes> = [],
private port: number = 3000,
private host: string = "0.0.0.0",
) {
this.debug = debug("xo:http-router");
this.server = fastify({
logger: false,
});
}
async start(): Promise<void> {
this.debug(`Starting on http://${this.host}:${this.port}`);
// Setup ExtJSON handling.
this.handleExtJSON();
// Setup Error Handling (to give more verbose Zod errors)
this.handleErrors();
// Allow CORS requests. This allows requests from any origin/domain.
// Capacitor apps (like XO Wallet) use localhost:3000 as the origin, making it difficult to meaningfully restrict requests by origin for security.
await this.server.register(cors);
// Register your routes here before starting the server
this.server.get("/health", async () => {
return { status: "ok" };
});
// Register each route.
for (const routes of this.routes) {
for (const routeOptions of await routes.getRoutes()) {
this.server.route(routeOptions);
}
}
await this.server.ready();
await this.server.listen({
port: this.port,
host: this.host,
});
this.debug(`Started on http://${this.host}:${this.port}`);
}
// Helper method to access the server instance
getInstance(): FastifyInstance {
return this.server;
}
private handleErrors() {
// Customize our error handler to give better errors.
// NOTE: This will nicely format the Zod validation errors.
this.server.setErrorHandler((error, _request, reply) => {
if (error instanceof z.ZodError) {
const formattedErrors = error.issues.map((issue) => ({
path: issue.path.join("."),
message: issue.message,
}));
this.debug(`Error: ${error}`);
return reply.status(400).send({
statusCode: 400,
error: "Validation Error",
details: formattedErrors,
});
}
this.debug(`Error: ${error}`);
// Handle other types of errors
reply.status(500).send({ error: "Internal Server Error" });
});
}
private handleExtJSON() {
// Add onRequest hook to decode requests from ExtJSON
this.server.addHook("onRequest", async (request, _reply) => {
this.debug(`Request: ${JSON.stringify(request.body)}`);
this.debug(`Request URL: ${request.method} ${request.url}`);
// Only transform JSON requests
if (
request.headers["content-type"]?.includes("application/json") &&
request.body
) {
try {
// Decode ExtJSON body
request.body = decodeExtendedJsonObject(request.body);
} catch (error) {
request.log.error(
{
err: error,
body: request.body,
},
"Failed to decode ExtJSON request body",
);
throw new Error("Invalid JSON in request body");
}
}
});
// Add onSend hook to encode responses to ExtJSON
this.server.addHook("onSend", async (_request, reply, payload) => {
// Only transform JSON responses
if (
reply.getHeader("content-type")?.toString().includes("application/json")
) {
// If payload is a string (already serialized), parse it first
const data =
typeof payload === "string" ? JSON.parse(payload) : payload;
return JSON.stringify(encodeExtendedJsonObject(data));
}
return payload;
});
}
}

View File

@@ -0,0 +1,46 @@
import type { XOInvitation, XOInvitationCommit } from '@xo-cash/types';
export class InvitationStore {
private readonly invitations: Map<string, XOInvitation> = new Map();
constructor() {
this.invitations = new Map();
}
async getInvitation(invitationIdentifier: string): Promise<XOInvitation | undefined> {
return this.invitations.get(invitationIdentifier);
}
async storeInvitation(invitation: XOInvitation): Promise<void> {
const invitationIdentifier = invitation.invitationIdentifier;
this.invitations.set(invitationIdentifier, invitation);
}
async deleteInvitation(invitationIdentifier: string): Promise<void> {
this.invitations.delete(invitationIdentifier);
}
/**
* TODO: This should maybe merge? I dont know. Currently, setting is not the best idea
* @param invitation
*/
async updateInvitation(id: string, commit: XOInvitationCommit): Promise<XOInvitation> {
// Get the invitation identifier
const invitation = await this.getInvitation(id);
if (!invitation) {
throw new Error(`Invitation not found: ${id}`);
}
// If the commit already exists, return the invitation
if (invitation.commits.some(c => c.commitIdentifier === commit.commitIdentifier)) {
return invitation;
}
// Update the invitation with the commit
invitation.commits.push(commit);
this.invitations.set(id, invitation);
return invitation;
}
}

View File

@@ -0,0 +1,270 @@
import type { FastifyReply, FastifyRequest } from "fastify";
import debug, { type Debugger } from "debug";
/**
* Represents an event stored in the history buffer.
* Used for replaying missed events to reconnecting clients.
*/
interface HistoricalEvent {
/** The event topic/type (e.g., 'invitation-created', 'invitation-updated') */
topic: string;
/** The event payload data */
data: unknown;
/** Unix timestamp in milliseconds when the event was created */
timestamp: number;
}
/**
* Options for configuring the SSE service.
*/
interface SSEOptions {
/** Maximum age of events to keep in history (in milliseconds). Default: 5 minutes */
maxHistoryAge?: number;
/** Maximum number of events to keep per user. Default: 1000 */
maxHistorySize?: number;
}
/**
* Server-Sent Events broadcaster with event history support.
*
* Maintains a per-user event history buffer that allows clients to replay
* missed events when reconnecting. This makes reconnections robust against
* network interruptions.
*/
export class SSEBroadcaster {
/**
* Factory method to create and start an SSE broadcaster.
* @param options - Configuration options for the SSE service
* @returns A started SSE instance
*/
static async from(options?: SSEOptions) {
const broadcaster = new SSEBroadcaster(options);
await broadcaster.start();
return broadcaster;
}
/** Map of Invitation IDs to their connected SSE response streams */
private clients: Map<(string), Set<FastifyReply>> = new Map();
/** Map of Invitation IDs to their event history buffers */
private eventHistory: Map<string, HistoricalEvent[]> = new Map();
/** Maximum age of events to keep in history (in milliseconds) */
private maxHistoryAge: number;
/** Maximum number of events to keep per user */
private maxHistorySize: number;
private debug: Debugger;
constructor(options?: SSEOptions) {
this.clients = new Map();
this.eventHistory = new Map();
this.maxHistoryAge = options?.maxHistoryAge ?? 20 * 60 * 1000; // 20 minutes default
this.maxHistorySize = options?.maxHistorySize ?? 1000; // 1000 events default
this.debug = debug('xo:sse');
}
/**
* Starts the SSE broadcaster.
* @returns The SSE instance for chaining
*/
start() {
this.debug('SSE broadcaster is running (maxHistoryAge: %dms, maxHistorySize: %d)',
this.maxHistoryAge, this.maxHistorySize);
return this;
}
/**
* Sends an event to a client.
*
* @param client - The client to send the event to
* @param topic - The event topic/type
* @param data - The event payload data
*/
static sendEvent(client: FastifyReply, topic: string, data: unknown) {
const timestamp = Date.now();
client.raw.write(`id: ${timestamp}\n`);
client.raw.write(`event: ${topic}\n`);
client.raw.write(`data: ${JSON.stringify(data)}\n\n`);
}
/**
* Sends an event to a client.
*
* @param client - The client to send the event to
* @param topic - The event topic/type
* @param data - The event payload data
*/
sendEvent(client: FastifyReply, topic: string, data: unknown) {
try {
SSEBroadcaster.sendEvent(client, topic, data);
} catch (error) {
this.debug('Error sending event to client', error);
}
}
/**
* Broadcasts an event to all connected clients for a user and stores it in history.
*
* @param clientId - The user ID to broadcast to
* @param topic - The event topic/type
* @param data - The event payload data
*/
async broadcast(clientId: string, topic: string, data: unknown) {
const timestamp = Date.now();
// Store the event in history for potential replay
this.storeEvent(clientId, topic, data, timestamp);
// Broadcast to all connected clients
this.clients.get(clientId)?.forEach((client: FastifyReply) => {
try {
this.sendEvent(client, topic, data);
} catch (error) {
this.debug('Error sending event to client', error);
}
});
this.debug('SSE broadcasted message', topic, data);
}
/**
* Subscribes a client to receive SSE events.
*
* If lastEventTime is provided, all events that occurred after that timestamp
* will be replayed to the client before starting the live stream.
*
* @param req - The authenticated request containing the user ID
* @param res - The Express response object to use for SSE streaming
* @param lastEventTime - Optional timestamp to replay events from (in milliseconds)
*/
async subscribe(req: FastifyRequest, res: FastifyReply, lastEventTime?: number) {
// Get the invitation ID from the request
const { invitationIdentifier } = req.query as { invitationIdentifier?: string };
if (!invitationIdentifier) {
throw new Error('Invitation Identifier is required');
}
// Initialize client set for this user if needed
if (!this.clients.has(invitationIdentifier)) {
this.clients.set(invitationIdentifier, new Set());
}
// Manually include the CORS header since `writeHead` bypasses the auto-injection by the @fastify/cors plugin.
// This statement grabs the CORS header that the CORS plugin would have added to the response, configured in the HTTP service.
const corsHeader = res.getHeader("access-control-allow-origin");
// Disable timeout for the connection. Without this, the connection will drop and the client will have to reconnect.
req.raw.socket.setTimeout(0);
// Set up SSE headers
// Set headers for SSE
res.raw.writeHead(200, {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
"Access-Control-Allow-Origin": corsHeader,
});
// Force the headers to be dispatched to the client.
// NOTE: This is very important: A `fetch` call will NOT resolve until it has received the headers.
// And Fastify, unless otherwise specified, will not send the headers until it sends the body.
res.raw.flushHeaders();
// Set retry interval for automatic reconnection
res.raw.write('retry: 3000\n\n');
// Replay missed events if a lastEventTime was provided
if (lastEventTime !== undefined) {
const missedEvents = this.getEventsAfter(invitationIdentifier, lastEventTime);
this.debug('SSE replaying %d missed events for invitation %s (since %d)',
missedEvents.length, invitationIdentifier, lastEventTime);
for (const event of missedEvents) {
try {
await this.sendEvent(res, event.topic, event.data);
} catch (error) {
this.debug('Error sending event to client', error);
}
}
}
// Add client to the set for live updates
this.clients.get(invitationIdentifier)?.add(res);
this.debug('SSE subscribed to client (invitationId: %s, lastEventTime: %s)',
invitationIdentifier, lastEventTime ?? 'none');
// Clean up when client disconnects
res.raw.on('close', () => {
this.clients.get(invitationIdentifier)?.delete(res);
this.debug('SSE client disconnected (invitationIdentifier: %s)', invitationIdentifier);
});
}
/**
* Stores an event in the user's history buffer.
* Automatically prunes old events based on age and size limits.
*
* @param invitationId - The invitation ID to store the event for
* @param topic - The event topic/type
* @param data - The event payload data
* @param timestamp - The event timestamp
*/
private storeEvent(invitationId: string, topic: string, data: unknown, timestamp: number) {
// Initialize history array for this user if needed
if (!this.eventHistory.has(invitationId)) {
this.eventHistory.set(invitationId, []);
}
const history = this.eventHistory.get(invitationId)!;
// Add the new event
history.push({ topic, data, timestamp });
// Prune old events
this.pruneHistory(invitationId, timestamp);
}
/**
* Removes old events from a invitation's history based on age and size limits.
*
* @param invitationId - The invitation ID whose history to prune
* @param currentTime - The current timestamp for age calculations
*/
private pruneHistory(invitationId: string, currentTime: number) {
const history = this.eventHistory.get(invitationId);
if (!history) return;
const cutoffTime = currentTime - this.maxHistoryAge;
// Remove events older than maxHistoryAge
const prunedByAge = history.filter(event => event.timestamp > cutoffTime);
// If still over size limit, remove oldest events
const prunedBySize = prunedByAge.length > this.maxHistorySize
? prunedByAge.slice(-this.maxHistorySize)
: prunedByAge;
this.eventHistory.set(invitationId, prunedBySize);
}
/**
* Retrieves all events for a user that occurred after a given timestamp.
*
* @param invitationId - The invitation ID to get events for
* @param afterTimestamp - The timestamp to get events after (exclusive)
* @returns Array of events that occurred after the timestamp
*/
private getEventsAfter(invitationId: string, afterTimestamp: number): HistoricalEvent[] {
const history = this.eventHistory.get(invitationId);
if (!history) return [];
// First prune old events to ensure we don't return stale data
this.pruneHistory(invitationId, Date.now());
return history.filter(event => event.timestamp > afterTimestamp);
}
}