diff --git a/middleware/__init__.py b/middleware/__init__.py index aedb957..8a3899e 100644 --- a/middleware/__init__.py +++ b/middleware/__init__.py @@ -1,4 +1,5 @@ -from middleware.distro import mw_tracker, record_exception +from middleware.distro import mw_tracker, custom_record_exception_wrapper, record_exception +from opentelemetry.sdk.trace import Span from middleware.options import ( MWOptions, DETECT_AWS_BEANSTALK, @@ -28,3 +29,5 @@ "DETECT_GCP", "DETECT_ENVVARS", ] + +Span.record_exception = custom_record_exception_wrapper \ No newline at end of file diff --git a/middleware/distro.py b/middleware/distro.py index 79d716e..2883a36 100644 --- a/middleware/distro.py +++ b/middleware/distro.py @@ -1,4 +1,8 @@ import logging +import inspect +import traceback +from typing import Optional, Type +import sys from logging import getLogger from typing import Optional from opentelemetry.instrumentation.distro import BaseDistro @@ -9,7 +13,10 @@ from middleware.log import create_logger_handler from middleware.profiler import collect_profiling from opentelemetry import trace -from opentelemetry.trace import Tracer, get_current_span, get_tracer +from opentelemetry.trace import Tracer, get_current_span, get_tracer, get_tracer, Status, StatusCode +from opentelemetry.sdk.trace import Span +import os +import json _logger = getLogger(__name__) @@ -80,7 +87,6 @@ def mw_tracker( mw_tracker_called = True - def record_exception(exc: Exception, span_name: Optional[str] = None) -> None: """ Reports an exception as a span event creating a dummy span if necessary. 
@@ -114,6 +120,156 @@ def record_exception(exc: Exception, span_name: Optional[str] = None) -> None:
         span.set_status(trace.Status(trace.StatusCode.ERROR, str(exc)))
         span.end()
 
+
+def extract_function_code(tb_frame, lineno, context_lines=10):
+    """Return the source code around the line where an exception occurred.
+
+    The code block executing in ``tb_frame`` is read with
+    ``inspect.getsourcelines``.  If it is longer than ``2 * context_lines``
+    lines, only ``context_lines`` lines on each side of ``lineno`` are kept.
+
+    Args:
+        tb_frame: Frame in which the exception was raised.
+        lineno: 1-based line number, within the file, of the failing line.
+        context_lines: Lines kept on each side of ``lineno`` when trimming.
+
+    Returns:
+        dict: ``function_code`` (the source text) with ``function_start_line``
+        and ``function_end_line``, the 1-based file lines of its first and
+        last line.  If the source cannot be read, ``function_code`` holds an
+        error message and both line numbers are ``None``.
+    """
+    try:
+        source_lines, start_line = inspect.getsourcelines(tb_frame)
+        # Module-level code is reported as starting at line 0; use 1 instead.
+        start_line = max(start_line, 1)
+
+        if len(source_lines) > 2 * context_lines:
+            # Index of the failing line in source_lines, clamped into range.
+            offset = min(max(lineno - start_line, 0), len(source_lines) - 1)
+            start_idx = max(0, offset - context_lines)
+            end_idx = min(len(source_lines), offset + context_lines + 1)
+            source_lines = source_lines[start_idx:end_idx]
+            start_line += start_idx
+
+        return {
+            "function_code": "".join(source_lines),
+            "function_start_line": start_line,
+            "function_end_line": start_line + len(source_lines) - 1,
+        }
+    except Exception as e:  # pylint: disable=broad-except
+        # Best effort: a source-lookup failure must not break error reporting.
+        return {
+            "function_code": f"Error extracting function code: {e}",
+            "function_start_line": None,
+            "function_end_line": None,
+        }
+
+
+_original_record_exception = Span.record_exception
+
+
+def _add_detailed_exception_event(span: Span, exc: BaseException) -> bool:
+    """Add an ``exception`` event with per-frame source details to ``span``.
+
+    Returns:
+        bool: ``False`` when ``exc`` has no traceback (nothing is added),
+        ``True`` once the event has been added.
+    """
+    exc_tb = exc.__traceback__
+    if exc_tb is None:
+        return False
+
+    stack_info = []
+    for frame, lineno in traceback.walk_tb(exc_tb):
+        filename = frame.f_code.co_filename
+        function_details = extract_function_code(frame, lineno)
+        stack_info.append(
+            {
+                "exception.file": filename,
+                "exception.line": lineno,
+                "exception.function_name": frame.f_code.co_name,
+                "exception.function_body": function_details["function_code"],
+                "exception.start_line": function_details["function_start_line"],
+                "exception.end_line": function_details["function_end_line"],
+                # Paths containing "site-packages" are flagged as external.
+                "exception.is_file_external": (
+                    "true" if "site-packages" in filename else "false"
+                ),
+            }
+        )
+    # Frame where the exception was raised goes first.
+    stack_info.reverse()
+
+    span.add_event(
+        "exception",
+        {
+            "exception.type": type(exc).__name__,
+            "exception.message": str(exc),
+            "exception.language": "python",
+            # Format ``exc`` itself rather than whichever exception is being
+            # handled right now (which may be a different one, or none).
+            "exception.stacktrace": "".join(
+                traceback.format_exception(type(exc), exc, exc_tb)
+            ),
+            # Escaping means ``exc`` is the exception currently being handled.
+            "exception.escaped": sys.exc_info()[1] is exc,
+            "exception.vcs.commit_sha": os.getenv("MW_VCS_COMMIT_SHA") or "",
+            "exception.vcs.repository_url": (
+                os.getenv("MW_VCS_REPOSITORY_URL") or ""
+            ),
+            # A list of dicts is not a valid attribute value; send JSON text.
+            "exception.stack_details": json.dumps(stack_info, indent=2),
+        },
+    )
+    return True
+
+
+def custom_record_exception(span: Span, exc: Exception) -> None:
+    """Record ``exc`` on ``span`` with the source code of each traceback frame.
+
+    Falls back to the original ``Span.record_exception`` when ``exc`` has no
+    traceback.
+
+    Args:
+        span: Span that receives the ``exception`` event.
+        exc: Exception to record.
+    """
+    if not _add_detailed_exception_event(span, exc):
+        # Use the saved original so a patched ``Span`` cannot recurse into us.
+        _original_record_exception(span, exc)
+
+
+def custom_record_exception_wrapper(
+    self: Span,
+    exception: BaseException,
+    attributes=None,
+    timestamp: Optional[int] = None,
+    escaped: bool = False,
+) -> None:
+    """Replacement for ``Span.record_exception`` that adds source details.
+
+    An ``exception`` event enriched with per-frame source code is added first;
+    the original ``Span.record_exception`` is then called with the caller's
+    arguments so the default event is still recorded.
+
+    Args:
+        self: Span the exception is recorded on.
+        exception: Exception being recorded.
+        attributes: Forwarded unchanged to the original method.
+        timestamp: Forwarded unchanged to the original method.
+        escaped: Forwarded unchanged to the original method.
+    """
+    try:
+        _add_detailed_exception_event(self, exception)
+    except Exception:  # pylint: disable=broad-except
+        # Enrichment is optional; it must never mask the application's error.
+        _logger.debug("Failed to attach stack details", exc_info=True)
+
+    return _original_record_exception(
+        self, exception, attributes, timestamp, escaped
+    )
+
 
 # pylint: disable=too-few-public-methods
 class MiddlewareDistro(BaseDistro):
diff --git a/middleware/trace.py b/middleware/trace.py
index 3d21d3b..6d0a3bb 100644
--- a/middleware/trace.py
+++ b/middleware/trace.py
@@ -2,7 +2,7 @@
 import sys
 import logging
 from opentelemetry.sdk.resources import Resource
-from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace import TracerProvider, SpanProcessor, ReadableSpan
 from opentelemetry.sdk.trace.export import (
     BatchSpanProcessor,
     SimpleSpanProcessor,
@@ -11,11 +11,63 @@
 from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
 from opentelemetry.processor.baggage import ALLOW_ALL_BAGGAGE_KEYS, BaggageSpanProcessor
 from middleware.options import MWOptions
-from opentelemetry.trace import set_tracer_provider
+from opentelemetry.trace import set_tracer_provider, Span
 from middleware.sampler import configure_sampler
 
 _logger = logging.getLogger(__name__)
 
+
+class ExceptionFilteringSpanProcessor(SpanProcessor):
+    """Remove plain ``exception`` events duplicated by a detailed one.
+
+    An ``exception`` event is *detailed* when it carries the
+    ``exception.stack_details`` attribute.  When a span ends, every other
+    ``exception`` event whose ``exception.stacktrace`` equals that of a
+    detailed event is dropped; all remaining events are kept in order.
+    """
+
+    @staticmethod
+    def _is_detailed(event) -> bool:
+        """Return True for an ``exception`` event that has stack details."""
+        return (
+            event.name == "exception"
+            and "exception.stack_details" in (event.attributes or {})
+        )
+
+    def on_start(self, span: ReadableSpan, parent_context=None):
+        """Do nothing; filtering happens when the span ends."""
+
+    def on_end(self, span: ReadableSpan):
+        """Drop plain exception events already covered by a detailed event."""
+        events = span.events
+        detailed_traces = {
+            (event.attributes or {}).get("exception.stacktrace")
+            for event in events
+            if self._is_detailed(event)
+        }
+        if not detailed_traces:
+            return
+
+        def is_duplicate(event) -> bool:
+            if event.name != "exception" or self._is_detailed(event):
+                return False
+            stack_trace = (event.attributes or {}).get("exception.stacktrace")
+            return stack_trace in detailed_traces
+
+        # pylint: disable=protected-access
+        span._events = [event for event in events if not is_duplicate(event)]
+
+    def shutdown(self):
+        """Nothing to release."""
+
+    def force_flush(self, timeout_millis=None):
+        """Return True: this processor buffers nothing, so flushing succeeds.
+
+        ``SpanProcessor.force_flush`` is expected to return a bool; an
+        implicit ``None`` would read as a failed flush.
+        """
+        return True
+
 
 def create_tracer_provider(options: MWOptions, resource: Resource) -> TracerProvider:
     """
@@ -28,6 +80,9 @@ def create_tracer_provider(options: MWOptions, resource: Resource) -> TracerProv
     Returns:
         TracerProvider: the new tracer provider
     """
+    # from middleware.distro import custom_record_exception_wrapper
+    # Span.record_exception = custom_record_exception_wrapper
+
     exporter = OTLPSpanExporter(
         endpoint=options.target,
         compression=grpc.Compression.Gzip,
@@ -41,6 +96,7 @@ def create_tracer_provider(options: MWOptions, resource: Resource) -> TracerProv
             exporter,
         )
     )
+    trace_provider.add_span_processor(ExceptionFilteringSpanProcessor())
     if options.console_exporter:
         output = sys.stdout
         if options.debug_log_file:
@@ -58,4 +114,4 @@ def create_tracer_provider(options: MWOptions, resource: Resource) -> TracerProv
             )
         )
     set_tracer_provider(tracer_provider=trace_provider)
-    return trace_provider
+    return trace_provider
\ No newline at end of file
diff --git a/pyproject.toml b/pyproject.toml
index 328782b..85d3638 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 
 [project]
 name = "middleware-io"
-version = "2.1.1"
+version = "2.1.2rc16"
 requires-python = ">=3.8"
 description = "Middleware's APM tool enables Python developers to effortlessly monitor their applications, gathering distributed tracing, metrics, logs, and profiling data for valuable insights and performance optimization."
authors = [{ name = "middleware-dev" }]