diff --git a/src/main/java/com/datastax/fallout/harness/ActiveTestRun.java b/src/main/java/com/datastax/fallout/harness/ActiveTestRun.java index d417b77..6a6af03 100644 --- a/src/main/java/com/datastax/fallout/harness/ActiveTestRun.java +++ b/src/main/java/com/datastax/fallout/harness/ActiveTestRun.java @@ -263,23 +263,35 @@ Optional getResult() private CheckResourcesResult transitionEnsembleStateUpwards(Optional maximumState) { - List> futures = new ArrayList<>(); - - for (NodeGroup nodeGroup : ensemble.getUniqueNodeGroupInstances()) - { + // Use ensemble-level transition to support batch provisioning + Function stateFunction = nodeGroup -> { NodeGroup.State requiredState = FalloutPropertySpecs.launchRunLevelPropertySpec.value(nodeGroup); - requiredState = NodeGroup.State.values()[Math.min(maximumState.orElse(requiredState).ordinal(), + return NodeGroup.State.values()[Math.min(maximumState.orElse(requiredState).ordinal(), requiredState.ordinal())]; + }; + + CompletableFuture transitionFuture = ensemble.transitionStateWithFunction( + stateFunction, + NodeGroup::transitionStateIfUpwards); + + boolean success = transitionFuture.join(); - futures.add(nodeGroup.transitionStateIfUpwards(requiredState)); + if (!success) + { + return CheckResourcesResult.FAILED; } - CompletableFuture.allOf(futures.toArray(new CompletableFuture[] {})).join(); + for (NodeGroup nodeGroup : ensemble.getUniqueNodeGroupInstances()) + { + NodeGroup.State targetState = stateFunction.apply(nodeGroup); + if (nodeGroup.getState().ordinal() < targetState.ordinal()) + { + return CheckResourcesResult.FAILED; + } + } - return futures.stream() - .map(CompletableFuture::join) - .reduce(CheckResourcesResult.AVAILABLE, CheckResourcesResult::worstCase); + return CheckResourcesResult.AVAILABLE; } private CheckResourcesResult transitionEnsembleStateUpwards(Optional maximumState, @@ -579,14 +591,24 @@ public void checkArtifacts() private CompletableFuture transitionEnsembleForTearDown(Optional endState) { - List> transitions = ensemble.getUniqueNodeGroupInstances().stream() - .flatMap(ng -> getStateForTearDownTransition(ng, endState) - .map(tearDownState -> ng.transitionStateIfDownwards(tearDownState) - .thenApplyAsync(CheckResourcesResult::wasSuccessful)) - .stream()) - .toList(); - - return Utils.waitForAllAsync(transitions); + // Collect NodeGroups that need transition along with their target states + Map nodeGroupsToTransition = new java.util.HashMap<>(); + + for (NodeGroup ng : ensemble.getUniqueNodeGroupInstances()) + { + getStateForTearDownTransition(ng, endState).ifPresent(tearDownState -> + nodeGroupsToTransition.put(ng, tearDownState)); + } + + if (nodeGroupsToTransition.isEmpty()) + { + return CompletableFuture.completedFuture(true); + } + + return ensemble.transitionStateWithFunction( + new ArrayList<>(nodeGroupsToTransition.keySet()), + nodeGroupsToTransition::get, + NodeGroup::transitionStateIfDownwards); } /** Calculate the {@link NodeGroup.State} that should be used when the code requires diff --git a/src/main/java/com/datastax/fallout/harness/ActiveTestRunBuilder.java b/src/main/java/com/datastax/fallout/harness/ActiveTestRunBuilder.java index 909ea80..51fed25 100644 --- a/src/main/java/com/datastax/fallout/harness/ActiveTestRunBuilder.java +++ b/src/main/java/com/datastax/fallout/harness/ActiveTestRunBuilder.java @@ -209,6 +209,33 @@ private void prepareEnsemble(Map yamlMap) if (yamlMap.containsKey("ensemble")) { Map ensembleMap = (Map) yamlMap.get("ensemble"); + + // Read the provisioning batch size property if present + String batchSizePropertyName = "fallout.system.provisioning.batch_size"; + Optional batchSize = Optional.empty(); + + if (ensembleMap.containsKey(batchSizePropertyName)) + { + Object batchSizeValue = ensembleMap.get(batchSizePropertyName); + logger.info("Found batch size property with value: {}", batchSizeValue); + if (batchSizeValue instanceof Number) + { + batchSize = Optional.of(((Number) batchSizeValue).intValue()); + logger.info("Ensemble provisioning batch size set to: {}", batchSize.get()); + } + else + { + logger.warn("Batch size value is not a number: {} (type: {})", + batchSizeValue, batchSizeValue != null ? batchSizeValue.getClass() : "null"); + } + } + else + { + logger.debug("Batch size property not found, using default parallel provisioning"); + } + + ensembleBuilder.withProvisioningBatchSize(batchSize); + //init or link each ensemble group for (Map.Entry entry : ensembleMap.entrySet()) { @@ -229,6 +256,9 @@ private void prepareEnsemble(Map yamlMap) explicitLocalFilesHandler = Optional.of(LocalFilesHandler.fromMaps( (List>) entry.getValue(), testRunArtifactPath, commandExecutor)); break; + case "fallout.system.provisioning.batch_size": + // skip to avoid treating it as a node group + break; default: readNodeGroup(ensembleGroup, ensembleValue); break; diff --git a/src/main/java/com/datastax/fallout/ops/Ensemble.java b/src/main/java/com/datastax/fallout/ops/Ensemble.java index 3f509f2..5b45308 100644 --- a/src/main/java/com/datastax/fallout/ops/Ensemble.java +++ b/src/main/java/com/datastax/fallout/ops/Ensemble.java @@ -62,11 +62,12 @@ public enum Role private final LocalScratchSpace workloadScratchSpace; private final Logger logger; private final LocalFilesHandler localFilesHandler; + private final Optional provisioningBatchSize; protected Ensemble(UUID testRunId, List serverGroups, List clientGroups, NodeGroup observerGroup, NodeGroup controllerGroup, TestRunLinkUpdater testRunLinkUpdater, LocalScratchSpace workloadScratchSpace, Logger logger, - LocalFilesHandler localFilesHandler) + LocalFilesHandler localFilesHandler, Optional provisioningBatchSize) { this.testRunId = testRunId; this.serverGroups = serverGroups; @@ -77,6 +78,7 @@ protected Ensemble(UUID testRunId, List serverGroups, List this.workloadScratchSpace = workloadScratchSpace; this.logger = logger; this.localFilesHandler = localFilesHandler; + this.provisioningBatchSize = provisioningBatchSize; for (NodeGroup nodeGroup : getUniqueNodeGroupInstances()) { @@ -220,27 +222,194 @@ public LocalScratchSpace makeScratchSpaceFor(WorkloadComponent component) return workloadScratchSpace.makeScratchSpaceFor(component); } + public Optional getProvisioningBatchSize() + { + return provisioningBatchSize; + } + /** Transition entire ensemble at once */ public CompletableFuture transitionState(NodeGroup.State state) { return transitionState(state, NodeGroup::transitionState); } + + /** + * Transition node groups with per-group state determination, respecting batch size if configured. + * + * @param stateFunction Function that determines the target state for each NodeGroup + * @param transitionFunction Function that performs the actual transition + * @return CompletableFuture that completes when all transitions are done + */ + public CompletableFuture transitionStateWithFunction( + Function stateFunction, + BiFunction> transitionFunction) + { + return transitionStateWithFunction(new ArrayList<>(getUniqueNodeGroupInstances()), + stateFunction, transitionFunction); + } + + /** + * Transition specific node groups with per-group state determination, respecting batch size if configured. + * + * @param nodeGroups List of NodeGroups to transition + * @param stateFunction Function that determines the target state for each NodeGroup + * @param transitionFunction Function that performs the actual transition + * @return CompletableFuture that completes when all transitions are done + */ + public CompletableFuture transitionStateWithFunction( + List nodeGroups, + Function stateFunction, + BiFunction> transitionFunction) + { + if (nodeGroups.isEmpty()) + { + return CompletableFuture.completedFuture(true); + } + + if (provisioningBatchSize.isEmpty()) + { + logger.info("Transitioning {} node groups in parallel (per-group states)", nodeGroups.size()); + return transitionNodeGroupsInParallel(nodeGroups, stateFunction, transitionFunction); + } + + int batchSize = provisioningBatchSize.get(); + logger.info("Transitioning {} node groups in batches of {} (per-group states)", + nodeGroups.size(), batchSize); + return transitionNodeGroupsInBatches(nodeGroups, batchSize, stateFunction, transitionFunction); + } + + private CompletableFuture transitionNodeGroupsInParallel( + List nodeGroups, + Function stateFunction, + BiFunction> transitionFunction) + { + List> futures = nodeGroups.stream() + .map(ng -> transitionSingleNodeGroup(ng, stateFunction, transitionFunction)) + .toList(); + return Utils.waitForAllAsync(futures); + } + + private CompletableFuture transitionNodeGroupsInBatches( + List nodeGroups, + int batchSize, + Function stateFunction, + BiFunction> transitionFunction) + { + return processBatches(nodeGroups, batchSize, + ng -> transitionSingleNodeGroup(ng, stateFunction, transitionFunction), + "transition"); + } + + private CompletableFuture transitionSingleNodeGroup( + NodeGroup nodeGroup, + Function stateFunction, + BiFunction> transitionFunction) + { + NodeGroup.State targetState = stateFunction.apply(nodeGroup); + return transitionFunction.apply(nodeGroup, targetState) + .thenApplyAsync(checkResourcesResult -> checkResourcesResult != CheckResourcesResult.FAILED); + } + private CompletableFuture transitionState( NodeGroup.State state, BiFunction> transitionNodeGroup) { - List> futures = new ArrayList<>(); + List nodeGroups = new ArrayList<>(getUniqueNodeGroupInstances()); - for (NodeGroup nodeGroup : getUniqueNodeGroupInstances()) + if (provisioningBatchSize.isEmpty()) { - futures.add(transitionNodeGroup.apply(nodeGroup, state) - .thenApplyAsync(checkResourcesResult -> checkResourcesResult != CheckResourcesResult.FAILED)); + logger.info("Transitioning {} node groups to {} in parallel", nodeGroups.size(), state); + return transitionNodeGroupsToStateInParallel(nodeGroups, state, transitionNodeGroup); } + int batchSize = provisioningBatchSize.get(); + logger.info("Transitioning {} node groups to {} in batches of {}", + nodeGroups.size(), state, batchSize); + return transitionNodeGroupsToStateInBatches(nodeGroups, state, batchSize, transitionNodeGroup); + } + + private CompletableFuture transitionNodeGroupsToStateInParallel( + List nodeGroups, + NodeGroup.State state, + BiFunction> transitionNodeGroup) + { + List> futures = nodeGroups.stream() + .map(ng -> transitionNodeGroupToState(ng, state, transitionNodeGroup)) + .toList(); return Utils.waitForAllAsync(futures); } + private CompletableFuture transitionNodeGroupsToStateInBatches( + List nodeGroups, + NodeGroup.State state, + int batchSize, + BiFunction> transitionNodeGroup) + { + return processBatches(nodeGroups, batchSize, + ng -> transitionNodeGroupToState(ng, state, transitionNodeGroup), + "transition to " + state); + } + + /** + * Generic batch processor for node groups. Processes node groups in sequential batches, + * with each batch processed in parallel. Stops on first batch failure. + * + * @param nodeGroups List of node groups to process + * @param batchSize Size of each batch + * @param processor Function to process each node group, returning a CompletableFuture + * @param operationDescription Description of the operation for logging (e.g., "transition", "transition to STARTED_SERVICES_RUNNING") + * @return CompletableFuture that completes when all batches are processed or first failure occurs + */ + private CompletableFuture processBatches( + List nodeGroups, + int batchSize, + Function> processor, + String operationDescription) + { + List> allFutures = new ArrayList<>(); + int totalBatches = (nodeGroups.size() + batchSize - 1) / batchSize; + + for (int i = 0; i < nodeGroups.size(); i += batchSize) + { + int batchNumber = (i / batchSize) + 1; + int batchEnd = Math.min(i + batchSize, nodeGroups.size()); + List batch = nodeGroups.subList(i, batchEnd); + + logger.info("Processing batch {}/{}: NodeGroups {} to {} ({})", + batchNumber, totalBatches, i + 1, batchEnd, + batch.stream().map(ng -> ng.name).collect(java.util.stream.Collectors.joining(", "))); + + List> batchFutures = batch.stream() + .map(processor) + .toList(); + + boolean batchSuccess = Utils.waitForAll(batchFutures, logger, + "batch " + batchNumber + " " + operationDescription); + + allFutures.addAll(batchFutures); + + if (!batchSuccess) + { + logger.error("Batch {}/{} failed, stopping further batches", batchNumber, totalBatches); + break; + } + + logger.info("Batch {}/{} completed successfully", batchNumber, totalBatches); + } + + return Utils.waitForAllAsync(allFutures); + } + + private CompletableFuture transitionNodeGroupToState( + NodeGroup nodeGroup, + NodeGroup.State state, + BiFunction> transitionNodeGroup) + { + return transitionNodeGroup.apply(nodeGroup, state) + .thenApplyAsync(checkResourcesResult -> checkResourcesResult != CheckResourcesResult.FAILED); + } + /** * Last-ditch option to really destroy an ensemble * @return diff --git a/src/main/java/com/datastax/fallout/ops/EnsembleBuilder.java b/src/main/java/com/datastax/fallout/ops/EnsembleBuilder.java index e2f7292..3a77b1d 100644 --- a/src/main/java/com/datastax/fallout/ops/EnsembleBuilder.java +++ b/src/main/java/com/datastax/fallout/ops/EnsembleBuilder.java @@ -55,6 +55,7 @@ public class EnsembleBuilder private EnsembleCredentials credentials; private TestRunLinkUpdater testRunLinkUpdater = new NullTestRunLinkUpdater(); private LocalFilesHandler localFilesHandler = LocalFilesHandler.empty(); + private Optional provisioningBatchSize = Optional.empty(); public static EnsembleBuilder create() { @@ -177,6 +178,12 @@ public EnsembleBuilder withLocalFilesHandler(LocalFilesHandler localFilesHandler return this; } + public EnsembleBuilder withProvisioningBatchSize(Optional batchSize) + { + this.provisioningBatchSize = batchSize; + return this; + } + private void check() { Preconditions.checkArgument(!serverBuilders.isEmpty(), "Server Builders are missing"); @@ -242,6 +249,7 @@ public Ensemble build(Path testRunArtifactPath, TestRunScratchSpace testRunScrat NodeGroup controllers = controllerBuilder.build(); return new Ensemble(testRunId, servers, clients, observers, controllers, testRunLinkUpdater, - testRunScratchSpace.makeScratchSpaceForWorkload(), loggers.getShared(), localFilesHandler); + testRunScratchSpace.makeScratchSpaceForWorkload(), loggers.getShared(), localFilesHandler, + provisioningBatchSize); } } diff --git a/src/main/java/com/datastax/fallout/ops/FalloutPropertySpecs.java b/src/main/java/com/datastax/fallout/ops/FalloutPropertySpecs.java index a50cf83..0ca219d 100644 --- a/src/main/java/com/datastax/fallout/ops/FalloutPropertySpecs.java +++ b/src/main/java/com/datastax/fallout/ops/FalloutPropertySpecs.java @@ -76,4 +76,12 @@ public class FalloutPropertySpecs .description("Name of the test") .internal() .build(); + + public static final PropertySpec provisioningBatchSizePropertySpec = PropertySpecBuilder.createInt(prefix) + .name("provisioning.batch_size") + .description("Optional: Batch size for provisioning NodeGroups. " + + "When set, NodeGroups will be provisioned in batches of this size " + + "instead of all in parallel.") + .required(false) + .build(); }