Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ extern "C"
int64_t line);
void ddup_push_absolute_ns(Datadog::Sample* sample, int64_t timestamp_ns);
void ddup_push_monotonic_ns(Datadog::Sample* sample, int64_t monotonic_ns);
void ddup_push_event(Datadog::Sample* sample, std::string_view event_type);
void ddup_push_label(Datadog::Sample* sample, std::string_view key, std::string_view val);

void ddup_increment_sampling_event_count();
void ddup_increment_sample_count();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ namespace Datadog {
X(trace_type, "trace type") \
X(class_name, "class name") \
X(lock_name, "lock name") \
X(gpu_device_name, "gpu device name")
X(gpu_device_name, "gpu device name") \
X(event_type, "event type")

#define X_ENUM(a, b) a,
#define X_STR(a, b) b,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class Sample
// Helpers
bool push_label(ExportLabelKey key, std::string_view val);
bool push_label(ExportLabelKey key, int64_t val);
bool push_label(std::string_view key, std::string_view val);
void push_frame_impl(std::string_view name, std::string_view filename, uint64_t address, int64_t line);
void clear_buffers();

Expand All @@ -104,6 +105,7 @@ class Sample
bool push_gpu_gputime(int64_t time, int64_t count);
bool push_gpu_memory(int64_t size, int64_t count);
bool push_gpu_flops(int64_t flops, int64_t count);
bool push_event(std::string_view event_type);

// Adds metadata to sample
bool push_lock_name(std::string_view lock_name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ enum SampleType : unsigned int
GPUTime = 1 << 7,
GPUMemory = 1 << 8,
GPUFlops = 1 << 9,
All = CPU | Wall | Exception | LockAcquire | LockRelease | Allocation | Heap | GPUTime | GPUMemory | GPUFlops
Event = 1 << 10,
All =
CPU | Wall | Exception | LockAcquire | LockRelease | Allocation | Heap | GPUTime | GPUMemory | GPUFlops | Event
};

// Every Sample object has a corresponding `values` vector, since libdatadog expects contiguous values per sample.
Expand All @@ -39,6 +41,7 @@ struct ValueIndex
unsigned short gpu_alloc_count;
unsigned short gpu_flops;
unsigned short gpu_flops_samples; // Should be "count," but flops is already a count
unsigned short event_count;
};

} // namespace Datadog
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,18 @@ ddup_push_monotonic_ns(Datadog::Sample* sample, int64_t monotonic_ns) // cppchec
sample->push_monotonic_ns(monotonic_ns);
}

void
ddup_push_event(Datadog::Sample* sample, std::string_view event_type) // cppcheck-suppress unusedFunction
{
sample->push_event(event_type);
}

void
ddup_push_label(Datadog::Sample* sample, std::string_view key, std::string_view val) // cppcheck-suppress unusedFunction
{
sample->push_label(key, val);
}

void
ddup_increment_sampling_event_count() // cppcheck-suppress unusedFunction
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ Datadog::Profile::setup_samplers()
val_idx.gpu_flops = get_value_idx("gpu-flops", "count");
val_idx.gpu_flops_samples = get_value_idx("gpu-flops-samples", "count");
}
if (0U != (type_mask & SampleType::Event)) {
val_idx.event_count = get_value_idx("event-samples", "count");
}

// Whatever the first sampler happens to be is the default "period" for the profile
// The value of 1 is a pointless default.
Expand Down
33 changes: 33 additions & 0 deletions ddtrace/internal/datadog/profiling/dd_wrapper/src/sample.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,23 @@ Datadog::Sample::push_label(const ExportLabelKey key, int64_t val)
return true;
}

bool
Datadog::Sample::push_label(std::string_view key, std::string_view val)
{
// Push a custom label with arbitrary key and value
if (val.empty() || key.empty()) {
return true;
}

// Persist both key and val strings in the arena
key = string_storage.insert(key);
val = string_storage.insert(val);
auto& label = labels.emplace_back();
label.key = to_slice(key);
label.str = to_slice(val);
return true;
}

void
Datadog::Sample::clear_buffers()
{
Expand Down Expand Up @@ -340,6 +357,22 @@ Datadog::Sample::push_gpu_flops(int64_t size, int64_t count)
return false;
}

bool
Datadog::Sample::push_event(std::string_view event_type)
{
static bool already_warned = false; // cppcheck-suppress threadsafety-threadsafety
if (0U != (type_mask & SampleType::Event)) {
push_label(ExportLabelKey::event_type, event_type);
values[profile_state.val().event_count] += 0;
return true;
}
if (!already_warned) {
already_warned = true;
std::cerr << "bad push event" << std::endl;
}
return false;
}

bool
Datadog::Sample::push_lock_name(std::string_view lock_name)
{
Expand Down
9 changes: 9 additions & 0 deletions ddtrace/internal/datadog/profiling/ddup/_ddup.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@ class SampleHandle:
def push_alloc(self, value: int, count: int) -> None: ...
def push_class_name(self, class_name: StringType) -> None: ...
def push_cputime(self, value: int, count: int) -> None: ...
def push_event(self, event_type: StringType) -> None: ...
def push_exceptioninfo(self, exc_type: Union[None, bytes, str, type], count: int) -> None: ...
def push_frame(self, name: StringType, filename: StringType, address: int, line: int) -> None: ...
def push_gpu_device_name(self, device_name: StringType) -> None: ...
def push_gpu_flops(self, value: int, count: int) -> None: ...
def push_gpu_gputime(self, value: int, count: int) -> None: ...
def push_gpu_memory(self, value: int, count: int) -> None: ...
def push_heap(self, value: int) -> None: ...
def push_label(self, key: StringType, val: StringType) -> None: ...
def push_lock_name(self, lock_name: StringType) -> None: ...
def push_monotonic_ns(self, monotonic_ns: int) -> None: ...
def push_release(self, value: int, count: int) -> None: ...
Expand All @@ -56,3 +58,10 @@ class SampleHandle:
def push_task_name(self, task_name: StringType) -> None: ...
def push_threadinfo(self, thread_id: int, thread_native_id: int, thread_name: StringType) -> None: ...
def push_walltime(self, value: int, count: int) -> None: ...

def push_event(
event_type: str,
labels: Optional[Dict[str, str]] = None,
capture_stack: bool = True,
max_nframes: Optional[int] = None,
) -> None: ...
85 changes: 85 additions & 0 deletions ddtrace/internal/datadog/profiling/ddup/_ddup.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ cdef extern from "ddup_interface.hpp":
void ddup_push_frame(Sample *sample, string_view _name, string_view _filename, uint64_t address, int64_t line)
void ddup_push_monotonic_ns(Sample *sample, int64_t monotonic_ns)
void ddup_push_absolute_ns(Sample *sample, int64_t monotonic_ns)
void ddup_push_event(Sample *sample, string_view event_type)
void ddup_push_label(Sample *sample, string_view key, string_view val)
void ddup_flush_sample(Sample *sample)
void ddup_drop_sample(Sample *sample)

Expand Down Expand Up @@ -300,6 +302,37 @@ cdef call_ddup_push_trace_type(Sample* sample, trace_type: StringType):
if utf8_data != NULL:
ddup_push_trace_type(sample, string_view(utf8_data, utf8_size))

cdef call_ddup_push_event(Sample* sample, event_type: StringType):
if not event_type:
return
if isinstance(event_type, bytes):
ddup_push_event(sample, string_view(<const char*>event_type, len(event_type)))
return
cdef const char* utf8_data
cdef Py_ssize_t utf8_size
utf8_data = PyUnicode_AsUTF8AndSize(event_type, &utf8_size)
if utf8_data != NULL:
ddup_push_event(sample, string_view(utf8_data, utf8_size))

cdef call_ddup_push_label(Sample* sample, key: StringType, val: StringType):
if not key or not val:
return
if isinstance(key, bytes) and isinstance(val, bytes):
ddup_push_label(sample, string_view(<const char*>key, len(key)), string_view(<const char*>val, len(val)))
return
cdef const char* key_utf8_data
cdef Py_ssize_t key_utf8_size
cdef const char* val_utf8_data
cdef Py_ssize_t val_utf8_size
key_utf8_data = PyUnicode_AsUTF8AndSize(key, &key_utf8_size)
val_utf8_data = PyUnicode_AsUTF8AndSize(val, &val_utf8_size)
if key_utf8_data != NULL and val_utf8_data != NULL:
ddup_push_label(
sample,
string_view(key_utf8_data, key_utf8_size),
string_view(val_utf8_data, val_utf8_size)
)

# Conversion functions
cdef uint64_t clamp_to_uint64_unsigned(value):
# This clamps a Python int to the nonnegative range of an unsigned 64-bit integer.
Expand All @@ -323,6 +356,50 @@ cdef int64_t clamp_to_int64_unsigned(value):
cdef bint _code_provenance_set = False


def push_event(
event_type: str,
labels: Optional[Dict[str, str]] = None,
capture_stack: bool = True,
max_nframes: Optional[int] = None,
) -> None:
"""Push a custom event to the profiler.

Events are samples with a value of 0 that represent points in time.
They are tagged with an event_type label and optional custom labels.

Args:
event_type: The type of event (e.g., "task_start", "task_end")
labels: Optional dictionary of custom labels to attach to the event
capture_stack: Whether to capture the current stack trace (default: True)
max_nframes: Maximum number of frames to capture (default: use global config)
"""
import sys
from types import FrameType
from ddtrace.profiling.collector import _traceback
from ddtrace.internal.settings.profiling import config

handle = SampleHandle()

# Push the event type label
handle.push_event(event_type)

# Push any custom labels
if labels:
for key, value in labels.items():
handle.push_label(str(key), str(value))

# Capture stack trace if requested
if capture_stack:
nframes = max_nframes if max_nframes is not None else config.max_frames
# Skip this function's frame (sys._getframe(0) is push_event itself)
frame: FrameType = sys._getframe(1)
frames, _ = _traceback.pyframe_to_frames(frame, nframes)
for ddframe in frames:
handle.push_frame(ddframe.function_name, ddframe.file_name, 0, ddframe.lineno)

handle.flush_sample()


def config(
service: StringType = None,
env: StringType = None,
Expand Down Expand Up @@ -523,6 +600,14 @@ cdef class SampleHandle:
if self.ptr is not NULL:
ddup_push_absolute_ns(self.ptr, <int64_t>timestamp_ns)

def push_event(self, event_type: StringType) -> None:
if self.ptr is not NULL:
call_ddup_push_event(self.ptr, event_type)

def push_label(self, key: StringType, val: StringType) -> None:
if self.ptr is not NULL:
call_ddup_push_label(self.ptr, key, val)

def flush_sample(self) -> None:
# Flushing the sample consumes it. The user will no longer be able to use
# this handle after flushing it.
Expand Down
101 changes: 101 additions & 0 deletions examples/profiling_push_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
"""Example demonstrating how to push custom events to the profiler.

This shows how to use the new push_event() API to send custom events
(like asyncio task starts) to your profiles.
"""
import asyncio
from typing import Optional

from ddtrace.internal.datadog.profiling import ddup


def example_simple_event() -> None:
"""Push a simple event with no stack trace."""
ddup.push_event("my_custom_event", capture_stack=False)


def example_event_with_stack() -> None:
"""Push an event with a stack trace (default behavior)."""
ddup.push_event("function_called")


def example_event_with_labels() -> None:
"""Push an event with custom labels."""
labels = {
"task_name": "my_task",
"parent_task": "parent_id_123",
"status": "started",
}
ddup.push_event("task_start", labels=labels)


def example_event_with_limited_stack() -> None:
"""Push an event with a limited stack trace."""
ddup.push_event("checkpoint", max_nframes=10)


async def track_asyncio_task(task_name: str, parent_task: Optional[str] = None) -> None:
"""Example: Track an asyncio task lifecycle."""
labels = {"task_name": task_name}
if parent_task:
labels["parent_task"] = parent_task

# Push task start event
ddup.push_event("asyncio_task_start", labels=labels)

try:
# Simulate some work
await asyncio.sleep(0.1)

# Push checkpoint events as needed
ddup.push_event("asyncio_task_checkpoint", labels={**labels, "checkpoint": "1"})

await asyncio.sleep(0.1)

finally:
# Push task end event
ddup.push_event("asyncio_task_end", labels=labels)


def example_manual_sample_creation() -> None:
"""Example: Manually create a sample with more control."""
handle = ddup.SampleHandle()

# Push the event type (this makes it an Event sample)
handle.push_event("custom_event")

# Add custom labels
handle.push_label("label1", "value1")
handle.push_label("label2", "value2")

# Optionally add stack frames manually
import sys
frame = sys._getframe(0)
handle.push_frame("my_function", "my_file.py", 0, frame.f_lineno)

# Flush the sample to the profile
handle.flush_sample()


if __name__ == "__main__":
# Initialize the profiler
ddup.init(
service="my-service",
env="dev",
version="1.0.0",
)
ddup.start()

# Run examples
example_simple_event()
example_event_with_stack()
example_event_with_labels()
example_event_with_limited_stack()
example_manual_sample_creation()

# Run async example
asyncio.run(track_asyncio_task("main_task"))

# Upload the profile
ddup.upload()

Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
features:
- |
profiling: This adds the ``process_id`` tag to profiles. The value of this tag is the current process ID (PID).
Loading
Loading