Initial Commit

This commit is contained in:
2025-09-08 05:31:19 +00:00
commit 283dcd3a44
27 changed files with 4127 additions and 0 deletions

55
.env Normal file
View File

@@ -0,0 +1,55 @@
# ORACLES=`{
# "02d09db08af1ff4e8453919cc866a4be427d7bfe18f2c05e5444c196fcf6fd2818":"http://staging.api.oracles.cash.mc.hzuccon.com",
# "02bb9b3324df889a66a57bc890b3452b84a2a74ba753f8842b06bba03e0fa0dfc5":"http://staging.api.oracles.cash.mc.hzuccon.com",
# "0245a107de5c6aabc9e7b976f26625b01474f90d1a7d11c180bec990b6938e731e":"http://staging.api.oracles.cash.mc.hzuccon.com",
# "038ab22e37cf020f6bbef40111ddc51083a936f0821de56ac01f799cf15b87904d":"http://staging.api.oracles.cash.mc.hzuccon.com",
# "021f8338ccd45a7790025de198a266f252ac43c95bf81d2469feff110beeac89dd":"http://staging.api.oracles.cash.mc.hzuccon.com",
# "02712c349ebb7555b17bdbbe9f7aad5a337fa4179d0680eec3f6c8d77bac9cfa79":"http://staging.api.oracles.cash.mc.hzuccon.com",
# "030654b9598186fe4bc9e1b0490c6b85b13991cdb9a7afa34af1bbeee22a35487a":"http://staging.api.oracles.cash.mc.hzuccon.com",
# "02e82ad82eb88fcdfd02fd5e2e0a67bc6ef4139bbcb63ce0b107a7604deb9f7ce1":"http://staging.api.oracles.cash.mc.hzuccon.com",
# "02461db2aead56ea67543c8196a248152185cb824802a2e7ff12bfc2a493898b8f":"http://staging.api.oracles.cash.mc.hzuccon.com",
# "02d3c1de9d4bc77d6c3608cbe44d10138c7488e592dc2b1e10a6cf0e92c2ecb047":"http://staging.api.oracles.cash.mc.hzuccon.com",
# "03c22127c967bb28ec518fcc225164100df40470a1f6b457cd3a85adb051dcaa56":"http://staging.api.oracles.cash.mc.hzuccon.com",
# "03518d199a4ca5dc06ecb1068416acde321df1b8d6f09149744b1e0fb38c92c92c":"http://staging.api.oracles.cash.mc.hzuccon.com",
# "03e980928f14fc98e1f9d75d15f0b67dc58cdd3f5c641b8f825b146bcc04bd232c":"http://staging.api.oracles.cash.mc.hzuccon.com",
# "03e9342b4d07dc35db0f555b80e19645b2a2a95a22675b50ead248d551a900fdec":"http://staging.api.oracles.cash.mc.hzuccon.com",
# "03994dc2c759375e98afbf5049383cd987001c346d0f11aa262c105874fb1390c3":"http://staging.api.oracles.cash.mc.hzuccon.com",
# "0330779426934d4fe5d18a3721e9eae246150501ebc537e866d2841369daeb0691":"http://staging.api.oracles.cash.mc.hzuccon.com",
# "03c5e3e6a2fe9ed9be3c71a11e7808cf8428bc9ca48808d05a6fa2526865964f06":"http://staging.api.oracles.cash.mc.hzuccon.com",
# "0307c919029daf616d6c01152b8eaadc22888c7a93384e06f4a22866fa9ea486a1":"http://staging.api.oracles.cash.mc.hzuccon.com",
# "02664276fb7513f838f505c221680a9d963479ffb45452b0c744ddb6bd19ecacb3":"http://staging.api.oracles.cash.mc.hzuccon.com",
# "02af3ed94f4872490344a3f2fc978e75ab74a797f703bbc6152524e4d0276ee5bc":"http://staging.api.oracles.cash.mc.hzuccon.com",
# "02a7fd66755e47de6780643739aca156905a1451703d744ac133a2eada02a1ed60":"http://staging.api.oracles.cash.mc.hzuccon.com",
# "0362984874a1106a126e299000da0b5d959aaa2f3ffa9565dbbcd19b6216f1a8bc":"http://staging.api.oracles.cash.mc.hzuccon.com",
# "0336f13d65e3bd6a521bf582f22b74f50edab7c278d38b80e319673b859f95d830":"http://staging.api.oracles.cash.mc.hzuccon.com",
# "034e1d3be2ee29b3d9e53b354b09d9a5a2803c568d8c6520bc72d97494c9a100c2":"http://staging.api.oracles.cash.mc.hzuccon.com",
# "03994dc2c759375e98afbf5049383cd987001c346d0f11aa262c105874fb1390c3":"http://staging.api.oracles.cash.mc.hzuccon.com"
# }`
ORACLES=`{
"02d09db08af1ff4e8453919cc866a4be427d7bfe18f2c05e5444c196fcf6fd2818":"https://oracles.generalprotocols.com",
"02bb9b3324df889a66a57bc890b3452b84a2a74ba753f8842b06bba03e0fa0dfc5":"https://oracles.generalprotocols.com",
"0245a107de5c6aabc9e7b976f26625b01474f90d1a7d11c180bec990b6938e731e":"https://oracles.generalprotocols.com",
"038ab22e37cf020f6bbef40111ddc51083a936f0821de56ac01f799cf15b87904d":"https://oracles.generalprotocols.com",
"021f8338ccd45a7790025de198a266f252ac43c95bf81d2469feff110beeac89dd":"https://oracles.generalprotocols.com",
"02712c349ebb7555b17bdbbe9f7aad5a337fa4179d0680eec3f6c8d77bac9cfa79":"https://oracles.generalprotocols.com",
"030654b9598186fe4bc9e1b0490c6b85b13991cdb9a7afa34af1bbeee22a35487a":"https://oracles.generalprotocols.com",
"02e82ad82eb88fcdfd02fd5e2e0a67bc6ef4139bbcb63ce0b107a7604deb9f7ce1":"https://oracles.generalprotocols.com",
"02461db2aead56ea67543c8196a248152185cb824802a2e7ff12bfc2a493898b8f":"https://oracles.generalprotocols.com",
"02d3c1de9d4bc77d6c3608cbe44d10138c7488e592dc2b1e10a6cf0e92c2ecb047":"https://oracles.generalprotocols.com",
"03c22127c967bb28ec518fcc225164100df40470a1f6b457cd3a85adb051dcaa56":"https://oracles.generalprotocols.com",
"03518d199a4ca5dc06ecb1068416acde321df1b8d6f09149744b1e0fb38c92c92c":"https://oracles.generalprotocols.com",
"03e980928f14fc98e1f9d75d15f0b67dc58cdd3f5c641b8f825b146bcc04bd232c":"https://staging-oracles.generalprotocols.com",
"03e9342b4d07dc35db0f555b80e19645b2a2a95a22675b50ead248d551a900fdec":"https://oracles.generalprotocols.com",
"03994dc2c759375e98afbf5049383cd987001c346d0f11aa262c105874fb1390c3":"https://oracles.generalprotocols.com",
"0330779426934d4fe5d18a3721e9eae246150501ebc537e866d2841369daeb0691":"https://oracles.generalprotocols.com",
"03c5e3e6a2fe9ed9be3c71a11e7808cf8428bc9ca48808d05a6fa2526865964f06":"https://oracles.generalprotocols.com",
"0307c919029daf616d6c01152b8eaadc22888c7a93384e06f4a22866fa9ea486a1":"https://oracles.generalprotocols.com",
"02664276fb7513f838f505c221680a9d963479ffb45452b0c744ddb6bd19ecacb3":"https://oracles.generalprotocols.com",
"02af3ed94f4872490344a3f2fc978e75ab74a797f703bbc6152524e4d0276ee5bc":"https://oracles.generalprotocols.com",
"02a7fd66755e47de6780643739aca156905a1451703d744ac133a2eada02a1ed60":"https://oracles.generalprotocols.com",
"0362984874a1106a126e299000da0b5d959aaa2f3ffa9565dbbcd19b6216f1a8bc":"https://oracles.generalprotocols.com",
"0336f13d65e3bd6a521bf582f22b74f50edab7c278d38b80e319673b859f95d830":"https://oracles.generalprotocols.com",
"034e1d3be2ee29b3d9e53b354b09d9a5a2803c568d8c6520bc72d97494c9a100c2":"https://oracles.generalprotocols.com",
"03994dc2c759375e98afbf5049383cd987001c346d0f11aa262c105874fb1390c3":"https://oracles.generalprotocols.com"
}`

6
.gitignore vendored Normal file
View File

@@ -0,0 +1,6 @@
/node_modules
/dist
/build
/oracles.db
/oracles.db-shm
/oracles.db-wal

2125
package-lock.json generated Normal file

File diff suppressed because it is too large Load Diff

36
package.json Normal file
View File

@@ -0,0 +1,36 @@
{
"name": "rewrite",
"version": "1.0.0",
"main": "index.js",
"type": "module",
"scripts": {
"start": "tsx src/index.ts",
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [],
"author": "",
"license": "ISC",
"description": "",
"devDependencies": {
"@types/better-sqlite3": "^7.6.13",
"@types/cors": "^2.8.19",
"@types/debug": "^4.1.12",
"@types/express": "^5.0.3",
"@types/node": "^24.3.1",
"tsx": "^4.20.5",
"typescript": "^5.9.2"
},
"dependencies": {
"@bitauth/libauth": "^3.0.0",
"@generalprotocols/oracle-client": "file:../client",
"async-mutex": "^0.5.0",
"better-sqlite3": "^12.2.0",
"cors": "^2.8.5",
"debug": "^4.4.1",
"dotenv": "^17.2.2",
"express": "^5.1.0",
"kysely": "^0.28.5",
"zeromq": "^6.5.0",
"zod": "^4.1.5"
}
}

407
src/cache/available-messages.ts vendored Normal file
View File

@@ -0,0 +1,407 @@
import { Mutex } from 'async-mutex';
import debug, { type Debugger } from 'debug';
import { sql } from 'kysely';
import type { Database } from '~/services/db.js'
export type SequenceRange = {
start: number;
end: number;
}
export type OracleMessageGap = {
previousMessageSequence: number | null;
currentMessageSequence: number;
nextMessageSequence: number | null;
}
/**
* NOTE: This was largely ripped out of the existing Relay server. It is very likely that we can reduce the complexity of this code significantly due to other architectural changes.
*/
/**
* This is a cache for the available messages.
* We use this cache to keep track of the messages we have (available) and the messages we don't have (gaps).
* We can then reference this cache to find the gaps in the messages for recovery purposes.
*
* Messages are stored in an Object with the public key of the oracle as the key and an array of available message ranges as the value.
* Eg
* {
* PublicKey_ABC: {
* availableMessages: [
* { start: 1, end: 100 },
* { start: 102, end: 200 },
* { start: 205, end: 300 }
* ]
* }
* }
*
* This means that the oracle with PublicKey_ABC has messages 1 - 100, 102 - 200, and 205 - 300. But 101, 201, 202, 203, 204 are missing.
*/
export class AvailableMessageCache
{
private availableMessages: Array<SequenceRange>;
private debug: Debugger;
private initMutex = new Mutex();
/**
* Private constructor to initialize the AvailableMessageCache.
*
* @description
* This constructor is intentionally marked as private to enforce the factory pattern
* through the static create() method. This ensures all instances of AvailableMessageCache
* are properly wrapped with mutex protection via createSingleExecutionInstance().
*
* @private
*/
public constructor(private readonly db: Database, private readonly publicKey: string)
{
this.availableMessages = [];
this.debug = debug(`cache:available-messages:${this.publicKey}`);
}
async start() {
this.debug('Starting available messages cache');
}
async getAvailableMessages(): Promise<Array<SequenceRange>>
{
const release = await this.initMutex.acquire();
try {
const availableMessages = this.availableMessages
if (!availableMessages.length) {
this.availableMessages = await this.findAvailableMessagesFromDatabase()
return this.availableMessages;
}
// this.debug('Available messages', availableMessages);
return availableMessages;
} finally {
release();
}
}
/**
* Add a message to the cache.
* If this fills a gap, we will combine the available sequences and make them a single range.
* If it is a sequence in the middle of a gap, we will split the gap into two.
* Eg
* [{ start: 1, end: 100 }, { start: 102, end: 200 }, { start: 205, end: 300 }]
*
* add 101
*
* [{ start: 1, end: 200 }, { start: 205, end: 300 }]
*
* add 202
*
* [{ start: 1, end: 200 }, { start: 202, end: 202 }, { start: 205, end: 300 }]
*
* @param { Uint8Array | string } oraclePublicKey - the public key of the oracle to add the message to.
* @param { number } sequence - the sequence of the message to add.
*/
public async addMessage(sequence: number): Promise<void>
{
// Get the available messages for the oracle.
const availableMessages = await this.getAvailableMessages();
// Push the message to the available messages.
availableMessages.push({ start: sequence, end: sequence });
// Sort the available messages by start.
availableMessages.sort((a, b) => a.start - b.start);
// Loop through the available message sequences and combine/remove overlapping ranges.
for(const [ index, range ] of availableMessages.entries())
{
// Perform a nested loop for a max of 2 iterations so we can merge more than 2 ranges together in the case we fill a single message gap and need to merge 3 from the same starting point.
// It's only necessary to do two iterations in the worst case because we start from the base condition that there are zero overlapping ranges.
// In the worst case, the injected message connects two ranges, requiring two merges.
// eslint-disable-next-line
for(let i = 0; i < 2; i++)
{
// Get the next range.
const nextRange: SequenceRange | undefined = availableMessages[index + 1];
// Check if the next range can be merged into the current range.
// We subtract 1 from the next range start to account for when the next range is adjacent to the current range.
const canMergeWithNext = nextRange && range.end >= nextRange.start - 1;
// If we can merge with the next range, we need to combine them.
if(canMergeWithNext)
{
// Get the largest end of the two ranges.
const largestEnd = Math.max(range.end, nextRange.end);
// Combine the ranges.
range.end = largestEnd;
// Remove the next range.
availableMessages.splice(index + 1, 1);
// Iterate over this range again as we may be able to merge it with the now updated next range.
continue;
}
// Break out of the nested loop as we couldn't merge to the next loop.
// This will keep the outer loop running to check the next range.
break;
}
}
// Add the message to the cache.
this.availableMessages = availableMessages;
}
/**
* Get the gaps for an oracle.
*
* Loop through the available messages and find the gaps.
*
* If the available messages are empty, we will query the database for the gaps.
*
* @param { Uint8Array | string } oraclePublicKey - the public key of the oracle to get the gaps for.
*
* @returns { Array<ISequenceRange> } - the gaps for the oracle.
*/
public async getGaps(): Promise<Array<SequenceRange>>
{
// Get the available messages for the oracle.
const availableMessages = await this.getAvailableMessages();
// Parse the available messages to gaps.
const gaps = await AvailableMessageCache.parseAvailableMessagesToGaps(availableMessages);
return gaps;
}
/**
* Perform a database query to find the available messages for an oracle.
*
* @param { Uint8Array | string } oraclePublicKey - the public key of the oracle to find the available messages for.
*
* @returns { Array<ISequenceRange> } - the available messages for the oracle.
*/
private async findAvailableMessagesFromDatabase(): Promise<Array<SequenceRange>>
{
// Get the oracle message rows for the oracle
const oracleMessageRows = await AvailableMessageCache.getOracleMessageRows(this.db, this.publicKey);
// Parse the oracle message rows to available messages
const availableMessages = await AvailableMessageCache.parseRowsToAvailableMessages(oracleMessageRows);
return availableMessages;
}
/**
* Get the message count for an oracle.
*
* @param { Uint8Array | string } oraclePublicKey - the public key of the oracle to get the message count for.
*
* @returns { number } - the message count for the oracle.
*/
public async getOracleMessageCount(): Promise<number>
{
// Get the available messages for the oracle.
const availableMessages = await this.getAvailableMessages();
// Parse the available messages to a message count.
const messageCount = await AvailableMessageCache.parseAvailableMessagesToMessageCount(availableMessages);
// Return the message count.
return messageCount;
}
/**
* Parse an array of available messages to a message count.
*
* @param { Array<ISequenceRange> } availableMessages - the available messages to parse to a message count.
*
* @returns { number } - the message count.
*/
static async parseAvailableMessagesToMessageCount(availableMessages: Array<SequenceRange>): Promise<number>
{
// Initialize the total to 0.
let total = 0;
// Loop through the available messages and add the length of each range to the total.
for(const range of availableMessages)
{
// Add the length of the range to the total.
// Add 1 to account for the inclusive range.
total += range.end - range.start + 1;
}
// Return the total.
return total;
}
/**
* Get the oracle message rows for an oracle formatted to find gaps.
*
* @param { Uint8Array | string } oraclePublicKey - the public key of the oracle to find the oracle message rows for.
*
* @returns { Array<IOracleMessageGap> } - the oracle message rows for the oracle.
*/
static async getOracleMessageRows(db: Database, oraclePublicKey: string): Promise<Array<OracleMessageGap>>
{
const oracleStore = await db.getOracleStore(oraclePublicKey);
// Get the latest message sequence for the oracle
const latestMessage = await oracleStore.select('sequence').orderBy('sequence', 'desc').limit(1).executeTakeFirst();
// If the oracle has no messages, we will return an empty array.
// We don't have any messages to find gaps for.
// This should never happen since messages should be added to the db before we start querying for gaps.
if(!latestMessage)
{
return [];
}
// Oracles start their sequence at 1
const earliestMessageSequence = 0;
// Get the gaps for the oracle
// The query expects an exclusive range so we extend both the start and end by 1.
console.log('Getting Oracle Gaps from Database for Oracle: ', oraclePublicKey);
const start = performance.now();
// const messageCount = await db.db.selectFrom(`messages-${oraclePublicKey}`).selectAll().execute();
// console.log('Message count for Oracle: ', oraclePublicKey, messageCount.length);
// Use a subquery approach to find gap boundaries - more reliable than CTEs
const oracleMessageRows = await db.db
.selectFrom(
db.db
.selectFrom(`messages-${oraclePublicKey}` as any)
.select([
sql<number | null>`LAG(sequence) OVER (ORDER BY sequence)`.as('previousMessageSequence'),
'sequence as currentMessageSequence',
sql<number | null>`LEAD(sequence) OVER (ORDER BY sequence)`.as('nextMessageSequence')
])
.where('sequence', '>=', earliestMessageSequence)
.where('sequence', '<=', latestMessage.sequence)
.as('withNeighbors')
)
.selectAll()
.where((eb) => eb.or([
// Start of sequence (no previous) OR gap after previous
eb('previousMessageSequence', 'is', null),
eb(sql`currentMessageSequence - previousMessageSequence`, '>', 1),
// End of sequence (no next) OR gap before next
eb('nextMessageSequence', 'is', null),
eb(sql`nextMessageSequence - currentMessageSequence`, '>', 1)
]))
.orderBy('currentMessageSequence')
.execute();
console.log('Found gap boundary rows:', oracleMessageRows.length);
console.log('Oracle Gaps from Database for Oracle: ', oraclePublicKey, performance.now() - start);
return oracleMessageRows;
}
/**
* Parse rows that were returned from the database query to find oracle gaps.
* Returns an array of gaps. A gap is represented by an object indicating the first and last
* missing oracle message sequence (inclusive on both sides).
*
* @param { Array<IOracleMessageGap> } oracleMessageRows - the oracle message rows to parse to available messages.
*
* @returns { Array<ISequenceRange> } - the available messages for the oracle.
*/
static async parseRowsToAvailableMessages(oracleMessageRows: Array<OracleMessageGap>): Promise<Array<SequenceRange>>
{
// Initialize available messages to an empty list.
const availableMessages: Array<SequenceRange> = [];
let rangeStart = 1;
// Loop over the oracle message rows and add the available messages to the list.
for(const row of oracleMessageRows)
{
// Check if this is the start of a new range.
const isStartOfNewRange = (row.previousMessageSequence !== row.currentMessageSequence - 1);
// Check if this is the end of a range.
const isEndOfRange = (row.nextMessageSequence !== row.currentMessageSequence + 1);
// If this is the start of a new range, we need to update the range start.
if(isStartOfNewRange)
{
// Update the range start.
rangeStart = row.currentMessageSequence;
}
// If this is the end of a range, we need to add the range to the available messages.
if(isEndOfRange)
{
// Get the end of the range.
const rangeEnd = row.currentMessageSequence;
// Add the range to the available messages.
availableMessages.push({ start: rangeStart, end: rangeEnd });
}
}
// Return the available messages.
return availableMessages;
}
/**
* Parse an array of available messages to gaps.
*
* @param { Array<ISequenceRange> } availableMessages - the available messages to parse to gaps.
*
* @returns { Array<ISequenceRange> } - the gaps for the oracle.
*/
static async parseAvailableMessagesToGaps(availableMessages: Array<SequenceRange>): Promise<Array<SequenceRange>>
{
// Make sure the available messages are sorted by start
availableMessages.sort((a, b) => a.start - b.start);
const firstAvailableMessage = availableMessages[0];
if (!firstAvailableMessage) {
return [];
}
// Initialize gaps to an empty list.
const gaps: Array<SequenceRange> = [];
// Check to make sure our available messages start at 1.
if(firstAvailableMessage.start !== 1)
{
// Set the first gap to be from 1 to the start of the first available message.
gaps.push({ start: 1, end: firstAvailableMessage.start - 1 });
}
// Loop over the available messages and find the gaps.
for(const [ index, range ] of availableMessages.entries())
{
// Get the end of the previous range
const previousRangeEnd = availableMessages[index - 1]?.end;
// If there is no previous range, this is the first range.
if(!previousRangeEnd)
{
continue;
}
// Take the end of the previous range and the start of the current range and add them to the available messages.
gaps.push({ start: previousRangeEnd + 1, end: range.start - 1 });
}
return gaps;
}
}

52
src/config.ts Normal file
View File

@@ -0,0 +1,52 @@
import { z } from "zod";
export class Config {
static fromEnv(): Config {
// Validate the environment variables
const ORACLES = process.env.ORACLES ? JSON.parse(process.env.ORACLES) : {};
const config = this.schema.parse({ ...process.env, ORACLES });
return new Config(config);
}
static from(config: Config) {
// Validate the config
const validatedConfig = this.schema.parse(config);
return new Config(validatedConfig);
}
static schema = z.object({
DATABASE_FILE_PATH: z.string().default('./oracles.db'),
EXPRESS_PORT: z.number().default(3000),
ZMQ_REQUEST_INCOMING_SERVER_ADDRESS: z.string().default('0.0.0.0'),
ZMQ_REQUEST_INCOMING_SERVER_PORT: z.number().default(7083),
ZMQ_BROADCAST_OUTGOING_SERVER_ADDRESS: z.string().default('0.0.0.0'),
ZMQ_BROADCAST_OUTGOING_SERVER_PORT: z.number().default(7084),
ORACLES: z.record(z.string(), z.string()).default({}),
})
public readonly DATABASE_FILE_PATH: string;
// Express
public readonly EXPRESS_PORT: number;
// ZMQ
public readonly ZMQ_REQUEST_INCOMING_SERVER_ADDRESS: string;
public readonly ZMQ_REQUEST_INCOMING_SERVER_PORT: number;
public readonly ZMQ_BROADCAST_OUTGOING_SERVER_ADDRESS: string;
public readonly ZMQ_BROADCAST_OUTGOING_SERVER_PORT: number;
// Oracles
public readonly ORACLES: Record<string, string>;
constructor(config: Config) {
this.DATABASE_FILE_PATH = config.DATABASE_FILE_PATH;
this.EXPRESS_PORT = config.EXPRESS_PORT;
this.ZMQ_REQUEST_INCOMING_SERVER_ADDRESS = config.ZMQ_REQUEST_INCOMING_SERVER_ADDRESS;
this.ZMQ_REQUEST_INCOMING_SERVER_PORT = config.ZMQ_REQUEST_INCOMING_SERVER_PORT;
this.ZMQ_BROADCAST_OUTGOING_SERVER_ADDRESS = config.ZMQ_BROADCAST_OUTGOING_SERVER_ADDRESS;
this.ZMQ_BROADCAST_OUTGOING_SERVER_PORT = config.ZMQ_BROADCAST_OUTGOING_SERVER_PORT;
this.ORACLES = config.ORACLES;
}
}

16
src/database/schema.ts Normal file
View File

@@ -0,0 +1,16 @@
export type DatabaseSchema = {
oracles: {
id: number;
publicKey: string;
}
[key: `messages-${string}`]: {
id: number;
signature: string;
timestamp: number;
sequence: number;
priceSequence: number | null;
price: number | null;
metadataType: number | null;
metadata: string | null;
}
}

0
src/database/sqlite.ts Normal file
View File

137
src/index.ts Normal file
View File

@@ -0,0 +1,137 @@
import { type SignedMessage } from '@generalprotocols/oracle-client';
import debug from 'debug';
import 'dotenv/config';
import { Database } from './services/db.js';
import { OraclesRoute } from './routes/oracles.js';
import { RouteHandlerZMQ } from './services/route-handler/route-handler-zmq.js';
import { RouteHandlerExpress } from './services/route-handler/route-handler-express.js';
import { BroadcasterZMQ } from './services/broadcaster/broadcaster-zmq.js';
import { BroadcasterSSE } from './services/broadcaster/broadcaster-sse.js';
import { Config } from './config.js';
import { OracleManager } from './oracles/oracle-manager.js';
type AppDependencies = {
database: Database;
oracleManager: OracleManager;
broadcasterZMQ: BroadcasterZMQ<SignedMessage<string>>;
broadcasterSSE: BroadcasterSSE<SignedMessage<string>>;
oracleRoutes: OraclesRoute;
routeHandlerZMQ: RouteHandlerZMQ;
routeHandlerExpress: RouteHandlerExpress;
}
class App {
static async from() {
try {
const log = debug('App');
const config = Config.fromEnv();
log('created config');
const database = await Database.from(config.DATABASE_FILE_PATH);
log('created database');
// const subscriber = new SubscriberSSE('https://oracles.generalprotocols.com');
// log('created subscriber');
const broadcasterZMQ = new BroadcasterZMQ();
log('created broadcasterZMQ');
const broadcasterSSE = new BroadcasterSSE();
log('created broadcasterSSE');
const oracleManager = new OracleManager({
database,
config,
broadcasters: [broadcasterSSE, broadcasterZMQ],
});
log('created oracleManager');
const oracleRoutes = new OraclesRoute(database, oracleManager);
log('created oracleRoutes');
const routeHandlerZMQ = new RouteHandlerZMQ({
port: config.ZMQ_REQUEST_INCOMING_SERVER_PORT,
host: config.ZMQ_REQUEST_INCOMING_SERVER_ADDRESS,
onRequest: oracleRoutes.getOracleMessages.bind(oracleRoutes),
});
log('created routeHandlerZMQ');
const routeHandlerExpress = new RouteHandlerExpress({
port: config.EXPRESS_PORT,
host: '0.0.0.0',
routes: await oracleRoutes.getRoutes(),
broadcasterSSE: broadcasterSSE,
});
log('created routeHandlerExpress');
return new App({
database,
oracleRoutes,
routeHandlerExpress,
routeHandlerZMQ,
oracleManager,
broadcasterSSE,
broadcasterZMQ,
});
} catch (error) {
console.error('error', error);
throw error;
}
}
constructor(
private dependencies: AppDependencies,
) {
// this.dependencies.subscriber.on('message', async (message) => {
// const { publicKey, signature } = message.toHexObject();
// await this.dependencies.database.getOracleStore(publicKey)
// if (OracleMessage.isMetadataMessage(message)) {
// await this.dependencies.database.db.insertInto(`messages-${publicKey}`).values({
// id: message.messageSequence,
// signature,
// timestamp: message.messageTimestamp,
// sequence: message.messageSequence,
// metadataType: message.metadataType,
// metadata: message.metadataContent,
// }).execute();
// }
// if (OracleMessage.isPriceMessage(message)) {
// await this.dependencies.database.db.insertInto(`messages-${publicKey}`).values({
// id: message.messageSequence,
// signature,
// timestamp: message.messageTimestamp,
// sequence: message.messageSequence,
// priceSequence: message.priceSequence,
// price: message.priceValue,
// }).execute();
// }
// this.dependencies.broadcasterZMQ.broadcast(message.toHexObject());
// this.dependencies.broadcasterSSE.broadcast(message.toHexObject());
// });
}
async start() {
await Promise.all([
this.dependencies.broadcasterSSE.start(),
this.dependencies.broadcasterZMQ.start(),
this.dependencies.routeHandlerExpress.start(),
this.dependencies.routeHandlerZMQ.start(),
// this.dependencies.subscriber.start(),
this.dependencies.oracleManager.start(),
])
}
}
const app = await App.from();
await app.start();

View File

@@ -0,0 +1,103 @@
import { OracleClient, OracleMessage, OracleMetadataMessage, OraclePriceMessage, type SignedMessage } from "@generalprotocols/oracle-client";
import { Oracle } from "./oracles.js";
import type { Database } from "~/services/db.js";
import type { Config } from "~/config.js";
import type { BaseBroadcaster } from "~/services/broadcaster/base-broadcaster.js";
import debug, { type Debugger } from "debug";
import { RemoteComposite } from "~/services/remotes/remote-composite.js";
export type OracleManagerDependencies = {
database: Database;
config: Config;
broadcasters: Array<BaseBroadcaster<SignedMessage<string>>>;
}
export class OracleManager {
static async from(dependencies: OracleManagerDependencies) {
return new OracleManager(dependencies);
}
private debug: Debugger;
public oracles: Record<string, Oracle>;
// Map of relay url to Oracle Client so we can reuse connections between oracles
private connections: Map<string, OracleClient>;
constructor(private dependencies: OracleManagerDependencies) {
this.debug = debug(`OracleManager`);
this.oracles = {};
this.connections = new Map();
}
async start() {
this.debug('Starting Oracle Manager');
const oracles = this.dependencies.config.ORACLES;
// Load the oracles from the config
for (const [publicKey, relayUrl] of Object.entries(oracles)) {
const connection = await this.createConnection(relayUrl);
this.oracles[publicKey] = await Oracle.from({ database: this.dependencies.database, remote: connection }, publicKey);
this.debug(`Created oracle instance for ${publicKey}`);
}
// Load the oracles from the database - In the future, we will give them a remote based on the metadata message for it, but for now, we are just going to give it a composite of all the connections
const oraclesFromDatabase = await this.dependencies.database.getOracles();
this.debug(`Found ${Object.keys(oraclesFromDatabase).length} oracles in the database`);
for (const [publicKey, oracle] of Object.entries(oraclesFromDatabase)) {
// If we already created the oracle, skip it
if (this.oracles[publicKey]) {
continue;
}
// NOTE: Temporarily, we are just using the composite of all the remotes for if we do not know the relay url
const connection = new RemoteComposite(Object.values(this.connections));
this.oracles[publicKey] = await Oracle.from({ database: this.dependencies.database, remote: connection }, publicKey);
this.debug(`Created oracle instance for ${publicKey}`);
}
// Listen for messages from the broadcasters
for (const oracle of Object.values(this.oracles)) {
oracle.on('broadcast', (message: OracleMetadataMessage | OraclePriceMessage) => {
this.dependencies.broadcasters.forEach((broadcaster) => {
broadcaster.broadcast(message.toHexObject());
});
});
}
this.debug('Oracle Manager started');
}
private async createOracle(publicKey: string, relayUrl: string) {
const connection = await this.createConnection(relayUrl);
return await Oracle.from({ database: this.dependencies.database, remote: connection }, publicKey);
}
private async createConnection(relayUrl: string) {
if (this.connections.get(relayUrl)) {
return this.connections.get(relayUrl)!;
}
const connection = await OracleClient.from({ baseURL: relayUrl });
connection.setOnMessage(async (message: OraclePriceMessage | OracleMetadataMessage) => {
if (!this.oracles[message.toHexObject().publicKey]) {
this.oracles[message.toHexObject().publicKey] = await this.createOracle(message.toHexObject().publicKey, relayUrl);
}
const oracle = this.oracles[message.toHexObject().publicKey];
if (!oracle) {
return;
}
oracle.processMessage(message);
})
this.connections.set(relayUrl, connection);
return connection;
}
}

173
src/oracles/oracles.ts Normal file
View File

@@ -0,0 +1,173 @@
import debug, { type Debugger } from "debug";
import { OracleMessage, OracleClient, type OracleMetadataMessage, type OraclePriceMessage } from "@generalprotocols/oracle-client";
import { EventEmitter } from "../utils/event-emitter.js";
import { AvailableMessageCache } from "../cache/available-messages.js";
import type { Database } from "../services/db.js";
import { Mutex } from "async-mutex";
import type { BaseRemote } from "~/services/remotes/base-remote.js";
export type OracleDependencies = {
database: Database;
// Remote is the direct connection to the original relay/generator for reccovery
remote: BaseRemote;
}
export type OracleEvent = {
broadcast: OracleMetadataMessage | OraclePriceMessage;
}
export class Oracle extends EventEmitter<OracleEvent> {
static async from(dependencies: OracleDependencies, publicKey: string) {
return new Oracle(dependencies, publicKey);
}
private debug: Debugger;
private availableMessages: AvailableMessageCache;
private database: Database;
private remote: BaseRemote;
private recoveryMutex = new Mutex();
constructor(private dependencies: OracleDependencies, private publicKey: string) {
super();
this.debug = debug(`Oracle:${this.publicKey}`);
this.availableMessages = new AvailableMessageCache(this.dependencies.database, this.publicKey);
this.database = dependencies.database;
this.remote = dependencies.remote;
}
async start() {
this.debug('Instance created');
}
async getOracleStore() {
return await this.database.getOracleStore(this.publicKey);
}
async processMessage(message: OracleMetadataMessage | OraclePriceMessage) {
const result = await this.insertMessage(message);
if (result) {
this.emit('broadcast', message);
}
this.recover();
}
async insertMessage(message: OracleMetadataMessage | OraclePriceMessage) {
// Make sure that the table exists
await this.getOracleStore();
const { signature } = message.toHexObject();
// Update the cache
this.availableMessages.addMessage(message.messageSequence);
if (OracleMessage.isMetadataMessage(message)) {
const result = await this.database.db.insertInto(`messages-${this.publicKey}`).values({
id: message.messageSequence,
signature,
timestamp: message.messageTimestamp,
sequence: message.messageSequence,
metadataType: message.metadataType,
metadata: message.metadataContent,
}).onConflict((oc) => oc.doNothing()).executeTakeFirst();
if (result?.numInsertedOrUpdatedRows) {
return true;
}
}
if (OracleMessage.isPriceMessage(message)) {
const result = await this.database.db.insertInto(`messages-${this.publicKey}`).values({
id: message.messageSequence,
signature,
timestamp: message.messageTimestamp,
sequence: message.messageSequence,
priceSequence: message.priceSequence,
price: message.priceValue,
}).onConflict((oc) => oc.doNothing()).executeTakeFirst();
if (result?.numInsertedOrUpdatedRows) {
return true;
}
}
return false;
}
async getAvailableMessages() {
return await this.availableMessages.getAvailableMessages();
}
async getGaps() {
return await this.availableMessages.getGaps();
}
async getLastMessage() {
const oracleStore = await this.database.getOracleStore(this.publicKey);
return await oracleStore.select('sequence').orderBy('sequence', 'desc').limit(1).executeTakeFirst();
}
async recover(): Promise<void> {
const unlockRecovery = await this.recoveryMutex.acquire();
try {
let gaps = await this.availableMessages.getGaps();
if (gaps.length === 0) {
return;
}
while(gaps.length > 0) {
// Take the first gap
const gap = gaps[0];
if (!gap) break;
// Set our batch size
const maxBatchSize = 5000;
const batchSize = Math.min(gap.end - gap.start + 1, maxBatchSize);
// Set our start and end
const start = gap.start;
const end = Math.min(gap.end, start + batchSize - 1);
this.debug(`Recovering messages ${start} to ${end}`);
// Get the missing messages
const missingMessages = await this.remote.getOracleMessages({
publicKey: this.publicKey,
minMessageSequence: start,
maxMessageSequence: end,
count: batchSize,
});
// Insert the missing messages
for (const message of missingMessages) {
await this.insertMessage(message);
}
// Update the gaps
gaps = await this.availableMessages.getGaps();
const messageCount = await this.availableMessages.getOracleMessageCount();
const lastMessage = gaps.at(-1)?.end;
this.debug(`Recovered messages ${start} to ${end} oracle messages (${messageCount} / ${lastMessage})`);
await new Promise(resolve => setTimeout(resolve, 2000));
}
this.debug('Oracle messages recovered');
} catch (error) {
this.debug('Error recovering oracle messages', error);
} finally {
unlockRecovery();
}
}
}

0
src/routes/index.ts Normal file
View File

188
src/routes/oracles.ts Normal file
View File

@@ -0,0 +1,188 @@
import { numberToBinInt32LE, flattenBinArray, binToHex } from '@bitauth/libauth';
import { OracleMessage } from '@generalprotocols/oracle-client';
import { z } from 'zod';
import debug, { type Debugger } from 'debug';
import type { Database } from '~/services/db.js'
import type { Request, RouteOptions } from './types.js'
import type { OracleManager } from '~/oracles/oracle-manager.js';
import { AvailableMessageCache } from '~/cache/available-messages.js';
export class OraclesRoute {
private debug: Debugger;
constructor(private readonly database: Database, private readonly oracleMessages: OracleManager) {
this.debug = debug(`OraclesRoute`);
}
// NOTE: The idea is we can make this transport agnostic.
// These same routes can be registered to work with other transports (express, ZMQ, LibP2P, etc).
async getRoutes(): Promise<RouteOptions[]> {
return [
{
method: "GET",
url: "/api/v1/oracles",
handler: this.getOracles.bind(this),
},
{
method: "GET",
url: "/api/v1/oraclemessages",
handler: this.getOracleMessages.bind(this),
},
{
method: "GET",
url: 'count',
handler: this.getOracleMessageCount.bind(this),
}
] as RouteOptions[];
}
async getOracles(request: Request) {
console.log('getOracles', request);
return Object.keys(await this.database.getOracles()).map((oracle) => ({
publicKey: oracle,
}));
}
async getOracleMessages(request: Request) {
const parsedRequest = OraclesRoute.parseGetOracleMessagesRequest.parse(request.body);
// Get the oracles store for the given public key.
const oracleStore = await this.oracleMessages.oracles[parsedRequest.publicKey]?.getOracleStore();
if (!oracleStore) {
return [];
}
// Debugging: Search the database and check how many messages are in the database.
// const messageCount = await this.database.db.selectFrom(`messages-${parsedRequest.publicKey}`).selectAll().execute();
this.debug('getOracleMessages', parsedRequest);
let query = oracleStore.selectAll();
if (parsedRequest.minMessageTimestamp)
query = query.where('timestamp', '>=', parsedRequest.minMessageTimestamp);
if (parsedRequest.maxMessageTimestamp)
query = query.where('timestamp', '<=', parsedRequest.maxMessageTimestamp);
if (parsedRequest.minMessageSequence)
query = query.where('sequence', '>=', parsedRequest.minMessageSequence);
if (parsedRequest.maxMessageSequence)
query = query.where('sequence', '<=', parsedRequest.maxMessageSequence);
if (parsedRequest.minDataSequence)
query = query.where('priceSequence', '>=', parsedRequest.minDataSequence);
if (parsedRequest.maxDataSequence)
query = query.where('priceSequence', '<=', parsedRequest.maxDataSequence);
if (parsedRequest.minMetadataType)
query = query.where('metadataType', '>=', parsedRequest.minMetadataType);
if (parsedRequest.maxMetadataType)
query = query.where('metadataType', '<=', parsedRequest.maxMetadataType);
query = query.limit(parsedRequest.count || 10000);
const messages = await query.orderBy('sequence', 'desc').execute()
this.debug('getOracleMessages', messages.length);
// Compile the message values into a message format
return await Promise.all(messages.map(async (message) => {
// Convert the message timestamp, message sequence, metadata type, and metadata content to binary.
const messageTimestamp = numberToBinInt32LE(
message.timestamp,
);
const messageSequence = numberToBinInt32LE(
message.sequence,
);
const dataSequenceOrType = numberToBinInt32LE(message.metadataType || message.priceSequence || 0);
const priceContent = message.price && numberToBinInt32LE(message.price);
const metadataContent = new TextEncoder().encode(
message.metadata || '',
);
// Concatenate the message timestamp, message sequence, data sequence or type, and content.
const messageBin = flattenBinArray([
messageTimestamp,
messageSequence,
dataSequenceOrType,
(priceContent || metadataContent),
]);
return OracleMessage.from({
signature: message.signature,
message: binToHex(messageBin),
publicKey: parsedRequest.publicKey,
}).toHexObject();
}));
}
static parseGetOracleMessagesRequest =
z.object({
minMessageTimestamp: z.coerce.number().optional(),
maxMessageTimestamp: z.coerce.number().optional(),
minMessageSequence: z.coerce.number().optional(),
maxMessageSequence: z.coerce.number().optional(),
minDataSequence: z.coerce.number().optional(),
maxDataSequence: z.coerce.number().optional(),
minMetadataType: z.coerce.number().optional(),
maxMetadataType: z.coerce.number().optional(),
count: z.coerce.number().optional(),
publicKey: z.string(),
})
// Ensure ranges obey min < max.
.refine((data) => {
if (data.minDataSequence && data.maxDataSequence) {
return data.minDataSequence <= data.maxDataSequence;
}
return true;
}, { message: 'minDataSequence cannot be greater than maxDataSequence' })
.refine((data) => {
if (data.minMetadataType && data.maxMetadataType) {
return data.minMetadataType <= data.maxMetadataType;
}
return true;
}, { message: 'minMetadataType cannot be greater than maxMetadataType' })
.refine((data) => {
if (data.minMessageSequence && data.maxMessageSequence) {
return data.minMessageSequence <= data.maxMessageSequence;
}
return true;
}, { message: 'minMessageSequence cannot be greater than maxMessageSequence' })
.refine((data) => {
if (data.minMessageTimestamp && data.maxMessageTimestamp) {
return data.minMessageTimestamp <= data.maxMessageTimestamp;
}
return true;
}, { message: 'minMessageTimestamp cannot be greater than maxMessageTimestamp' })
// Dont allow both dataSequence and metadataType to be set. They are mutually exclusive.
.refine((data) => {
if ((data.minDataSequence || data.maxDataSequence) && (data.minMetadataType || data.maxMetadataType)) {
return false;
}
return true;
}, { message: 'Cannot set both dataSequence and metadataType' })
.strict()
static parseGetOracleMessageCountRequest =
z.object({
publicKey: z.string(),
})
.strict()
async getOracleMessageCount(request: Request) {
const parsedRequest = OraclesRoute.parseGetOracleMessageCountRequest.parse(request.body);
const availableMessages = await this.oracleMessages.oracles[parsedRequest.publicKey]?.getAvailableMessages();
if (!availableMessages) {
return 0;
}
const messageCount = await AvailableMessageCache.parseAvailableMessagesToMessageCount(availableMessages);
return messageCount;
}
}

13
src/routes/types.ts Normal file
View File

@@ -0,0 +1,13 @@
export type RouteOptions = {
method: "GET" | "POST" | "PUT" | "DELETE";
url: string;
handler: (request: Request) => Promise<unknown>;
}
export class Request {
constructor(public body: unknown) {
this.body = body;
}
}

View File

@@ -0,0 +1,5 @@
export abstract class BaseBroadcaster<T> {
abstract start(): Promise<void>;
abstract broadcast(data: T): Promise<void>;
}

View File

@@ -0,0 +1,53 @@
import type { SignedMessage } from "@generalprotocols/oracle-client";
import type { Response } from "express";
import debug, { type Debugger } from "debug";
import { BaseBroadcaster } from "./base-broadcaster.js";
export class BroadcasterSSE<T extends SignedMessage<string>> extends BaseBroadcaster<T> {
static async from() {
const broadcaster = new BroadcasterSSE();
await broadcaster.start();
return broadcaster;
}
private clients: Set<Response> = new Set();
private debug: Debugger;
constructor() {
super();
this.clients = new Set();
this.debug = debug('Broadcaster:SSE');
}
async start() {
this.debug('SSE broadcaster is running');
}
async broadcast(data: T) {
this.clients.forEach((client: Response) => {
client.write(`event: ${data.publicKey}\n`);
client.write(`data: ${JSON.stringify(data)}\n\n`);
});
this.debug('SSE broadcasted message', data);
}
async subscribe(res: Response) {
this.clients.add(res);
this.debug('SSE subscribed to client');
res.on('close', () => {
this.clients.delete(res);
});
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.flushHeaders();
res.write('retry: 3000\n\n');
}
}

View File

@@ -0,0 +1,64 @@
import { binToHex, flattenBinArray, hexToBin, opcodeToPushLength } from '@bitauth/libauth';
import { OracleProtocol, type SignedMessage } from '@generalprotocols/oracle-client';
import { Publisher } from 'zeromq'
import debug, { type Debugger } from "debug";
import { Mutex } from 'async-mutex';
import { BaseBroadcaster } from "./base-broadcaster.js";
const OP_RETURN = hexToBin('6A');
export class BroadcasterZMQ<T extends SignedMessage<string>> extends BaseBroadcaster<T> {
static async from() {
const broadcaster = new BroadcasterZMQ();
await broadcaster.start();
return broadcaster;
}
private publisher: Publisher;
private debug: Debugger;
private sendLock = new Mutex();
constructor() {
super();
this.publisher = new Publisher();
this.debug = debug('Broadcaster:ZMQ');
}
async start() {
await this.publisher.bind('tcp://0.0.0.0:7084');
this.debug('ZMQ publisher is running on %s:%s', '0.0.0.0', 7084);
}
async broadcast(data: T) {
const messageTopic = binToHex(flattenBinArray([ OP_RETURN, getPushDataOpcode(4), OracleProtocol.IDENTIFIER ]));
const messageBin = hexToBin(data.message);
const publicKey = hexToBin(data.publicKey);
const signature = hexToBin(data.signature);
const messageContent = binToHex(flattenBinArray([
getPushDataOpcode(messageBin.length), messageBin,
getPushDataOpcode(publicKey.length), publicKey,
getPushDataOpcode(signature.length), signature,
]));
await this.sendLock.runExclusive(async () => {
await this.publisher.send([messageTopic, messageContent]);
});
this.debug('ZMQ publisher sent message: ', messageTopic, messageContent);
}
}
export const getPushDataOpcode = function(byteLength: number): Uint8Array
{
if(byteLength === 0) return Uint8Array.from([ 0x4c, 0x00 ]);
if(byteLength < 76) return Uint8Array.from([ byteLength ]);
if(byteLength < 256) return Uint8Array.from([ 0x4c, byteLength ]);
throw Error('byteLength is too large.');
};
const bin = new Uint8Array([0x10, 0xe4, 0x53, 0xbc, 0x68, 0x9c, 0xdc, 0x13, 0x00, 0x83, 0xdc, 0x13])

93
src/services/db.ts Normal file
View File

@@ -0,0 +1,93 @@
import { Kysely, sql, SqliteDialect, type SelectQueryBuilder } from "kysely";
import SqliteDatabase from 'better-sqlite3'
import { type DatabaseSchema } from '~/database/schema.js';
export class Database {
static async from(databaseFilePath: string) {
// Create the database
const sqlite = new SqliteDatabase(databaseFilePath);
// Set WAL mode as its faster (safe to do with better-sqlite3 as its synchronous)
sqlite.pragma('journal_mode = WAL');
// Wrap with Kysely
const db = new Kysely<DatabaseSchema>({
dialect: new SqliteDialect({
database: sqlite,
}),
});
// Create the oracles table if it doesn't exist
await db.schema.createTable('oracles')
.addColumn('id', 'integer', (col) => col.primaryKey())
.addColumn('publicKey', 'text').addUniqueConstraint('publicKey', ['publicKey'])
.ifNotExists().execute();
return new Database(db);
}
constructor(public readonly db: Kysely<DatabaseSchema>) {}
async getOracles() {
// Get all the oracles
const oracles = await this.db.selectFrom('oracles').selectAll().execute();
// Get the store for each oracle
const oracleStores = await Promise.all(oracles.map(async (oracle) => {
return {
...oracle,
store: await this.getOracleStore(oracle.publicKey),
};
}));
// Compile into an object of { publicKey: Table}
return oracleStores.reduce((acc, oracleStore) => {
acc[oracleStore.publicKey] = oracleStore.store;
return acc;
}, {} as Record<string, SelectQueryBuilder<DatabaseSchema, `messages-${string}`, {}>>);
}
// Returns an instance of the oracles messages table. If it doesn't exist, it will be created.
async getOracleStore(publicKey: string) {
// Create the oracle row in the oracles table if it doesn't exist
await this.db.schema.createTable('oracles')
.addColumn('id', 'integer', (col) => col.primaryKey())
.addColumn('publicKey', 'text').addUniqueConstraint('publicKey', ['publicKey'])
.ifNotExists().execute();
// Create the oracles table if it doesn't exist
await this.db.schema.createTable(`messages-${publicKey}`)
.addColumn('id', 'integer', (col) => col.primaryKey())
.addColumn('signature', 'text', (col) => col.notNull())
.addColumn('timestamp', 'integer', (col) => col.notNull())
.addColumn('sequence', 'integer', (col) => col.notNull().unique())
.addColumn('priceSequence', 'integer') // This may need to be unique, but not sure if null is counted as unique or not.
.addCheckConstraint(`priceSequenceCheck-${publicKey}`, sql`priceSequence IS NULL OR priceSequence > 0`)
.addColumn('price', 'integer')
.addColumn('metadataType', 'integer')
.addCheckConstraint(`metadataTypeCheck-${publicKey}`, sql`metadataType IS NULL OR metadataType < 0`)
.addColumn('metadata', 'text')
.ifNotExists().execute();
// Create indexes on the table if they don't exist
await this.db.schema.createIndex(`messages-${publicKey}-timestamp`).on(`messages-${publicKey}`).columns(['timestamp']).ifNotExists().execute();
await this.db.schema.createIndex(`messages-${publicKey}-sequence`).on(`messages-${publicKey}`).columns(['sequence']).ifNotExists().execute();
await this.db.schema.createIndex(`messages-${publicKey}-priceSequence`).on(`messages-${publicKey}`).columns(['priceSequence']).ifNotExists().execute();
await this.db.schema.createIndex(`messages-${publicKey}-price`).on(`messages-${publicKey}`).columns(['price']).ifNotExists().execute();
await this.db.schema.createIndex(`messages-${publicKey}-metadataType`).on(`messages-${publicKey}`).columns(['metadataType']).ifNotExists().execute();
// Return the instance of the oracles messages table.
return this.db.selectFrom(`messages-${publicKey}`)
}
}
// const db = await Database.from();
// const oracles = await db.getOracles();
// Object.values(oracles).forEach(async (oracle) => {
// const messages = await oracle.selectAll().orderBy('timestamp', 'desc').limit(10).execute();
// console.log(messages);
// });
// console.log(oracles);

View File

@@ -0,0 +1,8 @@
import type { OracleMetadataMessage, OraclePriceMessage } from "@generalprotocols/oracle-client";
import type { MessagesRequest } from "@generalprotocols/oracle-client";
export abstract class BaseRemote {
abstract start(): Promise<void>;
abstract getOracleMessages(params: MessagesRequest): Promise<Array<OracleMetadataMessage | OraclePriceMessage>>
}

View File

@@ -0,0 +1,25 @@
import type { MessagesRequest, OraclePriceMessage, OracleMetadataMessage } from "@generalprotocols/oracle-client";
import { BaseRemote } from "./base-remote.js";
export class RemoteComposite extends BaseRemote {
constructor(private remotes: BaseRemote[]) {
super();
}
async start() {
await Promise.all(this.remotes.map((remote) => remote.start()));
}
async getOracleMessages(params: MessagesRequest) {
const messages = await Promise.all(this.remotes.map((remote) => remote.getOracleMessages(params)));
// Dedupe the messages
const messageMap = new Map<string, OracleMetadataMessage | OraclePriceMessage>();
for (const message of messages.flat()) {
messageMap.set(message.messageSequence.toString(), message);
}
return Array.from(messageMap.values());
}
}

View File

@@ -0,0 +1,91 @@
import express, { type Application, type Request as ExpressRequest, type Response as ExpressResponse } from 'express';
import cors from 'cors';
import debug, { type Debugger } from 'debug';
import { type RouteOptions, Request } from '~/routes/types.js';
import { BroadcasterSSE } from '~/services/broadcaster/broadcaster-sse.js';
import type { SignedMessage } from '@generalprotocols/oracle-client';
export type RouteHandlerExpressDependencies = {
routes: Array<RouteOptions>
port: number
host: string
broadcasterSSE: BroadcasterSSE<SignedMessage<string>>;
}
export class RouteHandlerExpress {
static async from(deps: RouteHandlerExpressDependencies) {
const handler = new RouteHandlerExpress(deps);
await handler.start();
return handler;
}
private app: Application;
private deps: RouteHandlerExpressDependencies;
private debug: Debugger;
constructor(deps: Partial<RouteHandlerExpressDependencies>) {
this.deps = {
port: 3000,
host: '0.0.0.0',
routes: [],
broadcasterSSE: new BroadcasterSSE(),
...deps,
}
this.app = express();
this.debug = debug('Router:Express');
}
async start() {
this.app.use(express.json());
this.app.use(express.urlencoded({ extended: true }));
this.app.use(cors());
this.app.enable('trust proxy');
this.app.set('json spaces', 2);
// Test debug. Just print all requests
// this.app.use((req, res, next) => {
// this.debug('Request received', req.method, req.url);
// next();
// });
// Create our route handlers
for (const route of this.deps.routes) {
this.app[route.method.toLowerCase() as keyof Application](route.url, async (req: ExpressRequest, res: ExpressResponse) => {
const request = new Request(req.body || req.query || req.params || {});
const responseBody = await route.handler(request)
res.json(responseBody);
});
}
// SPECIAL: Handle SSE requests by adding the request to the SSE Broadcaster
this.app.get('/sse/v1/messages', (_req, res) => {
this.deps.broadcasterSSE.subscribe(res);
});
this.debug(`Starting route handler on port ${this.deps.port}`);
await new Promise((resolve, reject) => {
const server = this.app.listen(this.deps.port, this.deps.host, (error) => {
if (error) {
this.debug('Error starting server', error);
reject(error);
}
});
server.on('listening', () => {
const address = server.address();
if (typeof address === 'string' || address === null) {
this.debug(`Failed to start server: ${address}`);
return reject(new Error(`Failed to start server: ${address}`));
}
this.debug(`RouteHandlerExpress is running on ${address.address}:${address.port}`);
resolve(undefined);
});
});
}
}

View File

@@ -0,0 +1,102 @@
import { Router } from 'zeromq';
import debug, { type Debugger } from 'debug';
import { replacer, reviver } from '~/utils/extended-json.js';
import { Request } from '~/routes/types.js';
export type RouteHandlerZMQDependencies = {
port: number;
host: string;
onRequest: (req: Request) => Promise<unknown>;
}
export class RouteHandlerZMQ {
static async from(deps: RouteHandlerZMQDependencies) {
const handler = new RouteHandlerZMQ(deps);
await handler.start();
return handler;
}
private router: Router;
private deps: RouteHandlerZMQDependencies;
private debug: Debugger;
constructor(deps: Partial<RouteHandlerZMQDependencies>) {
this.deps = {
port: 7083,
host: '0.0.0.0',
onRequest: async () => {
return {
error: 'Not implemented',
};
},
...deps
};
this.router = new Router();
this.debug = debug(`RouteHandlerZMQ (${this.deps.host}:${this.deps.port})`);
}
async start() {
await this.router.bind(`tcp://${this.deps.host}:${this.deps.port}`);
this.debug('ZMQ router is running on %s:%s', this.deps.host, this.deps.port);
for await (const [reqId, delimeter, content] of this.router) {
if (!reqId || !delimeter || !content) {
this.debug('Invalid request received');
continue;
}
const response = new ResponseZMQ(this.router, reqId, delimeter);
await this.handleRequest(content, response);
}
}
async stop() {
await this.router.close();
this.debug('ZMQ router is stopped');
}
async handleRequest(req: Buffer, response: ResponseZMQ) {
const parsedContent = JSON.parse(req.toString(), reviver);
const request = new Request(parsedContent);
if (!parsedContent) {
this.debug('Invalid request received');
await response.send({
error: 'Invalid request received',
});
return;
}
try {
const responseBody = await this.deps.onRequest(request);
if (!responseBody) throw new Error('Response body is undefined');
if (Array.isArray(responseBody)) {
this.debug(`Response with ${responseBody.length} items`);
} else {
this.debug(`Response body\n${JSON.stringify(responseBody, null, 2)}`);
}
await response.send(responseBody);
} catch (error) {
this.debug('Error handling request', error);
await response.send({
error: 'Internal server error',
});
return;
}
}
}
export class ResponseZMQ {
constructor(private router: Router, private id: Buffer, private delimeter: Buffer) {}
async send(content: unknown) {
const contentString = JSON.stringify(content, replacer);
await this.router.send([ this.id, this.delimeter, contentString ]);
}
}

View File

@@ -0,0 +1,9 @@
import { EventEmitter } from "~/utils/event-emitter.js";
export type BaseSubscriberEvent<T> = {
message: T;
}
export abstract class BaseSubscriber<T> extends EventEmitter<BaseSubscriberEvent<T>> {
abstract start(): Promise<void>;
}

View File

@@ -0,0 +1,51 @@
import { OracleClient, type OracleMetadataMessage, type OraclePriceMessage } from "@generalprotocols/oracle-client";
import { BaseSubscriber } from "./base-subscriber.js";
import debug, { type Debugger } from "debug";
export class SubscriberSSE extends BaseSubscriber<OracleMetadataMessage | OraclePriceMessage> {
static async from(url: string = 'https://oracles.generalprotocols.com') {
const subscriber = new SubscriberSSE(url);
await subscriber.start();
return subscriber;
}
private client: OracleClient;
private debug: Debugger;
constructor(
private url: string,
) {
super();
const debugLogger = debug(`Subscriber:SSE (${this.url})`);
this.debug = debugLogger;
this.client = new OracleClient({
baseURL: this.url,
options: {
sse: {
onConnected: () => {
this.debug('Connected to SSE Server');
},
onDisconnected: () => {
this.debug('Disconnected from SSE Server');
},
onError: (error) => {
this.debug('Error from SSE Server:', error);
},
}
},
})
this.client.setOnMessage((message: OracleMetadataMessage | OraclePriceMessage) => {
this.emit('message', message);
});
}
async start() {
this.debug('Starting SSE subscriber');
await this.client.start();
this.debug('Subscribed to SSE endpoint');
}
}

150
src/utils/event-emitter.ts Normal file
View File

@@ -0,0 +1,150 @@
export type EventMap = Record<string, unknown>;
type Listener<T> = (detail: T) => void;
interface ListenerEntry<T> {
listener: Listener<T>;
wrappedListener: Listener<T>;
debounceTime?: number |undefined;
once?: boolean | undefined;
}
export type OffCallback = () => void;
export class EventEmitter<T extends EventMap> {
private listeners: Map<keyof T, Set<ListenerEntry<T[keyof T]>>> = new Map();
on<K extends keyof T>(
type: K,
listener: Listener<T[K]>,
debounceMilliseconds?: number,
): OffCallback {
const wrappedListener =
debounceMilliseconds && debounceMilliseconds > 0
? this.debounce(listener, debounceMilliseconds)
: listener;
if (!this.listeners.has(type)) {
this.listeners.set(type, new Set());
}
const listenerEntry: ListenerEntry<T[K]> = {
listener,
wrappedListener,
debounceTime: debounceMilliseconds,
};
this.listeners.get(type)?.add(listenerEntry as ListenerEntry<T[keyof T]>);
// Return an "off" callback that can be called to stop listening for events.
return () => this.off(type, listener);
}
once<K extends keyof T>(
type: K,
listener: Listener<T[K]>,
debounceMilliseconds?: number,
): OffCallback {
const wrappedListener: Listener<T[K]> = (detail: T[K]) => {
this.off(type, listener);
listener(detail);
};
const debouncedListener =
debounceMilliseconds && debounceMilliseconds > 0
? this.debounce(wrappedListener, debounceMilliseconds)
: wrappedListener;
if (!this.listeners.has(type)) {
this.listeners.set(type, new Set());
}
const listenerEntry: ListenerEntry<T[K]> = {
listener,
wrappedListener: debouncedListener,
debounceTime: debounceMilliseconds,
once: true,
};
this.listeners.get(type)?.add(listenerEntry as ListenerEntry<T[keyof T]>);
// Return an "off" callback that can be called to stop listening for events.
return () => this.off(type, listener);
}
off<K extends keyof T>(type: K, listener: Listener<T[K]>): void {
const listeners = this.listeners.get(type);
if (!listeners) return;
const listenerEntry = Array.from(listeners).find(
(entry) =>
entry.listener === listener || entry.wrappedListener === listener,
);
if (listenerEntry) {
listeners.delete(listenerEntry);
}
}
emit<K extends keyof T>(type: K, payload: T[K]): boolean {
const listeners = this.listeners.get(type);
if (!listeners) return false;
listeners.forEach((entry) => {
entry.wrappedListener(payload);
});
return listeners.size > 0;
}
removeAllListeners(): void {
this.listeners.clear();
}
async waitFor<K extends keyof T>(
type: K,
predicate: (payload: T[K]) => boolean,
timeoutMs?: number,
): Promise<T[K]> {
return new Promise((resolve, reject) => {
let timeoutId: ReturnType<typeof setTimeout> | undefined;
const listener = (payload: T[K]) => {
if (predicate(payload)) {
// Clean up
this.off(type, listener);
if (timeoutId !== undefined) {
clearTimeout(timeoutId);
}
resolve(payload);
}
};
// Set up timeout if specified
if (timeoutMs !== undefined) {
timeoutId = setTimeout(() => {
this.off(type, listener);
reject(new Error(`Timeout waiting for event "${String(type)}"`));
}, timeoutMs);
}
this.on(type, listener);
});
}
private debounce<K extends keyof T>(
func: Listener<T[K]>,
wait: number,
): Listener<T[K]> {
let timeout: ReturnType<typeof setTimeout>;
return (detail: T[K]) => {
if (timeout !== null) {
clearTimeout(timeout);
}
timeout = setTimeout(() => {
func(detail);
}, wait);
};
}
}

117
src/utils/extended-json.ts Normal file
View File

@@ -0,0 +1,117 @@
import { binToHex, hexToBin } from '@bitauth/libauth';
// Define a list of supported view types.
const dataViews: { [index: string]: any } =
{
Int8Array,
Int16Array,
Int32Array,
Uint8Array,
Uint16Array,
Uint32Array,
Uint8ClampedArray,
Float32Array,
Float64Array,
};
// Add the BigInt arrays if supported on the current platform.
if(typeof BigInt64Array !== 'undefined')
{
dataViews.BigInt64Array = BigInt64Array;
dataViews.BigUint64Array = BigUint64Array;
}
/**
* Utility function to be used with JSON.stringify in order to encode ArrayBuffers and their data.
*
* @param key {string | number} the key/index of the data to replace.
* @param value {any} the value of the data to replace.
*
* @returns {any}
*/
const replacer = function(key: string | number, value: any)
{
// If the value if a big integer..
if(typeof value === 'bigint')
{
// Return a replacement object, encoding the number as a string.
return { $BigInt: value.toString() };
}
// If the value is a typed array buffer..
if(ArrayBuffer.isView(value))
{
// Create an replacement object, encoding the data as a hex string.
const replacement =
{
$ArrayBuffer:
{
View: value.constructor.name,
Data: binToHex(new Uint8Array(value.buffer)),
},
};
// Return the replacement.
return replacement;
}
// If the value is non-typed array buffer..
if(value instanceof ArrayBuffer)
{
// Create an replacement object, encoding the data as a hex string.
const replacement =
{
$ArrayBuffer:
{
View: value.constructor.name,
Data: binToHex(new Uint8Array(value)),
},
};
// Return the replacement.
return replacement;
}
// The value is not a typed array, so return the original value.
return value;
};
/**
* Utility function to be used with JSON.parse in order to decode ArrayBuffers and their data.
*
* @param key {string | number} the key/index of the data to restore.
* @param value {any} the value of the data to restore.
*
* @returns {any}
*/
const reviver = function(key: string | number, value: any)
{
// If the data is a big integer, encoded as a string..
if(value.$BigInt)
{
// Return the number as a big integer.
return BigInt(value.$BigInt);
}
// If the data is an array buffer..
if(value.$ArrayBuffer)
{
// If it lacks a data view..
if(value.$ArrayBuffer.View === 'ArrayBuffer')
{
// Return the buffer containing the data.
return hexToBin(value.$ArrayBuffer.Data).buffer;
}
// Since it has a type view, return the corresponding view of the data.
return new dataViews[value.$ArrayBuffer.View](hexToBin(value.$ArrayBuffer.Data).buffer);
}
// The value is not a typed array, so return the original value.
return value;
};
export { binToHex, hexToBin, replacer, reviver };

48
tsconfig.json Normal file
View File

@@ -0,0 +1,48 @@
{
// Visit https://aka.ms/tsconfig to read more about this file
"compilerOptions": {
// File Layout
"rootDir": "./src",
// "outDir": "./dist",
// Environment Settings
// See also https://aka.ms/tsconfig/module
"module": "nodenext",
"target": "esnext",
"types": ["node"],
// For nodejs:
// "lib": ["esnext"],
// "types": ["node"],
// and npm install -D @types/node
// Other Outputs
"sourceMap": true,
"declaration": true,
"declarationMap": true,
// Stricter Typechecking Options
"noUncheckedIndexedAccess": true,
"exactOptionalPropertyTypes": true,
// Style Options
// "noImplicitReturns": true,
// "noImplicitOverride": true,
// "noUnusedLocals": true,
// "noUnusedParameters": true,
// "noFallthroughCasesInSwitch": true,
// "noPropertyAccessFromIndexSignature": true,
// Recommended Options
"strict": true,
"jsx": "react-jsx",
"verbatimModuleSyntax": true,
"isolatedModules": true,
"noUncheckedSideEffectImports": true,
"moduleDetection": "force",
"skipLibCheck": true,
"paths": {
"~/*": ["./src/*"],
}
},
}