diff --git a/src/agentic-gpt.ts b/src/gpt/agentic-gpt.ts similarity index 100% rename from src/agentic-gpt.ts rename to src/gpt/agentic-gpt.ts diff --git a/src/gpt-response.ts b/src/gpt/gpt-response.ts similarity index 74% rename from src/gpt-response.ts rename to src/gpt/gpt-response.ts index 1648cff..557ac69 100644 --- a/src/gpt-response.ts +++ b/src/gpt/gpt-response.ts @@ -1,4 +1,6 @@ -import type { SSEvent } from './sse-session.js'; +import type { SSEvent } from '../utils/sse-session.js'; + +import { ThenableIterator } from '../utils/thenable-iterator.js'; /** * JSON Schema type for tool parameter definitions @@ -121,13 +123,7 @@ export type GPTResponse = { }; } -export class MessageResponse implements PromiseLike { - private chunks: MessageChunk[] = []; - private iteratorConsumed = false; - private resolveResult!: (value: FinalResult) => void; - private resultPromise: Promise; - private iterator: AsyncIterable; - +export class GPTResponseIterator extends ThenableIterator { /** * Accumulates tool calls as they stream in from the API * Key is the tool call index, value is the partially completed tool call @@ -135,118 +131,54 @@ export class MessageResponse implements PromiseLike { private toolCallsInProgress: Map> = new Map(); constructor(iterator: AsyncIterable) { - this.iterator = iterator; - this.resultPromise = new Promise(resolve => { - this.resolveResult = resolve; - }); - } - - async *[Symbol.asyncIterator]() { - if (this.iteratorConsumed) { - throw new Error('GPTResponse can only be iterated once'); - } - - this.iteratorConsumed = true; - - for await (const rawChunk of this.iterator) { - const chunks = this.parseChunk(rawChunk); - - // parseChunk may return multiple chunks (e.g., when tool calls complete) - for (const chunk of chunks) { - this.chunks.push(chunk); - yield chunk; - } - } - - this.resolveResult(this.buildResult()); - } - - then( - onfulfilled?: ((value: FinalResult) => TResult1 | PromiseLike) | null, - onrejected?: ((reason: unknown) => TResult2 | PromiseLike) | null, - ): Promise { - // If not yet iterated, consume the iterator to get the result - if (!this.iteratorConsumed) { - this.iteratorConsumed = true; - - (async () => { - for await (const rawChunk of this.iterator) { - const chunks = this.parseChunk(rawChunk); - - // parseChunk may return multiple chunks - for (const chunk of chunks) { - this.chunks.push(chunk); - } - } - - this.resolveResult(this.buildResult()); - })(); - } - - return this.resultPromise.then(onfulfilled, onrejected); - } - - catch(onrejected?: ((reason: unknown) => never) | null): Promise { - return this.resultPromise.catch(onrejected); - } - - finally(onfinally?: (() => void) | undefined): Promise { - return this.resultPromise.finally(onfinally); - } - - private buildResult(): FinalResult { - return { - reasoning: this.chunks - .filter(c => c.type === 'reasoning') - .map(c => 'content' in c ? c.content : '') - .join(''), - content: this.chunks - .filter(c => c.type === 'content') - .map(c => 'content' in c ? c.content : '') - .join(''), - toolCalls: this.chunks - .filter(c => c.type === 'tool_call') - .map(c => 'toolCall' in c ? c.toolCall : null) - .filter((tc): tc is ToolCall => tc !== null), - }; + super(iterator); } /** * Parses a raw SSE chunk and returns one or more MessageChunks * May return multiple chunks when tool calls complete */ - private parseChunk(rawChunk: SSEvent): MessageChunk[] { + parseChunk(rawChunk: SSEvent): MessageChunk[] { // console.log('Raw Chunk:', rawChunk); if (rawChunk.data === '[DONE]') { // When stream ends, flush any pending tool calls const completedToolCalls = this.flushToolCalls(); + // If there are completed tool calls, return them if (completedToolCalls.length > 0) { return completedToolCalls; } + // Otherwise, return an empty content chunk so we don't crash trying to parse invalid data return [{ type: 'content', content: '', }]; } + // Parse the chunk data as a GPTResponse const data = JSON.parse(rawChunk.data) as GPTResponse; + // Get the first choice const choice = data.choices[0]; + // If no choice found, throw an error if (!choice) { throw new Error('No choice found in chunk'); } + // Get the delta from the choice const delta = choice.delta; + // Get the finish reason from the choice or the response const finishReason = choice.finish_reason || data.finish_reason; // Handle tool calls if (delta.tool_calls) { + // Process the tool call deltas this.processToolCallDeltas(delta.tool_calls); // If finish_reason is 'tool_calls', all tool calls are complete if (finishReason === 'tool_calls') { + // If all tool calls are complete, flush them and return them as chunks return this.flushToolCalls(); } @@ -281,6 +213,29 @@ export class MessageResponse implements PromiseLike { return []; } + /** + * Parses a final result from the output chunks + * + * @param chunks - The output chunks to parse + * @returns The parsed final result + */ + parseFinal(chunks: MessageChunk[]): FinalResult { + return { + reasoning: chunks + .filter(c => c.type === 'reasoning') + .map(c => 'content' in c ? c.content : '') + .join(''), + content: chunks + .filter(c => c.type === 'content') + .map(c => 'content' in c ? c.content : '') + .join(''), + toolCalls: chunks + .filter(c => c.type === 'tool_call') + .map(c => 'toolCall' in c ? c.toolCall : null) + .filter((tc): tc is ToolCall => tc !== null), + }; + } + /** * Processes tool call deltas and accumulates them */ @@ -339,4 +294,4 @@ export class MessageResponse implements PromiseLike { return chunks; } -} \ No newline at end of file +} diff --git a/src/gpt.ts b/src/gpt/gpt.ts similarity index 87% rename from src/gpt.ts rename to src/gpt/gpt.ts index 4abc37c..8606d81 100644 --- a/src/gpt.ts +++ b/src/gpt/gpt.ts @@ -1,7 +1,6 @@ -import { SSESession } from './sse-session.js'; -import { EventEmitter } from './utils/event-emitter.js'; -import { MessageResponse } from './gpt-response.js'; -import type { ToolDefinition, ToolCall } from './gpt-response.js'; +import { SSESession } from '../utils/sse-session.js'; +import { EventEmitter } from '../utils/event-emitter.js'; +import { GPTResponseIterator, type ToolCall, type ToolDefinition } from './gpt-response.js'; export type GPTEventMap = { @@ -93,10 +92,10 @@ export class GPT extends EventEmitter { /** * Sends a message to the GPT API - * @param request - The request configuration including messages and optional tools + * @param message - The message to send * @returns The response from the GPT API */ - send(request: GPTRequest): MessageResponse { + send(request: GPTRequest): GPTResponseIterator { const config = this.config; const lazyIterator = (async function* () { @@ -132,6 +131,6 @@ export class GPT extends EventEmitter { yield* session.messages; })(); - return new MessageResponse(lazyIterator); + return new GPTResponseIterator(lazyIterator); } } \ No newline at end of file diff --git a/src/index.ts b/src/index.ts index a2d161e..0d60eda 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,24 +1,28 @@ -/** - * Export all public types and classes - */ -export { GPT } from './gpt.js'; -export type { GPTConfig, GPTRequest, Message, GPTEventMap } from './gpt.js'; -export { AgenticGPT } from './agentic-gpt.js'; -export type { ToolFunction, ToolRegistration, AgenticOptions } from './agentic-gpt.js'; -export { MessageResponse } from './gpt-response.js'; -export type { - MessageChunk, - FinalResult, - ToolDefinition, - ToolCall, - ToolCallDelta, - JSONSchema, - GPTResponse -} from './gpt-response.js'; +export { + GPT, + type GPTConfig, + type GPTRequest, + type Message, + type GPTEventMap +} from './gpt/gpt.js' -import { GPT } from './gpt.js'; -import { AgenticGPT } from './agentic-gpt.js'; -import type { ToolDefinition } from './gpt-response.js'; +export { + AgenticGPT, + type ToolFunction, + type ToolRegistration, + type AgenticOptions +} from './gpt/agentic-gpt.js'; + +export { + GPTResponseIterator, + type MessageChunk, + type FinalResult, + type ToolDefinition, + type ToolCall, + type ToolCallDelta, + type JSONSchema, + type GPTResponse +} from './gpt/gpt-response.js'; /** * Examples demonstrating the different usage patterns diff --git a/src/async-push-iterator.ts b/src/utils/async-push-iterator.ts similarity index 100% rename from src/async-push-iterator.ts rename to src/utils/async-push-iterator.ts diff --git a/src/exponential-backoff.ts b/src/utils/exponential-backoff.ts similarity index 100% rename from src/exponential-backoff.ts rename to src/utils/exponential-backoff.ts diff --git a/src/sse-session.ts b/src/utils/sse-session.ts similarity index 100% rename from src/sse-session.ts rename to src/utils/sse-session.ts diff --git a/src/utils/thenable-iterator.ts b/src/utils/thenable-iterator.ts new file mode 100644 index 0000000..92ed111 --- /dev/null +++ b/src/utils/thenable-iterator.ts @@ -0,0 +1,133 @@ + + +export class ThenableIterator< + InputChunk, + OutputChunk, + Output, +> implements PromiseLike { + /** Iterator to be consumed */ + protected iterator: AsyncIterable; + + /** Chunks to be parsed */ + protected chunks: OutputChunk[] = []; + + /** Whether the iterator has been consumed */ + protected iteratorConsumed = false; + + /** Promise to resolve the result */ + protected resultPromise: Promise; + + /** Resolver function to resolve the result promise */ + protected resolveResult!: (value: Output) => void; + + /** + * Creates a new ThenableIterator instance + * @param iterator - The iterator to be consumed + */ + constructor(iterator: AsyncIterable) { + // Store the iterator to be consumed + this.iterator = iterator; + + // Create a promise that we can bind to the thenable pattern + this.resultPromise = new Promise((resolve) => this.resolveResult = resolve); + } + + /** + * `Symbol.asyncIterator` method to implement the async iterator pattern + * Consumes the iterator if it has not yet been consumed, then yields the parsed chunks + * + * @returns An async iterator that yields the parsed chunks + * @throws An error if the iterator has already been consumed + */ + async *[Symbol.asyncIterator]() { + if (this.iteratorConsumed) { + throw new Error('Iterator can only be iterated once'); + } + + this.iteratorConsumed = true; + + for await (const chunk of this.iterator) { + const parsedChunks = this.parseChunk(chunk); + + for (const parsedChunk of parsedChunks) { + this.chunks.push(parsedChunk); + yield parsedChunk; + } + } + + this.resolveResult(this.parseFinal(this.chunks)); + } + + /** + * `then` method to implement the thenable pattern + * Consumes the iterator if it has not yet been consumed, then returns the result as a resolved promise + * + * @param onfulfilled - The function to call when the promise is fulfilled + * @param onrejected - The function to call when the promise is rejected + * @returns A promise that resolves to the result + */ + then( + onfulfilled?: ((value: Output) => TResult1 | PromiseLike) | null, + onrejected?: ((reason: unknown) => TResult2 | PromiseLike) | null, + ): Promise { + // If not yet iterated, consume the iterator to get the result + if (!this.iteratorConsumed) { + this.iteratorConsumed = true; + + // Consume the iterator parts + (async () => { + for await (const rawChunk of this.iterator) { + const chunks = this.parseChunk(rawChunk); + this.chunks.push(...chunks); + } + + // Build the result from the chunks + this.resolveResult(this.parseFinal(this.chunks)); + })(); + } + + return this.resultPromise.then(onfulfilled, onrejected); + } + + /** + * `catch` method to implement the promise pattern + * Returns a promise that rejects with the reason + * + * @param onrejected - The function to call when the promise is rejected + * @returns A promise that rejects with the reason + */ + catch(onrejected?: ((reason: unknown) => never) | null): Promise { + return this.resultPromise.catch(onrejected); + } + + /** + * `finally` method to implement the promise pattern + * Returns a promise that resolves to the result + * + * @param onfinally - The function to call when the promise is fulfilled or rejected + * @returns A promise that resolves to the result + */ + finally(onfinally?: (() => void) | undefined): Promise { + return this.resultPromise.finally(onfinally); + } + + /** + * Parses a chunk of input into an output chunk + * + * @param chunk - The chunk of input to parse + * @returns The parsed output chunk + */ + parseChunk(chunk: InputChunk): OutputChunk[] { + return [chunk] as unknown as OutputChunk[]; + } + + /** + * Parses a final result from the output chunks + * + * @param chunks - The output chunks to parse + * @returns The parsed final result + */ + parseFinal(chunks: OutputChunk[]): Output { + return chunks as unknown as Output; + } +}