Fix invitation syncing in realtime

This commit is contained in:
2026-06-01 11:28:18 +02:00
parent 5bec49858f
commit 5e9c6db412
5 changed files with 242 additions and 142 deletions

View File

@@ -183,6 +183,7 @@ export class AppService extends EventEmitter<AppEventMap> {
// Add the invitation to the invitations array
this.invitations.push(invitation);
this.bumpInvitationRevision(invitation.data.invitationIdentifier);
// Emit the invitation-added event
this.emit("invitation-added", invitation);
@@ -201,6 +202,7 @@ export class AppService extends EventEmitter<AppEventMap> {
if (invitationIndex >= 0) {
this.invitations.splice(invitationIndex, 1);
}
this.bumpInvitationRevision(invitationIdentifier);
// Emit the invitation-removed event
this.emit("invitation-removed", invitation);
@@ -215,12 +217,14 @@ export class AppService extends EventEmitter<AppEventMap> {
if (this.invitationEventCleanup.has(invitationIdentifier)) return;
const onUpdated = () => {
this.bumpInvitationRevision(invitationIdentifier);
this.emit("wallet-state-changed", {
reason: "invitation-updated",
invitationIdentifier,
});
};
const onStatusChanged = () => {
this.bumpInvitationRevision(invitationIdentifier);
this.emit("wallet-state-changed", {
reason: "invitation-status-changed",
invitationIdentifier,
@@ -236,6 +240,18 @@ export class AppService extends EventEmitter<AppEventMap> {
});
}
getInvitationRevision(invitationIdentifier: string): number {
return this.invitationRevisions.get(invitationIdentifier) ?? 0;
}
private bumpInvitationRevision(invitationIdentifier: string): void {
this.invitationsRevision += 1;
this.invitationRevisions.set(
invitationIdentifier,
this.getInvitationRevision(invitationIdentifier) + 1,
);
}
private detachInvitationListeners(invitationIdentifier: string): void {
const trackedInvitation = this.invitations.find(
(candidate) =>

View File

@@ -3,7 +3,7 @@ import type {
Engine,
GetSpendableResourcesParameters,
} from "@xo-cash/engine";
import { generateTemplateIdentifier, hasInvitationExpired, mergeInvitationCommits, serializeInvitation } from "@xo-cash/engine";
import { generateTemplateIdentifier, hasInvitationExpired, mergeInvitationCommits, serializeInvitation, deserializeInvitation } from "@xo-cash/engine";
import type {
XOInvitation,
XOInvitationCommit,
@@ -43,6 +43,13 @@ export type InvitationDependencies = {
electrum: BlockchainService;
};
function stripLocalInvitationMetadata(invitation: XOInvitation): XOInvitation {
const { entityIdentifier: _entityIdentifier, ...sharedInvitation } =
invitation as XOInvitation & { entityIdentifier?: string };
return sharedInvitation;
}
export class Invitation extends EventEmitter<InvitationEventMap> {
/**
* Create an invitation and start the SSE Session required for it.
@@ -90,9 +97,6 @@ export class Invitation extends EventEmitter<InvitationEventMap> {
// Create the invitation
const invitationInstance = new Invitation(engineInvitation, dependencies);
// Start the invitation and its tracking
invitationInstance.start();
return invitationInstance;
}
@@ -123,6 +127,7 @@ export class Invitation extends EventEmitter<InvitationEventMap> {
*/
private storage: BaseStorage;
private electrum: BlockchainService;
private sseUpdateQueue: Promise<void> = Promise.resolve();
/**
* The status of the invitation (last emitted word: pending, actionable, signed, ready, complete, expired, unknown).
@@ -141,8 +146,23 @@ export class Invitation extends EventEmitter<InvitationEventMap> {
this.storage = dependencies.storage;
this.electrum = dependencies.electrum;
// Create a listerner for the messages from the SSE Session (sync server)
this.syncServer.on("message", this.handleSSEMessage.bind(this));
// Apply SSE updates serially so each engine update sees the latest history.
this.syncServer.on("message", (event) => {
this.enqueueSyncUpdate(() => this.handleSSEMessage(event)).catch(
(error) => {
this.emit(
"error",
error instanceof Error ? error : new Error(String(error)),
);
},
);
});
}
private enqueueSyncUpdate(update: () => Promise<void>): Promise<void> {
const queuedUpdate = this.sseUpdateQueue.then(update);
this.sseUpdateQueue = queuedUpdate.catch(() => {});
return queuedUpdate;
}
/**
@@ -160,20 +180,32 @@ export class Invitation extends EventEmitter<InvitationEventMap> {
this.syncServer.getInvitation(this.data.invitationIdentifier),
]);
// There is a chance we get SSE messages before the invitation is returned, so we want to combine any commits
const sseCommits = this.data.commits;
await this.enqueueSyncUpdate(async () => {
// SSE messages can arrive before the GET request completes.
const combinedCommits = this.mergeCommits(
this.data.commits,
invitation?.commits ?? [],
);
// Merge the commits
const combinedCommits = this.mergeCommits(
sseCommits,
invitation?.commits ?? [],
);
try {
// Prefer keeping the engine's local invitation state in sync.
this.data = stripLocalInvitationMetadata(
await this.engine.updateInvitation({
...this.data,
...invitation,
commits: combinedCommits,
}),
);
} catch (error) {
this.emit(
"error",
error instanceof Error ? error : new Error(String(error)),
);
this.data = { ...this.data, commits: combinedCommits };
}
// Set the invitation data with the combined commits
this.data = { ...this.data, ...invitation, commits: combinedCommits };
// Store the invitation in the storage
await this.storage.set(this.data.invitationIdentifier, this.data);
await this.storage.set(this.data.invitationIdentifier, this.data);
});
// Publish the invitation to the sync server
this.publishInvitation(this.data);
@@ -181,8 +213,6 @@ export class Invitation extends EventEmitter<InvitationEventMap> {
// Compute and emit initial status
await this.updateStatus();
} catch (err) {
// console.error(`Error starting invitation, could not connect to sync server or get invitation`, err);
// Emit the error event. We might want to throw? but we need a better way of handling errors in the invitation system because we need the invitation to successfully initialize.
this.emit("error", err instanceof Error ? err : new Error(String(err)));
}
}
@@ -192,30 +222,83 @@ export class Invitation extends EventEmitter<InvitationEventMap> {
*
* TODO: Invitation should sync up the initial data (top level) then everything after that should be the commits. This makes it easier to merge as we go instead of just having to overwrite the entire invitation.
*/
private handleSSEMessage(event: SSEvent): void {
const data = JSON.parse(event.data) as { topic?: string; data?: unknown };
if (data.topic === "invitation-updated") {
const invitation = decodeExtendedJsonObject(data.data) as XOInvitation;
if (invitation.invitationIdentifier !== this.data.invitationIdentifier) {
return;
}
// Filter out commits that already exist (probably a faster way to do this. This is n^2)
const newCommits = this.mergeCommits(
this.data.commits,
invitation.commits,
);
// Set the new commits
this.data = { ...this.data, commits: newCommits };
// Calculate the new status of the invitation (fire-and-forget; handler is sync)
this.updateStatus().catch(() => {});
// Emit the updated event
this.emit("invitation-updated", this.data);
private async handleSSEMessage(event: SSEvent): Promise<void> {
const invitation = this.parseInvitationFromSSEMessage(event);
if (
!invitation ||
invitation.invitationIdentifier !== this.data.invitationIdentifier
) {
return;
}
// Filter out commits that already exist
const newCommits = this.mergeCommits(this.data.commits, invitation.commits);
try {
this.data = stripLocalInvitationMetadata(
await this.engine.updateInvitation({
...this.data,
...invitation,
commits: newCommits,
}),
);
} catch (error) {
this.emit(
"error",
error instanceof Error ? error : new Error(String(error)),
);
this.data = { ...this.data, commits: newCommits };
}
await this.storage.set(this.data.invitationIdentifier, this.data);
await this.updateStatus();
this.emit("invitation-updated", this.data);
}
private parseInvitationFromSSEMessage(event: SSEvent): XOInvitation | null {
try {
const parsed = JSON.parse(event.data) as unknown;
const payload =
event.event === "invitation-updated"
? this.unwrapInvitationUpdatedPayload(parsed)
: this.unwrapLegacyInvitationUpdatedPayload(parsed);
if (!payload) return null;
const decoded = decodeExtendedJsonObject(payload) as XOInvitation;
return stripLocalInvitationMetadata(
deserializeInvitation(serializeInvitation(decoded)),
);
} catch {
return null;
}
}
private unwrapInvitationUpdatedPayload(payload: unknown): unknown | null {
if (
payload &&
typeof payload === "object" &&
"topic" in payload &&
"data" in payload
) {
return this.unwrapLegacyInvitationUpdatedPayload(payload);
}
return payload;
}
private unwrapLegacyInvitationUpdatedPayload(payload: unknown): unknown | null {
if (
payload &&
typeof payload === "object" &&
"topic" in payload &&
"data" in payload &&
payload.topic === "invitation-updated"
) {
return payload.data;
}
return null;
}
/**
@@ -388,12 +471,29 @@ export class Invitation extends EventEmitter<InvitationEventMap> {
this.data = await this.engine.acceptInvitation(this.data, acceptParams);
// Sync the invitation to the sync server
this.publishInvitation(this.data);
await this.publishInvitation(this.data);
// Store the accepted invitation and notify reactive consumers.
await this.storage.set(this.data.invitationIdentifier, this.data);
this.emit("invitation-updated", this.data);
// Update the status of the invitation
await this.updateStatus();
}
/**
* Accept the invitation once for this engine entity so future appends have a root commit.
*/
async ensureAccepted(): Promise<void> {
const ownCommits = await this.engine.findOwnCommits(
this.data.invitationIdentifier,
);
if (ownCommits.length === 0) {
await this.accept();
}
}
/**
* Sign the invitation
*/
@@ -435,11 +535,7 @@ export class Invitation extends EventEmitter<InvitationEventMap> {
* Append a commit to the invitation
*/
async append(data: InvitationParameters): Promise<void> {
try {
await this.engine.acceptInvitation(this.data);
} catch (err) {
// Literally do nothing here. We are just trying to accept the invitation in case we haven't already
}
await this.ensureAccepted();
// Append the commit to the invitation
this.data = await this.engine.appendInvitation(this.data.invitationIdentifier, data);

View File

@@ -10,7 +10,7 @@ import { useAppContext } from './useAppContext.js';
/**
* Get all invitations reactively.
* Re-renders when invitations are added or removed.
* Re-renders when invitations are added, removed, or updated.
*/
export function useInvitations(): Invitation[] {
const { appService } = useAppContext();
@@ -21,26 +21,22 @@ export function useInvitations(): Invitation[] {
return () => {};
}
// Subscribe to invitation list changes
const onAdded = () => callback();
const onRemoved = () => callback();
appService.on('invitation-added', onAdded);
appService.on('invitation-removed', onRemoved);
appService.on('wallet-state-changed', callback);
return () => {
appService.off('invitation-added', onAdded);
appService.off('invitation-removed', onRemoved);
appService.off('wallet-state-changed', callback);
};
},
[appService]
);
const getSnapshot = useCallback(() => {
return appService?.invitations ?? [];
return appService?.invitationsRevision ?? 0;
}, [appService]);
return useSyncExternalStore(subscribe, getSnapshot, getSnapshot);
const revision = useSyncExternalStore(subscribe, getSnapshot, getSnapshot);
return useMemo(() => [...(appService?.invitations ?? [])], [appService, revision]);
}
/**
@@ -56,48 +52,41 @@ export function useInvitation(invitationId: string | null): Invitation | null {
return () => {};
}
// Find the invitation instance
const invitation = appService.invitations.find(
(inv) => inv.data.invitationIdentifier === invitationId
);
if (!invitation) {
return () => {};
}
// Subscribe to this specific invitation's updates
const onUpdated = () => callback();
const onStatusChanged = () => callback();
invitation.on('invitation-updated', onUpdated);
invitation.on('invitation-status-changed', onStatusChanged);
// Also subscribe to list changes in case the invitation is removed
const onRemoved = () => callback();
appService.on('invitation-removed', onRemoved);
const onWalletStateChanged = ({
invitationIdentifier,
}: {
invitationIdentifier: string;
}) => {
if (invitationIdentifier === invitationId) {
callback();
}
};
appService.on('wallet-state-changed', onWalletStateChanged);
return () => {
invitation.off('invitation-updated', onUpdated);
invitation.off('invitation-status-changed', onStatusChanged);
appService.off('invitation-removed', onRemoved);
appService.off('wallet-state-changed', onWalletStateChanged);
};
},
[appService, invitationId]
);
const getSnapshot = useCallback(() => {
if (!appService || !invitationId) {
return null;
}
return (
appService.invitations.find(
(inv) => inv.data.invitationIdentifier === invitationId
) ?? null
);
return appService && invitationId
? appService.getInvitationRevision(invitationId)
: 0;
}, [appService, invitationId]);
return useSyncExternalStore(subscribe, getSnapshot, getSnapshot);
useSyncExternalStore(subscribe, getSnapshot, getSnapshot);
if (!appService || !invitationId) {
return null;
}
return (
appService.invitations.find(
(inv) => inv.data.invitationIdentifier === invitationId
) ?? null
);
}
/**
@@ -109,7 +98,7 @@ export function useInvitationData(invitationId: string | null): XOInvitation | n
return useMemo(() => {
return invitation?.data ?? null;
}, [invitation?.data.invitationIdentifier, invitation?.data.commits?.length]);
}, [invitation?.data]);
}
/**

View File

@@ -7,4 +7,3 @@ export { SeedInputScreen } from './SeedInput.js';
export { WalletStateScreen } from './WalletState.js';
export { TemplateListScreen } from './TemplateList.js';
export { InvitationScreen } from './invitations/InvitationScreen.js';
export { TransactionScreen } from './Transaction.js';

View File

@@ -1,9 +1,15 @@
import type { XOInvitation } from "@xo-cash/types";
import { EventEmitter } from "./event-emitter.js";
// import { SSESession, type SSEvent } from "./sse-client.js";
import { SSESession, type SSEvent } from "@xo-cash/utils";
import { deserializeInvitation, serializeInvitation } from "@xo-cash/engine";
function stripLocalInvitationMetadata(invitation: XOInvitation): XOInvitation {
const { entityIdentifier: _entityIdentifier, ...sharedInvitation } =
invitation as XOInvitation & { entityIdentifier?: string };
return sharedInvitation;
}
export type SyncServerEventMap = {
connected: void;
disconnected: void;
@@ -21,62 +27,66 @@ export class SyncServer extends EventEmitter<SyncServerEventMap> {
return server;
}
private sse: SSESession;
private sse: SSESession | null = null;
constructor(
private readonly baseUrl: string,
private readonly invitationIdentifier: string,
) {
super();
}
// Create an SSE Session
this.sse = new SSESession(
`${baseUrl}/invitations?invitationIdentifier=${invitationIdentifier}`,
async connect(): Promise<void> {
if (this.sse) {
await this.sse.connect();
return;
}
await this.createSSESession();
}
async disconnect(): Promise<void> {
await this.sse?.disconnect();
this.sse = null;
}
private async createSSESession(): Promise<void> {
const sse = await SSESession.create(
`${this.baseUrl}/invitations?invitationIdentifier=${encodeURIComponent(this.invitationIdentifier)}`,
{
method: "GET",
headers: {
Accept: "text/event-stream",
},
// Create our event bubblers
onError: (error: unknown) =>
persistent: true,
onRequest: async (request) => {
const { body: _body, ...requestWithoutBody } = request;
return requestWithoutBody;
},
onError: (error: unknown) => {
this.emit(
"error",
error instanceof Error ? error : new Error(String(error)),
),
onDisconnected: () => this.emit("disconnected", undefined),
onConnected: () => this.emit("connected", undefined),
);
},
onDisconnected: () => {
this.emit("disconnected", undefined);
},
onConnected: () => {
this.emit("connected", undefined);
},
},
);
this.sse.on("message", (event: SSEvent) => this.emit("message", event));
this.sse = sse;
sse.on("message", (event: SSEvent) => {
this.emit("message", event);
});
}
/**
* Connect to the sync server.
*/
async connect(): Promise<void> {
// Connect to the SSE Session
await this.sse.connect();
}
/**
* Disconnect from the sync server.
*/
async disconnect(): Promise<void> {
// Disconnect from the SSE Session
await this.sse.disconnect();
}
/**
* Get the invitation by identifier.
* @param identifier - The invitation identifier.
* @returns The invitation.
*/
async getInvitation(identifier: string): Promise<XOInvitation | undefined> {
// Send a GET request to the sync server
const response = await fetch(
`${this.baseUrl}/invitations?invitationIdentifier=${identifier}`,
`${this.baseUrl}/invitations?invitationIdentifier=${encodeURIComponent(identifier)}`,
);
if (!response.ok) {
@@ -84,33 +94,23 @@ export class SyncServer extends EventEmitter<SyncServerEventMap> {
}
const invitation = deserializeInvitation(await response.text());
return invitation;
return stripLocalInvitationMetadata(invitation);
}
/**
* Publish an invitation.
* @param invitation - The invitation to create.
* @returns The invitation.
*/
async publishInvitation(invitation: XOInvitation): Promise<XOInvitation> {
// Send a POST request to the sync server
const response = await fetch(`${this.baseUrl}/invitations`, {
method: "POST",
body: serializeInvitation(invitation),
body: serializeInvitation(stripLocalInvitationMetadata(invitation)),
headers: {
"Content-Type": "application/json",
},
});
// Throw is there was an issue with the request
if (!response.ok) {
throw new Error(`Failed to publish invitation: ${response.statusText}`);
}
// Read the returned JSON
// TODO: This should use zod to verify the response
const data = deserializeInvitation(await response.text());
return data;
return stripLocalInvitationMetadata(data);
}
}