-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmake_optional.py
More file actions
95 lines (72 loc) · 2.8 KB
/
make_optional.py
File metadata and controls
95 lines (72 loc) · 2.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#!/usr/bin/env python3
"""
Script to make 'from', 'to', and 'value' fields optional in transactions.parquet files.
Finds all transactions.parquet files in ./data/ subdirectories and modifies the schema.
"""
import argparse
import os
import sys

import pyarrow as pa
import pyarrow.parquet as pq
def make_fields_optional(
    input_path: str,
    output_path: str,
    fields: tuple[str, ...] = ('from', 'to', 'value'),
    compression: str = 'zstd',
) -> bool:
    """
    Read a parquet file and mark the given fields (default 'from', 'to', 'value') nullable.

    The rewritten table is written to ``output_path``; passing the same path as
    ``input_path`` replaces the original file. The data is first written to a
    temporary sibling file and then moved into place with ``os.replace``, so an
    interrupted write never leaves a truncated parquet file at ``output_path``.
    Schema-level and field-level metadata are carried over unchanged.

    Args:
        input_path: Parquet file to read.
        output_path: Destination path. Nothing is written when the file is skipped.
        fields: Names of the columns to mark nullable.
        compression: Parquet compression codec used for the rewritten file.

    Returns:
        True if the file was rewritten, False if every target field was already nullable.

    Raises:
        KeyError: if one of ``fields`` is not present in the file's schema.
    """
    table = pq.read_table(input_path)
    schema = table.schema

    # schema.field() raises KeyError for a missing column, so a file lacking one
    # of the target columns surfaces as an error instead of being rewritten.
    targets = [schema.field(name) for name in fields]
    if all(field.nullable for field in targets):
        print(f" Skipping (already optional): {input_path}")
        return False

    wanted = set(fields)
    new_schema = pa.schema(
        # with_nullable keeps the field's type and metadata; only nullability changes.
        [field.with_nullable(True) if field.name in wanted else field for field in schema],
        # Table.cast takes metadata from the target schema, so carry it over explicitly.
        metadata=schema.metadata,
    )
    table = table.cast(new_schema)

    # Write beside the destination, then atomically swap it in.
    tmp_path = f"{output_path}.tmp"
    try:
        pq.write_table(table, tmp_path, compression=compression)
        os.replace(tmp_path, output_path)
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
    print(f" Modified: {input_path}")
    return True
def process_directory(data_dir: str) -> tuple[int, int]:
    """
    Rewrite ``<data_dir>/<subdir>/transactions.parquet`` for every immediate subdirectory.

    Subdirectories are visited in sorted order. A failure on one file is
    reported on stderr and does not stop the run; such files are counted
    neither as modified nor as skipped.

    Args:
        data_dir: Directory whose immediate subdirectories are scanned.

    Returns:
        (modified_count, skipped_count)

    Raises:
        OSError: from ``os.listdir`` when ``data_dir`` cannot be listed.
    """
    modified = 0
    skipped = 0

    subdirs = sorted(d for d in os.listdir(data_dir) if os.path.isdir(os.path.join(data_dir, d)))
    print(f"Found {len(subdirs)} subdirectories in {data_dir}")

    for subdir in subdirs:
        parquet_path = os.path.join(data_dir, subdir, 'transactions.parquet')
        # isfile rather than exists: a directory with this name is not a file we can rewrite.
        if not os.path.isfile(parquet_path):
            print(f" No transactions.parquet in {subdir}")
            continue
        try:
            changed = make_fields_optional(parquet_path, parquet_path)
        except Exception as e:  # batch boundary: one bad file must not abort the rest
            print(f" Error processing {parquet_path}: {e}", file=sys.stderr)
            continue
        if changed:
            modified += 1
        else:
            skipped += 1

    return modified, skipped
def main(argv=None):
    """
    Command-line entry point: ``make_optional.py [DATA_DIR]``.

    DATA_DIR defaults to ``data`` (relative to the current directory), matching
    the script's previous hard-coded behavior.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Raises:
        SystemExit: with status 2 (via argparse) when the arguments are invalid
            or DATA_DIR is not an existing directory.
    """
    parser = argparse.ArgumentParser(
        description="Make 'from', 'to', and 'value' optional in transactions.parquet files.",
    )
    parser.add_argument(
        'data_dir',
        nargs='?',
        default='data',
        help="directory whose subdirectories contain transactions.parquet (default: data)",
    )
    args = parser.parse_args(argv)
    data_dir = args.data_dir

    # Fail with a clear message instead of a FileNotFoundError traceback from os.listdir.
    if not os.path.isdir(data_dir):
        parser.error(f"data directory not found: {data_dir}")

    print(f"Processing transactions.parquet files in {data_dir}/")
    print("=" * 60)
    modified, skipped = process_directory(data_dir)
    print("=" * 60)
    print("Summary:")
    print(f" Modified: {modified}")
    print(f" Skipped (already optional): {skipped}")
# Entry-point guard: run the conversion only when this file is executed
# directly, never as a side effect of importing it.
if __name__ == "__main__":
    main()