Add foreach flattened view support via maestro-extensions module#194
Merged
rdeepak2002 merged 11 commits intomainfrom Mar 24, 2026
Merged
Add foreach flattened view support via maestro-extensions module#194rdeepak2002 merged 11 commits intomainfrom
rdeepak2002 merged 11 commits intomainfrom
Conversation
Ports the "flattened foreach" feature from internal Maestro to OSS. This denormalization strategy stores foreach step iterations in a dedicated table for efficient cursor-based pagination, status filtering, and summary statistics - critical for workflows with large iteration counts. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Redesign maestro-extensions to match internal's architecture: - Separate Spring Boot app (port 8081) instead of library in maestro-server - SQS event consumption via @SqsListener (SNS -> SQS subscription) - HTTP calls to maestro-server REST API via HttpMaestroDataProvider - Metrics aligned with internal (failure/duration counters, timer) - JavaTimeModule registered on ObjectMapper - Own application.yml, DataSource config, Flyway migrations - LocalStack bootstrap: maestro-event SQS queue + SNS subscription Removed: ForeachNotificationInterceptor, ForeachFlattenConfiguration, DaoBackedMaestroDataProvider, maestro-engine dependency Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Add docker-java.properties (api.version=1.44) to both maestro-server and maestro-extensions to fix Testcontainers Docker detection issue - Move testcontainerDep and postgresqlDep from testImplementation to runtimeOnly in extensions so bootRun can use jdbc:tc: URL - Add bvalJsrDep (Bean Validation provider) to extensions for @Min/@max validation in ForeachFlattenController E2E tested: both services start successfully, foreach workflow executes, events flow through SNS->SQS->extensions, flattened view endpoints return correct data. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- SqsMaestroEventListener: switch to MANUAL ack mode with Acknowledgement and Visibility params matching internal's SqsProcessorFinalizer pattern (ack on success, ack+delete on non-retryable, visibility extension on retryable failure, receive count monitoring at threshold 100) - MaestroEventProcessor: re-throw exceptions as MaestroRetryableError so the SQS listener can distinguish retryable from non-retryable failures - sqs_bootstrap.sh: add maestro-event-dlq queue and redrive policy (maxReceiveCount=5) on the maestro-event queue - Update tests for new method signatures and error handling behavior Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Matches internal's processingExceptionShouldThrowAndIncrementMetric test by also verifying the failure counter is incremented when the processor re-throws as MaestroRetryableError. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- sns_bootstrap.sh: enable RawMessageDelivery=true on the SNS→SQS subscription so messages arrive as raw MaestroEvent JSON, matching internal's raw delivery configuration - SqsMaestroEventListener: remove unwrapSnsEnvelope() and JsonNode imports — no longer needed with raw delivery. Directly deserialize the payload as MaestroEvent, matching internal's pattern. - SqsMaestroEventListenerTest: remove SNS-wrapped payload test - ForeachFlatteningHelper: fix javadoc typo (3344-13-223 → 3344-13-23) matching internal Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Rename MaestroDataProvider → MaestroClient and HttpMaestroDataProvider → HttpMaestroClient to align with internal maestro-client naming. Also rename getStepInstance → getWorkflowStepInstance to match internal API surface. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
It was a no-op delegate to withRetryableQuery. The internal version has a TODO to route to a read replica, but OSS has no such plan. Replace the single call site with withRetryableQuery directly. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The RedrivePolicy JSON inside --attributes shorthand caused a parse error, preventing the maestro-event queue from being created. Split into create-queue + set-queue-attributes to fix. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
9 tasks
…ibrary Converts maestro-extensions from a separate Spring Boot service (port 8081) to an auto-configuration library that registers into maestro-server when extensions.enabled=true. This follows the same pattern as maestro-aws. Key changes: - Remove standalone app class, application.yml, and docker-java.properties - Refactor MaestroExtensionsConfiguration to inject host beans via @qualifier and parameter naming instead of creating its own - Add @componentscan to pick up REST controllers from extensions package - Reuse server's HttpClient bean instead of creating a separate one - Add AutoConfiguration.imports for Spring Boot auto-config discovery - Delete SpectatorMaestroMetrics entirely (tests use mock instead) - Add maestro-extensions dependency to maestro-server build.gradle - Add extensions config block to application-aws.yml Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace boolean-flag method with explicit markTransactionSerializable() and markTransactionSerializableReadOnly() per PR #195 review feedback. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
akashdw
approved these changes
Mar 24, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Pull Request type
./gradlew build --write-locksto refresh dependencies)NOTE: Please remember to run
./gradlew spotlessApplyto fix any format violations.Changes in this PR
Ports the "flattened foreach" feature from internal Maestro (
maestro-signal-service) to OSS as a newmaestro-extensionsmodule. This is an auto-configurable library that registers into maestro-server whenextensions.enabled=true(set in theawsprofile). It consumes maestro events via SQS (subscribed to the SNS topic) and makes HTTP calls to maestro-server for data — matching the internal architecture.The denormalization strategy stores foreach step iterations in a dedicated
maestro_step_foreach_flattenedtable for efficient cursor-based pagination, status filtering, and summary statistics — critical for workflows with large iteration counts.Architecture:
maestro-extensionsis an auto-configurable library registered into maestro-server via Spring Boot auto-configuration (all endpoints on port 8080)@ConditionalOnProperty(value = "extensions.enabled", havingValue = "true")to activate only when configured@ComponentScanto register REST controllers from the extensions packageDataSource,ObjectMapper,MaestroMetrics,DatabaseConfiguration,HttpClientStepInstanceStatusChangeEventfrom an SQS queue (maestro-event) subscribed to the maestro SNS topicHttpMaestroClientfor workflow/step dataclasspath:db/migration/postgres/when jar is on classpathSqsProcessorFinalizerpatternRawMessageDelivery=trueso messages arrive as raw JSON (matching internal)File inventory
Files matching internal (logic identical, only package/annotation/import differences)
These files are direct ports from
maestro-signal-service. The core logic is identical; differences are limited to package names, license headers, DI annotations (@Inject/@Singleton→ Spring@Bean), metrics class names (MetricRepo→MaestroMetrics), and@SuppressFBWarningsannotations.handlers/ForeachFlatteningHandler.javahandlers/foreach/ForeachFlatteningHandler.java@SuppressFBWarningsprocessors/StepEventPreprocessor.javamessageProcessors/StepEventPreprocessor.javaMetricRepo→MaestroMetrics; metric constant inlineddao/models/ForeachFlattenedInstance.javadao/models/ForeachFlattenedInstance.javadao/models/ForeachFlattenedModel.javadao/models/ForeachFlattenedModel.javamodels/StepEventHandlerInput.javamodels/StepEventHandlerInput.java@SuppressFBWarningsmodels/StepIteration.javaclient/models/flattening/StepIteration.javamodels/StepIterationsSummary.javaclient/models/flattening/StepIterationsSummary.javautils/ForeachFlatteningHelper.javautils/ForeachFlatteningHelper.javautils/StepInstanceStatusEncoder.javautils/StepInstanceStatusEncoder.javaEnumMapreturn/field types →Mapinterface (PMD LooseCoupling rule); internal logic identicalcontrollers/ForeachFlattenController.javacontrollers/flattening/ForeachFlattenController.java@SuppressWarnings→@SuppressFBWarnings;javax.annotation.Nullable→com.netflix.maestro.annotations.NullableFiles with logical differences from internal
processors/MaestroEventProcessor.javamessageProcessors/MaestroEventProcessor.javaTargetedAtGroupHandler,TrinoDirectHandler, andWORKFLOW_INSTANCE_STATUS_CHANGE_EVENThandling (not in scope for OSS). ChangesMessageProcessor<T>interface withSupplier<MaestroEvent>to directMaestroEventparameter. Core foreach-handling logic is identical.listeners/SqsMaestroEventListener.javalisteners/SqsMaestroEventListener.javaSqsProcessorFinalizerutility class; OSS inlines the same manual ack / visibility / DLQ logic directly in the listener (same behavior, no shared library abstraction). UsesObjectMapper.readValue()directly instead ofMessageProcessor<T>withSupplier<T>.dao/MaestroForeachFlattenedDao.javadao/flattening/MaestroForeachFlattenedDao.javaSignalFastProperties.isUsePostgresForSignalService()), two separate INSERT queries, and@iterationIdxCRDB index hints. OSS is Postgres-only — single INSERT query, no CRDB conditionals.utils/PaginationHelper.javashared/utils/PaginationHelper.javavalidateParamAndDeriveDirection()+buildEmptyPaginationResult()(used by the controller). Internal's full pagination range calculation and result building are for other signal-service features not ported here.OSS-only files (no internal counterpart)
These files are needed because OSS uses Spring Boot (vs internal's Micronaut) and doesn't have access to internal shared libraries (
maestro-client,maestro-shared).config/MaestroExtensionsConfiguration.java@Configurationwith@ConditionalOnProperty+@ComponentScanfor auto-config. Injects host beans via@Qualifier(Constants.MAESTRO_QUALIFIER)and parameter naming.properties/MaestroExtensionsProperties.java@ConfigurationPropertiesfor SQS queue URL, maestro base URLprovider/MaestroClient.javaMaestroClientmethod names (getWorkflowInstance,getWorkflowDefinition,getWorkflowStepInstance). Internal'sMaestroClientis an 800+ line concrete class with SSLSocketFactory, streaming iterators, POST methods; OSS extracts only the 3 methods needed for foreach flattening.provider/HttpMaestroClient.javaMaestroClientuses Metatron mTLS)Resource files
src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.importssrc/main/resources/db/migration/postgres/V202311161000__add_foreach_flattened.sqlmaestro_step_foreach_flattenedtable (identical to internal, trailing newline only)src/test/resources/flattening/flattening_records.jsonTest files (11 files)
ExtensionsBaseTest.javaSignalBaseTest/MaestroBaseTest)ExtensionsDaoBaseTest.javaSignalDaoBaseTest)MaestroForeachFlattenedDaoTest.javaSignalFastPropertiesmock, dual INSERT)ForeachFlatteningHandlerTest.javaForeachFlatteningHelperTest.javaStepInstanceStatusEncoderTest.javaForeachFlattenControllerTest.javaStepEventPreprocessorTest.javaStepInstanceStatusChangeEvent.builder()+ AssertJ (internal uses mock fields + JUnit Assert + manual Spectator registry metric verification)MaestroEventProcessorTest.javaSqsMaestroEventListenerTest.javaSqsProcessorFinalizerTestbut tests the inlined listener logicHttpMaestroClientTest.javaChanges to existing modules
settings.gradlemaestro-extensionsto multi-project buildmaestro-database/.../AbstractDatabaseDao.javamarkTransactionSerializable()maestro-aws/localstack/sqs_bootstrap.shmaestro-eventqueue with DLQ + redrive policymaestro-aws/localstack/sns_bootstrap.shRawMessageDelivery=truemaestro-server/build.gradleimplementation project(':maestro-extensions')dependencymaestro-server/.../application-aws.ymlextensions.enabled, queue URL, base URL)maestro-server/.../docker-java.propertiesE2E Testing
These commands have been verified end-to-end on macOS. Copy and paste each command in order.
Prerequisites
Step 1: Start infrastructure (LocalStack + Redis)
Wait until
docker psshows both containers as healthy/running.Step 2: Start maestro-server with extensions (single terminal)
./gradlew :maestro-server:bootRun --args='--spring.profiles.active=aws'Wait for:
Started MaestroApp in X seconds. Extensions auto-configure and you should see log lines like:Step 3: Create the foreach workflow
Expected: JSON response with
workflow_definitioncontainingsample-foreach-wf.Step 4: Trigger the workflow
Expected: JSON with
"status":"CREATED"andworkflow_instance_id: 1.Step 5: Wait for execution + event processing
The foreach creates 9 iterations (dates 20200101-20200109), each with 6 inner steps (job.1-job.6). Events flow: maestro-server → SNS → SQS → extensions listener (same process).
Step 6: Check workflow completed
Expected:
"status": "SUCCEEDED"Step 7: Query flattened foreach summary (same port 8080)
The flattened view is keyed by inner step ID (e.g.,
job.1), not the foreach step name.Expected:
{ "representative_iteration": { "step_id": "job.1", "loop_params": {"i": "20200101"}, "step_runtime_state": {"status": "SUCCEEDED"} }, "count_by_status": { "SUCCEEDED": 9 }, "loop_param_values": { "i": ["20200101", "20200102", "20200103", "20200104", "20200105"] } }Step 8: Query paginated iterations
Expected: 3 iterations with
has_next_page: true, each showing loop paramiandSUCCEEDEDstatus.Step 9: Get a single iteration by ID
Expected: Single iteration with
iteration_rank: "11",loop_params: {"i": "20200101"}, statusSUCCEEDED.Cleanup