
Commit 69e7465

Incremental compaction (#32381)
Towards MaterializeInc/database-issues#9191

Today we have no good way to split the work of compaction into smaller parts, which becomes a problem as datasets and clusters continue to grow. If a compaction takes a significant amount of time, there is a risk that the process running it does not live long enough to finish (for whatever reason: failure, shutdown, scheduling, etc.).

This PR improves the situation when compacting many shorter runs. We already split the work into "chunks" based on the size of the runs, but we don't write any of that work back into state until all chunks are complete. That is suboptimal: imagine a large compaction is chugging along, 99 of its 100 chunks are done, and the cluster shuts down before the last one finishes. All of that work is wasted. This PR "checkpoints" its work into state after each chunk completes, so in the example above only the partially finished 100th chunk is lost. (Incremental work within a chunk will be the subject of future work.) A minimal sketch of the checkpoint-per-chunk idea follows the checklist below.

There is a tradeoff, though: writing to state more often risks putting CRDB under additional load. We currently execute roughly 650-750 writes per second to each of our CRDB nodes in us-east-1 on average, and on the order of 200 chunks per second are queued up there. If every chunk completed immediately and concurrently, that would significantly increase the QPS of our CRDB cluster (which I think it can absorb, based on the resource usage I'm seeing, but setting that aside...). In practice, not every chunk across every environment will complete immediately and concurrently, so the impact on QPS should be lower than 200/s. That said, we don't have per-chunk timing, so it's hard to estimate precisely; an anecdotal test in staging didn't reveal any undue load. If this remains a concern, some form of backpressure could be implemented to batch the applies.

### Checklist

- [ ] This PR has adequate test coverage / QA involvement has been duly considered. ([trigger-ci for additional test/nightly runs](https://trigger-ci.dev.materialize.com/))
- [ ] This PR has an associated up-to-date [design doc](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/README.md), is a design doc ([template](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/00000000_template.md)), or is sufficiently small to not require a design.
- [ ] If this PR evolves [an existing `$T ⇔ Proto$T` mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md) (possibly in a backwards-incompatible way), then it is tagged with a `T-proto` label.
- [ ] If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label ([example](MaterializeInc/cloud#5021)).
- [ ] If this PR includes major [user-facing behavior changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note), I have pinged the relevant PM to schedule a changelog post.
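
A minimal sketch of the checkpoint-per-chunk idea, using hypothetical stand-in types rather than the crate's real `Compactor`/`Machine` APIs: each chunk's output is applied to shard state as soon as it completes, so a crash loses at most the in-flight chunk, at the cost of one state write per chunk.

```rust
// Hypothetical stand-ins: `ShardState::apply` models a compare-and-append of a
// compaction result into consensus-backed state (the extra CRDB writes
// discussed above); `compact_chunk` models the actual compaction work.
#[derive(Debug)]
struct ChunkOutput {
    parts: usize,
}

#[derive(Debug, Default)]
struct ShardState {
    applied_parts: usize,
    applies: usize,
}

impl ShardState {
    fn apply(&mut self, out: &ChunkOutput) {
        self.applied_parts += out.parts;
        self.applies += 1;
    }
}

fn compact_chunk(chunk: usize) -> ChunkOutput {
    ChunkOutput { parts: chunk % 3 + 1 }
}

fn main() {
    let mut state = ShardState::default();
    for chunk in 0..100 {
        let out = compact_chunk(chunk);
        // Checkpoint after every chunk: if the process dies here, only the
        // next (unfinished) chunk's work is lost, not all 100 chunks.
        state.apply(&out);
    }
    println!("{state:?}");
}
```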
1 parent 4c07796 commit 69e7465

File tree

9 files changed, +372 -227 lines changed


src/persist-client/src/cfg.rs

Lines changed: 10 additions & 0 deletions
@@ -304,6 +304,7 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
         .add(&COMPACTION_HEURISTIC_MIN_UPDATES)
         .add(&COMPACTION_MEMORY_BOUND_BYTES)
         .add(&GC_BLOB_DELETE_CONCURRENCY_LIMIT)
+        .add(&INCREMENTAL_COMPACTION_DISABLED)
         .add(&STATE_VERSIONS_RECENT_LIVE_DIFFS_LIMIT)
         .add(&USAGE_STATE_FETCH_CONCURRENCY_LIMIT)
         .add(&crate::cli::admin::CATALOG_FORCE_COMPACTION_FUEL)
@@ -514,6 +515,15 @@ pub const GC_BLOB_DELETE_CONCURRENCY_LIMIT: Config<usize> = Config::new(
     "Limit the number of concurrent deletes GC can perform to this threshold.",
 );
 
+/// Whether to disable incremental compaction. This is a break-glass flag
+/// that can be toggled in case incremental compaction is causing issues
+/// for CRDB.
+pub const INCREMENTAL_COMPACTION_DISABLED: Config<bool> = Config::new(
+    "persist_incremental_compaction_disabled",
+    false,
+    "Disable incremental compaction.",
+);
+
 /// The # of diffs to initially scan when fetching the latest consensus state, to
 /// determine which requests go down the fast vs slow path. Should be large enough
 /// to fetch all live diffs in the steady-state, and small enough to query Consensus
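
A hedged sketch of how a break-glass flag like `persist_incremental_compaction_disabled` could gate the new behavior; the types and the `run_compaction` wiring here are hypothetical, not the actual persist code paths: when the flag is set, results are applied once at the end (the old write pattern), otherwise once per completed chunk.

```rust
// Hypothetical stand-ins for the compaction output and the state write.
struct ChunkOutput {
    parts: usize,
}

fn apply_to_state(outputs: &[ChunkOutput]) {
    // Stand-in for one consensus/CRDB write applying these outputs.
    let parts: usize = outputs.iter().map(|o| o.parts).sum();
    println!("applied {} output(s) ({} parts) in one state write", outputs.len(), parts);
}

fn run_compaction(chunks: Vec<ChunkOutput>, incremental_compaction_disabled: bool) {
    if incremental_compaction_disabled {
        // Break-glass path: a single write once all chunks are done.
        apply_to_state(&chunks);
    } else {
        // Incremental path: one write per completed chunk.
        for chunk in &chunks {
            apply_to_state(std::slice::from_ref(chunk));
        }
    }
}

fn main() {
    let chunks: Vec<ChunkOutput> = (1..=3).map(|parts| ChunkOutput { parts }).collect();
    run_compaction(chunks, false);
}
```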

src/persist-client/src/cli/admin.rs

Lines changed: 42 additions & 10 deletions
@@ -18,6 +18,7 @@ use std::time::{Duration, Instant};
 use anyhow::{anyhow, bail};
 use differential_dataflow::difference::Semigroup;
 use differential_dataflow::lattice::Lattice;
+use futures::pin_mut;
 use futures_util::{StreamExt, TryStreamExt, stream};
 use mz_dyncfg::{Config, ConfigSet};
 use mz_ore::metrics::MetricsRegistry;
@@ -36,10 +37,11 @@ use crate::cache::StateCache;
 use crate::cfg::{COMPACTION_MEMORY_BOUND_BYTES, all_dyncfgs};
 use crate::cli::args::{StateArgs, StoreArgs, make_blob, make_consensus};
 use crate::cli::inspect::FAKE_OPAQUE_CODEC;
-use crate::internal::compact::{CompactConfig, CompactReq, Compactor};
+use crate::internal::compact::{CompactConfig, CompactReq, CompactRes, Compactor};
 use crate::internal::encoding::Schemas;
 use crate::internal::gc::{GarbageCollector, GcReq};
 use crate::internal::machine::Machine;
+use crate::internal::state::HollowBatch;
 use crate::internal::trace::FueledMergeRes;
 use crate::rpc::{NoopPubSubSender, PubSubSender};
 use crate::write::{WriteHandle, WriterId};
@@ -489,16 +491,44 @@ where
         val: Arc::clone(&val_schema),
     };
 
-    let res = Compactor::<K, V, T, D>::compact(
+    let stream = Compactor::<K, V, T, D>::compact_stream(
         CompactConfig::new(&cfg, shard_id),
         Arc::clone(&blob),
         Arc::clone(&metrics),
         Arc::clone(&machine.applier.shard_metrics),
         Arc::new(IsolatedRuntime::default()),
-        req,
+        req.clone(),
         schemas,
-    )
-    .await?;
+    );
+    pin_mut!(stream);
+
+    let mut all_parts = vec![];
+    let mut all_run_splits = vec![];
+    let mut all_run_meta = vec![];
+    let mut len = 0;
+
+    while let Some(res) = stream.next().await {
+        let res = res?;
+        let (parts, updates, run_meta, run_splits) = (
+            res.output.parts,
+            res.output.len,
+            res.output.run_meta,
+            res.output.run_splits,
+        );
+        let run_offset = all_parts.len();
+        if !all_parts.is_empty() {
+            all_run_splits.push(run_offset);
+        }
+        all_run_splits.extend(run_splits.iter().map(|r| r + run_offset));
+        all_run_meta.extend(run_meta);
+        all_parts.extend(parts);
+        len += updates;
+    }
+
+    let res = CompactRes {
+        output: HollowBatch::new(req.desc, all_parts, len, all_run_meta, all_run_splits),
+    };
+
     metrics.compaction.admin_count.inc();
     info!(
         "attempt {} req {}: compacted into {} parts {} bytes in {:?}",
@@ -509,7 +539,10 @@ where
         start.elapsed(),
     );
     let (apply_res, maintenance) = machine
-        .merge_res(&FueledMergeRes { output: res.output })
+        .merge_res(&FueledMergeRes {
+            output: res.output,
+            new_active_compaction: None,
+        })
         .await;
     if !maintenance.is_empty() {
         info!("ignoring non-empty requested maintenance: {maintenance:?}")
@@ -761,7 +794,7 @@ pub async fn dangerous_force_compaction_and_break_pushdown<K, V, T, D>(
         write.write_schemas.clone(),
    )
    .await;
-    let (res, apply_maintenance) = match res {
+    let apply_maintenance = match res {
         Ok(x) => x,
         Err(err) => {
             warn!(
@@ -775,11 +808,10 @@ pub async fn dangerous_force_compaction_and_break_pushdown<K, V, T, D>(
     };
     machine.applier.metrics.compaction.admin_count.inc();
     info!(
-        "force_compaction {} {} compacted in {:?}: {:?}",
+        "force_compaction {} {} compacted in {:?}",
         machine.applier.shard_metrics.name,
         machine.applier.shard_metrics.shard_id,
-        start.elapsed(),
-        res
+        start.elapsed()
     );
     maintenance.merge(apply_maintenance);
 }
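
A self-contained sketch (hypothetical `stitch` helper and string parts, not the real `HollowBatch` types) of the run-split stitching in the loop above: `run_splits` are indices into `parts` marking where a new run begins, so each chunk's splits are shifted by the number of parts already accumulated, plus an extra split at each chunk boundary.

```rust
// Concatenate per-chunk (parts, run_splits) batches into one batch,
// preserving run boundaries.
fn stitch(batches: Vec<(Vec<&'static str>, Vec<usize>)>) -> (Vec<&'static str>, Vec<usize>) {
    let mut all_parts = Vec::new();
    let mut all_run_splits = Vec::new();
    for (parts, run_splits) in batches {
        let run_offset = all_parts.len();
        if !all_parts.is_empty() {
            // The first run of this chunk starts a new run in the stitched batch.
            all_run_splits.push(run_offset);
        }
        // Shift this chunk's internal splits by the parts accumulated so far.
        all_run_splits.extend(run_splits.iter().map(|r| r + run_offset));
        all_parts.extend(parts);
    }
    (all_parts, all_run_splits)
}

fn main() {
    // Chunk 1: parts [a, b, c] with a run break before index 2 (runs [a, b] and [c]).
    // Chunk 2: parts [d, e] with no internal run breaks (run [d, e]).
    let stitched = stitch(vec![
        (vec!["a", "b", "c"], vec![2]),
        (vec!["d", "e"], vec![]),
    ]);
    // Expect parts [a, b, c, d, e] with splits [2, 3]: runs [a, b], [c], [d, e].
    assert_eq!(stitched, (vec!["a", "b", "c", "d", "e"], vec![2, 3]));
    println!("{stitched:?}");
}
```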
