Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions graph/src/components/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,14 @@ pub trait SubgraphStore: Send + Sync + 'static {
/// being set up
async fn least_block_ptr(&self, id: &DeploymentHash) -> Result<Option<BlockPtr>, StoreError>;

/// Return the earliest block for which the deployment with this `id`
/// retains complete data. For unpruned deployments this is the start
/// block; for pruned deployments it advances as historical entity
/// versions are removed. Blocks earlier than this are not queryable
/// and cannot be used as a graft point because the entity versions
/// live at that block are no longer present.
async fn earliest_block_number(&self, id: &DeploymentHash) -> Result<BlockNumber, StoreError>;

async fn is_healthy(&self, id: &DeploymentHash) -> Result<bool, StoreError>;

/// Find all deployment locators for the subgraph with the given hash.
Expand Down
43 changes: 42 additions & 1 deletion graph/src/data/subgraph/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,23 @@ pub struct Graft {
}

impl Graft {
async fn validate<S: SubgraphStore>(
/// Validate that this `Graft` can be performed against the configured
/// base subgraph. Checks (in order):
/// 1. the base has processed at least one block,
/// 2. the graft block is at or above the base's `earliest_block_number`
/// (the prune floor — grafting below it would silently produce a
/// subgraph with reset state for any entity whose live-at-graft
/// version was a closed historical version),
/// 3. the base has processed past `self.block` (i.e., its head is at
/// or above the graft block),
/// 4. the graft block is at least `reorg_threshold` blocks behind the
/// base's head, so a reorg of the base cannot invalidate the copy,
/// 5. if the base is unhealthy, the graft block is strictly before its
/// failure block.
///
/// `pub` so that tooling and tests can validate a graft directly
/// without resolving a full `UnvalidatedSubgraphManifest`.
pub async fn validate<S: SubgraphStore>(
&self,
store: Arc<S>,
) -> Result<(), SubgraphManifestValidationError> {
Expand All @@ -562,6 +578,31 @@ impl Graft {
.await
.map_err(|e| GraftBaseInvalid(e.to_string()))?;

// Reject grafts below the base's `earliest_block_number` (its prune
// floor). If the base has been pruned past `self.block`, the entity
// versions that were live at the graft block are gone, and grafting
// would silently produce a subgraph with reset state for any entity
// whose live-at-graft version was a closed historical version
// (heavily-updated mutable singletons are the worst-affected case).
// We only consult `earliest_block_number` when the base has
// processed at least one block, since the `(None, _)` arm below
// emits a clearer error otherwise.
if last_processed_block.is_some() {
let earliest_block = store
.earliest_block_number(&self.base)
.await
.map_err(|e| GraftBaseInvalid(e.to_string()))?;
if self.block < earliest_block {
return Err(GraftBaseInvalid(format!(
"failed to graft onto `{}` at block {} because the base \
subgraph only retains data starting at block {}; earlier \
blocks have been pruned. Graft at block {} or later, or \
use a base subgraph with sufficient retention.",
self.base, self.block, earliest_block, earliest_block
)));
}
}

// We are being defensive here: we don't know which specific
// instance of a subgraph we will use as the base for the graft,
// since the notion of which of these instances is active can change
Expand Down
89 changes: 83 additions & 6 deletions store/postgres/src/deployment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1562,12 +1562,41 @@ impl DeploymentStore {
dst.catalog.site.namespace
);

// Defense in depth against grafting below the base's prune
// floor. `Graft::validate` enforces this at manifest validation
// time, but callers that reach this code path directly
// (graphman, tests, custom tooling) bypass the registrar. The
// copy reads entity versions whose `lower(block_range) <= block`;
// if the version live at `block` has already been pruned, those
// versions are silently missing and the resulting graft has
// reset state for any entity whose live-at-graft version was
// closed.
let src_earliest_before = {
let mut conn = self.pool.get_permitted().await?;
deployment::state(&mut conn, &src.site)
.await?
.earliest_block_number
};
if block.number < src_earliest_before {
return Err(StoreError::Unknown(anyhow::anyhow!(
"cannot graft `{}` onto `{}` at block {} because the base \
subgraph only retains data starting at block {}; earlier \
blocks have been pruned. Graft at block {} or later, or \
use a base subgraph with sufficient retention.",
dst.catalog.site.namespace,
src.catalog.site.namespace,
block.number,
src_earliest_before,
src_earliest_before,
)));
}

let src_manifest_idx_and_name = src_deployment.manifest.template_idx_and_name()?;
let dst_manifest_idx_and_name = self
.load_deployment(dst.site.clone())
.await?
.manifest
.template_idx_and_name()?;
// Keep `dst_deployment` bound so we can read its
// `history_blocks` below; otherwise the chained
// `.manifest.template_idx_and_name()?` would consume it.
let dst_deployment = self.load_deployment(dst.site.clone()).await?;
let dst_manifest_idx_and_name = dst_deployment.manifest.template_idx_and_name()?;

// Copy subgraph data
// We allow both not copying tables at all from the source, as well
Expand Down Expand Up @@ -1644,10 +1673,25 @@ impl DeploymentStore {
info!(logger, "Rewound subgraph to block {}", block.number;
"time_ms" => start.elapsed().as_millis());

// Use the *maximum* of the source's and destination's
// `history_blocks` rather than overwriting the destination
// with the source's value. The destination has just
// received the source's full retained history, so its
// retention must be at least as long as the source's
// (otherwise the inherited data would be immediately
// eligible for pruning); but if the destination's
// manifest requests longer retention (notably
// `prune: never`, which yields `BLOCK_NUMBER_MAX`), that
// intent must be honoured. The previous unconditional
// overwrite silently downgraded `prune: never` children
// to whatever retention the parent used.
deployment::set_history_blocks(
conn,
&dst.site,
src_deployment.manifest.history_blocks,
src_deployment
.manifest
.history_blocks
.max(dst_deployment.manifest.history_blocks),
)
.await?;

Expand All @@ -1660,6 +1704,39 @@ impl DeploymentStore {
// removed data just before we copied it
deployment::copy_earliest_block(conn, &src.site, &dst.site).await?;

// Detect the prune-during-copy race: if the base pruned
// past our graft block while the copy was running, the
// `copy_earliest_block` call above propagated that floor
// into `dst`, leaving us with `earliest_block > graft_block`
// — a structurally invalid graft. Fail the transaction so
// the bookkeeping is rolled back; partial copied data
// remains in `dst`'s tables and will require operator
// cleanup (e.g. delete + redeploy) before retrying.
//
// We re-read the source's earliest rather than the
// destination's because the destination row exists but has
// not yet been advanced past genesis (`forward_block_ptr`
// below is what sets its head), so `deployment::state` on
// the destination would error with "has not started
// syncing yet". The source has a head and is the value
// `copy_earliest_block` just propagated.
let src_earliest_after = deployment::state(conn, &src.site)
.await?
.earliest_block_number;
if src_earliest_after > block.number {
return Err(StoreError::Unknown(anyhow::anyhow!(
"graft of `{}` at block {} was invalidated mid-copy: \
the base subgraph `{}` was pruned past the graft block \
(earliest is now {}). The deployment must be deleted \
and recreated, ideally targeting a graft block within \
the base's retained range.",
dst.catalog.site.namespace,
block.number,
src.catalog.site.namespace,
src_earliest_after,
)));
}

// Set the block ptr to the graft point to signal that we successfully
// performed the graft
crate::deployment::forward_block_ptr(conn, &dst.site, &block).await?;
Expand Down
18 changes: 18 additions & 0 deletions store/postgres/src/subgraph_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1520,6 +1520,18 @@ impl Inner {
.await
}

/// Return the current `history_blocks` retention setting recorded in
/// the deployment's manifest. Mirrors [`Self::set_history_blocks`] and
/// is useful for tooling and tests that need to inspect the effective
/// retention after operations such as grafting.
pub async fn history_blocks(
&self,
deployment: &DeploymentLocator,
) -> Result<BlockNumber, StoreError> {
let site = self.find_site(deployment.id.into()).await?;
Ok(self.load_deployment(site).await?.manifest.history_blocks)
}

pub async fn load_deployment(
&self,
site: Arc<Site>,
Expand Down Expand Up @@ -1888,6 +1900,12 @@ impl SubgraphStoreTrait for SubgraphStore {
store.block_ptr(site.cheap_clone()).await
}

async fn earliest_block_number(&self, id: &DeploymentHash) -> Result<BlockNumber, StoreError> {
let (store, site) = self.store(id).await?;
let state = store.deployment_state(site.cheap_clone()).await?;
Ok(state.earliest_block_number)
}

async fn is_healthy(&self, id: &DeploymentHash) -> Result<bool, StoreError> {
let (store, site) = self.store(id).await?;
let health = store.health(&site).await?;
Expand Down
Loading