Incremental compaction #32381

Merged
10 changes: 10 additions & 0 deletions src/persist-client/src/cfg.rs
@@ -304,6 +304,7 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
.add(&COMPACTION_HEURISTIC_MIN_UPDATES)
.add(&COMPACTION_MEMORY_BOUND_BYTES)
.add(&GC_BLOB_DELETE_CONCURRENCY_LIMIT)
.add(&INCREMENTAL_COMPACTION_DISABLED)
.add(&STATE_VERSIONS_RECENT_LIVE_DIFFS_LIMIT)
.add(&USAGE_STATE_FETCH_CONCURRENCY_LIMIT)
.add(&crate::cli::admin::CATALOG_FORCE_COMPACTION_FUEL)
@@ -514,6 +515,15 @@ pub const GC_BLOB_DELETE_CONCURRENCY_LIMIT: Config<usize> = Config::new(
"Limit the number of concurrent deletes GC can perform to this threshold.",
);

/// Whether to disable incremental compaction. This is a break-glass flag
/// that can be toggled in case incremental compaction is causing issues
/// for CRDB.
pub const INCREMENTAL_COMPACTION_DISABLED: Config<bool> = Config::new(
"persist_incremental_compaction_disabled",
false,
"Disable incremental compaction.",
);

/// The # of diffs to initially scan when fetching the latest consensus state, to
/// determine which requests go down the fast vs slow path. Should be large enough
/// to fetch all live diffs in the steady-state, and small enough to query Consensus
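
As a point of reference, here is a minimal sketch of how the break-glass flag defined above might be consulted at a call site. The `.get(&ConfigSet)` accessor and the exact plumbing of the `ConfigSet` are assumptions for illustration, not part of this diff:

// Hypothetical call-site check; `configs` is a ConfigSet that has been
// passed through `all_dyncfgs` above. The `get` accessor is assumed to be
// the usual mz_dyncfg read pattern rather than taken from this PR.
use mz_dyncfg::ConfigSet;

use crate::cfg::INCREMENTAL_COMPACTION_DISABLED;

fn use_incremental_compaction(configs: &ConfigSet) -> bool {
    // The flag is a "disable" switch, so incremental compaction runs
    // whenever it is left at its default of `false`.
    !INCREMENTAL_COMPACTION_DISABLED.get(configs)
}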
52 changes: 42 additions & 10 deletions src/persist-client/src/cli/admin.rs
@@ -18,6 +18,7 @@ use std::time::{Duration, Instant};
use anyhow::{anyhow, bail};
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use futures::pin_mut;
use futures_util::{StreamExt, TryStreamExt, stream};
use mz_dyncfg::{Config, ConfigSet};
use mz_ore::metrics::MetricsRegistry;
@@ -36,10 +37,11 @@ use crate::cache::StateCache;
use crate::cfg::{COMPACTION_MEMORY_BOUND_BYTES, all_dyncfgs};
use crate::cli::args::{StateArgs, StoreArgs, make_blob, make_consensus};
use crate::cli::inspect::FAKE_OPAQUE_CODEC;
use crate::internal::compact::{CompactConfig, CompactReq, Compactor};
use crate::internal::compact::{CompactConfig, CompactReq, CompactRes, Compactor};
use crate::internal::encoding::Schemas;
use crate::internal::gc::{GarbageCollector, GcReq};
use crate::internal::machine::Machine;
use crate::internal::state::HollowBatch;
use crate::internal::trace::FueledMergeRes;
use crate::rpc::{NoopPubSubSender, PubSubSender};
use crate::write::{WriteHandle, WriterId};
@@ -489,16 +491,44 @@ where
val: Arc::clone(&val_schema),
};

let res = Compactor::<K, V, T, D>::compact(
let stream = Compactor::<K, V, T, D>::compact_stream(
CompactConfig::new(&cfg, shard_id),
Arc::clone(&blob),
Arc::clone(&metrics),
Arc::clone(&machine.applier.shard_metrics),
Arc::new(IsolatedRuntime::default()),
req,
req.clone(),
schemas,
)
.await?;
);
pin_mut!(stream);

let mut all_parts = vec![];
let mut all_run_splits = vec![];
let mut all_run_meta = vec![];
let mut len = 0;

while let Some(res) = stream.next().await {
let res = res?;
let (parts, updates, run_meta, run_splits) = (
res.output.parts,
res.output.len,
res.output.run_meta,
res.output.run_splits,
);
let run_offset = all_parts.len();
if !all_parts.is_empty() {
all_run_splits.push(run_offset);
}
all_run_splits.extend(run_splits.iter().map(|r| r + run_offset));
all_run_meta.extend(run_meta);
all_parts.extend(parts);
len += updates;
}

let res = CompactRes {
output: HollowBatch::new(req.desc, all_parts, len, all_run_meta, all_run_splits),
};

metrics.compaction.admin_count.inc();
info!(
"attempt {} req {}: compacted into {} parts {} bytes in {:?}",
@@ -509,7 +539,10 @@
start.elapsed(),
);
let (apply_res, maintenance) = machine
.merge_res(&FueledMergeRes { output: res.output })
.merge_res(&FueledMergeRes {
output: res.output,
new_active_compaction: None,
})
.await;
if !maintenance.is_empty() {
info!("ignoring non-empty requested maintenance: {maintenance:?}")
@@ -761,7 +794,7 @@ pub async fn dangerous_force_compaction_and_break_pushdown<K, V, T, D>(
write.write_schemas.clone(),
)
.await;
let (res, apply_maintenance) = match res {
let apply_maintenance = match res {
Ok(x) => x,
Err(err) => {
warn!(
Expand All @@ -775,11 +808,10 @@ pub async fn dangerous_force_compaction_and_break_pushdown<K, V, T, D>(
};
machine.applier.metrics.compaction.admin_count.inc();
info!(
"force_compaction {} {} compacted in {:?}: {:?}",
"force_compaction {} {} compacted in {:?}",
machine.applier.shard_metrics.name,
machine.applier.shard_metrics.shard_id,
start.elapsed(),
res
start.elapsed()
);
maintenance.merge(apply_maintenance);
}