725 changes: 160 additions & 565 deletions packages/db/src/buffers/event-buffer.test.ts

Large diffs are not rendered by default.

913 changes: 101 additions & 812 deletions packages/db/src/buffers/event-buffer.ts

Large diffs are not rendered by default.

27 changes: 24 additions & 3 deletions packages/db/src/clickhouse/client.ts
@@ -56,14 +56,15 @@ export const TABLE_NAMES = {
   event_property_values_mv: 'event_property_values_mv',
   cohort_events_mv: 'cohort_events_mv',
   sessions: 'sessions',
+  events_imports: 'events_imports',
 };
 
 export const CLICKHOUSE_OPTIONS: NodeClickHouseClientConfigOptions = {
   max_open_connections: 30,
-  request_timeout: 60000,
+  request_timeout: 300000,
   keep_alive: {
     enabled: true,
-    idle_socket_ttl: 8000,
+    idle_socket_ttl: 60000,
   },
   compression: {
     request: true,
@@ -132,7 +133,27 @@ export const ch = new Proxy(originalCh, {
     const value = Reflect.get(target, property, receiver);
 
     if (property === 'insert') {
-      return (...args: any[]) => withRetry(() => value.apply(target, args));
+      return (...args: any[]) =>
+        withRetry(() => {
+          args[0].clickhouse_settings = {
+            // Allow bigger HTTP payloads/time to stream rows
+            async_insert: 1,
+            wait_for_async_insert: 1,
+            // Increase insert timeouts and buffer sizes for large batches
+            max_execution_time: 300,
+            max_insert_block_size: '500000',
+            max_http_get_redirects: '0',
+            // Ensure JSONEachRow stays efficient
+            input_format_parallel_parsing: 1,
+            // Keep long-running inserts/queries from idling out at proxies by sending progress headers
+            send_progress_in_http_headers: 1,
+            http_headers_progress_interval_ms: '50000',
+            // Ensure server holds the connection until the query is finished
+            wait_end_of_query: 1,
+            ...args[0].clickhouse_settings,
+          };
+          return value.apply(target, args);
+        });
     }
 
     return value;
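The spread order in the patched `insert` above is what lets callers keep control: the defaults are listed first and `...args[0].clickhouse_settings` is spread last, so any caller-supplied setting wins on conflict. A minimal sketch of that merge behavior (the `mergeSettings` helper and its defaults are illustrative, not code from the PR):

```typescript
// Hypothetical helper mirroring the merge inside the proxied insert:
// defaults first, caller-supplied settings spread last so they override.
type Settings = Record<string, string | number>;

const DEFAULTS: Settings = {
  async_insert: 1,
  wait_for_async_insert: 1,
  max_execution_time: 300,
};

function mergeSettings(caller?: Settings): Settings {
  return { ...DEFAULTS, ...caller };
}

// A caller raising the execution time while keeping the other defaults:
const merged = mergeSettings({ max_execution_time: 600 });
console.log(merged);
```

Note this means the hard-coded values in the proxy are only fallbacks; any per-call `clickhouse_settings` passed to `ch.insert` still takes precedence.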
7 changes: 5 additions & 2 deletions packages/db/src/services/event.service.ts
@@ -510,7 +510,7 @@ export async function getEventList(options: GetEventListOptions) {
     sb.select.model = 'model';
   }
   if (select.duration) {
-    sb.select.duration = 'duration';
+    sb.select.duration = `${getDurationSql()} as duration`;
   }
Comment on lines 512 to 514
⚠️ Potential issue | 🟠 Major

Duration can be NULL/negative; coalesce and clamp to zero.

Lead-based diff returns NULL for session tail and can go negative with clock skew. That breaks consumers expecting a number.

Apply this by hardening the helper (below); the call here can stay unchanged once the helper is fixed.

🤖 Prompt for AI Agents
In packages/db/src/services/event.service.ts around lines 512 to 514, the
duration expression can be NULL or negative (lead-based diff for session tail
and clock skew), so update the helper used by getDurationSql() to return SQL
that clamps negatives to zero and coalesces NULLs to 0 (e.g., wrap the computed
duration in GREATEST(..., 0) and COALESCE(..., 0) or equivalent for the SQL
dialect). Keep the call here unchanged — just harden the helper so
sb.select.duration receives a non-null, non-negative numeric duration.
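One way to apply the reviewer's suggestion — coalesce NULLs and clamp negatives inside the helper itself — is sketched below. This is an illustration of the suggested fix, not code from the PR; `getDurationSqlHardened` is a hypothetical name:

```typescript
// Hardened variant of the PR's getDurationSql: the lead-based diff is NULL
// for the last event in a session and can go negative under clock skew,
// so wrap the whole expression in coalesce(..., 0) and greatest(..., 0).
const getDurationSqlHardened = (field = 'created_at') =>
  `greatest(coalesce(dateDiff('millisecond', ${field}, ` +
  `leadInFrame(toNullable(${field}), 1, NULL) OVER ` +
  `(PARTITION BY session_id ORDER BY ${field} ASC ` +
  `ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)), 0), 0)`;

console.log(getDurationSqlHardened());
```

With this shape the call site (`sb.select.duration`) stays unchanged, exactly as the comment recommends: consumers always receive a non-null, non-negative number.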

   if (select.path) {
     sb.select.path = 'path';
@@ -570,7 +570,7 @@ export async function getEventList(options: GetEventListOptions) {
     custom(sb);
   }
 
-  console.log('getSql()', getSql());
+  console.log('getSql() ----> ', getSql());
 
   const data = await getEvents(getSql(), {
     profile: select.profile ?? true,
@@ -1016,3 +1016,6 @@ class EventService {
 }
 
 export const eventService = new EventService(ch);
+
+export const getDurationSql = (field = 'created_at') =>
+  `dateDiff('millisecond', ${field}, leadInFrame(toNullable(${field}), 1, NULL) OVER (PARTITION BY session_id ORDER BY ${field} ASC ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING))`;
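To see what the new helper emits, the one-liner can be exercised directly (the helper below is copied verbatim from the diff above; the explicit `ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING` frame is needed because ClickHouse's `leadInFrame` is frame-sensitive):

```typescript
// Copied from the PR's event.service.ts addition.
const getDurationSql = (field = 'created_at') =>
  `dateDiff('millisecond', ${field}, leadInFrame(toNullable(${field}), 1, NULL) OVER (PARTITION BY session_id ORDER BY ${field} ASC ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING))`;

// Default field, and a qualified column as a caller might pass it:
console.log(getDurationSql());
console.log(getDurationSql('e.created_at'));
```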