Skip to content

"A call stack without an active session" in a predict model API based in BentoML #405

@gabrielxfs

Description

@gabrielxfs

In the attempt to use pyinstrument in a predict model API based in BentoML, I received this message:

Received a call stack without an active session. Please file an issue on pyinstrument Github describing how you made this happen!

So I'm reporting.

Disclaimers:

  • Before disabling the asyncio stuff in the code, just worked fine. In this testing scenario, this error happened.
  • The asyncio is another testing scenario, and does not match the current tests goals (but was there as leftovers).
  • And before trying to sample/print every Nth requests (where in develop scenario is a hundred) it works fine for each request individually.
  • The main goal of sampling the printing is to reduce overhead at minimum in a stress and load test scenario, and still collect the statistical evidence, in which this lib does so well.

The complete stack trace:

 Traceback (most recent call last):
   File "/home/bentoml/bento/service/service.py", line 211, in predict
     print(profile_renderer.render(sessionz))
   File "/home/bentoml/bento/.venv/lib/python3.10/site-packages/pyinstrument/renderers/console.py", line 66, in render
     frame = self.preprocess(session.root_frame())
   File "/home/bentoml/bento/.venv/lib/python3.10/site-packages/pyinstrument/session.py", line 155, in root_frame
     root_frame = build_frame_tree(self.frame_records, context=self)
   File "/home/bentoml/bento/.venv/lib/python3.10/site-packages/pyinstrument/frame_ops.py", line 56, in build_frame_tree
     frame.record_time_from_frame_info(frame_info=frame_info, time=time)
   File "/home/bentoml/bento/.venv/lib/python3.10/site-packages/pyinstrument/frame.py", line 99, in record_time_from_frame_info
     for attribute in attributes_list:
   File "/home/bentoml/bento/.venv/lib/python3.10/site-packages/pyinstrument/stack_sampler.py", line 197, in _sample
     subscriber.target(call_stack, time_since_last_sample, subscriber.async_state)
   File "/home/bentoml/bento/.venv/lib/python3.10/site-packages/pyinstrument/profiler.py", line 257, in _sampler_saw_call_stack
     raise RuntimeError(
 RuntimeError: Received a call stack without an active session. Please file an issue on pyinstrument Github describing how you made this happen!
 
 During handling of the above exception, another exception occurred:
 
 Traceback (most recent call last):
   File "/home/bentoml/bento/.venv/lib/python3.10/site-packages/_bentoml_impl/server/app.py", line 692, in api_endpoint_wrapper
     resp = await self.api_endpoint(name, request)
   File "/home/bentoml/bento/.venv/lib/python3.10/site-packages/_bentoml_impl/server/app.py", line 797, in api_endpoint
     output = await self._to_thread(func, *call_args, **call_kwargs)
   File "/home/bentoml/bento/.venv/lib/python3.10/site-packages/_bentoml_impl/server/app.py", line 638, in _to_thread
     output = await anyio.to_thread.run_sync(func, limiter=self._limiter)
   File "/home/bentoml/bento/.venv/lib/python3.10/site-packages/anyio/to_thread.py", line 56, in run_sync
     return await get_async_backend().run_sync_in_worker_thread(
   File "/home/bentoml/bento/.venv/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 2476, in run_sync_in_worker_thread
     return await future
   File "/home/bentoml/bento/.venv/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 967, in run
     result = context.run(func, *args)
   File "/home/bentoml/bento/.venv/lib/python3.10/site-packages/_bentoml_sdk/method.py", line 177, in wrapped
     return self.func(instance, *args, **kwargs)
   File "/home/bentoml/bento/service/service.py", line 227, in predict
     raise InternalServerError(

The code is a service.py where I obfuscate it underlying meaning due to NDA:

from re import sub
from traceback import format_exc
from datetime import datetime
from http import HTTPStatus
from logging import Logger, getLogger
from os import environ
from warnings import filterwarnings
from typing import Dict
from pydantic import BaseModel, ConfigDict, ValidationError

# import asyncio

from prometheus_client import Counter

import bentoml
from bentoml.exceptions import BadInput, InternalServerError
from pandas import DataFrame
from mlflow.exceptions import MlflowException

from internal_lib.serving import Transformer
from internal_lib.serving.extractors import (
    Extractor1,
    Extractor2,
    Extractor3,
    Extractor4,
    Extractor5,
    Extractor6
)

API_PORT: int = int(environ.get("API_PORT", 5000))
API_TIMEOUT: int = int(environ.get("API_TIMEOUT", 60))
API_MAX_CONCURRENCY: int = int(environ.get("API_MAX_CONCURRENCY", 50))
API_LOGLEVEL: str = environ.get("API_LOGLEVEL", "INFO")
API_WORKERS: int | str = environ.get("API_WORKERS", "cpu_count")
API_WORKERS = int(API_WORKERS) if API_WORKERS.isdigit() else API_WORKERS
# ENABLE_ASYNCIO: str = environ.get("ENABLE_ASYNCIO", "False")

MLFLOW_MODEL_NAME: str = environ.get("MLFLOW_MODEL_NAME", "NO_VALUE_DEFINED")
MLFLOW_MODEL_VERSION: str = environ.get("MLFLOW_MODEL_VERSION", "NO_VALUE_DEFINED")

MODEL_NAME: str = environ.get("MODEL_NAME", "NO_VALUE_DEFINED")
MODEL_VERSION: str = environ.get("MODEL_VERSION", "NO_VALUE_DEFINED")
API_SERVICE_NAME: str = sub(r"[-.]", "_", f"API_SERVICE_{MODEL_NAME}").upper()
API_PROFILLING_ENABLE: str = environ.get("API_PROFILLING_ENABLE", "")

if API_PROFILLING_ENABLE == "True":
    from pyinstrument import Profiler
    from pyinstrument.renderers import ConsoleRenderer


ENABLE_LOGGING_ACCESS: bool = API_LOGLEVEL == "DEBUG"

bentoml_logger: Logger = getLogger("api_model")
bentoml_logger.setLevel(API_LOGLEVEL)

if API_LOGLEVEL == "DEBUG":
    bentoml_logger.debug(
        msg=f"""
API_SERVICE_NAME: {API_SERVICE_NAME}
API_PORT: {API_PORT}
API_WORKERS: {API_WORKERS}
API_WORKER_TIMEOUT: {API_TIMEOUT}
API_WORKER_MAX_CONCURRENCY: {API_MAX_CONCURRENCY}
API_LOGLEVEL: {API_LOGLEVEL}
MODEL_NAME: {MODEL_NAME}
MODEL_VERSION: {MODEL_VERSION}"""
    )
else:
    filterwarnings("ignore")
    getLogger("bentoml.access").disabled = True
    getLogger("bentoml").disabled = True
    getLogger(__name__).disabled = True
    getLogger("bentoml.serve").disabled = True
    getLogger("bentoml._internal.tag").disabled = True


def return_error_message(predict_id: int | str, exception: str) -> dict:
    error: dict = {
        "predict_id": predict_id,
        "error": exception,
        "model_name": MODEL_NAME,
        "model_version": MODEL_VERSION,
        "traceback": sub(r'[\\^"\n]|  ', "", format_exc()),
    }
    return error

class DataModel(BaseModel):
    model_config = ConfigDict(extra="allow")
    data_1: Dict
    data_2: Dict | None = None
    data_3: Dict
    data_4: Dict

api_worker_counter = Counter(
    name='bentoml_service_api_workers_total',
    documentation='Total number of API Workers'
)
api_worker_counter.inc(1)

@bentoml.service(
    name=API_SERVICE_NAME,
    workers=API_WORKERS,
    http={"port": API_PORT},
    traffic={
        "timeout": API_TIMEOUT,
        "max_concurrency": API_MAX_CONCURRENCY,
    },
    metrics={
        "enabled": True,
        "duration": {"min": 0.05, "max": 5.0, "factor": 1.5}
    },
    logging={
        "access": {
            "enabled": ENABLE_LOGGING_ACCESS,
            "request_content_length": True,
            "request_content_type": True,
            "response_content_length": True,
            "response_content_type": True,
            "skip_paths": ["/metrics", "/healthz", "/livez", "/readyz"],
            "format": {"trace_id": "032x", "span_id": "016x"},
        }
    },
)
class APIService:
    bentoml_logger.info("Loading model from MLFlow: %s - %s", MLFLOW_MODEL_NAME, MLFLOW_MODEL_VERSION)

    def __init__(self) -> None:

        self.worker_index: int = bentoml.server_context.worker_index
        self.bento_model = bentoml.mlflow.load_model("model:latest")
        if API_PROFILLING_ENABLE == "True":
            self.counter = 0
            self.profiler = Profiler(interval=0.01, use_timing_thread=True)

        self.transform = Transformer(
            [
                Extractor1(),
                Extractor2(),
                Extractor3(),
                Extractor4(),
                Extractor5(),
                Extractor6()
            ],
        )

        bentoml_logger.info("Worker %s ready.", self.worker_index)

    @bentoml.api(input_spec=dict, route="/api/")
    # async def predict(self, **params: dict):
    def predict(self, **params: dict):
        try:
            if API_PROFILLING_ENABLE == "True":
                self.profiler.start()
            payload: dict = params.get("root")
            predict_id: int | str = payload.get("predict_id", "not found")

            predictDataModel(**(params.get("root")))
            # if ENABLE_ASYNCIO == "True":
            #     features: DataFrame = await asyncio.to_thread(self.transform.run, payload)
            # else:
            features: DataFrame = self.transform.run(payload)

            # if ENABLE_ASYNCIO == "True":
            #     prediction = await asyncio.to_thread(self.bento_model.predict, features)
            #     score: float = prediction[0, 1]
            # else:
            score: float = self.bento_model.predict(features)[0, 1]

            output: dict = {
                "score": score,
                "features": features.fillna("None").astype(str).to_dict("records")[0],
                "datetime": f"{str(datetime.now())}, 'info'",
            }

            response: dict = {
                "typeName": "...",
                "name": MODEL_NAME,
                "value": score,
                "details": output,
                "version": MODEL_VERSION,
                "state": 0,
            }

            bentoml_logger.info(
                "predict ID: %s - Model: %s - Version %s - Score: %s",
                predict_id,
                MODEL_NAME,
                MODEL_VERSION,
                round(score, 3)
            )
            if API_PROFILLING_ENABLE == "True":
                self.counter += 1
                if self.counter % 100 == 0:
                    sessionz = self.profiler.stop()
                    profile_renderer = ConsoleRenderer(
                        unicode=False,
                        color=True,
                        flat=False,
                        time='seconds',
                        flat_time='self',
                        short_mode=True,
                        show_all=False,
                        timeline=False,
                    )
                    print(profile_renderer.render(sessionz))
            return response
        except (
            AttributeError,
            TypeError,
            IndexError,
            ValueError,
            KeyError,
            AssertionError,
            ValidationError,
            MlflowException,
        ) as exception:
            raise BadInput(
                message=return_error_message(predict_id=predict_id, exception=str(exception))
            )
        except Exception as exception:
            raise InternalServerError(
                message=return_error_message(predict_id=predict_id, exception=str(exception)),
                error_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            )

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions