Skip to content

Commit b69a0a2

Browse files
committed
fix: fixes for mixpanel specific issues and how we handle should run import step
1 parent 12b0675 commit b69a0a2

File tree

4 files changed

+232
-61
lines changed

4 files changed

+232
-61
lines changed

apps/worker/src/jobs/import.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,13 +89,19 @@ export async function importJob(job: Job<ImportQueuePayload>) {
8989
(record.currentStep === 'backfilling_sessions' && record.currentBatch) ||
9090
undefined;
9191

92+
// Example:
93+
// shouldRunStep(0) // currStep = 2 (should not run)
94+
// shouldRunStep(1) // currStep = 2 (should not run)
95+
// shouldRunStep(2) // currStep = 2 (should run)
96+
// shouldRunStep(3) // currStep = 2 (should run)
9297
const shouldRunStep = (step: ValidStep) => {
9398
if (isNewImport) {
9499
return true;
95100
}
96101

97-
const currentStepIndex = steps[step];
98-
return currentStepIndex > steps[record.currentStep as ValidStep];
102+
const stepToRunIndex = steps[step];
103+
const currentStepIndex = steps[record.currentStep as ValidStep];
104+
return stepToRunIndex >= currentStepIndex;
99105
};
100106

101107
async function whileBounds(

packages/db/src/services/import.service.ts

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,7 @@ export async function insertImportBatch(
5656
csvEscapeField(event.referrer_type || ''),
5757
csvEscapeField(event.duration ?? 0),
5858
csvEscapeJson(event.properties),
59-
csvEscapeField(
60-
event.created_at
61-
? formatClickhouseDate(event.created_at)
62-
: formatClickhouseDate(new Date()),
63-
),
59+
csvEscapeField(event.created_at),
6460
csvEscapeField(event.country || ''),
6561
csvEscapeField(event.city || ''),
6662
csvEscapeField(event.region || ''),
@@ -76,7 +72,7 @@ export async function insertImportBatch(
7672
csvEscapeField('\\N'), // imported_at (Nullable)
7773
csvEscapeField(importId),
7874
csvEscapeField('pending'), // import_status
79-
csvEscapeField(formatClickhouseDate(new Date()).replace(/\.\d{3}$/, '')), // imported_at_meta (DateTime, not DateTime64, so no milliseconds)
75+
csvEscapeField(formatClickhouseDate(new Date())), // imported_at_meta (DateTime, not DateTime64, so no milliseconds)
8076
];
8177
return fields.join(',');
8278
});
@@ -326,7 +322,7 @@ export async function createSessionsStartEndEvents(
326322
string,
327323
string | number | boolean | null | undefined
328324
>, // From last event
329-
created_at: adjustTimestamp(session.last_timestamp, 1000), // 1 second after last event
325+
created_at: adjustTimestamp(session.last_timestamp, 500), // 1 second after last event
330326
country: firstCountry, // Same as session_start
331327
city: firstCity, // Same as session_start
332328
region: firstRegion, // Same as session_start
@@ -499,7 +495,15 @@ export async function backfillSessionsToProduction(
499495
os_version,
500496
sign,
501497
version,
502-
properties
498+
properties,
499+
utm_medium,
500+
utm_source,
501+
utm_campaign,
502+
utm_content,
503+
utm_term,
504+
referrer,
505+
referrer_name,
506+
referrer_type
503507
)
504508
SELECT
505509
any(e.session_id) as id,
@@ -535,11 +539,19 @@ export async function backfillSessionsToProduction(
535539
argMinIf(e.os_version, e.created_at, e.name = 'session_start') as os_version,
536540
1 as sign,
537541
1 as version,
538-
argMinIf(e.properties, e.created_at, e.name = 'session_start') as properties
542+
argMinIf(e.properties, e.created_at, e.name = 'session_start') as properties,
543+
argMinIf(e.properties['__query.utm_medium'], e.created_at, e.name = 'session_start') as utm_medium,
544+
argMinIf(e.properties['__query.utm_source'], e.created_at, e.name = 'session_start') as utm_source,
545+
argMinIf(e.properties['__query.utm_campaign'], e.created_at, e.name = 'session_start') as utm_campaign,
546+
argMinIf(e.properties['__query.utm_content'], e.created_at, e.name = 'session_start') as utm_content,
547+
argMinIf(e.properties['__query.utm_term'], e.created_at, e.name = 'session_start') as utm_term,
548+
argMinIf(e.referrer, e.created_at, e.name = 'session_start') as referrer,
549+
argMinIf(e.referrer_name, e.created_at, e.name = 'session_start') as referrer_name,
550+
argMinIf(e.referrer_type, e.created_at, e.name = 'session_start') as referrer_type
539551
FROM ${TABLE_NAMES.events_imports} e
540552
WHERE
541553
e.import_id = ${sqlstring.escape(importId)}
542-
AND toDate(e.created_at) >= ${sqlstring.escape(from)}
554+
AND toDate(e.created_at) = ${sqlstring.escape(from)}
543555
AND e.session_id != ''
544556
GROUP BY e.session_id
545557
`;
@@ -746,7 +758,7 @@ export async function updateImportStatus(
746758
case 'backfilling_sessions':
747759
data.currentStep = 'backfilling_sessions';
748760
data.currentBatch = options.batch;
749-
data.statusMessage = `Backfilling sessions for ${options.batch}`;
761+
data.statusMessage = `Aggregating sessions for ${options.batch}`;
750762
break;
751763
case 'completed':
752764
data.status = 'completed';

packages/importer/src/providers/mixpanel.test.ts

Lines changed: 142 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { omit } from 'ramda';
12
import { describe, expect, it } from 'vitest';
23
import { MixpanelProvider } from './mixpanel';
34

@@ -120,7 +121,7 @@ describe('mixpanel', () => {
120121
referrer_name: 'Google',
121122
referrer_type: 'search',
122123
imported_at: expect.any(String),
123-
sdk_name: 'mixpanel',
124+
sdk_name: 'mixpanel (web)',
124125
sdk_version: '1.0.0',
125126
});
126127
});
@@ -175,4 +176,144 @@ describe('mixpanel', () => {
175176
'nested.level1.level2': 'value',
176177
});
177178
});
179+
180+
it('should handle react-native referrer', async () => {
181+
const provider = new MixpanelProvider('pid', {
182+
from: '2025-01-01',
183+
to: '2025-01-02',
184+
serviceAccount: 'sa',
185+
serviceSecret: 'ss',
186+
projectId: '123',
187+
provider: 'mixpanel',
188+
type: 'api',
189+
mapScreenViewProperty: undefined,
190+
});
191+
192+
const rawEvent = {
193+
event: 'ec_search_error',
194+
properties: {
195+
time: 1759947367,
196+
distinct_id: '3385916',
197+
$browser: 'Mobile Safari',
198+
$browser_version: null,
199+
$city: 'Bengaluru',
200+
$current_url:
201+
'https://web.landeed.com/karnataka/ec-encumbrance-certificate',
202+
$device: 'iPhone',
203+
$device_id:
204+
'199b498af1036c-0e943279a1292e-5c0f4368-51bf4-199b498af1036c',
205+
$initial_referrer: 'https://www.google.com/',
206+
$initial_referring_domain: 'www.google.com',
207+
$insert_id: 'bclkaepeqcfuzt4v',
208+
$lib_version: '2.60.0',
209+
$mp_api_endpoint: 'api-js.mixpanel.com',
210+
$mp_api_timestamp_ms: 1759927570699,
211+
$os: 'iOS',
212+
$region: 'Karnataka',
213+
$screen_height: 852,
214+
$screen_width: 393,
215+
$search_engine: 'google',
216+
$user_id: '3385916',
217+
binaryReadableVersion: 'NA',
218+
binaryVersion: 'NA',
219+
component: '/karnataka/ec-encumbrance-certificate',
220+
errMsg: 'Request failed with status code 500',
221+
errType: 'SERVER_ERROR',
222+
isSilentSearch: false,
223+
isTimeout: false,
224+
jsVersion: '0.42.0',
225+
language: 'english',
226+
mp_country_code: 'IN',
227+
mp_lib: 'web',
228+
mp_processing_time_ms: 1759927592421,
229+
mp_sent_by_lib_version: '2.60.0',
230+
os: 'web',
231+
osVersion:
232+
'Mozilla/5.0 (iPhone; CPU iPhone OS 18_7_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) GSA/388.0.811331708 Mobile/15E148 Safari/604.1',
233+
phoneBrand: 'NA',
234+
phoneManufacturer: 'NA',
235+
phoneModel: 'NA',
236+
searchUuid: '68e65d08-fd81-4ded-37d3-2b08d2bc70c3',
237+
serverVersion: 'web2.0',
238+
state: 17,
239+
stateStr: '17',
240+
statusCode: 500,
241+
type: 'result_event',
242+
utm_medium: 'cpc',
243+
utm_source:
244+
'google%26utm_medium=cpc%26utm_campaignid=21380769590%26utm_adgroupid=%26utm_adid=%26utm_term=%26utm_device=m%26utm_network=%26utm_location=9062055%26gclid=%26gad_campaignid=21374496705%26gbraid=0AAAAAoV7mTM9mWFripzQ2Od0xXAfrW6p3%26wbraid=CmAKCQjwi4PHBhCUA',
245+
},
246+
};
247+
248+
const res = provider.transformEvent(rawEvent);
249+
250+
expect(res.id.length).toBeGreaterThan(30);
251+
expect(res.imported_at).toMatch(
252+
/^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z$/,
253+
);
254+
expect(omit(['id', 'imported_at'], res)).toEqual({
255+
brand: 'Apple',
256+
browser: 'GSA',
257+
browser_version: 'null',
258+
city: 'Bengaluru',
259+
country: 'IN',
260+
created_at: '2025-10-08T18:16:07.000Z',
261+
device: 'mobile',
262+
device_id: '199b498af1036c-0e943279a1292e-5c0f4368-51bf4-199b498af1036c',
263+
duration: 0,
264+
latitude: null,
265+
longitude: null,
266+
model: 'iPhone',
267+
name: 'ec_search_error',
268+
origin: 'https://web.landeed.com',
269+
os: 'iOS',
270+
os_version: '18.7.0',
271+
path: '/karnataka/ec-encumbrance-certificate',
272+
profile_id: '3385916',
273+
project_id: 'pid',
274+
properties: {
275+
__lib_version: '2.60.0',
276+
'__query.gad_campaignid': '21374496705',
277+
'__query.gbraid': '0AAAAAoV7mTM9mWFripzQ2Od0xXAfrW6p3',
278+
'__query.utm_campaignid': '21380769590',
279+
'__query.utm_device': 'm',
280+
'__query.utm_location': '9062055',
281+
'__query.utm_medium': 'cpc',
282+
'__query.utm_source': 'google',
283+
'__query.wbraid': 'CmAKCQjwi4PHBhCUA',
284+
__screen: '393x852',
285+
__source_insert_id: 'bclkaepeqcfuzt4v',
286+
__userAgent:
287+
'Mozilla/5.0 (iPhone; CPU iPhone OS 18_7_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) GSA/388.0.811331708 Mobile/15E148 Safari/604.1',
288+
binaryReadableVersion: 'NA',
289+
binaryVersion: 'NA',
290+
component: '/karnataka/ec-encumbrance-certificate',
291+
errMsg: 'Request failed with status code 500',
292+
errType: 'SERVER_ERROR',
293+
isSilentSearch: 'false',
294+
isTimeout: 'false',
295+
jsVersion: '0.42.0',
296+
language: 'english',
297+
os: 'web',
298+
osVersion:
299+
'Mozilla/5.0 (iPhone; CPU iPhone OS 18_7_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) GSA/388.0.811331708 Mobile/15E148 Safari/604.1',
300+
phoneBrand: 'NA',
301+
phoneManufacturer: 'NA',
302+
phoneModel: 'NA',
303+
searchUuid: '68e65d08-fd81-4ded-37d3-2b08d2bc70c3',
304+
serverVersion: 'web2.0',
305+
state: '17',
306+
stateStr: '17',
307+
statusCode: '500',
308+
type: 'result_event',
309+
},
310+
referrer: 'https://www.google.com',
311+
referrer_name: 'Google',
312+
referrer_type: 'search',
313+
region: 'Karnataka',
314+
sdk_name: 'mixpanel (web)',
315+
sdk_version: '1.0.0',
316+
session_id: '',
317+
});
318+
});
178319
});

0 commit comments

Comments
 (0)