diff --git a/README.md b/README.md index 7c4dea8..c93e087 100644 --- a/README.md +++ b/README.md @@ -1013,6 +1013,13 @@ Create `~/.agentmemory/.env`: # OPENAI_EMBEDDING_MODEL=text-embedding-3-small # OPENAI_EMBEDDING_DIMENSIONS=1536 # Required when the model is not in the known-models table +# Outbound LLM / embedding timeout +# AGENTMEMORY_LLM_TIMEOUT_MS=60000 # Default: 60 000 ms (60 s). Applies to every + # raw-fetch provider (Gemini, OpenRouter, MiniMax, + # OpenAI/Cohere/Voyage/OpenRouter embedding). + # Increase for slow networks or large batch calls; + # decrease to fail-fast on rate-limit holds. + # Search tuning # BM25_WEIGHT=0.4 # VECTOR_WEIGHT=0.6 diff --git a/src/providers/_fetch.ts b/src/providers/_fetch.ts new file mode 100644 index 0000000..ed9b5e8 --- /dev/null +++ b/src/providers/_fetch.ts @@ -0,0 +1,19 @@ +import { getEnvVar } from "../config.js"; + +export function fetchWithTimeout( + url: string, + init: RequestInit, + timeoutMs?: number, +): Promise<Response> { + const parsed = + timeoutMs ?? + Number.parseInt(getEnvVar("AGENTMEMORY_LLM_TIMEOUT_MS") ?? "60000", 10); + const ms = Number.isFinite(parsed) && parsed > 0 ? parsed : 60000; + + const ctl = new AbortController(); + const signal = init.signal + ? 
AbortSignal.any([init.signal, ctl.signal]) + : ctl.signal; + const t = setTimeout(() => ctl.abort(), ms); + return fetch(url, { ...init, signal }).finally(() => clearTimeout(t)); +} diff --git a/src/providers/embedding/cohere.ts b/src/providers/embedding/cohere.ts index 1a96cc4..02b7f13 100644 --- a/src/providers/embedding/cohere.ts +++ b/src/providers/embedding/cohere.ts @@ -1,5 +1,6 @@ import type { EmbeddingProvider } from "../../types.js"; import { getEnvVar } from "../../config.js"; +import { fetchWithTimeout } from "../_fetch.js"; const API_URL = "https://api.cohere.ai/v1/embed"; @@ -19,7 +20,7 @@ export class CohereEmbeddingProvider implements EmbeddingProvider { } async embedBatch(texts: string[]): Promise<number[][]> { - const response = await fetch(API_URL, { + const response = await fetchWithTimeout(API_URL, { method: "POST", headers: { Authorization: `Bearer ${this.apiKey}`, diff --git a/src/providers/embedding/gemini.ts b/src/providers/embedding/gemini.ts index 6cfb276..1401646 100644 --- a/src/providers/embedding/gemini.ts +++ b/src/providers/embedding/gemini.ts @@ -1,5 +1,6 @@ import type { EmbeddingProvider } from "../../types.js"; import { getEnvVar } from "../../config.js"; +import { fetchWithTimeout } from "../_fetch.js"; const BATCH_LIMIT = 100; const MODEL = "models/gemini-embedding-001"; @@ -25,7 +26,7 @@ export class GeminiEmbeddingProvider implements EmbeddingProvider { for (let i = 0; i < texts.length; i += BATCH_LIMIT) { const chunk = texts.slice(i, i + BATCH_LIMIT); - const response = await fetch(`${API_BASE}?key=${this.apiKey}`, { + const response = await fetchWithTimeout(`${API_BASE}?key=${this.apiKey}`, { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ diff --git a/src/providers/embedding/openai.ts b/src/providers/embedding/openai.ts index 308479f..bbc0f9f 100644 --- a/src/providers/embedding/openai.ts +++ b/src/providers/embedding/openai.ts @@ -1,5 +1,6 @@ import type { EmbeddingProvider } from 
"../../types.js"; import { getEnvVar } from "../../config.js"; +import { fetchWithTimeout } from "../_fetch.js"; const DEFAULT_BASE_URL = "https://api.openai.com"; const DEFAULT_MODEL = "text-embedding-3-small"; @@ -70,7 +71,7 @@ export class OpenAIEmbeddingProvider implements EmbeddingProvider { async embedBatch(texts: string[]): Promise<number[][]> { const url = `${this.baseUrl}/v1/embeddings`; - const response = await fetch(url, { + const response = await fetchWithTimeout(url, { method: "POST", headers: { Authorization: `Bearer ${this.apiKey}`, diff --git a/src/providers/embedding/openrouter.ts b/src/providers/embedding/openrouter.ts index 7d4acb2..46999e5 100644 --- a/src/providers/embedding/openrouter.ts +++ b/src/providers/embedding/openrouter.ts @@ -1,5 +1,6 @@ import type { EmbeddingProvider } from "../../types.js"; import { getEnvVar } from "../../config.js"; +import { fetchWithTimeout } from "../_fetch.js"; const API_URL = "https://openrouter.ai/api/v1/embeddings"; @@ -23,7 +24,7 @@ export class OpenRouterEmbeddingProvider implements EmbeddingProvider { } async embedBatch(texts: string[]): Promise<number[][]> { - const response = await fetch(API_URL, { + const response = await fetchWithTimeout(API_URL, { method: "POST", headers: { Authorization: `Bearer ${this.apiKey}`, diff --git a/src/providers/embedding/voyage.ts b/src/providers/embedding/voyage.ts index b92fb60..14ddc36 100644 --- a/src/providers/embedding/voyage.ts +++ b/src/providers/embedding/voyage.ts @@ -1,5 +1,6 @@ import type { EmbeddingProvider } from "../../types.js"; import { getEnvVar } from "../../config.js"; +import { fetchWithTimeout } from "../_fetch.js"; const API_URL = "https://api.voyageai.com/v1/embeddings"; @@ -19,7 +20,7 @@ export class VoyageEmbeddingProvider implements EmbeddingProvider { } async embedBatch(texts: string[]): Promise<number[][]> { - const response = await fetch(API_URL, { + const response = await fetchWithTimeout(API_URL, { method: "POST", headers: { Authorization: `Bearer ${this.apiKey}`, diff 
--git a/src/providers/minimax.ts b/src/providers/minimax.ts index e912fab..72fc9ec 100644 --- a/src/providers/minimax.ts +++ b/src/providers/minimax.ts @@ -1,5 +1,6 @@ import type { MemoryProvider } from '../types.js' import { getEnvVar } from '../config.js' +import { fetchWithTimeout } from './_fetch.js' /** * MiniMax provider using raw fetch to call MiniMax's Anthropic-compatible API. @@ -40,7 +41,7 @@ export class MinimaxProvider implements MemoryProvider { private async call(systemPrompt: string, userPrompt: string): Promise<string> { const url = `${this.baseUrl}/v1/messages` - const response = await fetch(url, { + const response = await fetchWithTimeout(url, { method: 'POST', headers: { 'Content-Type': 'application/json', diff --git a/src/providers/openrouter.ts b/src/providers/openrouter.ts index 219ce42..5c47bb0 100644 --- a/src/providers/openrouter.ts +++ b/src/providers/openrouter.ts @@ -1,4 +1,5 @@ import type { MemoryProvider } from "../types.js"; +import { fetchWithTimeout } from "./_fetch.js"; export class OpenRouterProvider implements MemoryProvider { name: string; @@ -32,7 +33,7 @@ export class OpenRouterProvider implements MemoryProvider { systemPrompt: string, userPrompt: string, ): Promise<string> { - const response = await fetch(this.baseUrl, { + const response = await fetchWithTimeout(this.baseUrl, { method: "POST", headers: { "Content-Type": "application/json", diff --git a/test/fetch-timeout.test.ts b/test/fetch-timeout.test.ts new file mode 100644 index 0000000..2beb04a --- /dev/null +++ b/test/fetch-timeout.test.ts @@ -0,0 +1,197 @@ +import { describe, it, expect, vi, afterEach, beforeEach } from "vitest"; +import { fetchWithTimeout } from "../src/providers/_fetch.js"; +import { MinimaxProvider } from "../src/providers/minimax.js"; +import { OpenRouterProvider } from "../src/providers/openrouter.js"; +import { GeminiEmbeddingProvider } from "../src/providers/embedding/gemini.js"; +import { OpenAIEmbeddingProvider } from 
"../src/providers/embedding/openai.js"; +import { CohereEmbeddingProvider } from "../src/providers/embedding/cohere.js"; +import { VoyageEmbeddingProvider } from "../src/providers/embedding/voyage.js"; +import { OpenRouterEmbeddingProvider } from "../src/providers/embedding/openrouter.js"; + +// A fetch mock that never resolves — simulates a hung upstream. +function hangingFetch(_url: string, _init?: RequestInit): Promise<Response> { + // honour AbortSignal so the timeout actually cancels us + const init = _init ?? {}; + return new Promise<Response>((_resolve, reject) => { + if (init.signal) { + if (init.signal.aborted) { + reject(new DOMException("AbortError", "AbortError")); + return; + } + init.signal.addEventListener("abort", () => { + reject(new DOMException("AbortError", "AbortError")); + }); + } + }); +} + +// ───────────────────────────────────────────────────────────── +// fetchWithTimeout unit tests +// ───────────────────────────────────────────────────────────── +describe("fetchWithTimeout", () => { + beforeEach(() => { + vi.spyOn(globalThis, "fetch").mockImplementation(hangingFetch as typeof fetch); + }); + + afterEach(() => { + vi.restoreAllMocks(); + delete process.env["AGENTMEMORY_LLM_TIMEOUT_MS"]; + }); + + it("resolves normally when fetch completes within the timeout", async () => { + vi.restoreAllMocks(); + vi.spyOn(globalThis, "fetch").mockResolvedValue( + new Response(JSON.stringify({ ok: true }), { status: 200 }), + ); + const res = await fetchWithTimeout("https://example.com", {}, 1000); + expect(res.status).toBe(200); + }); + + it("aborts with an AbortError when fetch hangs beyond the configured timeout", async () => { + await expect( + fetchWithTimeout("https://example.com", {}, 50), + ).rejects.toThrow(); + }); + + it("reads AGENTMEMORY_LLM_TIMEOUT_MS as the default timeout when no explicit ms is given", async () => { + process.env["AGENTMEMORY_LLM_TIMEOUT_MS"] = "50"; + // no explicit third arg — must pick up the env var + await expect( + 
fetchWithTimeout("https://example.com", {}), + ).rejects.toThrow(); + }); + + it("falls back to 60 000 ms when AGENTMEMORY_LLM_TIMEOUT_MS is not set (type check only)", () => { + delete process.env["AGENTMEMORY_LLM_TIMEOUT_MS"]; + vi.restoreAllMocks(); + vi.spyOn(globalThis, "fetch").mockResolvedValue( + new Response(null, { status: 204 }), + ); + const p = fetchWithTimeout("https://example.com", {}); + expect(p).toBeInstanceOf(Promise); + return p; + }); +}); + +// ───────────────────────────────────────────────────────────── +// Provider hang regression tests +// Each provider must call fetchWithTimeout, which honours the +// AbortSignal when the explicit timeoutMs is tiny (50 ms). +// ───────────────────────────────────────────────────────────── + +describe("Provider hang regression — MinimaxProvider", () => { + beforeEach(() => { + vi.spyOn(globalThis, "fetch").mockImplementation(hangingFetch as typeof fetch); + process.env["AGENTMEMORY_LLM_TIMEOUT_MS"] = "50"; + }); + afterEach(() => { + vi.restoreAllMocks(); + delete process.env["AGENTMEMORY_LLM_TIMEOUT_MS"]; + }); + + it("compress() aborts after timeout when upstream hangs", async () => { + const provider = new MinimaxProvider("test-key", "MiniMax-M2.7", 800); + await expect(provider.compress("system", "user")).rejects.toThrow(); + }); +}); + +describe("Provider hang regression — OpenRouterProvider (covers Gemini LLM path)", () => { + beforeEach(() => { + vi.spyOn(globalThis, "fetch").mockImplementation(hangingFetch as typeof fetch); + process.env["AGENTMEMORY_LLM_TIMEOUT_MS"] = "50"; + }); + afterEach(() => { + vi.restoreAllMocks(); + delete process.env["AGENTMEMORY_LLM_TIMEOUT_MS"]; + }); + + it("compress() aborts after timeout when upstream hangs", async () => { + const provider = new OpenRouterProvider( + "test-key", + "gemini-2.5-flash", + 1024, + "https://generativelanguage.googleapis.com/v1beta/openai/chat/completions", + ); + await expect(provider.compress("system", "user")).rejects.toThrow(); + }); 
+}); + +describe("Provider hang regression — GeminiEmbeddingProvider", () => { + beforeEach(() => { + vi.spyOn(globalThis, "fetch").mockImplementation(hangingFetch as typeof fetch); + process.env["AGENTMEMORY_LLM_TIMEOUT_MS"] = "50"; + }); + afterEach(() => { + vi.restoreAllMocks(); + delete process.env["AGENTMEMORY_LLM_TIMEOUT_MS"]; + }); + + it("embedBatch() aborts after timeout when upstream hangs", async () => { + const provider = new GeminiEmbeddingProvider("test-key"); + await expect(provider.embedBatch(["hello"])).rejects.toThrow(); + }); +}); + +describe("Provider hang regression — OpenAIEmbeddingProvider", () => { + beforeEach(() => { + vi.spyOn(globalThis, "fetch").mockImplementation(hangingFetch as typeof fetch); + process.env["AGENTMEMORY_LLM_TIMEOUT_MS"] = "50"; + }); + afterEach(() => { + vi.restoreAllMocks(); + delete process.env["AGENTMEMORY_LLM_TIMEOUT_MS"]; + }); + + it("embedBatch() aborts after timeout when upstream hangs", async () => { + const provider = new OpenAIEmbeddingProvider("test-key"); + await expect(provider.embedBatch(["hello"])).rejects.toThrow(); + }); +}); + +describe("Provider hang regression — CohereEmbeddingProvider", () => { + beforeEach(() => { + vi.spyOn(globalThis, "fetch").mockImplementation(hangingFetch as typeof fetch); + process.env["AGENTMEMORY_LLM_TIMEOUT_MS"] = "50"; + }); + afterEach(() => { + vi.restoreAllMocks(); + delete process.env["AGENTMEMORY_LLM_TIMEOUT_MS"]; + }); + + it("embedBatch() aborts after timeout when upstream hangs", async () => { + const provider = new CohereEmbeddingProvider("test-key"); + await expect(provider.embedBatch(["hello"])).rejects.toThrow(); + }); +}); + +describe("Provider hang regression — VoyageEmbeddingProvider", () => { + beforeEach(() => { + vi.spyOn(globalThis, "fetch").mockImplementation(hangingFetch as typeof fetch); + process.env["AGENTMEMORY_LLM_TIMEOUT_MS"] = "50"; + }); + afterEach(() => { + vi.restoreAllMocks(); + delete process.env["AGENTMEMORY_LLM_TIMEOUT_MS"]; + }); + 
+ it("embedBatch() aborts after timeout when upstream hangs", async () => { + const provider = new VoyageEmbeddingProvider("test-key"); + await expect(provider.embedBatch(["hello"])).rejects.toThrow(); + }); +}); + +describe("Provider hang regression — OpenRouterEmbeddingProvider", () => { + beforeEach(() => { + vi.spyOn(globalThis, "fetch").mockImplementation(hangingFetch as typeof fetch); + process.env["AGENTMEMORY_LLM_TIMEOUT_MS"] = "50"; + }); + afterEach(() => { + vi.restoreAllMocks(); + delete process.env["AGENTMEMORY_LLM_TIMEOUT_MS"]; + }); + + it("embedBatch() aborts after timeout when upstream hangs", async () => { + const provider = new OpenRouterEmbeddingProvider("test-key"); + await expect(provider.embedBatch(["hello"])).rejects.toThrow(); + }); +});