Files
xo-sync-server-hono/src/routes/data.ts

146 lines
4.3 KiB
TypeScript

import { createHash } from "node:crypto";
import { z } from "zod";
import type { Broadcaster } from "../services/broadcaster/broadcaster.js";
import type { StorageService } from "../services/storage.js";
import type { Cache } from "../utils/cache.js";
import { InverseBloomFilter } from "../utils/inverse-bloom-filter.js";
import type { BloomFilterCacheEntry } from "./bloom-filter-cache.js";
import type { AnyRouteOptions, Context } from "./types.js";
const storageParams = z.object({
storageIdentifier: z.string(),
bloomFilter: z.string().optional(),
count: z.coerce.number().int().positive().optional(),
});
const appendBody = z.object({
storageIdentifier: z.string(),
data: z.string(),
});
export class DataRoute {
constructor(
private readonly storage: StorageService,
private readonly broadcaster: Broadcaster,
private readonly bloomFilterCache: Cache<BloomFilterCacheEntry>,
) {}
async getRoutes(): Promise<Array<AnyRouteOptions>> {
return [
{
method: "GET",
url: "/data",
handler: this.getData.bind(this),
},
{
method: "GET",
url: "/data/stream",
mode: "sse",
broadcaster: this.broadcaster,
topics: this.getStorageTopics.bind(this),
handler: this.getData.bind(this),
},
{
method: "POST",
url: "/data",
handler: this.appendData.bind(this),
},
];
}
async getData(context: Context) {
const { storageIdentifier, bloomFilter, count } = storageParams.parse(context.params);
if (bloomFilter && count) {
return this.getMissingData(storageIdentifier, bloomFilter, count);
}
return this.storage.list(storageIdentifier);
}
async appendData(context: Context) {
const input = appendBody.parse(context.body);
const event = await this.storage.append(input);
this.bloomFilterCache.delete(input.storageIdentifier);
await this.broadcaster.publish(DataRoute.storageTopic(input.storageIdentifier), {
type: "storage-event",
data: event,
});
return event;
}
getStorageTopics(context: Context): string[] {
const { storageIdentifier } = storageParams.parse(context.params);
return [DataRoute.storageTopic(storageIdentifier)];
}
private static storageTopic(storageIdentifier: string): string {
return `storage:${storageIdentifier}`;
}
private async getMissingData(
storageIdentifier: string,
bloomFilter: string,
count: number,
) {
const serverCount = await this.storage.count(storageIdentifier);
const negotiatedCount = DataRoute.getNegotiatedBloomFilterSize(serverCount);
if (count !== negotiatedCount) {
return this.storage.list(storageIdentifier);
}
const cacheEntry = await this.getOrCreateBloomFilter(storageIdentifier, negotiatedCount);
const remoteFilter = Buffer.from(bloomFilter, "base64");
if (cacheEntry.filterBase64 === bloomFilter || cacheEntry.filter.hasSameBytes(remoteFilter)) {
return [];
}
let difference: ReturnType<InverseBloomFilter["createDifference"]>;
try {
difference = cacheEntry.filter.createDifference(remoteFilter);
} catch (error) {
throw new z.ZodError([
{
code: "custom",
path: ["bloomFilter"],
message: error instanceof Error ? error.message : "Invalid bloom filter",
input: bloomFilter,
},
]);
}
return this.storage.listWhereHash(storageIdentifier, (hash) =>
difference.maybeMissing(hash),
);
}
private async getOrCreateBloomFilter(
storageIdentifier: string,
count: number,
): Promise<BloomFilterCacheEntry> {
const cached = this.bloomFilterCache.get(storageIdentifier);
if (cached?.count === count) {
return cached;
}
const hashes = await this.storage.listHashes(storageIdentifier);
const filter = InverseBloomFilter.from(hashes, count);
const filterBytes = filter.toBytes();
const entry = {
count,
filter,
filterBase64: Buffer.from(filterBytes).toString("base64"),
filterHash: createHash("sha256").update(filterBytes).digest("base64"),
};
this.bloomFilterCache.set(storageIdentifier, entry);
return entry;
}
private static getNegotiatedBloomFilterSize(count: number): number {
return Math.max(count, 256);
}
}