// File: LLM-Iterable-Thenable/src/gpt-response.ts
// (TypeScript · 342 lines · 8.7 KiB)

import type { SSEvent } from './sse-session.js';
/**
 * Type names accepted by the JSON Schema `type` keyword.
 */
export type JSONSchemaTypeName =
  | 'object'
  | 'string'
  | 'number'
  | 'integer'
  | 'boolean'
  | 'array'
  | 'null';

/**
 * JSON Schema type for tool parameter definitions.
 *
 * `type` may be a single type name or a list of names (e.g. `['string', 'null']`, the form
 * strict tool schemas use for nullable fields). Keywords not modelled explicitly here
 * (`anyOf`, `minimum`, `default`, ...) are still accepted through the index signature.
 */
export type JSONSchema = {
  type: JSONSchemaTypeName | JSONSchemaTypeName[];
  properties?: Record<string, JSONSchema>;
  items?: JSONSchema;
  required?: string[];
  enum?: (string | number | boolean | null)[];
  description?: string;
  additionalProperties?: boolean;
  [key: string]: unknown;
};
/**
 * A tool offered to the model, expressed in the OpenAI `tools` array format.
 * The accepted arguments are described by a JSON Schema in `function.parameters`.
 */
export type ToolDefinition = {
  function: {
    /** Identifier the model uses when it calls this tool. */
    name: string;
    /** Free-text description of what the tool does. */
    description: string;
    /** JSON Schema describing the call arguments. */
    parameters: JSONSchema;
    /** Optional strict-schema flag, passed through to the provider as-is. */
    strict?: boolean;
  };
  type: 'function';
};
/**
 * A fully assembled tool invocation requested by the model.
 */
export type ToolCall = {
  type: 'function';
  /** Provider-assigned identifier for this call. */
  id: string;
  function: {
    name: string;
    /** Raw JSON text of the arguments; callers must parse it themselves. */
    arguments: string;
  };
};
/**
 * One streamed fragment of a tool call. Fragments sharing the same `index` belong to the
 * same call; every field other than `index` may be absent from any given fragment.
 */
export type ToolCallDelta = {
  index: number;
  id?: string;
  type?: 'function';
  function?: Partial<{
    name: string;
    arguments: string;
  }>;
};
/**
 * A unit yielded while streaming a response, discriminated by `type`:
 * - `content`: a piece of the assistant's visible answer;
 * - `reasoning`: a piece of reasoning text, optionally with a summary in `reasoning_details`;
 * - `tool_call`: one completed tool call.
 */
export type MessageChunk =
  | {
      type: 'content';
      content: string;
    }
  | {
      type: 'reasoning';
      content: string;
      reasoning_details?: string;
    }
  | {
      type: 'tool_call';
      toolCall: ToolCall;
    };
/**
 * Aggregate of a whole response: all reasoning text joined, all content text joined,
 * and every completed tool call in emission order.
 */
export type FinalResult = {
  content: string;
  reasoning: string;
  toolCalls: ToolCall[];
};
/**
 * Reasons a generation can stop, as reported in `finish_reason`.
 * `'error'` is included because OpenRouter normalizes mid-stream failures to it.
 */
export type FinishReason = 'stop' | 'tool_calls' | 'length' | 'content_filter' | 'error';

/**
 * One parsed `data:` payload of a streamed chat-completion response.
 *
 * NOTE(review): the `provider`, `reasoning`, `usage.cost`, `usage.is_byok` and `cost_details`
 * fields look OpenRouter-specific; other providers may omit them — confirm before relying on
 * their presence.
 */
export type GPTResponse = {
  id: string;
  provider: string;
  model: string;
  object: string;
  created: number;
  choices: {
    index: number;
    delta: {
      role?: 'user' | 'assistant' | 'system';
      content?: string | null;
      reasoning?: string;
      // NOTE(review): some providers (e.g. OpenRouter) document `reasoning_details` as an
      // array of detail objects rather than a single object — confirm the shape received.
      reasoning_details?: {
        type: string;
        summary: string;
      };
      tool_calls?: ToolCallDelta[];
    };
    finish_reason?: FinishReason | null;
  }[];
  /** Top-level finish reason; read as a fallback when the choice has none. */
  finish_reason?: FinishReason | null;
  native_finish_reason?: string | null;
  usage?: {
    prompt_tokens: number;
    completion_tokens: number;
    total_tokens: number;
    cost: number;
    is_byok: boolean;
    prompt_tokens_details: {
      cached_tokens: number;
    };
    cost_details: {
      upstream_inference_cost: number;
      upstream_prompt_cost: number;
      upstream_inference_completions_cost: number;
    };
    completion_tokens_details: {
      reasoning_tokens: number;
    };
  };
};
/**
 * Wraps a stream of SSE events from a chat-completion endpoint and exposes it two ways:
 *
 * - as an async iterable of MessageChunks (`for await (const chunk of response)`), and
 * - as a thenable resolving to the aggregated FinalResult (`await response`).
 *
 * The underlying SSE stream can be consumed only once. Whichever access pattern starts first
 * drives consumption, and the promise side always settles:
 * - iterating to completion resolves it with the full result;
 * - stopping iteration early (break/return) resolves it with the chunks yielded so far;
 * - a stream or parse error is thrown to the iterating consumer and rejects the promise side.
 */
export class MessageResponse implements PromiseLike<FinalResult> {
  /** Every chunk produced so far, in emission order; the input to buildResult(). */
  private chunks: MessageChunk[] = [];
  /** Set once iteration or then()/catch()/finally() has started draining the stream. */
  private iteratorConsumed = false;
  private resolveResult!: (value: FinalResult) => void;
  private rejectResult!: (reason: unknown) => void;
  private resultPromise: Promise<FinalResult>;
  private iterator: AsyncIterable<SSEvent>;
  /**
   * Tool calls being assembled from streamed deltas, keyed by the delta's `index`.
   */
  private toolCallsInProgress: Map<number, ToolCall> = new Map();

  constructor(iterator: AsyncIterable<SSEvent>) {
    this.iterator = iterator;
    this.resultPromise = new Promise<FinalResult>((resolve, reject) => {
      this.resolveResult = resolve;
      this.rejectResult = reject;
    });
    // A failure is also thrown to whoever iterates this object. Marking the promise as handled
    // keeps Node from reporting an extra unhandled rejection when the caller only iterates.
    void this.resultPromise.catch(() => undefined);
  }

  /**
   * Streams parsed chunks as they arrive.
   * Throws `Error` on the first `next()` if the response was already iterated or awaited.
   */
  async *[Symbol.asyncIterator](): AsyncGenerator<MessageChunk, void, unknown> {
    if (this.iteratorConsumed) {
      throw new Error('GPTResponse can only be iterated once');
    }
    this.iteratorConsumed = true;
    yield* this.drain();
  }

  then<TResult1 = FinalResult, TResult2 = never>(
    onfulfilled?: ((value: FinalResult) => TResult1 | PromiseLike<TResult1>) | null,
    onrejected?: ((reason: unknown) => TResult2 | PromiseLike<TResult2>) | null,
  ): Promise<TResult1 | TResult2> {
    this.startBackgroundConsumption();
    return this.resultPromise.then(onfulfilled, onrejected);
  }

  /** Like Promise#catch; also starts consuming the stream if nothing has yet. */
  catch<TResult = never>(
    onrejected?: ((reason: unknown) => TResult | PromiseLike<TResult>) | null,
  ): Promise<FinalResult | TResult> {
    return this.then(undefined, onrejected);
  }

  /** Like Promise#finally; also starts consuming the stream if nothing has yet. */
  finally(onfinally?: (() => void) | null): Promise<FinalResult> {
    return this.then().finally(onfinally);
  }

  /**
   * Pulls every SSE event from the source, parses it, records and yields the resulting chunks,
   * then flushes tool calls still buffered when the stream closes (e.g. no `[DONE]` sentinel).
   * Settles resultPromise exactly once: resolve on completion or early exit, reject on error.
   */
  private async *drain(): AsyncGenerator<MessageChunk, void, unknown> {
    let settled = false;
    try {
      for await (const rawChunk of this.iterator) {
        // parseChunk may return several chunks (e.g. when tool calls complete).
        for (const chunk of this.parseChunk(rawChunk)) {
          this.chunks.push(chunk);
          yield chunk;
        }
      }
      for (const chunk of this.flushToolCalls()) {
        this.chunks.push(chunk);
        yield chunk;
      }
      settled = true;
      this.resolveResult(this.buildResult());
    } catch (error) {
      settled = true;
      this.rejectResult(error);
      throw error;
    } finally {
      // Only reached unsettled when the consumer stopped early; never leave awaiters hanging.
      if (!settled) {
        this.resolveResult(this.buildResult());
      }
    }
  }

  /** Drains the stream with no consumer attached, unless someone already started reading it. */
  private startBackgroundConsumption(): void {
    if (this.iteratorConsumed) {
      return;
    }
    this.iteratorConsumed = true;
    const pump = this.drain();
    void (async () => {
      try {
        while (!(await pump.next()).done) {
          // drain() records each chunk and settles resultPromise itself.
        }
      } catch {
        // Already delivered through resultPromise's rejection; swallowing here prevents a
        // duplicate unhandled rejection from this detached task.
      }
    })();
  }

  /** Concatenates reasoning and content text and collects tool calls, in emission order. */
  private buildResult(): FinalResult {
    let reasoning = '';
    let content = '';
    const toolCalls: ToolCall[] = [];
    for (const chunk of this.chunks) {
      switch (chunk.type) {
        case 'reasoning':
          reasoning += chunk.content;
          break;
        case 'content':
          content += chunk.content;
          break;
        case 'tool_call':
          toolCalls.push(chunk.toolCall);
          break;
      }
    }
    return { reasoning, content, toolCalls };
  }

  /**
   * Converts one SSE event into zero or more MessageChunks.
   *
   * - `[DONE]` flushes buffered tool calls; when there are none it yields one empty
   *   content chunk.
   * - Blank events and usage-only chunks (`choices: []`) yield nothing.
   * - A delta may carry reasoning, content and tool-call fragments together; each present part
   *   is handled (reasoning first, then content), and buffered tool calls are flushed whenever
   *   `finish_reason` is `'tool_calls'`.
   *
   * @throws SyntaxError if the event data is not valid JSON.
   * @throws Error if `choices` is absent or its first entry is empty.
   */
  private parseChunk(rawChunk: SSEvent): MessageChunk[] {
    if (rawChunk.data === '[DONE]') {
      const completedToolCalls = this.flushToolCalls();
      return completedToolCalls.length > 0
        ? completedToolCalls
        : [{ type: 'content', content: '' }];
    }
    if (rawChunk.data.trim() === '') {
      return [];
    }

    const data = JSON.parse(rawChunk.data) as GPTResponse;
    if (Array.isArray(data.choices) && data.choices.length === 0) {
      // Usage-only trailer (e.g. OpenAI `stream_options.include_usage`): no delta to emit.
      return [];
    }
    const choice = data.choices?.[0];
    if (!choice) {
      throw new Error('No choice found in chunk');
    }

    const delta = choice.delta;
    const finishReason = choice.finish_reason ?? data.finish_reason;
    const out: MessageChunk[] = [];

    if (delta.reasoning) {
      const summary = MessageResponse.extractReasoningSummary(delta.reasoning_details);
      out.push(
        summary === undefined
          ? { type: 'reasoning', content: delta.reasoning }
          : { type: 'reasoning', content: delta.reasoning, reasoning_details: summary },
      );
    }

    if (typeof delta.content === 'string') {
      // Empty content is surfaced only when the delta carries nothing else, so reasoning or
      // tool-call deltas with `content: ""` do not produce noise chunks.
      const carriesOtherParts = out.length > 0 || Boolean(delta.tool_calls);
      if (delta.content !== '' || !carriesOtherParts) {
        out.push({ type: 'content', content: delta.content });
      }
    }

    if (delta.tool_calls) {
      this.processToolCallDeltas(delta.tool_calls);
    }
    // The finishing chunk often carries an empty delta, so flush regardless of tool_calls.
    if (finishReason === 'tool_calls') {
      out.push(...this.flushToolCalls());
    }
    return out;
  }

  /**
   * Extracts summary text from a delta's `reasoning_details`. Accepts a single `{ summary }`
   * object or an array of such objects (NOTE(review): the array form is what OpenRouter
   * documents — confirm per provider); multiple summaries are concatenated.
   * Returns undefined when no non-empty summary string is present.
   */
  private static extractReasoningSummary(details: unknown): string | undefined {
    const entries: unknown[] = Array.isArray(details) ? details : [details];
    const summaries: string[] = [];
    for (const entry of entries) {
      if (typeof entry === 'object' && entry !== null && 'summary' in entry) {
        const { summary } = entry as { summary: unknown };
        if (typeof summary === 'string' && summary !== '') {
          summaries.push(summary);
        }
      }
    }
    return summaries.length > 0 ? summaries.join('') : undefined;
  }

  /**
   * Merges streamed tool-call fragments into toolCallsInProgress. The first fragment for an
   * index creates the entry; later fragments append to `arguments` and overwrite `id`/`name`
   * when those are present and non-empty.
   */
  private processToolCallDeltas(deltas: ToolCallDelta[]): void {
    for (const delta of deltas) {
      const existing = this.toolCallsInProgress.get(delta.index);
      if (existing === undefined) {
        this.toolCallsInProgress.set(delta.index, {
          id: delta.id ?? '',
          type: 'function',
          function: {
            name: delta.function?.name ?? '',
            arguments: delta.function?.arguments ?? '',
          },
        });
        continue;
      }
      if (delta.function?.arguments) {
        existing.function.arguments += delta.function.arguments;
      }
      if (delta.id) {
        existing.id = delta.id;
      }
      if (delta.function?.name) {
        existing.function.name = delta.function.name;
      }
    }
  }

  /**
   * Returns every buffered tool call that has both an `id` and a function `name`, ordered by
   * stream index, and empties the buffer. Entries missing either field are discarded.
   */
  private flushToolCalls(): MessageChunk[] {
    const ready = Array.from(this.toolCallsInProgress)
      .sort(([a], [b]) => a - b)
      .filter(([, toolCall]) => toolCall.id !== '' && toolCall.function.name !== '')
      .map(([, toolCall]): MessageChunk => ({ type: 'tool_call', toolCall }));
    this.toolCallsInProgress.clear();
    return ready;
  }
}