Small cleanup

This commit is contained in:
2025-09-09 02:14:06 +00:00
parent 283dcd3a44
commit 67a3410c41
11 changed files with 45 additions and 102 deletions

View File

@@ -264,18 +264,11 @@ export class AvailableMessageCache
// Oracles start their sequence at 1
const earliestMessageSequence = 0;
// Get the gaps for the oracle
// The query expects an exclusive range so we extend both the start and end by 1.
console.log('Getting Oracle Gaps from Database for Oracle: ', oraclePublicKey);
const start = performance.now();
// const messageCount = await db.db.selectFrom(`messages-${oraclePublicKey}`).selectAll().execute();
// console.log('Message count for Oracle: ', oraclePublicKey, messageCount.length);
// Get the gaps for the oracle
// The query expects an exclusive range so we extend both the start and end by 1.
// Use a subquery approach to find gap boundaries - more reliable than CTEs
const oracleMessageRows = await db.db
.selectFrom(
@@ -303,11 +296,7 @@ export class AvailableMessageCache
.execute();
console.log('Found gap boundary rows:', oracleMessageRows.length);
console.log('Oracle Gaps from Database for Oracle: ', oraclePublicKey, performance.now() - start);
console.log(`Oracle Gaps from Database for Oracle: Gaps ${oracleMessageRows.length} for Oracle: ${oraclePublicKey} in ${performance.now() - start}ms`);
return oracleMessageRows;
}

View File

@@ -1,16 +0,0 @@
export type DatabaseSchema = {
oracles: {
id: number;
publicKey: string;
}
[key: `messages-${string}`]: {
id: number;
signature: string;
timestamp: number;
sequence: number;
priceSequence: number | null;
price: number | null;
metadataType: number | null;
metadata: string | null;
}
}

View File

View File

@@ -1,7 +1,7 @@
import { type SignedMessage } from '@generalprotocols/oracle-client';
import debug from 'debug';
import 'dotenv/config';
import { Config } from './config.js';
import { Database } from './services/db.js';
import { OraclesRoute } from './routes/oracles.js';
@@ -11,14 +11,13 @@ import { RouteHandlerExpress } from './services/route-handler/route-handler-expr
import { BroadcasterZMQ } from './services/broadcaster/broadcaster-zmq.js';
import { BroadcasterSSE } from './services/broadcaster/broadcaster-sse.js';
import { Config } from './config.js';
import { OracleManager } from './oracles/oracle-manager.js';
type AppDependencies = {
database: Database;
oracleManager: OracleManager;
broadcasterZMQ: BroadcasterZMQ<SignedMessage<string>>;
broadcasterSSE: BroadcasterSSE<SignedMessage<string>>;
broadcasterZMQ: BroadcasterZMQ;
broadcasterSSE: BroadcasterSSE;
oracleRoutes: OraclesRoute;
routeHandlerZMQ: RouteHandlerZMQ;
routeHandlerExpress: RouteHandlerExpress;
@@ -33,9 +32,6 @@ class App {
const database = await Database.from(config.DATABASE_FILE_PATH);
log('created database');
// const subscriber = new SubscriberSSE('https://oracles.generalprotocols.com');
// log('created subscriber');
const broadcasterZMQ = new BroadcasterZMQ();
log('created broadcasterZMQ');
@@ -85,39 +81,7 @@ class App {
constructor(
private dependencies: AppDependencies,
) {
// this.dependencies.subscriber.on('message', async (message) => {
// const { publicKey, signature } = message.toHexObject();
// await this.dependencies.database.getOracleStore(publicKey)
// if (OracleMessage.isMetadataMessage(message)) {
// await this.dependencies.database.db.insertInto(`messages-${publicKey}`).values({
// id: message.messageSequence,
// signature,
// timestamp: message.messageTimestamp,
// sequence: message.messageSequence,
// metadataType: message.metadataType,
// metadata: message.metadataContent,
// }).execute();
// }
// if (OracleMessage.isPriceMessage(message)) {
// await this.dependencies.database.db.insertInto(`messages-${publicKey}`).values({
// id: message.messageSequence,
// signature,
// timestamp: message.messageTimestamp,
// sequence: message.messageSequence,
// priceSequence: message.priceSequence,
// price: message.priceValue,
// }).execute();
// }
// this.dependencies.broadcasterZMQ.broadcast(message.toHexObject());
// this.dependencies.broadcasterSSE.broadcast(message.toHexObject());
// });
}
) {}
async start() {
await Promise.all([
@@ -127,7 +91,6 @@ class App {
this.dependencies.routeHandlerExpress.start(),
this.dependencies.routeHandlerZMQ.start(),
// this.dependencies.subscriber.start(),
this.dependencies.oracleManager.start(),
])
}

View File

@@ -1,17 +1,18 @@
import { OracleClient, OracleMessage, OracleMetadataMessage, OraclePriceMessage, type SignedMessage } from "@generalprotocols/oracle-client";
import { OracleClient, OracleMetadataMessage, OraclePriceMessage } from "@generalprotocols/oracle-client";
import debug, { type Debugger } from "debug";
import { Oracle } from "./oracles.js";
import type { Database } from "~/services/db.js";
import type { Config } from "~/config.js";
import type { BaseBroadcaster } from "~/services/broadcaster/base-broadcaster.js";
import debug, { type Debugger } from "debug";
import { RemoteComposite } from "~/services/remotes/remote-composite.js";
export type OracleManagerDependencies = {
database: Database;
config: Config;
broadcasters: Array<BaseBroadcaster<SignedMessage<string>>>;
broadcasters: Array<BaseBroadcaster>;
}
export class OracleManager {

View File

@@ -1,11 +1,11 @@
import debug, { type Debugger } from "debug";
import { OracleMessage, OracleClient, type OracleMetadataMessage, type OraclePriceMessage } from "@generalprotocols/oracle-client";
import { OracleMessage, type OracleMetadataMessage, type OraclePriceMessage } from "@generalprotocols/oracle-client";
import { Mutex } from "async-mutex";
import { EventEmitter } from "../utils/event-emitter.js";
import { AvailableMessageCache } from "../cache/available-messages.js";
import type { Database } from "../services/db.js";
import { Mutex } from "async-mutex";
import type { BaseRemote } from "~/services/remotes/base-remote.js";
export type OracleDependencies = {
@@ -156,8 +156,14 @@ export class Oracle extends EventEmitter<OracleEvent> {
// Update the gaps
gaps = await this.availableMessages.getGaps();
// Get some stats for our debug logs.
// NOTE: The variables here are just to show x/y messages recovered
const messageCount = await this.availableMessages.getOracleMessageCount();
const lastMessage = gaps.at(-1)?.end;
const availableMessages = await this.availableMessages.getAvailableMessages();
const lastMessage = availableMessages.at(-1)?.end;
// Log the stats
this.debug(`Recovered messages ${start} to ${end} oracle messages (${messageCount} / ${lastMessage})`);
await new Promise(resolve => setTimeout(resolve, 2000));

View File

@@ -1,4 +1,6 @@
export abstract class BaseBroadcaster<T> {
import type { SignedMessage } from "@generalprotocols/oracle-client";
export abstract class BaseBroadcaster<T extends SignedMessage<string> = SignedMessage<string>> {
abstract start(): Promise<void>;
abstract broadcast(data: T): Promise<void>;

View File

@@ -4,7 +4,7 @@ import debug, { type Debugger } from "debug";
import { BaseBroadcaster } from "./base-broadcaster.js";
export class BroadcasterSSE<T extends SignedMessage<string>> extends BaseBroadcaster<T> {
export class BroadcasterSSE<T extends SignedMessage<string> = SignedMessage<string>> extends BaseBroadcaster<T> {
static async from() {
const broadcaster = new BroadcasterSSE();
await broadcaster.start();

View File

@@ -1,4 +1,4 @@
import { binToHex, flattenBinArray, hexToBin, opcodeToPushLength } from '@bitauth/libauth';
import { binToHex, flattenBinArray, hexToBin } from '@bitauth/libauth';
import { OracleProtocol, type SignedMessage } from '@generalprotocols/oracle-client';
import { Publisher } from 'zeromq'
import debug, { type Debugger } from "debug";
@@ -8,7 +8,7 @@ import { BaseBroadcaster } from "./base-broadcaster.js";
const OP_RETURN = hexToBin('6A');
export class BroadcasterZMQ<T extends SignedMessage<string>> extends BaseBroadcaster<T> {
export class BroadcasterZMQ<T extends SignedMessage<string> = SignedMessage<string>> extends BaseBroadcaster<T> {
static async from() {
const broadcaster = new BroadcasterZMQ();
await broadcaster.start();

View File

@@ -1,6 +1,22 @@
import { Kysely, sql, SqliteDialect, type SelectQueryBuilder } from "kysely";
import SqliteDatabase from 'better-sqlite3'
import { type DatabaseSchema } from '~/database/schema.js';
export type DatabaseSchema = {
oracles: {
id: number;
publicKey: string;
}
[key: `messages-${string}`]: {
id: number;
signature: string;
timestamp: number;
sequence: number;
priceSequence: number | null;
price: number | null;
metadataType: number | null;
metadata: string | null;
}
}
export class Database {
static async from(databaseFilePath: string) {
@@ -80,14 +96,3 @@ export class Database {
return this.db.selectFrom(`messages-${publicKey}`)
}
}
// const db = await Database.from();
// const oracles = await db.getOracles();
// Object.values(oracles).forEach(async (oracle) => {
// const messages = await oracle.selectAll().orderBy('timestamp', 'desc').limit(10).execute();
// console.log(messages);
// });
// console.log(oracles);

View File

@@ -3,14 +3,13 @@ import cors from 'cors';
import debug, { type Debugger } from 'debug';
import { type RouteOptions, Request } from '~/routes/types.js';
import { BroadcasterSSE } from '~/services/broadcaster/broadcaster-sse.js';
import type { SignedMessage } from '@generalprotocols/oracle-client';
export type RouteHandlerExpressDependencies = {
routes: Array<RouteOptions>
port: number
host: string
broadcasterSSE: BroadcasterSSE<SignedMessage<string>>;
broadcasterSSE: BroadcasterSSE;
}
export class RouteHandlerExpress {
@@ -44,12 +43,6 @@ export class RouteHandlerExpress {
this.app.enable('trust proxy');
this.app.set('json spaces', 2);
// Test debug. Just print all requests
// this.app.use((req, res, next) => {
// this.debug('Request received', req.method, req.url);
// next();
// });
// Create our route handlers
for (const route of this.deps.routes) {
this.app[route.method.toLowerCase() as keyof Application](route.url, async (req: ExpressRequest, res: ExpressResponse) => {