Skip to content

Commit fc6bd13

Browse files
Fine-grained AutoRepair observability on a node
1 parent 5e003af commit fc6bd13

18 files changed

+917
-222
lines changed

src/java/org/apache/cassandra/metrics/AutoRepairMetrics.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@ public class AutoRepairMetrics
4646
public final Gauge<Integer> skippedTablesCount;
4747
public final Gauge<Integer> totalMVTablesConsideredForRepair;
4848
public final Gauge<Integer> totalDisabledRepairTables;
49+
public final Gauge<Long> totalBytesToRepair;
50+
public final Gauge<Long> bytesAlreadyRepaired;
51+
public final Gauge<Integer> totalKeyspaceRepairPlansToRepair;
52+
public final Gauge<Integer> keyspaceRepairPlansAlreadyRepaired;
53+
4954
public Counter repairTurnMyTurn;
5055
public Counter repairTurnMyTurnDueToPriority;
5156
public Counter repairTurnMyTurnForceRepair;
@@ -155,6 +160,34 @@ public Integer getValue()
155160
return AutoRepair.instance.getRepairState(repairType).getTotalDisabledTablesRepairCount();
156161
}
157162
});
163+
totalBytesToRepair = Metrics.register(factory.createMetricName("TotalBytesToRepair"), new Gauge<Long>()
164+
{
165+
public Long getValue()
166+
{
167+
return AutoRepair.instance.getRepairState(repairType).getTotalBytesToRepair();
168+
}
169+
});
170+
bytesAlreadyRepaired = Metrics.register(factory.createMetricName("BytesAlreadyRepaired"), new Gauge<Long>()
171+
{
172+
public Long getValue()
173+
{
174+
return AutoRepair.instance.getRepairState(repairType).getBytesAlreadyRepaired();
175+
}
176+
});
177+
totalKeyspaceRepairPlansToRepair = Metrics.register(factory.createMetricName("TotalKeyspaceRepairPlansToRepair"), new Gauge<Integer>()
178+
{
179+
public Integer getValue()
180+
{
181+
return AutoRepair.instance.getRepairState(repairType).getTotalKeyspaceRepairPlansToRepair();
182+
}
183+
});
184+
keyspaceRepairPlansAlreadyRepaired = Metrics.register(factory.createMetricName("KeyspaceRepairPlansAlreadyRepaired"), new Gauge<Integer>()
185+
{
186+
public Integer getValue()
187+
{
188+
return AutoRepair.instance.getRepairState(repairType).getKeyspaceRepairPlansAlreadyRepaired();
189+
}
190+
});
158191
}
159192

160193
public void recordTurn(AutoRepairUtils.RepairTurn turn)

src/java/org/apache/cassandra/repair/autorepair/AutoRepair.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -120,14 +120,15 @@ public void setup()
120120
repairExecutors = new EnumMap<>(AutoRepairConfig.RepairType.class);
121121
repairRunnableExecutors = new EnumMap<>(AutoRepairConfig.RepairType.class);
122122
repairStates = new EnumMap<>(AutoRepairConfig.RepairType.class);
123+
AutoRepairConfig config = DatabaseDescriptor.getAutoRepairConfig();
124+
123125
for (AutoRepairConfig.RepairType repairType : AutoRepairConfig.RepairType.values())
124126
{
125127
repairExecutors.put(repairType, executorFactory().scheduled(false, "AutoRepair-Repair-" + repairType.getConfigName(), Thread.NORM_PRIORITY));
126128
repairRunnableExecutors.put(repairType, executorFactory().scheduled(false, "AutoRepair-RepairRunnable-" + repairType.getConfigName(), Thread.NORM_PRIORITY));
127-
repairStates.put(repairType, AutoRepairConfig.RepairType.getAutoRepairState(repairType));
129+
repairStates.put(repairType, AutoRepairConfig.RepairType.getAutoRepairState(repairType, config));
128130
}
129131

130-
AutoRepairConfig config = DatabaseDescriptor.getAutoRepairConfig();
131132
AutoRepairUtils.setup();
132133

133134
for (AutoRepairConfig.RepairType repairType : AutoRepairConfig.RepairType.values())
@@ -223,23 +224,29 @@ public void repair(AutoRepairConfig.RepairType repairType)
223224
}
224225

225226
// Separate out the keyspaces and tables to repair based on their priority, with each repair plan representing a uniquely occuring priority.
226-
List<PrioritizedRepairPlan> repairPlans = PrioritizedRepairPlan.build(keyspacesAndTablesToRepair, repairType, shuffleFunc);
227+
List<PrioritizedRepairPlan> repairPlans = PrioritizedRepairPlan.build(keyspacesAndTablesToRepair, repairType, shuffleFunc, primaryRangeOnly);
228+
229+
repairState.updateRepairScheduleStatistics(repairPlans);
227230

228231
// calculate the repair assignments for each priority:keyspace.
229232
Iterator<KeyspaceRepairAssignments> repairAssignmentsIterator = config.getTokenRangeSplitterInstance(repairType).getRepairAssignments(primaryRangeOnly, repairPlans);
230233

234+
int keyspaceRepairAssignmentsAlreadyRepaired = 0;
231235
while (repairAssignmentsIterator.hasNext())
232236
{
233237
KeyspaceRepairAssignments repairAssignments = repairAssignmentsIterator.next();
234238
List<RepairAssignment> assignments = repairAssignments.getRepairAssignments();
235239
if (assignments.isEmpty())
236240
{
241+
keyspaceRepairAssignmentsAlreadyRepaired++;
237242
logger.info("Skipping repairs for priorityBucket={} for keyspace={} since it yielded no assignments", repairAssignments.getPriority(), repairAssignments.getKeyspaceName());
238243
continue;
239244
}
240245

241246
logger.info("Submitting repairs for priorityBucket={} for keyspace={} with assignmentCount={}", repairAssignments.getPriority(), repairAssignments.getKeyspaceName(), repairAssignments.getRepairAssignments().size());
242247
repairKeyspace(repairType, primaryRangeOnly, repairAssignments.getKeyspaceName(), repairAssignments.getRepairAssignments(), collectedRepairStats);
248+
keyspaceRepairAssignmentsAlreadyRepaired++;
249+
repairState.setKeyspaceRepairPlansAlreadyRepaired(keyspaceRepairAssignmentsAlreadyRepaired);
243250
}
244251

245252
cleanupAndUpdateStats(turn, repairType, repairState, myId, startTime, collectedRepairStats);
@@ -269,6 +276,7 @@ private void repairKeyspace(AutoRepairConfig.RepairType repairType, boolean prim
269276
long tableStartTime = timeFunc.get();
270277
int totalProcessedAssignments = 0;
271278
Set<Range<Token>> ranges = new HashSet<>();
279+
long bytesAlreadyRepaired = 0;
272280
for (RepairAssignment curRepairAssignment : repairAssignments)
273281
{
274282
try
@@ -372,7 +380,9 @@ else if (retryCount < config.getRepairMaxRetries(repairType))
372380
}
373381
ranges.clear();
374382
}
375-
logger.info("Repair completed for {} tables {}, range {}", keyspaceName, curRepairAssignment.getTableNames(), curRepairAssignment.getTokenRange());
383+
bytesAlreadyRepaired += curRepairAssignment.getEstimatedBytes();
384+
repairState.setBytesAlreadyRepaired(bytesAlreadyRepaired);
385+
logger.info("Repair completed for {} tables {}, range {}, bytesAlreadyRepaired {}", keyspaceName, curRepairAssignment.getTableNames(), curRepairAssignment.getTokenRange(), bytesAlreadyRepaired);
376386
}
377387
catch (Exception e)
378388
{
@@ -462,7 +472,6 @@ private void cleanupAndUpdateStats(RepairTurn turn, AutoRepairConfig.RepairType
462472
repairState.setSucceededTokenRangesCount(collectedRepairStats.succeededTokenRanges);
463473
repairState.setSkippedTokenRangesCount(collectedRepairStats.skippedTokenRanges);
464474
repairState.setSkippedTablesCount(collectedRepairStats.skippedTables);
465-
repairState.setNodeRepairTimeInSec((int) TimeUnit.MILLISECONDS.toSeconds(timeFunc.get() - startTime));
466475
long timeInHours = TimeUnit.SECONDS.toHours(repairState.getNodeRepairTimeInSec());
467476
logger.info("Local {} repair time {} hour(s), stats: repairKeyspaceCount {}, " +
468477
"repairTokenRangesSuccessCount {}, repairTokenRangesFailureCount {}, " +
@@ -484,6 +493,7 @@ private void cleanupAndUpdateStats(RepairTurn turn, AutoRepairConfig.RepairType
484493
logger.info("Wait for {} for repair type {}.", SLEEP_IF_REPAIR_FINISHES_QUICKLY, repairType);
485494
Thread.sleep(SLEEP_IF_REPAIR_FINISHES_QUICKLY.toMilliseconds());
486495
}
496+
repairState.setNodeRepairTimeInSec((int) TimeUnit.MILLISECONDS.toSeconds(timeFunc.get() - startTime));
487497
repairState.setRepairInProgress(false);
488498
AutoRepairUtils.updateFinishAutoRepairHistory(repairType, myId, timeFunc.get());
489499
}

src/java/org/apache/cassandra/repair/autorepair/AutoRepairConfig.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,16 +90,16 @@ public String getConfigName()
9090
return configName;
9191
}
9292

93-
public static AutoRepairState getAutoRepairState(RepairType repairType)
93+
public static AutoRepairState getAutoRepairState(RepairType repairType, AutoRepairConfig config)
9494
{
9595
switch (repairType)
9696
{
9797
case FULL:
98-
return new FullRepairState();
98+
return new FullRepairState(config);
9999
case INCREMENTAL:
100-
return new IncrementalRepairState();
100+
return new IncrementalRepairState(config);
101101
case PREVIEW_REPAIRED:
102-
return new PreviewRepairedState();
102+
return new PreviewRepairedState(config);
103103
}
104104

105105
throw new IllegalArgumentException("Invalid repair type: " + repairType);

src/java/org/apache/cassandra/repair/autorepair/AutoRepairState.java

Lines changed: 71 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ public abstract class AutoRepairState
6060
@VisibleForTesting
6161
protected final RepairType repairType;
6262
@VisibleForTesting
63+
protected AutoRepairConfig config;
64+
@VisibleForTesting
6365
protected int totalTablesConsideredForRepair = 0;
6466
@VisibleForTesting
6567
protected long lastRepairTimeInMs;
@@ -84,13 +86,22 @@ public abstract class AutoRepairState
8486
@VisibleForTesting
8587
protected int skippedTablesCount = 0;
8688
@VisibleForTesting
89+
protected long totalBytesToRepair = 0;
90+
@VisibleForTesting
91+
protected long bytesAlreadyRepaired = 0;
92+
@VisibleForTesting
93+
protected int totalKeyspaceRepairPlansToRepair = 0;
94+
@VisibleForTesting
95+
protected int keyspaceRepairPlansAlreadyRepaired = 0;
96+
@VisibleForTesting
8797
protected AutoRepairHistory longestUnrepairedNode;
8898
protected final AutoRepairMetrics metrics;
8999

90-
protected AutoRepairState(RepairType repairType)
100+
protected AutoRepairState(RepairType repairType, AutoRepairConfig config)
91101
{
92102
metrics = AutoRepairMetricsManager.getMetrics(repairType);
93103
this.repairType = repairType;
104+
this.config = config;
94105
}
95106

96107
public abstract RepairCoordinator getRepairRunnable(String keyspace, List<String> tables, Set<Range<Token>> ranges, boolean primaryRangeOnly);
@@ -101,6 +112,19 @@ protected RepairCoordinator getRepairRunnable(String keyspace, RepairOption opti
101112
options, keyspace);
102113
}
103114

115+
public void updateRepairScheduleStatistics(List<PrioritizedRepairPlan> repairPlans)
116+
{
117+
for (PrioritizedRepairPlan repairPlan : repairPlans)
118+
{
119+
for (KeyspaceRepairPlan keyspaceRepairPlan : repairPlan.getKeyspaceRepairPlans())
120+
{
121+
totalBytesToRepair += keyspaceRepairPlan.getEstimatedBytes();
122+
}
123+
totalKeyspaceRepairPlansToRepair += repairPlan.getKeyspaceRepairPlans().size();
124+
}
125+
totalBytesToRepair = config.getTokenRangeSplitterInstance(repairType).adjustTotalBytesToRepair(totalBytesToRepair);
126+
}
127+
104128
public long getLastRepairTime()
105129
{
106130
return lastRepairTimeInMs;
@@ -239,13 +263,53 @@ public int getTotalDisabledTablesRepairCount()
239263
{
240264
return totalDisabledTablesRepairCount;
241265
}
266+
267+
public void setTotalBytesToRepair(long totalBytes)
268+
{
269+
totalBytesToRepair = totalBytes;
270+
}
271+
272+
public long getTotalBytesToRepair()
273+
{
274+
return totalBytesToRepair;
275+
}
276+
277+
public void setBytesAlreadyRepaired(long bytesRepaired)
278+
{
279+
bytesAlreadyRepaired = bytesRepaired;
280+
}
281+
282+
public long getBytesAlreadyRepaired()
283+
{
284+
return bytesAlreadyRepaired;
285+
}
286+
287+
public void setTotalKeyspaceRepairPlansToRepair(int totalKeyspaces)
288+
{
289+
totalKeyspaceRepairPlansToRepair = totalKeyspaces;
290+
}
291+
292+
public int getTotalKeyspaceRepairPlansToRepair()
293+
{
294+
return totalKeyspaceRepairPlansToRepair;
295+
}
296+
297+
public void setKeyspaceRepairPlansAlreadyRepaired(int keyspacesRepaired)
298+
{
299+
keyspaceRepairPlansAlreadyRepaired = keyspacesRepaired;
300+
}
301+
302+
public int getKeyspaceRepairPlansAlreadyRepaired()
303+
{
304+
return keyspaceRepairPlansAlreadyRepaired;
305+
}
242306
}
243307

244308
class PreviewRepairedState extends AutoRepairState
245309
{
246-
public PreviewRepairedState()
310+
public PreviewRepairedState(AutoRepairConfig config)
247311
{
248-
super(RepairType.PREVIEW_REPAIRED);
312+
super(RepairType.PREVIEW_REPAIRED, config);
249313
}
250314

251315
@Override
@@ -262,9 +326,9 @@ public RepairCoordinator getRepairRunnable(String keyspace, List<String> tables,
262326

263327
class IncrementalRepairState extends AutoRepairState
264328
{
265-
public IncrementalRepairState()
329+
public IncrementalRepairState(AutoRepairConfig config)
266330
{
267-
super(RepairType.INCREMENTAL);
331+
super(RepairType.INCREMENTAL, config);
268332
}
269333

270334
@Override
@@ -307,9 +371,9 @@ protected List<String> filterOutUnsafeTables(String keyspaceName, List<String> t
307371

308372
class FullRepairState extends AutoRepairState
309373
{
310-
public FullRepairState()
374+
public FullRepairState(AutoRepairConfig config)
311375
{
312-
super(RepairType.FULL);
376+
super(RepairType.FULL, config);
313377
}
314378

315379
@Override

0 commit comments

Comments
 (0)