Skip to content

Commit c9ba99c

Browse files
committed
fix(migration): erase corrupt ids from failed edits
1 parent 8b92f00 commit c9ba99c

File tree

3 files changed

+63
-12
lines changed

3 files changed

+63
-12
lines changed

pychunkedgraph/ingest/upgrade/atomic_layer.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from pychunkedgraph.graph.attributes import Connectivity, Hierarchy
1212
from pychunkedgraph.graph.utils import serializers
1313

14-
from .utils import get_end_timestamps, get_parent_timestamps
14+
from .utils import fix_corrupt_nodes, get_end_timestamps, get_parent_timestamps
1515

1616
CHILDREN = {}
1717

@@ -136,16 +136,19 @@ def update_chunk(cg: ChunkedGraph, chunk_coords: list[int]):
136136
nodes = []
137137
nodes_ts = []
138138
earliest_ts = cg.get_earliest_timestamp()
139+
corrupt_nodes = []
139140
for k, v in rr.items():
140141
try:
141-
_ = v[Hierarchy.Parent]
142-
nodes.append(k)
143142
CHILDREN[k] = v[Hierarchy.Child][0].value
144143
ts = v[Hierarchy.Child][0].timestamp
144+
_ = v[Hierarchy.Parent]
145+
nodes.append(k)
145146
nodes_ts.append(earliest_ts if ts < earliest_ts else ts)
146147
except KeyError:
147-
# invalid nodes from failed tasks w/o parent column entry
148-
continue
148+
# ignore invalid nodes from failed ingest tasks, w/o parent column entry
149+
# retain invalid nodes from edits to fix the hierarchy
150+
if ts > earliest_ts:
151+
corrupt_nodes.append(k)
149152

150153
if len(nodes) > 0:
151154
logging.info(f"processing {len(nodes)} nodes.")
@@ -156,3 +159,7 @@ def update_chunk(cg: ChunkedGraph, chunk_coords: list[int]):
156159
rows = update_nodes(cg, nodes, nodes_ts)
157160
cg.client.write(rows)
158161
logging.info(f"mutations: {len(rows)}, time: {time.time() - start}")
162+
163+
if len(corrupt_nodes) > 0:
164+
logging.info(f"found {len(corrupt_nodes)} corrupt nodes {corrupt_nodes[:3]}...")
165+
fix_corrupt_nodes(cg, corrupt_nodes, CHILDREN)

pychunkedgraph/ingest/upgrade/parent_layer.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from pychunkedgraph.graph.types import empty_2d
1818
from pychunkedgraph.utils.general import chunked
1919

20-
from .utils import get_end_timestamps, get_parent_timestamps
20+
from .utils import fix_corrupt_nodes, get_end_timestamps, get_parent_timestamps
2121

2222

2323
CHILDREN = {}
@@ -145,17 +145,28 @@ def _update_cross_edges_helper(args):
145145
parents = cg.get_parents(nodes, fail_to_zero=True)
146146

147147
tasks = []
148+
corrupt_nodes = []
149+
earliest_ts = cg.get_earliest_timestamp()
148150
for node, parent, node_ts in zip(nodes, parents, nodes_ts):
149151
if parent == 0:
150-
# invalid id caused by failed ingest task / edits
151-
continue
152-
tasks.append((cg, layer, node, node_ts))
152+
# ignore invalid nodes from failed ingest tasks, w/o parent column entry
153+
# retain invalid nodes from edits to fix the hierarchy
154+
if node_ts > earliest_ts:
155+
corrupt_nodes.append(node)
156+
else:
157+
tasks.append((cg, layer, node, node_ts))
153158

154159
with ThreadPoolExecutor(max_workers=4) as executor:
155-
futures = [executor.submit(_update_cross_edges_helper_thread, task) for task in tasks]
160+
futures = [
161+
executor.submit(_update_cross_edges_helper_thread, task) for task in tasks
162+
]
156163
for future in tqdm(as_completed(futures), total=len(futures)):
157164
rows.extend(future.result())
165+
158166
cg.client.write(rows)
167+
if len(corrupt_nodes) > 0:
168+
logging.info(f"found {len(corrupt_nodes)} corrupt nodes {corrupt_nodes[:3]}...")
169+
fix_corrupt_nodes(cg, corrupt_nodes, CHILDREN)
159170

160171

161172
def update_chunk(
@@ -164,7 +175,7 @@ def update_chunk(
164175
"""
165176
Iterate over all layer IDs in a chunk and update their cross chunk edges.
166177
"""
167-
debug = nodes is not None
178+
debug = nodes is not None
168179
start = time.time()
169180
x, y, z = chunk_coords
170181
chunk_id = cg.get_chunk_id(layer=layer, x=x, y=y, z=z)

pychunkedgraph/ingest/upgrade/utils.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
# pylint: disable=invalid-name, missing-docstring
22

33
from collections import defaultdict
4-
from datetime import datetime, timezone
4+
from datetime import datetime, timedelta
55

66
import numpy as np
77
from pychunkedgraph.graph import ChunkedGraph
88
from pychunkedgraph.graph.attributes import Hierarchy
9+
from pychunkedgraph.graph.utils import serializers
10+
from google.cloud.bigtable.row_filters import TimestampRange
911

1012

1113
def exists_as_parent(cg: ChunkedGraph, parent, nodes) -> bool:
@@ -102,3 +104,34 @@ def get_parent_timestamps(
102104
ts = cell.timestamp
103105
result[k].add(earliest_ts if ts < earliest_ts else ts)
104106
return result
107+
108+
109+
def fix_corrupt_nodes(cg: ChunkedGraph, nodes: list, children_d: dict):
    """
    Erase corrupt nodes from the hierarchy.

    For each node, removes the node from the parent column of its children,
    then deletes the node's own row, effectively erasing it.

    Args:
        cg: graph whose client is used to read parent cells and whose
            underlying table receives the delete mutations.
        nodes: IDs of the corrupt nodes to erase.
        children_d: maps a node ID to its children. Every entry of `nodes`
            must be present; a missing node raises KeyError.
    """
    table = cg.client._table
    batcher = table.mutations_batcher(flush_count=500)
    for node in nodes:
        children = children_d[node]
        parents_d = cg.client.read_nodes(
            node_ids=children, properties=Hierarchy.Parent
        )

        for child, parent_cells in parents_d.items():
            row = table.direct_row(serializers.serialize_uint64(child))
            has_deletes = False
            for cell in parent_cells:
                if cell.value != node:
                    continue
                # Narrow range starting at the cell's own timestamp so that
                # only the parent entry pointing at `node` is targeted;
                # other parent versions of the child are left untouched.
                start = cell.timestamp
                end = start + timedelta(microseconds=1)
                row.delete_cell(
                    column_family_id=Hierarchy.Parent.family_id,
                    column=Hierarchy.Parent.key,
                    time_range=TimestampRange(start=start, end=end),
                )
                has_deletes = True

            # Queue the child row exactly once, after all of its deletes have
            # been collected: handing the same row to the batcher per matching
            # cell queues duplicates and exposes a row that is still changing.
            if has_deletes:
                batcher.mutate(row)

        # Finally remove the corrupt node's own row.
        row = table.direct_row(serializers.serialize_uint64(node))
        row.delete()
        batcher.mutate(row)

    batcher.flush()

0 commit comments

Comments
 (0)