-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdedupe_contracts.mjs
More file actions
170 lines (144 loc) · 5.08 KB
/
dedupe_contracts.mjs
File metadata and controls
170 lines (144 loc) · 5.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
import * as fs from 'fs';
import * as path from 'path';
import { ParquetReader, ParquetSchema, ParquetWriter } from '@dsnp/parquetjs';
/**
 * Render a binary address as a `0x`-prefixed lowercase hex string.
 *
 * @param {Buffer|Uint8Array|number[]|null|undefined} buffer - Raw address
 *   bytes. Anything that is not already a Buffer is passed through
 *   `Buffer.from` first.
 * @returns {string|null} The hex string, or `null` when `buffer` is falsy.
 */
function addressToHex(buffer) {
  if (!buffer) {
    return null;
  }
  const bytes = Buffer.isBuffer(buffer) ? buffer : Buffer.from(buffer);
  return `0x${bytes.toString('hex')}`;
}
/**
 * List the `contracts.parquet` files that sit directly inside the immediate
 * sub-directories of `dir`.
 *
 * Only one directory level is inspected: plain files in `dir` and anything
 * nested deeper than `dir/<subdir>/contracts.parquet` are ignored.
 *
 * @param {string} dir - Directory whose sub-directories are scanned.
 * @returns {string[]} Paths of the existing `contracts.parquet` files, in the
 *   order `fs.readdirSync` reported their parent directories.
 */
function findContractsParquetFiles(dir) {
  return fs
    .readdirSync(dir, { withFileTypes: true })
    .filter((dirent) => dirent.isDirectory())
    .map((dirent) => path.join(path.join(dir, dirent.name), 'contracts.parquet'))
    .filter((candidate) => fs.existsSync(candidate));
}
/**
 * Read every row of a parquet file into memory using @dsnp/parquetjs.
 *
 * Failures are reported on stderr and signalled with a `null` return value
 * rather than an exception, so the caller can skip the file and carry on.
 *
 * @param {string} filePath - Path of the parquet file to read.
 * @returns {Promise<object[]|null>} The rows in file order, or `null` when
 *   the file could not be opened, read, or closed.
 */
async function readParquetFile(filePath) {
  try {
    const reader = await ParquetReader.openFile(filePath);
    try {
      const cursor = reader.getCursor();
      const records = [];
      let row;
      // The cursor yields null once the rows are exhausted.
      while ((row = await cursor.next()) !== null) {
        records.push(row);
      }
      return records;
    } finally {
      // Release the reader even when iteration fails part-way through;
      // otherwise the open file handle would leak for every bad file.
      await reader.close();
    }
  } catch (err) {
    console.error(`Error reading ${filePath}: ${err.message}`);
    return null;
  }
}
// Column layout used when rewriting contracts.parquet files.
// ParquetWriter expects a ParquetSchema instance rather than a bare field
// map: it relies on properties the ParquetSchema constructor derives from the
// field definitions, so the definitions are wrapped here.
const schema = new ParquetSchema({
  number: { type: 'INT64', optional: false },
  address: { type: 'BYTE_ARRAY', optional: false },
  decimals: { type: 'INT32', optional: true },
  name: { type: 'BYTE_ARRAY', optional: true },
  symbol: { type: 'BYTE_ARRAY', optional: true },
});
/**
 * Write `records` to `filePath` as a parquet file using the module-level
 * `schema`, replacing any existing file only once the new one is complete.
 *
 * The rows are first written to a temporary sibling file and then renamed
 * over `filePath`. If anything fails, the temporary file is removed and the
 * previous contents of `filePath` are left untouched, so a failed run cannot
 * truncate or corrupt the file it was meant to clean.
 *
 * @param {string} filePath - Destination parquet file.
 * @param {object[]} records - Rows to write, in order.
 * @returns {Promise<void>}
 * @throws Rethrows the first error raised while opening the writer,
 *   appending a row, closing the writer, or renaming the temporary file.
 */
async function writeParquetFile(filePath, records) {
  // Same directory as the target so the final rename stays on one filesystem.
  const tempPath = `${filePath}.tmp-${process.pid}`;
  try {
    const writer = await ParquetWriter.openFile(schema, tempPath);
    let appendFailed = false;
    let appendError;
    try {
      for (const record of records) {
        await writer.appendRow(record);
      }
    } catch (err) {
      appendFailed = true;
      appendError = err;
    }
    // Always close so the writer's file handle is released. When an append
    // already failed, that error is the one worth reporting, so a secondary
    // close failure is not allowed to mask it.
    try {
      await writer.close();
    } catch (closeErr) {
      if (!appendFailed) {
        throw closeErr;
      }
    }
    if (appendFailed) {
      throw appendError;
    }
    await fs.promises.rename(tempPath, filePath);
  } catch (err) {
    try {
      await fs.promises.rm(tempPath, { force: true });
    } catch {
      // Best-effort cleanup; the original failure below is what matters.
    }
    throw err;
  }
}
/**
 * Decide which rows survive deduplication.
 *
 * For every distinct address (compared via `addressToHex`) exactly one row is
 * kept: the one with the lowest `number`; on equal numbers the row that was
 * read first wins. Rows are tracked by object identity, so two rows in the
 * same file with the same address and number are still told apart.
 *
 * @param {Map<string, object[]>} fileRecords - File path -> rows read from it.
 * @returns {{survivors: Set<object>, uniqueAddresses: number, duplicateAddresses: number}}
 *   The rows to keep, plus how many addresses occurred once / more than once.
 */
function selectSurvivingRecords(fileRecords) {
  const bestByAddress = new Map(); // address hex -> { record, occurrences }
  for (const records of fileRecords.values()) {
    for (const record of records) {
      const addrHex = addressToHex(record.address);
      const entry = bestByAddress.get(addrHex);
      if (entry === undefined) {
        bestByAddress.set(addrHex, { record, occurrences: 1 });
        continue;
      }
      entry.occurrences++;
      // Strict `<` keeps the earlier row on ties; relational operators also
      // compare BigInt and Number values with each other correctly.
      if (record.number < entry.record.number) {
        entry.record = record;
      }
    }
  }

  const survivors = new Set();
  let duplicateAddresses = 0;
  for (const { record, occurrences } of bestByAddress.values()) {
    survivors.add(record);
    if (occurrences > 1) {
      duplicateAddresses++;
    }
  }
  return {
    survivors,
    uniqueAddresses: bestByAddress.size - duplicateAddresses,
    duplicateAddresses,
  };
}

/**
 * Deduplicate contract rows across all `<dataDir>/<subdir>/contracts.parquet`
 * files.
 *
 * Every readable file is loaded, rows are grouped by address, and for each
 * address only the row with the lowest `number` is kept. Files that lose at
 * least one row are rewritten with the surviving rows; files with nothing to
 * remove are left untouched. Files that fail to read are counted and skipped,
 * and write failures are counted and logged without aborting the run.
 *
 * @param {string} [dataDir='data'] - Directory whose sub-directories hold the
 *   `contracts.parquet` files.
 * @returns {Promise<void>}
 */
async function main(dataDir = 'data') {
  console.log('Finding all contracts.parquet files...');
  const parquetFiles = findContractsParquetFiles(dataDir);
  console.log(`Found ${parquetFiles.length} contracts.parquet files`);

  // Read all records from all files
  console.log('Reading all records...');
  const fileRecords = new Map(); // Map of file path -> records
  let totalRecords = 0;
  let readErrors = 0;
  for (const filePath of parquetFiles) {
    const records = await readParquetFile(filePath);
    if (records === null) {
      readErrors++;
      continue;
    }
    fileRecords.set(filePath, records);
    // Only the count is needed here. Spreading a whole file's rows into a
    // single push() call can exceed the engine's argument limit on big files.
    totalRecords += records.length;
    console.log(`  ${path.basename(path.dirname(filePath))}: ${records.length} records`);
  }
  console.log(`Files read errors: ${readErrors}`);
  console.log(`Total records: ${totalRecords}`);

  // Group by address and pick the single survivor for each one
  console.log('Finding duplicates by address...');
  const { survivors, uniqueAddresses, duplicateAddresses } = selectSurvivingRecords(fileRecords);
  console.log(`Unique addresses: ${uniqueAddresses}`);
  console.log(`Duplicate addresses: ${duplicateAddresses}`);

  // Now process each file and write cleaned data
  console.log('Writing cleaned data...');
  let totalRemoved = 0;
  let writeErrors = 0;
  for (const [filePath, records] of fileRecords) {
    const cleanedRecords = records.filter((record) => survivors.has(record));
    const removed = records.length - cleanedRecords.length;
    totalRemoved += removed;
    const summary = `  ${path.basename(path.dirname(filePath))}: ${records.length} -> ${cleanedRecords.length} records (removed ${removed})`;

    if (removed === 0) {
      // Nothing to drop: do not rewrite (and risk damaging) an intact file.
      console.log(summary);
      continue;
    }
    try {
      await writeParquetFile(filePath, cleanedRecords);
      console.log(summary);
    } catch (err) {
      writeErrors++;
      console.error(`Error writing ${filePath}: ${err.message}`);
    }
  }

  console.log(`\nTotal duplicates removed: ${totalRemoved}`);
  console.log(`Write errors: ${writeErrors}`);
  console.log('Done!');
}
// Run the script. A failure that escapes main() is printed and also turned
// into a non-zero exit status, so shells and CI jobs can detect it.
main().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});