Changes from all commits (18 commits)
f1021ba
fix(gemini): improve base64 image data parsing
catalinstanciu Jan 6, 2026
0b61baa
feat: implement usage tracking for AI requests
catalinstanciu Jan 6, 2026
71d64e4
feat: add round-robin routing strategy
catalinstanciu Jan 6, 2026
ae04737
feat: enhance usage stats with sortable columns and improved data han…
catalinstanciu Jan 6, 2026
76ce27a
feat: add password change functionality and dependencies
catalinstanciu Jan 6, 2026
1a7aa06
fix: improve code formatting and reduce auto-refresh interval
catalinstanciu Jan 6, 2026
993bfdf
feat: implement request tracking and enhance usage stats display
catalinstanciu Jan 6, 2026
d6735f8
feat: add request logging functionality and usage metrics display
catalinstanciu Jan 6, 2026
4f7da87
Merge pull request #1 from catalinstanciu92/feature/add-usage-metrics
catalinstanciu92 Jan 6, 2026
38bb39a
feat: add enable/disable toggle for provider connections
catalinstanciu Jan 6, 2026
be9582f
feat: implement provider connection reordering on create, update, and…
catalinstanciu Jan 6, 2026
aa79a9e
Merge pull request #2 from catalinstanciu92/feat/provider-toggle
catalinstanciu92 Jan 6, 2026
0b38a48
feat: add sticky round-robin routing strategy
catalinstanciu Jan 6, 2026
9a0a45c
fix: prevent race conditions in sticky round-robin
catalinstanciu Jan 6, 2026
0da48e4
feat(usage): implement cost tracking backend and pricing configuration
catalinstanciu Jan 6, 2026
42f5192
feat(ui): add cost tracking to usage dashboard and pricing settings
catalinstanciu Jan 6, 2026
1429c9c
Merge pull request #3 from catalinstanciu92/feat/round-robin-advanced
catalinstanciu92 Jan 6, 2026
9221ef6
Merge pull request #4 from catalinstanciu92/feat/cost-tracking
catalinstanciu92 Jan 6, 2026
80 changes: 75 additions & 5 deletions open-sse/handlers/chatCore.js
@@ -8,6 +8,46 @@ import { createRequestLogger } from "../utils/requestLogger.js";
import { getModelTargetFormat, PROVIDER_ID_TO_ALIAS } from "../config/providerModels.js";
import { createErrorResult, parseUpstreamError, formatProviderError } from "../utils/error.js";
import { handleBypassRequest } from "../utils/bypassHandler.js";
import { saveRequestUsage, trackPendingRequest, appendRequestLog } from "@/lib/usageDb.js";
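// Assumed signatures for the @/lib/usageDb.js helpers (not shown in this diff),
// inferred from the call sites below:
//   trackPendingRequest(model, provider, connectionId, isPending) - toggles an in-flight counter
//   appendRequestLog({ model, provider, connectionId, tokens?, status }) - async append to the request log
//   saveRequestUsage({ provider, model, tokens, timestamp, connectionId }) - async persist of one usage row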

/**
* Extract usage from non-streaming response body
* Handles different provider response formats
*/
function extractUsageFromResponse(responseBody, provider) {
if (!responseBody) return null;

// Claude format - checked first, since Claude responses also carry a `usage` object
// but report input_tokens/output_tokens instead of prompt/completion tokens
if (responseBody.usage?.input_tokens !== undefined || responseBody.usage?.output_tokens !== undefined) {
return {
prompt_tokens: responseBody.usage.input_tokens || 0,
completion_tokens: responseBody.usage.output_tokens || 0,
cache_read_input_tokens: responseBody.usage.cache_read_input_tokens,
cache_creation_input_tokens: responseBody.usage.cache_creation_input_tokens
};
}

// OpenAI format
if (responseBody.usage) {
return {
prompt_tokens: responseBody.usage.prompt_tokens || 0,
completion_tokens: responseBody.usage.completion_tokens || 0,
cached_tokens: responseBody.usage.prompt_tokens_details?.cached_tokens,
reasoning_tokens: responseBody.usage.completion_tokens_details?.reasoning_tokens
};
}

// Gemini format
if (responseBody.usageMetadata) {
return {
prompt_tokens: responseBody.usageMetadata.promptTokenCount || 0,
completion_tokens: responseBody.usageMetadata.candidatesTokenCount || 0,
reasoning_tokens: responseBody.usageMetadata.thoughtsTokenCount
};
}

return null;
}

/**
* Core chat handler - shared between SSE and Worker
@@ -20,8 +60,9 @@ import { handleBypassRequest } from "../utils/bypassHandler.js";
* @param {function} options.onCredentialsRefreshed - Callback when credentials are refreshed
* @param {function} options.onRequestSuccess - Callback when request succeeds (to clear error status)
* @param {function} options.onDisconnect - Callback when client disconnects
* @param {string} options.connectionId - Connection ID for usage tracking
*/
export async function handleChatCore({ body, modelInfo, credentials, log, onCredentialsRefreshed, onRequestSuccess, onDisconnect, clientRawRequest }) {
export async function handleChatCore({ body, modelInfo, credentials, log, onCredentialsRefreshed, onRequestSuccess, onDisconnect, clientRawRequest, connectionId }) {
const { provider, model } = modelInfo;

const sourceFormat = detectFormat(body);
@@ -78,6 +119,12 @@ export async function handleChatCore({ body, modelInfo, credentials, log, onCred
const providerUrl = buildProviderUrl(provider, model, stream);
const providerHeaders = buildProviderHeaders(provider, credentials, stream, translatedBody);

// Track pending request
trackPendingRequest(model, provider, connectionId, true);

// Log start
appendRequestLog({ model, provider, connectionId, status: "PENDING" }).catch(() => {});

// 2. Log converted request to provider
reqLogger.logConvertedRequest(providerUrl, providerHeaders, translatedBody);

@@ -111,6 +158,8 @@ export async function handleChatCore({ body, modelInfo, credentials, log, onCred
signal: streamController.signal
});
} catch (error) {
trackPendingRequest(model, provider, connectionId, false);
appendRequestLog({ model, provider, connectionId, status: `FAILED ${error.name === "AbortError" ? 499 : 502}` }).catch(() => {});
if (error.name === "AbortError") {
streamController.handleError(error);
return createErrorResult(499, "Request aborted");
@@ -204,7 +253,9 @@ export async function handleChatCore({ body, modelInfo, credentials, log, onCred

// Check provider response - return error info for fallback handling
if (!providerResponse.ok) {
trackPendingRequest(model, provider, connectionId, false);
const { statusCode, message } = await parseUpstreamError(providerResponse);
appendRequestLog({ model, provider, connectionId, status: `FAILED ${statusCode}` }).catch(() => {});
const errMsg = formatProviderError(new Error(message), provider, model, statusCode);
console.log(`${COLORS.red}[ERROR] ${errMsg}${COLORS.reset}`);

@@ -216,13 +267,32 @@ export async function handleChatCore({ body, modelInfo, credentials, log, onCred

// Non-streaming response
if (!stream) {
trackPendingRequest(model, provider, connectionId, false);
const responseBody = await providerResponse.json();

// Notify success - caller can clear error status if needed
if (onRequestSuccess) {
await onRequestSuccess();
}

// Log usage for non-streaming responses
const usage = extractUsageFromResponse(responseBody, provider);
appendRequestLog({ model, provider, connectionId, tokens: usage, status: "200 OK" }).catch(() => {});
if (usage) {
const msg = `[${new Date().toLocaleTimeString("en-US", { hour12: false, hour: "2-digit", minute: "2-digit" })}] 📊 [USAGE] ${provider.toUpperCase()} | in=${usage.prompt_tokens || 0} | out=${usage.completion_tokens || 0}${connectionId ? ` | account=${connectionId.slice(0, 8)}...` : ""}`;
console.log(`${COLORS.green}${msg}${COLORS.reset}`);

saveRequestUsage({
provider: provider || "unknown",
model: model || "unknown",
tokens: usage,
timestamp: new Date().toISOString(),
connectionId: connectionId || undefined
}).catch(err => {
console.error("Failed to save usage stats:", err.message);
});
}

return {
success: true,
response: new Response(JSON.stringify(responseBody), {
@@ -251,9 +321,9 @@ export async function handleChatCore({ body, modelInfo, credentials, log, onCred
// Create transform stream with logger for streaming response
let transformStream;
if (needsTranslation(targetFormat, sourceFormat)) {
transformStream = createSSETransformStreamWithLogger(targetFormat, sourceFormat, provider, reqLogger);
transformStream = createSSETransformStreamWithLogger(targetFormat, sourceFormat, provider, reqLogger, model, connectionId);
} else {
transformStream = createPassthroughStreamWithLogger(provider, reqLogger);
transformStream = createPassthroughStreamWithLogger(provider, reqLogger, model, connectionId);
}

// Pipe response through transform with disconnect detection
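Note (illustrative, not part of the diff): a minimal sketch of what the new extractUsageFromResponse helper returns for the three payload shapes it handles, assuming the helper were exported for a test; the payloads and numbers below are made up.

// Hypothetical sample payloads - only their shape matches the provider formats above.
const claudeBody = { usage: { input_tokens: 120, output_tokens: 45, cache_read_input_tokens: 80 } };
const openaiBody = { usage: { prompt_tokens: 120, completion_tokens: 45, prompt_tokens_details: { cached_tokens: 80 } } };
const geminiBody = { usageMetadata: { promptTokenCount: 120, candidatesTokenCount: 45, thoughtsTokenCount: 10 } };

extractUsageFromResponse(claudeBody, "claude");
// -> { prompt_tokens: 120, completion_tokens: 45, cache_read_input_tokens: 80, cache_creation_input_tokens: undefined }
extractUsageFromResponse(openaiBody, "openai");
// -> { prompt_tokens: 120, completion_tokens: 45, cached_tokens: 80, reasoning_tokens: undefined }
extractUsageFromResponse(geminiBody, "gemini");
// -> { prompt_tokens: 120, completion_tokens: 45, reasoning_tokens: 10 }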
6 changes: 4 additions & 2 deletions open-sse/handlers/responsesHandler.js
@@ -17,9 +17,10 @@ import { createResponsesApiTransformStream } from "../transformer/responsesTran
* @param {function} options.onCredentialsRefreshed - Callback when credentials are refreshed
* @param {function} options.onRequestSuccess - Callback when request succeeds
* @param {function} options.onDisconnect - Callback when client disconnects
* @param {string} options.connectionId - Connection ID for usage tracking
* @returns {Promise<{success: boolean, response?: Response, status?: number, error?: string}>}
*/
export async function handleResponsesCore({ body, modelInfo, credentials, log, onCredentialsRefreshed, onRequestSuccess, onDisconnect }) {
export async function handleResponsesCore({ body, modelInfo, credentials, log, onCredentialsRefreshed, onRequestSuccess, onDisconnect, connectionId }) {
// Convert Responses API format to Chat Completions format
const convertedBody = convertResponsesApiFormat(body);

@@ -34,7 +35,8 @@ export async function handleResponsesCore({ body, modelInfo, credentials, log, o
log,
onCredentialsRefreshed,
onRequestSuccess,
onDisconnect
onDisconnect,
connectionId
});

if (!result.success || !result.response) {
11 changes: 8 additions & 3 deletions open-sse/translator/helpers/geminiHelper.js
@@ -27,10 +27,15 @@ export function convertOpenAIContentToParts(content) {
if (item.type === "text") {
parts.push({ text: item.text });
} else if (item.type === "image_url" && item.image_url?.url?.startsWith("data:")) {
const match = item.image_url.url.match(/^data:([^;]+);base64,(.+)$/);
if (match) {
const url = item.image_url.url;
const commaIndex = url.indexOf(",");
if (commaIndex !== -1) {
const mimePart = url.substring(5, commaIndex); // skip "data:"
const data = url.substring(commaIndex + 1);
const mimeType = mimePart.split(";")[0];

parts.push({
inlineData: { mime_type: match[1], data: match[2] }
inlineData: { mime_type: mimeType, data: data }
});
}
}
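Note (illustrative, not part of the diff): what the indexOf-based slicing does with a data URL. Unlike the previous /^data:([^;]+);base64,(.+)$/ regex, it does not require ";base64" to sit immediately before the comma, and it avoids running a capture group over a potentially multi-megabyte payload; the URL below is a made-up example.

const url = "data:image/png;charset=utf-8;base64,iVBORw0KGgoAAAANSUhEUg"; // payload truncated

const commaIndex = url.indexOf(",");
const mimePart = url.substring(5, commaIndex); // "image/png;charset=utf-8;base64" (skips "data:")
const mimeType = mimePart.split(";")[0];       // "image/png"
const data = url.substring(commaIndex + 1);    // raw base64 payload

// The old regex would reject this URL outright because of the extra ";charset=utf-8" parameter.
console.log(mimeType, data.length);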
88 changes: 56 additions & 32 deletions open-sse/utils/stream.js
@@ -1,5 +1,6 @@
import { translateResponse, initState } from "../translator/index.js";
import { FORMATS } from "../translator/formats.js";
import { saveRequestUsage, trackPendingRequest, appendRequestLog } from "@/lib/usageDb.js";

// Get HH:MM timestamp
function getTimeString() {
@@ -48,27 +49,42 @@ export const COLORS = {
};

// Log usage with cache info (green color)
function logUsage(provider, usage) {
function logUsage(provider, usage, model = null, connectionId = null) {
if (!usage) return;

const p = provider?.toUpperCase() || "UNKNOWN";
const inTokens = usage.prompt_tokens || 0;
const outTokens = usage.completion_tokens || 0;

let msg = `[${getTimeString()}] 📊 [USAGE] ${p} | in=${inTokens} | out=${outTokens}`;

if (connectionId) msg += ` | account=${connectionId.slice(0, 8)}...`;

if (usage.cache_creation_input_tokens) msg += ` | cache_write=${usage.cache_creation_input_tokens}`;
if (usage.cache_read_input_tokens) msg += ` | cache_read=${usage.cache_read_input_tokens}`;
if (usage.cached_tokens) msg += ` | cached=${usage.cached_tokens}`;
if (usage.reasoning_tokens) msg += ` | reasoning=${usage.reasoning_tokens}`;

console.log(`${COLORS.green}${msg}${COLORS.reset}`);

// Log to log.txt
appendRequestLog({ model, provider, connectionId, tokens: usage, status: "200 OK" }).catch(() => {});

// Save to DB
saveRequestUsage({
provider: provider || "unknown",
model: model || "unknown",
tokens: usage,
timestamp: new Date().toISOString(),
connectionId: connectionId || undefined
}).catch(err => {
console.error("Failed to save usage stats:", err.message);
});
}
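// Example (illustrative, not in this diff): for a Claude-style usage object,
//   logUsage("claude", { prompt_tokens: 120, completion_tokens: 45, cache_read_input_tokens: 80 }, "some-model", "conn_1234abcd");
// would print roughly:
//   [12:34] 📊 [USAGE] CLAUDE | in=120 | out=45 | account=conn_123... | cache_read=80
// and fire appendRequestLog + saveRequestUsage in the background (their errors are swallowed or logged).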

// Parse SSE data line
function parseSSELine(line) {
if (!line || !line.startsWith("data:")) return null;

const data = line.slice(5).trim();
if (data === "[DONE]") return { done: true };

@@ -91,17 +107,17 @@
*/
export function formatSSE(data, sourceFormat) {
if (data.done) return "data: [DONE]\n\n";

// OpenAI Responses API format: has event field
if (data.event && data.data) {
return `event: ${data.event}\ndata: ${JSON.stringify(data.data)}\n\n`;
}

// Claude format: include event prefix
if (sourceFormat === FORMATS.CLAUDE && data.type) {
return `event: ${data.type}\ndata: ${JSON.stringify(data)}\n\n`;
}

return `data: ${JSON.stringify(data)}\n\n`;
}

@@ -121,21 +137,25 @@ const STREAM_MODE = {
* @param {string} options.sourceFormat - Client format (for translate mode)
* @param {string} options.provider - Provider name
* @param {object} options.reqLogger - Request logger instance
* @param {string} options.model - Model name
* @param {string} options.connectionId - Connection ID for usage tracking
*/
export function createSSEStream(options = {}) {
const {
mode = STREAM_MODE.TRANSLATE,
targetFormat,
sourceFormat,
provider = null,
reqLogger = null
const {
mode = STREAM_MODE.TRANSLATE,
targetFormat,
sourceFormat,
provider = null,
reqLogger = null,
model = null,
connectionId = null
} = options;

const decoder = new TextDecoder();
const encoder = new TextEncoder();
let buffer = "";
let usage = null;

// State for translate mode
const state = mode === STREAM_MODE.TRANSLATE ? { ...initState(sourceFormat), provider } : null;

@@ -150,7 +170,7 @@ export function createSSEStream(options = {}) {

for (const line of lines) {
const trimmed = line.trim();

// Passthrough mode: normalize and forward
if (mode === STREAM_MODE.PASSTHROUGH) {
if (trimmed.startsWith("data:") && trimmed.slice(5).trim() !== "[DONE]") {
@@ -202,6 +222,7 @@ export function createSSEStream(options = {}) {
},

flush(controller) {
trackPendingRequest(model, provider, connectionId, false);
try {
const remaining = decoder.decode();
if (remaining) buffer += remaining;
@@ -215,7 +236,7 @@ export function createSSEStream(options = {}) {
reqLogger?.appendConvertedChunk?.(output);
controller.enqueue(encoder.encode(output));
}
if (usage) logUsage(provider, usage);
if (usage) logUsage(provider, usage, model, connectionId);
return;
}

@@ -249,7 +270,7 @@ export function createSSEStream(options = {}) {
reqLogger?.appendConvertedChunk?.(doneOutput);
controller.enqueue(encoder.encode(doneOutput));

if (state?.usage) logUsage(state.provider || targetFormat, state.usage);
if (state?.usage) logUsage(state.provider || targetFormat, state.usage, model, connectionId);
} catch (error) {
console.log("Error in flush:", error);
}
@@ -258,21 +279,24 @@ export function createSSEStream(options = {}) {
}

// Convenience functions for backward compatibility
export function createSSETransformStreamWithLogger(targetFormat, sourceFormat, provider = null, reqLogger = null) {
return createSSEStream({
mode: STREAM_MODE.TRANSLATE,
targetFormat,
sourceFormat,
provider,
reqLogger
export function createSSETransformStreamWithLogger(targetFormat, sourceFormat, provider = null, reqLogger = null, model = null, connectionId = null) {
return createSSEStream({
mode: STREAM_MODE.TRANSLATE,
targetFormat,
sourceFormat,
provider,
reqLogger,
model,
connectionId
});
}

export function createPassthroughStreamWithLogger(provider = null, reqLogger = null) {
return createSSEStream({
mode: STREAM_MODE.PASSTHROUGH,
provider,
reqLogger
export function createPassthroughStreamWithLogger(provider = null, reqLogger = null, model = null, connectionId = null) {
return createSSEStream({
mode: STREAM_MODE.PASSTHROUGH,
provider,
reqLogger,
model,
connectionId
});
}
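
Note (illustrative, not part of the diff): how a caller might wire the two new trailing arguments into the stream helpers. The endpoint, format strings, model name, and connection ID below are placeholders, and the piping mirrors what chatCore.js does above.

import { createSSETransformStreamWithLogger } from "./open-sse/utils/stream.js";

// Placeholder upstream request - URL, model and body are illustrative only.
const providerResponse = await fetch("https://provider.example/v1/chat/completions", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ model: "some-model", stream: true, messages: [{ role: "user", content: "hi" }] })
});

// The last two arguments are what this PR threads through so usage rows carry model + account.
const transform = createSSETransformStreamWithLogger(
  "openai",        // targetFormat (assumed FORMATS value)
  "claude",        // sourceFormat (assumed FORMATS value)
  "openai",        // provider label used in usage logs
  null,            // reqLogger (optional)
  "some-model",    // model recorded by saveRequestUsage/appendRequestLog
  "conn_1234abcd"  // connectionId recorded alongside the usage row
);

const clientStream = providerResponse.body.pipeThrough(transform);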
