Mid-rewrite
This commit is contained in:
152
src/utils/event-emitter.ts
Normal file
152
src/utils/event-emitter.ts
Normal file
@@ -0,0 +1,152 @@
|
||||
// TODO: You'll probably want to use WeakRef's here.
|
||||
|
||||
export type EventMap = Record<string, unknown>;
|
||||
|
||||
type Listener<T> = (detail: T) => void;
|
||||
|
||||
interface ListenerEntry<T> {
|
||||
listener: Listener<T>;
|
||||
wrappedListener: Listener<T>;
|
||||
debounceTime?: number;
|
||||
once?: boolean;
|
||||
}
|
||||
|
||||
export type OffCallback = () => void;
|
||||
|
||||
export class EventEmitter<T extends EventMap> {
|
||||
private listeners: Map<keyof T, Set<ListenerEntry<T[keyof T]>>> = new Map();
|
||||
|
||||
on<K extends keyof T>(
|
||||
type: K,
|
||||
listener: Listener<T[K]>,
|
||||
debounceMilliseconds?: number,
|
||||
): OffCallback {
|
||||
const wrappedListener =
|
||||
debounceMilliseconds && debounceMilliseconds > 0
|
||||
? this.debounce(listener, debounceMilliseconds)
|
||||
: listener;
|
||||
|
||||
if (!this.listeners.has(type)) {
|
||||
this.listeners.set(type, new Set());
|
||||
}
|
||||
|
||||
const listenerEntry: ListenerEntry<T[K]> = {
|
||||
listener,
|
||||
wrappedListener,
|
||||
debounceTime: debounceMilliseconds,
|
||||
};
|
||||
|
||||
this.listeners.get(type)?.add(listenerEntry as ListenerEntry<T[keyof T]>);
|
||||
|
||||
// Return an "off" callback that can be called to stop listening for events.
|
||||
return () => this.off(type, listener);
|
||||
}
|
||||
|
||||
once<K extends keyof T>(
|
||||
type: K,
|
||||
listener: Listener<T[K]>,
|
||||
debounceMilliseconds?: number,
|
||||
): OffCallback {
|
||||
const wrappedListener: Listener<T[K]> = (detail: T[K]) => {
|
||||
this.off(type, listener);
|
||||
listener(detail);
|
||||
};
|
||||
|
||||
const debouncedListener =
|
||||
debounceMilliseconds && debounceMilliseconds > 0
|
||||
? this.debounce(wrappedListener, debounceMilliseconds)
|
||||
: wrappedListener;
|
||||
|
||||
if (!this.listeners.has(type)) {
|
||||
this.listeners.set(type, new Set());
|
||||
}
|
||||
|
||||
const listenerEntry: ListenerEntry<T[K]> = {
|
||||
listener,
|
||||
wrappedListener: debouncedListener,
|
||||
debounceTime: debounceMilliseconds,
|
||||
once: true,
|
||||
};
|
||||
|
||||
this.listeners.get(type)?.add(listenerEntry as ListenerEntry<T[keyof T]>);
|
||||
|
||||
// Return an "off" callback that can be called to stop listening for events.
|
||||
return () => this.off(type, listener);
|
||||
}
|
||||
|
||||
off<K extends keyof T>(type: K, listener: Listener<T[K]>): void {
|
||||
const listeners = this.listeners.get(type);
|
||||
if (!listeners) return;
|
||||
|
||||
const listenerEntry = Array.from(listeners).find(
|
||||
(entry) =>
|
||||
entry.listener === listener || entry.wrappedListener === listener,
|
||||
);
|
||||
|
||||
if (listenerEntry) {
|
||||
listeners.delete(listenerEntry);
|
||||
}
|
||||
}
|
||||
|
||||
emit<K extends keyof T>(type: K, payload: T[K]): boolean {
|
||||
const listeners = this.listeners.get(type);
|
||||
if (!listeners) return false;
|
||||
|
||||
listeners.forEach((entry) => {
|
||||
entry.wrappedListener(payload);
|
||||
});
|
||||
|
||||
return listeners.size > 0;
|
||||
}
|
||||
|
||||
removeAllListeners(): void {
|
||||
this.listeners.clear();
|
||||
}
|
||||
|
||||
async waitFor<K extends keyof T>(
|
||||
type: K,
|
||||
predicate: (payload: T[K]) => boolean,
|
||||
timeoutMs?: number,
|
||||
): Promise<T[K]> {
|
||||
return new Promise((resolve, reject) => {
|
||||
let timeoutId: ReturnType<typeof setTimeout> | undefined;
|
||||
|
||||
const listener = (payload: T[K]) => {
|
||||
if (predicate(payload)) {
|
||||
// Clean up
|
||||
this.off(type, listener);
|
||||
if (timeoutId !== undefined) {
|
||||
clearTimeout(timeoutId);
|
||||
}
|
||||
resolve(payload);
|
||||
}
|
||||
};
|
||||
|
||||
// Set up timeout if specified
|
||||
if (timeoutMs !== undefined) {
|
||||
timeoutId = setTimeout(() => {
|
||||
this.off(type, listener);
|
||||
reject(new Error(`Timeout waiting for event "${String(type)}"`));
|
||||
}, timeoutMs);
|
||||
}
|
||||
|
||||
this.on(type, listener);
|
||||
});
|
||||
}
|
||||
|
||||
private debounce<K extends keyof T>(
|
||||
func: Listener<T[K]>,
|
||||
wait: number,
|
||||
): Listener<T[K]> {
|
||||
let timeout: ReturnType<typeof setTimeout>;
|
||||
|
||||
return (detail: T[K]) => {
|
||||
if (timeout !== null) {
|
||||
clearTimeout(timeout);
|
||||
}
|
||||
timeout = setTimeout(() => {
|
||||
func(detail);
|
||||
}, wait);
|
||||
};
|
||||
}
|
||||
}
|
||||
91
src/utils/sync-server.ts
Normal file
91
src/utils/sync-server.ts
Normal file
@@ -0,0 +1,91 @@
|
||||
import type { XOInvitation } from "@xo-cash/types";
|
||||
import { EventEmitter } from "./event-emitter.js";
|
||||
import { SSESession, type SSEvent } from "./sse-client.js";
|
||||
import { decodeExtendedJsonObject, encodeExtendedJson } from "./ext-json.js";
|
||||
|
||||
export type SyncServerEventMap = {
|
||||
'connected': void;
|
||||
'disconnected': void;
|
||||
'error': Error;
|
||||
'message': SSEvent;
|
||||
}
|
||||
|
||||
export class SyncServer extends EventEmitter<SyncServerEventMap> {
|
||||
static async from(baseUrl: string, invitationIdentifier: string): Promise<SyncServer> {
|
||||
const server = new SyncServer(baseUrl, invitationIdentifier);
|
||||
await server.connect();
|
||||
return server;
|
||||
}
|
||||
|
||||
private sse: SSESession;
|
||||
|
||||
constructor(private readonly baseUrl: string, private readonly invitationIdentifier: string) {
|
||||
super();
|
||||
|
||||
// Create an SSE Session
|
||||
this.sse = new SSESession(`${baseUrl}/invitations?invitationIdentifier=${invitationIdentifier}`, {
|
||||
method: 'GET',
|
||||
headers: {
|
||||
'Accept': 'text/event-stream',
|
||||
},
|
||||
|
||||
// Create our event bubblers
|
||||
onMessage: (event: SSEvent) => this.emit('message', event),
|
||||
onError: (error: unknown) => this.emit('error', error instanceof Error ? error : new Error(String(error))),
|
||||
onDisconnected: () => this.emit('disconnected', undefined),
|
||||
onConnected: () => this.emit('connected', undefined),
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect to the sync server.
|
||||
*/
|
||||
async connect(): Promise<void> {
|
||||
// Connect to the SSE Session
|
||||
await this.sse.connect();
|
||||
}
|
||||
|
||||
/**
|
||||
* Disconnect from the sync server.
|
||||
*/
|
||||
async disconnect(): Promise<void> {
|
||||
// Disconnect from the SSE Session
|
||||
this.sse.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the invitation by identifier.
|
||||
* @param identifier - The invitation identifier.
|
||||
* @returns The invitation.
|
||||
*/
|
||||
async getInvitation(identifier: string): Promise<XOInvitation> {
|
||||
// Send a GET request to the sync server
|
||||
const response = await fetch(`${this.baseUrl}/invitations/${identifier}`);
|
||||
const invitation = await response.json() as XOInvitation;
|
||||
return invitation;
|
||||
}
|
||||
|
||||
/**
|
||||
* Publish an invitation.
|
||||
* @param invitation - The invitation to create.
|
||||
* @returns The invitation.
|
||||
*/
|
||||
async publishInvitation(invitation: XOInvitation): Promise<XOInvitation> {
|
||||
// Send a POST request to the sync server
|
||||
const response = await fetch(`${this.baseUrl}/invitations`, {
|
||||
method: 'POST',
|
||||
body: encodeExtendedJson(invitation),
|
||||
});
|
||||
|
||||
// Throw is there was an issue with the request
|
||||
if (!response.ok) {
|
||||
throw new Error(`Failed to publish invitation: ${response.statusText}`);
|
||||
}
|
||||
|
||||
// Read the returned JSON
|
||||
// TODO: This should use zod to verify the response
|
||||
const data = decodeExtendedJsonObject(await response.text()) as XOInvitation;
|
||||
|
||||
return data;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user