Use persistent storage with better-sqlite3
This commit is contained in:
15
src/app.ts
15
src/app.ts
@@ -1,12 +1,17 @@
|
||||
import { HTTPService } from './services/http-router';
|
||||
import { InvitationsRoute } from './routes/invitations';
|
||||
import { InvitationStore } from './services/invitation-store';
|
||||
import { SSEBroadcaster } from './services/sse-broadcast';
|
||||
import { HTTPService } from './services/http-router.js';
|
||||
import { InvitationsRoute } from './routes/invitations.js';
|
||||
import { StorageSQLite, StoreSQLite } from './services/invitation-store.js';
|
||||
import { SSEBroadcaster } from './services/sse-broadcast.js';
|
||||
|
||||
import type { InvitationSchema } from './utils/invitation-parser.js';
|
||||
|
||||
export class App {
|
||||
static async create() {
|
||||
const invitationStoragePath = "./xo-invitations.db";
|
||||
|
||||
// Create the invitation store (this is a in-memory store for now)
|
||||
const invitationStore = new InvitationStore();
|
||||
const storage = await StorageSQLite.createOrOpen(invitationStoragePath);
|
||||
const invitationStore = await storage.createOrGetStore<InvitationSchema>("invitations");
|
||||
|
||||
// Create the SSE Broadcaster
|
||||
const sseBroadcaster = new SSEBroadcaster();
|
||||
|
||||
@@ -1,12 +1,14 @@
|
||||
import type { FastifyRequest, FastifyReply, RouteOptions } from 'fastify';
|
||||
|
||||
import type { SSEBroadcaster } from '../services/sse-broadcast';
|
||||
import type { InvitationStore } from '../services/invitation-store';
|
||||
import { parseInvitation } from '../utils/invitation-parser';
|
||||
import type { SSEBroadcaster } from '../services/sse-broadcast.js';
|
||||
import type { StoreSQLite } from '../services/invitation-store.js';
|
||||
import { parseInvitation } from '../utils/invitation-parser.js';
|
||||
|
||||
import Z from 'zod';
|
||||
|
||||
export class InvitationsRoute {
|
||||
constructor(
|
||||
private readonly invitationStore: InvitationStore,
|
||||
private readonly invitationStore: StoreSQLite<Z.infer<typeof parseInvitation>>,
|
||||
private readonly sseBroadcaster: SSEBroadcaster,
|
||||
) {}
|
||||
|
||||
@@ -35,7 +37,7 @@ export class InvitationsRoute {
|
||||
}
|
||||
|
||||
// Get the invitation from the store
|
||||
const storedInvitation = await this.invitationStore.getInvitation(invitationIdentifier);
|
||||
const storedInvitation = await this.invitationStore.get(invitationIdentifier);
|
||||
|
||||
if (request.headers['accept'] === 'text/event-stream') {
|
||||
// Subscribe the client to the SSE stream.
|
||||
@@ -67,15 +69,14 @@ export class InvitationsRoute {
|
||||
// Parse the invitation
|
||||
const invitation = parseInvitation.parse(request.body);
|
||||
|
||||
// If the invitation doesnt exist yet, create it
|
||||
if (!await this.invitationStore.getInvitation(invitation.invitationIdentifier)) {
|
||||
await this.invitationStore.storeInvitation({ ...invitation, commits: [] });
|
||||
}
|
||||
// Get the existing invitation
|
||||
const existingInvitation = await this.invitationStore.get(invitation.invitationIdentifier);
|
||||
|
||||
// Store each commit individually (I dont know)
|
||||
for(const commit of invitation.commits) {
|
||||
await this.invitationStore.updateInvitation(invitation.invitationIdentifier, commit);
|
||||
}
|
||||
// Merge the existing invitation with the new invitation
|
||||
const mergedInvitation = InvitationsRoute.mergeInvitations(invitation, existingInvitation);
|
||||
|
||||
// Store the merged invitation
|
||||
await this.invitationStore.set(invitation.invitationIdentifier, mergedInvitation);
|
||||
|
||||
// Broadcast the invitation update (We send down the whole invitation. Clients will have to compare commitIds)
|
||||
await this.sseBroadcaster.broadcast(invitation.invitationIdentifier, 'invitation-updated', invitation);
|
||||
@@ -85,5 +86,18 @@ export class InvitationsRoute {
|
||||
return reply.status(200).send(invitation);
|
||||
}
|
||||
|
||||
static mergeInvitations(invitation1: Z.infer<typeof parseInvitation>, invitation2: Z.infer<typeof parseInvitation> | undefined): Z.infer<typeof parseInvitation> {
|
||||
const result = invitation1;
|
||||
|
||||
for(const commit of invitation2?.commits ?? []) {
|
||||
if(result.commits.some(c => c.commitIdentifier === commit.commitIdentifier)) {
|
||||
continue;
|
||||
}
|
||||
result.commits.push(commit);
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
static parseInvitation = parseInvitation
|
||||
}
|
||||
@@ -1,46 +1,138 @@
|
||||
import type { XOInvitation, XOInvitationCommit } from '@xo-cash/types';
|
||||
import sqlite3, { type Database } from 'better-sqlite3';
|
||||
import { pack, unpack } from 'msgpackr';
|
||||
|
||||
export class InvitationStore {
|
||||
|
||||
private readonly invitations: Map<string, XOInvitation> = new Map();
|
||||
|
||||
constructor() {
|
||||
this.invitations = new Map();
|
||||
}
|
||||
|
||||
async getInvitation(invitationIdentifier: string): Promise<XOInvitation | undefined> {
|
||||
return this.invitations.get(invitationIdentifier);
|
||||
}
|
||||
|
||||
async storeInvitation(invitation: XOInvitation): Promise<void> {
|
||||
const invitationIdentifier = invitation.invitationIdentifier;
|
||||
this.invitations.set(invitationIdentifier, invitation);
|
||||
}
|
||||
|
||||
async deleteInvitation(invitationIdentifier: string): Promise<void> {
|
||||
this.invitations.delete(invitationIdentifier);
|
||||
}
|
||||
export interface SQLiteOptions {
|
||||
wal: boolean;
|
||||
}
|
||||
|
||||
/**
|
||||
* Storage Adapter that uses SQLite as the storage backend. Everything is stored as a BLOB, to avoid type issues.
|
||||
*
|
||||
* @remarks This implementation is single threaded. It WILL block execution of the main thread.
|
||||
*/
|
||||
export class StorageSQLite {
|
||||
/**
|
||||
* TODO: This should maybe merge? I dont know. Currently, setting is not the best idea
|
||||
* @param invitation
|
||||
* Create or open a SQLite database.
|
||||
* @param filepath - The path to the SQLite database file.
|
||||
* @param options - The options for the SQLite database.
|
||||
* @returns A new instance of StorageSQLite.
|
||||
*/
|
||||
async updateInvitation(id: string, commit: XOInvitationCommit): Promise<XOInvitation> {
|
||||
// Get the invitation identifier
|
||||
const invitation = await this.getInvitation(id);
|
||||
if (!invitation) {
|
||||
throw new Error(`Invitation not found: ${id}`);
|
||||
static async createOrOpen(
|
||||
filepath: string,
|
||||
options: Partial<SQLiteOptions> = {},
|
||||
) {
|
||||
const db = sqlite3(filepath);
|
||||
|
||||
const opts: SQLiteOptions = {
|
||||
wal: true,
|
||||
...options,
|
||||
};
|
||||
|
||||
if (opts.wal) {
|
||||
db.pragma('journal_mode = WAL');
|
||||
}
|
||||
|
||||
// If the commit already exists, return the invitation
|
||||
if (invitation.commits.some(c => c.commitIdentifier === commit.commitIdentifier)) {
|
||||
return invitation;
|
||||
}
|
||||
|
||||
// Update the invitation with the commit
|
||||
invitation.commits.push(commit);
|
||||
this.invitations.set(id, invitation);
|
||||
|
||||
return invitation;
|
||||
return new StorageSQLite(db);
|
||||
}
|
||||
}
|
||||
|
||||
constructor(public db: Database) {}
|
||||
|
||||
async listStores() {
|
||||
const result = this.db
|
||||
.prepare(`SELECT * FROM sqlite_master WHERE type='table'`)
|
||||
.all() as { name: string }[];
|
||||
return result.map((row) => row.name);
|
||||
}
|
||||
|
||||
async createStore<T>(storeName: string): Promise<StoreSQLite<T>> {
|
||||
// Create table with proper schema for key-value storage
|
||||
this.db
|
||||
.prepare(
|
||||
`
|
||||
CREATE TABLE IF NOT EXISTS "${storeName}" (
|
||||
key TEXT PRIMARY KEY,
|
||||
value BLOB
|
||||
)
|
||||
`,
|
||||
)
|
||||
.run();
|
||||
|
||||
return new StoreSQLite(this.db, storeName);
|
||||
}
|
||||
|
||||
async getStore<T>(storeName: string): Promise<StoreSQLite<T> | null> {
|
||||
// Check if table exists
|
||||
const tableExists = this.db
|
||||
.prepare(
|
||||
`
|
||||
SELECT name FROM sqlite_master
|
||||
WHERE type='table' AND name=?
|
||||
`,
|
||||
)
|
||||
.get(storeName);
|
||||
|
||||
if (!tableExists) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return new StoreSQLite(this.db, storeName);
|
||||
}
|
||||
|
||||
async createOrGetStore<T>(storeName: string): Promise<StoreSQLite<T>> {
|
||||
return await this.createStore<T>(storeName);
|
||||
}
|
||||
|
||||
async deleteStore(storeName: string) {
|
||||
this.db.prepare(`DROP TABLE IF EXISTS "${storeName}"`).run();
|
||||
}
|
||||
|
||||
async deleteDatabase() {
|
||||
this.db.close();
|
||||
}
|
||||
}
|
||||
|
||||
export class StoreSQLite<T> {
|
||||
constructor(
|
||||
protected db: Database,
|
||||
protected storeName: string,
|
||||
) {}
|
||||
|
||||
async keys() {
|
||||
const result = this.db
|
||||
.prepare(`SELECT key FROM "${this.storeName}"`)
|
||||
.all() as { key: string }[];
|
||||
return result.map((row) => row.key);
|
||||
}
|
||||
|
||||
async get(key: string): Promise<T | undefined> {
|
||||
const result = this.db
|
||||
.prepare(`SELECT value FROM "${this.storeName}" WHERE key = ?`)
|
||||
.get(key) as { value: Buffer } | undefined;
|
||||
|
||||
if (!result) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
// Deserialize using msgpackr for consistency with other implementations
|
||||
return unpack(result.value) as T;
|
||||
}
|
||||
|
||||
async set(key: string, value: T): Promise<void> {
|
||||
// Serialize using msgpackr for consistency with other implementations
|
||||
const serializedValue = pack(value);
|
||||
|
||||
this.db
|
||||
.prepare(
|
||||
`INSERT OR REPLACE INTO "${this.storeName}" (key, value) VALUES (?, ?)`,
|
||||
)
|
||||
.run(key, Buffer.from(serializedValue));
|
||||
}
|
||||
|
||||
async delete(key: string): Promise<void> {
|
||||
this.db.prepare(`DELETE FROM "${this.storeName}" WHERE key = ?`).run(key);
|
||||
}
|
||||
|
||||
async clear(): Promise<void> {
|
||||
this.db.prepare(`DELETE FROM "${this.storeName}"`).run();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { z } from 'zod';
|
||||
import Z, { z } from 'zod';
|
||||
|
||||
/**
|
||||
* Zod schemas for invitation validation.
|
||||
@@ -63,4 +63,6 @@ export const parseInvitation = z.object({
|
||||
createdAtTimestamp: z.number(),
|
||||
templateIdentifier: z.string(),
|
||||
actionIdentifier: z.string(),
|
||||
}).passthrough();
|
||||
}).passthrough();
|
||||
|
||||
export type InvitationSchema = Z.infer<typeof parseInvitation>;
|
||||
Reference in New Issue
Block a user