Documentation and TODOs
This commit is contained in:
@@ -3,6 +3,9 @@ import { deserializeInvitation, serializeInvitation } from "@xo-cash/engine";
|
|||||||
|
|
||||||
import { SSESession, type SSEvent } from "../utils/sse-session.js";
|
import { SSESession, type SSEvent } from "../utils/sse-session.js";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Listeners for the invitation sync client.
|
||||||
|
*/
|
||||||
export type InvitationSyncListeners = {
|
export type InvitationSyncListeners = {
|
||||||
onInvitationUpdated?: (invitation: XOInvitation) => void;
|
onInvitationUpdated?: (invitation: XOInvitation) => void;
|
||||||
onConnected?: () => void;
|
onConnected?: () => void;
|
||||||
@@ -10,8 +13,14 @@ export type InvitationSyncListeners = {
|
|||||||
onDisconnected?: () => void;
|
onDisconnected?: () => void;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* TODO: Why are there two classes in this file? I would much rather split this up by responsibility, or merge the two of them somehow.
|
||||||
|
* Leaving for now because I also don't hate the separation of concerns.
|
||||||
|
*/
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Live SSE subscription for a single invitation.
|
* Live SSE subscription for a single invitation.
|
||||||
|
* TODO: Why isnt this using the event emitter?
|
||||||
*/
|
*/
|
||||||
export class InvitationSyncSubscription {
|
export class InvitationSyncSubscription {
|
||||||
private sse: SSESession;
|
private sse: SSESession;
|
||||||
@@ -127,6 +136,7 @@ export class InvitationSyncClient {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Parse invitation from SSE event payloads (supports xo-cli wrapped and direct formats).
|
* Parse invitation from SSE event payloads (supports xo-cli wrapped and direct formats).
|
||||||
|
* TODO: Move into a class with a static method.
|
||||||
*/
|
*/
|
||||||
function parseInvitationFromSseEvent(event: SSEvent): XOInvitation | undefined {
|
function parseInvitationFromSseEvent(event: SSEvent): XOInvitation | undefined {
|
||||||
if (!event.data) {
|
if (!event.data) {
|
||||||
|
|||||||
@@ -29,26 +29,38 @@ export class OrderInvitationTracker {
|
|||||||
this.initialCommitCount = deps.invitation.commits?.length ?? 0;
|
this.initialCommitCount = deps.invitation.commits?.length ?? 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Start the tracker by publishing the invitation to the sync server and subscribing to updates.
|
||||||
|
* @param deps - The dependencies for the tracker
|
||||||
|
* @returns The tracker
|
||||||
|
*/
|
||||||
static async start(
|
static async start(
|
||||||
deps: OrderInvitationTrackerDeps,
|
deps: OrderInvitationTrackerDeps,
|
||||||
): Promise<OrderInvitationTracker> {
|
): Promise<OrderInvitationTracker> {
|
||||||
|
// Create a new tracker.
|
||||||
const tracker = new OrderInvitationTracker(deps);
|
const tracker = new OrderInvitationTracker(deps);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
// Publish the invitation to the sync server.
|
||||||
await deps.syncClient.publish(deps.invitation);
|
await deps.syncClient.publish(deps.invitation);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
|
// If the invitation fails to publish, continue anyway.
|
||||||
deps.debug(
|
deps.debug(
|
||||||
"Failed to publish invitation to sync server (continuing): %o",
|
"Failed to publish invitation to sync server (continuing): %o",
|
||||||
error,
|
error,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Subscribe to updates for the invitation.
|
||||||
|
// TODO: Why isnt this using the event emitter?
|
||||||
tracker.subscription = deps.syncClient.subscribe(
|
tracker.subscription = deps.syncClient.subscribe(
|
||||||
deps.invitation.invitationIdentifier,
|
deps.invitation.invitationIdentifier,
|
||||||
{
|
{
|
||||||
|
// Handle updates for the invitation.
|
||||||
onInvitationUpdated: (invitation) => {
|
onInvitationUpdated: (invitation) => {
|
||||||
void tracker.handleUpdate(invitation);
|
void tracker.handleUpdate(invitation);
|
||||||
},
|
},
|
||||||
|
// Handle errors from the sync server.
|
||||||
onError: (error) => {
|
onError: (error) => {
|
||||||
deps.debug(
|
deps.debug(
|
||||||
"Sync subscription error for order %s: %o",
|
"Sync subscription error for order %s: %o",
|
||||||
@@ -59,10 +71,16 @@ export class OrderInvitationTracker {
|
|||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// Connect to the sync server.
|
||||||
await tracker.subscription.connect();
|
await tracker.subscription.connect();
|
||||||
|
|
||||||
|
// Return the tracker.
|
||||||
return tracker;
|
return tracker;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stop the tracker by closing the subscription and clearing the dispense timer.
|
||||||
|
*/
|
||||||
stop(): void {
|
stop(): void {
|
||||||
this.stopped = true;
|
this.stopped = true;
|
||||||
this.subscription?.close();
|
this.subscription?.close();
|
||||||
@@ -75,26 +93,32 @@ export class OrderInvitationTracker {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private async handleUpdate(invitation: XOInvitation): Promise<void> {
|
private async handleUpdate(invitation: XOInvitation): Promise<void> {
|
||||||
|
// If the tracker has been stopped, return.
|
||||||
if (this.stopped) {
|
if (this.stopped) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const mergedCommits = mergeCommits(
|
// Merge the commits from the initial invitation and the updated invitation.
|
||||||
|
const mergedCommits = OrderInvitationTracker.mergeCommits(
|
||||||
this.deps.invitation.commits ?? [],
|
this.deps.invitation.commits ?? [],
|
||||||
invitation.commits ?? [],
|
invitation.commits ?? [],
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// Update the invitation with the merged commits.
|
||||||
this.deps.invitation = {
|
this.deps.invitation = {
|
||||||
...this.deps.invitation,
|
...this.deps.invitation,
|
||||||
...invitation,
|
...invitation,
|
||||||
commits: mergedCommits,
|
commits: mergedCommits,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Get the order from the database.
|
||||||
const order = await this.deps.database.db
|
const order = await this.deps.database.db
|
||||||
.selectFrom("orders")
|
.selectFrom("orders")
|
||||||
.selectAll()
|
.selectAll()
|
||||||
.where("id", "=", this.deps.orderId)
|
.where("id", "=", this.deps.orderId)
|
||||||
.executeTakeFirst();
|
.executeTakeFirst();
|
||||||
|
|
||||||
|
// If the order is not found, or the order is completed or cancelled, stop the tracker.
|
||||||
if (
|
if (
|
||||||
!order ||
|
!order ||
|
||||||
order.status === "completed" ||
|
order.status === "completed" ||
|
||||||
@@ -108,6 +132,7 @@ export class OrderInvitationTracker {
|
|||||||
// Basing it off of the commit count is flawed and useless.
|
// Basing it off of the commit count is flawed and useless.
|
||||||
const hasCustomerActivity = mergedCommits.length > this.initialCommitCount;
|
const hasCustomerActivity = mergedCommits.length > this.initialCommitCount;
|
||||||
|
|
||||||
|
// If the order is pending and there is customer activity, mark the order as paid and set the dispense timer.
|
||||||
if (order.status === "pending" && hasCustomerActivity) {
|
if (order.status === "pending" && hasCustomerActivity) {
|
||||||
await this.deps.database.db
|
await this.deps.database.db
|
||||||
.updateTable("orders")
|
.updateTable("orders")
|
||||||
@@ -123,22 +148,31 @@ export class OrderInvitationTracker {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Complete the order by marking it as completed and decrementing the stock of the items.
|
||||||
|
* TODO: There is a performance improvement to be made here by passing in the order from handleUpdate. Its probably worth almost nothing though
|
||||||
|
* @returns A promise that resolves when the order has been completed
|
||||||
|
*/
|
||||||
private async completeOrder(): Promise<void> {
|
private async completeOrder(): Promise<void> {
|
||||||
|
// If the tracker has been stopped, return.
|
||||||
if (this.stopped) {
|
if (this.stopped) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get the order from the database.
|
||||||
const order = await this.deps.database.db
|
const order = await this.deps.database.db
|
||||||
.selectFrom("orders")
|
.selectFrom("orders")
|
||||||
.selectAll()
|
.selectAll()
|
||||||
.where("id", "=", this.deps.orderId)
|
.where("id", "=", this.deps.orderId)
|
||||||
.executeTakeFirst();
|
.executeTakeFirst();
|
||||||
|
|
||||||
|
// If the order is not found, or the order is completed, stop the tracker.
|
||||||
if (!order || order.status === "completed") {
|
if (!order || order.status === "completed") {
|
||||||
this.stop();
|
this.stop();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Mark the order as completed.
|
||||||
await this.deps.database.db
|
await this.deps.database.db
|
||||||
.updateTable("orders")
|
.updateTable("orders")
|
||||||
.set({ status: "completed" })
|
.set({ status: "completed" })
|
||||||
@@ -147,18 +181,24 @@ export class OrderInvitationTracker {
|
|||||||
|
|
||||||
this.deps.debug("Order %s mock dispensed (completed)", this.deps.orderId);
|
this.deps.debug("Order %s mock dispensed (completed)", this.deps.orderId);
|
||||||
|
|
||||||
|
// Decrement the stock of the items.
|
||||||
try {
|
try {
|
||||||
|
// Parse the line items from the order.
|
||||||
const lineItems = JSON.parse(order.items) as Array<{
|
const lineItems = JSON.parse(order.items) as Array<{
|
||||||
id: string;
|
id: string;
|
||||||
quantity: number;
|
quantity: number;
|
||||||
}>;
|
}>;
|
||||||
|
|
||||||
|
// Loop through the line items and decrement the stock of the items.
|
||||||
for (const line of lineItems) {
|
for (const line of lineItems) {
|
||||||
|
// Get the item from the database.
|
||||||
const item = await this.deps.database.db
|
const item = await this.deps.database.db
|
||||||
.selectFrom("items")
|
.selectFrom("items")
|
||||||
.selectAll()
|
.selectAll()
|
||||||
.where("id", "=", line.id)
|
.where("id", "=", line.id)
|
||||||
.executeTakeFirst();
|
.executeTakeFirst();
|
||||||
|
|
||||||
|
// If the item is found, decrement the stock of the item.
|
||||||
if (item) {
|
if (item) {
|
||||||
await this.deps.database.db
|
await this.deps.database.db
|
||||||
.updateTable("items")
|
.updateTable("items")
|
||||||
@@ -168,27 +208,31 @@ export class OrderInvitationTracker {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
this.deps.debug(
|
// If the error is not an order payment error, return a 500 error.
|
||||||
"Failed to decrement stock for order %s: %o",
|
this.deps.debug("Failed to decrement stock for order %s: %o", this.deps.orderId, error);
|
||||||
this.deps.orderId,
|
|
||||||
error,
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Stop the tracker.
|
||||||
this.stop();
|
this.stop();
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
function mergeCommits(
|
/**
|
||||||
initial: XOInvitationCommit[],
|
* Merge two arrays of commits into a single array of commits.
|
||||||
additional: XOInvitationCommit[],
|
* TODO: Make sure the commits are actually immutable. If they aren't this would allow the "additional" commits to modify the initial commits.
|
||||||
): XOInvitationCommit[] {
|
* @param initial - The initial commits
|
||||||
const map = new Map<string, XOInvitationCommit>();
|
* @param additional - The additional commits
|
||||||
for (const commit of initial) {
|
* @returns The merged commits
|
||||||
map.set(commit.commitIdentifier, commit);
|
*/
|
||||||
|
static mergeCommits(initial: XOInvitationCommit[], additional: XOInvitationCommit[]): XOInvitationCommit[] {
|
||||||
|
// Create a map of the initial commits.
|
||||||
|
const map = Object.fromEntries(initial.map(commit => [commit.commitIdentifier, commit]));
|
||||||
|
|
||||||
|
// Add the additional commits to the map.
|
||||||
|
for (const commit of additional) {
|
||||||
|
map[commit.commitIdentifier] = commit;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return the merged commits.
|
||||||
|
return Object.values(map);
|
||||||
}
|
}
|
||||||
for (const commit of additional) {
|
};
|
||||||
map.set(commit.commitIdentifier, commit);
|
|
||||||
}
|
|
||||||
return Array.from(map.values());
|
|
||||||
}
|
|
||||||
|
|||||||
Reference in New Issue
Block a user