diff --git a/docs/pinecone-connector-refactor-summary.md b/docs/pinecone-connector-refactor-summary.md
new file mode 100644
index 00000000..d48ffa68
--- /dev/null
+++ b/docs/pinecone-connector-refactor-summary.md
@@ -0,0 +1,42 @@
+# Pinecone Connector Phase 1 Summary
+
+## Overview
+Phase 1 of the Pinecone connector refactor modernizes connection management, reliability tooling, and automated validation within the VectorDB subsystem. The work introduces shared helpers, centralized client lifecycle handling, and targeted unit tests to harden production behavior while laying groundwork for future observability and documentation.
+
+## Change Summary
+| Area | File(s) | Description |
+| --- | --- | --- |
+| Typed configuration | `packages/core/src/subsystems/IO/VectorDB.service/connectors/pinecone/types.ts` | Defines Pinecone connector settings covering auth, retry, timeout, and health inputs to keep runtime code declarative. |
+| Async utilities | `packages/core/src/subsystems/IO/VectorDB.service/connectors/pinecone/async-utils.ts` | Adds cancellable timeout wrapper and jittered sleep helper to standardize operation control flow. |
+| Resilient retries | `packages/core/src/subsystems/IO/VectorDB.service/connectors/pinecone/retry-utils.ts` | Implements exponential backoff with jitter, per-operation timeouts, and attempt callbacks. |
+| Connection lifecycle | `packages/core/src/subsystems/IO/VectorDB.service/connectors/pinecone/connection-manager.ts` | Centralizes Pinecone client creation, credential lookups (ManagedVault/Vault/direct), and local caching. |
+| Main connector refactor | `packages/core/src/subsystems/IO/VectorDB.service/connectors/PineconeVectorDB.class.ts` | Consumes the new helpers for namespace CRUD, query, and upsert flows with retry + timeout safety. |
+| Automated validation | `packages/core/tests/unit/004-VectorDB/pinecone-helpers.test.ts` | Covers helper utilities and connection manager behavior using vitest without external dependencies. |
+
+## Rationale and Improvements
+- **Configuration clarity**. Moving Pinecone-specific knobs into `types.ts` allows safer merges and runtime validation while de-duplicating hard-coded constants.
+- **Deterministic async control**. `async-utils.ts` enables uniform timeout enforcement and jittered backoff, which spreads retries out instead of letting them fire in synchronized bursts, and aligns with platform abort semantics (`AbortSignal`).
+- **Consistent resiliency posture**. `retry-utils.ts` enforces explicit retry policies per operation, reducing copy/paste retry logic and concentrating future enhancements (e.g., metrics hooks).
+- **Credential hygiene**. `connection-manager.ts` encapsulates ManagedVault and Vault resolution with local caching, minimizing secret fetches and ensuring fallbacks for direct configuration.
+- **Safer connector operations**. `PineconeVectorDB.class.ts` now delegates to the connection manager and wraps I/O calls in timeouts/retries, shielding consumers from transient Pinecone failures.
+- **Regression coverage**. Adding `pinecone-helpers.test.ts` establishes a dedicated test harness, increasing confidence in helper evolution and supporting CI enforcement.
+
+## Business Impact
+- **Reliability for Production Agents**. Centralized connection lifecycle management, retries, and jittered backoff help absorb transient Pinecone failures before they reach callers. These improvements reinforce uptime and stability, a requirement for production-grade AI agent runtimes.
+- **Enterprise Security and Compliance**. Vault-based credential resolution with caching formalizes secret management. This creates an auditable and secure flow, aligning with enterprise adoption requirements and SLA-driven commitments.
+- **Developer Velocity and Ecosystem Growth**. Typed configuration, shared helpers, and streamlined retry logic simplify extension and maintenance. This reduces cognitive load and accelerates safe feature delivery, directly supporting SmythOS’s goal of rapid ecosystem growth.
+- **Operational Efficiency**. The connection manager exposes explicit reset and shutdown pathways. This simplifies incident response playbooks, improves recoverability, and enhances operational resilience.
+- **Pathway to Enterprise Observability**. Retry and connection abstractions provide natural integration points for structured logging, metrics, and circuit breaking. These hooks lay the groundwork for future observability enhancements and SLA enforcement.
+
+## Testing Instructions
+- **Install dependencies**. From the repository root, run `pnpm install` if workspace packages have not been bootstrapped.
+- **Prepare environment**. Ensure any required `.env` secrets for vault integration are stubbed or mocked; the unit suite relies on internal mocks and does not call external services.
+- **Execute targeted suite**. Run `pnpm vitest run packages/core/tests/unit/004-VectorDB/pinecone-helpers.test.ts` from the repository root to validate the Pinecone helper and connection manager behavior.
+- **Inspect output**. Confirm all tests pass and review console warnings for context on mocked vault connectors.
+
+## Future Enhancements
+- **Structured logging and metrics** integration within `connection-manager.ts` and `PineconeVectorDB.class.ts` to surface latency, retries, and health signals.
+- **Health probe orchestration** leveraging the `health` configuration placeholders to support automated circuit breaking.
+- **Extended documentation**: update `docs/core/documents/connectors_vectordb.html` (or source markdown) to reference Pinecone-specific connection guidance and troubleshooting.
+- **Integration tests**: add staging exercises with a live Pinecone sandbox, gated behind feature flags, to validate end-to-end pipelines.
+- **Config validation tooling** ensuring that runtime-provided retry/timeout/auth options conform to expected ranges before use.
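
Reviewer note: the summary above describes exponential backoff with jitter without showing the resulting wait times. The sketch below is a standalone illustration of that arithmetic, assuming the delay doubles per attempt, is capped at `maxDelayMs`, and gains up to `jitterRatio` of extra wait. The names `RetryPolicy`, `baseDelay`, `jitteredDelay`, and `delaySchedule` are hypothetical and are not part of this PR.

```typescript
// Illustrative sketch only: hypothetical helper names, not the PR's modules.
type RetryPolicy = {
  attempts: number;
  baseDelayMs: number;
  maxDelayMs: number;
  jitterRatio: number;
};

// Wait before the retry that follows failed attempt `attempt` (1-based), without jitter.
function baseDelay(policy: RetryPolicy, attempt: number): number {
  return Math.min(policy.baseDelayMs * 2 ** (attempt - 1), policy.maxDelayMs);
}

// Adds between 0 and `jitterRatio * delayMs` of extra wait.
// `random` is injectable so the result can be made deterministic in tests.
function jitteredDelay(delayMs: number, jitterRatio: number, random: () => number = Math.random): number {
  return delayMs + delayMs * random() * jitterRatio;
}

// Un-jittered waits between attempts; there are `attempts - 1` of them
// because no wait follows the final attempt.
function delaySchedule(policy: RetryPolicy): number[] {
  const waits: number[] = [];
  for (let attempt = 1; attempt < policy.attempts; attempt += 1) {
    waits.push(baseDelay(policy, attempt));
  }
  return waits;
}

const policy: RetryPolicy = { attempts: 5, baseDelayMs: 200, maxDelayMs: 2000, jitterRatio: 0.4 };
console.log(delaySchedule(policy)); // [ 200, 400, 800, 1600 ]
console.log(jitteredDelay(200, 0.4, () => 0.5)); // 240
```

With a policy of 3 attempts, a 200 ms base, and a 2000 ms cap, the same arithmetic gives waits of 200 ms and 400 ms before jitter, so the cap only matters for longer policies.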
diff --git a/packages/core/src/subsystems/IO/VectorDB.service/connectors/PineconeVectorDB.class.ts b/packages/core/src/subsystems/IO/VectorDB.service/connectors/PineconeVectorDB.class.ts index 227ddab6..1df335e5 100644 --- a/packages/core/src/subsystems/IO/VectorDB.service/connectors/PineconeVectorDB.class.ts +++ b/packages/core/src/subsystems/IO/VectorDB.service/connectors/PineconeVectorDB.class.ts @@ -5,15 +5,7 @@ import { AccessRequest } from '@sre/Security/AccessControl/AccessRequest.class'; import { AccessCandidate } from '@sre/Security/AccessControl/AccessCandidate.class'; import { SecureConnector } from '@sre/Security/SecureConnector.class'; import { VectorDBConnector, DeleteTarget } from '../VectorDBConnector'; -import { - DatasourceDto, - IStorageVectorDataSource, - IStorageVectorNamespace, - IVectorDataSourceDto, - QueryOptions, - VectorsResultData, -} from '@sre/types/VectorDB.types'; -import { Pinecone } from '@pinecone-database/pinecone'; +import { DatasourceDto, IStorageVectorDataSource, IVectorDataSourceDto, QueryOptions, VectorsResultData } from '@sre/types/VectorDB.types'; import { ConnectorService } from '@sre/Core/ConnectorsService'; import { Logger } from '@sre/helpers/Log.helper'; import { NKVConnector } from '@sre/IO/NKV.service/NKVConnector'; @@ -22,9 +14,12 @@ import { JSONContentHelper } from '@sre/helpers/JsonContent.helper'; import { CacheConnector } from '@sre/MemoryManager/Cache.service/CacheConnector'; import crypto from 'crypto'; import { BaseEmbedding, TEmbeddings } from '../embed/BaseEmbedding'; -import { EmbeddingsFactory, SupportedProviders, SupportedModels } from '../embed'; +import { EmbeddingsFactory } from '../embed'; import { chunkText } from '@sre/utils/string.utils'; import { jsonrepair } from 'jsonrepair'; +import { PineconeConnectionManager } from './pinecone/connection-manager'; +import { PineconeAuthConfig, PineconeHealthConfig, PineconeRequestTimeouts, PineconeRetryConfig } from './pinecone/types'; +import { 
withSafeRetry } from './pinecone/retry-utils'; const console = Logger('Pinecone VectorDB'); @@ -32,7 +27,7 @@ export type PineconeConfig = { /** * The Pinecone API key */ - apiKey: string; + apiKey?: string; /** * The Pinecone index name */ @@ -41,11 +36,18 @@ export type PineconeConfig = { * The embeddings model to use */ embeddings?: TEmbeddings; + /** + * Auth configuration for vault-based credentials + */ + auth?: PineconeAuthConfig; + requestTimeouts?: PineconeRequestTimeouts; + retry?: PineconeRetryConfig; + health?: PineconeHealthConfig; }; export class PineconeVectorDB extends VectorDBConnector { public name = 'PineconeVectorDB'; public id = 'pinecone'; - private client: Pinecone; + private connection: PineconeConnectionManager; private indexName: string; private cache: CacheConnector; private accountConnector: AccountConnector; @@ -54,20 +56,15 @@ export class PineconeVectorDB extends VectorDBConnector { constructor(protected _settings: PineconeConfig) { super(_settings); - if (!_settings.apiKey) { - console.warn('Missing Pinecone API key : returning empty Pinecone connector'); - return; - } if (!_settings.indexName) { console.warn('Missing Pinecone index name : returning empty Pinecone connector'); return; } - this.client = new Pinecone({ - apiKey: _settings.apiKey, - }); - console.info('Pinecone client initialized'); - console.info('Pinecone index name:', _settings.indexName); + if (!_settings.apiKey && !_settings.auth?.vaultKey) { + console.warn('Pinecone connector configured without apiKey or auth.vaultKey; operations will fail until credentials are provided at runtime'); + } + this.indexName = _settings.indexName; this.accountConnector = ConnectorService.getAccountConnector(); this.cache = ConnectorService.getCacheConnector(); @@ -77,8 +74,16 @@ export class PineconeVectorDB extends VectorDBConnector { } if (!_settings.embeddings.params) _settings.embeddings.params = { dimensions: 1024 }; if (!_settings.embeddings.params?.dimensions) 
_settings.embeddings.params.dimensions = 1024; - this.embedder = EmbeddingsFactory.create(_settings.embeddings.provider, _settings.embeddings); + + this.connection = new PineconeConnectionManager({ + indexName: this.indexName, + embeddings: _settings.embeddings, + auth: _settings.auth ?? (_settings.apiKey ? { apiKey: _settings.apiKey } : undefined), + requestTimeouts: _settings.requestTimeouts, + retry: _settings.retry, + health: _settings.health, + }); } public async getResourceACL(resourceId: string, candidate: IAccessCandidate): Promise { @@ -109,19 +114,27 @@ export class PineconeVectorDB extends VectorDBConnector { displayName: namespace, ...metadata, }; - await this.client - .Index(this.indexName) - .namespace(preparedNs) - .upsert([ - { - id: `_reserved_${preparedNs}`, - values: this.embedder.dummyVector, - metadata: { - isSkeletonVector: true, - ...nsData, - }, - }, - ]); + const client = await this.connection.getClient(acRequest); + await withSafeRetry( + 'createNamespace', + async () => { + await client + .Index(this.indexName) + .namespace(preparedNs) + .upsert([ + { + id: `_reserved_${preparedNs}`, + values: this.embedder.dummyVector, + metadata: { + isSkeletonVector: true, + ...nsData, + }, + }, + ]); + }, + this.settings.retry, + this.settings.requestTimeouts + ); await this.setACL(acRequest, preparedNs, acl.ACL); @@ -131,27 +144,37 @@ export class PineconeVectorDB extends VectorDBConnector { @SecureConnector.AccessControl protected async namespaceExists(acRequest: AccessRequest, namespace: string): Promise { //const teamId = await this.accountConnector.getCandidateTeam(acRequest.candidate); - const stats = await this.client.Index(this.indexName).describeIndexStats(); + const client = await this.connection.getClient(acRequest); + const stats = await withSafeRetry( + 'describeIndexStats', + async () => client.Index(this.indexName).describeIndexStats(), + this.settings.retry, + this.settings.requestTimeouts + ); return 
Object.keys(stats.namespaces).includes(this.constructNsName(acRequest.candidate as AccessCandidate, namespace)); } @SecureConnector.AccessControl protected async deleteNamespace(acRequest: AccessRequest, namespace: string): Promise { - //const teamId = await this.accountConnector.getCandidateTeam(acRequest.candidate); - //const candidate = AccessCandidate.team(teamId); const preparedNs = this.constructNsName(acRequest.candidate as AccessCandidate, namespace); - - await this.client - .Index(this.indexName) - .namespace(this.constructNsName(acRequest.candidate as AccessCandidate, namespace)) - .deleteAll() - .catch((e) => { - if (e?.name == 'PineconeNotFoundError') { - console.warn(`Namespace ${namespace} does not exist and was requested to be deleted`); - return; - } - throw e; - }); + const client = await this.connection.getClient(acRequest); + await withSafeRetry( + 'deleteNamespace', + async () => + client + .Index(this.indexName) + .namespace(preparedNs) + .deleteAll() + .catch((e) => { + if (e?.name == 'PineconeNotFoundError') { + console.warn(`Namespace ${namespace} does not exist and was requested to be deleted`); + return; + } + throw e; + }), + this.settings.retry, + this.settings.requestTimeouts + ); await this.deleteACL(AccessCandidate.clone(acRequest.candidate), namespace); } @@ -165,7 +188,8 @@ export class PineconeVectorDB extends VectorDBConnector { ): Promise { //const teamId = await this.accountConnector.getCandidateTeam(acRequest.candidate); - const pineconeIndex = this.client.Index(this.indexName).namespace(this.constructNsName(acRequest.candidate as AccessCandidate, namespace)); + const client = await this.connection.getClient(acRequest); + const pineconeIndex = client.Index(this.indexName).namespace(this.constructNsName(acRequest.candidate as AccessCandidate, namespace)); let _vector = query; if (typeof query === 'string') { _vector = await this.embedder.embedText(query, acRequest.candidate as AccessCandidate); @@ -173,12 +197,18 @@ export class 
PineconeVectorDB extends VectorDBConnector { const topK = (options.topK || 10) + 1; //* we increment one in case it included the skeleton vector - const results = await pineconeIndex.query({ - topK, - vector: _vector as number[], - includeMetadata: true, - includeValues: true, - }); + const results = await withSafeRetry( + 'query', + async () => + pineconeIndex.query({ + topK, + vector: _vector as number[], + includeMetadata: true, + includeValues: true, + }), + this.settings.retry, + this.settings.requestTimeouts + ); let matches = []; @@ -226,10 +256,17 @@ export class PineconeVectorDB extends VectorDBConnector { })); // await pineconeStore.addDocuments(chunks, ids); - await this.client - .Index(this.indexName) - .namespace(this.constructNsName(acRequest.candidate as AccessCandidate, namespace)) - .upsert(preparedSource); + const client = await this.connection.getClient(acRequest); + await withSafeRetry( + 'upsert', + async () => + client + .Index(this.indexName) + .namespace(this.constructNsName(acRequest.candidate as AccessCandidate, namespace)) + .upsert(preparedSource), + this.settings.retry, + this.settings.requestTimeouts + ); const accessCandidate = acRequest.candidate; @@ -251,10 +288,17 @@ export class PineconeVectorDB extends VectorDBConnector { } else { const _ids = Array.isArray(deleteTarget) ? 
deleteTarget : [deleteTarget]; - const res = await this.client - .Index(this.indexName) - .namespace(this.constructNsName(acRequest.candidate as AccessCandidate, namespace)) - .deleteMany(_ids); + const client = await this.connection.getClient(acRequest); + await withSafeRetry( + 'deleteMany', + async () => + client + .Index(this.indexName) + .namespace(this.constructNsName(acRequest.candidate as AccessCandidate, namespace)) + .deleteMany(_ids), + this.settings.retry, + this.settings.requestTimeouts + ); } } diff --git a/packages/core/src/subsystems/IO/VectorDB.service/connectors/pinecone/async-utils.ts b/packages/core/src/subsystems/IO/VectorDB.service/connectors/pinecone/async-utils.ts new file mode 100644 index 00000000..602fd997 --- /dev/null +++ b/packages/core/src/subsystems/IO/VectorDB.service/connectors/pinecone/async-utils.ts @@ -0,0 +1,54 @@ +import { setTimeout as sleep } from 'timers/promises'; + +export type TimeoutOptions = { + timeoutMs: number; + signal?: AbortSignal; + onTimeout?: () => void; +}; + +export async function sleepWithJitter(delayMs: number, jitterRatio: number, signal?: AbortSignal) { + if (delayMs <= 0) return; + const jitter = delayMs * Math.random() * jitterRatio; + const totalDelay = delayMs + jitter; + await sleep(totalDelay, undefined, { signal }); +} + +export async function withTimeout(operation: (signal: AbortSignal) => Promise, options: TimeoutOptions): Promise { + const controller = new AbortController(); + const { timeoutMs, signal, onTimeout } = options; + + const timer = setTimeout(() => { + controller.abort(new Error('Operation timed out')); + onTimeout?.(); + }, timeoutMs).unref?.(); + + const abortListener = () => controller.abort(signal?.reason ?? new Error('Operation aborted')); + + if (signal) { + if (signal.aborted) { + clearTimeout(timer as unknown as NodeJS.Timeout); + controller.abort(signal.reason ?? 
new Error('Operation aborted before start')); + } else { + signal.addEventListener('abort', abortListener, { once: true }); + } + } + + try { + const result = await Promise.race([ + operation(controller.signal), + new Promise((_, reject) => { + const abortHandler = (event: Event) => { + const reason = (event?.target as AbortSignal)?.reason ?? new Error('Operation aborted'); + reject(reason instanceof Error ? reason : new Error(String(reason))); + }; + controller.signal.addEventListener('abort', abortHandler, { once: true }); + }), + ]); + return result; + } finally { + clearTimeout(timer as unknown as NodeJS.Timeout); + if (signal) { + signal.removeEventListener('abort', abortListener); + } + } +} diff --git a/packages/core/src/subsystems/IO/VectorDB.service/connectors/pinecone/connection-manager.ts b/packages/core/src/subsystems/IO/VectorDB.service/connectors/pinecone/connection-manager.ts new file mode 100644 index 00000000..239968c3 --- /dev/null +++ b/packages/core/src/subsystems/IO/VectorDB.service/connectors/pinecone/connection-manager.ts @@ -0,0 +1,130 @@ +import { Pinecone } from '@pinecone-database/pinecone'; +import { ConnectorService } from '@sre/Core/ConnectorsService'; +import { Logger } from '@sre/helpers/Log.helper'; +import { LocalCache } from '@sre/helpers/LocalCache.helper'; +import { AccessRequest } from '@sre/Security/AccessControl/AccessRequest.class'; +import { ManagedVaultConnector } from '@sre/Security/ManagedVault.service/ManagedVaultConnector'; +import { VaultConnector } from '@sre/Security/Vault.service/VaultConnector'; +import { withSafeRetry } from './retry-utils'; +import { PineconeConnectorSettings, PineconeOperationName } from './types'; + +const console = Logger('PineconeConnectionManager'); + +const API_KEY_CACHE_TTL_MS = 5 * 60 * 1000; + +export class PineconeConnectionManager { + private client?: Pinecone; + private readonly apiKeyCache = new LocalCache(API_KEY_CACHE_TTL_MS); + private readonly managedVault?: 
ManagedVaultConnector; + private readonly vault?: VaultConnector; + + constructor(private readonly settings: PineconeConnectorSettings) { + try { + this.managedVault = ConnectorService.getManagedVaultConnector(); + } catch (error) { + console.warn('ManagedVault connector unavailable, falling back to Vault if configured'); + } + try { + this.vault = ConnectorService.getVaultConnector(); + } catch (error) { + console.warn('Vault connector unavailable; direct API key usage only'); + } + } + + public get indexName(): string { + return this.settings.indexName; + } + + public reset(): void { + this.client = undefined; + this.apiKeyCache.clear(); + } + + public async shutdown(): Promise { + this.client = undefined; + this.apiKeyCache.clear(); + } + + public async getClient(acRequest: AccessRequest, signal?: AbortSignal): Promise { + if (!this.client) { + const apiKey = await this.resolveApiKey(acRequest, signal); + this.client = await this.createClient(apiKey, signal); + } + return this.client; + } + + private async createClient(apiKey: string, signal?: AbortSignal): Promise { + return await withSafeRetry( + 'init', + async () => { + if (signal?.aborted) { + throw signal.reason ?? 
new Error('Initialization aborted'); + } + return new Pinecone({ apiKey }); + }, + { attempts: 2, baseDelayMs: 100, maxDelayMs: 500 }, + { defaultMs: 3000 }, + signal + ); + } + + private async resolveApiKey(acRequest: AccessRequest, signal?: AbortSignal): Promise { + const cacheKey = `${acRequest.candidate.role}:${acRequest.candidate.id}`; + const cached = this.apiKeyCache.get(cacheKey); + if (cached) { + return cached; + } + + const rawKey = await this.fetchApiKey(acRequest, signal); + if (!rawKey || typeof rawKey !== 'string') { + throw new Error('Pinecone API key could not be resolved'); + } + this.apiKeyCache.set(cacheKey, rawKey, API_KEY_CACHE_TTL_MS); + return rawKey; + } + + private async fetchApiKey(acRequest: AccessRequest, signal?: AbortSignal): Promise { + const { auth } = this.settings; + if (auth?.apiKey) { + return auth.apiKey; + } + + const operation: PineconeOperationName = 'init'; + + if (auth?.vaultKey && this.managedVault) { + try { + return await withSafeRetry( + operation, + async () => { + const requester = this.managedVault!.requester(acRequest.candidate); + return requester.get(auth.vaultKey!); + }, + { attempts: 3, baseDelayMs: 150, maxDelayMs: 1500 }, + { defaultMs: 2000 }, + signal + ); + } catch (error) { + console.error('Failed to fetch Pinecone API key from ManagedVault', error); + } + } + + if (auth?.vaultKey && this.vault) { + try { + return await withSafeRetry( + operation, + async () => { + const requester = this.vault!.requester(acRequest.candidate); + return requester.get(auth.vaultKey!); + }, + { attempts: 3, baseDelayMs: 150, maxDelayMs: 1500 }, + { defaultMs: 2000 }, + signal + ); + } catch (error) { + console.error('Failed to fetch Pinecone API key from Vault', error); + } + } + + throw new Error('No Pinecone API key available; configure auth.vaultKey or auth.apiKey'); + } +} diff --git a/packages/core/src/subsystems/IO/VectorDB.service/connectors/pinecone/retry-utils.ts 
b/packages/core/src/subsystems/IO/VectorDB.service/connectors/pinecone/retry-utils.ts new file mode 100644 index 00000000..609fe015 --- /dev/null +++ b/packages/core/src/subsystems/IO/VectorDB.service/connectors/pinecone/retry-utils.ts @@ -0,0 +1,81 @@ +import { sleepWithJitter, withTimeout } from './async-utils'; +import { PineconeRetryConfig, PineconeRequestTimeouts, PineconeOperationName } from './types'; + +export type RetryableOperation = (signal: AbortSignal) => Promise; + +const DEFAULT_RETRY: Required = { + attempts: 3, + baseDelayMs: 200, + maxDelayMs: 2000, + jitterRatio: 0.4, +}; + +const DEFAULT_TIMEOUTS: Required = { + defaultMs: 5000, + queryMs: 5000, + upsertMs: 10000, + deleteMs: 5000, + describeStatsMs: 5000, +}; + +function getTimeoutMs(operation: PineconeOperationName, overrides?: PineconeRequestTimeouts) { + const merged = { ...DEFAULT_TIMEOUTS, ...overrides }; + switch (operation) { + case 'query': + return merged.queryMs; + case 'upsert': + return merged.upsertMs; + case 'deleteMany': + case 'deleteAll': + case 'deleteNamespace': + return merged.deleteMs; + case 'describeIndexStats': + return merged.describeStatsMs; + default: + return merged.defaultMs; + } +} + +export async function withSafeRetry( + operationName: PineconeOperationName, + operation: RetryableOperation, + retryConfig?: PineconeRetryConfig, + timeoutConfig?: PineconeRequestTimeouts, + externalSignal?: AbortSignal, + onAttempt?: (attempt: number, error?: unknown) => void +): Promise { + const config = { ...DEFAULT_RETRY, ...retryConfig }; + let attempt = 0; + let lastError: unknown; + + while (attempt < config.attempts) { + attempt += 1; + try { + const timeoutMs = getTimeoutMs(operationName, timeoutConfig); + const result = await withTimeout((signal) => operation(signal), { + timeoutMs, + signal: externalSignal, + onTimeout: () => onAttempt?.(attempt, new Error(`Timeout after ${timeoutMs}ms`)), + }); + onAttempt?.(attempt); + return result; + } catch (error) { + lastError = 
error; + onAttempt?.(attempt, error); + + const isAbortError = error instanceof Error && error.name === 'AbortError'; + if (isAbortError || externalSignal?.aborted) { + throw error; + } + + if (attempt >= config.attempts) { + break; + } + + const delay = Math.min(config.baseDelayMs * 2 ** (attempt - 1), config.maxDelayMs); + await sleepWithJitter(delay, config.jitterRatio, externalSignal); + } + } + + throw lastError instanceof Error ? lastError : new Error(String(lastError)); +} diff --git a/packages/core/src/subsystems/IO/VectorDB.service/connectors/pinecone/types.ts b/packages/core/src/subsystems/IO/VectorDB.service/connectors/pinecone/types.ts new file mode 100644 index 00000000..0fd51fea --- /dev/null +++ b/packages/core/src/subsystems/IO/VectorDB.service/connectors/pinecone/types.ts @@ -0,0 +1,47 @@ +import { TEmbeddings } from '../../embed/BaseEmbedding'; + +export type PineconeAuthConfig = { + vaultKey?: string; + apiKey?: string; +}; + +export type PineconeRequestTimeouts = { + defaultMs?: number; + queryMs?: number; + upsertMs?: number; + deleteMs?: number; + describeStatsMs?: number; +}; + +export type PineconeRetryConfig = { + attempts?: number; + baseDelayMs?: number; + maxDelayMs?: number; + jitterRatio?: number; +}; + +export type PineconeHealthConfig = { + failureThreshold?: number; + recoveryThreshold?: number; + probeIntervalMs?: number; + cooldownMs?: number; +}; + +export type PineconeConnectorSettings = { + indexName: string; + embeddings?: TEmbeddings; + auth?: PineconeAuthConfig; + requestTimeouts?: PineconeRequestTimeouts; + retry?: PineconeRetryConfig; + health?: PineconeHealthConfig; +}; + +export type PineconeOperationName = + | 'init' + | 'describeIndexStats' + | 'query' + | 'upsert' + | 'deleteMany' + | 'deleteAll' + | 'deleteNamespace' + | 'createNamespace'; diff --git a/packages/core/tests/unit/004-VectorDB/pinecone-helpers.test.ts b/packages/core/tests/unit/004-VectorDB/pinecone-helpers.test.ts new file mode 100644 index 
00000000..93decee9 --- /dev/null +++ b/packages/core/tests/unit/004-VectorDB/pinecone-helpers.test.ts @@ -0,0 +1,238 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import { sleepWithJitter, withTimeout } from '@sre/IO/VectorDB.service/connectors/pinecone/async-utils'; +import { withSafeRetry } from '@sre/IO/VectorDB.service/connectors/pinecone/retry-utils'; +import * as retryUtils from '@sre/IO/VectorDB.service/connectors/pinecone/retry-utils'; +import { PineconeConnectionManager } from '@sre/IO/VectorDB.service/connectors/pinecone/connection-manager'; +import { ConnectorService } from '@sre/Core/ConnectorsService'; +import { AccessCandidate } from '@sre/Security/AccessControl/AccessCandidate.class'; +import { AccessRequest } from '@sre/Security/AccessControl/AccessRequest.class'; + +const { pineconeCtor } = vi.hoisted(() => { + const ctor = vi + .fn() + .mockImplementation(function (this: any, { apiKey }: { apiKey: string }) { + this.apiKey = apiKey; + this.Index = vi.fn().mockReturnValue({ + namespace: vi.fn().mockReturnValue({ + upsert: vi.fn(), + deleteAll: vi.fn(), + deleteMany: vi.fn(), + query: vi.fn(), + }), + }); + }); + return { pineconeCtor: ctor }; +}); + +vi.mock('@pinecone-database/pinecone', () => ({ + Pinecone: pineconeCtor, +})); + +describe('async-utils', () => { + afterEach(() => { + vi.useRealTimers(); + vi.restoreAllMocks(); + }); + + it('sleepWithJitter waits for deterministic jittered delay', async () => { + vi.useFakeTimers(); + const randomSpy = vi.spyOn(Math, 'random').mockReturnValue(0.5); + const promise = sleepWithJitter(100, 0.2); + + await vi.advanceTimersByTimeAsync(110); + await expect(promise).resolves.toBeUndefined(); + expect(randomSpy).toHaveBeenCalled(); + }); + + it('withTimeout resolves before deadline', async () => { + const result = await withTimeout(async () => 'done', { timeoutMs: 100 }); + expect(result).toBe('done'); + }); + + it('withTimeout rejects on timeout and triggers callback', 
async () => { + const onTimeout = vi.fn(); + const promise = withTimeout( + () => new Promise(() => { + /* never resolves */ + }), + { timeoutMs: 10, onTimeout } + ); + + await expect(promise).rejects.toThrow('Operation timed out'); + expect(onTimeout).toHaveBeenCalledTimes(1); + }); + + it('withTimeout respects external abort signal', async () => { + const controller = new AbortController(); + const promise = withTimeout( + () => new Promise(() => { + /* never resolves */ + }), + { timeoutMs: 1000, signal: controller.signal } + ); + + controller.abort(new Error('stop now')); + await expect(promise).rejects.toThrow('stop now'); + }); +}); + +describe('retry-utils', () => { + afterEach(() => { + vi.useRealTimers(); + vi.restoreAllMocks(); + }); + + it('returns result on first attempt', async () => { + const operation = vi.fn().mockResolvedValue('ok'); + await expect(withSafeRetry('query', operation)).resolves.toBe('ok'); + expect(operation).toHaveBeenCalledTimes(1); + }); + + it('retries after failure and eventually succeeds', async () => { + vi.useFakeTimers(); + vi.spyOn(Math, 'random').mockReturnValue(0); + const operation = vi + .fn<[AbortSignal?], Promise>() + .mockRejectedValueOnce(new Error('first')) + .mockResolvedValueOnce('ok'); + const onAttempt = vi.fn(); + + const pending = withSafeRetry('query', operation, { attempts: 3, baseDelayMs: 100, maxDelayMs: 100, jitterRatio: 0 }, undefined, undefined, onAttempt); + await vi.advanceTimersByTimeAsync(0); + await vi.advanceTimersByTimeAsync(100); + await expect(pending).resolves.toBe('ok'); + + expect(operation).toHaveBeenCalledTimes(2); + expect(onAttempt).toHaveBeenCalledWith(1, expect.any(Error)); + expect(onAttempt).toHaveBeenCalledWith(2); + }); + + it('throws after exhausting retry attempts', async () => { + vi.useFakeTimers(); + vi.spyOn(Math, 'random').mockReturnValue(0); + const operation = vi.fn().mockRejectedValue(new Error('fail')); + + const pending = withSafeRetry('query', operation, { attempts: 2, 
baseDelayMs: 50, maxDelayMs: 50, jitterRatio: 0 }); + await vi.advanceTimersByTimeAsync(0); + await vi.advanceTimersByTimeAsync(50); + + await expect(pending).rejects.toThrow('fail'); + expect(operation).toHaveBeenCalledTimes(2); + }); + + it('honors external abort signals', async () => { + vi.useFakeTimers(); + const controller = new AbortController(); + const operation = vi.fn().mockImplementation(async () => { + await vi.advanceTimersByTimeAsync(10); + return 'never'; + }); + + const pending = withSafeRetry('query', operation, { attempts: 2, baseDelayMs: 10, maxDelayMs: 10, jitterRatio: 0 }, undefined, controller.signal); + controller.abort(new Error('aborted')); + + await expect(pending).rejects.toThrow('aborted'); + expect(operation).toHaveBeenCalledTimes(1); + }); +}); + +describe('PineconeConnectionManager', () => { + const candidate = AccessCandidate.user('user-1'); + let acRequest: AccessRequest; + let safeRetrySpy: ReturnType; + + beforeEach(() => { + pineconeCtor.mockClear(); + acRequest = new AccessRequest(candidate); + safeRetrySpy = vi.spyOn(retryUtils, 'withSafeRetry').mockImplementation( + async (_operationName, operation, _retryConfig, _timeoutConfig, externalSignal) => { + return operation(externalSignal ?? 
new AbortController().signal); + } + ); + }); + + afterEach(() => { + safeRetrySpy.mockRestore(); + vi.restoreAllMocks(); + }); + + it('creates and caches Pinecone client when apiKey is provided', async () => { + vi.spyOn(ConnectorService, 'getManagedVaultConnector').mockImplementation(() => { + throw new Error('no managed vault'); + }); + vi.spyOn(ConnectorService, 'getVaultConnector').mockImplementation(() => { + throw new Error('no vault'); + }); + + const manager = new PineconeConnectionManager({ + indexName: 'test-index', + auth: { apiKey: 'secret-key' }, + }); + + const first = await manager.getClient(acRequest); + const second = await manager.getClient(acRequest); + + expect(first).toBe(second); + expect(pineconeCtor).toHaveBeenCalledTimes(1); + expect(pineconeCtor).toHaveBeenCalledWith({ apiKey: 'secret-key' }); + }); + + it('resets cached client on reset()', async () => { + vi.spyOn(ConnectorService, 'getManagedVaultConnector').mockImplementation(() => { + throw new Error('no managed vault'); + }); + vi.spyOn(ConnectorService, 'getVaultConnector').mockImplementation(() => { + throw new Error('no vault'); + }); + + const manager = new PineconeConnectionManager({ + indexName: 'test-index', + auth: { apiKey: 'secret-key' }, + }); + + await manager.getClient(acRequest); + manager.reset(); + await manager.getClient(acRequest); + + expect(pineconeCtor).toHaveBeenCalledTimes(2); + }); + + it('fetches api key from managed vault and caches it', async () => { + const managedVaultRequester = vi.fn().mockResolvedValue('managed-secret'); + const managedVault = { + requester: vi.fn().mockReturnValue({ + get: managedVaultRequester, + }), + } as unknown as ReturnType; + + vi.spyOn(ConnectorService, 'getManagedVaultConnector').mockReturnValue(managedVault); + vi.spyOn(ConnectorService, 'getVaultConnector').mockImplementation(() => { + throw new Error('no vault'); + }); + + const manager = new PineconeConnectionManager({ + indexName: 'test-index', + auth: { vaultKey: 
'secret/path' }, + }); + + await manager.getClient(acRequest); + await manager.getClient(acRequest); + + expect(managedVault.requester).toHaveBeenCalledTimes(1); + expect(managedVaultRequester).toHaveBeenCalledWith('secret/path'); + expect(pineconeCtor).toHaveBeenCalledWith({ apiKey: 'managed-secret' }); + }); + + it('throws when no credentials are available', async () => { + vi.spyOn(ConnectorService, 'getManagedVaultConnector').mockImplementation(() => { + throw new Error('no managed vault'); + }); + vi.spyOn(ConnectorService, 'getVaultConnector').mockImplementation(() => { + throw new Error('no vault'); + }); + + const manager = new PineconeConnectionManager({ indexName: 'test-index' }); + await expect(manager.getClient(acRequest)).rejects.toThrow('No Pinecone API key available'); + expect(pineconeCtor).not.toHaveBeenCalled(); + }); +});
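
Reviewer note: the timeout behavior exercised by the `async-utils` tests in this diff can be pictured with a much smaller example. The sketch below is a simplified, hypothetical `runWithDeadline` helper (not the PR's `withTimeout`) that races an operation against a timer and aborts the operation's signal when the deadline passes. It assumes a runtime where `AbortController.abort()` accepts a reason.

```typescript
// Illustrative sketch only: a simplified deadline wrapper, not the PR's withTimeout.
async function runWithDeadline<T>(
  operation: (signal: AbortSignal) => Promise<T>,
  timeoutMs: number
): Promise<T> {
  const controller = new AbortController();
  let timer: ReturnType<typeof setTimeout> | undefined;

  // Rejects once the deadline passes and signals the operation to stop.
  const deadline = new Promise<never>((_, reject) => {
    timer = setTimeout(() => {
      const error = new Error(`Operation timed out after ${timeoutMs}ms`);
      controller.abort(error);
      reject(error);
    }, timeoutMs);
  });

  try {
    // Whichever settles first wins: the operation's result or the deadline's rejection.
    return await Promise.race([operation(controller.signal), deadline]);
  } finally {
    // Always clear the timer so a finished operation does not leave it pending.
    clearTimeout(timer);
  }
}
```

The operation receives the signal so that cooperative code can stop early; an operation that ignores the signal keeps running in the background even though the caller has already seen the timeout error.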