Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 40 additions & 18 deletions src/main/java/com/datastax/fallout/harness/ActiveTestRun.java
Original file line number Diff line number Diff line change
Expand Up @@ -263,23 +263,35 @@ Optional<TestResult> getResult()

private CheckResourcesResult transitionEnsembleStateUpwards(Optional<NodeGroup.State> maximumState)
{
List<CompletableFuture<CheckResourcesResult>> futures = new ArrayList<>();

for (NodeGroup nodeGroup : ensemble.getUniqueNodeGroupInstances())
{
// Use ensemble-level transition to support batch provisioning
Function<NodeGroup, NodeGroup.State> stateFunction = nodeGroup -> {
NodeGroup.State requiredState =
FalloutPropertySpecs.launchRunLevelPropertySpec.value(nodeGroup);
requiredState = NodeGroup.State.values()[Math.min(maximumState.orElse(requiredState).ordinal(),
return NodeGroup.State.values()[Math.min(maximumState.orElse(requiredState).ordinal(),
requiredState.ordinal())];
};

CompletableFuture<Boolean> transitionFuture = ensemble.transitionStateWithFunction(
stateFunction,
NodeGroup::transitionStateIfUpwards);

boolean success = transitionFuture.join();

futures.add(nodeGroup.transitionStateIfUpwards(requiredState));
if (!success)
{
return CheckResourcesResult.FAILED;
}

CompletableFuture.allOf(futures.toArray(new CompletableFuture[] {})).join();
for (NodeGroup nodeGroup : ensemble.getUniqueNodeGroupInstances())
{
NodeGroup.State targetState = stateFunction.apply(nodeGroup);
if (nodeGroup.getState().ordinal() < targetState.ordinal())
{
return CheckResourcesResult.FAILED;
}
}

return futures.stream()
.map(CompletableFuture::join)
.reduce(CheckResourcesResult.AVAILABLE, CheckResourcesResult::worstCase);
return CheckResourcesResult.AVAILABLE;
}

private CheckResourcesResult transitionEnsembleStateUpwards(Optional<NodeGroup.State> maximumState,
Expand Down Expand Up @@ -579,14 +591,24 @@ public void checkArtifacts()

private CompletableFuture<Boolean> transitionEnsembleForTearDown(Optional<NodeGroup.State> endState)
{
List<CompletableFuture<Boolean>> transitions = ensemble.getUniqueNodeGroupInstances().stream()
.flatMap(ng -> getStateForTearDownTransition(ng, endState)
.map(tearDownState -> ng.transitionStateIfDownwards(tearDownState)
.thenApplyAsync(CheckResourcesResult::wasSuccessful))
.stream())
.toList();

return Utils.waitForAllAsync(transitions);
// Collect NodeGroups that need transition along with their target states
Map<NodeGroup, NodeGroup.State> nodeGroupsToTransition = new java.util.HashMap<>();

for (NodeGroup ng : ensemble.getUniqueNodeGroupInstances())
{
getStateForTearDownTransition(ng, endState).ifPresent(tearDownState ->
nodeGroupsToTransition.put(ng, tearDownState));
}

if (nodeGroupsToTransition.isEmpty())
{
return CompletableFuture.completedFuture(true);
}

return ensemble.transitionStateWithFunction(
new ArrayList<>(nodeGroupsToTransition.keySet()),
nodeGroupsToTransition::get,
NodeGroup::transitionStateIfDownwards);
}

/** Calculate the {@link NodeGroup.State} that should be used when the code requires
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,33 @@ private void prepareEnsemble(Map<String, Object> yamlMap)
if (yamlMap.containsKey("ensemble"))
{
Map<String, Object> ensembleMap = (Map) yamlMap.get("ensemble");

// Read the provisioning batch size property if present
String batchSizePropertyName = "fallout.system.provisioning.batch_size";
Optional<Integer> batchSize = Optional.empty();

if (ensembleMap.containsKey(batchSizePropertyName))
{
Object batchSizeValue = ensembleMap.get(batchSizePropertyName);
logger.info("Found batch size property with value: {}", batchSizeValue);
if (batchSizeValue instanceof Number)
{
batchSize = Optional.of(((Number) batchSizeValue).intValue());
logger.info("Ensemble provisioning batch size set to: {}", batchSize.get());
}
else
{
logger.warn("Batch size value is not a number: {} (type: {})",
batchSizeValue, batchSizeValue != null ? batchSizeValue.getClass() : "null");
}
}
else
{
logger.debug("Batch size property not found, using default parallel provisioning");
}

ensembleBuilder.withProvisioningBatchSize(batchSize);

//init or link each ensemble group
for (Map.Entry<String, Object> entry : ensembleMap.entrySet())
{
Expand All @@ -229,6 +256,9 @@ private void prepareEnsemble(Map<String, Object> yamlMap)
explicitLocalFilesHandler = Optional.of(LocalFilesHandler.fromMaps(
(List<Map<String, Object>>) entry.getValue(), testRunArtifactPath, commandExecutor));
break;
case "fallout.system.provisioning.batch_size":
// skip to avoid treating it as a node group
break;
default:
readNodeGroup(ensembleGroup, ensembleValue);
break;
Expand Down
179 changes: 174 additions & 5 deletions src/main/java/com/datastax/fallout/ops/Ensemble.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,12 @@ public enum Role
private final LocalScratchSpace workloadScratchSpace;
private final Logger logger;
private final LocalFilesHandler localFilesHandler;
private final Optional<Integer> provisioningBatchSize;

protected Ensemble(UUID testRunId, List<NodeGroup> serverGroups, List<NodeGroup> clientGroups,
NodeGroup observerGroup, NodeGroup controllerGroup, TestRunLinkUpdater testRunLinkUpdater,
LocalScratchSpace workloadScratchSpace, Logger logger,
LocalFilesHandler localFilesHandler)
LocalFilesHandler localFilesHandler, Optional<Integer> provisioningBatchSize)
{
this.testRunId = testRunId;
this.serverGroups = serverGroups;
Expand All @@ -77,6 +78,7 @@ protected Ensemble(UUID testRunId, List<NodeGroup> serverGroups, List<NodeGroup>
this.workloadScratchSpace = workloadScratchSpace;
this.logger = logger;
this.localFilesHandler = localFilesHandler;
this.provisioningBatchSize = provisioningBatchSize;

for (NodeGroup nodeGroup : getUniqueNodeGroupInstances())
{
Expand Down Expand Up @@ -220,27 +222,194 @@ public LocalScratchSpace makeScratchSpaceFor(WorkloadComponent component)
return workloadScratchSpace.makeScratchSpaceFor(component);
}

public Optional<Integer> getProvisioningBatchSize()
{
return provisioningBatchSize;
}

/** Transition entire ensemble at once */
public CompletableFuture<Boolean> transitionState(NodeGroup.State state)
{
return transitionState(state, NodeGroup::transitionState);
}


/**
* Transition node groups with per-group state determination, respecting batch size if configured.
*
* @param stateFunction Function that determines the target state for each NodeGroup
* @param transitionFunction Function that performs the actual transition
* @return CompletableFuture that completes when all transitions are done
*/
public CompletableFuture<Boolean> transitionStateWithFunction(
Function<NodeGroup, NodeGroup.State> stateFunction,
BiFunction<NodeGroup, NodeGroup.State, CompletableFuture<CheckResourcesResult>> transitionFunction)
{
return transitionStateWithFunction(new ArrayList<>(getUniqueNodeGroupInstances()),
stateFunction, transitionFunction);
}

/**
* Transition specific node groups with per-group state determination, respecting batch size if configured.
*
* @param nodeGroups List of NodeGroups to transition
* @param stateFunction Function that determines the target state for each NodeGroup
* @param transitionFunction Function that performs the actual transition
* @return CompletableFuture that completes when all transitions are done
*/
public CompletableFuture<Boolean> transitionStateWithFunction(
List<NodeGroup> nodeGroups,
Function<NodeGroup, NodeGroup.State> stateFunction,
BiFunction<NodeGroup, NodeGroup.State, CompletableFuture<CheckResourcesResult>> transitionFunction)
{
if (nodeGroups.isEmpty())
{
return CompletableFuture.completedFuture(true);
}

if (provisioningBatchSize.isEmpty())
{
logger.info("Transitioning {} node groups in parallel (per-group states)", nodeGroups.size());
return transitionNodeGroupsInParallel(nodeGroups, stateFunction, transitionFunction);
}

int batchSize = provisioningBatchSize.get();
logger.info("Transitioning {} node groups in batches of {} (per-group states)",
nodeGroups.size(), batchSize);
return transitionNodeGroupsInBatches(nodeGroups, batchSize, stateFunction, transitionFunction);
}

private CompletableFuture<Boolean> transitionNodeGroupsInParallel(
List<NodeGroup> nodeGroups,
Function<NodeGroup, NodeGroup.State> stateFunction,
BiFunction<NodeGroup, NodeGroup.State, CompletableFuture<CheckResourcesResult>> transitionFunction)
{
List<CompletableFuture<Boolean>> futures = nodeGroups.stream()
.map(ng -> transitionSingleNodeGroup(ng, stateFunction, transitionFunction))
.toList();
return Utils.waitForAllAsync(futures);
}

private CompletableFuture<Boolean> transitionNodeGroupsInBatches(
List<NodeGroup> nodeGroups,
int batchSize,
Function<NodeGroup, NodeGroup.State> stateFunction,
BiFunction<NodeGroup, NodeGroup.State, CompletableFuture<CheckResourcesResult>> transitionFunction)
{
return processBatches(nodeGroups, batchSize,
ng -> transitionSingleNodeGroup(ng, stateFunction, transitionFunction),
"transition");
}

private CompletableFuture<Boolean> transitionSingleNodeGroup(
NodeGroup nodeGroup,
Function<NodeGroup, NodeGroup.State> stateFunction,
BiFunction<NodeGroup, NodeGroup.State, CompletableFuture<CheckResourcesResult>> transitionFunction)
{
NodeGroup.State targetState = stateFunction.apply(nodeGroup);
return transitionFunction.apply(nodeGroup, targetState)
.thenApplyAsync(checkResourcesResult -> checkResourcesResult != CheckResourcesResult.FAILED);
}

private CompletableFuture<Boolean> transitionState(
NodeGroup.State state,
BiFunction<NodeGroup, NodeGroup.State, CompletableFuture<CheckResourcesResult>> transitionNodeGroup)
{
List<CompletableFuture<Boolean>> futures = new ArrayList<>();
List<NodeGroup> nodeGroups = new ArrayList<>(getUniqueNodeGroupInstances());

for (NodeGroup nodeGroup : getUniqueNodeGroupInstances())
if (provisioningBatchSize.isEmpty())
{
futures.add(transitionNodeGroup.apply(nodeGroup, state)
.thenApplyAsync(checkResourcesResult -> checkResourcesResult != CheckResourcesResult.FAILED));
logger.info("Transitioning {} node groups to {} in parallel", nodeGroups.size(), state);
return transitionNodeGroupsToStateInParallel(nodeGroups, state, transitionNodeGroup);
}

int batchSize = provisioningBatchSize.get();
logger.info("Transitioning {} node groups to {} in batches of {}",
nodeGroups.size(), state, batchSize);
return transitionNodeGroupsToStateInBatches(nodeGroups, state, batchSize, transitionNodeGroup);
}

private CompletableFuture<Boolean> transitionNodeGroupsToStateInParallel(
List<NodeGroup> nodeGroups,
NodeGroup.State state,
BiFunction<NodeGroup, NodeGroup.State, CompletableFuture<CheckResourcesResult>> transitionNodeGroup)
{
List<CompletableFuture<Boolean>> futures = nodeGroups.stream()
.map(ng -> transitionNodeGroupToState(ng, state, transitionNodeGroup))
.toList();
return Utils.waitForAllAsync(futures);
}

private CompletableFuture<Boolean> transitionNodeGroupsToStateInBatches(
List<NodeGroup> nodeGroups,
NodeGroup.State state,
int batchSize,
BiFunction<NodeGroup, NodeGroup.State, CompletableFuture<CheckResourcesResult>> transitionNodeGroup)
{
return processBatches(nodeGroups, batchSize,
ng -> transitionNodeGroupToState(ng, state, transitionNodeGroup),
"transition to " + state);
}

/**
* Generic batch processor for node groups. Processes node groups in sequential batches,
* with each batch processed in parallel. Stops on first batch failure.
*
* @param nodeGroups List of node groups to process
* @param batchSize Size of each batch
* @param processor Function to process each node group, returning a CompletableFuture<Boolean>
* @param operationDescription Description of the operation for logging (e.g., "transition", "transition to STARTED_SERVICES_RUNNING")
* @return CompletableFuture that completes when all batches are processed or first failure occurs
*/
private CompletableFuture<Boolean> processBatches(
List<NodeGroup> nodeGroups,
int batchSize,
Function<NodeGroup, CompletableFuture<Boolean>> processor,
String operationDescription)
{
List<CompletableFuture<Boolean>> allFutures = new ArrayList<>();
int totalBatches = (nodeGroups.size() + batchSize - 1) / batchSize;

for (int i = 0; i < nodeGroups.size(); i += batchSize)
{
int batchNumber = (i / batchSize) + 1;
int batchEnd = Math.min(i + batchSize, nodeGroups.size());
List<NodeGroup> batch = nodeGroups.subList(i, batchEnd);

logger.info("Processing batch {}/{}: NodeGroups {} to {} ({})",
batchNumber, totalBatches, i + 1, batchEnd,
batch.stream().map(ng -> ng.name).collect(java.util.stream.Collectors.joining(", ")));

List<CompletableFuture<Boolean>> batchFutures = batch.stream()
.map(processor)
.toList();

boolean batchSuccess = Utils.waitForAll(batchFutures, logger,
"batch " + batchNumber + " " + operationDescription);

allFutures.addAll(batchFutures);

if (!batchSuccess)
{
logger.error("Batch {}/{} failed, stopping further batches", batchNumber, totalBatches);
break;
}

logger.info("Batch {}/{} completed successfully", batchNumber, totalBatches);
}

return Utils.waitForAllAsync(allFutures);
}

private CompletableFuture<Boolean> transitionNodeGroupToState(
NodeGroup nodeGroup,
NodeGroup.State state,
BiFunction<NodeGroup, NodeGroup.State, CompletableFuture<CheckResourcesResult>> transitionNodeGroup)
{
return transitionNodeGroup.apply(nodeGroup, state)
.thenApplyAsync(checkResourcesResult -> checkResourcesResult != CheckResourcesResult.FAILED);
}

/**
* Last-ditch option to really destroy an ensemble
* @return
Expand Down
10 changes: 9 additions & 1 deletion src/main/java/com/datastax/fallout/ops/EnsembleBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class EnsembleBuilder
private EnsembleCredentials credentials;
private TestRunLinkUpdater testRunLinkUpdater = new NullTestRunLinkUpdater();
private LocalFilesHandler localFilesHandler = LocalFilesHandler.empty();
private Optional<Integer> provisioningBatchSize = Optional.empty();

public static EnsembleBuilder create()
{
Expand Down Expand Up @@ -177,6 +178,12 @@ public EnsembleBuilder withLocalFilesHandler(LocalFilesHandler localFilesHandler
return this;
}

public EnsembleBuilder withProvisioningBatchSize(Optional<Integer> batchSize)
{
this.provisioningBatchSize = batchSize;
return this;
}

private void check()
{
Preconditions.checkArgument(!serverBuilders.isEmpty(), "Server Builders are missing");
Expand Down Expand Up @@ -242,6 +249,7 @@ public Ensemble build(Path testRunArtifactPath, TestRunScratchSpace testRunScrat
NodeGroup controllers = controllerBuilder.build();

return new Ensemble(testRunId, servers, clients, observers, controllers, testRunLinkUpdater,
testRunScratchSpace.makeScratchSpaceForWorkload(), loggers.getShared(), localFilesHandler);
testRunScratchSpace.makeScratchSpaceForWorkload(), loggers.getShared(), localFilesHandler,
provisioningBatchSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,12 @@ public class FalloutPropertySpecs
.description("Name of the test")
.internal()
.build();

public static final PropertySpec<Integer> provisioningBatchSizePropertySpec = PropertySpecBuilder.createInt(prefix)
.name("provisioning.batch_size")
.description("Optional: Batch size for provisioning NodeGroups. " +
"When set, NodeGroups will be provisioned in batches of this size " +
"instead of all in parallel.")
.required(false)
.build();
}
Loading