From ed1b174b5925072dcbc047b345a26d1e7653a01d Mon Sep 17 00:00:00 2001 From: Rohan Chitale Date: Thu, 27 Mar 2025 23:19:23 -0700 Subject: [PATCH 1/3] Add API handlers and API Docker image Signed-off-by: Rohan Chitale --- .../workflows/publish_remote_api_image.yml | 56 +++++ .github/workflows/run-tests.yml | 1 + remote_vector_index_builder/app/Dockerfile | 20 ++ remote_vector_index_builder/app/__init__.py | 6 + .../app/base/__init__.py | 6 + .../app/base/config.py | 35 +++ .../app/base/exceptions.py | 22 ++ .../app/base/resources.py | 107 ++++++++ .../app/executors/__init__.py | 6 + .../app/executors/workflow_executor.py | 141 +++++++++++ remote_vector_index_builder/app/main.py | 116 +++++++++ .../app/models/__init__.py | 6 + remote_vector_index_builder/app/models/job.py | 61 +++++ .../app/models/request.py | 48 ++++ .../app/models/workflow.py | 30 +++ .../app/requirements.txt | 3 + .../app/routes/__init__.py | 6 + .../app/routes/build.py | 48 ++++ .../app/routes/status.py | 44 ++++ .../app/schemas/__init__.py | 6 + .../app/schemas/api.py | 37 +++ .../app/services/__init__.py | 6 + .../app/services/index_builder.py | 39 +++ .../app/services/job_service.py | 236 ++++++++++++++++++ .../app/storage/__init__.py | 6 + .../app/storage/base.py | 75 ++++++ .../app/storage/factory.py | 40 +++ .../app/storage/memory.py | 159 ++++++++++++ .../app/storage/types.py | 22 ++ .../app/test_imports.py | 23 ++ .../app/utils/__init__.py | 6 + .../app/utils/error_message.py | 35 +++ remote_vector_index_builder/app/utils/hash.py | 28 +++ .../app/utils/logging_config.py | 17 ++ .../app/utils/memory.py | 49 ++++ .../app/utils/request.py | 33 +++ .../common/models/index_build_parameters.py | 11 + setup.cfg | 1 + .../requirements.txt | 3 +- .../test_app/__init__.py | 6 + .../test_app/conftest.py | 32 +++ .../test_app/test_base/__init__.py | 6 + .../test_app/test_base/test_resources.py | 115 +++++++++ .../test_app/test_executors/__init__.py | 6 + .../test_executors/test_workflow_executor.py | 
187 ++++++++++++++ .../test_app/test_routes/__init__.py | 6 + .../test_app/test_routes/test_build.py | 86 +++++++ .../test_app/test_routes/test_status.py | 138 ++++++++++ .../test_app/test_services/__init__.py | 6 + .../test_services/test_index_builder.py | 54 ++++ .../test_services/test_job_service.py | 197 +++++++++++++++ .../test_app/test_storage/__init__.py | 6 + .../test_app/test_storage/test_factory.py | 35 +++ .../test_app/test_storage/test_memory.py | 129 ++++++++++ .../test_app/test_utils/__init__.py | 6 + .../test_app/test_utils/test_error_message.py | 53 ++++ .../test_app/test_utils/test_memory.py | 65 +++++ 57 files changed, 2726 insertions(+), 1 deletion(-) create mode 100644 .github/workflows/publish_remote_api_image.yml create mode 100644 remote_vector_index_builder/app/Dockerfile create mode 100644 remote_vector_index_builder/app/__init__.py create mode 100644 remote_vector_index_builder/app/base/__init__.py create mode 100644 remote_vector_index_builder/app/base/config.py create mode 100644 remote_vector_index_builder/app/base/exceptions.py create mode 100644 remote_vector_index_builder/app/base/resources.py create mode 100644 remote_vector_index_builder/app/executors/__init__.py create mode 100644 remote_vector_index_builder/app/executors/workflow_executor.py create mode 100644 remote_vector_index_builder/app/main.py create mode 100644 remote_vector_index_builder/app/models/__init__.py create mode 100644 remote_vector_index_builder/app/models/job.py create mode 100644 remote_vector_index_builder/app/models/request.py create mode 100644 remote_vector_index_builder/app/models/workflow.py create mode 100644 remote_vector_index_builder/app/requirements.txt create mode 100644 remote_vector_index_builder/app/routes/__init__.py create mode 100644 remote_vector_index_builder/app/routes/build.py create mode 100644 remote_vector_index_builder/app/routes/status.py create mode 100644 remote_vector_index_builder/app/schemas/__init__.py create mode 100644 
remote_vector_index_builder/app/schemas/api.py create mode 100644 remote_vector_index_builder/app/services/__init__.py create mode 100644 remote_vector_index_builder/app/services/index_builder.py create mode 100644 remote_vector_index_builder/app/services/job_service.py create mode 100644 remote_vector_index_builder/app/storage/__init__.py create mode 100644 remote_vector_index_builder/app/storage/base.py create mode 100644 remote_vector_index_builder/app/storage/factory.py create mode 100644 remote_vector_index_builder/app/storage/memory.py create mode 100644 remote_vector_index_builder/app/storage/types.py create mode 100644 remote_vector_index_builder/app/test_imports.py create mode 100644 remote_vector_index_builder/app/utils/__init__.py create mode 100644 remote_vector_index_builder/app/utils/error_message.py create mode 100644 remote_vector_index_builder/app/utils/hash.py create mode 100644 remote_vector_index_builder/app/utils/logging_config.py create mode 100644 remote_vector_index_builder/app/utils/memory.py create mode 100644 remote_vector_index_builder/app/utils/request.py create mode 100644 test_remote_vector_index_builder/test_app/__init__.py create mode 100644 test_remote_vector_index_builder/test_app/conftest.py create mode 100644 test_remote_vector_index_builder/test_app/test_base/__init__.py create mode 100644 test_remote_vector_index_builder/test_app/test_base/test_resources.py create mode 100644 test_remote_vector_index_builder/test_app/test_executors/__init__.py create mode 100644 test_remote_vector_index_builder/test_app/test_executors/test_workflow_executor.py create mode 100644 test_remote_vector_index_builder/test_app/test_routes/__init__.py create mode 100644 test_remote_vector_index_builder/test_app/test_routes/test_build.py create mode 100644 test_remote_vector_index_builder/test_app/test_routes/test_status.py create mode 100644 test_remote_vector_index_builder/test_app/test_services/__init__.py create mode 100644 
test_remote_vector_index_builder/test_app/test_services/test_index_builder.py create mode 100644 test_remote_vector_index_builder/test_app/test_services/test_job_service.py create mode 100644 test_remote_vector_index_builder/test_app/test_storage/__init__.py create mode 100644 test_remote_vector_index_builder/test_app/test_storage/test_factory.py create mode 100644 test_remote_vector_index_builder/test_app/test_storage/test_memory.py create mode 100644 test_remote_vector_index_builder/test_app/test_utils/__init__.py create mode 100644 test_remote_vector_index_builder/test_app/test_utils/test_error_message.py create mode 100644 test_remote_vector_index_builder/test_app/test_utils/test_memory.py diff --git a/.github/workflows/publish_remote_api_image.yml b/.github/workflows/publish_remote_api_image.yml new file mode 100644 index 0000000..276e136 --- /dev/null +++ b/.github/workflows/publish_remote_api_image.yml @@ -0,0 +1,56 @@ +name: Build and Publish Remote-Vector-Index-Builder API Image to Docker + +on: + push: + branches: + - main + paths: + - 'remote_vector_index_builder/app/**' + - '.github/workflows/publish_remote_api_image.yml' + +permissions: + id-token: write + contents: read + +jobs: + build-and-publish-api-image: + name: Build and Publish Remote-Vector-Index-Builder API Image + if: github.repository == 'opensearch-project/remote-vector-index-builder' + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Build Docker Image + run : | + docker build -f ./remote_vector_index_builder/app/Dockerfile . 
-t opensearchstaging/remote-vector-index-builder:api-1.0.0 + docker tag opensearchstaging/remote-vector-index-builder:api-1.0.0 opensearchstaging/remote-vector-index-builder:api-latest + + - name: Configure AWS Credentials + uses: aws-actions/configure-aws-credentials@v4 + with: + role-to-assume: ${{ secrets.REMOTE_VECTOR_DOCKER_ROLE }} + aws-region: us-east-1 + + - name: Retrieve Values + id: retrieve-values + run: | + DOCKERHUB_PASSWORD=`aws secretsmanager get-secret-value --secret-id jenkins-staging-dockerhub-credential --query SecretString --output text` + echo "::add-mask::$DOCKERHUB_PASSWORD" + echo "dockerhub-password=$DOCKERHUB_PASSWORD" >> $GITHUB_OUTPUT + + - name: Login to DockerHub + uses: docker/login-action@v3 + with: + username: ${{ secrets.REMOTE_VECTOR_DOCKER_USERNAME }} + password: ${{ steps.retrieve-values.outputs.dockerhub-password }} + + - name: Push Docker Image + run : | + docker push opensearchstaging/remote-vector-index-builder:api-1.0.0 + docker push opensearchstaging/remote-vector-index-builder:api-latest + - name: Runner Cleanups + if: always() + run: | + docker logout \ No newline at end of file diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml index 9c0d990..c90b4ec 100644 --- a/.github/workflows/run-tests.yml +++ b/.github/workflows/run-tests.yml @@ -38,6 +38,7 @@ jobs: run: | python -m pip install --upgrade pip python -m pip install -r remote_vector_index_builder/core/requirements.txt + python -m pip install -r remote_vector_index_builder/app/requirements.txt python -m pip install -r test_remote_vector_index_builder/requirements.txt - name: Run Linting - flake8 diff --git a/remote_vector_index_builder/app/Dockerfile b/remote_vector_index_builder/app/Dockerfile new file mode 100644 index 0000000..479ad2b --- /dev/null +++ b/remote_vector_index_builder/app/Dockerfile @@ -0,0 +1,20 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require 
contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. + +FROM opensearchstaging/remote-vector-index-builder:core-latest + +WORKDIR /remote_vector_index_builder + +COPY ./remote_vector_index_builder/app/requirements.txt /remote_vector_index_builder/app/requirements.txt + +RUN pip install --no-cache-dir --upgrade -r /remote_vector_index_builder/app/requirements.txt + +COPY ./remote_vector_index_builder/app /remote_vector_index_builder/app + +ENV PYTHONPATH='${PYTHONPATH}:/tmp/faiss/build/faiss/python:/remote_vector_index_builder' +RUN ["python", "app/test_imports.py"] +CMD ["fastapi", "run", "app/main.py", "--port", "80"] \ No newline at end of file diff --git a/remote_vector_index_builder/app/__init__.py b/remote_vector_index_builder/app/__init__.py new file mode 100644 index 0000000..fe22b86 --- /dev/null +++ b/remote_vector_index_builder/app/__init__.py @@ -0,0 +1,6 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. diff --git a/remote_vector_index_builder/app/base/__init__.py b/remote_vector_index_builder/app/base/__init__.py new file mode 100644 index 0000000..fe22b86 --- /dev/null +++ b/remote_vector_index_builder/app/base/__init__.py @@ -0,0 +1,6 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. 
diff --git a/remote_vector_index_builder/app/base/config.py b/remote_vector_index_builder/app/base/config.py new file mode 100644 index 0000000..944a889 --- /dev/null +++ b/remote_vector_index_builder/app/base/config.py @@ -0,0 +1,35 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. + +from pydantic_settings import BaseSettings +from app.storage.types import RequestStoreType +from typing import Optional + + +class Settings(BaseSettings): + """ + Settings class for the application. Pulls the settings + from the Docker container environment variables + """ + + # Request Store settings + request_store_type: RequestStoreType = RequestStoreType.MEMORY + + # In memory settings + request_store_max_size: int = 1000000 + request_store_ttl_seconds: Optional[int] = 600 + + # Resource Manager settings, in bytes + gpu_memory_limit: float = 24.0 * 10**9 + cpu_memory_limit: float = 32.0 * 10**9 + + # Workflow Executor settings + max_workers: int = 5 + + # Service settings + service_name: str = "remote-vector-index-builder-api" + log_level: str = "INFO" diff --git a/remote_vector_index_builder/app/base/exceptions.py b/remote_vector_index_builder/app/base/exceptions.py new file mode 100644 index 0000000..7ebec7f --- /dev/null +++ b/remote_vector_index_builder/app/base/exceptions.py @@ -0,0 +1,22 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. 
+class ApiError(Exception): + """Base exception for api errors""" + + pass + + +class HashCollisionError(ApiError): + """Raised when there's a hash collision in the Request Store""" + + pass + + +class CapacityError(ApiError): + """Raised when the worker does not have enough capacity to fulfill the request""" + + pass diff --git a/remote_vector_index_builder/app/base/resources.py b/remote_vector_index_builder/app/base/resources.py new file mode 100644 index 0000000..ee08029 --- /dev/null +++ b/remote_vector_index_builder/app/base/resources.py @@ -0,0 +1,107 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. + +import threading + + +class ResourceManager: + """ + A thread-safe resource manager that tracks and manages GPU and CPU memory allocations. + + This class provides mechanisms to safely allocate and release memory resources + in a multi-threaded environment, ensuring that memory usage doesn't exceed + the specified limits. + + Attributes: + _total_gpu_memory (float): Total available GPU memory in bytes + _total_cpu_memory (float): Total available CPU memory in bytes + _available_gpu_memory (float): Currently available GPU memory in bytes + _available_cpu_memory (float): Currently available CPU memory in bytes + _lock (threading.Lock): Thread lock for synchronization + """ + + def __init__(self, total_gpu_memory: float, total_cpu_memory: float): + """ + Initialize the ResourceManager with specified GPU and CPU memory limits. 
+
+        Args:
+            total_gpu_memory (float): Total GPU memory available for allocation, in bytes
+            total_cpu_memory (float): Total CPU memory available for allocation, in bytes
+        """
+        self._total_gpu_memory = total_gpu_memory
+        self._total_cpu_memory = total_cpu_memory
+        self._available_gpu_memory = total_gpu_memory
+        self._available_cpu_memory = total_cpu_memory
+        self._lock = threading.Lock()
+
+    # TODO: separate this function into CPU and GPU specific allocation checks
+    def can_allocate(self, gpu_memory: float, cpu_memory: float) -> bool:
+        """
+        Check if the requested amount of GPU and CPU memory can be allocated.
+
+        Args:
+            gpu_memory (float): Amount of GPU memory requested, in bytes
+            cpu_memory (float): Amount of CPU memory requested, in bytes
+
+        Returns:
+            bool: True if the requested memory can be allocated, False otherwise
+        """
+        with self._lock:
+            return (
+                self._available_gpu_memory >= gpu_memory
+                and self._available_cpu_memory >= cpu_memory
+            )
+
+    def allocate(self, gpu_memory: float, cpu_memory: float) -> bool:
+        """
+        Atomically attempt to allocate the specified amount of GPU and CPU memory.
+
+        Args:
+            gpu_memory (float): Amount of GPU memory to allocate, in bytes
+            cpu_memory (float): Amount of CPU memory to allocate, in bytes
+
+        Returns:
+            bool: True if allocation was successful, False if insufficient resources
+        """
+        # Check and decrement under a single lock acquisition. Calling
+        # can_allocate() first and re-locking afterwards is a check-then-act
+        # race: two threads could both pass the check and over-allocate.
+        with self._lock:
+            if (self._available_gpu_memory < gpu_memory
+                    or self._available_cpu_memory < cpu_memory):
+                return False
+            self._available_gpu_memory -= gpu_memory
+            self._available_cpu_memory -= cpu_memory
+            return True
+
+    def release(self, gpu_memory: float, cpu_memory: float) -> None:
+        """
+        Release previously allocated GPU and CPU memory back to the pool.
+
+        Args:
+            gpu_memory (float): Amount of GPU memory to release, in bytes
+            cpu_memory (float): Amount of CPU memory to release, in bytes
+        """
+        with self._lock:
+            self._available_gpu_memory += gpu_memory
+            self._available_cpu_memory += cpu_memory
+
+    def get_available_gpu_memory(self) -> float:
+        """
+        Get the current amount of available GPU memory.
+
+        Returns:
+            float: Amount of available GPU memory in bytes
+        """
+        with self._lock:
+            return self._available_gpu_memory
+
+    def get_available_cpu_memory(self) -> float:
+        """
+        Get the current amount of available CPU memory.
+
+        Returns:
+            float: Amount of available CPU memory in bytes
+        """
+        with self._lock:
+            return self._available_cpu_memory
diff --git a/remote_vector_index_builder/app/executors/__init__.py b/remote_vector_index_builder/app/executors/__init__.py
new file mode 100644
index 0000000..fe22b86
--- /dev/null
+++ b/remote_vector_index_builder/app/executors/__init__.py
@@ -0,0 +1,6 @@
+# Copyright OpenSearch Contributors
+# SPDX-License-Identifier: Apache-2.0
+#
+# The OpenSearch Contributors require contributions made to
+# this file be licensed under the Apache-2.0 license or a
+# compatible open source license.
diff --git a/remote_vector_index_builder/app/executors/workflow_executor.py b/remote_vector_index_builder/app/executors/workflow_executor.py
new file mode 100644
index 0000000..e76f308
--- /dev/null
+++ b/remote_vector_index_builder/app/executors/workflow_executor.py
@@ -0,0 +1,141 @@
+# Copyright OpenSearch Contributors
+# SPDX-License-Identifier: Apache-2.0
+#
+# The OpenSearch Contributors require contributions made to
+# this file be licensed under the Apache-2.0 license or a
+# compatible open source license.
+
+from concurrent.futures import ThreadPoolExecutor
+import logging
+from typing import Optional, Callable
+from app.models.workflow import BuildWorkflow
+from app.base.resources import ResourceManager
+from app.storage.base import RequestStore
+from app.models.job import JobStatus
+
+logger = logging.getLogger(__name__)
+
+
+class WorkflowExecutor:
+    """
+    Executes build workflows in a thread pool while managing system resources.
+
+    This class handles the concurrent execution of build workflows, managing thread pools,
+    and coordinating resource allocation for index building operations.
+
+    Attributes:
+        _executor (ThreadPoolExecutor): Thread pool for executing concurrent workflows
+        _request_store (RequestStore): Interface for storing request data
+        _resource_manager (ResourceManager): Manager for system resource allocation
+        _build_index_fn (Callable): Function that performs the actual index building
+    """
+
+    def __init__(
+        self,
+        max_workers: int,
+        request_store: RequestStore,
+        resource_manager: ResourceManager,
+        build_index_fn: Callable[
+            [BuildWorkflow], tuple[bool, Optional[str], Optional[str]]
+        ],
+    ):
+        """
+        Initialize the WorkflowExecutor with specified parameters.
+
+        Args:
+            max_workers (int): Maximum number of concurrent worker threads
+            request_store (RequestStore): Interface for storing request data
+            resource_manager (ResourceManager): Manager for system resources
+            build_index_fn (Callable): Function that builds the index, returns a tuple of
+                (success: bool, index_path: Optional[str], error_message: Optional[str])
+        """
+        self._executor = ThreadPoolExecutor(max_workers=max_workers)
+        self._request_store = request_store
+        self._resource_manager = resource_manager
+        self._build_index_fn = build_index_fn
+
+    def submit_workflow(self, workflow: BuildWorkflow) -> None:
+        """
+        Submit a workflow for execution in the thread pool.
+
+        This method queues the workflow for execution.
+ The workflow will be executed asynchronously in a thread pool + + Args: + workflow (BuildWorkflow): The workflow to be executed + + """ + + # Submit the workflow to thread pool + self._executor.submit(self._execute_workflow, workflow) + + def _execute_workflow(self, workflow: BuildWorkflow) -> None: + """ + Execute the workflow and handle results. + + This method handles the actual execution of the build workflow, including: + - Executing the build process + - Updating the job status in the request store + - Logging the execution status + + Args: + workflow (BuildWorkflow): The workflow to execute containing job parameters + and resource requirements + + Note: + This method is intended to be run in a separate thread. + """ + try: + logger.info(f"Starting execution of job {workflow.job_id}") + + # Job may have been deleted by request store TTL + if not self._request_store.get(workflow.job_id): + logger.info(f"Job {workflow.job_id} was deleted before execution") + return + + # Execute the build + success, index_path, msg = self._build_index_fn(workflow) + + # Job may have been deleted by request store TTL, so we need to check if job + # still exists before updating status. 
+ if self._request_store.get(workflow.job_id): + status = JobStatus.COMPLETED if success else JobStatus.FAILED + self._request_store.update( + workflow.job_id, + {"status": status, "file_name": index_path, "error_message": msg}, + ) + + logger.info( + f"Job {workflow.job_id} completed with status: {status}, index path: " + f"{index_path}, and error message: {msg}" + ) + else: + logger.error( + f"[ERROR] Job {workflow.job_id} was deleted during execution" + ) + + except Exception as e: + logger.error(f"Build process failed for job {workflow.job_id}: {str(e)}") + self._request_store.update( + workflow.job_id, + {"status": JobStatus.FAILED, "error_message": str(e)}, + ) + finally: + # Release resources + self._resource_manager.release( + workflow.gpu_memory_required, workflow.cpu_memory_required + ) + + def shutdown(self) -> None: + """ + Shutdown the executor and wait for all pending tasks to complete. + + This method initiates a graceful shutdown of the thread pool executor. + It blocks until all pending tasks are completed and releases all resources. + No new tasks will be accepted after calling this method. + + Note: + - This is a blocking call that waits for all tasks to finish + - The executor cannot be reused after shutdown + """ + self._executor.shutdown(wait=True) diff --git a/remote_vector_index_builder/app/main.py b/remote_vector_index_builder/app/main.py new file mode 100644 index 0000000..1784247 --- /dev/null +++ b/remote_vector_index_builder/app/main.py @@ -0,0 +1,116 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +""" +Remote Vector Index Builder API Service + +This module serves as the main entry point for the Remote Vector Index Builder FastAPI application. 
+It initializes the FastAPI application, configures logging, and sets up the request store +for handling vector index building operations. + +The service provides endpoints for building and monitoring the status of vector indices +, managing the workflow execution and job services. + +Components: + - FastAPI application setup and configuration + - Request validation and error handling + - Resource management + - Workflow execution + - Index building services + - Job status tracking + - Request storage management + +Environment Configuration: + The application uses Settings from app.base.config for configuration management + and can be customized through environment variables. + +Dependencies: + - FastAPI: Web framework for building APIs + - app.base.config: Application configuration + - app.base.resources: Resource management + - app.services: Core service implementations + - app.storage: Storage implementations + - app.utils: Utility functions and logging +""" +from app.routes import build, status +from fastapi import FastAPI, Request +from fastapi.exceptions import RequestValidationError +from fastapi.responses import JSONResponse + +from app.base.config import Settings +from app.base.resources import ResourceManager +from app.executors.workflow_executor import WorkflowExecutor +from app.services.index_builder import IndexBuilder +from app.services.job_service import JobService +from app.storage.factory import RequestStoreFactory +from app.utils.logging_config import configure_logging +from contextlib import asynccontextmanager + +from app.utils.error_message import get_field_path + +import logging + +settings = Settings() + +configure_logging(settings.log_level) + +logger = logging.getLogger(__name__) + +request_store = RequestStoreFactory.create( + store_type=settings.request_store_type, settings=settings +) + +resource_manager = ResourceManager( + total_gpu_memory=settings.gpu_memory_limit, + total_cpu_memory=settings.cpu_memory_limit, +) + +index_builder = 
IndexBuilder()
+
+workflow_executor = WorkflowExecutor(
+    max_workers=settings.max_workers,
+    request_store=request_store,
+    resource_manager=resource_manager,
+    build_index_fn=index_builder.build_index,
+)
+
+job_service = JobService(
+    request_store=request_store,
+    resource_manager=resource_manager,
+    workflow_executor=workflow_executor,
+    total_gpu_memory=settings.gpu_memory_limit,
+    total_cpu_memory=settings.cpu_memory_limit,
+)
+
+
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+    # Shutdown hook: runs once the application stops serving requests.
+    yield
+    logger.info("Shutting down application ...")
+    workflow_executor.shutdown()
+
+
+# The lifespan context manager only runs if it is registered with the app;
+# defining it without passing lifespan= to FastAPI() would silently skip
+# the executor shutdown on application exit.
+app = FastAPI(title=settings.service_name, lifespan=lifespan)
+
+app.state.job_service = job_service
+
+
+@app.exception_handler(RequestValidationError)
+async def validation_exception_handler(request: Request, exc: RequestValidationError):
+    errors = []
+    for error in exc.errors():
+        field_path = get_field_path(error["loc"])
+        errors.append(
+            {"field": field_path, "message": error["msg"], "type": error["type"]}
+        )
+
+    logger.info(f"Error while validating parameters: #{errors}")
+    return JSONResponse(
+        status_code=422, content={"detail": "Validation Error", "errors": errors}
+    )
+
+
+app.include_router(build.router)
+app.include_router(status.router)
diff --git a/remote_vector_index_builder/app/models/__init__.py b/remote_vector_index_builder/app/models/__init__.py
new file mode 100644
index 0000000..fe22b86
--- /dev/null
+++ b/remote_vector_index_builder/app/models/__init__.py
@@ -0,0 +1,6 @@
+# Copyright OpenSearch Contributors
+# SPDX-License-Identifier: Apache-2.0
+#
+# The OpenSearch Contributors require contributions made to
+# this file be licensed under the Apache-2.0 license or a
+# compatible open source license.
diff --git a/remote_vector_index_builder/app/models/job.py b/remote_vector_index_builder/app/models/job.py new file mode 100644 index 0000000..53374f0 --- /dev/null +++ b/remote_vector_index_builder/app/models/job.py @@ -0,0 +1,61 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +from enum import Enum +from pydantic import BaseModel +from app.models.request import RequestParameters +from typing import Optional + + +class JobStatus(str, Enum): + """ + Enumeration of possible job status states. + + Attributes: + RUNNING (str): Indicates the index build is currently in progress + FAILED (str): Indicates the index build failed to complete + COMPLETED (str): Indicates the index build completed successfully + """ + + RUNNING = "RUNNING_INDEX_BUILD" + FAILED = "FAILED_INDEX_BUILD" + COMPLETED = "COMPLETED_INDEX_BUILD" + + +class Job(BaseModel): + """ + Represents a job in the remote vector index building system. + + This class tracks the state and parameters of an index building job, + including its status, associated request parameters, and any error information. + + Attributes: + id (str): Unique identifier for the job + status (JobStatus): Current status of the job + request_parameters (RequestParameters): Parameters specified in the original request + file_name (Optional[str]): Name of the output file, if any + error_message (Optional[str]): Error message if the job failed + """ + + id: str + status: JobStatus + request_parameters: RequestParameters + file_name: Optional[str] = None + error_message: Optional[str] = None + + def compare_request_parameters(self, other: RequestParameters) -> bool: + """ + Compare this job's request parameters with another set of parameters. + + This method is used to check if a new request matches an existing job's parameters. 
+ + Args: + other (RequestParameters): The request parameters to compare against + + Returns: + bool: True if the parameters match, False otherwise + """ + return self.request_parameters == other diff --git a/remote_vector_index_builder/app/models/request.py b/remote_vector_index_builder/app/models/request.py new file mode 100644 index 0000000..1189617 --- /dev/null +++ b/remote_vector_index_builder/app/models/request.py @@ -0,0 +1,48 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +from pydantic import BaseModel + + +class RequestParameters(BaseModel): + """ + Model representing the parameters required for a vector index build request. + + This class validates and stores the essential parameters needed to process + a vector index building request, including the path to the vector data and + the tenant identification. + + Attributes: + vector_path (str): Path to the vector data file or resource + tenant_id (str): Unique identifier for the tenant making the request + """ + + vector_path: str + tenant_id: str + + def __str__(self): + """ + Create a string representation of the request parameters. + + Returns: + str: A string in the format "{vector_path}-{tenant_id}" + """ + return f"{self.vector_path}-{self.tenant_id}" + + def __eq__(self, other): + """ + Compare this RequestParameters instance with another object. 
+ + Args: + other: The object to compare with this instance + + Returns: + bool: True if the other object is a RequestParameters instance + with the same string representation, False otherwise + """ + if not isinstance(other, RequestParameters): + return False + return str(self) == str(other) diff --git a/remote_vector_index_builder/app/models/workflow.py b/remote_vector_index_builder/app/models/workflow.py new file mode 100644 index 0000000..22fda13 --- /dev/null +++ b/remote_vector_index_builder/app/models/workflow.py @@ -0,0 +1,30 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. + +from pydantic import BaseModel +from core.common.models.index_build_parameters import IndexBuildParameters + + +class BuildWorkflow(BaseModel): + """ + Represents a workflow for building a vector index with specified resource requirements. + + This class encapsulates all necessary information for executing a vector index build, + including job identification, resource requirements, and build parameters. 
+ + Attributes: + job_id (str): Unique identifier for the build job + gpu_memory_required (float): Amount of GPU memory required for the build process in bytes + cpu_memory_required (float): Amount of CPU memory required for the build process in bytes + index_build_parameters (IndexBuildParameters): Parameters specifying how to build the index + + """ + + job_id: str + gpu_memory_required: float + cpu_memory_required: float + index_build_parameters: IndexBuildParameters diff --git a/remote_vector_index_builder/app/requirements.txt b/remote_vector_index_builder/app/requirements.txt new file mode 100644 index 0000000..f8f3751 --- /dev/null +++ b/remote_vector_index_builder/app/requirements.txt @@ -0,0 +1,3 @@ +fastapi[standard]==0.115.8 +pydantic==2.10.6 +pydantic-settings==2.7.1 \ No newline at end of file diff --git a/remote_vector_index_builder/app/routes/__init__.py b/remote_vector_index_builder/app/routes/__init__.py new file mode 100644 index 0000000..fe22b86 --- /dev/null +++ b/remote_vector_index_builder/app/routes/__init__.py @@ -0,0 +1,6 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. diff --git a/remote_vector_index_builder/app/routes/build.py b/remote_vector_index_builder/app/routes/build.py new file mode 100644 index 0000000..0bb066a --- /dev/null +++ b/remote_vector_index_builder/app/routes/build.py @@ -0,0 +1,48 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. 
+from app.base.exceptions import HashCollisionError, CapacityError +from fastapi import APIRouter, HTTPException, Request +from app.schemas.api import CreateJobResponse +from core.common.models import IndexBuildParameters + +import logging + +router = APIRouter() +logger = logging.getLogger(__name__) + + +@router.post("/_build") +def create_job( + index_build_parameters: IndexBuildParameters, request: Request +) -> CreateJobResponse: + """ + Create a new index build job with the provided parameters. + + This endpoint initiates a new job for building an index based on the provided + parameters. It handles job creation through the job service and manages + potential error scenarios. + + Args: + index_build_parameters (IndexBuildParameters): Parameters for the index build job + request (Request): FastAPI request object containing application state + + Returns: + CreateJobResponse: Response object containing the created job ID + + Raises: + HTTPException: + - 429 status code if a hash collision occurs + - 507 status code if system memory capacity is exceeded + """ + try: + job_service = request.app.state.job_service + job_id = job_service.create_job(index_build_parameters) + except HashCollisionError as e: + raise HTTPException(status_code=429, detail=str(e)) + except CapacityError as e: + raise HTTPException(status_code=507, detail=str(e)) + return CreateJobResponse(job_id=job_id) diff --git a/remote_vector_index_builder/app/routes/status.py b/remote_vector_index_builder/app/routes/status.py new file mode 100644 index 0000000..330741a --- /dev/null +++ b/remote_vector_index_builder/app/routes/status.py @@ -0,0 +1,44 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. 
+from fastapi import APIRouter, HTTPException, Request +from app.schemas.api import GetStatusResponse + +router = APIRouter() + + +@router.get("/_status/{job_id}") +def get_status(job_id: str, request: Request) -> GetStatusResponse: + """ + Retrieve the status of a specific job by its ID. + + Args: + job_id (str): The unique identifier of the job. + request (Request): The FastAPI request object containing the application state. + + Returns: + GetStatusResponse: A response object containing the job status and optional fields: + - task_status: Current status of the job + - file_name: Name of the file being processed (if available) + - error_message: Error message if job failed (if available) + + Raises: + HTTPException: 404 error if the job is not found + """ + job_service = request.app.state.job_service + job = job_service.get_job(job_id) + if not job: + raise HTTPException(status_code=404, detail="Job not found") + + response_data = {"task_status": job.status} + + if hasattr(job, "file_name") and job.file_name is not None: + response_data["file_name"] = job.file_name + + if hasattr(job, "error_message") and job.error_message is not None: + response_data["error_message"] = job.error_message + + return GetStatusResponse(**response_data) diff --git a/remote_vector_index_builder/app/schemas/__init__.py b/remote_vector_index_builder/app/schemas/__init__.py new file mode 100644 index 0000000..fe22b86 --- /dev/null +++ b/remote_vector_index_builder/app/schemas/__init__.py @@ -0,0 +1,6 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. 
diff --git a/remote_vector_index_builder/app/schemas/api.py b/remote_vector_index_builder/app/schemas/api.py new file mode 100644 index 0000000..faaebc5 --- /dev/null +++ b/remote_vector_index_builder/app/schemas/api.py @@ -0,0 +1,37 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. + +from pydantic import BaseModel +from typing import Optional + +from app.models.job import JobStatus + + +class CreateJobResponse(BaseModel): + """ + Response model for job creation endpoint. + + Attributes: + job_id (str): Unique identifier for the created job. + """ + + job_id: str + + +class GetStatusResponse(BaseModel): + """ + Response model for retrieving job status. + + Attributes: + task_status (JobStatus): Current status of the task. + file_name (Optional[str]): Name of the file uploaded to remote storage, if present + error_message (Optional[str]): Error message if task encountered an error. + """ + + task_status: JobStatus + file_name: Optional[str] = None + error_message: Optional[str] = None diff --git a/remote_vector_index_builder/app/services/__init__.py b/remote_vector_index_builder/app/services/__init__.py new file mode 100644 index 0000000..fe22b86 --- /dev/null +++ b/remote_vector_index_builder/app/services/__init__.py @@ -0,0 +1,6 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. 
diff --git a/remote_vector_index_builder/app/services/index_builder.py b/remote_vector_index_builder/app/services/index_builder.py new file mode 100644 index 0000000..d50d0d9 --- /dev/null +++ b/remote_vector_index_builder/app/services/index_builder.py @@ -0,0 +1,39 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. + +import logging +from typing import Optional, Tuple +from app.models.workflow import BuildWorkflow +from core.tasks import run_tasks + +logger = logging.getLogger(__name__) + + +class IndexBuilder: + """ + Handles the building of indexes based on provided workflows. + """ + + def build_index( + self, workflow: BuildWorkflow + ) -> Tuple[bool, Optional[str], Optional[str]]: + """ + Builds the index for the given workflow, using the run_tasks function + + Args: + workflow (BuildWorkflow): Workflow containing index build parameters + + Returns: + Tuple[bool, Optional[str], Optional[str]]: A tuple containing: + - Success status (True/False) + - Index path if successful, None otherwise + - Error message if failed, None otherwise + """ + result = run_tasks(workflow.index_build_parameters) + if not result.file_name: + return False, None, result.error + return True, result.file_name, None diff --git a/remote_vector_index_builder/app/services/job_service.py b/remote_vector_index_builder/app/services/job_service.py new file mode 100644 index 0000000..5557d80 --- /dev/null +++ b/remote_vector_index_builder/app/services/job_service.py @@ -0,0 +1,236 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. 
+ +from typing import Optional +from app.base.exceptions import HashCollisionError, CapacityError +from app.base.resources import ResourceManager +from app.executors.workflow_executor import WorkflowExecutor +from app.models.job import Job, JobStatus +from app.models.request import RequestParameters +from app.models.workflow import BuildWorkflow +from app.utils.hash import generate_job_id +from app.utils.memory import calculate_memory_requirements +from app.utils.request import create_request_parameters +from app.storage.base import RequestStore +from core.common.models import IndexBuildParameters + +import logging + +logger = logging.getLogger(__name__) + + +class JobService: + """ + Service class for managing job operations including creation, validation, and resource management. + + This service handles job lifecycle operations, resource allocation, and workflow execution + while maintaining state in the request store. + + Attributes: + request_store (RequestStore): Store for persisting job requests and their states + workflow_executor (WorkflowExecutor): Executor for handling workflow operations + total_gpu_memory (float): Total available GPU memory for job allocation, in bytes + total_cpu_memory (float): Total available CPU memory for job allocation, in bytes + resource_manager (ResourceManager): Manager for handling system resources + """ + + def __init__( + self, + request_store: RequestStore, + workflow_executor: WorkflowExecutor, + resource_manager: ResourceManager, + total_gpu_memory: float, + total_cpu_memory: float, + ): + """ + Initialize the JobService with required dependencies. 
+ + Args: + request_store (RequestStore): Store for managing job requests + workflow_executor (WorkflowExecutor): Executor for workflow operations + resource_manager (ResourceManager): Manager for system resources + total_gpu_memory (float): Total GPU memory available, in bytes + total_cpu_memory (float): Total CPU memory available, in bytes + """ + self.request_store = request_store + self.workflow_executor = workflow_executor + self.total_gpu_memory = total_gpu_memory + self.total_cpu_memory = total_cpu_memory + self.resource_manager = resource_manager + + def _validate_job_existence( + self, job_id: str, request_parameters: RequestParameters + ) -> bool: + """ + Validate if a job exists and check for hash collisions. + + Args: + job_id (str): Unique identifier for the job + request_parameters (RequestParameters): Parameters of the request to validate + + Returns: + bool: True if job exists with matching parameters, False otherwise + + Raises: + HashCollisionError: If job exists but with different parameters + """ + job = self.request_store.get(job_id) + if job: + if job.compare_request_parameters(request_parameters): + return True + raise HashCollisionError(f"Hash collision detected for job_id: {job_id}") + return False + + def _get_required_resources( + self, index_build_parameters: IndexBuildParameters + ) -> tuple[float, float]: + """ + Calculate required GPU and CPU memory resources for a job. + + Args: + index_build_parameters (IndexBuildParameters): Parameters for building the index. 
+ Contains dimension, doc count, data type, m - the parameters needed to calculate memory + + Returns: + tuple[float, float]: Required GPU and CPU memory (in bytes) + """ + gpu_mem, cpu_mem = calculate_memory_requirements(index_build_parameters) + + logger.info( + f"Job id requirements: GPU memory: {gpu_mem}, CPU memory: {cpu_mem}" + ) + if not self.resource_manager.can_allocate(gpu_mem, cpu_mem): + raise CapacityError( + "Insufficient available GPU and CPU resources to process job" + ) + + return gpu_mem, cpu_mem + + def _add_to_request_store( + self, job_id: str, request_parameters: RequestParameters + ) -> None: + """ + Add a new job to the request store with initial running status. + + Args: + job_id (str): Unique identifier for the job + request_parameters (RequestParameters): Parameters of the job request + + Raises: + CapacityError: If the job cannot be added to the request store + """ + result = self.request_store.add( + job_id, + Job( + id=job_id, + status=JobStatus.RUNNING, + request_parameters=request_parameters, + ), + ) + + if not result: + raise CapacityError("Could not add item to request store") + + def _create_workflow( + self, + job_id: str, + gpu_mem: float, + cpu_mem: float, + index_build_parameters: IndexBuildParameters, + ) -> BuildWorkflow: + """ + Create a new build workflow with the specified parameters. 
+ + Args: + job_id (str): Unique identifier for the job + gpu_mem (float): Required GPU memory for the job, in bytes + cpu_mem (float): Required CPU memory for the job, in bytes + index_build_parameters (IndexBuildParameters): Parameters for building the index + + Returns: + BuildWorkflow: Configured workflow instance ready for execution + Raises: + CapacityError: If the service does not have enough GPU or CPU memory to process the job + """ + workflow = BuildWorkflow( + job_id=job_id, + gpu_memory_required=gpu_mem, + cpu_memory_required=cpu_mem, + index_build_parameters=index_build_parameters, + ) + + # Allocate resources + allocation_success = self.resource_manager.allocate( + workflow.gpu_memory_required, workflow.cpu_memory_required + ) + + if not allocation_success: + self.request_store.delete(job_id) + raise CapacityError( + f"Insufficient available resources to process workflow {workflow.job_id}" + ) + + return workflow + + def create_job(self, index_build_parameters: IndexBuildParameters) -> str: + """ + Creates and initiates a new index building job. 
+ + This method handles the complete job creation workflow including: + - Generating and validating job ID + - Checking for existing jobs + - Calculating required resources + - Creating and submitting the workflow + + Args: + index_build_parameters (IndexBuildParameters): Parameters for building the index, + including dimensions, document count, and algorithm settings + + Returns: + str: Unique job identifier for the created job + + Raises: + CapacityError: If there are insufficient resources to process the job + HashCollisionError: If a job exists with same ID but different parameters + """ + # Create parameters and validate job + request_parameters = create_request_parameters(index_build_parameters) + job_id = generate_job_id(request_parameters) + job_exists = self._validate_job_existence(job_id, request_parameters) + if job_exists: + logger.info(f"Job with id {job_id} already exists") + return job_id + + gpu_mem, cpu_mem = self._get_required_resources(index_build_parameters) + + self._add_to_request_store(job_id, request_parameters) + logger.info(f"Added job to request store with job id: {job_id}") + + workflow = self._create_workflow( + job_id, gpu_mem, cpu_mem, index_build_parameters + ) + logger.info( + f"Worker resource status for job id {job_id}: - " + f"GPU: {self.resource_manager.get_available_gpu_memory():,} bytes, " + f"CPU: {self.resource_manager.get_available_cpu_memory():,} bytes" + ) + self.workflow_executor.submit_workflow(workflow) + logger.info(f"Successfully created workflow with job id: {job_id}") + + return job_id + + def get_job(self, job_id: str) -> Optional[Job]: + """ + Retrieves a job by its unique identifier. 
+ + Args: + job_id (str): Unique identifier of the job to retrieve + + Returns: + Optional[Job]: The job object if found, None otherwise + """ + return self.request_store.get(job_id) diff --git a/remote_vector_index_builder/app/storage/__init__.py b/remote_vector_index_builder/app/storage/__init__.py new file mode 100644 index 0000000..fe22b86 --- /dev/null +++ b/remote_vector_index_builder/app/storage/__init__.py @@ -0,0 +1,6 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. diff --git a/remote_vector_index_builder/app/storage/base.py b/remote_vector_index_builder/app/storage/base.py new file mode 100644 index 0000000..55c3cb7 --- /dev/null +++ b/remote_vector_index_builder/app/storage/base.py @@ -0,0 +1,75 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. + +from abc import ABC, abstractmethod +from typing import Optional, Dict, Any +from app.models.job import Job + + +class RequestStore(ABC): + """ + Abstract base class defining the interface for job request storage operations. + + This class provides the contract for implementing storage operations for job requests, + including adding, retrieving, updating, and deleting jobs, as well as cleaning up + expired entries. + """ + + @abstractmethod + def add(self, job_id: str, job: Job) -> bool: + """ + Add a job to the store. 
+ + Args: + job_id (str): Unique identifier for the job + job (Job): Job object containing the job details + + Returns: + bool: True if addition was successful, False otherwise + """ + """Add a job to the store""" + return True + + @abstractmethod + def get(self, job_id: str) -> Optional[Job]: + """ + Retrieve a job from the store. + + Args: + job_id (str): Unique identifier of the job to retrieve + + Returns: + Optional[Job]: The job if found, None otherwise + """ + pass + + @abstractmethod + def update(self, job_id: str, data: Dict[str, Any]) -> bool: + """ + Update a job in the store. + + Args: + job_id (str): Unique identifier of the job to update + data (Dict[str, Any]): Dictionary containing the fields to update + + Returns: + bool: True if update was successful, False otherwise + """ + return True + + @abstractmethod + def delete(self, job_id: str) -> bool: + """ + Delete a job from the store. + + Args: + job_id (str): Unique identifier of the job to delete + + Returns: + bool: True if deletion was successful, False otherwise + """ + return True diff --git a/remote_vector_index_builder/app/storage/factory.py b/remote_vector_index_builder/app/storage/factory.py new file mode 100644 index 0000000..104d72a --- /dev/null +++ b/remote_vector_index_builder/app/storage/factory.py @@ -0,0 +1,40 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. + +from app.base.config import Settings +from app.storage.base import RequestStore +from app.storage.memory import InMemoryRequestStore +from app.storage.types import RequestStoreType + + +class RequestStoreFactory: + """ + Factory class for creating RequestStore instances. + + This class provides a static factory method to create different types of + RequestStore implementations based on the specified store type. 
+ """ + + @staticmethod + def create(store_type: RequestStoreType, settings: Settings) -> RequestStore: + """ + Creates and returns a RequestStore instance based on the specified type. + + Args: + store_type (RequestStoreType): The type of request store to create + settings (Settings): Configuration settings for the request store + + Returns: + RequestStore: An instance of the specified RequestStore implementation + + Raises: + ValueError: If the specified store type is not supported + """ + if store_type == RequestStoreType.MEMORY: + return InMemoryRequestStore(settings) + else: + raise ValueError(f"Unsupported request store type: {store_type}") diff --git a/remote_vector_index_builder/app/storage/memory.py b/remote_vector_index_builder/app/storage/memory.py new file mode 100644 index 0000000..e5a696b --- /dev/null +++ b/remote_vector_index_builder/app/storage/memory.py @@ -0,0 +1,159 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. + +from typing import Dict, Optional, Any +from datetime import datetime, timedelta, timezone +import threading +import time + +from app.models.job import Job +from app.storage.base import RequestStore +from app.base.config import Settings + +import logging + +logger = logging.getLogger(__name__) + + +class InMemoryRequestStore(RequestStore): + """ + In-memory implementation of the RequestStore interface. + + This class provides a thread-safe, in-memory storage solution for job requests + with configurable maximum size and TTL-based cleanup capabilities. 
+ + Attributes: + _store (Dict[str, tuple[Job, datetime]]): Internal dictionary storing jobs and their timestamps + _lock (threading.Lock): Thread lock for synchronization + _max_size (int): Maximum number of jobs that can be stored + _ttl_seconds (int): Time-to-live in seconds for stored jobs + """ + + def __init__(self, settings: Settings): + """ + Initialize the in-memory request store. + + Args: + settings (Settings): Configuration settings containing request_store_max_size + and request_store_ttl_seconds + """ + self._store: Dict[str, tuple[Job, datetime]] = {} + self._lock = threading.Lock() + self._max_size = settings.request_store_max_size + self._ttl_seconds = settings.request_store_ttl_seconds + + # Start cleanup thread, TTL seconds is not None + if self._ttl_seconds is not None: + logger.info( + f"Starting cleanup thread for request store with TTL {self._ttl_seconds} seconds" + ) + self._cleanup_thread = threading.Thread( + target=self._cleanup_loop, daemon=True + ) + self._cleanup_thread.start() + + def add(self, job_id: str, job: Job) -> bool: + """ + Add a new job to the store with current timestamp. + + Args: + job_id (str): Unique identifier for the job + job (Job): Job object to store + + Returns: + bool: True if job was added successfully, False if store is at capacity + """ + with self._lock: + if len(self._store) >= self._max_size: + return False + else: + self._store[job_id] = (job, datetime.now(timezone.utc)) + return True + + def get(self, job_id: str) -> Optional[Job]: + """ + Retrieve a job from the store. 
+ + Args: + job_id (str): Unique identifier of the job to retrieve + + Returns: + Optional[Job]: The job if found and not expired, None otherwise + """ + with self._lock: + if job_id in self._store: + job, timestamp = self._store[job_id] + if self._ttl_seconds is None or ( + self._ttl_seconds + and datetime.now(timezone.utc) - timestamp + < timedelta(seconds=self._ttl_seconds) + ): + return job + else: + del self._store[job_id] + return None + + def update(self, job_id: str, data: Dict[str, Any]) -> bool: + """ + Update an existing job in the store with new data. + + Args: + job_id (str): Unique identifier of the job to update + data (Dict[str, Any]): Dictionary containing the fields and values to update + + Returns: + bool: True if job was found and updated successfully, False if job not found + """ + with self._lock: + if job_id not in self._store: + return False + + job, timestamp = self._store[job_id] + for key, value in data.items(): + setattr(job, key, value) + self._store[job_id] = (job, timestamp) + return True + + def delete(self, job_id: str) -> bool: + """ + Delete a job from the store. + + Args: + job_id (str): Unique identifier of the job to delete + + Returns: + bool: True if job was found and deleted successfully, False if job not found + """ + with self._lock: + if job_id in self._store: + del self._store[job_id] + return True + return False + + def cleanup_expired(self) -> None: + """ + Remove all expired entries from the store based on TTL. + Thread-safe implementation using the store's lock. + """ + with self._lock: + current_time = datetime.now(timezone.utc) + expiration_threshold = current_time - timedelta(seconds=self._ttl_seconds) + + self._store = { + job_id: data + for job_id, data in self._store.items() + if data[1] > expiration_threshold + } + + def _cleanup_loop(self) -> None: + """ + Background thread that periodically removes expired entries from the store. + Runs continuously while the store is active. 
+ """ + while True: + time.sleep(5) # Run cleanup every 5 seconds + self.cleanup_expired() diff --git a/remote_vector_index_builder/app/storage/types.py b/remote_vector_index_builder/app/storage/types.py new file mode 100644 index 0000000..fceccc3 --- /dev/null +++ b/remote_vector_index_builder/app/storage/types.py @@ -0,0 +1,22 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. + +from enum import Enum + + +class RequestStoreType(str, Enum): + """ + Enumeration of supported request store types. + + This enum defines the available storage backend types for the request store system. + Inherits from str and Enum to provide string-based enumeration values. + + Attributes: + MEMORY (str): In-memory storage backend, data is stored in application memory + """ + + MEMORY = "memory" diff --git a/remote_vector_index_builder/app/test_imports.py b/remote_vector_index_builder/app/test_imports.py new file mode 100644 index 0000000..989b99c --- /dev/null +++ b/remote_vector_index_builder/app/test_imports.py @@ -0,0 +1,23 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. 
+ + +def test_imports(): + try: + from app import main + from core.tasks import run_tasks + import faiss + + print("All imports successful!") + return 0 + except ImportError as e: + print(f"Import failed: {e}") + exit(1) + + +if __name__ == "__main__": + test_imports() diff --git a/remote_vector_index_builder/app/utils/__init__.py b/remote_vector_index_builder/app/utils/__init__.py new file mode 100644 index 0000000..fe22b86 --- /dev/null +++ b/remote_vector_index_builder/app/utils/__init__.py @@ -0,0 +1,6 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. diff --git a/remote_vector_index_builder/app/utils/error_message.py b/remote_vector_index_builder/app/utils/error_message.py new file mode 100644 index 0000000..e6ac63f --- /dev/null +++ b/remote_vector_index_builder/app/utils/error_message.py @@ -0,0 +1,35 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +from typing import Any + + +def get_field_path(location: tuple[Any, ...]) -> str: + """Convert a location tuple to a readable field path string. + This function is used to format errors thrown by + Pydantic input validation. + + Args: + location (tuple[Any, ...]): A tuple containing path elements, which can be + either integers (for array indices) or strings (for field names). 
+ + Returns: + str: A formatted string representing the field path where: + - Integer elements are formatted as array indices (e.g., "[0]") + - String elements are joined with dots (e.g., "field.subfield") + - First string element appears without a leading dot + + """ + field_path = [] + for loc in location: + if isinstance(loc, int): + field_path.append(f"[{loc}]") + elif isinstance(loc, str): + if field_path: + field_path.append("." + loc) + else: + field_path.append(loc) + return "".join(field_path) diff --git a/remote_vector_index_builder/app/utils/hash.py b/remote_vector_index_builder/app/utils/hash.py new file mode 100644 index 0000000..dc6ac26 --- /dev/null +++ b/remote_vector_index_builder/app/utils/hash.py @@ -0,0 +1,28 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. + +import hashlib +from app.models.job import RequestParameters + + +def generate_job_id(request_parameters: RequestParameters) -> str: + """Generate a unique hash-based job identifier from request parameters. + + This function creates a SHA-256 hash from the string representation of the + provided request parameters, ensuring a unique and consistent identifier + for each unique set of parameters. + + Args: + request_parameters (RequestParameters): The request parameters object + containing the job configuration details. + + Returns: + str: A 64-character hexadecimal string representing the SHA-256 hash + of the request parameters. 
+ """ + combined = str(request_parameters).encode() + return hashlib.sha256(combined).hexdigest() diff --git a/remote_vector_index_builder/app/utils/logging_config.py b/remote_vector_index_builder/app/utils/logging_config.py new file mode 100644 index 0000000..cb48c87 --- /dev/null +++ b/remote_vector_index_builder/app/utils/logging_config.py @@ -0,0 +1,17 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. + +import logging + + +def configure_logging(log_level): + # Configure logging + logging.basicConfig( + level=log_level, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + ) diff --git a/remote_vector_index_builder/app/utils/memory.py b/remote_vector_index_builder/app/utils/memory.py new file mode 100644 index 0000000..a56dcf0 --- /dev/null +++ b/remote_vector_index_builder/app/utils/memory.py @@ -0,0 +1,49 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +from core.common.models import IndexBuildParameters + + +def calculate_memory_requirements( + index_build_parameters: IndexBuildParameters, +) -> tuple[float, float]: + """ + Calculate GPU and CPU memory requirements for a vector workload. + + This function estimates the memory needed for processing vector operations, + taking into account the workload size and complexity. + + Note: + The memory estimations are specific to building a CAGRA index on GPUs, + and converting the CAGRA index to CPU compatible HNSW. The GPU memory calculation is a very rough estimate. 
+ There is an open issue with NVIDIA on how to better calculate memory taken up + by a CAGRA index: https://github.com/rapidsai/cuvs/issues/566 + + Args: + index_build_parameters: Build parameters that contain value of doc count, dimensions, m, and vector data type + + Returns: + tuple[float, float]: A tuple containing: + - gpu_memory (float): Required GPU memory in bytes + - cpu_memory (float): Required CPU memory in bytes + """ + + m = index_build_parameters.index_parameters.algorithm_parameters.m + entry_size = index_build_parameters.data_type.get_size() + vector_dimensions = index_build_parameters.dimension + num_vectors = index_build_parameters.doc_count + + # Vector memory (same for both GPU and CPU) + vector_memory = vector_dimensions * num_vectors * entry_size + + # use formula to calculate memory taken up by index + index_gpu_memory = ( + (vector_dimensions * entry_size + m * 8) * 1.1 * num_vectors + ) * 1.5 + + index_cpu_memory = (vector_dimensions * entry_size + m * 8) * 1.1 * num_vectors + + return (index_gpu_memory + vector_memory), (index_cpu_memory + vector_memory) diff --git a/remote_vector_index_builder/app/utils/request.py b/remote_vector_index_builder/app/utils/request.py new file mode 100644 index 0000000..042908d --- /dev/null +++ b/remote_vector_index_builder/app/utils/request.py @@ -0,0 +1,33 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. + +from core.common.models import IndexBuildParameters +from app.models.request import RequestParameters + + +def create_request_parameters( + index_build_parameters: IndexBuildParameters, +) -> RequestParameters: + """Create a RequestParameters object from IndexBuildParameters. 
+ + This function transforms an IndexBuildParameters object into a RequestParameters + object, extracting and mapping the necessary fields for request processing. + The RequestParameters object is later used for generating a request hash + + Args: + index_build_parameters (IndexBuildParameters): The input parameters containing + unique request attributes (such as vector path) for index building. + + Returns: + RequestParameters: A new RequestParameters object containing the mapped + attributes from the input parameters. + + """ + return RequestParameters( + vector_path=index_build_parameters.vector_path, + tenant_id=index_build_parameters.tenant_id, + ) diff --git a/remote_vector_index_builder/core/common/models/index_build_parameters.py b/remote_vector_index_builder/core/common/models/index_build_parameters.py index f96feee..b4ca13c 100644 --- a/remote_vector_index_builder/core/common/models/index_build_parameters.py +++ b/remote_vector_index_builder/core/common/models/index_build_parameters.py @@ -23,6 +23,17 @@ class DataType(str, Enum): FLOAT = "float" + def get_size(self): + """Get the size of the data type in bytes. + + Returns: + int: The size of the data type in bytes. + """ + if self == DataType.FLOAT: + return 4 + else: + raise ValueError(f"Unsupported data type: {self}") + class SpaceType(str, Enum): """Distance method used for measuring vector similarities. 
diff --git a/setup.cfg b/setup.cfg index 493a4e6..6f468f2 100644 --- a/setup.cfg +++ b/setup.cfg @@ -8,6 +8,7 @@ exclude = .git, __pycache__, remote_vector_index_builder/core/test_imports.py + remote_vector_index_builder/app/test_imports.py # Output formatting statistics = True diff --git a/test_remote_vector_index_builder/requirements.txt b/test_remote_vector_index_builder/requirements.txt index ee9a34e..2624620 100644 --- a/test_remote_vector_index_builder/requirements.txt +++ b/test_remote_vector_index_builder/requirements.txt @@ -2,4 +2,5 @@ black>=25.1.0,<26.0.0 flake8>=7.1.0,<8.0.0 isort>=6.0.0,<7.0.0 mypy>=1.15.0,<2.0.0 -pytest>=8.0.0,<9.0.0 \ No newline at end of file +pytest>=8.0.0,<9.0.0 +httpx>=0.28.0,<0.29.0 \ No newline at end of file diff --git a/test_remote_vector_index_builder/test_app/__init__.py b/test_remote_vector_index_builder/test_app/__init__.py new file mode 100644 index 0000000..fe22b86 --- /dev/null +++ b/test_remote_vector_index_builder/test_app/__init__.py @@ -0,0 +1,6 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. diff --git a/test_remote_vector_index_builder/test_app/conftest.py b/test_remote_vector_index_builder/test_app/conftest.py new file mode 100644 index 0000000..f7c5a94 --- /dev/null +++ b/test_remote_vector_index_builder/test_app/conftest.py @@ -0,0 +1,32 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. 
+ +import pytest +from core.common.models.index_build_parameters import ( + AlgorithmParameters, + IndexBuildParameters, + IndexParameters, + SpaceType, +) + + +@pytest.fixture +def index_build_parameters(): + """Create sample IndexBuildParameters for testing""" + return IndexBuildParameters( + container_name="testbucket", + vector_path="test.knnvec", + doc_id_path="test_ids.txt", + dimension=3, + doc_count=2, + index_parameters=IndexParameters( + space_type=SpaceType.INNERPRODUCT, + algorithm_parameters=AlgorithmParameters( + ef_construction=200, ef_search=200 + ), + ), + ) diff --git a/test_remote_vector_index_builder/test_app/test_base/__init__.py b/test_remote_vector_index_builder/test_app/test_base/__init__.py new file mode 100644 index 0000000..fe22b86 --- /dev/null +++ b/test_remote_vector_index_builder/test_app/test_base/__init__.py @@ -0,0 +1,6 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. diff --git a/test_remote_vector_index_builder/test_app/test_base/test_resources.py b/test_remote_vector_index_builder/test_app/test_base/test_resources.py new file mode 100644 index 0000000..c27f54e --- /dev/null +++ b/test_remote_vector_index_builder/test_app/test_base/test_resources.py @@ -0,0 +1,115 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. 
+ +import pytest +from app.base.resources import ResourceManager +from threading import Thread + + +@pytest.fixture +def resource_manager(): + """Fixture to create a ResourceManager instance with 1GB GPU and CPU memory""" + total_gpu = 1024 * 1024 * 1024 # 1GB in bytes + total_cpu = 1024 * 1024 * 1024 + return ResourceManager(total_gpu, total_cpu), total_gpu, total_cpu + + +def test_initialization(resource_manager): + """Test proper initialization of ResourceManager""" + manager, total_gpu, total_cpu = resource_manager + assert manager.get_available_gpu_memory() == total_gpu + assert manager.get_available_cpu_memory() == total_cpu + + +def test_successful_allocation(resource_manager): + """Test successful memory allocation""" + manager, total_gpu, total_cpu = resource_manager + allocation_size = 512 * 1024 * 1024 # 512MB + + # Check if allocation is possible + assert manager.can_allocate(allocation_size, allocation_size) + + # Perform allocation + success = manager.allocate(allocation_size, allocation_size) + assert success + + # Verify remaining memory + assert manager.get_available_gpu_memory() == total_gpu - allocation_size + assert manager.get_available_cpu_memory() == total_cpu - allocation_size + + +def test_failed_allocation(resource_manager): + """Test allocation failure when requesting more than available memory""" + manager, total_gpu, _ = resource_manager + excessive_size = total_gpu + 1 + + # Check if allocation is possible + assert not manager.can_allocate(excessive_size, 0) + + # Attempt allocation + success = manager.allocate(excessive_size, 0) + assert not success + + # Verify memory hasn't changed + assert manager.get_available_gpu_memory() == total_gpu + + +def test_memory_release(resource_manager): + """Test memory release functionality""" + manager, total_gpu, total_cpu = resource_manager + allocation_size = 256 * 1024 * 1024 # 256MB + + # Allocate memory + manager.allocate(allocation_size, allocation_size) + + # Release memory + 
manager.release(allocation_size, allocation_size) + + # Verify all memory is available again + assert manager.get_available_gpu_memory() == total_gpu + assert manager.get_available_cpu_memory() == total_cpu + + +def test_multiple_allocations(resource_manager): + """Test multiple sequential allocations""" + manager, total_gpu, total_cpu = resource_manager + allocation_size = 256 * 1024 * 1024 # 256MB + + # Perform 3 allocations + for _ in range(3): + success = manager.allocate(allocation_size, allocation_size) + assert success + + # Verify remaining memory + expected_remaining = total_gpu - (3 * allocation_size) + assert manager.get_available_gpu_memory() == expected_remaining + assert manager.get_available_cpu_memory() == expected_remaining + + +def test_thread_safety(resource_manager): + """Test thread-safe operations""" + manager, total_gpu, total_cpu = resource_manager + allocation_size = 100 * 1024 * 1024 # 100MB + num_threads = 10 + + def allocate_and_release(): + manager.allocate(allocation_size, allocation_size) + manager.release(allocation_size, allocation_size) + + threads = [Thread(target=allocate_and_release) for _ in range(num_threads)] + + # Start all threads + for thread in threads: + thread.start() + + # Wait for all threads to complete + for thread in threads: + thread.join() + + # Verify final memory state + assert manager.get_available_gpu_memory() == total_gpu + assert manager.get_available_cpu_memory() == total_cpu diff --git a/test_remote_vector_index_builder/test_app/test_executors/__init__.py b/test_remote_vector_index_builder/test_app/test_executors/__init__.py new file mode 100644 index 0000000..fe22b86 --- /dev/null +++ b/test_remote_vector_index_builder/test_app/test_executors/__init__.py @@ -0,0 +1,6 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. 
diff --git a/test_remote_vector_index_builder/test_app/test_executors/test_workflow_executor.py b/test_remote_vector_index_builder/test_app/test_executors/test_workflow_executor.py new file mode 100644 index 0000000..30c4e8f --- /dev/null +++ b/test_remote_vector_index_builder/test_app/test_executors/test_workflow_executor.py @@ -0,0 +1,187 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +import pytest +from unittest.mock import Mock, patch +from concurrent.futures import ThreadPoolExecutor +from app.models.workflow import BuildWorkflow +from app.models.job import JobStatus +from app.executors.workflow_executor import WorkflowExecutor + + +@pytest.fixture +def mock_request_store(): + store = Mock() + store.get.return_value = True # Default to existing job + return store + + +@pytest.fixture +def mock_resource_manager(): + return Mock() + + +@pytest.fixture +def mock_build_index_fn(): + return Mock(return_value=(True, "test/path", None)) + + +@pytest.fixture +def workflow_executor(mock_request_store, mock_resource_manager, mock_build_index_fn): + return WorkflowExecutor( + max_workers=2, + request_store=mock_request_store, + resource_manager=mock_resource_manager, + build_index_fn=mock_build_index_fn, + ) + + +@pytest.fixture +def sample_workflow(): + workflow = Mock(spec=BuildWorkflow) + workflow.job_id = "test_job_123" + workflow.gpu_memory_required = 1000 + workflow.cpu_memory_required = 2000 + return workflow + + +def test_workflow_executor_initialization( + mock_request_store, mock_resource_manager, mock_build_index_fn +): + """Test proper initialization of WorkflowExecutor""" + executor = WorkflowExecutor( + max_workers=2, + request_store=mock_request_store, + resource_manager=mock_resource_manager, + build_index_fn=mock_build_index_fn, + ) + + assert 
isinstance(executor._executor, ThreadPoolExecutor) + assert executor._request_store == mock_request_store + assert executor._resource_manager == mock_resource_manager + assert executor._build_index_fn == mock_build_index_fn + + +def test_successful_workflow_execution( + workflow_executor, + sample_workflow, + mock_request_store, + mock_resource_manager, + mock_build_index_fn, +): + """Test successful execution of a workflow""" + mock_build_index_fn.return_value = (True, "/path/to/index", None) + + # Execute workflow + workflow_executor.submit_workflow(sample_workflow) + # Wait for execution to complete + workflow_executor._executor.shutdown(wait=True) + + # Verify build function was called + mock_build_index_fn.assert_called_once_with(sample_workflow) + + # Verify status update + mock_request_store.update.assert_called_with( + sample_workflow.job_id, + { + "status": JobStatus.COMPLETED, + "file_name": "/path/to/index", + "error_message": None, + }, + ) + + # Verify resource release + mock_resource_manager.release.assert_called_with( + sample_workflow.gpu_memory_required, sample_workflow.cpu_memory_required + ) + + +def test_failed_workflow_execution( + workflow_executor, + sample_workflow, + mock_request_store, + mock_resource_manager, + mock_build_index_fn, +): + """Test workflow execution with failure""" + error_message = "Build failed" + mock_build_index_fn.return_value = (False, None, error_message) + + workflow_executor.submit_workflow(sample_workflow) + workflow_executor._executor.shutdown(wait=True) + + mock_request_store.update.assert_called_with( + sample_workflow.job_id, + {"status": JobStatus.FAILED, "file_name": None, "error_message": error_message}, + ) + + +def test_deleted_job_before_execution( + workflow_executor, + sample_workflow, + mock_request_store, + mock_resource_manager, + mock_build_index_fn, +): + """Test handling of job that was deleted before execution""" + mock_request_store.get.return_value = False + + 
workflow_executor.submit_workflow(sample_workflow) + workflow_executor._executor.shutdown(wait=True) + + mock_build_index_fn.assert_not_called() + mock_request_store.update.assert_not_called() + + +def test_deleted_job_during_execution( + workflow_executor, + sample_workflow, + mock_request_store, + mock_resource_manager, + mock_build_index_fn, +): + """Test handling of job that was deleted during execution""" + # First get returns True, second returns False + mock_request_store.get.side_effect = [True, False] + + workflow_executor.submit_workflow(sample_workflow) + workflow_executor._executor.shutdown(wait=True) + + mock_build_index_fn.assert_called_once() + mock_request_store.update.assert_not_called() + + +def test_exception_during_execution( + workflow_executor, + sample_workflow, + mock_request_store, + mock_resource_manager, + mock_build_index_fn, +): + """Test handling of exceptions during execution""" + error_message = "Unexpected error" + mock_build_index_fn.side_effect = Exception(error_message) + + workflow_executor.submit_workflow(sample_workflow) + workflow_executor._executor.shutdown(wait=True) + + # mock_request_store.update.assert_called_with( + # sample_workflow.job_id, + # {"status": JobStatus.FAILED, "error_message": error_message} + # ) + mock_request_store.update.assert_called_once() + + # Verify resources are released even after exception + mock_resource_manager.release.assert_called_with( + sample_workflow.gpu_memory_required, sample_workflow.cpu_memory_required + ) + + +def test_shutdown(workflow_executor): + """Test executor shutdown""" + with patch.object(workflow_executor._executor, "shutdown") as mock_shutdown: + workflow_executor.shutdown() + mock_shutdown.assert_called_once_with(wait=True) diff --git a/test_remote_vector_index_builder/test_app/test_routes/__init__.py b/test_remote_vector_index_builder/test_app/test_routes/__init__.py new file mode 100644 index 0000000..fe22b86 --- /dev/null +++ 
b/test_remote_vector_index_builder/test_app/test_routes/__init__.py @@ -0,0 +1,6 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. diff --git a/test_remote_vector_index_builder/test_app/test_routes/test_build.py b/test_remote_vector_index_builder/test_app/test_routes/test_build.py new file mode 100644 index 0000000..472fdf0 --- /dev/null +++ b/test_remote_vector_index_builder/test_app/test_routes/test_build.py @@ -0,0 +1,86 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. + +import pytest +from fastapi import FastAPI +from fastapi.testclient import TestClient +from unittest.mock import Mock + +from app.base.exceptions import HashCollisionError, CapacityError +from app.routes import build + + +@pytest.fixture +def app(): + app = FastAPI() + app.include_router(build.router) + return app + + +@pytest.fixture +def client(app): + return TestClient(app) + + +@pytest.fixture +def mock_job_service(): + return Mock() + + +def test_create_job_success(client, mock_job_service, index_build_parameters): + # Arrange + app = client.app + app.state.job_service = mock_job_service + mock_job_service.create_job.return_value = "test_job_id" + + # Act + response = client.post("/_build", json=index_build_parameters.model_dump()) + + # Assert + assert response.status_code == 200 + assert response.json() == {"job_id": "test_job_id"} + mock_job_service.create_job.assert_called_once() + + +def test_create_job_hash_collision(client, mock_job_service, index_build_parameters): + # Arrange + app = client.app + app.state.job_service = mock_job_service + mock_job_service.create_job.side_effect = HashCollisionError( + "Hash 
collision occurred" + ) + + # Act + response = client.post("/_build", json=index_build_parameters.model_dump()) + + # Assert + assert response.status_code == 429 + assert response.json() == {"detail": "Hash collision occurred"} + + +def test_create_job_capacity_error(client, mock_job_service, index_build_parameters): + # Arrange + app = client.app + app.state.job_service = mock_job_service + mock_job_service.create_job.side_effect = CapacityError("System capacity exceeded") + + # Act + response = client.post("/_build", json=index_build_parameters.model_dump()) + + # Assert + assert response.status_code == 507 + assert response.json() == {"detail": "System capacity exceeded"} + + +def test_create_job_unexpected_error(client, mock_job_service, index_build_parameters): + # Arrange + app = client.app + app.state.job_service = mock_job_service + mock_job_service.create_job.side_effect = Exception("Unexpected error") + + with pytest.raises(Exception): + client.post("/_build", json=index_build_parameters.model_dump()) diff --git a/test_remote_vector_index_builder/test_app/test_routes/test_status.py b/test_remote_vector_index_builder/test_app/test_routes/test_status.py new file mode 100644 index 0000000..438dcd9 --- /dev/null +++ b/test_remote_vector_index_builder/test_app/test_routes/test_status.py @@ -0,0 +1,138 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. 
+ +import pytest +from fastapi import FastAPI +from fastapi.testclient import TestClient +from unittest.mock import Mock +from app.routes import status +from app.models.job import JobStatus, Job + + +@pytest.fixture +def app(): + app = FastAPI() + app.include_router(status.router) + return app + + +@pytest.fixture +def client(app): + return TestClient(app) + + +@pytest.fixture +def mock_job_service(): + return Mock() + + +@pytest.fixture +def mock_job(): + job = Mock(spec=Job) + job.status = JobStatus.RUNNING + return job + + +def test_get_status_basic(client, mock_job_service, mock_job): + """Test basic status retrieval with only status field""" + # Arrange + app = client.app + app.state.job_service = mock_job_service + mock_job_service.get_job.return_value = mock_job + job_id = "test_job_123" + + # Act + response = client.get(f"/_status/{job_id}") + + # Assert + assert response.status_code == 200 + assert response.json()["task_status"] == mock_job.status + assert response.json()["file_name"] is None + assert response.json()["error_message"] is None + mock_job_service.get_job.assert_called_once_with(job_id) + + +def test_get_status_with_filename(client, mock_job_service, mock_job): + """Test status retrieval with filename""" + # Arrange + app = client.app + app.state.job_service = mock_job_service + mock_job.file_name = "test_file.index" + mock_job_service.get_job.return_value = mock_job + job_id = "test_job_123" + + # Act + response = client.get(f"/_status/{job_id}") + + # Assert + assert response.status_code == 200 + assert response.json()["task_status"] == mock_job.status + assert response.json()["file_name"] == mock_job.file_name + assert response.json()["error_message"] is None + + +def test_get_status_with_error(client, mock_job_service, mock_job): + """Test status retrieval with error message""" + # Arrange + app = client.app + app.state.job_service = mock_job_service + mock_job.status = JobStatus.FAILED + mock_job.error_message = "Test error message" + 
mock_job_service.get_job.return_value = mock_job + job_id = "test_job_123" + + # Act + response = client.get(f"/_status/{job_id}") + + # Assert + assert response.status_code == 200 + assert response.json()["task_status"] == mock_job.status + assert response.json()["file_name"] is None + assert response.json()["error_message"] == mock_job.error_message + + +def test_get_status_with_all_fields(client, mock_job_service, mock_job): + """Test status retrieval with all optional fields""" + # Arrange + app = client.app + app.state.job_service = mock_job_service + mock_job.status = JobStatus.COMPLETED + mock_job.file_name = "test_file.index" + mock_job.error_message = "Warning: something happened" + mock_job_service.get_job.return_value = mock_job + job_id = "test_job_123" + + # Act + response = client.get(f"/_status/{job_id}") + + # Assert + assert response.status_code == 200 + assert response.json()["task_status"] == mock_job.status + assert response.json()["file_name"] == mock_job.file_name + assert response.json()["error_message"] == mock_job.error_message + + +def test_get_status_job_not_found(client, mock_job_service): + """Test status retrieval for non-existent job""" + # Arrange + app = client.app + app.state.job_service = mock_job_service + mock_job_service.get_job.return_value = None + job_id = "nonexistent_job" + + # Act + response = client.get(f"/_status/{job_id}") + + # Assert + assert response.status_code == 404 + assert response.json() == {"detail": "Job not found"} diff --git a/test_remote_vector_index_builder/test_app/test_services/__init__.py b/test_remote_vector_index_builder/test_app/test_services/__init__.py new file mode 100644 index 0000000..fe22b86 --- /dev/null +++ b/test_remote_vector_index_builder/test_app/test_services/__init__.py @@ -0,0 +1,6 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# 
compatible open source license. diff --git a/test_remote_vector_index_builder/test_app/test_services/test_index_builder.py b/test_remote_vector_index_builder/test_app/test_services/test_index_builder.py new file mode 100644 index 0000000..b25ccd5 --- /dev/null +++ b/test_remote_vector_index_builder/test_app/test_services/test_index_builder.py @@ -0,0 +1,54 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +import pytest +from unittest.mock import Mock, patch +from app.models.workflow import BuildWorkflow +from app.services.index_builder import IndexBuilder + + +@pytest.fixture +def index_builder(): + return IndexBuilder() + + +@pytest.fixture +def mock_workflow(): + workflow = Mock(spec=BuildWorkflow) + workflow.index_build_parameters = {"param1": "value1", "param2": "value2"} + return workflow + + +def test_build_index_success(index_builder, mock_workflow): + """Test successful index building""" + with patch("app.services.index_builder.run_tasks") as mock_run_tasks: + mock_result = Mock() + mock_result.file_name = "/path/to/index" + mock_result.error = None + mock_run_tasks.return_value = mock_result + + success, path, error = index_builder.build_index(mock_workflow) + + assert success is True + assert path == "/path/to/index" + assert error is None + mock_run_tasks.assert_called_once_with(mock_workflow.index_build_parameters) + + +def test_build_index_failure(index_builder, mock_workflow): + """Test failed index building""" + with patch("app.services.index_builder.run_tasks") as mock_run_tasks: + mock_result = Mock() + mock_result.file_name = None + mock_result.error = "Build failed" + mock_run_tasks.return_value = mock_result + + success, path, error = index_builder.build_index(mock_workflow) + + assert success is False + assert path is None + assert error == "Build failed" + 
mock_run_tasks.assert_called_once_with(mock_workflow.index_build_parameters) diff --git a/test_remote_vector_index_builder/test_app/test_services/test_job_service.py b/test_remote_vector_index_builder/test_app/test_services/test_job_service.py new file mode 100644 index 0000000..a37b031 --- /dev/null +++ b/test_remote_vector_index_builder/test_app/test_services/test_job_service.py @@ -0,0 +1,197 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. + +import pytest +from unittest.mock import Mock, patch + +from app.base.exceptions import HashCollisionError, CapacityError +from app.models.job import Job +from app.models.workflow import BuildWorkflow +from app.services.job_service import JobService + + +@pytest.fixture +def request_store(): + mock = Mock() + mock.get.return_value = None + mock.add.return_value = True + mock.delete.return_value = True + return mock + + +@pytest.fixture +def workflow_executor(): + mock = Mock() + mock.submit_workflow.return_value = None + return mock + + +@pytest.fixture +def resource_manager(): + mock = Mock() + mock.can_allocate.return_value = True + mock.allocate.return_value = True + mock.get_available_gpu_memory.return_value = 1000 + mock.get_available_cpu_memory.return_value = 1000 + return mock + + +@pytest.fixture +def job_service(request_store, workflow_executor, resource_manager): + return JobService( + request_store=request_store, + workflow_executor=workflow_executor, + resource_manager=resource_manager, + total_gpu_memory=1000.0, + total_cpu_memory=1000.0, + ) + + +@pytest.fixture +def mock_request_parameters(): + return Mock() + + +@pytest.fixture +def mock_job(): + job = Mock(spec=Job) + job.compare_request_parameters.return_value = True + return job + + +def test_validate_job_existence_no_job(job_service, mock_request_parameters): + """Test 
job validation when job doesn't exist""" + assert ( + job_service._validate_job_existence("test_id", mock_request_parameters) is False + ) + + +def test_validate_job_existence_matching_job( + job_service, mock_request_parameters, mock_job +): + """Test job validation with existing matching job""" + job_service.request_store.get.return_value = mock_job + assert ( + job_service._validate_job_existence("test_id", mock_request_parameters) is True + ) + + +def test_validate_job_existence_hash_collision( + job_service, mock_request_parameters, mock_job +): + """Test job validation with hash collision""" + mock_job.compare_request_parameters.return_value = False + job_service.request_store.get.return_value = mock_job + + with pytest.raises(HashCollisionError): + job_service._validate_job_existence("test_id", mock_request_parameters) + + +@patch("app.services.job_service.calculate_memory_requirements") +def test_get_required_resources_success(mock_calc, job_service, index_build_parameters): + """Test successful resource requirements calculation""" + mock_calc.return_value = (100.0, 200.0) + gpu_mem, cpu_mem = job_service._get_required_resources(index_build_parameters) + assert gpu_mem == 100.0 + assert cpu_mem == 200.0 + + +@patch("app.services.job_service.calculate_memory_requirements") +def test_get_required_resources_insufficient( + mock_calc, job_service, index_build_parameters +): + """Test resource calculation with insufficient resources""" + mock_calc.return_value = (100.0, 200.0) + job_service.resource_manager.can_allocate.return_value = False + + with pytest.raises(CapacityError): + job_service._get_required_resources(index_build_parameters) + + +@patch("app.services.job_service.Job") +def test_add_to_request_store_success(job, job_service, mock_request_parameters): + """Test successful addition to request store""" + job_service._add_to_request_store("test_id", mock_request_parameters) + job_service.request_store.add.assert_called_once() + + 
+@patch("app.services.job_service.Job") +def test_add_to_request_store_failure(job, job_service, mock_request_parameters): + """Test failed addition to request store""" + job_service.request_store.add.return_value = False + + with pytest.raises(CapacityError): + job_service._add_to_request_store("test_id", mock_request_parameters) + + +def test_create_workflow_success(job_service, index_build_parameters): + """Test successful workflow creation""" + workflow = job_service._create_workflow( + "test_id", 100.0, 200.0, index_build_parameters + ) + assert isinstance(workflow, BuildWorkflow) + assert workflow.job_id == "test_id" + + +def test_create_workflow_allocation_failure(job_service, index_build_parameters): + """Test workflow creation with allocation failure""" + job_service.resource_manager.allocate.return_value = False + + with pytest.raises(CapacityError): + job_service._create_workflow("test_id", 100.0, 200.0, index_build_parameters) + + +@patch("app.services.job_service.create_request_parameters") +@patch("app.services.job_service.generate_job_id") +@patch("app.services.job_service.calculate_memory_requirements") +@patch("app.services.job_service.Job") +def test_create_job_success( + job, + mock_calc, + mock_generate_id, + mock_create_params, + job_service, + index_build_parameters, +): + """Test successful job creation""" + mock_calc.return_value = (100.0, 200.0) + mock_generate_id.return_value = "test_id" + mock_create_params.return_value = Mock() + + job_id = job_service.create_job(index_build_parameters) + + assert job_id == "test_id" + job_service.workflow_executor.submit_workflow.assert_called_once() + + +@patch("app.services.job_service.create_request_parameters") +@patch("app.services.job_service.generate_job_id") +def test_create_job_exists( + mock_generate_id, mock_create_params, job_service, index_build_parameters +): + """Test job creation when a matching job already exists (no new workflow submitted)""" + mock_generate_id.return_value = "test_id" + mock_create_params.return_value = Mock() + 
job_service.request_store.get.return_value = Mock(spec=Job) + job_id = job_service.create_job(index_build_parameters) + + assert job_id == "test_id" + job_service.workflow_executor.submit_workflow.assert_not_called() + + +def test_get_job_exists(job_service, mock_job): + """Test retrieving existing job""" + job_service.request_store.get.return_value = mock_job + result = job_service.get_job("test_id") + assert result == mock_job + + +def test_get_job_not_exists(job_service): + """Test retrieving non-existent job""" + result = job_service.get_job("test_id") + assert result is None diff --git a/test_remote_vector_index_builder/test_app/test_storage/__init__.py b/test_remote_vector_index_builder/test_app/test_storage/__init__.py new file mode 100644 index 0000000..fe22b86 --- /dev/null +++ b/test_remote_vector_index_builder/test_app/test_storage/__init__.py @@ -0,0 +1,6 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. diff --git a/test_remote_vector_index_builder/test_app/test_storage/test_factory.py b/test_remote_vector_index_builder/test_app/test_storage/test_factory.py new file mode 100644 index 0000000..cd69051 --- /dev/null +++ b/test_remote_vector_index_builder/test_app/test_storage/test_factory.py @@ -0,0 +1,35 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. 
+ +import pytest +from unittest.mock import Mock +from app.storage.factory import RequestStoreFactory +from app.storage.types import RequestStoreType +from app.storage.memory import InMemoryRequestStore + + +class TestRequestStoreFactory: + def setup_method(self): + """Setup common test fixtures""" + self.mock_settings = Mock() + + def test_create_memory_store(self): + """Test creation of in-memory store""" + store = RequestStoreFactory.create(RequestStoreType.MEMORY, self.mock_settings) + assert isinstance(store, InMemoryRequestStore) + + def test_unsupported_store_type(self): + """Test creating store with unsupported type raises ValueError""" + with pytest.raises(ValueError) as exc_info: + RequestStoreFactory.create("INVALID_TYPE", self.mock_settings) + assert "Unsupported request store type" in str(exc_info.value) + + def test_none_store_type(self): + """Test creating store with None type raises ValueError""" + with pytest.raises(ValueError) as exc_info: + RequestStoreFactory.create(None, self.mock_settings) + assert "Unsupported request store type" in str(exc_info.value) diff --git a/test_remote_vector_index_builder/test_app/test_storage/test_memory.py b/test_remote_vector_index_builder/test_app/test_storage/test_memory.py new file mode 100644 index 0000000..592963d --- /dev/null +++ b/test_remote_vector_index_builder/test_app/test_storage/test_memory.py @@ -0,0 +1,129 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. 
+ +import pytest +import time +from unittest.mock import Mock, patch + +from app.storage.memory import InMemoryRequestStore +from app.base.config import Settings +from app.models.job import JobStatus + + +@pytest.fixture +def settings(): + return Settings(request_store_max_size=2, request_store_ttl_seconds=1) + + +@pytest.fixture +def settings_no_ttl(): + return Settings(request_store_max_size=2, request_store_ttl_seconds=None) + + +@pytest.fixture +def sample_job(): + job = Mock() + return job + + +def test_init_with_ttl(settings): + store = InMemoryRequestStore(settings) + assert store._max_size == 2 + assert store._ttl_seconds == 1 + assert len(store._store) == 0 + + +def test_init_without_ttl(settings_no_ttl): + store = InMemoryRequestStore(settings_no_ttl) + assert store._max_size == 2 + assert store._ttl_seconds is None + assert len(store._store) == 0 + + +def test_add_and_get(settings, sample_job): + store = InMemoryRequestStore(settings) + assert store.add("job1", sample_job) is True + retrieved_job = store.get("job1") + assert retrieved_job == sample_job + + +def test_add_max_size(settings, sample_job): + store = InMemoryRequestStore(settings) + assert store.add("job1", sample_job) is True + assert store.add("job2", sample_job) is True + assert store.add("job3", sample_job) is False # Should fail, store is full + + +def test_get_nonexistent(settings): + store = InMemoryRequestStore(settings) + assert store.get("nonexistent") is None + + +def test_get_expired(settings, sample_job): + store = InMemoryRequestStore(settings) + store.add("job1", sample_job) + time.sleep(1.1) # Wait for expiration + assert store.get("job1") is None + + +def test_update(settings, sample_job): + store = InMemoryRequestStore(settings) + store.add("job1", sample_job) + + update_data = {"status": JobStatus.COMPLETED} + assert store.update("job1", update_data) is True + + updated_job = store.get("job1") + assert updated_job.status == JobStatus.COMPLETED + + +def 
test_update_nonexistent(settings): + store = InMemoryRequestStore(settings) + assert store.update("nonexistent", {"status": JobStatus.COMPLETED}) is False + + +def test_delete(settings, sample_job): + store = InMemoryRequestStore(settings) + store.add("job1", sample_job) + assert store.delete("job1") is True + assert store.get("job1") is None + + +def test_delete_nonexistent(settings): + store = InMemoryRequestStore(settings) + assert store.delete("nonexistent") is False + + +def test_cleanup_expired(settings, sample_job): + store = InMemoryRequestStore(settings) + store.add("job1", sample_job) + time.sleep(1.1) # Wait for expiration + store.cleanup_expired() + assert store.get("job1") is None + + +@patch("time.sleep") +def test_cleanup_loop(mock_sleep, settings): + """Test cleanup loop""" + # Create a store but patch the thread creation + with patch("threading.Thread.start"): + store = InMemoryRequestStore(settings) + + # Now test the cleanup loop directly + mock_sleep.side_effect = [None, Exception("Stop loop")] + + with pytest.raises(Exception): + store._cleanup_loop() + + mock_sleep.assert_called_with(5) + + +def test_get_no_ttl(settings_no_ttl, sample_job): + store = InMemoryRequestStore(settings_no_ttl) + store.add("job1", sample_job) + time.sleep(1.1) # Even after waiting, job should still be there + assert store.get("job1") == sample_job diff --git a/test_remote_vector_index_builder/test_app/test_utils/__init__.py b/test_remote_vector_index_builder/test_app/test_utils/__init__.py new file mode 100644 index 0000000..fe22b86 --- /dev/null +++ b/test_remote_vector_index_builder/test_app/test_utils/__init__.py @@ -0,0 +1,6 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. 
diff --git a/test_remote_vector_index_builder/test_app/test_utils/test_error_message.py b/test_remote_vector_index_builder/test_app/test_utils/test_error_message.py new file mode 100644 index 0000000..781c5d6 --- /dev/null +++ b/test_remote_vector_index_builder/test_app/test_utils/test_error_message.py @@ -0,0 +1,53 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. + +from app.utils.error_message import get_field_path + + +def test_empty_location(): + """Test with empty tuple""" + assert get_field_path(()) == "" + + +def test_single_string(): + """Test with single string element""" + assert get_field_path(("field",)) == "field" + + +def test_single_integer(): + """Test with single integer element""" + assert get_field_path((0,)) == "[0]" + + +def test_multiple_strings(): + """Test with multiple string elements""" + assert ( + get_field_path(("parent", "child", "grandchild")) == "parent.child.grandchild" + ) + + +def test_multiple_integers(): + """Test with multiple integer elements""" + assert get_field_path((0, 1, 2)) == "[0][1][2]" + + +def test_mixed_types(): + """Test with mixture of strings and integers""" + assert get_field_path(("array", 0, "field", 1)) == "array[0].field[1]" + + +def test_starting_with_integer(): + """Test path starting with integer""" + assert get_field_path((0, "field", "subfield")) == "[0].field.subfield" + + +def test_complex_path(): + """Test complex path with multiple types""" + assert ( + get_field_path(("users", 0, "addresses", 1, "street")) + == "users[0].addresses[1].street" + ) diff --git a/test_remote_vector_index_builder/test_app/test_utils/test_memory.py b/test_remote_vector_index_builder/test_app/test_utils/test_memory.py new file mode 100644 index 0000000..87e9d6b --- /dev/null +++ 
b/test_remote_vector_index_builder/test_app/test_utils/test_memory.py @@ -0,0 +1,65 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. + +import pytest +from core.common.models.index_build_parameters import ( + AlgorithmParameters, + IndexBuildParameters, + IndexParameters, + SpaceType, + DataType, +) +from app.utils.memory import calculate_memory_requirements + + +@pytest.fixture +def base_index_parameters(): + """Create base IndexParameters for testing""" + return IndexParameters( + space_type=SpaceType.INNERPRODUCT, + algorithm_parameters=AlgorithmParameters( + ef_construction=200, ef_search=200, m=16 + ), + ) + + +def create_index_build_parameters( + dimension: int, doc_count: int, m: int = 16, data_type: DataType = DataType.FLOAT +) -> IndexBuildParameters: + """Helper function to create IndexBuildParameters with different values""" + return IndexBuildParameters( + container_name="testbucket", + vector_path="test.knnvec", + doc_id_path="test_ids.txt", + dimension=dimension, + doc_count=doc_count, + data_type=data_type, + index_parameters=IndexParameters( + space_type=SpaceType.INNERPRODUCT, + algorithm_parameters=AlgorithmParameters( + ef_construction=200, ef_search=200, m=m + ), + ), + ) + + +def test_basic_calculation(): + """Test calculation with typical values""" + params = create_index_build_parameters(dimension=128, doc_count=1000) + + gpu_memory, cpu_memory = calculate_memory_requirements(params) + + # Calculate expected values + entry_size = 4 # FLOAT32 size + vector_memory = 128 * 1000 * entry_size + expected_gpu_memory = ( + (128 * entry_size + 16 * 8) * 1.1 * 1000 + ) * 1.5 + vector_memory + expected_cpu_memory = (128 * entry_size + 16 * 8) * 1.1 * 1000 + vector_memory + + assert gpu_memory == expected_gpu_memory + assert cpu_memory == expected_cpu_memory From 
7bb7a15c2babb713cff71d3d867cab90db37e88a Mon Sep 17 00:00:00 2001 From: Rohan Chitale Date: Thu, 3 Apr 2025 09:02:44 -0700 Subject: [PATCH 2/3] Changed api port, removed _get_required_resources function for clarity, and prevent eviction of in progress tasks Signed-off-by: Rohan Chitale --- remote_vector_index_builder/app/Dockerfile | 4 +- .../app/base/resources.py | 22 +---------- .../app/services/job_service.py | 37 +++++-------------- .../app/storage/memory.py | 6 +-- .../test_app/test_base/test_resources.py | 6 --- .../test_services/test_job_service.py | 24 +----------- .../test_app/test_storage/test_memory.py | 8 ++++ 7 files changed, 26 insertions(+), 81 deletions(-) diff --git a/remote_vector_index_builder/app/Dockerfile b/remote_vector_index_builder/app/Dockerfile index 479ad2b..d75c235 100644 --- a/remote_vector_index_builder/app/Dockerfile +++ b/remote_vector_index_builder/app/Dockerfile @@ -16,5 +16,7 @@ RUN pip install --no-cache-dir --upgrade -r /remote_vector_index_builder/app/req COPY ./remote_vector_index_builder/app /remote_vector_index_builder/app ENV PYTHONPATH='${PYTHONPATH}:/tmp/faiss/build/faiss/python:/remote_vector_index_builder' + RUN ["python", "app/test_imports.py"] -CMD ["fastapi", "run", "app/main.py", "--port", "80"] \ No newline at end of file + +CMD ["fastapi", "run", "app/main.py", "--port", "1025"] \ No newline at end of file diff --git a/remote_vector_index_builder/app/base/resources.py b/remote_vector_index_builder/app/base/resources.py index ee08029..86718a6 100644 --- a/remote_vector_index_builder/app/base/resources.py +++ b/remote_vector_index_builder/app/base/resources.py @@ -38,24 +38,6 @@ def __init__(self, total_gpu_memory: float, total_cpu_memory: float): self._available_cpu_memory = total_cpu_memory self._lock = threading.Lock() - # TODO: separate this function into CPU and GPU specific allocation checks - def can_allocate(self, gpu_memory: float, cpu_memory: float) -> bool: - """ - Check if the requested amount of GPU 
and CPU memory can be allocated. - - Args: - gpu_memory (float): Amount of GPU memory requested, in bytes - cpu_memory (float): Amount of CPU memory requested, in bytes - - Returns: - bool: True if the requested memory can be allocated, False otherwise - """ - with self._lock: - return ( - self._available_gpu_memory >= gpu_memory - and self._available_cpu_memory >= cpu_memory - ) - def allocate(self, gpu_memory: float, cpu_memory: float) -> bool: """ Attempt to allocate the specified amount of GPU and CPU memory. @@ -67,9 +49,9 @@ def allocate(self, gpu_memory: float, cpu_memory: float) -> bool: Returns: bool: True if allocation was successful, False if insufficient resources """ - if not self.can_allocate(gpu_memory, cpu_memory): - return False with self._lock: + if not (self._available_gpu_memory >= gpu_memory and self._available_cpu_memory >= cpu_memory): + return False self._available_gpu_memory -= gpu_memory self._available_cpu_memory -= cpu_memory return True diff --git a/remote_vector_index_builder/app/services/job_service.py b/remote_vector_index_builder/app/services/job_service.py index 5557d80..fc57a55 100644 --- a/remote_vector_index_builder/app/services/job_service.py +++ b/remote_vector_index_builder/app/services/job_service.py @@ -85,31 +85,6 @@ def _validate_job_existence( raise HashCollisionError(f"Hash collision detected for job_id: {job_id}") return False - def _get_required_resources( - self, index_build_parameters: IndexBuildParameters - ) -> tuple[float, float]: - """ - Calculate required GPU and CPU memory resources for a job. - - Args: - index_build_parameters (IndexBuildParameters): Parameters for building the index. 
- Contains dimension, doc count, data type, m - the parameters needed to calculate memory - - Returns: - tuple[float, float]: Required GPU and CPU memory (in bytes) - """ - gpu_mem, cpu_mem = calculate_memory_requirements(index_build_parameters) - - logger.info( - f"Job id requirements: GPU memory: {gpu_mem}, CPU memory: {cpu_mem}" - ) - if not self.resource_manager.can_allocate(gpu_mem, cpu_mem): - raise CapacityError( - "Insufficient available GPU and CPU resources to process job" - ) - - return gpu_mem, cpu_mem - def _add_to_request_store( self, job_id: str, request_parameters: RequestParameters ) -> None: @@ -171,7 +146,7 @@ def _create_workflow( if not allocation_success: self.request_store.delete(job_id) raise CapacityError( - f"Insufficient available resources to process workflow {workflow.job_id}" + f"Insufficient available resources to process workflow" ) return workflow @@ -205,19 +180,25 @@ def create_job(self, index_build_parameters: IndexBuildParameters) -> str: logger.info(f"Job with id {job_id} already exists") return job_id - gpu_mem, cpu_mem = self._get_required_resources(index_build_parameters) - self._add_to_request_store(job_id, request_parameters) logger.info(f"Added job to request store with job id: {job_id}") + gpu_mem, cpu_mem = calculate_memory_requirements(index_build_parameters) + + logger.info( + f"Job id requirements: GPU memory: {gpu_mem}, CPU memory: {cpu_mem}" + ) + workflow = self._create_workflow( job_id, gpu_mem, cpu_mem, index_build_parameters ) + logger.info( f"Worker resource status for job id {job_id}: - " f"GPU: {self.resource_manager.get_available_gpu_memory():,} bytes, " f"CPU: {self.resource_manager.get_available_cpu_memory():,} bytes" ) + self.workflow_executor.submit_workflow(workflow) logger.info(f"Successfully created workflow with job id: {job_id}") diff --git a/remote_vector_index_builder/app/storage/memory.py b/remote_vector_index_builder/app/storage/memory.py index e5a696b..43f40ae 100644 --- 
a/remote_vector_index_builder/app/storage/memory.py +++ b/remote_vector_index_builder/app/storage/memory.py @@ -10,7 +10,7 @@ import threading import time -from app.models.job import Job +from app.models.job import Job, JobStatus from app.storage.base import RequestStore from app.base.config import Settings @@ -136,7 +136,7 @@ def delete(self, job_id: str) -> bool: def cleanup_expired(self) -> None: """ - Remove all expired entries from the store based on TTL. + Remove all expired completed and failed entries from the store based on TTL. Thread-safe implementation using the store's lock. """ with self._lock: @@ -146,7 +146,7 @@ def cleanup_expired(self) -> None: self._store = { job_id: data for job_id, data in self._store.items() - if data[1] > expiration_threshold + if data[1] > expiration_threshold or data[0].status == JobStatus.RUNNING } def _cleanup_loop(self) -> None: diff --git a/test_remote_vector_index_builder/test_app/test_base/test_resources.py b/test_remote_vector_index_builder/test_app/test_base/test_resources.py index c27f54e..582a14d 100644 --- a/test_remote_vector_index_builder/test_app/test_base/test_resources.py +++ b/test_remote_vector_index_builder/test_app/test_base/test_resources.py @@ -30,9 +30,6 @@ def test_successful_allocation(resource_manager): manager, total_gpu, total_cpu = resource_manager allocation_size = 512 * 1024 * 1024 # 512MB - # Check if allocation is possible - assert manager.can_allocate(allocation_size, allocation_size) - # Perform allocation success = manager.allocate(allocation_size, allocation_size) assert success @@ -47,9 +44,6 @@ def test_failed_allocation(resource_manager): manager, total_gpu, _ = resource_manager excessive_size = total_gpu + 1 - # Check if allocation is possible - assert not manager.can_allocate(excessive_size, 0) - # Attempt allocation success = manager.allocate(excessive_size, 0) assert not success diff --git a/test_remote_vector_index_builder/test_app/test_services/test_job_service.py 
b/test_remote_vector_index_builder/test_app/test_services/test_job_service.py index a37b031..8d55774 100644 --- a/test_remote_vector_index_builder/test_app/test_services/test_job_service.py +++ b/test_remote_vector_index_builder/test_app/test_services/test_job_service.py @@ -33,7 +33,6 @@ def workflow_executor(): @pytest.fixture def resource_manager(): mock = Mock() - mock.can_allocate.return_value = True mock.allocate.return_value = True mock.get_available_gpu_memory.return_value = 1000 mock.get_available_cpu_memory.return_value = 1000 @@ -90,28 +89,6 @@ def test_validate_job_existence_hash_collision( with pytest.raises(HashCollisionError): job_service._validate_job_existence("test_id", mock_request_parameters) - -@patch("app.services.job_service.calculate_memory_requirements") -def test_get_required_resources_success(mock_calc, job_service, index_build_parameters): - """Test successful resource requirements calculation""" - mock_calc.return_value = (100.0, 200.0) - gpu_mem, cpu_mem = job_service._get_required_resources(index_build_parameters) - assert gpu_mem == 100.0 - assert cpu_mem == 200.0 - - -@patch("app.services.job_service.calculate_memory_requirements") -def test_get_required_resources_insufficient( - mock_calc, job_service, index_build_parameters -): - """Test resource calculation with insufficient resources""" - mock_calc.return_value = (100.0, 200.0) - job_service.resource_manager.can_allocate.return_value = False - - with pytest.raises(CapacityError): - job_service._get_required_resources(index_build_parameters) - - @patch("app.services.job_service.Job") def test_add_to_request_store_success(job, job_service, mock_request_parameters): """Test successful addition to request store""" @@ -143,6 +120,7 @@ def test_create_workflow_allocation_failure(job_service, index_build_parameters) with pytest.raises(CapacityError): job_service._create_workflow("test_id", 100.0, 200.0, index_build_parameters) + job_service.request_store.delete.assert_called_once() 
@patch("app.services.job_service.create_request_parameters") diff --git a/test_remote_vector_index_builder/test_app/test_storage/test_memory.py b/test_remote_vector_index_builder/test_app/test_storage/test_memory.py index 592963d..8a173c6 100644 --- a/test_remote_vector_index_builder/test_app/test_storage/test_memory.py +++ b/test_remote_vector_index_builder/test_app/test_storage/test_memory.py @@ -27,6 +27,7 @@ def settings_no_ttl(): @pytest.fixture def sample_job(): job = Mock() + job.status = JobStatus.COMPLETED return job @@ -105,6 +106,13 @@ def test_cleanup_expired(settings, sample_job): store.cleanup_expired() assert store.get("job1") is None +def test_do_not_clean_up_in_progress_job(settings, sample_job): + store = InMemoryRequestStore(settings) + sample_job.status = JobStatus.RUNNING + store.add("job1", sample_job) + store.cleanup_expired() + assert store.get("job1") == sample_job + @patch("time.sleep") def test_cleanup_loop(mock_sleep, settings): From 9eac9a8dec34cc03152217798b89f6ab5ad8d431 Mon Sep 17 00:00:00 2001 From: Rohan Chitale Date: Thu, 3 Apr 2025 09:04:59 -0700 Subject: [PATCH 3/3] Formatting changes Signed-off-by: Rohan Chitale --- remote_vector_index_builder/app/base/resources.py | 5 ++++- remote_vector_index_builder/app/services/job_service.py | 2 +- .../test_app/test_services/test_job_service.py | 1 + .../test_app/test_storage/test_memory.py | 1 + 4 files changed, 7 insertions(+), 2 deletions(-) diff --git a/remote_vector_index_builder/app/base/resources.py b/remote_vector_index_builder/app/base/resources.py index 86718a6..64f4716 100644 --- a/remote_vector_index_builder/app/base/resources.py +++ b/remote_vector_index_builder/app/base/resources.py @@ -50,7 +50,10 @@ def allocate(self, gpu_memory: float, cpu_memory: float) -> bool: bool: True if allocation was successful, False if insufficient resources """ with self._lock: - if not (self._available_gpu_memory >= gpu_memory and self._available_cpu_memory >= cpu_memory): + if not ( + 
self._available_gpu_memory >= gpu_memory + and self._available_cpu_memory >= cpu_memory + ): return False self._available_gpu_memory -= gpu_memory self._available_cpu_memory -= cpu_memory diff --git a/remote_vector_index_builder/app/services/job_service.py b/remote_vector_index_builder/app/services/job_service.py index fc57a55..9573ccc 100644 --- a/remote_vector_index_builder/app/services/job_service.py +++ b/remote_vector_index_builder/app/services/job_service.py @@ -146,7 +146,7 @@ def _create_workflow( if not allocation_success: self.request_store.delete(job_id) raise CapacityError( - f"Insufficient available resources to process workflow" + f"Insufficient available resources to process job {job_id}" ) return workflow diff --git a/test_remote_vector_index_builder/test_app/test_services/test_job_service.py b/test_remote_vector_index_builder/test_app/test_services/test_job_service.py index 8d55774..0fdf728 100644 --- a/test_remote_vector_index_builder/test_app/test_services/test_job_service.py +++ b/test_remote_vector_index_builder/test_app/test_services/test_job_service.py @@ -89,6 +89,7 @@ def test_validate_job_existence_hash_collision( with pytest.raises(HashCollisionError): job_service._validate_job_existence("test_id", mock_request_parameters) + @patch("app.services.job_service.Job") def test_add_to_request_store_success(job, job_service, mock_request_parameters): """Test successful addition to request store""" diff --git a/test_remote_vector_index_builder/test_app/test_storage/test_memory.py b/test_remote_vector_index_builder/test_app/test_storage/test_memory.py index 8a173c6..3656f22 100644 --- a/test_remote_vector_index_builder/test_app/test_storage/test_memory.py +++ b/test_remote_vector_index_builder/test_app/test_storage/test_memory.py @@ -106,6 +106,7 @@ def test_cleanup_expired(settings, sample_job): store.cleanup_expired() assert store.get("job1") is None + def test_do_not_clean_up_in_progress_job(settings, sample_job): store = 
InMemoryRequestStore(settings) sample_job.status = JobStatus.RUNNING