Skip to content

Commit febcc09

Browse files
CopilotCopilotnikhilNava
authored
Security hardening for observability packages (#198)
* Initial plan * Replace assert statements with explicit TypeError raises in langchain utils Replace all 30 assert statements in utils.py with equivalent if-not-raise TypeError checks. This ensures type validation is not silently stripped when Python runs with -O (optimized mode). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * security: fix sensitive data logging, context leak, unbounded memory, asserts, and more - Fix #1: Downgrade sensitive data logging from INFO to DEBUG in agent365_exporter.py - Fix #2: Fix unpaired context.attach() in opentelemetry_scope.py add_baggage() by storing and detaching baggage tokens on scope end - Fix #3: Add bounded OrderedDict caps to unbounded dicts in OpenAI trace_processor.py - Fix #4: Replace 30 assert statements with proper TypeError raises in LangChain utils.py - Fix #5: Log security warning when HTTP domain override is detected - Fix #6: Warn when bearer token sent over non-HTTPS connection - Fix #10: Respect Retry-After header and use exponential backoff in retries - Fix #13: Rename reset() to _reset() in ObservabilityHostingManager - Fix #15: Replace print() with logger.warning() in LangChain tracer_instrumentor.py Co-authored-by: nikhilNava <211831449+nikhilNava@users.noreply.github.com> * Restore agent/tenant IDs and response text in exporter log messages Agent IDs and tenant IDs are not sensitive data and are useful for debugging. Restore them in debug/error log messages. Also restore truncated response text in HTTP error logs to help developers debug failures. Log levels remain at DEBUG (from the prior security fix). Co-authored-by: nikhilNava <211831449+nikhilNava@users.noreply.github.com> * Remove add_baggage() from OpenTelemetryScope The method had an unpaired context.attach() that leaked context tokens. Users should use BaggageBuilder.build() context manager instead, which properly restores the previous context on exit. Co-authored-by: nikhilNava <211831449+nikhilNava@users.noreply.github.com> * Security hardening for observability packages Co-authored-by: nikhilNava <211831449+nikhilNava@users.noreply.github.com> * Fix formatting in test_agent365_exporter.py and replace remaining raise TypeError with isinstance guards in langchain utils.py Co-authored-by: nikhilNava <211831449+nikhilNava@users.noreply.github.com> * Move _parse_retry_after to exporters/utils.py as standalone parse_retry_after function Co-authored-by: nikhilNava <211831449+nikhilNava@users.noreply.github.com> * Replace type(e).__name__ with str(e) in exporter error logging per PR review Co-authored-by: nikhilNava <211831449+nikhilNava@users.noreply.github.com> * feat: add bounded collections for LangChain tracer and OutputScope - Convert LangChain _spans_by_run from unbounded DictWithLock to bounded OrderedDict with _MAX_TRACKED_RUNS=10000 cap - Add _cap_ordered_dict helper for FIFO eviction (matching OpenAI pattern) - Add thread-safe lock usage for _spans_by_run in error handlers - Add _MAX_OUTPUT_MESSAGES=5000 cap for OutputScope._output_messages - Add unit tests for both bounded collections Co-authored-by: nikhilNava <211831449+nikhilNava@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> Co-authored-by: nikhilNava <211831449+nikhilNava@users.noreply.github.com> Co-authored-by: Nikhil Navakiran <nikhil.navakiran@gmail.com>
1 parent 9f8ed47 commit febcc09

File tree

14 files changed

+409
-113
lines changed

14 files changed

+409
-113
lines changed

libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/agent365_exporter.py

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
hex_span_id,
2424
hex_trace_id,
2525
kind_name,
26+
parse_retry_after,
2627
partition_by_identity,
2728
status_name,
2829
truncate_span,
@@ -79,9 +80,9 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
7980
logger.info("No spans with tenant/agent identity found; nothing exported.")
8081
return SpanExportResult.SUCCESS
8182

82-
# Debug: Log number of groups and total span count
83+
# Log number of groups and total span count
8384
total_spans = sum(len(activities) for activities in groups.values())
84-
logger.info(
85+
logger.debug(
8586
f"Found {len(groups)} identity groups with {total_spans} total spans to export"
8687
)
8788

@@ -98,8 +99,8 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
9899

99100
url = build_export_url(endpoint, agent_id, tenant_id, self._use_s2s_endpoint)
100101

101-
# Debug: Log endpoint being used
102-
logger.info(
102+
# Log endpoint details at DEBUG to avoid leaking IDs in production logs
103+
logger.debug(
103104
f"Exporting {len(activities)} spans to endpoint: {url} "
104105
f"(tenant: {tenant_id}, agent: {agent_id})"
105106
)
@@ -108,10 +109,16 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
108109
try:
109110
token = self._token_resolver(agent_id, tenant_id)
110111
if token:
112+
# Warn if sending bearer token over non-HTTPS connection
113+
if not url.lower().startswith("https://"):
114+
logger.warning(
115+
"Bearer token is being sent over a non-HTTPS connection. "
116+
"This may expose credentials in transit."
117+
)
111118
headers["authorization"] = f"Bearer {token}"
112-
logger.info(f"Token resolved successfully for agent {agent_id}")
119+
logger.debug(f"Token resolved successfully for agent {agent_id}")
113120
else:
114-
logger.info(f"No token returned for agent {agent_id}")
121+
logger.debug(f"No token returned for agent {agent_id}")
115122
except Exception as e:
116123
# If token resolution fails, treat as failure for this group
117124
logger.error(
@@ -174,7 +181,7 @@ def _post_with_retries(self, url: str, body: str, headers: dict[str, str]) -> bo
174181

175182
# 2xx => success
176183
if 200 <= resp.status_code < 300:
177-
logger.info(
184+
logger.debug(
178185
f"HTTP {resp.status_code} success on attempt {attempt + 1}. "
179186
f"Correlation ID: {correlation_id}. "
180187
f"Response: {self._truncate_text(resp.text, 200)}"
@@ -186,12 +193,19 @@ def _post_with_retries(self, url: str, body: str, headers: dict[str, str]) -> bo
186193

187194
# Retry transient
188195
if resp.status_code in (408, 429) or 500 <= resp.status_code < 600:
196+
# Respect Retry-After header for 429 responses
197+
retry_after = parse_retry_after(resp.headers)
189198
if attempt < DEFAULT_MAX_RETRIES:
190-
time.sleep(0.2 * (attempt + 1))
199+
if retry_after is not None:
200+
time.sleep(min(retry_after, 60.0))
201+
else:
202+
# Exponential backoff with base 0.5s
203+
time.sleep(0.5 * (2**attempt))
191204
continue
192205
# Final attempt failed
193206
logger.error(
194-
f"HTTP {resp.status_code} final failure after {DEFAULT_MAX_RETRIES + 1} attempts. "
207+
f"HTTP {resp.status_code} final failure after "
208+
f"{DEFAULT_MAX_RETRIES + 1} attempts. "
195209
f"Correlation ID: {correlation_id}. "
196210
f"Response: {response_text}"
197211
)
@@ -206,12 +220,11 @@ def _post_with_retries(self, url: str, body: str, headers: dict[str, str]) -> bo
206220

207221
except requests.RequestException as e:
208222
if attempt < DEFAULT_MAX_RETRIES:
209-
time.sleep(0.2 * (attempt + 1))
223+
# Exponential backoff with base 0.5s
224+
time.sleep(0.5 * (2**attempt))
210225
continue
211226
# Final attempt failed
212-
logger.error(
213-
f"Request failed after {DEFAULT_MAX_RETRIES + 1} attempts with exception: {e}"
214-
)
227+
logger.error(f"Request failed after {DEFAULT_MAX_RETRIES + 1} attempts: {e}")
215228
return False
216229
return False
217230

libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/utils.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# Copyright (c) Microsoft Corporation.
22
# Licensed under the MIT License.
33

4+
from __future__ import annotations
5+
46
import json
57
import logging
68
import os
@@ -194,6 +196,13 @@ def get_validated_domain_override() -> str | None:
194196
logger.warning(f"Invalid domain override '{domain_override}': {e}")
195197
return None
196198

199+
# Warn when using insecure HTTP — telemetry data and bearer tokens may be exposed
200+
if domain_override.lower().startswith("http://"):
201+
logger.warning(
202+
"Domain override uses insecure HTTP. Telemetry data (including "
203+
"bearer tokens) will be transmitted in cleartext."
204+
)
205+
197206
return domain_override
198207

199208

@@ -223,6 +232,31 @@ def build_export_url(
223232
return f"https://{endpoint}{endpoint_path}?api-version=1"
224233

225234

235+
def parse_retry_after(headers: dict[str, str]) -> float | None:
236+
"""Parse the ``Retry-After`` header value.
237+
238+
Only numeric (seconds) values are supported. HTTP-date values
239+
(e.g. ``Wed, 21 Oct 2025 07:28:00 GMT``) are intentionally ignored
240+
and treated as absent, falling back to exponential backoff.
241+
242+
Args:
243+
headers: Response headers mapping.
244+
245+
Returns:
246+
The number of seconds to wait, or ``None`` if the header is
247+
absent, non-numeric, or otherwise invalid.
248+
"""
249+
retry_after = headers.get("Retry-After")
250+
if retry_after is None:
251+
return None
252+
try:
253+
return float(retry_after)
254+
except (ValueError, TypeError):
255+
# Intentionally ignore HTTP-date formatted Retry-After values;
256+
# callers should fall back to exponential backoff.
257+
return None
258+
259+
226260
def is_agent365_exporter_enabled() -> bool:
227261
"""Check if Agent 365 exporter is enabled."""
228262
# Check environment variable

libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/opentelemetry_scope.py

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from threading import Lock
1010
from typing import TYPE_CHECKING, Any
1111

12-
from opentelemetry import baggage, context, trace
12+
from opentelemetry import context, trace
1313
from opentelemetry.trace import (
1414
Span,
1515
SpanKind,
@@ -241,21 +241,6 @@ def set_tag_maybe(self, name: str, value: Any) -> None:
241241
if value is not None and self._span and self._is_telemetry_enabled():
242242
self._span.set_attribute(name, value)
243243

244-
def add_baggage(self, key: str, value: str) -> None:
245-
"""Add baggage to the current context.
246-
247-
Args:
248-
key: The baggage key
249-
value: The baggage value
250-
"""
251-
# Set baggage in the current context
252-
if self._is_telemetry_enabled():
253-
# Set baggage on the current context
254-
# This will be inherited by child spans created within this context
255-
baggage_context = baggage.set_baggage(key, value)
256-
# The context needs to be made current for child spans to inherit the baggage
257-
context.attach(baggage_context)
258-
259244
def record_attributes(self, attributes: dict[str, Any] | list[tuple[str, Any]]) -> None:
260245
"""Record multiple attribute key/value pairs for telemetry tracking.
261246

libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/spans_scopes/output_scope.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
class OutputScope(OpenTelemetryScope):
1717
"""Provides OpenTelemetry tracing scope for output messages."""
1818

19+
_MAX_OUTPUT_MESSAGES = 5000
20+
1921
@staticmethod
2022
def start(
2123
agent_details: AgentDetails,
@@ -82,9 +84,12 @@ def record_output_messages(self, messages: list[str]) -> None:
8284
"""Records the output messages for telemetry tracking.
8385
8486
Appends the provided messages to the accumulated output messages list.
87+
The list is capped at _MAX_OUTPUT_MESSAGES to prevent unbounded memory growth.
8588
8689
Args:
8790
messages: List of output messages to append
8891
"""
8992
self._output_messages.extend(messages)
93+
if len(self._output_messages) > self._MAX_OUTPUT_MESSAGES:
94+
self._output_messages = self._output_messages[-self._MAX_OUTPUT_MESSAGES :]
9095
self.set_tag_maybe(GEN_AI_OUTPUT_MESSAGES_KEY, safe_json_dumps(self._output_messages))

libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/tracer.py

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
import logging
55
import re
6+
from collections import OrderedDict
67
from collections.abc import Iterator
78
from itertools import chain
89
from threading import RLock
@@ -69,6 +70,8 @@
6970

7071

7172
class CustomLangChainTracer(BaseTracer):
73+
_MAX_TRACKED_RUNS = 10000
74+
7275
__slots__ = (
7376
"_tracer",
7477
"_separate_trace_from_runtime_context",
@@ -98,11 +101,18 @@ def __init__(
98101
self.run_map = DictWithLock[str, Run](self.run_map)
99102
self._tracer = tracer
100103
self._separate_trace_from_runtime_context = separate_trace_from_runtime_context
101-
self._spans_by_run: dict[UUID, Span] = DictWithLock[UUID, Span]()
104+
self._spans_by_run: OrderedDict[UUID, Span] = OrderedDict()
102105
self._lock = RLock() # handlers may be run in a thread by langchain
103106

104107
def get_span(self, run_id: UUID) -> Span | None:
105-
return self._spans_by_run.get(run_id)
108+
with self._lock:
109+
return self._spans_by_run.get(run_id)
110+
111+
@staticmethod
112+
def _cap_ordered_dict(d: OrderedDict, max_size: int) -> None:
113+
"""Evict oldest entries from an OrderedDict to stay within max_size."""
114+
while len(d) > max_size:
115+
d.popitem(last=False)
106116

107117
def _start_trace(self, run: Run) -> None:
108118
self.run_map[str(run.id)] = run
@@ -142,12 +152,14 @@ def _start_trace(self, run: Run) -> None:
142152
# token = context_api.attach(context)
143153
with self._lock:
144154
self._spans_by_run[run.id] = span
155+
self._cap_ordered_dict(self._spans_by_run, self._MAX_TRACKED_RUNS)
145156

146157
def _end_trace(self, run: Run) -> None:
147158
self.run_map.pop(str(run.id), None)
148159
if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
149160
return
150-
span = self._spans_by_run.pop(run.id, None)
161+
with self._lock:
162+
span = self._spans_by_run.pop(run.id, None)
151163
if span:
152164
try:
153165
_update_span(span, run)
@@ -162,24 +174,32 @@ def _persist_run(self, run: Run) -> None:
162174
pass
163175

164176
def on_llm_error(self, error: BaseException, *args: Any, run_id: UUID, **kwargs: Any) -> Run:
165-
if span := self._spans_by_run.get(run_id):
177+
with self._lock:
178+
span = self._spans_by_run.get(run_id)
179+
if span:
166180
record_exception(span, error)
167181
return super().on_llm_error(error, *args, run_id=run_id, **kwargs)
168182

169183
def on_chain_error(self, error: BaseException, *args: Any, run_id: UUID, **kwargs: Any) -> Run:
170-
if span := self._spans_by_run.get(run_id):
184+
with self._lock:
185+
span = self._spans_by_run.get(run_id)
186+
if span:
171187
record_exception(span, error)
172188
return super().on_chain_error(error, *args, run_id=run_id, **kwargs)
173189

174190
def on_retriever_error(
175191
self, error: BaseException, *args: Any, run_id: UUID, **kwargs: Any
176192
) -> Run:
177-
if span := self._spans_by_run.get(run_id):
193+
with self._lock:
194+
span = self._spans_by_run.get(run_id)
195+
if span:
178196
record_exception(span, error)
179197
return super().on_retriever_error(error, *args, run_id=run_id, **kwargs)
180198

181199
def on_tool_error(self, error: BaseException, *args: Any, run_id: UUID, **kwargs: Any) -> Run:
182-
if span := self._spans_by_run.get(run_id):
200+
with self._lock:
201+
span = self._spans_by_run.get(run_id)
202+
if span:
183203
record_exception(span, error)
184204
return super().on_tool_error(error, *args, run_id=run_id, **kwargs)
185205

libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/tracer_instrumentor.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
from __future__ import annotations
55

6+
import logging
67
from collections.abc import Callable, Collection
78
from typing import Any
89
from uuid import UUID
@@ -21,6 +22,8 @@
2122

2223
from microsoft_agents_a365.observability.extensions.langchain.tracer import CustomLangChainTracer
2324

25+
logger = logging.getLogger(__name__)
26+
2427
_INSTRUMENTS: str = "langchain_core >= 1.2.0"
2528

2629

@@ -86,7 +89,7 @@ def _uninstrument(self, **kwargs: Any) -> None:
8689
def get_span(self, run_id: UUID) -> Span | None:
8790
"""Return the span for a specific LangChain run_id, if available."""
8891
if not self._tracer:
89-
print("Missing tracer; call InstrumentorForLangChain().instrument() first.")
92+
logger.warning("Missing tracer; call InstrumentorForLangChain().instrument() first.")
9093
return None
9194
# TraceForLangChain is expected to expose get_span(run_id).
9295
get_span_fn = getattr(self._tracer, "get_span", None)
@@ -95,7 +98,7 @@ def get_span(self, run_id: UUID) -> Span | None:
9598
def get_ancestors(self, run_id: UUID) -> list[Span]:
9699
"""Return ancestor spans from the run’s parent up to the root (nearest first)."""
97100
if not self._tracer:
98-
print("Missing tracer; call InstrumentorForLangChain().instrument() first.")
101+
logger.warning("Missing tracer; call InstrumentorForLangChain().instrument() first.")
99102
return []
100103

101104
# Expect the processor to keep a run_map with parent linkage (string keys).

0 commit comments

Comments
 (0)