Compare commits
2 Commits
af31aac19c
...
dfebd0576e
| Author | SHA1 | Date | |
|---|---|---|---|
|
dfebd0576e
|
|||
|
400fb5a8cb
|
3
.gitignore
vendored
3
.gitignore
vendored
@@ -14,3 +14,6 @@
|
||||
|
||||
.env
|
||||
xo-invitations*
|
||||
data.db*
|
||||
|
||||
.vscode
|
||||
15
bloom-filter.md
Normal file
15
bloom-filter.md
Normal file
@@ -0,0 +1,15 @@
|
||||
# Bloom filter challenges
|
||||
|
||||
## Incremental filter
|
||||
|
||||
If we can make the filter incremental instead of needing to be recomputed, we can keep the filter in the cache and just update it instead of deleting it when we get an updated event.
|
||||
|
||||
It should also allow us to have the filter passed by the client, apply the server's items to it, and figure out which values its missimg.
|
||||
|
||||
## Oversizing filter
|
||||
|
||||
If we can oversize the filter, we can work around the above challenge and just oversize them instead.
|
||||
|
||||
## Filter metadata
|
||||
|
||||
Store the item count in the filter object so we can do a quick match with that?
|
||||
271
docs/api.md
Normal file
271
docs/api.md
Normal file
@@ -0,0 +1,271 @@
|
||||
# XO Sync Server API
|
||||
|
||||
This server stores encrypted client data as append-only events. The server does
|
||||
not interpret event payloads; `data` is accepted and returned as a base64 string.
|
||||
|
||||
## Data Model
|
||||
|
||||
Stored events have this shape:
|
||||
|
||||
```ts
|
||||
type StorageEvent = {
|
||||
storageIdentifier: string;
|
||||
data: string; // base64 encoded encrypted payload
|
||||
hash: string; // base64 sha256 hash of decoded data bytes
|
||||
created_at: number; // unix timestamp in milliseconds
|
||||
};
|
||||
```
|
||||
|
||||
`storageIdentifier` identifies the logical append-only storage feed. It is also
|
||||
the realtime subscription topic.
|
||||
|
||||
## Errors
|
||||
|
||||
Validation errors return HTTP `400` with:
|
||||
|
||||
```json
|
||||
{
|
||||
"statusCode": 400,
|
||||
"error": "Validation Error",
|
||||
"details": [
|
||||
{
|
||||
"path": "storageIdentifier",
|
||||
"message": "Required"
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
Unexpected errors return HTTP `500`:
|
||||
|
||||
```json
|
||||
{
|
||||
"error": "Internal Server Error"
|
||||
}
|
||||
```
|
||||
|
||||
## Stats / Bloom Negotiation
|
||||
|
||||
Use these endpoints before filtered reads to learn the server's event count,
|
||||
the inverse-bloom-filter size the client should use, and the server's current
|
||||
filter fingerprint.
|
||||
|
||||
### `GET /stats`
|
||||
|
||||
Alias: `GET /bloom`
|
||||
|
||||
Query parameters:
|
||||
|
||||
```ts
|
||||
storageIdentifier: string // required; can be comma-separated for batching
|
||||
```
|
||||
|
||||
Example:
|
||||
|
||||
```http
|
||||
GET /stats?storageIdentifier=/wallet/a,/wallet/b
|
||||
```
|
||||
|
||||
Response:
|
||||
|
||||
```json
|
||||
{
|
||||
"items": [
|
||||
{
|
||||
"storageIdentifier": "/wallet/a",
|
||||
"count": 42,
|
||||
"negotiatedBloomFilterSize": 256,
|
||||
"bloomFilter": "BASE64_SERVER_FILTER",
|
||||
"bloomFilterHash": "BASE64_SHA256_OF_SERVER_FILTER",
|
||||
"cached": false
|
||||
},
|
||||
{
|
||||
"storageIdentifier": "/wallet/b",
|
||||
"count": 1000,
|
||||
"negotiatedBloomFilterSize": 1000,
|
||||
"bloomFilter": "BASE64_SERVER_FILTER",
|
||||
"bloomFilterHash": "BASE64_SHA256_OF_SERVER_FILTER",
|
||||
"cached": true
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
`negotiatedBloomFilterSize` is currently `max(count, 256)`. Clients should build
|
||||
their inverse bloom filter using this size before calling `/data` with a filter.
|
||||
`count` is not a sync check; two peers can have the same count but different
|
||||
events. Use `bloomFilter` or `bloomFilterHash` as the server-side set
|
||||
fingerprint.
|
||||
|
||||
### `POST /stats`
|
||||
|
||||
Alias: `POST /bloom`
|
||||
|
||||
Request body:
|
||||
|
||||
```json
|
||||
{
|
||||
"storageIdentifiers": ["/wallet/a", "/wallet/b"]
|
||||
}
|
||||
```
|
||||
|
||||
Response is the same as `GET /stats`.
|
||||
|
||||
## Read Data
|
||||
|
||||
### `GET /data`
|
||||
|
||||
Reads events for one storage identifier.
|
||||
|
||||
Query parameters:
|
||||
|
||||
```ts
|
||||
storageIdentifier: string; // required
|
||||
bloomFilter?: string; // optional base64 serialized inverse bloom filter
|
||||
count?: number; // required when bloomFilter is supplied
|
||||
```
|
||||
|
||||
Without a filter, the server returns all events:
|
||||
|
||||
```http
|
||||
GET /data?storageIdentifier=/wallet/a
|
||||
```
|
||||
|
||||
Response:
|
||||
|
||||
```json
|
||||
[
|
||||
{
|
||||
"storageIdentifier": "/wallet/a",
|
||||
"data": "BASE64_PAYLOAD",
|
||||
"hash": "BASE64_SHA256_HASH",
|
||||
"created_at": 1760000000000
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
With a compatible filter, the server returns only events that appear missing from
|
||||
the client:
|
||||
|
||||
```http
|
||||
GET /data?storageIdentifier=/wallet/a&bloomFilter=BASE64_FILTER&count=256
|
||||
```
|
||||
|
||||
If `count` does not match the current negotiated size, the server falls back to
|
||||
returning the full event list for correctness.
|
||||
|
||||
If the supplied filter matches the server's current filter, the response is an
|
||||
empty list because the client already has all current server events. If the
|
||||
filters differ, the server builds or reuses its current filter, computes the
|
||||
difference, then iterates storage rows and returns only rows whose hashes appear
|
||||
missing from the client filter.
|
||||
|
||||
## Append Data
|
||||
|
||||
### `POST /data`
|
||||
|
||||
Appends one event to a storage feed.
|
||||
|
||||
Request body:
|
||||
|
||||
```json
|
||||
{
|
||||
"storageIdentifier": "/wallet/a",
|
||||
"data": "BASE64_PAYLOAD"
|
||||
}
|
||||
```
|
||||
|
||||
Response:
|
||||
|
||||
```json
|
||||
{
|
||||
"storageIdentifier": "/wallet/a",
|
||||
"data": "BASE64_PAYLOAD",
|
||||
"hash": "BASE64_SHA256_HASH",
|
||||
"created_at": 1760000000000
|
||||
}
|
||||
```
|
||||
|
||||
Side effects:
|
||||
|
||||
- Invalidates the cached bloom filter for `storageIdentifier`.
|
||||
- Broadcasts a realtime `storage-event` to subscribers of that storage feed.
|
||||
|
||||
## Stream Data
|
||||
|
||||
### `GET /data/stream`
|
||||
|
||||
Opens a Server-Sent Events stream for one storage identifier.
|
||||
|
||||
Query parameters are the same as `GET /data`:
|
||||
|
||||
```ts
|
||||
storageIdentifier: string;
|
||||
bloomFilter?: string;
|
||||
count?: number;
|
||||
```
|
||||
|
||||
Connection behavior:
|
||||
|
||||
1. The server opens an SSE connection.
|
||||
2. The server subscribes the connection to `storage:${storageIdentifier}`.
|
||||
3. The server calls the same handler as `GET /data`.
|
||||
4. If the handler returns data, the server sends it as the initial `sync` event.
|
||||
5. The connection remains open for live `storage-event` broadcasts.
|
||||
|
||||
Initial sync event:
|
||||
|
||||
```txt
|
||||
event: sync
|
||||
data: [{"storageIdentifier":"/wallet/a","data":"...","hash":"...","created_at":1760000000000}]
|
||||
```
|
||||
|
||||
Live append event:
|
||||
|
||||
```txt
|
||||
event: storage-event
|
||||
data: {"storageIdentifier":"/wallet/a","data":"...","hash":"...","created_at":1760000001000}
|
||||
```
|
||||
|
||||
SSE also sends:
|
||||
|
||||
```txt
|
||||
retry: 3000
|
||||
```
|
||||
|
||||
Reconnect behavior uses the same sync mechanism as the first connection. Clients
|
||||
should reconnect with a fresh local inverse bloom filter:
|
||||
|
||||
```http
|
||||
GET /data/stream?storageIdentifier=/wallet/a&bloomFilter=BASE64_FILTER&count=256
|
||||
```
|
||||
|
||||
The server sends any missing persistent events as the initial `sync` event, then
|
||||
continues with live `storage-event` broadcasts. The server does not use
|
||||
`Last-Event-ID` for SSE replay.
|
||||
|
||||
## Recommended Sync Flow
|
||||
|
||||
1. Call `POST /stats` with all storage identifiers the client wants to sync.
|
||||
2. For each storage identifier, compare the server `bloomFilterHash` with the
|
||||
client's local filter hash if available.
|
||||
3. If filters differ, build a local inverse bloom filter using
|
||||
`negotiatedBloomFilterSize`.
|
||||
4. Call `GET /data` or `GET /data/stream` with the client's `bloomFilter` and
|
||||
`count`.
|
||||
5. Apply returned server events locally.
|
||||
6. Compare the server `bloomFilter` from `/stats` against local events to find
|
||||
local events the server is missing, then upload those with `POST /data`.
|
||||
7. Keep `/data/stream` open if realtime updates are needed.
|
||||
8. On reconnect, repeat the stats and bloom-filtered stream request.
|
||||
|
||||
## Notes
|
||||
|
||||
- Storage is append-only. There is no update or delete endpoint.
|
||||
- Payloads are opaque to the server.
|
||||
- `hash` is computed by the server from decoded `data` bytes using SHA-256.
|
||||
- `/bloom` currently exists as an alias for `/stats`; reconciliation happens in
|
||||
`/data`, not `/bloom`.
|
||||
- The bloom-filter cache stores the current server filter for each
|
||||
`storageIdentifier`; it does not store event payloads or a copy of all event
|
||||
hashes. The cache is invalidated when new data is appended.
|
||||
23
package-lock.json
generated
23
package-lock.json
generated
@@ -13,7 +13,9 @@
|
||||
"@hono/node-server": "^2.0.4",
|
||||
"better-sqlite3": "^12.10.0",
|
||||
"debug": "^4.4.3",
|
||||
"dotenv": "^17.4.2",
|
||||
"hono": "^4.12.24",
|
||||
"kysely": "^0.29.2",
|
||||
"msgpackr": "^2.0.2",
|
||||
"zod": "^4.4.3"
|
||||
},
|
||||
@@ -737,6 +739,18 @@
|
||||
"node": ">=8"
|
||||
}
|
||||
},
|
||||
"node_modules/dotenv": {
|
||||
"version": "17.4.2",
|
||||
"resolved": "https://registry.npmjs.org/dotenv/-/dotenv-17.4.2.tgz",
|
||||
"integrity": "sha512-nI4U3TottKAcAD9LLud4Cb7b2QztQMUEfHbvhTH09bqXTxnSie8WnjPALV/WMCrJZ6UV/qHJ6L03OqO3LcdYZw==",
|
||||
"license": "BSD-2-Clause",
|
||||
"engines": {
|
||||
"node": ">=12"
|
||||
},
|
||||
"funding": {
|
||||
"url": "https://dotenvx.com"
|
||||
}
|
||||
},
|
||||
"node_modules/end-of-stream": {
|
||||
"version": "1.4.5",
|
||||
"resolved": "https://registry.npmjs.org/end-of-stream/-/end-of-stream-1.4.5.tgz",
|
||||
@@ -871,6 +885,15 @@
|
||||
"integrity": "sha512-JV/yugV2uzW5iMRSiZAyDtQd+nxtUnjeLt0acNdw98kKLrvuRVyB80tsREOE7yvGVgalhZ6RNXCmEHkUKBKxew==",
|
||||
"license": "ISC"
|
||||
},
|
||||
"node_modules/kysely": {
|
||||
"version": "0.29.2",
|
||||
"resolved": "https://registry.npmjs.org/kysely/-/kysely-0.29.2.tgz",
|
||||
"integrity": "sha512-s6WVJyEZrbm6jhBpiKHsGHyePMrVQKJ85wZCFCr9W4QHv6WTjWIrdvTmO9hDEA3bNK0xkrE2DqrHsXMLWuZpQg==",
|
||||
"license": "MIT",
|
||||
"engines": {
|
||||
"node": ">=22.0.0"
|
||||
}
|
||||
},
|
||||
"node_modules/mimic-response": {
|
||||
"version": "3.1.0",
|
||||
"resolved": "https://registry.npmjs.org/mimic-response/-/mimic-response-3.1.0.tgz",
|
||||
|
||||
@@ -18,7 +18,9 @@
|
||||
"@hono/node-server": "^2.0.4",
|
||||
"better-sqlite3": "^12.10.0",
|
||||
"debug": "^4.4.3",
|
||||
"dotenv": "^17.4.2",
|
||||
"hono": "^4.12.24",
|
||||
"kysely": "^0.29.2",
|
||||
"msgpackr": "^2.0.2",
|
||||
"zod": "^4.4.3"
|
||||
},
|
||||
|
||||
68
readme.md
Normal file
68
readme.md
Normal file
@@ -0,0 +1,68 @@
|
||||
Using this as a scratchpad for now:
|
||||
|
||||
### SSE Middleware
|
||||
Move SSE to be a middleware on the HTTP-Router. We want to do this so we can completely abstract away the "subscribing" from the routes themselves. We *could* also do this by putting a "subscribe()" method on the RouteRequest object, but im undecided on whether this is a better approach
|
||||
|
||||
### Handling subscribers
|
||||
Each "user" is going to be subscribing to many "storageIdentifiers". Having a Record<storageIdentifer, Subscribers[]> would eat up a bunch of memory because of the multiple duplications of Subscribers. Instead, we will maintain an Array of `storageIdentifiers: string[]`, and we will index those identifiers in a separate Record<Subscriber, Record<storageIdentifier, boolean>>.
|
||||
|
||||
When we want to broadcast an event, we iterate through the `storageIdentifers` array and find the index. We then iterate over all the subscribers and check if `subscriber[storageIdentifier] === true`. Its O(N) where N is the number of subscribers. There may be a faster way to do this in practice, as a key lookup is slower than iterating over an array, but its a relatively simple approach, IMO. Better ideas are welcome, though.
|
||||
|
||||
As for maintaining the Array of `storageIdentifiers`, its probably simplest just to prune it on a routine. We could also look at doing something similar to Redis, where it creates a second instance of the data and swaps between them with the more up-to-date version. Basically, its so they can multi-thread read vs write (I think. I haven't looked too much into it, just briefly saw something about redis maintaining a shadow version of the cache)
|
||||
|
||||
### Endpoints
|
||||
|
||||
#### Read from storage
|
||||
(All will return an SSE response if the `accept` is `event/text-stream`)
|
||||
|
||||
```bash
|
||||
GET /get?id=string?bloomFilter=string
|
||||
```
|
||||
|
||||
```
|
||||
GET /get?items=string[] | { id: string, bloomFilter: string }[]
|
||||
```
|
||||
|
||||
```
|
||||
POST /get
|
||||
{
|
||||
items: string | string[] | { id: string, bloomFilter: string }[]
|
||||
}
|
||||
```
|
||||
|
||||
#### Appending to a storage
|
||||
|
||||
Storages are append-only, you can not set or delete a storage
|
||||
|
||||
```
|
||||
POST /set/:id
|
||||
{
|
||||
value: string
|
||||
}
|
||||
```
|
||||
|
||||
#### Syncing
|
||||
|
||||
To avoid syncing the entire event history every time it connects, we allow for inverse-bloom-filters to be passed into the endpoints on reads, but do create a inverse-bloom-filter, there are negotiation steps that need to be followed to ensure both parties are computing valid inverse-bloom-filters
|
||||
|
||||
--- Inverse Bloom Filters will give false-negatives for items, meaning it *may* return **false** when an item **does** exist in the list, rather than a tranditional bloom filter which **may** return **true** on an item that is **not** in the list ---
|
||||
|
||||
To allow for negotiation, we provide
|
||||
|
||||
```
|
||||
GET /bloom?bloomFilter=string & count=number
|
||||
```
|
||||
|
||||
```
|
||||
POST /bloom
|
||||
{
|
||||
items: {
|
||||
bloomFilter: string
|
||||
count: number
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Because the larger number must be used for the bloom filter, we assert the client should always compute a bloom filter, but if the server has one with a larger number of events, **it will compute its own** and then return that new count + filter to the client.
|
||||
|
||||
The response from the request will just be a list of hashes of encrypted data. The list is of items which the client (or server?) are missing.
|
||||
65
src/app.ts
65
src/app.ts
@@ -1,46 +1,61 @@
|
||||
import { HTTPService } from './services/http-router.js';
|
||||
import { InvitationsRoute } from './routes/invitations.js';
|
||||
import { StorageSQLite } from './services/storage.js';
|
||||
import { SSEBroadcaster } from './services/sse-broadcaster.js';
|
||||
import { Config } from "./services/config.js";
|
||||
|
||||
import type { InvitationSchema } from './utils/invitation-parser.js';
|
||||
import { Database } from "./services/database/database.js";
|
||||
import { MigrationService } from "./services/database/migrate.js";
|
||||
import { StorageService } from "./services/storage.js";
|
||||
|
||||
import { Broadcaster } from "./services/broadcaster/broadcaster.js";
|
||||
|
||||
import { Cache } from "./utils/cache.js";
|
||||
import { Logger } from "./utils/logger.js";
|
||||
|
||||
import type { BloomFilterCacheEntry } from "./routes/bloom-filter-cache.js";
|
||||
import { BloomRoute } from "./routes/bloom.js";
|
||||
import { DataRoute } from "./routes/data.js";
|
||||
import { HTTPService } from "./services/http-router.js";
|
||||
|
||||
export class App {
|
||||
static async create() {
|
||||
// TODO: Make this configurable
|
||||
const invitationStoragePath = "./xo-invitations.db";
|
||||
// Load the config from the environment variables
|
||||
// NOTE: Defaults will be applied inside the config class
|
||||
const config = Config.fromEnv();
|
||||
const debug = new Logger("xo-sync-server");
|
||||
debug("config loaded: %O", config);
|
||||
|
||||
// Create the invitation store (this is a in-memory store for now)
|
||||
const storage = await StorageSQLite.createOrOpen(invitationStoragePath);
|
||||
const invitationStore = await storage.createOrGetStore<InvitationSchema>("invitations");
|
||||
// Database setup
|
||||
const database = new Database({ path: config.database.path, debug });
|
||||
const migrations = new MigrationService(database, debug);
|
||||
await migrations.migrateToLatest();
|
||||
const storage = new StorageService(database);
|
||||
|
||||
// Create the SSE Broadcaster
|
||||
const sseBroadcaster = new SSEBroadcaster();
|
||||
// Create our broadcaster
|
||||
const broadcaster = new Broadcaster(debug);
|
||||
|
||||
// Create our bloom filter cache
|
||||
const bloomFilterCache = new Cache<BloomFilterCacheEntry>(config.bloomFilterCache);
|
||||
|
||||
// Create the Invitation route, passing in the invitation store and sse broadcaster
|
||||
const invitationsRoute = new InvitationsRoute(invitationStore, sseBroadcaster);
|
||||
// Create our routes
|
||||
const dataRoute = new DataRoute(storage, broadcaster, bloomFilterCache);
|
||||
const bloomRoute = new BloomRoute(storage, bloomFilterCache);
|
||||
|
||||
// Create the HTTP service, passing in the invitation route
|
||||
const http = new HTTPService([
|
||||
invitationsRoute,
|
||||
]);
|
||||
// Create our HTTP service
|
||||
const http = new HTTPService(
|
||||
[dataRoute, bloomRoute],
|
||||
config.server.port,
|
||||
config.server.host,
|
||||
debug,
|
||||
);
|
||||
|
||||
// Create the app instance, passing in the HTTP service
|
||||
return new App(http);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new instance of App.
|
||||
* @param http - The HTTP service instance.
|
||||
*/
|
||||
constructor(private readonly http: HTTPService) {}
|
||||
|
||||
// Start the application
|
||||
async start() {
|
||||
// Start the HTTP service
|
||||
await this.http.start();
|
||||
}
|
||||
}
|
||||
|
||||
// Create the app instance and start it
|
||||
const app = await App.create();
|
||||
await app.start();
|
||||
|
||||
8
src/routes/bloom-filter-cache.ts
Normal file
8
src/routes/bloom-filter-cache.ts
Normal file
@@ -0,0 +1,8 @@
|
||||
import type { InverseBloomFilter } from "../utils/inverse-bloom-filter.js";
|
||||
|
||||
export type BloomFilterCacheEntry = {
|
||||
count: number;
|
||||
filter: InverseBloomFilter;
|
||||
filterBase64: string;
|
||||
filterHash: string;
|
||||
};
|
||||
133
src/routes/bloom.ts
Normal file
133
src/routes/bloom.ts
Normal file
@@ -0,0 +1,133 @@
|
||||
import { createHash } from "node:crypto";
|
||||
import { z } from "zod";
|
||||
import type { StorageService } from "../services/storage.js";
|
||||
import type { Cache } from "../utils/cache.js";
|
||||
import { InverseBloomFilter } from "../utils/inverse-bloom-filter.js";
|
||||
import type { BloomFilterCacheEntry } from "./bloom-filter-cache.js";
|
||||
import type { AnyRouteOptions, Context } from "./types.js";
|
||||
|
||||
const statsRequest = z.object({
|
||||
storageIdentifier: z.string().optional(),
|
||||
storageIdentifiers: z.array(z.string()).optional(),
|
||||
});
|
||||
|
||||
export class BloomRoute {
|
||||
constructor(
|
||||
private readonly storage: StorageService,
|
||||
private readonly bloomFilterCache: Cache<BloomFilterCacheEntry>,
|
||||
) {}
|
||||
|
||||
async getRoutes(): Promise<Array<AnyRouteOptions>> {
|
||||
return [
|
||||
{
|
||||
method: "GET",
|
||||
url: "/bloom",
|
||||
handler: this.getStats.bind(this),
|
||||
},
|
||||
{
|
||||
method: "POST",
|
||||
url: "/bloom",
|
||||
handler: this.getStats.bind(this),
|
||||
},
|
||||
{
|
||||
method: "GET",
|
||||
url: "/stats",
|
||||
handler: this.getStats.bind(this),
|
||||
},
|
||||
{
|
||||
method: "POST",
|
||||
url: "/stats",
|
||||
handler: this.getStats.bind(this),
|
||||
},
|
||||
];
|
||||
}
|
||||
|
||||
async getStats(context: Context) {
|
||||
const storageIdentifiers = this.getStorageIdentifiers(context);
|
||||
const counts = await this.storage.countMany(storageIdentifiers);
|
||||
const items = [];
|
||||
|
||||
for (const storageIdentifier of storageIdentifiers) {
|
||||
const count = counts[storageIdentifier] ?? 0;
|
||||
const negotiatedBloomFilterSize = BloomRoute.getNegotiatedBloomFilterSize(count);
|
||||
const cached = this.bloomFilterCache.get(storageIdentifier) !== undefined;
|
||||
const bloomFilter = await this.getOrCreateBloomFilter(
|
||||
storageIdentifier,
|
||||
negotiatedBloomFilterSize,
|
||||
);
|
||||
|
||||
items.push({
|
||||
storageIdentifier,
|
||||
count,
|
||||
negotiatedBloomFilterSize,
|
||||
bloomFilter: bloomFilter.filterBase64,
|
||||
bloomFilterHash: bloomFilter.filterHash,
|
||||
cached,
|
||||
});
|
||||
}
|
||||
|
||||
return {
|
||||
items,
|
||||
};
|
||||
}
|
||||
|
||||
private getStorageIdentifiers(context: Context): string[] {
|
||||
const request = statsRequest.parse({
|
||||
...context.params,
|
||||
...(typeof context.body === "object" && context.body !== null ? context.body : {}),
|
||||
});
|
||||
|
||||
const fromSingle = request.storageIdentifier
|
||||
? splitStorageIdentifiers(request.storageIdentifier)
|
||||
: [];
|
||||
const fromBatch = request.storageIdentifiers ?? [];
|
||||
const storageIdentifiers = [...fromSingle, ...fromBatch];
|
||||
|
||||
if (storageIdentifiers.length === 0) {
|
||||
throw new z.ZodError([
|
||||
{
|
||||
code: "custom",
|
||||
path: ["storageIdentifier"],
|
||||
message: "At least one storage identifier is required",
|
||||
input: context.params,
|
||||
},
|
||||
]);
|
||||
}
|
||||
|
||||
return [...new Set(storageIdentifiers)];
|
||||
}
|
||||
|
||||
private async getOrCreateBloomFilter(
|
||||
storageIdentifier: string,
|
||||
count: number,
|
||||
): Promise<BloomFilterCacheEntry> {
|
||||
const cached = this.bloomFilterCache.get(storageIdentifier);
|
||||
if (cached?.count === count) {
|
||||
return cached;
|
||||
}
|
||||
|
||||
const hashes = await this.storage.listHashes(storageIdentifier);
|
||||
const filter = InverseBloomFilter.from(hashes, count);
|
||||
const filterBytes = filter.toBytes();
|
||||
const entry = {
|
||||
count,
|
||||
filter,
|
||||
filterBase64: Buffer.from(filterBytes).toString("base64"),
|
||||
filterHash: createHash("sha256").update(filterBytes).digest("base64"),
|
||||
};
|
||||
|
||||
this.bloomFilterCache.set(storageIdentifier, entry);
|
||||
return entry;
|
||||
}
|
||||
|
||||
private static getNegotiatedBloomFilterSize(count: number): number {
|
||||
return Math.max(count, 256);
|
||||
}
|
||||
}
|
||||
|
||||
function splitStorageIdentifiers(value: string): string[] {
|
||||
return value
|
||||
.split(",")
|
||||
.map((item) => item.trim())
|
||||
.filter((item) => item.length > 0);
|
||||
}
|
||||
145
src/routes/data.ts
Normal file
145
src/routes/data.ts
Normal file
@@ -0,0 +1,145 @@
|
||||
import { createHash } from "node:crypto";
|
||||
import { z } from "zod";
|
||||
import type { Broadcaster } from "../services/broadcaster/broadcaster.js";
|
||||
import type { StorageService } from "../services/storage.js";
|
||||
import type { Cache } from "../utils/cache.js";
|
||||
import { InverseBloomFilter } from "../utils/inverse-bloom-filter.js";
|
||||
import type { BloomFilterCacheEntry } from "./bloom-filter-cache.js";
|
||||
import type { AnyRouteOptions, Context } from "./types.js";
|
||||
|
||||
const storageParams = z.object({
|
||||
storageIdentifier: z.string(),
|
||||
bloomFilter: z.string().optional(),
|
||||
count: z.coerce.number().int().positive().optional(),
|
||||
});
|
||||
|
||||
const appendBody = z.object({
|
||||
storageIdentifier: z.string(),
|
||||
data: z.string(),
|
||||
});
|
||||
|
||||
export class DataRoute {
|
||||
constructor(
|
||||
private readonly storage: StorageService,
|
||||
private readonly broadcaster: Broadcaster,
|
||||
private readonly bloomFilterCache: Cache<BloomFilterCacheEntry>,
|
||||
) {}
|
||||
|
||||
async getRoutes(): Promise<Array<AnyRouteOptions>> {
|
||||
return [
|
||||
{
|
||||
method: "GET",
|
||||
url: "/data",
|
||||
handler: this.getData.bind(this),
|
||||
},
|
||||
{
|
||||
method: "GET",
|
||||
url: "/data/stream",
|
||||
mode: "sse",
|
||||
broadcaster: this.broadcaster,
|
||||
topics: this.getStorageTopics.bind(this),
|
||||
handler: this.getData.bind(this),
|
||||
},
|
||||
{
|
||||
method: "POST",
|
||||
url: "/data",
|
||||
handler: this.appendData.bind(this),
|
||||
},
|
||||
];
|
||||
}
|
||||
|
||||
async getData(context: Context) {
|
||||
const { storageIdentifier, bloomFilter, count } = storageParams.parse(context.params);
|
||||
if (bloomFilter && count) {
|
||||
return this.getMissingData(storageIdentifier, bloomFilter, count);
|
||||
}
|
||||
|
||||
return this.storage.list(storageIdentifier);
|
||||
}
|
||||
|
||||
async appendData(context: Context) {
|
||||
const input = appendBody.parse(context.body);
|
||||
const event = await this.storage.append(input);
|
||||
this.bloomFilterCache.delete(input.storageIdentifier);
|
||||
|
||||
await this.broadcaster.publish(DataRoute.storageTopic(input.storageIdentifier), {
|
||||
type: "storage-event",
|
||||
data: event,
|
||||
});
|
||||
|
||||
return event;
|
||||
}
|
||||
|
||||
getStorageTopics(context: Context): string[] {
|
||||
const { storageIdentifier } = storageParams.parse(context.params);
|
||||
return [DataRoute.storageTopic(storageIdentifier)];
|
||||
}
|
||||
|
||||
private static storageTopic(storageIdentifier: string): string {
|
||||
return `storage:${storageIdentifier}`;
|
||||
}
|
||||
|
||||
private async getMissingData(
|
||||
storageIdentifier: string,
|
||||
bloomFilter: string,
|
||||
count: number,
|
||||
) {
|
||||
const serverCount = await this.storage.count(storageIdentifier);
|
||||
const negotiatedCount = DataRoute.getNegotiatedBloomFilterSize(serverCount);
|
||||
|
||||
if (count !== negotiatedCount) {
|
||||
return this.storage.list(storageIdentifier);
|
||||
}
|
||||
|
||||
const cacheEntry = await this.getOrCreateBloomFilter(storageIdentifier, negotiatedCount);
|
||||
const remoteFilter = Buffer.from(bloomFilter, "base64");
|
||||
if (cacheEntry.filterBase64 === bloomFilter || cacheEntry.filter.hasSameBytes(remoteFilter)) {
|
||||
return [];
|
||||
}
|
||||
|
||||
let difference: ReturnType<InverseBloomFilter["createDifference"]>;
|
||||
try {
|
||||
difference = cacheEntry.filter.createDifference(remoteFilter);
|
||||
} catch (error) {
|
||||
throw new z.ZodError([
|
||||
{
|
||||
code: "custom",
|
||||
path: ["bloomFilter"],
|
||||
message: error instanceof Error ? error.message : "Invalid bloom filter",
|
||||
input: bloomFilter,
|
||||
},
|
||||
]);
|
||||
}
|
||||
|
||||
return this.storage.listWhereHash(storageIdentifier, (hash) =>
|
||||
difference.maybeMissing(hash),
|
||||
);
|
||||
}
|
||||
|
||||
private async getOrCreateBloomFilter(
|
||||
storageIdentifier: string,
|
||||
count: number,
|
||||
): Promise<BloomFilterCacheEntry> {
|
||||
const cached = this.bloomFilterCache.get(storageIdentifier);
|
||||
if (cached?.count === count) {
|
||||
return cached;
|
||||
}
|
||||
|
||||
const hashes = await this.storage.listHashes(storageIdentifier);
|
||||
const filter = InverseBloomFilter.from(hashes, count);
|
||||
const filterBytes = filter.toBytes();
|
||||
const entry = {
|
||||
count,
|
||||
filter,
|
||||
filterBase64: Buffer.from(filterBytes).toString("base64"),
|
||||
filterHash: createHash("sha256").update(filterBytes).digest("base64"),
|
||||
};
|
||||
|
||||
this.bloomFilterCache.set(storageIdentifier, entry);
|
||||
return entry;
|
||||
}
|
||||
|
||||
private static getNegotiatedBloomFilterSize(count: number): number {
|
||||
return Math.max(count, 256);
|
||||
}
|
||||
}
|
||||
@@ -1 +1,2 @@
|
||||
export * from './invitations.js';
|
||||
export * from "./bloom.js";
|
||||
export * from "./data.js";
|
||||
|
||||
@@ -1,130 +0,0 @@
|
||||
import type { AnyRouteOptions, RouteEventHandlers, RouteRequest, RouteResponse } from './types.js';
|
||||
|
||||
type InvitationRouteResponse = RouteResponse<Partial<RouteEventHandlers>>;
|
||||
|
||||
import type { SSEBroadcaster } from '../services/sse-broadcaster.js';
|
||||
import type { StoreSQLite } from '../services/storage.js';
|
||||
import { parseInvitation } from '../utils/invitation-parser.js';
|
||||
|
||||
import Z from 'zod';
|
||||
export class InvitationsRoute {
|
||||
constructor(
|
||||
private readonly invitationStore: StoreSQLite<Z.infer<typeof parseInvitation>>,
|
||||
private readonly sseBroadcaster: SSEBroadcaster,
|
||||
) {}
|
||||
|
||||
async getRoutes(): Promise<Array<AnyRouteOptions>> {
|
||||
return [
|
||||
{
|
||||
method: 'GET',
|
||||
url: '/invitations',
|
||||
handler: this.getInvitation.bind(this),
|
||||
},
|
||||
{
|
||||
method: 'POST',
|
||||
url: '/invitations',
|
||||
handler: this.updateInvitation.bind(this),
|
||||
}
|
||||
];
|
||||
}
|
||||
|
||||
/**
|
||||
* Get an invitation, and if the text/event-stream header is present, subscribe the client to the SSE stream.
|
||||
* @param request - The request.
|
||||
* @param reply - The reply.
|
||||
* @returns The invitation.
|
||||
*/
|
||||
async getInvitation(request: RouteRequest, reply: InvitationRouteResponse): Promise<InvitationRouteResponse> {
|
||||
// Get the invitation identifier from the query
|
||||
const { invitationIdentifier } = request.query as { invitationIdentifier?: string };
|
||||
|
||||
// If the invitation identifier is not provided, return an error.
|
||||
if (!invitationIdentifier) {
|
||||
reply.status = 400;
|
||||
reply.body = { error: 'Invitation Identifier is required' };
|
||||
return reply;
|
||||
}
|
||||
|
||||
// Get the invitation from the store
|
||||
const storedInvitation = await this.invitationStore.get(invitationIdentifier);
|
||||
|
||||
// If the client is not subscribing to the SSE stream, return the invitation.
|
||||
if (request.headers['accept'] !== 'text/event-stream') {
|
||||
reply.body = storedInvitation || {};
|
||||
return reply;
|
||||
}
|
||||
|
||||
// Its an SSE request, so we need to subscribe the client to the SSE stream.
|
||||
await this.sseBroadcaster.subscribe(request, reply);
|
||||
|
||||
// If the invitation doesn't exist, don't send anything.
|
||||
if (!storedInvitation) {
|
||||
reply.status = 204;
|
||||
reply.body = {};
|
||||
return reply;
|
||||
}
|
||||
|
||||
// Send the invitation to the client as if it was a get request.
|
||||
this.sseBroadcaster.sendEvent(reply, 'invitation-updated', storedInvitation);
|
||||
|
||||
return reply;
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the invitation.
|
||||
* @param request - The request.
|
||||
* @param reply - The reply.
|
||||
* @returns The merged invitation.
|
||||
*/
|
||||
async updateInvitation(request: RouteRequest, reply: InvitationRouteResponse): Promise<InvitationRouteResponse> {
|
||||
// Parse the invitation
|
||||
const invitation = parseInvitation.parse(request.body);
|
||||
|
||||
// Get the existing invitation
|
||||
const existingInvitation = await this.invitationStore.get(invitation.invitationIdentifier);
|
||||
|
||||
// Merge the existing invitation with the new invitation
|
||||
const mergedInvitation = InvitationsRoute.mergeInvitations(invitation, existingInvitation);
|
||||
|
||||
// Store the merged invitation
|
||||
await this.invitationStore.set(invitation.invitationIdentifier, mergedInvitation);
|
||||
|
||||
// Broadcast the invitation update (We send down the whole invitation. Clients will have to compare commitIds)
|
||||
await this.sseBroadcaster.broadcast(invitation.invitationIdentifier, 'invitation-updated', invitation);
|
||||
|
||||
reply.status = 200;
|
||||
reply.body = mergedInvitation;
|
||||
|
||||
// Return the reply.
|
||||
return reply;
|
||||
}
|
||||
|
||||
/**
|
||||
* Merge two invitations by commit identifiers.
|
||||
* This wont work in an actual commit merging situation since the invitations will be encrypted.
|
||||
*
|
||||
* @param invitation1 - The first invitation.
|
||||
* @param invitation2 - The second invitation.
|
||||
* @returns The merged invitation.
|
||||
*/
|
||||
static mergeInvitations(invitation1: Z.infer<typeof parseInvitation>, invitation2: Z.infer<typeof parseInvitation> | undefined): Z.infer<typeof parseInvitation> {
|
||||
// Initialize the result with the first invitation.
|
||||
const result = invitation1;
|
||||
|
||||
// Loop over the commits in the second invitation.
|
||||
for(const commit of invitation2?.commits ?? []) {
|
||||
// If the commit already exists in the result, skip it.
|
||||
if(result.commits.some(c => c.commitIdentifier === commit.commitIdentifier)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Add the commit to the result.
|
||||
result.commits.push(commit);
|
||||
}
|
||||
|
||||
// Return the merged invitation.
|
||||
return result;
|
||||
}
|
||||
|
||||
static parseInvitation = parseInvitation
|
||||
}
|
||||
@@ -1,113 +1,38 @@
|
||||
/**
|
||||
* Canonical shape for route-level response event handlers.
|
||||
*
|
||||
* Router implementations define what they can provide by extending this map.
|
||||
*/
|
||||
export type RouteEventHandlers = Record<string, (...args: unknown[]) => void>;
|
||||
import type { StreamEvent } from "../services/stream/base-stream.js";
|
||||
import type { Broadcaster } from "../services/broadcaster/broadcaster.js";
|
||||
|
||||
/**
|
||||
* Utility type for a route that does not require any response events.
|
||||
*/
|
||||
export type EmptyRouteEventHandlers = Record<never, never>;
|
||||
export type Context = {
|
||||
headers: Record<string, string>;
|
||||
params: Record<string, unknown>;
|
||||
body?: unknown;
|
||||
};
|
||||
|
||||
/**
|
||||
* Routes that use any {@link RouteResponse} implementation.
|
||||
*/
|
||||
export type AnyRouteOptions = RouteOptions<Partial<RouteEventHandlers>>;
|
||||
export type RouteHandler = (context: Context) => unknown | Promise<unknown>;
|
||||
|
||||
/**
|
||||
* RouteOptions defines a route configuration.
|
||||
*
|
||||
* The generic parameter `RequiredEvents` specifies what events the route handler requires
|
||||
* from the response object. Routes that don't need any events should use the default (empty object).
|
||||
*
|
||||
* @template RequiredEvents - The events this route requires from the response (default: none)
|
||||
*/
|
||||
export type RouteOptions<RequiredEvents extends Partial<RouteEventHandlers> = EmptyRouteEventHandlers> = {
|
||||
export type HttpMethod =
|
||||
| "GET"
|
||||
| "POST"
|
||||
| "PUT"
|
||||
| "DELETE"
|
||||
| "PATCH"
|
||||
| "OPTIONS";
|
||||
|
||||
export type HTTPRouteOptions = {
|
||||
method: HttpMethod;
|
||||
url: string;
|
||||
method: 'GET' | 'POST' | 'PUT' | 'DELETE' | 'PATCH' | 'OPTIONS';
|
||||
handler: (req: RouteRequest, res: RouteResponse<RequiredEvents>) => Promise<RouteResponse<RequiredEvents>>;
|
||||
}
|
||||
mode?: "http";
|
||||
handler: RouteHandler;
|
||||
};
|
||||
|
||||
/**
|
||||
* Generic HTTP Request class that can be extended for specific HTTP routers, eg. express, fastify, elysia
|
||||
*
|
||||
* NOTE: This is likely incomplete. Some special actions may require additional properties or methods.
|
||||
* TODO: Add to this as we find more properties that are needed for specific actions.
|
||||
*/
|
||||
export abstract class RouteRequest {
|
||||
/** The body of the request */
|
||||
abstract body: Record<string, unknown>;
|
||||
export type SSERouteOptions = {
|
||||
method: "GET" | "POST";
|
||||
url: string;
|
||||
mode: "sse";
|
||||
broadcaster: Broadcaster;
|
||||
topics: (context: Context) => string[] | Promise<string[]>;
|
||||
handler: RouteHandler;
|
||||
};
|
||||
|
||||
/** The query parameters of the request */
|
||||
abstract query: Record<string, unknown>;
|
||||
export type AnyRouteOptions = HTTPRouteOptions | SSERouteOptions;
|
||||
|
||||
/** The path parameters of the request */
|
||||
abstract params: Record<string, unknown>;
|
||||
|
||||
/** The headers of the request */
|
||||
abstract headers: Record<string, unknown>;
|
||||
|
||||
/** The cookies of the request */
|
||||
abstract cookies: Record<string, unknown>;
|
||||
|
||||
/** The raw request object from the HTTP router */
|
||||
abstract raw: {
|
||||
/** Set the timeout for the request */
|
||||
setTimeout: (timeout: number) => void;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Generic HTTP Response class that can be extended for specific HTTP routers.
|
||||
*
|
||||
* The generic parameter `EventHandlers` defines what events this response provides.
|
||||
* Implementations should specify all events they support.
|
||||
*
|
||||
* NOTE: This is likely incomplete. Some special actions may require additional properties or methods.
|
||||
* TODO: Add to this as we find more properties that are needed for specific actions.
|
||||
*
|
||||
* @template EventHandlers - The events this response implementation provides (default: none)
|
||||
*/
|
||||
export abstract class RouteResponse<EventHandlers extends Partial<RouteEventHandlers> = EmptyRouteEventHandlers> {
|
||||
/**
|
||||
* Internal compile-time brand carrying the concrete event map.
|
||||
* This is intentionally optional and has no runtime effect.
|
||||
*/
|
||||
readonly __eventHandlersBrand?: EventHandlers;
|
||||
|
||||
/** The status code of the response */
|
||||
abstract status: number;
|
||||
|
||||
/** The body of the response */
|
||||
abstract body: Record<string, unknown> | unknown;
|
||||
|
||||
/** The headers of the response */
|
||||
abstract headers: Record<string, string>;
|
||||
|
||||
/** The raw response object from the HTTP router */
|
||||
abstract raw: {
|
||||
/** Set a header of the response */
|
||||
setHeader: (key: string, value: string) => void;
|
||||
|
||||
/** Write the headers of the response */
|
||||
writeHead: (status: number, headers: Record<string, string>) => void;
|
||||
|
||||
/** Write the body of the response */
|
||||
write: (data: string) => void;
|
||||
|
||||
/** Flush the headers of the response */
|
||||
flushHeaders: () => void;
|
||||
|
||||
/** End the response */
|
||||
end: () => void;
|
||||
|
||||
/** Add an event listener to the response */
|
||||
on: <K extends keyof EventHandlers>(event: K, callback: EventHandlers[K]) => void;
|
||||
|
||||
/** Remove an event listener from the response */
|
||||
off: <K extends keyof EventHandlers>(event: K, callback: EventHandlers[K]) => void;
|
||||
}
|
||||
|
||||
abstract ignore: boolean;
|
||||
}
|
||||
export type SSEInitialEvent = Omit<StreamEvent, "id">;
|
||||
|
||||
96
src/services/broadcaster/broadcaster.ts
Normal file
96
src/services/broadcaster/broadcaster.ts
Normal file
@@ -0,0 +1,96 @@
|
||||
import type { BaseStream, StreamEvent } from "../stream/base-stream.js";
|
||||
import { Logger } from "../../utils/logger.js";
|
||||
|
||||
export abstract class BaseBroadcaster {
|
||||
abstract subscribe(stream: BaseStream, topics: string[]): Promise<void>
|
||||
|
||||
abstract unsubscribe(stream: BaseStream, topics?: string[]): Promise<void>
|
||||
|
||||
abstract publish(topic: string, event: Omit<StreamEvent, "id">): Promise<StreamEvent>
|
||||
|
||||
abstract sendEvent(stream: BaseStream, event: StreamEvent): Promise<void>
|
||||
}
|
||||
|
||||
export class Broadcaster extends BaseBroadcaster {
|
||||
private readonly debug: Logger;
|
||||
private readonly topicStreams = new Map<string, Set<BaseStream>>();
|
||||
private readonly streamTopics = new Map<BaseStream, Set<string>>();
|
||||
private readonly streams = new Set<BaseStream>();
|
||||
|
||||
constructor(debug: Logger) {
|
||||
super();
|
||||
this.debug = debug.extend("broadcaster");
|
||||
}
|
||||
|
||||
async subscribe(stream: BaseStream, topics: string[]): Promise<void> {
|
||||
this.streams.add(stream);
|
||||
|
||||
let trackedTopics = this.streamTopics.get(stream);
|
||||
if (!trackedTopics) {
|
||||
trackedTopics = new Set();
|
||||
this.streamTopics.set(stream, trackedTopics);
|
||||
stream.onClose(() => {
|
||||
void this.unsubscribe(stream);
|
||||
});
|
||||
}
|
||||
|
||||
for (const topic of topics) {
|
||||
let streams = this.topicStreams.get(topic);
|
||||
if (!streams) {
|
||||
streams = new Set();
|
||||
this.topicStreams.set(topic, streams);
|
||||
}
|
||||
|
||||
streams.add(stream);
|
||||
trackedTopics.add(topic);
|
||||
}
|
||||
|
||||
this.debug("subscribed stream %s to topics %o", stream, topics);
|
||||
}
|
||||
|
||||
async unsubscribe(stream: BaseStream, topics?: string[]): Promise<void> {
|
||||
const trackedTopics = this.streamTopics.get(stream);
|
||||
const topicsToRemove = topics ?? Array.from(trackedTopics ?? []);
|
||||
|
||||
for (const topic of topicsToRemove) {
|
||||
this.topicStreams.get(topic)?.delete(stream);
|
||||
trackedTopics?.delete(topic);
|
||||
|
||||
if (this.topicStreams.get(topic)?.size === 0) {
|
||||
this.topicStreams.delete(topic);
|
||||
}
|
||||
}
|
||||
|
||||
if (!trackedTopics || trackedTopics.size === 0) {
|
||||
this.streamTopics.delete(stream);
|
||||
this.streams.delete(stream);
|
||||
}
|
||||
|
||||
this.debug("unsubscribed stream %s from topics %o", stream, topicsToRemove);
|
||||
}
|
||||
|
||||
async publish(topic: string, event: Omit<StreamEvent, "id">): Promise<StreamEvent> {
|
||||
const timestamp = Date.now();
|
||||
const eventWithId: StreamEvent = {
|
||||
...event,
|
||||
id: String(timestamp),
|
||||
};
|
||||
|
||||
for (const stream of this.topicStreams.get(topic) ?? []) {
|
||||
await this.sendEvent(stream, eventWithId);
|
||||
}
|
||||
|
||||
this.debug("published %s to topic %s", event.type, topic);
|
||||
return eventWithId;
|
||||
}
|
||||
|
||||
async sendEvent(stream: BaseStream, event: StreamEvent): Promise<void> {
|
||||
try {
|
||||
await stream.send(event);
|
||||
} catch (error) {
|
||||
this.debug("failed to send event to stream %s: %o", stream, error);
|
||||
stream.close();
|
||||
await this.unsubscribe(stream);
|
||||
}
|
||||
}
|
||||
}
|
||||
73
src/services/config.ts
Normal file
73
src/services/config.ts
Normal file
@@ -0,0 +1,73 @@
|
||||
import "dotenv/config";
|
||||
import { z } from "zod";
|
||||
|
||||
// TODO: Decide if zod schema is best. So far, im liking it in the config file.
|
||||
const configSchema = z.object({
|
||||
database: z.object({
|
||||
path: z.string().default("data.db"),
|
||||
}),
|
||||
server: z.object({
|
||||
port: z.coerce.number().default(3000),
|
||||
host: z.string().default("0.0.0.0"),
|
||||
cors: z
|
||||
.object({
|
||||
origin: z.string().default("*"),
|
||||
methods: z
|
||||
.array(z.string())
|
||||
.default(["GET", "POST", "PUT", "DELETE", "OPTIONS"]),
|
||||
allowedHeaders: z
|
||||
.array(z.string())
|
||||
.default(["Content-Type", "Authorization"]),
|
||||
})
|
||||
.partial()
|
||||
.prefault({}),
|
||||
}),
|
||||
bloomFilterCache: z.object({
|
||||
maxSize: z.number().default(-1),
|
||||
ttl: z.number().default(-1),
|
||||
})
|
||||
.partial()
|
||||
.prefault({})
|
||||
});
|
||||
|
||||
type ConfigInput = z.input<typeof configSchema>;
|
||||
type ConfigSchema = z.output<typeof configSchema>;
|
||||
|
||||
/**
|
||||
* The Config class is used to load and parse the configuration for the vending machine.
|
||||
*/
|
||||
export class Config {
|
||||
static fromEnv(): Config {
|
||||
return this.from({
|
||||
database: {
|
||||
path: process.env.DATABASE_PATH,
|
||||
},
|
||||
server: {
|
||||
port: process.env.SERVER_PORT,
|
||||
host: process.env.SERVER_HOST,
|
||||
},
|
||||
bloomFilterCache: {
|
||||
maxSize: parseInt(process.env.BLOOM_FILTER_MAX_SIZE ?? "-1"),
|
||||
ttl: parseInt(process.env.BLOOM_FILTER_TTL ?? "-1"),
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
static from(config: ConfigInput): Config {
|
||||
return new Config(configSchema.parse(config));
|
||||
}
|
||||
|
||||
public get database() {
|
||||
return this.config.database;
|
||||
}
|
||||
|
||||
public get server() {
|
||||
return this.config.server;
|
||||
}
|
||||
|
||||
public get bloomFilterCache() {
|
||||
return this.config.bloomFilterCache;
|
||||
}
|
||||
|
||||
private constructor(private readonly config: ConfigSchema) {}
|
||||
}
|
||||
40
src/services/database/database.ts
Normal file
40
src/services/database/database.ts
Normal file
@@ -0,0 +1,40 @@
|
||||
import DatabaseConstructor from "better-sqlite3";
|
||||
import { Kysely, SqliteDialect } from "kysely";
|
||||
import type { DatabaseTables } from "./tables.js";
|
||||
import { Logger } from "../../utils/logger.js";
|
||||
|
||||
export type DatabaseOptions = {
|
||||
path: string;
|
||||
debug: Logger;
|
||||
};
|
||||
|
||||
export class Database {
|
||||
private readonly debug: Logger;
|
||||
private readonly sqlite: DatabaseConstructor.Database;
|
||||
private readonly kysely: Kysely<DatabaseTables>;
|
||||
|
||||
constructor(options: DatabaseOptions) {
|
||||
this.debug = options.debug.extend("database");
|
||||
this.sqlite = new DatabaseConstructor(options.path);
|
||||
this.configurePragmas();
|
||||
|
||||
this.kysely = new Kysely<DatabaseTables>({
|
||||
dialect: new SqliteDialect({ database: this.sqlite }),
|
||||
});
|
||||
}
|
||||
|
||||
get db(): Kysely<DatabaseTables> {
|
||||
return this.kysely;
|
||||
}
|
||||
|
||||
async destroy(): Promise<void> {
|
||||
this.debug("destroying database connection");
|
||||
await this.kysely.destroy();
|
||||
}
|
||||
|
||||
private configurePragmas(): void {
|
||||
this.debug("configuring SQLite pragmas");
|
||||
this.sqlite.pragma("journal_mode = WAL");
|
||||
this.sqlite.pragma("foreign_keys = ON");
|
||||
}
|
||||
}
|
||||
39
src/services/database/migrate.ts
Normal file
39
src/services/database/migrate.ts
Normal file
@@ -0,0 +1,39 @@
|
||||
import { promises as fs } from "node:fs";
|
||||
import path from "node:path";
|
||||
import { fileURLToPath } from "node:url";
|
||||
import { FileMigrationProvider, Migrator } from "kysely/migration";
|
||||
import type { Database } from "./database.js";
|
||||
import { Logger } from "../../utils/logger.js";
|
||||
|
||||
export class MigrationService {
|
||||
private readonly debug: Logger;
|
||||
private readonly migrator: Migrator;
|
||||
|
||||
constructor(database: Database, debug: Logger) {
|
||||
this.debug = debug.extend("migrations");
|
||||
const currentFilePath = fileURLToPath(import.meta.url);
|
||||
const currentDirectory = path.dirname(currentFilePath);
|
||||
const migrationsPath = path.join(currentDirectory, "migrations");
|
||||
|
||||
this.migrator = new Migrator({
|
||||
db: database.db,
|
||||
provider: new FileMigrationProvider({
|
||||
fs,
|
||||
path,
|
||||
migrationFolder: migrationsPath,
|
||||
}),
|
||||
});
|
||||
}
|
||||
|
||||
async migrateToLatest(): Promise<void> {
|
||||
this.debug("migrating database to latest");
|
||||
const { error } = await this.migrator.migrateToLatest();
|
||||
|
||||
if (error) {
|
||||
this.debug("migration failed: %O", error);
|
||||
throw error;
|
||||
}
|
||||
|
||||
this.debug("database migrations complete");
|
||||
}
|
||||
}
|
||||
44
src/services/database/migrations/001-storage-events.ts
Normal file
44
src/services/database/migrations/001-storage-events.ts
Normal file
@@ -0,0 +1,44 @@
|
||||
import { Kysely, sql } from "kysely";
|
||||
import type { DatabaseTables } from "../tables.js";
|
||||
|
||||
const millisecondTime = sql`(CAST(unixepoch('subsec') * 1000 AS INTEGER))`;
|
||||
|
||||
export async function up(db: Kysely<DatabaseTables>): Promise<void> {
|
||||
await db.schema
|
||||
.createTable("storage_events")
|
||||
.ifNotExists()
|
||||
.addColumn("storageIdentifier", "text", (col) => col.notNull())
|
||||
.addColumn("data", "blob", (col) => col.notNull())
|
||||
.addColumn("hash", "text", (col) => col.notNull())
|
||||
.addColumn("created_at", "integer", (col) =>
|
||||
col.notNull().defaultTo(millisecondTime),
|
||||
)
|
||||
.execute();
|
||||
|
||||
// TOOD: I dont think we needs this index? We can just rely on the (storageIdentifier, created_at) index, right?
|
||||
await db.schema
|
||||
.createIndex("idx_storage_events_storageIdentifier")
|
||||
.ifNotExists()
|
||||
.on("storage_events")
|
||||
.column("storageIdentifier")
|
||||
.execute();
|
||||
|
||||
await db.schema
|
||||
.createIndex("idx_storage_events_storageIdentifier_created_at")
|
||||
.ifNotExists()
|
||||
.on("storage_events")
|
||||
.columns(["storageIdentifier", "created_at"])
|
||||
.execute();
|
||||
}
|
||||
|
||||
export async function down(db: Kysely<DatabaseTables>): Promise<void> {
|
||||
await db.schema
|
||||
.dropIndex("idx_storage_events_storageIdentifier_created_at")
|
||||
.ifExists()
|
||||
.execute();
|
||||
await db.schema
|
||||
.dropIndex("idx_storage_events_storageIdentifier")
|
||||
.ifExists()
|
||||
.execute();
|
||||
await db.schema.dropTable("storage_events").ifExists().execute();
|
||||
}
|
||||
20
src/services/database/tables.ts
Normal file
20
src/services/database/tables.ts
Normal file
@@ -0,0 +1,20 @@
|
||||
import type { ColumnType } from "kysely";
|
||||
|
||||
export type Timestamp = ColumnType<
|
||||
number,
|
||||
number | undefined,
|
||||
number | undefined
|
||||
>;
|
||||
|
||||
export type BlobColumn = ColumnType<Buffer, Buffer | Uint8Array, Buffer>;
|
||||
|
||||
export interface StorageEventsTable {
|
||||
storageIdentifier: string;
|
||||
data: BlobColumn;
|
||||
hash: string;
|
||||
created_at: Timestamp;
|
||||
}
|
||||
|
||||
export interface DatabaseTables {
|
||||
storage_events: StorageEventsTable;
|
||||
}
|
||||
@@ -1,11 +1,9 @@
|
||||
import debug from "debug";
|
||||
|
||||
import { type Context, Hono } from "hono";
|
||||
import { Hono, type Context as HonoContext } from "hono";
|
||||
import { type HttpBindings, serve } from "@hono/node-server";
|
||||
import type { ContentfulStatusCode, StatusCode } from "hono/utils/http-status";
|
||||
import { RESPONSE_ALREADY_SENT } from "@hono/node-server/utils/response";
|
||||
import { cors } from "hono/cors";
|
||||
import { getCookie } from "hono/cookie";
|
||||
import { compress } from 'hono/compress'
|
||||
|
||||
import { z } from "zod";
|
||||
|
||||
@@ -14,82 +12,71 @@ import {
|
||||
encodeExtendedJson,
|
||||
} from "../utils/ext-json.js";
|
||||
|
||||
import {
|
||||
type AnyRouteOptions,
|
||||
type RouteEventHandlers,
|
||||
type RouteOptions,
|
||||
RouteRequest,
|
||||
RouteResponse,
|
||||
import type {
|
||||
AnyRouteOptions,
|
||||
Context,
|
||||
HttpMethod,
|
||||
} from "../routes/types.js";
|
||||
import { SSEStream } from "./stream/stream-sse.js";
|
||||
import { Logger } from "../utils/logger.js";
|
||||
|
||||
/** Context variable key for ExtJSON-decoded request bodies. */
|
||||
const PARSED_BODY_KEY = "parsedBody";
|
||||
|
||||
/**
|
||||
* Hono environment bindings used by this service.
|
||||
* Includes Node's raw request/response objects for SSE streaming.
|
||||
*/
|
||||
type AppEnv = {
|
||||
Bindings: HttpBindings;
|
||||
Variables: {
|
||||
parsedBody?: Record<string, unknown>;
|
||||
parsedBody?: unknown;
|
||||
};
|
||||
};
|
||||
|
||||
// Interface to add to our route classes so that we can register them.
|
||||
// NOTE: I hate this pattern. But ExpressJS is odd in that it is structured as a singleton that still needs registration.
|
||||
export interface APIRoutes {
|
||||
getRoutes(): Promise<Array<AnyRouteOptions>>;
|
||||
}
|
||||
|
||||
export class HTTPService {
|
||||
private debug: debug.Debugger;
|
||||
private debug: Logger;
|
||||
private server: Hono<AppEnv>;
|
||||
|
||||
constructor(
|
||||
private routes: Array<APIRoutes> = [],
|
||||
private port: number = 3000,
|
||||
private host: string = "0.0.0.0",
|
||||
debug: Logger,
|
||||
) {
|
||||
this.debug = debug("xo:http-router");
|
||||
|
||||
this.debug = debug.extend("http-router");
|
||||
this.server = new Hono<AppEnv>();
|
||||
}
|
||||
|
||||
async start(): Promise<void> {
|
||||
this.debug(`Starting on http://${this.host}:${this.port}`);
|
||||
|
||||
// Setup ExtJSON handling.
|
||||
this.handleExtJSON();
|
||||
|
||||
// Setup Error Handling (to give more verbose Zod errors)
|
||||
this.handleErrors();
|
||||
|
||||
// Allow CORS requests. This allows requests from any origin/domain.
|
||||
// Capacitor apps (like XO Wallet) use localhost:3000 as the origin, making it difficult to meaningfully restrict requests by origin for security.
|
||||
this.server.use("*", cors());
|
||||
this.server.use(compress())
|
||||
|
||||
// Register your routes here before starting the server
|
||||
this.server.get("/health", async (c) => {
|
||||
return c.json({ status: "ok" });
|
||||
});
|
||||
|
||||
// Register each route.
|
||||
for (const routes of this.routes) {
|
||||
for (const routeOptions of await routes.getRoutes()) {
|
||||
const method = routeOptions.method.toLowerCase() as Lowercase<
|
||||
RouteOptions["method"]
|
||||
>;
|
||||
const method = routeOptions.method.toLowerCase() as Lowercase<HttpMethod>;
|
||||
const register = this.server[method].bind(this.server) as (
|
||||
path: string,
|
||||
handler: (c: Context<AppEnv>) => Promise<Response | typeof RESPONSE_ALREADY_SENT>,
|
||||
handler: (c: HonoContext<AppEnv>) => Promise<Response | typeof RESPONSE_ALREADY_SENT>,
|
||||
) => void;
|
||||
|
||||
register(routeOptions.url, async (c) => {
|
||||
const req = await HonoRequest.fromContext(c);
|
||||
const res = new HonoResponse(c);
|
||||
const result = await routeOptions.handler(req, res);
|
||||
return HonoResponse.finalize(c, result);
|
||||
const context = await HTTPService.createContext(c);
|
||||
|
||||
if (routeOptions.mode === "sse") {
|
||||
return HTTPService.handleSSERoute(c, context, routeOptions);
|
||||
}
|
||||
|
||||
const result = await routeOptions.handler(context);
|
||||
return HTTPService.normalizeHTTPResult(c, result);
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -103,14 +90,11 @@ export class HTTPService {
|
||||
this.debug(`Started on http://${this.host}:${this.port}`);
|
||||
}
|
||||
|
||||
// Helper method to access the server instance
|
||||
getInstance(): Hono<AppEnv> {
|
||||
return this.server;
|
||||
}
|
||||
|
||||
private handleErrors() {
|
||||
// Customize our error handler to give better errors.
|
||||
// NOTE: This will nicely format the Zod validation errors.
|
||||
this.server.onError((error: Error, c) => {
|
||||
if (error instanceof z.ZodError) {
|
||||
const formattedErrors = error.issues.map((issue) => ({
|
||||
@@ -131,8 +115,6 @@ export class HTTPService {
|
||||
}
|
||||
|
||||
this.debug(`Error: ${error}`);
|
||||
|
||||
// Handle other types of errors
|
||||
return c.json({ error: "Internal Server Error" }, 500);
|
||||
});
|
||||
}
|
||||
@@ -155,191 +137,144 @@ export class HTTPService {
|
||||
|
||||
await next();
|
||||
|
||||
await encodeJsonResponse(c);
|
||||
await HTTPService.encodeJsonResponse(c);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Hono adapter for the generic {@link RouteRequest}.
|
||||
*/
|
||||
export class HonoRequest extends RouteRequest {
|
||||
body: Record<string, unknown>;
|
||||
query: Record<string, unknown>;
|
||||
params: Record<string, unknown>;
|
||||
headers: Record<string, unknown>;
|
||||
cookies: Record<string, unknown>;
|
||||
raw: {
|
||||
setTimeout: (timeout: number) => void;
|
||||
};
|
||||
private static async handleSSERoute(
|
||||
honoContext: HonoContext<AppEnv>,
|
||||
context: Context,
|
||||
routeOptions: Extract<AnyRouteOptions, { mode: "sse" }>,
|
||||
): Promise<typeof RESPONSE_ALREADY_SENT> {
|
||||
const topics = await routeOptions.topics(context);
|
||||
const stream = HTTPService.toSSEStream(honoContext);
|
||||
|
||||
private constructor(
|
||||
context: Context<AppEnv>,
|
||||
body: Record<string, unknown>,
|
||||
) {
|
||||
super();
|
||||
stream.open();
|
||||
|
||||
this.body = body;
|
||||
this.query = { ...context.req.query() };
|
||||
this.params = { ...context.req.param() };
|
||||
this.headers = { ...context.req.header() };
|
||||
this.cookies = { ...getCookie(context) };
|
||||
await routeOptions.broadcaster.subscribe(stream, topics);
|
||||
|
||||
const incoming = context.env.incoming;
|
||||
this.raw = {
|
||||
setTimeout: (timeout: number) => {
|
||||
incoming.socket.setTimeout(timeout);
|
||||
try {
|
||||
const result = await routeOptions.handler(context);
|
||||
if (result !== undefined) {
|
||||
await stream.send({
|
||||
type: "sync",
|
||||
data: result,
|
||||
});
|
||||
}
|
||||
} catch (error) {
|
||||
await stream.send({
|
||||
type: "error",
|
||||
data: { error: "Internal Server Error" },
|
||||
});
|
||||
stream.close();
|
||||
throw error;
|
||||
}
|
||||
|
||||
return RESPONSE_ALREADY_SENT;
|
||||
}
|
||||
|
||||
private static toSSEStream(context: HonoContext<AppEnv>): SSEStream {
|
||||
return new SSEStream({
|
||||
setTimeout: (timeout) => {
|
||||
context.env.incoming.socket.setTimeout(timeout);
|
||||
},
|
||||
writeHead: (status, headers) => {
|
||||
context.env.outgoing.writeHead(status, headers);
|
||||
},
|
||||
flushHeaders: () => {
|
||||
context.env.outgoing.flushHeaders();
|
||||
},
|
||||
write: (data) => {
|
||||
context.env.outgoing.write(data);
|
||||
},
|
||||
end: () => {
|
||||
context.env.outgoing.end();
|
||||
},
|
||||
onClose: (callback) => {
|
||||
context.env.outgoing.on("close", callback);
|
||||
},
|
||||
corsOrigin: context.res.headers.get("access-control-allow-origin") ?? "*",
|
||||
});
|
||||
}
|
||||
|
||||
private static async createContext(context: HonoContext<AppEnv>): Promise<Context> {
|
||||
return {
|
||||
headers: HTTPService.normalizeHeaders(context.req.header()),
|
||||
params: {
|
||||
...context.req.query(),
|
||||
...context.req.param(),
|
||||
},
|
||||
body: await HTTPService.getBody(context),
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a {@link RouteRequest} from the active Hono context.
|
||||
* Request bodies are parsed once and prefer the ExtJSON-decoded value when present.
|
||||
*/
|
||||
static async fromContext(context: Context<AppEnv>): Promise<HonoRequest> {
|
||||
private static async getBody(context: HonoContext<AppEnv>): Promise<unknown> {
|
||||
const parsedBody = context.get(PARSED_BODY_KEY);
|
||||
if (parsedBody !== undefined) {
|
||||
return new HonoRequest(context, parsedBody);
|
||||
return parsedBody;
|
||||
}
|
||||
|
||||
const contentType = context.req.header("content-type");
|
||||
if (contentType?.includes("application/json")) {
|
||||
const json = await context.req.json();
|
||||
const body =
|
||||
json !== null && typeof json === "object"
|
||||
? (json as Record<string, unknown>)
|
||||
: {};
|
||||
return new HonoRequest(context, body);
|
||||
return json !== null && typeof json === "object" ? json : undefined;
|
||||
}
|
||||
|
||||
return new HonoRequest(context, {});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Hono adapter for the generic {@link RouteResponse}.
|
||||
*
|
||||
* When running on Node via `@hono/node-server`, `raw` delegates to the native
|
||||
* `ServerResponse` so SSE routes can use the same streaming API as Fastify.
|
||||
*/
|
||||
export class HonoResponse<
|
||||
EventHandlers extends Partial<RouteEventHandlers> = Partial<RouteEventHandlers>,
|
||||
> extends RouteResponse<EventHandlers> {
|
||||
status = 200;
|
||||
body: Record<string, unknown> | unknown;
|
||||
headers: Record<string, string> = {};
|
||||
ignore = false;
|
||||
raw: RouteResponse<EventHandlers>["raw"];
|
||||
|
||||
/** Whether the handler already wrote directly to the Node response. */
|
||||
rawResponseSent = false;
|
||||
|
||||
constructor(private readonly context: Context<AppEnv>) {
|
||||
super();
|
||||
|
||||
const outgoing = context.env.outgoing;
|
||||
this.raw = {
|
||||
setHeader: (key, value) => {
|
||||
outgoing.setHeader(key, value);
|
||||
this.headers[key.toLowerCase()] = value;
|
||||
},
|
||||
writeHead: (status, responseHeaders) => {
|
||||
this.status = status;
|
||||
this.rawResponseSent = true;
|
||||
outgoing.writeHead(status, responseHeaders);
|
||||
for (const [key, value] of Object.entries(responseHeaders)) {
|
||||
this.headers[key.toLowerCase()] = value;
|
||||
}
|
||||
},
|
||||
write: (data) => {
|
||||
this.rawResponseSent = true;
|
||||
outgoing.write(data);
|
||||
},
|
||||
flushHeaders: () => {
|
||||
outgoing.flushHeaders();
|
||||
},
|
||||
end: () => {
|
||||
this.rawResponseSent = true;
|
||||
outgoing.end();
|
||||
},
|
||||
on: (event, callback) => {
|
||||
outgoing.on(String(event), callback as (...args: unknown[]) => void);
|
||||
},
|
||||
off: (event, callback) => {
|
||||
outgoing.off(String(event), callback as (...args: unknown[]) => void);
|
||||
},
|
||||
};
|
||||
return undefined;
|
||||
}
|
||||
|
||||
/**
|
||||
* Read a response header, matching Fastify's `reply.getHeader` behavior.
|
||||
*/
|
||||
getHeader(name: string): string | undefined {
|
||||
const value = this.context.env.outgoing.getHeader(name);
|
||||
if (value === undefined) {
|
||||
return this.headers[name.toLowerCase()];
|
||||
private static normalizeHeaders(headers: Record<string, string> | undefined): Record<string, string> {
|
||||
const normalized: Record<string, string> = {};
|
||||
|
||||
for (const [key, value] of Object.entries(headers ?? {})) {
|
||||
normalized[key.toLowerCase()] = value;
|
||||
}
|
||||
|
||||
return Array.isArray(value) ? value.join(", ") : String(value);
|
||||
return normalized;
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a route handler result into a value Hono can return.
|
||||
*/
|
||||
static finalize(
|
||||
context: Context<AppEnv>,
|
||||
response: RouteResponse,
|
||||
): Response | typeof RESPONSE_ALREADY_SENT {
|
||||
const honoResponse = response as HonoResponse;
|
||||
|
||||
if (response.ignore || honoResponse.rawResponseSent) {
|
||||
return RESPONSE_ALREADY_SENT;
|
||||
private static normalizeHTTPResult(
|
||||
context: HonoContext<AppEnv>,
|
||||
result: unknown,
|
||||
): Response {
|
||||
if (result === undefined) {
|
||||
return context.body(null, 204);
|
||||
}
|
||||
|
||||
const status = response.status as StatusCode;
|
||||
|
||||
if (response.body === undefined) {
|
||||
return context.body(null, status);
|
||||
if (result instanceof Response) {
|
||||
return result;
|
||||
}
|
||||
|
||||
const contentType =
|
||||
response.headers["content-type"] ?? response.headers["Content-Type"];
|
||||
if (contentType?.includes("text/")) {
|
||||
return context.text(String(response.body), status as ContentfulStatusCode, response.headers);
|
||||
if (typeof result === "string") {
|
||||
return context.text(result, 200 as ContentfulStatusCode);
|
||||
}
|
||||
|
||||
return new Response(encodeExtendedJson(response.body), {
|
||||
status,
|
||||
return new Response(encodeExtendedJson(result), {
|
||||
status: 200,
|
||||
headers: {
|
||||
"content-type": "application/json",
|
||||
...response.headers,
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Encode a JSON response body to Extended JSON.
|
||||
* Mirrors the Fastify `onSend` hook: parse string payloads first, then encode.
|
||||
*/
|
||||
async function encodeJsonResponse(context: Context<AppEnv>): Promise<void> {
|
||||
const responseContentType = context.res.headers.get("content-type");
|
||||
if (!responseContentType?.includes("application/json")) {
|
||||
return;
|
||||
private static async encodeJsonResponse(context: HonoContext<AppEnv>): Promise<void> {
|
||||
const responseContentType = context.res.headers.get("content-type");
|
||||
if (!responseContentType?.includes("application/json")) {
|
||||
return;
|
||||
}
|
||||
|
||||
const cloned = context.res.clone();
|
||||
const payload = await cloned.text();
|
||||
if (!payload) {
|
||||
return;
|
||||
}
|
||||
|
||||
const data = JSON.parse(payload);
|
||||
const encoded = encodeExtendedJson(data);
|
||||
|
||||
context.res = new Response(encoded, {
|
||||
status: context.res.status as StatusCode,
|
||||
headers: context.res.headers,
|
||||
});
|
||||
}
|
||||
|
||||
const cloned = context.res.clone();
|
||||
const payload = await cloned.text();
|
||||
if (!payload) {
|
||||
return;
|
||||
}
|
||||
|
||||
const data = JSON.parse(payload);
|
||||
const encoded = encodeExtendedJson(data);
|
||||
|
||||
context.res = new Response(encoded, {
|
||||
status: context.res.status,
|
||||
headers: context.res.headers,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1,274 +0,0 @@
|
||||
import debug, { type Debugger } from "debug";
|
||||
import type { RouteEventHandlers, RouteRequest, RouteResponse } from "../routes/types.js";
|
||||
|
||||
/** SSE clients use the generic response stream event API. */
|
||||
type SSERouteResponse = RouteResponse<Partial<RouteEventHandlers>>;
|
||||
|
||||
import { encodeExtendedJson } from "../utils/ext-json.js";
|
||||
|
||||
/**
|
||||
* Represents an event stored in the history buffer.
|
||||
* Used for replaying missed events to reconnecting clients.
|
||||
*/
|
||||
interface HistoricalEvent {
|
||||
/** The event topic/type (e.g., 'invitation-created', 'invitation-updated') */
|
||||
topic: string;
|
||||
/** The event payload data */
|
||||
data: unknown;
|
||||
/** Unix timestamp in milliseconds when the event was created */
|
||||
timestamp: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Options for configuring the SSE service.
|
||||
*/
|
||||
interface SSEOptions {
|
||||
/** Maximum age of events to keep in history (in milliseconds). Default: 5 minutes */
|
||||
maxHistoryAge?: number;
|
||||
/** Maximum number of events to keep per user. Default: 1000 */
|
||||
maxHistorySize?: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Server-Sent Events broadcaster with event history support.
|
||||
*
|
||||
* Maintains a per-user event history buffer that allows clients to replay
|
||||
* missed events when reconnecting. This makes reconnections robust against
|
||||
* network interruptions.
|
||||
*/
|
||||
export class SSEBroadcaster {
|
||||
/**
|
||||
* Factory method to create and start an SSE broadcaster.
|
||||
* @param options - Configuration options for the SSE service
|
||||
* @returns A started SSE instance
|
||||
*/
|
||||
static async from(options?: SSEOptions) {
|
||||
const broadcaster = new SSEBroadcaster(options);
|
||||
await broadcaster.start();
|
||||
return broadcaster;
|
||||
}
|
||||
|
||||
/** Map of Invitation IDs to their connected SSE response streams */
|
||||
private clients: Map<string, Set<SSERouteResponse>> = new Map();
|
||||
|
||||
/** Map of Invitation IDs to their event history buffers */
|
||||
private eventHistory: Map<string, HistoricalEvent[]> = new Map();
|
||||
|
||||
/** Maximum age of events to keep in history (in milliseconds) */
|
||||
private maxHistoryAge: number;
|
||||
|
||||
/** Maximum number of events to keep per user */
|
||||
private maxHistorySize: number;
|
||||
|
||||
private debug: Debugger;
|
||||
|
||||
constructor(options?: SSEOptions) {
|
||||
this.clients = new Map();
|
||||
this.eventHistory = new Map();
|
||||
this.maxHistoryAge = options?.maxHistoryAge ?? 20 * 60 * 1000; // 20 minutes default
|
||||
this.maxHistorySize = options?.maxHistorySize ?? 1000; // 1000 events default
|
||||
this.debug = debug('xo:sse');
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts the SSE broadcaster.
|
||||
* @returns The SSE instance for chaining
|
||||
*/
|
||||
start() {
|
||||
this.debug('SSE broadcaster is running (maxHistoryAge: %dms, maxHistorySize: %d)',
|
||||
this.maxHistoryAge, this.maxHistorySize);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends an event to a client.
|
||||
*
|
||||
* @param client - The client to send the event to
|
||||
* @param topic - The event topic/type
|
||||
* @param data - The event payload data
|
||||
*/
|
||||
static sendEvent(client: SSERouteResponse, topic: string, data: unknown) {
|
||||
const timestamp = Date.now();
|
||||
client.raw.write(`id: ${timestamp}\n`);
|
||||
client.raw.write(`event: ${topic}\n`);
|
||||
client.raw.write(`data: ${encodeExtendedJson(data)}\n\n`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends an event to a client.
|
||||
*
|
||||
* @param client - The client to send the event to
|
||||
* @param topic - The event topic/type
|
||||
* @param data - The event payload data
|
||||
*/
|
||||
sendEvent(client: SSERouteResponse, topic: string, data: unknown) {
|
||||
try {
|
||||
SSEBroadcaster.sendEvent(client, topic, data);
|
||||
} catch (error) {
|
||||
this.debug('Error sending event to client', error);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Broadcasts an event to all connected clients for a user and stores it in history.
|
||||
*
|
||||
* @param clientId - The user ID to broadcast to
|
||||
* @param topic - The event topic/type
|
||||
* @param data - The event payload data
|
||||
*/
|
||||
async broadcast(clientId: string, topic: string, data: unknown) {
|
||||
const timestamp = Date.now();
|
||||
|
||||
// Store the event in history for potential replay
|
||||
this.storeEvent(clientId, topic, data, timestamp);
|
||||
|
||||
// Broadcast to all connected clients
|
||||
this.clients.get(clientId)?.forEach((client: SSERouteResponse) => {
|
||||
try {
|
||||
this.sendEvent(client, topic, data);
|
||||
} catch (error) {
|
||||
this.debug('Error sending event to client', error);
|
||||
}
|
||||
});
|
||||
|
||||
this.debug('SSE broadcasted message', topic, data);
|
||||
}
|
||||
|
||||
/**
|
||||
* Subscribes a client to receive SSE events.
|
||||
*
|
||||
* If lastEventTime is provided, all events that occurred after that timestamp
|
||||
* will be replayed to the client before starting the live stream.
|
||||
*
|
||||
* @param req - The authenticated request containing the user ID
|
||||
* @param res - The Express response object to use for SSE streaming
|
||||
* @param lastEventTime - Optional timestamp to replay events from (in milliseconds)
|
||||
*/
|
||||
async subscribe(req: RouteRequest, res: SSERouteResponse, lastEventTime?: number) {
|
||||
// Get the invitation ID from the request
|
||||
const { invitationIdentifier } = req.query as { invitationIdentifier?: string };
|
||||
if (!invitationIdentifier) {
|
||||
throw new Error('Invitation Identifier is required');
|
||||
}
|
||||
|
||||
// Initialize client set for this user if needed
|
||||
if (!this.clients.has(invitationIdentifier)) {
|
||||
this.clients.set(invitationIdentifier, new Set());
|
||||
}
|
||||
|
||||
// Manually include the CORS header since `writeHead` bypasses the auto-injection by the @fastify/cors plugin.
|
||||
// This statement grabs the CORS header that the CORS plugin would have added to the response, configured in the HTTP service.
|
||||
const corsHeader = res.headers["access-control-allow-origin"] ?? "";
|
||||
|
||||
// Disable timeout for the connection. Without this, the connection will drop and the client will have to reconnect.
|
||||
req.raw.setTimeout(0);
|
||||
|
||||
// Set up SSE headers
|
||||
// Set headers for SSE
|
||||
res.raw.writeHead(200, {
|
||||
"Content-Type": "text/event-stream",
|
||||
"Cache-Control": "no-cache",
|
||||
Connection: "keep-alive",
|
||||
"Access-Control-Allow-Origin": corsHeader,
|
||||
});
|
||||
|
||||
// Force the headers to be dispatched to the client.
|
||||
// NOTE: This is very important: A `fetch` call will NOT resolve until it has received the headers.
|
||||
// And Fastify, unless otherwise specified, will not send the headers until it sends the body.
|
||||
res.raw.flushHeaders();
|
||||
|
||||
// Set retry interval for automatic reconnection
|
||||
res.raw.write('retry: 3000\n\n');
|
||||
|
||||
// Replay missed events if a lastEventTime was provided
|
||||
if (lastEventTime !== undefined) {
|
||||
const missedEvents = this.getEventsAfter(invitationIdentifier, lastEventTime);
|
||||
this.debug('SSE replaying %d missed events for invitation %s (since %d)',
|
||||
missedEvents.length, invitationIdentifier, lastEventTime);
|
||||
|
||||
for (const event of missedEvents) {
|
||||
try {
|
||||
await this.sendEvent(res, event.topic, event.data);
|
||||
} catch (error) {
|
||||
this.debug('Error sending event to client', error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Add client to the set for live updates
|
||||
this.clients.get(invitationIdentifier)?.add(res);
|
||||
|
||||
this.debug('SSE subscribed to client (invitationId: %s, lastEventTime: %s)',
|
||||
invitationIdentifier, lastEventTime ?? 'none');
|
||||
|
||||
// Clean up when client disconnects
|
||||
res.raw.on('close', () => {
|
||||
this.clients.get(invitationIdentifier)?.delete(res);
|
||||
this.debug('SSE client disconnected (invitationIdentifier: %s)', invitationIdentifier);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Stores an event in the user's history buffer.
|
||||
* Automatically prunes old events based on age and size limits.
|
||||
*
|
||||
* @param invitationId - The invitation ID to store the event for
|
||||
* @param topic - The event topic/type
|
||||
* @param data - The event payload data
|
||||
* @param timestamp - The event timestamp
|
||||
*/
|
||||
private storeEvent(invitationId: string, topic: string, data: unknown, timestamp: number) {
|
||||
// Initialize history array for this user if needed
|
||||
if (!this.eventHistory.has(invitationId)) {
|
||||
this.eventHistory.set(invitationId, []);
|
||||
}
|
||||
|
||||
const history = this.eventHistory.get(invitationId)!;
|
||||
|
||||
// Add the new event
|
||||
history.push({ topic, data, timestamp });
|
||||
|
||||
// Prune old events
|
||||
this.pruneHistory(invitationId, timestamp);
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes old events from a invitation's history based on age and size limits.
|
||||
*
|
||||
* @param invitationId - The invitation ID whose history to prune
|
||||
* @param currentTime - The current timestamp for age calculations
|
||||
*/
|
||||
private pruneHistory(invitationId: string, currentTime: number) {
|
||||
const history = this.eventHistory.get(invitationId);
|
||||
if (!history) return;
|
||||
|
||||
const cutoffTime = currentTime - this.maxHistoryAge;
|
||||
|
||||
// Remove events older than maxHistoryAge
|
||||
const prunedByAge = history.filter(event => event.timestamp > cutoffTime);
|
||||
|
||||
// If still over size limit, remove oldest events
|
||||
const prunedBySize = prunedByAge.length > this.maxHistorySize
|
||||
? prunedByAge.slice(-this.maxHistorySize)
|
||||
: prunedByAge;
|
||||
|
||||
this.eventHistory.set(invitationId, prunedBySize);
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves all events for a user that occurred after a given timestamp.
|
||||
*
|
||||
* @param invitationId - The invitation ID to get events for
|
||||
* @param afterTimestamp - The timestamp to get events after (exclusive)
|
||||
* @returns Array of events that occurred after the timestamp
|
||||
*/
|
||||
private getEventsAfter(invitationId: string, afterTimestamp: number): HistoricalEvent[] {
|
||||
const history = this.eventHistory.get(invitationId);
|
||||
if (!history) return [];
|
||||
|
||||
// First prune old events to ensure we don't return stale data
|
||||
this.pruneHistory(invitationId, Date.now());
|
||||
|
||||
return history.filter(event => event.timestamp > afterTimestamp);
|
||||
}
|
||||
}
|
||||
@@ -1,149 +1,125 @@
|
||||
import sqlite3, { type Database } from 'better-sqlite3';
|
||||
import { pack, unpack } from 'msgpackr';
|
||||
import { encodeExtendedJsonObject, decodeExtendedJsonObject } from '../utils/ext-json.js';
|
||||
import { binToHex, hexToBin } from '@bitauth/libauth';
|
||||
import { createHash } from "node:crypto";
|
||||
import type { Database } from "./database/database.js";
|
||||
|
||||
export interface SQLiteOptions {
|
||||
wal: boolean;
|
||||
}
|
||||
export type StorageEvent = {
|
||||
storageIdentifier: string;
|
||||
data: string;
|
||||
hash: string;
|
||||
created_at: number;
|
||||
};
|
||||
|
||||
/**
|
||||
* Storage Adapter that uses SQLite as the storage backend. Everything is stored as a BLOB, to avoid type issues.
|
||||
*
|
||||
* @remarks This implementation is single threaded. It WILL block execution of the main thread.
|
||||
*/
|
||||
export class StorageSQLite {
|
||||
/**
|
||||
* Create or open a SQLite database.
|
||||
* @param filepath - The path to the SQLite database file.
|
||||
* @param options - The options for the SQLite database.
|
||||
* @returns A new instance of StorageSQLite.
|
||||
*/
|
||||
static async createOrOpen(
|
||||
filepath: string,
|
||||
options: Partial<SQLiteOptions> = {},
|
||||
) {
|
||||
const db = sqlite3(filepath);
|
||||
export type AppendStorageEventInput = {
|
||||
storageIdentifier: string;
|
||||
data: string;
|
||||
};
|
||||
|
||||
const opts: SQLiteOptions = {
|
||||
wal: true,
|
||||
...options,
|
||||
export class StorageService {
|
||||
constructor(private readonly database: Database) {}
|
||||
|
||||
async append(input: AppendStorageEventInput): Promise<StorageEvent> {
|
||||
const data = Buffer.from(input.data, "base64");
|
||||
const hash = createHash("sha256").update(data).digest("base64");
|
||||
const createdAt = Date.now();
|
||||
|
||||
await this.database.db
|
||||
.insertInto("storage_events")
|
||||
.values({
|
||||
storageIdentifier: input.storageIdentifier,
|
||||
data,
|
||||
hash,
|
||||
created_at: createdAt,
|
||||
})
|
||||
.execute();
|
||||
|
||||
return {
|
||||
storageIdentifier: input.storageIdentifier,
|
||||
data: data.toString("base64"),
|
||||
hash,
|
||||
created_at: createdAt,
|
||||
};
|
||||
}
|
||||
|
||||
if (opts.wal) {
|
||||
db.pragma('journal_mode = WAL');
|
||||
async list(storageIdentifier: string): Promise<StorageEvent[]> {
|
||||
const rows = await this.database.db
|
||||
.selectFrom("storage_events")
|
||||
.select(["storageIdentifier", "data", "hash", "created_at"])
|
||||
.where("storageIdentifier", "=", storageIdentifier)
|
||||
.orderBy("created_at", "asc")
|
||||
.execute();
|
||||
|
||||
return rows.map((row) => ({
|
||||
storageIdentifier: row.storageIdentifier,
|
||||
data: Buffer.from(row.data).toString("base64"),
|
||||
hash: row.hash,
|
||||
created_at: row.created_at,
|
||||
}));
|
||||
}
|
||||
|
||||
async listWhereHash(
|
||||
storageIdentifier: string,
|
||||
predicate: (hash: string) => boolean,
|
||||
): Promise<StorageEvent[]> {
|
||||
const rows = await this.database.db
|
||||
.selectFrom("storage_events")
|
||||
.select(["storageIdentifier", "data", "hash", "created_at"])
|
||||
.where("storageIdentifier", "=", storageIdentifier)
|
||||
.orderBy("created_at", "asc")
|
||||
.execute();
|
||||
|
||||
return rows
|
||||
.filter((row) => predicate(row.hash))
|
||||
.map((row) => ({
|
||||
storageIdentifier: row.storageIdentifier,
|
||||
data: Buffer.from(row.data).toString("base64"),
|
||||
hash: row.hash,
|
||||
created_at: row.created_at,
|
||||
}));
|
||||
}
|
||||
|
||||
async count(storageIdentifier: string): Promise<number> {
|
||||
const row = await this.database.db
|
||||
.selectFrom("storage_events")
|
||||
.select(({ fn }) => fn.count<number>("hash").as("count"))
|
||||
.where("storageIdentifier", "=", storageIdentifier)
|
||||
.executeTakeFirstOrThrow();
|
||||
|
||||
return Number(row.count);
|
||||
}
|
||||
|
||||
async countMany(storageIdentifiers: string[]): Promise<Record<string, number>> {
|
||||
if (storageIdentifiers.length === 0) {
|
||||
return {};
|
||||
}
|
||||
|
||||
return new StorageSQLite(db);
|
||||
}
|
||||
const rows = await this.database.db
|
||||
.selectFrom("storage_events")
|
||||
.select(({ fn }) => [
|
||||
"storageIdentifier",
|
||||
fn.count<number>("hash").as("count"),
|
||||
])
|
||||
.where("storageIdentifier", "in", storageIdentifiers)
|
||||
.groupBy("storageIdentifier")
|
||||
.execute();
|
||||
|
||||
constructor(public db: Database) {}
|
||||
const counts = Object.fromEntries(
|
||||
storageIdentifiers.map((storageIdentifier) => [storageIdentifier, 0]),
|
||||
);
|
||||
|
||||
async listStores() {
|
||||
const result = this.db
|
||||
.prepare(`SELECT * FROM sqlite_master WHERE type='table'`)
|
||||
.all() as { name: string }[];
|
||||
return result.map((row) => row.name);
|
||||
}
|
||||
|
||||
async createStore<T>(storeName: string): Promise<StoreSQLite<T>> {
|
||||
// Create table with proper schema for key-value storage
|
||||
this.db
|
||||
.prepare(
|
||||
`
|
||||
CREATE TABLE IF NOT EXISTS "${storeName}" (
|
||||
key TEXT PRIMARY KEY,
|
||||
value BLOB
|
||||
)
|
||||
`,
|
||||
)
|
||||
.run();
|
||||
|
||||
return new StoreSQLite(this.db, storeName);
|
||||
}
|
||||
|
||||
async getStore<T>(storeName: string): Promise<StoreSQLite<T> | null> {
|
||||
// Check if table exists
|
||||
const tableExists = this.db
|
||||
.prepare(
|
||||
`
|
||||
SELECT name FROM sqlite_master
|
||||
WHERE type='table' AND name=?
|
||||
`,
|
||||
)
|
||||
.get(storeName);
|
||||
|
||||
if (!tableExists) {
|
||||
return null;
|
||||
for (const row of rows) {
|
||||
counts[row.storageIdentifier] = Number(row.count);
|
||||
}
|
||||
|
||||
return new StoreSQLite(this.db, storeName);
|
||||
return counts;
|
||||
}
|
||||
|
||||
async createOrGetStore<T>(storeName: string): Promise<StoreSQLite<T>> {
|
||||
return await this.createStore<T>(storeName);
|
||||
}
|
||||
async listHashes(storageIdentifier: string): Promise<string[]> {
|
||||
const rows = await this.database.db
|
||||
.selectFrom("storage_events")
|
||||
.select("hash")
|
||||
.where("storageIdentifier", "=", storageIdentifier)
|
||||
.orderBy("created_at", "asc")
|
||||
.execute();
|
||||
|
||||
async deleteStore(storeName: string) {
|
||||
this.db.prepare(`DROP TABLE IF EXISTS "${storeName}"`).run();
|
||||
}
|
||||
|
||||
async deleteDatabase() {
|
||||
this.db.close();
|
||||
}
|
||||
}
|
||||
|
||||
export class StoreSQLite<T> {
|
||||
constructor(
|
||||
protected db: Database,
|
||||
protected storeName: string,
|
||||
) {}
|
||||
|
||||
async keys() {
|
||||
const result = this.db
|
||||
.prepare(`SELECT key FROM "${this.storeName}"`)
|
||||
.all() as { key: string }[];
|
||||
return result.map((row) => row.key);
|
||||
}
|
||||
|
||||
async get(key: string): Promise<T | undefined> {
|
||||
const result = this.db
|
||||
.prepare(`SELECT value FROM "${this.storeName}" WHERE key = ?`)
|
||||
.get(key) as { value: string } | undefined;
|
||||
|
||||
if (!result) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const binValue = hexToBin(result.value);
|
||||
|
||||
const unpackedValue = unpack(binValue);
|
||||
|
||||
const decodedValue = decodeExtendedJsonObject(unpackedValue);
|
||||
|
||||
return decodedValue as T;
|
||||
}
|
||||
|
||||
async set(key: string, value: T): Promise<void> {
|
||||
const encodedValue = encodeExtendedJsonObject(value);
|
||||
|
||||
// Serialize using msgpackr for consistency with other implementations
|
||||
const packedValue = pack(encodedValue);
|
||||
|
||||
const serializedValue = binToHex(packedValue);
|
||||
|
||||
this.db
|
||||
.prepare(
|
||||
`INSERT OR REPLACE INTO "${this.storeName}" (key, value) VALUES (?, ?)`,
|
||||
)
|
||||
.run(key, serializedValue);
|
||||
}
|
||||
|
||||
async delete(key: string): Promise<void> {
|
||||
this.db.prepare(`DELETE FROM "${this.storeName}" WHERE key = ?`).run(key);
|
||||
}
|
||||
|
||||
async clear(): Promise<void> {
|
||||
this.db.prepare(`DELETE FROM "${this.storeName}"`).run();
|
||||
return rows.map((row) => row.hash);
|
||||
}
|
||||
}
|
||||
|
||||
15
src/services/stream/base-stream.ts
Normal file
15
src/services/stream/base-stream.ts
Normal file
@@ -0,0 +1,15 @@
|
||||
export type StreamTransport = "sse" | "ws";
|
||||
|
||||
export type StreamEvent = {
|
||||
id?: string;
|
||||
type: string;
|
||||
data: unknown;
|
||||
};
|
||||
|
||||
export abstract class BaseStream {
|
||||
abstract send(event: StreamEvent): Promise<void>;
|
||||
|
||||
abstract close(): void;
|
||||
|
||||
abstract onClose(callback: () => void): void;
|
||||
}
|
||||
22
src/services/stream/stream-http.ts
Normal file
22
src/services/stream/stream-http.ts
Normal file
@@ -0,0 +1,22 @@
|
||||
import { BaseStream } from "./base-stream.js";
|
||||
|
||||
|
||||
export class StreamHttp extends BaseStream {
|
||||
private data: unknown;
|
||||
|
||||
constructor() {
|
||||
super();
|
||||
}
|
||||
|
||||
async send() {
|
||||
|
||||
}
|
||||
|
||||
async close() {
|
||||
|
||||
}
|
||||
|
||||
async onClose(callback: () => void) {
|
||||
|
||||
}
|
||||
}
|
||||
79
src/services/stream/stream-sse.ts
Normal file
79
src/services/stream/stream-sse.ts
Normal file
@@ -0,0 +1,79 @@
|
||||
import { randomUUID } from "node:crypto";
|
||||
import { encodeExtendedJson } from "../../utils/ext-json.js";
|
||||
import { BaseStream, type StreamEvent } from "./base-stream.js";
|
||||
|
||||
export type SSEStreamOptions = {
|
||||
setTimeout: (timeout: number) => void;
|
||||
writeHead: (status: number, headers: Record<string, string>) => void;
|
||||
flushHeaders: () => void;
|
||||
write: (data: string) => void;
|
||||
end: () => void;
|
||||
onClose: (callback: () => void) => void;
|
||||
corsOrigin?: string;
|
||||
};
|
||||
|
||||
export class SSEStream extends BaseStream {
|
||||
readonly id: string;
|
||||
readonly transport = "sse";
|
||||
|
||||
private closeCallbacks: Array<() => void> = [];
|
||||
private closed = false;
|
||||
|
||||
constructor(private readonly options: SSEStreamOptions) {
|
||||
super();
|
||||
this.id = randomUUID();
|
||||
}
|
||||
|
||||
open(): void {
|
||||
this.options.setTimeout(0);
|
||||
this.options.writeHead(200, {
|
||||
"Content-Type": "text/event-stream",
|
||||
"Cache-Control": "no-cache, no-transform",
|
||||
Connection: "keep-alive",
|
||||
"Access-Control-Allow-Origin": this.options.corsOrigin ?? "*",
|
||||
});
|
||||
this.options.flushHeaders();
|
||||
this.options.write("retry: 3000\n\n");
|
||||
|
||||
this.options.onClose(() => {
|
||||
this.closed = true;
|
||||
this.emitClose();
|
||||
});
|
||||
}
|
||||
|
||||
async send(event: StreamEvent): Promise<void> {
|
||||
if (this.closed) {
|
||||
throw new Error("Cannot send to a closed SSE stream");
|
||||
}
|
||||
|
||||
if (event.id !== undefined) {
|
||||
this.options.write(`id: ${event.id}\n`);
|
||||
}
|
||||
|
||||
this.options.write(`event: ${event.type}\n`);
|
||||
this.options.write(`data: ${encodeExtendedJson(event.data)}\n\n`);
|
||||
}
|
||||
|
||||
close(): void {
|
||||
if (this.closed) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.closed = true;
|
||||
this.options.end();
|
||||
this.emitClose();
|
||||
}
|
||||
|
||||
onClose(callback: () => void): void {
|
||||
this.closeCallbacks.push(callback);
|
||||
}
|
||||
|
||||
private emitClose(): void {
|
||||
const callbacks = this.closeCallbacks;
|
||||
this.closeCallbacks = [];
|
||||
|
||||
for (const callback of callbacks) {
|
||||
callback();
|
||||
}
|
||||
}
|
||||
}
|
||||
0
src/services/stream/stream-ws.ts
Normal file
0
src/services/stream/stream-ws.ts
Normal file
93
src/utils/cache.ts
Normal file
93
src/utils/cache.ts
Normal file
@@ -0,0 +1,93 @@
|
||||
export type KVCacheOptions = {
|
||||
maxSize: number
|
||||
ttl: number
|
||||
}
|
||||
|
||||
/**
|
||||
* A simple, in-memory KV Cache.
|
||||
*/
|
||||
export class Cache<T = unknown> {
|
||||
private readonly cache = new Map<string, { value: T, added: number }>();
|
||||
|
||||
private readonly options: KVCacheOptions
|
||||
|
||||
/**
|
||||
* KV Cache
|
||||
*
|
||||
* @default options.maxSize -1, no max size is set. Items will not be kicked out
|
||||
* @default options.ttl -1, no ttl is set. Items will persist indefinitely
|
||||
*
|
||||
* @param options Pass in options to manage size and TTL of items in the cache
|
||||
* @param options.maxSize The max size the KV Cache will grow to. Once the maxSize has been hit, items will be kicked out on a FIFO basis.
|
||||
* @param options.ttl The time to live for items in the cache. Once the ttl has been reached, items will be kicked out.
|
||||
*/
|
||||
public constructor(options: Partial<KVCacheOptions> = {}) {
|
||||
// Combine our defaults with the constructor options
|
||||
this.options = {
|
||||
maxSize: -1,
|
||||
ttl: -1,
|
||||
...options
|
||||
}
|
||||
}
|
||||
|
||||
get(key: string): T | undefined {
|
||||
this.evictExpired();
|
||||
return this.cache.get(key)?.value as T | undefined;
|
||||
}
|
||||
|
||||
set(key: string, value: T): void {
|
||||
if (this.options.maxSize !== -1 && this.cache.size >= this.options.maxSize) {
|
||||
const oldest = this.getOldestItem();
|
||||
if (oldest) {
|
||||
this.cache.delete(oldest.key);
|
||||
}
|
||||
}
|
||||
|
||||
this.cache.set(key, { value, added: Date.now() });
|
||||
|
||||
this.evictExpired()
|
||||
}
|
||||
|
||||
delete(key: string): void {
|
||||
this.cache.delete(key);
|
||||
}
|
||||
|
||||
clear(): void {
|
||||
this.cache.clear();
|
||||
}
|
||||
|
||||
evictExpired(): void {
|
||||
if (this.options.ttl === -1) {
|
||||
return;
|
||||
}
|
||||
|
||||
const entries = this.cache.entries();
|
||||
|
||||
let next = entries.next();
|
||||
while (next.value) {
|
||||
const [key, value] = next.value;
|
||||
|
||||
const itemExpiry = value.added + this.options.ttl;
|
||||
if (itemExpiry < Date.now()) {
|
||||
this.cache.delete(key);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
next = entries.next();
|
||||
}
|
||||
}
|
||||
|
||||
getOldestItem(): { key: string, value: unknown, added: number } | undefined {
|
||||
const first = this.cache.entries().next()
|
||||
|
||||
if (!first.value) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
return {
|
||||
key: first.value[0],
|
||||
...first.value[1]
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
325
src/utils/inverse-bloom-filter.ts
Normal file
325
src/utils/inverse-bloom-filter.ts
Normal file
@@ -0,0 +1,325 @@
|
||||
/**
|
||||
* A single cell in the inverse Bloom filter table.
|
||||
*
|
||||
* Each cell aggregates information about all items that hash into it.
|
||||
*
|
||||
* - `count` tracks the net number of items in the cell.
|
||||
* - `idSum` is the XOR of all item IDs in the cell.
|
||||
* - `hashSum` is intended to validate whether a cell contains exactly one item.
|
||||
*
|
||||
* In this implementation, `hashSum` is currently the same value as `idSum`.
|
||||
* In a fuller IBLT implementation, this would usually be a separate hash of the
|
||||
* item ID, allowing single-item cells to be verified with lower false-positive risk.
|
||||
*/
|
||||
interface Cell {
|
||||
count: number;
|
||||
idSum: bigint;
|
||||
hashSum: bigint;
|
||||
}
|
||||
|
||||
export class InverseBloomFilterDifference {
|
||||
constructor(
|
||||
private readonly cells: Cell[],
|
||||
private readonly hashToIndex: (value: string) => number,
|
||||
private readonly hashToId: (value: string) => bigint,
|
||||
) {}
|
||||
|
||||
maybeMissing(item: string): boolean {
|
||||
const idx = this.hashToIndex(item);
|
||||
const itemId = this.hashToId(item);
|
||||
const cell = this.cells[idx]!;
|
||||
|
||||
return cell.count === 1 && cell.idSum === itemId;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A simplified inverse Bloom filter used for set reconciliation.
|
||||
*
|
||||
* Intended use:
|
||||
*
|
||||
* 1. Node A and Node B each have a set of item IDs.
|
||||
* 2. One node serializes its filter using `toBytes()`.
|
||||
* 3. The other node subtracts that filter from its own.
|
||||
* 4. The resulting difference table can be inspected to find items that may be
|
||||
* present locally but missing remotely.
|
||||
*
|
||||
* This is useful when two nodes mostly share the same data and only need to find
|
||||
* the small difference between their sets.
|
||||
*
|
||||
* Important:
|
||||
*
|
||||
* This implementation only hashes each item into one cell. A full IBLT normally
|
||||
* hashes each item into multiple cells and then repeatedly "peels" pure cells to
|
||||
* recover the full symmetric difference. Because this version uses one cell per
|
||||
* item, collisions can hide differences.
|
||||
*/
|
||||
export class InverseBloomFilter {
|
||||
/**
|
||||
* Convenience constructor.
|
||||
*
|
||||
* @param items The item IDs to include in the filter.
|
||||
* @param expectedDiffSize Optional expected difference size. This controls
|
||||
* the number of cells allocated.
|
||||
*/
|
||||
static from(items: string[], expectedDiffSize?: number) {
|
||||
return new this(items, expectedDiffSize);
|
||||
}
|
||||
|
||||
/**
|
||||
* Number of cells in the filter.
|
||||
*
|
||||
* Both sides must use the same size for subtraction to work.
|
||||
*/
|
||||
private size: number;
|
||||
|
||||
/**
|
||||
* The backing table of aggregated cells.
|
||||
*/
|
||||
private cells: Cell[];
|
||||
|
||||
/**
|
||||
* Builds a filter from the supplied item IDs.
|
||||
*
|
||||
* @param items Item identifiers to add to the filter.
|
||||
* @param expectedDiffSize Optional number of cells to allocate. Despite the
|
||||
* name, this is currently used directly as the table size.
|
||||
*
|
||||
* If no size is supplied, the table is sized to roughly half the number of
|
||||
* items, with a lower bound of 256 cells.
|
||||
*/
|
||||
constructor(items: string[], expectedDiffSize?: number) {
|
||||
this.size = expectedDiffSize ?? Math.max(256, Math.ceil(items.length / 2));
|
||||
|
||||
this.cells = Array(this.size)
|
||||
.fill(null)
|
||||
.map(() => ({
|
||||
count: 0,
|
||||
idSum: 0n,
|
||||
hashSum: 0n,
|
||||
}));
|
||||
|
||||
for (const item of items) {
|
||||
this.addItem(item);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to find local items that are probably missing from the remote filter.
|
||||
*
|
||||
* The supplied filter is subtracted from this filter. Then each local item is
|
||||
* checked against the difference table.
|
||||
*
|
||||
* An item is considered "maybe missing" when:
|
||||
*
|
||||
* - its cell has a net count of `1`, and
|
||||
* - the cell's XORed ID sum equals the item's ID.
|
||||
*
|
||||
* @param otherFilterBytes A serialized filter from another node.
|
||||
* @returns Local item IDs that appear to be absent from the other filter.
|
||||
*
|
||||
* Limitations:
|
||||
*
|
||||
* This can miss items when multiple differing items collide into the same cell.
|
||||
* It may also produce incorrect results if hash collisions occur.
|
||||
*/
|
||||
findMaybeMissing(items: Iterable<string>, otherFilterBytes: Uint8Array): string[] {
|
||||
const difference = this.createDifference(otherFilterBytes);
|
||||
const missing: string[] = [];
|
||||
|
||||
for (const item of items) {
|
||||
if (difference.maybeMissing(item)) {
|
||||
missing.push(item);
|
||||
}
|
||||
}
|
||||
|
||||
return missing;
|
||||
}
|
||||
|
||||
createDifference(otherFilterBytes: Uint8Array): InverseBloomFilterDifference {
|
||||
return new InverseBloomFilterDifference(
|
||||
this.subtractFilter(otherFilterBytes),
|
||||
this.hashToIndex.bind(this),
|
||||
this.hashToId.bind(this),
|
||||
);
|
||||
}
|
||||
|
||||
hasSameBytes(otherFilterBytes: Uint8Array): boolean {
|
||||
const localBytes = this.toBytes();
|
||||
if (localBytes.byteLength !== otherFilterBytes.byteLength) {
|
||||
return false;
|
||||
}
|
||||
|
||||
for (let i = 0; i < localBytes.byteLength; i++) {
|
||||
if (localBytes[i] !== otherFilterBytes[i]) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Serializes the filter into a compact binary format.
|
||||
*
|
||||
* Binary layout:
|
||||
*
|
||||
* ```txt
|
||||
* offset size field
|
||||
* ------ ---- -----------------------------
|
||||
* 0 8 table size as signed int64 LE
|
||||
* 8 4 cell[0].count as int32 LE
|
||||
* 12 8 cell[0].idSum as int64 LE
|
||||
* 20 8 cell[0].hashSum as int64 LE
|
||||
* 28 4 cell[1].count
|
||||
* ... ... repeated for every cell
|
||||
* ```
|
||||
*
|
||||
* Each cell occupies 20 bytes.
|
||||
*
|
||||
* @returns The serialized filter.
|
||||
*/
|
||||
toBytes(): Uint8Array {
|
||||
const cellSize = 4 + 8 + 8;
|
||||
const bytes = new Uint8Array(8 + this.size * cellSize);
|
||||
const view = new DataView(bytes.buffer);
|
||||
|
||||
view.setBigInt64(0, BigInt(this.size), true);
|
||||
|
||||
let offset = 8;
|
||||
|
||||
for (const cell of this.cells) {
|
||||
view.setInt32(offset, cell.count, true);
|
||||
view.setBigInt64(offset + 4, cell.idSum, true);
|
||||
view.setBigInt64(offset + 12, cell.hashSum, true);
|
||||
offset += cellSize;
|
||||
}
|
||||
|
||||
return bytes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Hashes an item string into a cell index.
|
||||
*
|
||||
* This uses an FNV-1a-like 32-bit hash and maps the result into the table size.
|
||||
*
|
||||
* @param value Item string.
|
||||
* @returns A cell index in the range `[0, this.size)`.
|
||||
*/
|
||||
private hashToIndex(value: string): number {
|
||||
let hash = 2166136261;
|
||||
|
||||
for (let i = 0; i < value.length; i++) {
|
||||
hash ^= value.charCodeAt(i);
|
||||
|
||||
hash +=
|
||||
(hash << 1) +
|
||||
(hash << 4) +
|
||||
(hash << 7) +
|
||||
(hash << 8) +
|
||||
(hash << 24);
|
||||
|
||||
// Keep the value within unsigned 32-bit integer range.
|
||||
hash = hash >>> 0;
|
||||
}
|
||||
|
||||
return hash % this.size;
|
||||
}
|
||||
|
||||
/**
|
||||
* Hashes an item string into a 63-bit bigint identifier.
|
||||
*
|
||||
* The result is kept within signed 64-bit range so it can be serialized using
|
||||
* `DataView#setBigInt64`.
|
||||
*
|
||||
* @param value Item string.
|
||||
* @returns A bigint ID for the item.
|
||||
*/
|
||||
private hashToId(value: string): bigint {
|
||||
let hash = 0n;
|
||||
|
||||
// Use 63 bits so the result can safely fit into signed int64 storage.
|
||||
const mod = (1n << 63n) - 1n;
|
||||
|
||||
for (let i = 0; i < value.length; i++) {
|
||||
hash = ((hash << 5n) - hash + BigInt(value.charCodeAt(i))) % mod;
|
||||
}
|
||||
|
||||
return hash;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a single item to the filter.
|
||||
*
|
||||
* The item is hashed to one cell. That cell's count is incremented, and the
|
||||
* item's hashed ID is XORed into the cell aggregates.
|
||||
*
|
||||
* @param item Item string to add.
|
||||
*/
|
||||
private addItem(item: string): void {
|
||||
const idx = this.hashToIndex(item);
|
||||
const itemId = this.hashToId(item);
|
||||
const cell = this.cells[idx]!;
|
||||
|
||||
cell.count += 1;
|
||||
|
||||
// XOR allows subtraction/reconciliation because:
|
||||
// x ^ x === 0
|
||||
// x ^ 0 === x
|
||||
//
|
||||
// When two filters are XORed together, matching items cancel out.
|
||||
cell.idSum ^= itemId;
|
||||
|
||||
// Currently identical to idSum. In a stronger implementation this should
|
||||
// be a separate checksum/hash used to verify that a cell is "pure".
|
||||
cell.hashSum ^= itemId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Subtracts another serialized filter from this filter.
|
||||
*
|
||||
* The result is a difference table:
|
||||
*
|
||||
* - positive `count` means this filter has more items in that cell.
|
||||
* - negative `count` means the other filter has more items in that cell.
|
||||
* - zero `count` means both filters have the same number of items in that cell,
|
||||
* though hash collisions can still hide differences.
|
||||
*
|
||||
* @param otherBytes Serialized filter bytes from another node.
|
||||
* @returns A new array of difference cells.
|
||||
*
|
||||
* @throws If the remote filter size does not match this filter size.
|
||||
*/
|
||||
private subtractFilter(otherBytes: Uint8Array): Cell[] {
|
||||
const view = new DataView(otherBytes.buffer);
|
||||
const otherSize = Number(view.getBigInt64(0, true));
|
||||
|
||||
if (otherSize !== this.size) {
|
||||
throw new Error('Filter sizes do not match');
|
||||
}
|
||||
|
||||
const result = Array(this.size);
|
||||
let offset = 8;
|
||||
const cellSize = 20;
|
||||
|
||||
for (let i = 0; i < this.size; i++) {
|
||||
const otherCount = view.getInt32(offset, true);
|
||||
const otherIdSum = view.getBigInt64(offset + 4, true);
|
||||
const otherHashSum = view.getBigInt64(offset + 12, true);
|
||||
|
||||
const cell = this.cells[i]!;
|
||||
|
||||
result[i] = {
|
||||
count: cell.count - otherCount,
|
||||
|
||||
// Matching items cancel out because they appear in both filters.
|
||||
idSum: cell.idSum ^ otherIdSum,
|
||||
hashSum: cell.hashSum ^ otherHashSum,
|
||||
};
|
||||
|
||||
offset += cellSize;
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
}
|
||||
@@ -1,68 +0,0 @@
|
||||
import Z, { z } from 'zod';
|
||||
|
||||
/**
|
||||
* Zod schemas for invitation validation.
|
||||
*
|
||||
* IMPORTANT: We use .passthrough() on all object schemas to preserve fields
|
||||
* that aren't explicitly defined. This is critical because:
|
||||
* 1. Invitations are signed based on stringify(commit.data)
|
||||
* 2. If we strip fields, the signature verification will fail
|
||||
* 3. The actual XOInvitation types have many more fields than we validate here
|
||||
*/
|
||||
|
||||
const variableSchema = z.object({
|
||||
variableIdentifier: z.string(),
|
||||
roleIdentifier: z.string().optional(),
|
||||
value: z.number().or(z.string()).or(z.boolean()).or(z.bigint()),
|
||||
}).passthrough();
|
||||
|
||||
const mergesWithSchema = z.object({
|
||||
commitIdentifier: z.string(),
|
||||
index: z.number(),
|
||||
}).passthrough();
|
||||
|
||||
const inputSchema = z.object({
|
||||
inputIdentifier: z.string().optional(),
|
||||
transactionIndex: z.number().optional(),
|
||||
roleIdentifier: z.string().optional(),
|
||||
mergesWith: mergesWithSchema.optional(),
|
||||
// Additional fields preserved via passthrough:
|
||||
// outpointTransactionHash, outpointIndex, sequenceNumber, unlockingBytecode, etc.
|
||||
}).passthrough();
|
||||
|
||||
const outputSchema = z.object({
|
||||
outputIdentifier: z.string().optional(),
|
||||
roleIdentifier: z.string().optional(),
|
||||
secretIdentifier: z.string().optional(),
|
||||
transactionIndex: z.number().optional(),
|
||||
mergesWith: mergesWithSchema.optional(),
|
||||
// Additional fields preserved via passthrough:
|
||||
// valueSatoshis, lockingBytecode, token, etc.
|
||||
}).passthrough();
|
||||
|
||||
const dataSchema = z.object({
|
||||
transactionVersion: z.number().optional(),
|
||||
transactionLocktime: z.number().optional(),
|
||||
variables: z.array(variableSchema).optional(),
|
||||
inputs: z.array(inputSchema).optional(),
|
||||
outputs: z.array(outputSchema).optional(),
|
||||
}).passthrough();
|
||||
|
||||
const commitSchema = z.object({
|
||||
commitIdentifier: z.string(),
|
||||
previousCommitIdentifier: z.string().or(z.undefined()),
|
||||
entityIdentifier: z.string(),
|
||||
data: dataSchema,
|
||||
signature: z.string(),
|
||||
expiresAtTimestamp: z.number(),
|
||||
}).passthrough();
|
||||
|
||||
export const parseInvitation = z.object({
|
||||
invitationIdentifier: z.string(),
|
||||
commits: z.array(commitSchema),
|
||||
createdAtTimestamp: z.number(),
|
||||
templateIdentifier: z.string(),
|
||||
actionIdentifier: z.string(),
|
||||
}).passthrough();
|
||||
|
||||
export type InvitationSchema = Z.infer<typeof parseInvitation>;
|
||||
73
src/utils/logger.ts
Normal file
73
src/utils/logger.ts
Normal file
@@ -0,0 +1,73 @@
|
||||
import Debug, { type Debugger } from "debug";
|
||||
|
||||
type LogHandler = {
|
||||
(...args: Parameters<Debugger>): void;
|
||||
extend: (namespace: string) => LogHandler;
|
||||
}
|
||||
|
||||
/**
|
||||
* Declares that Logger instances may also be invoked as functions.
|
||||
*/
|
||||
export interface Logger {
|
||||
(...args: unknown[]): void;
|
||||
}
|
||||
|
||||
/**
|
||||
* Logger class, similar to 'debug' library but you can call `instanceof Logger` to check if a value is a Logger instance.
|
||||
* Also, it wont throw if you dont enable debugging but one of the modules want a debug logger. - This feels like a bug in debug though
|
||||
*/
|
||||
export class Logger {
|
||||
public readonly namespace!: string;
|
||||
|
||||
private readonly handler!: LogHandler;
|
||||
|
||||
public constructor(
|
||||
namespace: string,
|
||||
handler: LogHandler = Debug(namespace),
|
||||
) {
|
||||
// Create a logger function that calls the handler.
|
||||
const logger = ((...args: Parameters<Debugger>): void => {
|
||||
handler(...args);
|
||||
}) as Logger;
|
||||
|
||||
/*
|
||||
* This makes `logger instanceof Logger` true and provides
|
||||
* access to instance methods such as `extend`.
|
||||
*/
|
||||
Object.setPrototypeOf(logger, new.target.prototype);
|
||||
|
||||
// Define the properties for the logger.
|
||||
Object.defineProperties(logger, {
|
||||
namespace: {
|
||||
value: namespace,
|
||||
enumerable: true,
|
||||
},
|
||||
handler: {
|
||||
value: handler,
|
||||
},
|
||||
});
|
||||
|
||||
return logger;
|
||||
}
|
||||
|
||||
/**
|
||||
* Extend the logger with a child namespace
|
||||
* @param childNamespace - The child namespace to extend the logger with
|
||||
* @returns The extended logger
|
||||
*/
|
||||
public extend(childNamespace: string): Logger {
|
||||
return new Logger(
|
||||
`${this.namespace}:${childNamespace}`,
|
||||
this.handler.extend(childNamespace),
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a value is a Logger instance
|
||||
* @param value - The value to check
|
||||
* @returns True if the value is a Logger instance, false otherwise
|
||||
*/
|
||||
static isLogger(value: unknown): value is Logger {
|
||||
return value instanceof Logger;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user