Skip to content

Commit 6c90cc1

Browse files
committed
fix(migration): erase corrupt ids from failed edits
1 parent 8b92f00 commit 6c90cc1

File tree

3 files changed

+79
-20
lines changed

3 files changed

+79
-20
lines changed

pychunkedgraph/ingest/upgrade/atomic_layer.py

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from collections import defaultdict
44
from datetime import datetime, timedelta, timezone
5-
import logging, time
5+
import logging, time, os
66
from copy import copy
77

88
import fastremap
@@ -11,7 +11,7 @@
1111
from pychunkedgraph.graph.attributes import Connectivity, Hierarchy
1212
from pychunkedgraph.graph.utils import serializers
1313

14-
from .utils import get_end_timestamps, get_parent_timestamps
14+
from .utils import fix_corrupt_nodes, get_end_timestamps, get_parent_timestamps
1515

1616
CHILDREN = {}
1717

@@ -130,29 +130,37 @@ def update_chunk(cg: ChunkedGraph, chunk_coords: list[int]):
130130
start = time.time()
131131
x, y, z = chunk_coords
132132
chunk_id = cg.get_chunk_id(layer=2, x=x, y=y, z=z)
133-
cg.copy_fake_edges(chunk_id)
134133
rr = cg.range_read_chunk(chunk_id)
135134

136135
nodes = []
137136
nodes_ts = []
138137
earliest_ts = cg.get_earliest_timestamp()
138+
corrupt_nodes = []
139139
for k, v in rr.items():
140140
try:
141-
_ = v[Hierarchy.Parent]
142-
nodes.append(k)
143141
CHILDREN[k] = v[Hierarchy.Child][0].value
144142
ts = v[Hierarchy.Child][0].timestamp
143+
_ = v[Hierarchy.Parent]
144+
nodes.append(k)
145145
nodes_ts.append(earliest_ts if ts < earliest_ts else ts)
146146
except KeyError:
147-
# invalid nodes from failed tasks w/o parent column entry
148-
continue
147+
# ignore invalid nodes from failed ingest tasks, w/o parent column entry
148+
# retain invalid nodes from edits to fix the hierarchy
149+
if ts > earliest_ts:
150+
corrupt_nodes.append(k)
151+
152+
clean_task = os.environ.get("CLEAN_CHUNKS", "false") == "clean"
153+
if clean_task:
154+
logging.info(f"found {len(corrupt_nodes)} corrupt nodes {corrupt_nodes[:3]}...")
155+
fix_corrupt_nodes(cg, corrupt_nodes, CHILDREN)
156+
return
149157

150-
if len(nodes) > 0:
151-
logging.info(f"processing {len(nodes)} nodes.")
152-
assert len(CHILDREN) > 0, (nodes, CHILDREN)
153-
else:
158+
cg.copy_fake_edges(chunk_id)
159+
if len(nodes) == 0:
154160
return
155161

162+
logging.info(f"processing {len(nodes)} nodes.")
163+
assert len(CHILDREN) > 0, (nodes, CHILDREN)
156164
rows = update_nodes(cg, nodes, nodes_ts)
157165
cg.client.write(rows)
158166
logging.info(f"mutations: {len(rows)}, time: {time.time() - start}")

pychunkedgraph/ingest/upgrade/parent_layer.py

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# pylint: disable=invalid-name, missing-docstring, c-extension-no-member
22

3-
import logging, math, random, time
3+
import logging, math, random, time, os
44
import multiprocessing as mp
55
from collections import defaultdict
66
from concurrent.futures import ThreadPoolExecutor, as_completed
@@ -17,7 +17,7 @@
1717
from pychunkedgraph.graph.types import empty_2d
1818
from pychunkedgraph.utils.general import chunked
1919

20-
from .utils import get_end_timestamps, get_parent_timestamps
20+
from .utils import fix_corrupt_nodes, get_end_timestamps, get_parent_timestamps
2121

2222

2323
CHILDREN = {}
@@ -61,6 +61,11 @@ def _populate_cx_edges_with_timestamps(
6161
when cross edges of children were updated.
6262
"""
6363

64+
clean_task = os.environ.get("CLEAN_CHUNKS", "false") == "clean"
65+
# this data is not needed for clean tasks
66+
if clean_task:
67+
return
68+
6469
start = time.time()
6570
global CX_EDGES
6671
attrs = [Connectivity.CrossChunkEdge[l] for l in range(layer, cg.meta.layer_count)]
@@ -139,20 +144,33 @@ def _update_cross_edges_helper_thread(args):
139144

140145

141146
def _update_cross_edges_helper(args):
142-
cg_info, layer, nodes, nodes_ts = args
143147
rows = []
148+
clean_task = os.environ.get("CLEAN_CHUNKS", "false") == "clean"
149+
cg_info, layer, nodes, nodes_ts = args
144150
cg = ChunkedGraph(**cg_info)
145151
parents = cg.get_parents(nodes, fail_to_zero=True)
146152

147153
tasks = []
154+
corrupt_nodes = []
155+
earliest_ts = cg.get_earliest_timestamp()
148156
for node, parent, node_ts in zip(nodes, parents, nodes_ts):
149157
if parent == 0:
150-
# invalid id caused by failed ingest task / edits
151-
continue
152-
tasks.append((cg, layer, node, node_ts))
158+
# ignore invalid nodes from failed ingest tasks, w/o parent column entry
159+
# retain invalid nodes from edits to fix the hierarchy
160+
if node_ts > earliest_ts:
161+
corrupt_nodes.append(node)
162+
else:
163+
tasks.append((cg, layer, node, node_ts))
164+
165+
if clean_task:
166+
logging.info(f"found {len(corrupt_nodes)} corrupt nodes {corrupt_nodes[:3]}...")
167+
fix_corrupt_nodes(cg, corrupt_nodes, CHILDREN)
168+
return
153169

154170
with ThreadPoolExecutor(max_workers=4) as executor:
155-
futures = [executor.submit(_update_cross_edges_helper_thread, task) for task in tasks]
171+
futures = [
172+
executor.submit(_update_cross_edges_helper_thread, task) for task in tasks
173+
]
156174
for future in tqdm(as_completed(futures), total=len(futures)):
157175
rows.extend(future.result())
158176
cg.client.write(rows)
@@ -164,7 +182,7 @@ def update_chunk(
164182
"""
165183
Iterate over all layer IDs in a chunk and update their cross chunk edges.
166184
"""
167-
debug = nodes is not None
185+
debug = nodes is not None
168186
start = time.time()
169187
x, y, z = chunk_coords
170188
chunk_id = cg.get_chunk_id(layer=layer, x=x, y=y, z=z)

pychunkedgraph/ingest/upgrade/utils.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
# pylint: disable=invalid-name, missing-docstring
22

33
from collections import defaultdict
4-
from datetime import datetime, timezone
4+
from datetime import datetime, timedelta
55

66
import numpy as np
77
from pychunkedgraph.graph import ChunkedGraph
88
from pychunkedgraph.graph.attributes import Hierarchy
9+
from pychunkedgraph.graph.utils import serializers
10+
from google.cloud.bigtable.row_filters import TimestampRange
911

1012

1113
def exists_as_parent(cg: ChunkedGraph, parent, nodes) -> bool:
@@ -102,3 +104,34 @@ def get_parent_timestamps(
102104
ts = cell.timestamp
103105
result[k].add(earliest_ts if ts < earliest_ts else ts)
104106
return result
107+
108+
109+
def fix_corrupt_nodes(cg: ChunkedGraph, nodes: list, children_d: dict):
    """
    Erase corrupt node IDs from the graph.

    Iteratively removes a node from the parent column of its children,
    then removes the node itself, effectively erasing it.

    Args:
        cg: graph whose backing Bigtable table is mutated.
        nodes: node IDs to erase (corrupt IDs left behind by failed edits).
        children_d: maps each node ID to the collection of its children IDs.
    """
    # NOTE(review): reaches into the private Bigtable table handle; the raw
    # row mutations below deliberately bypass the ChunkedGraph client API.
    table = cg.client._table
    # Batch mutations to limit round trips; flushed explicitly at the end.
    batcher = table.mutations_batcher(flush_count=500)
    for node in nodes:
        children = children_d[node]
        _map = cg.client.read_nodes(node_ids=children, properties=Hierarchy.Parent)

        for child, parent_cells in _map.items():
            row = table.direct_row(serializers.serialize_uint64(child))
            for cell in parent_cells:
                # Delete only the parent cell version(s) that point at the
                # corrupt node; assumes read_nodes returns cells with the
                # value already deserialized to a node ID -- TODO confirm.
                if cell.value == node:
                    start = cell.timestamp
                    # One-microsecond window intended to target exactly this
                    # cell's version -- NOTE(review): confirm end-exclusive
                    # semantics of Bigtable TimestampRange.
                    end = start + timedelta(microseconds=1)
                    row.delete_cell(
                        column_family_id=Hierarchy.Parent.family_id,
                        column=Hierarchy.Parent.key,
                        time_range=TimestampRange(start=start, end=end),
                    )
            batcher.mutate(row)

        # Finally delete the corrupt node's own row entirely.
        row = table.direct_row(serializers.serialize_uint64(node))
        row.delete()
        batcher.mutate(row)

    batcher.flush()

0 commit comments

Comments
 (0)