Initial Commit

This commit is contained in:
2026-05-23 10:39:41 +02:00
commit 7890669eda
16 changed files with 3003 additions and 0 deletions

4
.gitignore vendored Normal file
View File

@@ -0,0 +1,4 @@
/node_modules
/data
/dist
.env

1834
package-lock.json generated Normal file

File diff suppressed because it is too large Load Diff

34
package.json Normal file
View File

@@ -0,0 +1,34 @@
{
"name": "vending-machine",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"dev": "tsx watch src/index.ts",
"build": "tsc",
"start": "node dist/index.js",
"format": "prettier --write \"src/**/*.{ts,tsx,js,jsx,json,css,md}\"",
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [],
"author": "",
"license": "ISC",
"type": "module",
"devDependencies": {
"@types/better-sqlite3": "^7.6.13",
"@types/node": "^25.9.1",
"prettier": "^3.8.3",
"tsx": "^4.22.3",
"typescript": "^6.0.3"
},
"dependencies": {
"@fastify/cors": "^11.2.0",
"@types/debug": "^4.1.13",
"@xo-cash/engine": "file:../../engine",
"better-sqlite3": "^12.10.0",
"debug": "^4.4.3",
"fastify": "^5.8.5",
"kysely": "^0.29.2",
"zod": "^4.4.3"
}
}

47
src/index.ts Normal file
View File

@@ -0,0 +1,47 @@
import Debug from "debug";
import { Engine } from "@xo-cash/engine";
import { Config } from "./services/config.js";
import { Database } from "./services/database/database.js";
import { ItemsRoute } from "./routes/items.js";
import { OrdersRoute } from "./routes/orders.js";
import { HTTPService } from "./services/http-router.js";
type VendingMachineDeps = {
config: Config;
httpService: HTTPService;
database: Database;
engine: Engine;
}
export class VendingMachine {
static async from(config: Config) {
const debug = Debug("vending-machine");
const engine = await Engine.create(config.engine.mnemonic, { databasePath: config.engine.database.path });
const database = new Database({ config: config.database, debug });
// Create the routes
const routes = [
new ItemsRoute({ database: database, engine: engine, debug }),
new OrdersRoute({ database: database, engine: engine, debug }),
];
// Create the HTTP service, passing in the routes and config.
const httpService = new HTTPService({ routes: [], config: config.server, debug });
return new VendingMachine({ config, httpService, database, engine });
}
private constructor(private readonly deps: VendingMachineDeps) {}
public async start() {
await this.deps.httpService.start();
}
}
VendingMachine.from(Config.fromEnv()).then((vendingMachine) => {
vendingMachine.start();
});

2
src/routes/index.ts Normal file
View File

@@ -0,0 +1,2 @@
export * from './items.js';
export * from './orders.js';

73
src/routes/items.ts Normal file
View File

@@ -0,0 +1,73 @@
import type { Debugger as Debug } from 'debug';
import type { RouteOptions, FastifyRequest, FastifyReply } from 'fastify';
import type { Engine } from '@xo-cash/engine'
import type { Database } from '../services/database/database.js'
import { z } from 'zod';
export type ItemsRouteDeps = {
database: Database;
engine: Engine;
debug: Debug;
}
export class ItemsRoute {
public constructor(private readonly deps: ItemsRouteDeps) {}
public async getRoutes(): Promise<Array<RouteOptions>> {
return [
{
method: 'GET',
url: '/items',
handler: this.getItems.bind(this),
},
{
method: 'GET',
url: '/items/:id',
handler: this.getItem.bind(this),
},
]
}
/**
* Get all items from the database
* @param request
* @param reply
* @returns
*/
private async getItems(request: FastifyRequest, reply: FastifyReply) {
// Get all items from the database.
const items = await this.deps.database.db.selectFrom('items').selectAll().execute();
// Return the items.
return reply.send(items);
}
/**
* Get an item from the database by id
* @param request
* @param reply
* @returns
*/
private async getItem(request: FastifyRequest, reply: FastifyReply) {
// Parse the request parameters.
const { id } = ItemsRoute.getItemSchema.parse(request.params);
// Get the item from the database.
const item = await this.deps.database.db.selectFrom('items').where('id', '=', id).selectAll().executeTakeFirst();
// If the item is not found, return a 404 error.
if (!item) {
return reply.status(404).send({
error: 'Item not found'
});
}
// Return the item.
return reply.send(item);
}
static getItemSchema = z.object({
id: z.string(),
});
}

88
src/routes/orders.ts Normal file
View File

@@ -0,0 +1,88 @@
import type { Debugger as Debug } from 'debug';
import type { RouteOptions, FastifyRequest, FastifyReply } from 'fastify';
import type { Engine } from '@xo-cash/engine'
import type { Database } from '../services/database/database.js'
import { z } from 'zod';
export type OrdersRouteDeps = {
database: Database;
engine: Engine
debug: Debug;
}
export class OrdersRoute {
public constructor(private readonly deps: OrdersRouteDeps) {}
public async getRoutes(): Promise<Array<RouteOptions>> {
return [
{
method: 'GET',
url: '/orders',
handler: this.getOrders.bind(this),
},
{
method: 'POST',
url: '/orders',
handler: this.createOrder.bind(this),
},
]
}
private async getOrders(request: FastifyRequest, reply: FastifyReply) {
// Get all orders from the database.
const orders = await this.deps.database.db.selectFrom('orders').selectAll().execute();
// Return the orders.
return reply.send(orders);
}
private async createOrder(request: FastifyRequest, reply: FastifyReply) {
// Parse the request body.
const { items: itemsInput } = OrdersRoute.createOrderSchema.parse(request.body);
// Get the items from the database.
const items = await this.deps.database.db.selectFrom('items').where('id', 'in', itemsInput.map((item) => item.id)).selectAll().execute();
// If the items are not found, return a 404 error.
if (items.length !== items.length) {
return reply.status(404).send({
error: 'Items not found'
});
}
// TODO: Create an XO Engine Invitation with the relavent data in it so we can pass it back to the client.
// Create the order in the database.
const order = await this.deps.database.db.insertInto('orders').values({
// user_id: request.user.id,
status: 'pending',
total_price: 0,
total_quantity: 0,
items: JSON.stringify(items.map((item) => ({
id: item.id,
quantity: item.quantity,
}))),
}).execute();
// If the order is not created, return a 500 error.
if (!order) {
return reply.status(500).send({
error: 'Failed to create order'
});
}
// Return the order.
return reply.send(order);
}
/**
* Schema for creating an order.
*/
static createOrderSchema = z.object({
items: z.array(z.object({
id: z.string(),
quantity: z.number(),
})),
});
}

82
src/services/config.ts Normal file
View File

@@ -0,0 +1,82 @@
import { z } from "zod";
const configSchema = z.object({
engine: z.object({
mnemonic: z.string(),
database: z.object({
path: z.string().default("data/engine"),
}),
}),
syncServer: z.object({
url: z.string().default("http://localhost:3000"),
}),
database: z.object({
path: z.string().default("data.db"),
}),
server: z.object({
port: z.number().default(3000),
host: z.string().default("0.0.0.0"),
cors: z
.object({
origin: z.string().default("*"),
methods: z
.array(z.string())
.default(["GET", "POST", "PUT", "DELETE", "OPTIONS"]),
allowedHeaders: z
.array(z.string())
.default(["Content-Type", "Authorization"]),
})
.partial()
.prefault({}),
}),
});
type ConfigInput = z.input<typeof configSchema>;
type ConfigSchema = z.output<typeof configSchema>;
/**
* Converts an object's keys to camelCase.
* @param obj - The object to convert to camelCase.
* @returns The camelCase object.
*/
const toCamelCaseObject = (obj: Record<string, string>): Record<string, string> => {
return Object.fromEntries(Object.entries(obj).map(([key, value]) => {
const camelCaseKey = key.toLowerCase().replace(/([-_][a-z])/g, (group) => group.toUpperCase().replace("-", "").replace("_", ""));
return [camelCaseKey, value];
}));
}
/**
* The Config class is used to load and parse the configuration for the vending machine.
*/
export class Config {
static fromEnv(): Config {
// Parse through process.env, and convert the upperCase keys to camelCase.
const envConfig = toCamelCaseObject(Object(process.env));
// Parse the environment config.
return this.from(configSchema.parse(envConfig));
}
static from(config: ConfigInput): Config {
return new Config(configSchema.parse(config));
}
public get syncServer() {
return this.config.syncServer;
}
public get engine() {
return this.config.engine;
}
public get database() {
return this.config.database;
}
public get server() {
return this.config.server;
}
private constructor(private readonly config: ConfigSchema) {}
}

View File

@@ -0,0 +1,84 @@
import { type Debugger } from "debug";
import DatabaseConstructor from "better-sqlite3";
import { Kysely, SqliteDialect } from "kysely";
import type { Database as DatabaseTables } from "./tables.js";
import { z } from "zod";
import { debuggerSchema } from "../../types.js";
export const databaseOptionsSchema = z.object({
config: z.object({
path: z.string(),
}),
debug: debuggerSchema,
});
type DatabaseOptionsInput = z.input<typeof databaseOptionsSchema>;
/**
* Database service that owns SQLite + Kysely connections.
*
* @remarks
* This service is intentionally connection-only:
* it manages lifecycle and exposes a typed Kysely client.
*/
export class Database {
// Debugger instance used for logging.
private readonly debug: Debugger;
// SQLite connection instance.
private readonly sqlite: DatabaseConstructor.Database;
// Kysely database client instance.
private readonly kysely: Kysely<DatabaseTables>;
public constructor(options: DatabaseOptionsInput) {
// Parse the options with the zod schema.
const { config, debug } = databaseOptionsSchema.parse(options);
// Extend the debug instance.
this.debug = debug.extend("database");
// Create the SQLite database instance.
this.sqlite = new DatabaseConstructor(config.path);
// Configure the SQLite database.
this.configurePragmas();
// Create the Kysely database client.
this.kysely = new Kysely<DatabaseTables>({ dialect: new SqliteDialect({ database: this.sqlite }) });
}
/**
* Accessor for the typed Kysely database client.
*/
public get db(): Kysely<DatabaseTables> {
return this.kysely;
}
/**
* Gracefully closes all storage resources.
*/
public async destroy(): Promise<void> {
this.debug("Destroying storage resources");
await this.kysely.destroy();
}
/**
* Applies required SQLite pragmas for correctness and performance.
*/
private configurePragmas(): void {
this.debug("Configuring SQLite pragmas");
/**
* WAL improves concurrent read behavior, useful for streaming-heavy APIs.
*/
this.sqlite.pragma("journal_mode = WAL");
/**
* Foreign keys are disabled by default in SQLite; enable explicitly.
*/
this.sqlite.pragma("foreign_keys = ON");
}
}

View File

@@ -0,0 +1,50 @@
import { promises as fs } from "node:fs";
import path from "node:path";
import { fileURLToPath } from "node:url";
import { FileMigrationProvider, Migrator } from "kysely/migration";
import type { Database } from "./database.js";
/**
* Handles migration execution for the storage layer.
*/
export class MigrationService {
private readonly migrator: Migrator;
public constructor(database: Database) {
const currentFilePath = fileURLToPath(import.meta.url);
const currentDirectory = path.dirname(currentFilePath);
const migrationsPath = path.join(currentDirectory, "migrations");
this.migrator = new Migrator({
db: database.db,
provider: new FileMigrationProvider({
fs,
path,
migrationFolder: migrationsPath,
}),
});
}
/**
* Runs all pending migrations.
*
* @throws Error when one or more migrations fail.
*/
public async migrateToLatest(): Promise<void> {
const { error, results } = await this.migrator.migrateToLatest();
for (const result of results ?? []) {
if (result.status === "Success") {
console.info(`[migration] Applied: ${result.migrationName}`);
} else if (result.status === "Error") {
console.error(`[migration] Failed: ${result.migrationName}`);
}
}
if (error) {
throw error;
}
}
}

View File

@@ -0,0 +1,153 @@
import { Kysely, sql } from "kysely";
import type { Database } from "../tables.js";
/**
* UUID v4 default for primary key columns.
*
* @remarks
* SQLite has no native UUID type, so we generate v4 strings in SQL.
*/
const uuid4Default = sql`(lower(hex(randomblob(4))) || '-' || lower(hex(randomblob(2))) || '-4' || substr(lower(hex(randomblob(2))),2) || '-' || substr('89ab',abs(random()) % 4 + 1,1) || substr(lower(hex(randomblob(2))),2) || '-' || lower(hex(randomblob(6))))`;
/**
* Millisecond timestamp for created_at/updated_at column defaults.
*
* @remarks
* unixepoch('subsec') returns seconds with a fractional part; multiplying by
* 1000 and casting to INTEGER gives a millisecond epoch value.
*/
const millisecondTime = sql`(CAST(unixepoch('subsec') * 1000 AS INTEGER))`;
/**
* Same expression as {@link millisecondTime}, but as a raw SQL string.
*
* @remarks
* SQLite triggers cannot use Kysely's `sql` tagged templates directly, so we
* keep a plain string copy for trigger bodies.
*/
const millisecondTimeRaw = `(CAST(unixepoch('subsec') * 1000 AS INTEGER))`;
/**
* Tables that receive an automatic updated_at trigger on row modification.
*/
const UPDATED_AT_TRIGGER_TABLES = ["users", "items", "orders"] as const;
/**
* Initial schema for the vending machine.
*
* @remarks
* Creates users, catalog items, and orders. Orders store a JSON snapshot of
* line items in the `items` column rather than a normalized join table.
*/
export async function up(db: Kysely<Database>): Promise<void> {
// -------------------------------------------------------------------------
// Users
// -------------------------------------------------------------------------
// Authentication credentials for vending machine operators / customers.
await db.schema
.createTable("users")
.addColumn("id", "text", (col) => col.primaryKey().defaultTo(uuid4Default))
.addColumn("username", "text", (col) => col.notNull().unique())
.addColumn("password", "text", (col) => col.notNull())
.addColumn("salt", "text", (col) => col.notNull())
.addColumn("created_at", "integer", (col) => col.notNull().defaultTo(millisecondTime))
.addColumn("updated_at", "integer", (col) => col.notNull().defaultTo(millisecondTime))
.execute();
// -------------------------------------------------------------------------
// Items
// -------------------------------------------------------------------------
// Vending machine catalog entries available for purchase.
await db.schema
.createTable("items")
.addColumn("id", "text", (col) => col.primaryKey().defaultTo(uuid4Default))
.addColumn("name", "text", (col) => col.notNull())
.addColumn("description", "text", (col) => col.notNull().defaultTo(""))
// Price stored as an integer in the smallest currency unit (e.g. sats).
.addColumn("price", "integer", (col) => col.notNull())
// Current stock level for this slot/product.
.addColumn("quantity", "integer", (col) => col.notNull().defaultTo(0))
// URL or path to the product image shown in the UI.
.addColumn("image", "text", (col) => col.notNull().defaultTo(""))
.addColumn("created_at", "integer", (col) => col.notNull().defaultTo(millisecondTime))
.addColumn("updated_at", "integer", (col) => col.notNull().defaultTo(millisecondTime))
.addCheckConstraint("items_price_check", sql`price >= 0`)
.addCheckConstraint("items_quantity_check", sql`quantity >= 0`)
.execute();
// -------------------------------------------------------------------------
// Orders
// -------------------------------------------------------------------------
// A purchase attempt. Line items are denormalized into JSON at creation time.
await db.schema
.createTable("orders")
.addColumn("id", "text", (col) => col.primaryKey().defaultTo(uuid4Default))
.addColumn("status", "text", (col) => col.notNull().defaultTo("pending"))
.addColumn("total_price", "integer", (col) => col.notNull().defaultTo(0))
.addColumn("total_quantity", "integer", (col) => col.notNull().defaultTo(0))
// JSON array of { id, quantity } objects; serialized by application code.
.addColumn("items", "text", (col) => col.notNull().defaultTo("[]"))
.addColumn("created_at", "integer", (col) => col.notNull().defaultTo(millisecondTime))
.addColumn("updated_at", "integer", (col) => col.notNull().defaultTo(millisecondTime))
.addCheckConstraint(
"orders_status_check",
sql`status in ('pending', 'paid', 'completed', 'cancelled')`,
)
.addCheckConstraint("orders_total_price_check", sql`total_price >= 0`)
.addCheckConstraint("orders_total_quantity_check", sql`total_quantity >= 0`)
.execute();
// -------------------------------------------------------------------------
// Indexes
// -------------------------------------------------------------------------
// Look up catalog entries by display name.
await db.schema.createIndex("idx_items_name").on("items").column("name").execute();
// Filter orders by lifecycle state (e.g. pending payments).
await db.schema
.createIndex("idx_orders_status")
.on("orders")
.column("status")
.execute();
// List recent orders chronologically.
await db.schema
.createIndex("idx_orders_created_at")
.on("orders")
.columns(["created_at"])
.execute();
// -------------------------------------------------------------------------
// Updated-at triggers
// -------------------------------------------------------------------------
// Keep updated_at in sync without requiring every query to set it explicitly.
for (const tableName of UPDATED_AT_TRIGGER_TABLES) {
await sql
.raw(`
CREATE TRIGGER trg_${tableName}_updated_at
AFTER UPDATE ON ${tableName}
FOR EACH ROW
BEGIN
UPDATE ${tableName}
SET updated_at = ${millisecondTimeRaw}
WHERE id = NEW.id;
END;
`)
.execute(db);
}
}
/**
* Drops the full schema in reverse dependency order.
*/
export async function down(db: Kysely<Database>): Promise<void> {
// Remove triggers before dropping the tables they reference.
for (const tableName of UPDATED_AT_TRIGGER_TABLES) {
await sql.raw(`DROP TRIGGER IF EXISTS trg_${tableName}_updated_at`).execute(db);
}
await db.schema.dropTable("orders").ifExists().execute();
await db.schema.dropTable("items").ifExists().execute();
await db.schema.dropTable("users").ifExists().execute();
}

View File

@@ -0,0 +1,71 @@
import type { ColumnType, Generated } from "kysely";
/**
* SQLite timestamp column represented as INTEGER.
*
* @remarks
* The application stores timestamps as numbers and may provide explicit
* values or rely on database defaults.
*/
export type Timestamp = ColumnType<number, number | undefined, number | undefined>;
/**
* SQLite JSON column represented as TEXT.
*
* @remarks
* Serialize values with JSON.stringify and parse with JSON.parse in app code.
*/
export type JsonText = ColumnType<string, string | undefined, string | undefined>;
/**
* SQLite boolean emulation represented as INTEGER (0/1).
*/
export type SqliteBoolean = ColumnType<number, number | boolean | undefined, number | boolean | undefined>;
/**
* Users table.
*/
export interface UsersTable {
id: Generated<string>;
username: string;
password: string;
salt: string;
created_at: Generated<Timestamp>;
updated_at: Generated<Timestamp>;
}
/**
* Items table.
*/
export interface ItemsTable {
id: Generated<string>;
name: string;
description: string;
price: number;
quantity: number;
image: string;
created_at: Generated<Timestamp>;
updated_at: Generated<Timestamp>;
}
/**
* Orders table.
*/
export interface OrdersTable {
id: Generated<string>;
status: string;
total_price: number;
total_quantity: number;
items: JsonText;
created_at: Generated<Timestamp>;
updated_at: Generated<Timestamp>;
}
/**
* Kysely database schema.
*/
export interface Database {
users: UsersTable;
items: ItemsTable;
orders: OrdersTable;
}

153
src/services/http-router.ts Normal file
View File

@@ -0,0 +1,153 @@
import Debug from "debug";
import { z } from "zod";
import fastify, { type FastifyInstance, type RouteOptions } from "fastify";
import cors from "@fastify/cors";
import { debuggerSchema } from "../types.js";
// Interface to add to our route classes so that we can register them.
// NOTE: I hate this pattern. But ExpressJS is odd in that it is structured as a singleton that still needs registration.
export interface APIRoutes {
getRoutes(): Promise<Array<RouteOptions>>;
}
// Zod schema for the API routes.
export const apiRoutesSchema = z.array(z.custom<APIRoutes>());
// Zod schema for the server config.
export const serverConfigSchema = z.object({
port: z.number().default(3000),
host: z.string().default("0.0.0.0"),
cors: z.object({
origin: z.string().default("*"),
methods: z.array(z.string()).default(["GET", "POST", "PUT", "DELETE", "OPTIONS"]),
allowedHeaders: z.array(z.string()).default(["Content-Type", "Authorization"]),
}).prefault({}),
});
// Zod schema for the server debug instance.
export const serverDebugSchema = debuggerSchema.optional().default(Debug("vending-machine"));
// Zod schema for the HTTP service options.
export const HTTPOptions = z.object({
routes: apiRoutesSchema,
config: serverConfigSchema,
debug: serverDebugSchema,
});
// Types.
type HTTPServiceOptionsInput = z.input<typeof HTTPOptions>;
type HTTPServiceOptions = z.output<typeof HTTPOptions>;
export class HTTPService {
static schema() {
return z.object({
routes: apiRoutesSchema,
config: serverConfigSchema,
debug: serverDebugSchema,
});
}
// Public properties.
private server: FastifyInstance;
// Private properties.
private debug: debug.Debugger;
private config: HTTPServiceOptions["config"];
private routes: HTTPServiceOptions["routes"];
constructor(options: HTTPServiceOptionsInput) {
const { routes, config, debug } = HTTPOptions.parse(options);
// Extend the debug instance.
this.debug = debug.extend("http-router");
// Set the config.
this.config = config;
// Set the routes.
this.routes = routes;
// Create the server.
this.server = fastify({
logger: false,
});
}
async start(): Promise<void> {
// Debug the server starting.
this.debug(
`Starting HTTP server on http://${this.config.host}:${this.config.port}`,
);
// Setup Error Handling (to give more verbose Zod errors)
this.handleErrors();
// Allow CORS requests. This allows requests from any origin/domain.
// TODO: Set this to a meaningful value. For now, we allow all origins since we dont know what the origin will bes.
await this.server.register(cors);
// Register your routes here before starting the server
this.server.get("/health", async () => {
return { status: "ok" };
});
// Register each route.
for (const routes of this.routes) {
for (const routeOptions of await routes.getRoutes()) {
this.server.route(routeOptions);
}
}
// Ready the server.
await this.server.ready();
// Listen on the configured port and host.
await this.server.listen({
port: this.config.port,
host: this.config.host,
});
// Debug the server started.
this.debug(
`Started HTTP server on http://${this.config.host}:${this.config.port}`,
);
}
// Helper method to access the server instance
getInstance(): FastifyInstance {
return this.server;
}
private handleErrors() {
// Customize our error handler to give better errors.
// NOTE: This will nicely format the Zod validation errors.
this.server.setErrorHandler((error, _request, reply) => {
// If the error is a Zod error, format the errors and send them to the client.
if (error instanceof z.ZodError) {
// Format the errors.
const formattedErrors = error.issues.map((issue) => ({
path: issue.path.join("."),
message: issue.message,
}));
// Debug the validation error.
this.debug(`Validation Error: ${error}`);
// Send the validation error to the client.
return reply.status(400).send({
statusCode: 400,
error: "Validation Error",
details: formattedErrors,
});
}
// Debug the internal server error.
this.debug(`Internal Server Error: ${error}`);
// Handle other types of errors
reply.status(500).send({ error: "Internal Server Error" });
});
}
}

View File

@@ -0,0 +1,270 @@
import type { FastifyReply, FastifyRequest } from "fastify";
import debug, { type Debugger } from "debug";
/**
* Represents an event stored in the history buffer.
* Used for replaying missed events to reconnecting clients.
*/
interface HistoricalEvent {
/** The event topic/type (e.g., 'invitation-created', 'invitation-updated') */
topic: string;
/** The event payload data */
data: unknown;
/** Unix timestamp in milliseconds when the event was created */
timestamp: number;
}
/**
* Options for configuring the SSE service.
*/
interface SSEOptions {
/** Maximum age of events to keep in history (in milliseconds). Default: 5 minutes */
maxHistoryAge?: number;
/** Maximum number of events to keep per user. Default: 1000 */
maxHistorySize?: number;
}
/**
* Server-Sent Events broadcaster with event history support.
*
* Maintains a per-user event history buffer that allows clients to replay
* missed events when reconnecting. This makes reconnections robust against
* network interruptions.
*/
export class SSEBroadcaster {
/**
* Factory method to create and start an SSE broadcaster.
* @param options - Configuration options for the SSE service
* @returns A started SSE instance
*/
static async from(options?: SSEOptions) {
const broadcaster = new SSEBroadcaster(options);
await broadcaster.start();
return broadcaster;
}
/** Map of Invitation IDs to their connected SSE response streams */
private clients: Map<(string), Set<FastifyReply>> = new Map();
/** Map of Invitation IDs to their event history buffers */
private eventHistory: Map<string, HistoricalEvent[]> = new Map();
/** Maximum age of events to keep in history (in milliseconds) */
private maxHistoryAge: number;
/** Maximum number of events to keep per user */
private maxHistorySize: number;
private debug: Debugger;
constructor(options?: SSEOptions) {
this.clients = new Map();
this.eventHistory = new Map();
this.maxHistoryAge = options?.maxHistoryAge ?? 20 * 60 * 1000; // 20 minutes default
this.maxHistorySize = options?.maxHistorySize ?? 1000; // 1000 events default
this.debug = debug('xo:sse');
}
/**
* Starts the SSE broadcaster.
* @returns The SSE instance for chaining
*/
start() {
this.debug('SSE broadcaster is running (maxHistoryAge: %dms, maxHistorySize: %d)',
this.maxHistoryAge, this.maxHistorySize);
return this;
}
/**
* Sends an event to a client.
*
* @param client - The client to send the event to
* @param topic - The event topic/type
* @param data - The event payload data
*/
static sendEvent(client: FastifyReply, topic: string, data: unknown) {
const timestamp = Date.now();
client.raw.write(`id: ${timestamp}\n`);
client.raw.write(`event: ${topic}\n`);
client.raw.write(`data: ${JSON.stringify(data)}\n\n`);
}
/**
* Sends an event to a client.
*
* @param client - The client to send the event to
* @param topic - The event topic/type
* @param data - The event payload data
*/
sendEvent(client: FastifyReply, topic: string, data: unknown) {
try {
SSEBroadcaster.sendEvent(client, topic, data);
} catch (error) {
this.debug('Error sending event to client', error);
}
}
/**
* Broadcasts an event to all connected clients for a user and stores it in history.
*
* @param clientId - The user ID to broadcast to
* @param topic - The event topic/type
* @param data - The event payload data
*/
async broadcast(clientId: string, topic: string, data: unknown) {
const timestamp = Date.now();
// Store the event in history for potential replay
this.storeEvent(clientId, topic, data, timestamp);
// Broadcast to all connected clients
this.clients.get(clientId)?.forEach((client: FastifyReply) => {
try {
this.sendEvent(client, topic, data);
} catch (error) {
this.debug('Error sending event to client', error);
}
});
this.debug('SSE broadcasted message', topic, data);
}
/**
* Subscribes a client to receive SSE events.
*
* If lastEventTime is provided, all events that occurred after that timestamp
* will be replayed to the client before starting the live stream.
*
* @param req - The authenticated request containing the user ID
* @param res - The Express response object to use for SSE streaming
* @param lastEventTime - Optional timestamp to replay events from (in milliseconds)
*/
async subscribe(req: FastifyRequest, res: FastifyReply, lastEventTime?: number) {
// Get the invitation ID from the request
const { invitationIdentifier } = req.query as { invitationIdentifier?: string };
if (!invitationIdentifier) {
throw new Error('Invitation Identifier is required');
}
// Initialize client set for this user if needed
if (!this.clients.has(invitationIdentifier)) {
this.clients.set(invitationIdentifier, new Set());
}
// Manually include the CORS header since `writeHead` bypasses the auto-injection by the @fastify/cors plugin.
// This statement grabs the CORS header that the CORS plugin would have added to the response, configured in the HTTP service.
const corsHeader = res.getHeader("access-control-allow-origin");
// Disable timeout for the connection. Without this, the connection will drop and the client will have to reconnect.
req.raw.socket.setTimeout(0);
// Set up SSE headers
// Set headers for SSE
res.raw.writeHead(200, {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
"Access-Control-Allow-Origin": corsHeader,
});
// Force the headers to be dispatched to the client.
// NOTE: This is very important: A `fetch` call will NOT resolve until it has received the headers.
// And Fastify, unless otherwise specified, will not send the headers until it sends the body.
res.raw.flushHeaders();
// Set retry interval for automatic reconnection
res.raw.write('retry: 3000\n\n');
// Replay missed events if a lastEventTime was provided
if (lastEventTime !== undefined) {
const missedEvents = this.getEventsAfter(invitationIdentifier, lastEventTime);
this.debug('SSE replaying %d missed events for invitation %s (since %d)',
missedEvents.length, invitationIdentifier, lastEventTime);
for (const event of missedEvents) {
try {
await this.sendEvent(res, event.topic, event.data);
} catch (error) {
this.debug('Error sending event to client', error);
}
}
}
// Add client to the set for live updates
this.clients.get(invitationIdentifier)?.add(res);
this.debug('SSE subscribed to client (invitationId: %s, lastEventTime: %s)',
invitationIdentifier, lastEventTime ?? 'none');
// Clean up when client disconnects
res.raw.on('close', () => {
this.clients.get(invitationIdentifier)?.delete(res);
this.debug('SSE client disconnected (invitationIdentifier: %s)', invitationIdentifier);
});
}
/**
* Stores an event in the user's history buffer.
* Automatically prunes old events based on age and size limits.
*
* @param invitationId - The invitation ID to store the event for
* @param topic - The event topic/type
* @param data - The event payload data
* @param timestamp - The event timestamp
*/
private storeEvent(invitationId: string, topic: string, data: unknown, timestamp: number) {
// Initialize history array for this user if needed
if (!this.eventHistory.has(invitationId)) {
this.eventHistory.set(invitationId, []);
}
const history = this.eventHistory.get(invitationId)!;
// Add the new event
history.push({ topic, data, timestamp });
// Prune old events
this.pruneHistory(invitationId, timestamp);
}
/**
* Removes old events from a invitation's history based on age and size limits.
*
* @param invitationId - The invitation ID whose history to prune
* @param currentTime - The current timestamp for age calculations
*/
private pruneHistory(invitationId: string, currentTime: number) {
const history = this.eventHistory.get(invitationId);
if (!history) return;
const cutoffTime = currentTime - this.maxHistoryAge;
// Remove events older than maxHistoryAge
const prunedByAge = history.filter(event => event.timestamp > cutoffTime);
// If still over size limit, remove oldest events
const prunedBySize = prunedByAge.length > this.maxHistorySize
? prunedByAge.slice(-this.maxHistorySize)
: prunedByAge;
this.eventHistory.set(invitationId, prunedBySize);
}
/**
* Retrieves all events for a user that occurred after a given timestamp.
*
* @param invitationId - The invitation ID to get events for
* @param afterTimestamp - The timestamp to get events after (exclusive)
* @returns Array of events that occurred after the timestamp
*/
private getEventsAfter(invitationId: string, afterTimestamp: number): HistoricalEvent[] {
const history = this.eventHistory.get(invitationId);
if (!history) return [];
// First prune old events to ensure we don't return stale data
this.pruneHistory(invitationId, Date.now());
return history.filter(event => event.timestamp > afterTimestamp);
}
}

18
src/types.ts Normal file
View File

@@ -0,0 +1,18 @@
import { type Debugger } from "debug";
import { z } from "zod";
export const isDebugger = (value: unknown): value is Debugger => {
if (typeof value !== "function") return false;
const candidate = value as Partial<Debugger>;
return (
typeof candidate.namespace === "string" &&
typeof candidate.extend === "function" &&
typeof candidate.enabled === "boolean"
);
};
export const debuggerSchema = z.custom<Debugger>(isDebugger, {
message: "Expected a debug.Debugger instance",
});

40
tsconfig.json Normal file
View File

@@ -0,0 +1,40 @@
{
// Visit https://aka.ms/tsconfig to read more about this file
"compilerOptions": {
// File Layout
"rootDir": "./src",
"outDir": "./dist",
// Environment Settings
// See also https://aka.ms/tsconfig/module
"module": "nodenext",
"target": "esnext",
"types": ["node"],
// Other Outputs
"sourceMap": true,
"declaration": true,
"declarationMap": true,
// Stricter Typechecking Options
"noUncheckedIndexedAccess": true,
"exactOptionalPropertyTypes": true,
// Style Options
// "noImplicitReturns": true,
// "noImplicitOverride": true,
// "noUnusedLocals": true,
// "noUnusedParameters": true,
// "noFallthroughCasesInSwitch": true,
// "noPropertyAccessFromIndexSignature": true,
// Recommended Options
"strict": true,
"jsx": "react-jsx",
"verbatimModuleSyntax": true,
"isolatedModules": true,
"noUncheckedSideEffectImports": true,
"moduleDetection": "force",
"skipLibCheck": true,
}
}