166 changes: 77 additions & 89 deletions apps/ccusage/src/data-loader.ts
@@ -486,25 +486,6 @@ function filterByProject<T>(
});
}

/**
* Checks if an entry is a duplicate based on hash
*/
function isDuplicateEntry(uniqueHash: string | null, processedHashes: Set<string>): boolean {
if (uniqueHash == null) {
return false;
}
return processedHashes.has(uniqueHash);
}

/**
* Marks an entry as processed
*/
function markAsProcessed(uniqueHash: string | null, processedHashes: Set<string>): void {
if (uniqueHash != null) {
processedHashes.add(uniqueHash);
}
}

/**
* Extracts unique models from entries, excluding synthetic model
*/
@@ -774,17 +755,17 @@ export async function loadDailyUsageData(options?: LoadOptions): Promise<DailyUs
// Use PricingFetcher with using statement for automatic cleanup
using fetcher = mode === 'display' ? null : new PricingFetcher(options?.offline);
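// `using` (TC39 explicit resource management) invokes fetcher[Symbol.dispose]()
// automatically when this scope exits, even on early return or throw; a null
// binding is skipped. This assumes PricingFetcher is Disposable, roughly:
//   class PricingFetcher implements Disposable {
//     [Symbol.dispose](): void { /* release cached pricing data (hypothetical body) */ }
//   }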

// Track processed message+request combinations for deduplication
const processedHashes = new Set<string>();

// Collect all valid data entries first
const allEntries: {
// Track entries by hash to keep the last (most complete) entry for each unique message+request
// This is important for streaming responses where output_tokens accumulate across multiple entries
type EntryType = {
data: UsageData;
date: string;
cost: number;
model: string | undefined;
project: string;
}[] = [];
};
const entriesByHash = new Map<string, EntryType>();
const entriesWithoutHash: EntryType[] = [];

for (const file of sortedFiles) {
// Extract project name from file path once per file
@@ -799,30 +780,32 @@ export async function loadDailyUsageData(options?: LoadOptions): Promise<DailyUs
}
const data = result.output;

// Check for duplicate message + request ID combination
const uniqueHash = createUniqueHash(data);
if (isDuplicateEntry(uniqueHash, processedHashes)) {
// Skip duplicate message
return;
}

// Mark this combination as processed
markAsProcessed(uniqueHash, processedHashes);

// Always use DEFAULT_LOCALE for date grouping to ensure YYYY-MM-DD format
const date = formatDate(data.timestamp, options?.timezone, DEFAULT_LOCALE);
// If fetcher is available, calculate cost based on mode and tokens
// If fetcher is null, use pre-calculated costUSD or default to 0
const cost =
fetcher != null ? await calculateCostForEntry(data, mode, fetcher) : (data.costUSD ?? 0);

allEntries.push({ data, date, cost, model: data.message.model, project });
const entry: EntryType = { data, date, cost, model: data.message.model, project };

// For entries with a unique hash, keep the last one (replaces previous)
// This ensures we get the final output_tokens count from streaming responses
const uniqueHash = createUniqueHash(data);
if (uniqueHash != null) {
entriesByHash.set(uniqueHash, entry);
} else {
entriesWithoutHash.push(entry);
}
} catch {
// Skip invalid JSON lines
}
});
}

// Combine entries: those tracked by hash + those without hash
const allEntries = [...entriesByHash.values(), ...entriesWithoutHash];

// Group by date, optionally including project
// Automatically enable project grouping when project filter is specified
const needsProjectGrouping = options?.groupByProject === true || options?.project != null;
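
The Map-plus-array pattern above is the heart of this change: instead of skipping a duplicate on first sight, keyed entries overwrite earlier ones so the final streamed entry wins, while entries without a hash are all kept. A minimal standalone sketch of the idea, with a hypothetical Entry type carrying a precomputed hash (the real createUniqueHash presumably combines message and request IDs):

type Entry = { hash: string | null; outputTokens: number };

function dedupeKeepLast(entries: Entry[]): Entry[] {
	const byHash = new Map<string, Entry>();
	const unhashed: Entry[] = [];
	for (const entry of entries) {
		if (entry.hash != null) {
			// set() on an existing key replaces the value, so the last
			// (most complete) entry for each hash survives
			byHash.set(entry.hash, entry);
		} else {
			unhashed.push(entry);
		}
	}
	return [...byHash.values(), ...unhashed];
}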
@@ -931,19 +914,19 @@ export async function loadSessionData(options?: LoadOptions): Promise<SessionUsa
// Use PricingFetcher with using statement for automatic cleanup
using fetcher = mode === 'display' ? null : new PricingFetcher(options?.offline);

// Track processed message+request combinations for deduplication
const processedHashes = new Set<string>();

// Collect all valid data entries with session info first
const allEntries: Array<{
// Track entries by hash to keep the last (most complete) entry for each unique message+request
// This is important for streaming responses where output_tokens accumulate across multiple entries
type EntryType = {
data: UsageData;
sessionKey: string;
sessionId: string;
projectPath: string;
cost: number;
timestamp: string;
model: string | undefined;
}> = [];
};
const entriesByHash = new Map<string, EntryType>();
const entriesWithoutHash: EntryType[] = [];

for (const { file, baseDir } of sortedFilesWithBase) {
// Extract session info from file path using its specific base directory
@@ -965,35 +948,37 @@ export async function loadSessionData(options?: LoadOptions): Promise<SessionUsa
}
const data = result.output;

// Check for duplicate message + request ID combination
const uniqueHash = createUniqueHash(data);
if (isDuplicateEntry(uniqueHash, processedHashes)) {
// Skip duplicate message
return;
}

// Mark this combination as processed
markAsProcessed(uniqueHash, processedHashes);

const sessionKey = `${projectPath}/${sessionId}`;
const cost =
fetcher != null ? await calculateCostForEntry(data, mode, fetcher) : (data.costUSD ?? 0);

allEntries.push({
const entry: EntryType = {
data,
sessionKey,
sessionId,
projectPath,
cost,
timestamp: data.timestamp,
model: data.message.model,
});
};

// For entries with a unique hash, keep the last one (replaces previous)
// This ensures we get the final output_tokens count from streaming responses
const uniqueHash = createUniqueHash(data);
if (uniqueHash != null) {
entriesByHash.set(uniqueHash, entry);
} else {
entriesWithoutHash.push(entry);
}
} catch {
// Skip invalid JSON lines
}
});
}

// Combine entries: those tracked by hash + those without hash
const allEntries = [...entriesByHash.values(), ...entriesWithoutHash];

// Group by session using Object.groupBy
const groupedBySessions = groupBy(allEntries, (entry) => entry.sessionKey);
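
The groupBy helper itself is not shown in this diff; assuming it delegates to the standard Object.groupBy (ES2024), the call amounts to:

const grouped: Partial<Record<string, EntryType[]>> = Object.groupBy(
	allEntries,
	(entry) => entry.sessionKey,
);
// Object.groupBy returns a null-prototype object, so read it with
// Object.entries(grouped) rather than methods inherited from Object.prototype.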

@@ -1376,11 +1361,10 @@ export async function loadSessionBlockData(options?: LoadOptions): Promise<Sessi
// Use PricingFetcher with using statement for automatic cleanup
using fetcher = mode === 'display' ? null : new PricingFetcher(options?.offline);

// Track processed message+request combinations for deduplication
const processedHashes = new Set<string>();

// Collect all valid data entries first
const allEntries: LoadedUsageEntry[] = [];
// Track entries by hash to keep the last (most complete) entry for each unique message+request
// This is important for streaming responses where output_tokens accumulate across multiple entries
const entriesByHash = new Map<string, LoadedUsageEntry>();
const entriesWithoutHash: LoadedUsageEntry[] = [];

for (const file of sortedFiles) {
await processJSONLFileByLine(file, async (line) => {
@@ -1392,23 +1376,13 @@ export async function loadSessionBlockData(options?: LoadOptions): Promise<Sessi
}
const data = result.output;

// Check for duplicate message + request ID combination
const uniqueHash = createUniqueHash(data);
if (isDuplicateEntry(uniqueHash, processedHashes)) {
// Skip duplicate message
return;
}

// Mark this combination as processed
markAsProcessed(uniqueHash, processedHashes);

const cost =
fetcher != null ? await calculateCostForEntry(data, mode, fetcher) : (data.costUSD ?? 0);

// Get Claude Code usage limit expiration date
const usageLimitResetTime = getUsageLimitResetTime(data);

allEntries.push({
const entry: LoadedUsageEntry = {
timestamp: new Date(data.timestamp),
usage: {
inputTokens: data.message.usage.input_tokens,
@@ -1420,7 +1394,16 @@
model: data.message.model ?? 'unknown',
version: data.version,
usageLimitResetTime: usageLimitResetTime ?? undefined,
});
};

// For entries with a unique hash, keep the last one (replaces previous)
// This ensures we get the final output_tokens count from streaming responses
const uniqueHash = createUniqueHash(data);
if (uniqueHash != null) {
entriesByHash.set(uniqueHash, entry);
} else {
entriesWithoutHash.push(entry);
}
} catch (error) {
// Skip invalid JSON lines but log for debugging purposes
logger.debug(
@@ -1430,6 +1413,9 @@ export async function loadSessionBlockData(options?: LoadOptions): Promise<Sessi
});
}

// Combine entries: those tracked by hash + those without hash
const allEntries = [...entriesByHash.values(), ...entriesWithoutHash];

// Identify session blocks
const blocks = identifySessionBlocks(allEntries, options?.sessionDurationHours);

@@ -4412,14 +4398,15 @@ if (import.meta.vitest != null) {
mode: 'display',
});

// Should only have one entry for 2025-01-10
// Should only have one entry - keeping the LAST one (2025-01-15)
// This is important for streaming responses where output_tokens accumulate
expect(data).toHaveLength(1);
expect(data[0]?.date).toBe('2025-01-10');
expect(data[0]?.date).toBe('2025-01-15');
expect(data[0]?.inputTokens).toBe(100);
expect(data[0]?.outputTokens).toBe(50);
});

it('should process files in chronological order', async () => {
it('should process files in chronological order and keep last entry', async () => {
await using fixture = await createFixture({
projects: {
'newer.jsonl': JSON.stringify({
@@ -4454,11 +4441,12 @@
mode: 'display',
});

// Should keep the older entry (100/50 tokens) not the newer one (200/100)
// Should keep the LAST entry (200/100 tokens) for streaming response accuracy
// Files are processed in chronological order; the last entry wins
expect(data).toHaveLength(1);
expect(data[0]?.date).toBe('2025-01-10');
expect(data[0]?.inputTokens).toBe(100);
expect(data[0]?.outputTokens).toBe(50);
expect(data[0]?.date).toBe('2025-01-15');
expect(data[0]?.inputTokens).toBe(200);
expect(data[0]?.outputTokens).toBe(100);
});
});

@@ -4504,19 +4492,19 @@
mode: 'display',
});

// Session 1 should have the entry
const session1 = sessions.find((s) => s.sessionId === 'session1');
expect(session1).toBeDefined();
expect(session1?.inputTokens).toBe(100);
expect(session1?.outputTokens).toBe(50);

// Session 2 should either not exist or have 0 tokens (duplicate was skipped)
// Session 2 should have the entry (LAST one wins for streaming accuracy)
const session2 = sessions.find((s) => s.sessionId === 'session2');
if (session2 != null) {
expect(session2.inputTokens).toBe(0);
expect(session2.outputTokens).toBe(0);
expect(session2).toBeDefined();
expect(session2?.inputTokens).toBe(100);
expect(session2?.outputTokens).toBe(50);

// Session 1 should either not exist or have 0 tokens (superseded by later entry)
const session1 = sessions.find((s) => s.sessionId === 'session1');
if (session1 != null) {
expect(session1.inputTokens).toBe(0);
expect(session1.outputTokens).toBe(0);
} else {
// It's also valid for session2 to not be included if it has no entries
// It's also valid for session1 to not be included if it has no entries
expect(sessions.length).toBe(1);
}
});