Skip to content

Commit 12b0675

Browse files
committed
fix: use CSV...
1 parent cc9a7fb commit 12b0675

File tree

11 files changed

+289
-28
lines changed

11 files changed

+289
-28
lines changed

apps/worker/src/jobs/import.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import type { ImportQueuePayload } from '@openpanel/queue';
2020
import type { Job } from 'bullmq';
2121
import { logger } from '../utils/logger';
2222

23-
const BATCH_SIZE = 50_000;
23+
const BATCH_SIZE = 100_000;
2424

2525
export async function importJob(job: Job<ImportQueuePayload>) {
2626
const { importId } = job.data.payload;

packages/common/src/object.test.ts

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
import { describe, expect, it } from 'vitest';
2+
import { toDots } from './object';
3+
4+
describe('toDots', () => {
  // Flattening of scalars, arrays, nested containers and an embedded JSON string.
  it('should convert an object to a dot object', () => {
    const input = {
      a: 1,
      b: 2,
      array: ['1', '2', '3'],
      arrayWithObjects: [{ a: 1 }, { b: 2 }, { c: 3 }],
      objectWithArrays: { a: [1, 2, 3] },
      null: null,
      undefined: undefined,
      empty: '',
      jsonString: '{"a": 1, "b": 2}',
    };

    // The `null`, `undefined` and `empty` keys are expected to be absent,
    // and every remaining leaf is expected as a string.
    const expected = {
      a: '1',
      b: '2',
      'array.0': '1',
      'array.1': '2',
      'array.2': '3',
      'arrayWithObjects.0.a': '1',
      'arrayWithObjects.1.b': '2',
      'arrayWithObjects.2.c': '3',
      'objectWithArrays.a.0': '1',
      'objectWithArrays.a.1': '2',
      'objectWithArrays.a.2': '3',
      'jsonString.a': '1',
      'jsonString.b': '2',
    };

    const actual = toDots(input);

    expect(actual).toEqual(expected);
  });

  // Strings that open like JSON but are not closed are expected to be dropped.
  it('should handle malformed JSON strings gracefully', () => {
    const input = {
      validJson: '{"key":"value"}',
      malformedJson: '{"key":"unterminated string',
      startsWithBrace: '{not json at all',
      startsWithBracket: '[also not json',
      regularString: 'normal string',
    };

    const expected = {
      'validJson.key': 'value',
      regularString: 'normal string',
    };

    const actual = toDots(input);

    expect(actual).toEqual(expected);
  });
});

packages/common/src/object.ts

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,18 @@
11
import { anyPass, assocPath, isEmpty, isNil, reject } from 'ramda';
22

3+
/**
 * Cheap shape check: reports whether `value` is wrapped in a matching pair of
 * JSON container delimiters (`{…}` or `[…]`).
 *
 * Only the first and last characters are inspected. The content in between is
 * not parsed, so a `true` result does not guarantee that `JSON.parse` succeeds.
 */
function isValidJsonString(value: string): boolean {
  if (value.startsWith('{')) {
    return value.endsWith('}');
  }
  if (value.startsWith('[')) {
    return value.endsWith(']');
  }
  return false;
}
9+
/**
 * Reports whether `value` opens like a JSON container (`{` or `[`) but does
 * not end with the corresponding closing delimiter.
 *
 * Strings that do not start with `{` or `[` are never considered malformed.
 */
function isMalformedJsonString(value: string): boolean {
  // Pick the closing delimiter implied by the first character, if any.
  const expectedCloser = value.startsWith('{')
    ? '}'
    : value.startsWith('[')
      ? ']'
      : undefined;

  return expectedCloser !== undefined && !value.endsWith(expectedCloser);
}
15+
316
export function toDots(
417
obj: Record<string, unknown>,
518
path = '',
@@ -23,6 +36,24 @@ export function toDots(
2336
return acc;
2437
}
2538

39+
if (typeof value === 'string' && isMalformedJsonString(value)) {
40+
// Skip it
41+
return acc;
42+
}
43+
44+
// Fix nested json strings - but catch parse errors for malformed JSON
45+
if (typeof value === 'string' && isValidJsonString(value)) {
46+
try {
47+
return {
48+
...acc,
49+
...toDots(JSON.parse(value), `${path}${key}.`),
50+
};
51+
} catch {
52+
// Skip it
53+
return acc;
54+
}
55+
}
56+
2657
const cleanedValue =
2758
typeof value === 'string'
2859
? removeInvalidSurrogates(value).trim()

packages/db/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
export * from './src/prisma-client';
22
export * from './src/clickhouse/client';
3+
export * from './src/clickhouse/csv';
34
export * from './src/sql-builder';
45
export * from './src/services/chart.service';
56
export * from './src/services/clients.service';

packages/db/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
"with-env": "dotenv -e ../../.env -c --"
1414
},
1515
"dependencies": {
16-
"@clickhouse/client": "^1.2.0",
16+
"@clickhouse/client": "^1.12.1",
1717
"@openpanel/common": "workspace:*",
1818
"@openpanel/constants": "workspace:*",
1919
"@openpanel/json": "workspace:*",

packages/db/src/clickhouse/client.ts

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { Readable } from 'node:stream';
12
import type { ClickHouseSettings, ResponseJSON } from '@clickhouse/client';
23
import { ClickHouseLogLevel, createClient } from '@clickhouse/client';
34
import sqlstring from 'sqlstring';
@@ -198,6 +199,34 @@ export async function chQueryWithMeta<T extends Record<string, any>>(
198199
return response;
199200
}
200201

202+
/**
 * Inserts pre-serialized CSV lines into a ClickHouse table in one request.
 *
 * The lines are joined with `\n` and sent as a single non-object-mode stream
 * using the `CSV` format (similar to EventBuffer). Double-quote handling is
 * enabled and single-quote handling is disabled through the request settings.
 *
 * @param tableName - Table passed to `ch.insert`.
 * @param rows - One CSV-encoded record per element, without a line terminator.
 *   An empty array is a no-op: nothing is sent and nothing is logged.
 * @throws Re-throws whatever `ch.insert` rejects with, after logging it.
 */
export async function chInsertCSV(tableName: string, rows: string[]) {
  // Nothing to insert: do not open an INSERT request with an empty body.
  if (rows.length === 0) {
    return;
  }

  try {
    const startedAt = performance.now();

    // Create a readable stream in binary mode for CSV (similar to EventBuffer)
    const csvStream = Readable.from(rows.join('\n'), {
      objectMode: false,
    });

    await ch.insert({
      table: tableName,
      values: csvStream,
      format: 'CSV',
      clickhouse_settings: {
        format_csv_allow_double_quotes: 1,
        format_csv_allow_single_quotes: 0,
      },
    });

    logger.info('CSV Insert successful', {
      elapsed: performance.now() - startedAt,
      rows: rows.length,
    });
  } catch (error) {
    logger.error('CSV Insert failed:', error);
    throw error;
  }
}
229+
201230
export async function chQuery<T extends Record<string, any>>(
202231
query: string,
203232
clickhouseSettings?: ClickHouseSettings,

packages/db/src/clickhouse/csv.ts

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
// ClickHouse Map(String, String) format in CSV uses single quotes, not JSON double quotes
// Format: {'key1':'value1','key2':'value2'}
// Backslashes and single quotes inside keys/values must be backslash-escaped.
// Newlines and other control characters are escaped as well to prevent CSV parsing issues.

// Replacement for every character that needs escaping inside a quoted map key/value.
const MAP_VALUE_ESCAPES: Readonly<Record<string, string>> = {
  '\\': '\\\\',
  "'": "\\'",
  '\n': '\\n',
  '\r': '\\r',
  '\t': '\\t',
  '\0': '\\0',
};

// Matches exactly the characters listed in MAP_VALUE_ESCAPES.
const MAP_VALUE_SPECIAL_CHARS = /[\\'\n\r\t\0]/g;

/**
 * Escapes a map key or value so it can be placed between single quotes.
 *
 * The input is scanned once and each special character is replaced through
 * the lookup table, so an inserted backslash is never escaped a second time.
 *
 * @param str - Raw key or value text.
 * @returns The text with `\`, `'`, newline, carriage return, tab and NUL
 *   replaced by their backslash escape sequences.
 */
const escapeMapValue = (str: string) => {
  return str.replace(
    MAP_VALUE_SPECIAL_CHARS,
    (char) => MAP_VALUE_ESCAPES[char] ?? char,
  );
};
14+
15+
/**
 * Serializes a properties object into one CSV field holding a single-quoted
 * map literal, e.g. `{'k1':'v1','k2':'v2'}`.
 *
 * - `null` / `undefined` input yields an empty field (`''`).
 * - An object without own enumerable keys yields `{}` as-is.
 * - `null` / `undefined` values become empty strings; every other value is
 *   converted with `String(...)`.
 * - Keys and values are escaped with `escapeMapValue`, and the assembled
 *   literal is passed through `csvEscapeField`, which quotes it when needed.
 *
 * @param value - Properties to serialize.
 * @returns The CSV-ready field text.
 */
export const csvEscapeJson = (
  value: Record<string, unknown> | null | undefined,
): string => {
  if (value === null || value === undefined) {
    return '';
  }

  const entries = Object.entries(value);
  if (entries.length === 0) {
    return '{}';
  }

  const renderedPairs: string[] = [];
  for (const [key, raw] of entries) {
    // Map(String, String): nullish values are stored as empty strings.
    const text = raw == null ? '' : String(raw);
    renderedPairs.push(`'${escapeMapValue(key)}':'${escapeMapValue(text)}'`);
  }

  // The map literal contains commas as soon as it has two pairs, so it still
  // has to go through regular CSV field escaping.
  return csvEscapeField(`{${renderedPairs.join(',')}}`);
};
39+
40+
// Characters or positions that force a field to be quoted: a comma, a double
// quote, a line break anywhere, or a space/tab at either end of the value.
const CSV_NEEDS_QUOTING = /[,"\n\r]|^[ \t]|[ \t]$/;

/**
 * Escapes a single CSV field.
 *
 * The value is wrapped in double quotes when it contains a comma, a double
 * quote or a line break, with embedded double quotes doubled (`""`) per the
 * CSV standard. Values that start or end with a space or tab are quoted too:
 * ClickHouse trims leading/trailing whitespace from unquoted CSV strings, so
 * leaving them bare would silently alter the stored value.
 *
 * @param value - Field content; numbers are converted with `String(...)`.
 * @returns The field text, quoted only when required.
 */
export const csvEscapeField = (value: string | number): string => {
  const str = String(value);

  if (!CSV_NEEDS_QUOTING.test(str)) {
    return str;
  }

  // Escape double quotes by doubling them
  const escaped = str.replace(/"/g, '""');
  return `"${escaped}"`;
};

packages/db/src/services/import.service.ts

Lines changed: 48 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ import sqlstring from 'sqlstring';
33
import {
44
TABLE_NAMES,
55
ch,
6+
chInsertCSV,
67
convertClickhouseDateToJs,
78
formatClickhouseDate,
89
} from '../clickhouse/client';
10+
import { csvEscapeField, csvEscapeJson } from '../clickhouse/csv';
911
import { type Prisma, db } from '../prisma-client';
1012
import type { IClickhouseEvent } from './event.service';
1113

@@ -33,20 +35,54 @@ export async function insertImportBatch(
3335
return { importId, totalEvents: 0, insertedEvents: 0 };
3436
}
3537

36-
// Add import metadata to each event
37-
const eventsWithMetadata = events.map((event) => ({
38-
...event,
39-
import_id: importId,
40-
import_status: 'pending',
41-
imported_at_meta: new Date(),
42-
}));
43-
44-
await ch.insert({
45-
table: TABLE_NAMES.events_imports,
46-
values: eventsWithMetadata,
47-
format: 'JSONEachRow',
38+
// Important to have same order as events_imports table
39+
// CSV format: properly quotes fields that need it
40+
const csvRows = events.map((event) => {
41+
// Properties need to be converted to JSON for Map(String, String)
42+
// All fields must be CSV-escaped when joining with commas
43+
const fields = [
44+
csvEscapeField(event.id || ''),
45+
csvEscapeField(event.name),
46+
csvEscapeField(event.sdk_name || ''),
47+
csvEscapeField(event.sdk_version || ''),
48+
csvEscapeField(event.device_id || ''),
49+
csvEscapeField(event.profile_id || ''),
50+
csvEscapeField(event.project_id || ''),
51+
csvEscapeField(event.session_id || ''),
52+
csvEscapeField(event.path),
53+
csvEscapeField(event.origin || ''),
54+
csvEscapeField(event.referrer || ''),
55+
csvEscapeField(event.referrer_name || ''),
56+
csvEscapeField(event.referrer_type || ''),
57+
csvEscapeField(event.duration ?? 0),
58+
csvEscapeJson(event.properties),
59+
csvEscapeField(
60+
event.created_at
61+
? formatClickhouseDate(event.created_at)
62+
: formatClickhouseDate(new Date()),
63+
),
64+
csvEscapeField(event.country || ''),
65+
csvEscapeField(event.city || ''),
66+
csvEscapeField(event.region || ''),
67+
csvEscapeField(event.longitude != null ? event.longitude : '\\N'),
68+
csvEscapeField(event.latitude != null ? event.latitude : '\\N'),
69+
csvEscapeField(event.os || ''),
70+
csvEscapeField(event.os_version || ''),
71+
csvEscapeField(event.browser || ''),
72+
csvEscapeField(event.browser_version || ''),
73+
csvEscapeField(event.device || ''),
74+
csvEscapeField(event.brand || ''),
75+
csvEscapeField(event.model || ''),
76+
csvEscapeField('\\N'), // imported_at (Nullable)
77+
csvEscapeField(importId),
78+
csvEscapeField('pending'), // import_status
79+
csvEscapeField(formatClickhouseDate(new Date()).replace(/\.\d{3}$/, '')), // imported_at_meta (DateTime, not DateTime64, so no milliseconds)
80+
];
81+
return fields.join(',');
4882
});
4983

84+
await chInsertCSV(TABLE_NAMES.events_imports, csvRows);
85+
5086
return {
5187
importId,
5288
totalEvents: events.length,

packages/importer/src/providers/mixpanel.test.ts

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,4 +124,55 @@ describe('mixpanel', () => {
124124
sdk_version: '1.0.0',
125125
});
126126
});
127+
128+
it('should parse stringified JSON in properties and flatten them', async () => {
  const mixpanel = new MixpanelProvider('pid', {
    from: '2025-01-01',
    to: '2025-01-02',
    serviceAccount: 'sa',
    serviceSecret: 'ss',
    projectId: '123',
    provider: 'mixpanel',
    type: 'api',
    mapScreenViewProperty: undefined,
  });

  // Property bag mixing stringified JSON, plain scalars and a nested object.
  const incomingProperties = {
    time: 1746097970,
    distinct_id: '$device:123',
    $device_id: '123',
    $user_id: 'user123',
    mp_lib: 'web',
    // Stringified JSON object - should be parsed and flattened
    area: '{"displayText":"Malab, Nuh, Mewat","id":1189005}',
    // Stringified JSON array - should be parsed and flattened
    tags: '["tag1","tag2","tag3"]',
    // Regular string - should remain as is
    regularString: 'just a string',
    // Number - should be converted to string
    count: 42,
    // Object - should be flattened
    nested: { level1: { level2: 'value' } },
  };
  const incomingEvent = {
    event: 'custom_event',
    properties: incomingProperties,
  };

  const expectedProperties = {
    // Parsed JSON object should be flattened with dot notation
    'area.displayText': 'Malab, Nuh, Mewat',
    'area.id': '1189005',
    // Parsed JSON array should be flattened with numeric indices
    'tags.0': 'tag1',
    'tags.1': 'tag2',
    'tags.2': 'tag3',
    // Regular values
    regularString: 'just a string',
    count: '42',
    // Nested object flattened
    'nested.level1.level2': 'value',
  };

  const transformed = mixpanel.transformEvent(incomingEvent);

  expect(transformed.properties).toMatchObject(expectedProperties);
});
127178
});

packages/importer/src/providers/mixpanel.ts

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ export class MixpanelProvider extends BaseImportProvider<MixpanelRawEvent> {
278278
profile_id: profileId,
279279
project_id: projectId,
280280
session_id: '', // Will be generated in SQL after import
281-
properties: toDots(properties),
281+
properties: toDots(properties), // Flatten nested objects/arrays to Map(String, String)
282282
created_at: new Date(props.time * 1000).toISOString(),
283283
country,
284284
city,
@@ -418,24 +418,23 @@ export class MixpanelProvider extends BaseImportProvider<MixpanelRawEvent> {
418418
),
419419
);
420420

421-
// Coerce all values to strings to satisfy Map(String, String)
422-
const stringProperties: Record<string, string> = {};
421+
// Parse JSON strings back to objects/arrays so toDots() can flatten them
422+
const parsed: Record<string, any> = {};
423423
for (const [key, value] of Object.entries(filtered)) {
424-
if (value === null || value === undefined) {
425-
stringProperties[key] = '';
426-
continue;
427-
}
428-
if (typeof value === 'object') {
424+
if (
425+
typeof value === 'string' &&
426+
(value.startsWith('{') || value.startsWith('['))
427+
) {
429428
try {
430-
stringProperties[key] = JSON.stringify(value);
429+
parsed[key] = JSON.parse(value);
431430
} catch {
432-
stringProperties[key] = String(value);
431+
parsed[key] = value; // Keep as string if parsing fails
433432
}
434433
} else {
435-
stringProperties[key] = String(value);
434+
parsed[key] = value;
436435
}
437436
}
438437

439-
return stringProperties;
438+
return parsed;
440439
}
441440
}

0 commit comments

Comments
 (0)