Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,23 @@ You can read more details about it in our series of blog posts
## Delete the sample workflow and its data
- `curl --header "user: tester" -X DELETE 'http://127.0.0.1:8080/api/v3/workflows/sample-dag-test-1'`

## Run it with maestro-extensions (foreach flattening service)
The `maestro-extensions` module runs as a separate Spring Boot service that listens to maestro
events via SQS (subscribed to the SNS topic maestro-server publishes to) and provides additional
functionality such as foreach step flattening views.

To run maestro-server and maestro-extensions together locally:
1. Start LocalStack (provides local SQS/SNS):
- `docker compose -f maestro-aws/docker-compose.yml up -d`
2. Start maestro-server (port 8080):
- `./gradlew :maestro-server:bootRun --args='--spring.profiles.active=aws'`
3. Start maestro-extensions (port 8081):
- `./gradlew :maestro-extensions:bootRun`

Once both services are running, maestro-extensions will consume step instance status change events
from the `maestro-event` SQS queue and process foreach flattening. Query the flattened views via
the extensions REST API on port 8081.

## Run it with Kubernetes support
- setup kubernetes configs so the kubectl command works
- `./gradlew bootRun`
Expand Down
16 changes: 16 additions & 0 deletions maestro-aws/localstack/sns_bootstrap.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,19 @@ LOCALSTACK_HOST=localhost
AWS_REGION=us-east-1

awslocal --endpoint-url=http://${LOCALSTACK_HOST}:4566 sns create-topic --name maestro-test --region ${AWS_REGION}

# Subscribe maestro-event SQS queue to SNS topic for maestro-extensions consumption
# RawMessageDelivery=true sends the raw message body (not the SNS envelope JSON),
# matching internal's raw delivery configuration.
SUBSCRIPTION_ARN=$(awslocal --endpoint-url=http://${LOCALSTACK_HOST}:4566 sns subscribe \
--topic-arn arn:aws:sns:${AWS_REGION}:000000000000:maestro-test \
--protocol sqs \
--notification-endpoint arn:aws:sqs:${AWS_REGION}:000000000000:maestro-event \
--region ${AWS_REGION} \
--output text --query 'SubscriptionArn')

awslocal --endpoint-url=http://${LOCALSTACK_HOST}:4566 sns set-subscription-attributes \
--subscription-arn "${SUBSCRIPTION_ARN}" \
--attribute-name RawMessageDelivery \
--attribute-value true \
--region ${AWS_REGION}
12 changes: 12 additions & 0 deletions maestro-aws/localstack/sqs_bootstrap.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,15 @@ awslocal --endpoint-url=http://${LOCALSTACK_HOST}:4566 sqs create-queue --queue-
create_fifo_queue "maestro-signal-instance.fifo"
create_fifo_queue "maestro-signal-trigger-match.fifo"
create_fifo_queue "maestro-signal-trigger-execution.fifo"

# Dead-letter queue for maestro-event messages that exceed max receive count
create_queue "maestro-event-dlq"

# Queue for maestro-extensions to consume maestro events (subscribed to SNS topic)
# Redrive policy sends messages to DLQ after 5 failed processing attempts
create_queue "maestro-event"
DLQ_ARN="arn:aws:sqs:${AWS_REGION}:000000000000:maestro-event-dlq"
awslocal --endpoint-url=http://${LOCALSTACK_HOST}:4566 sqs set-queue-attributes \
--queue-url http://${LOCALSTACK_HOST}:4566/000000000000/maestro-event \
--region ${AWS_REGION} \
--attributes "{\"RedrivePolicy\":\"{\\\"deadLetterTargetArn\\\":\\\"${DLQ_ARN}\\\",\\\"maxReceiveCount\\\":\\\"5\\\"}\"}"
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,32 @@ private <R> R withTransaction(final ConnectionFunction<R> function) throws SQLEx
}
}

/**
* Mark the current transaction as SERIALIZABLE isolation level.
*
* @param conn the connection
* @throws SQLException sql exception
*/
protected void markTransactionSerializable(Connection conn) throws SQLException {
try (PreparedStatement stmt =
conn.prepareStatement("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")) {
stmt.execute();
}
}

/**
* Mark the current transaction as SERIALIZABLE READ ONLY isolation level.
*
* @param conn the connection
* @throws SQLException sql exception
*/
protected void markTransactionSerializableReadOnly(Connection conn) throws SQLException {
try (PreparedStatement stmt =
conn.prepareStatement("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE READ ONLY")) {
stmt.execute();
}
}

/**
* Initialize a new transactional {@link Connection} from {@link #dataSource}, prepare a new SQL
* statement, and pass then it to {@literal function}. It includes a retry mechanism for client
Expand Down
30 changes: 30 additions & 0 deletions maestro-extensions/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
dependencies {
implementation project(':maestro-database')
implementation project(':maestro-common')
implementation spectatorApiDep
implementation jacksonDatabindDep
implementation jacksonAnnotationsDep
implementation jakartaValidationDep
api slf4jApiDep

compileOnly lombokDep
annotationProcessor lombokDep
testCompileOnly lombokDep
testAnnotationProcessor lombokDep

implementation 'org.springframework.boot:spring-boot-starter-web:3.+'
implementation 'io.awspring.cloud:spring-cloud-aws-starter-sqs:3.+'
implementation 'io.swagger.core.v3:swagger-annotations:2.+'
implementation bvalJsrDep

runtimeOnly testcontainerDep
runtimeOnly postgresqlDep

testImplementation junitDep
testImplementation mockitoCoreDep
testImplementation assertJCoreDep
testImplementation hikaricpDep
testImplementation flywayDep
testImplementation(testFixtures(project(':maestro-common')))
testImplementation(testFixtures(project(':maestro-database')))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright 2024 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.maestro.extensions.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.maestro.annotations.SuppressFBWarnings;
import com.netflix.maestro.database.DatabaseConfiguration;
import com.netflix.maestro.extensions.dao.MaestroForeachFlattenedDao;
import com.netflix.maestro.extensions.handlers.ForeachFlatteningHandler;
import com.netflix.maestro.extensions.listeners.SqsMaestroEventListener;
import com.netflix.maestro.extensions.processors.MaestroEventProcessor;
import com.netflix.maestro.extensions.processors.StepEventPreprocessor;
import com.netflix.maestro.extensions.properties.MaestroExtensionsProperties;
import com.netflix.maestro.extensions.provider.HttpMaestroClient;
import com.netflix.maestro.extensions.provider.MaestroClient;
import com.netflix.maestro.extensions.utils.ForeachFlatteningHelper;
import com.netflix.maestro.metrics.MaestroMetrics;
import com.netflix.maestro.models.Constants;
import java.net.http.HttpClient;
import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

/**
* Auto-configuration for the Maestro Extensions module. Activates when {@code extensions.enabled}
* is set to {@code true}. Injects shared beans (DataSource, ObjectMapper, MaestroMetrics,
* DatabaseConfiguration) from the hosting application.
*/
@Configuration
@ComponentScan(basePackages = "com.netflix.maestro.extensions")
@EnableConfigurationProperties(MaestroExtensionsProperties.class)
@ConditionalOnProperty(value = "extensions.enabled", havingValue = "true")
@SuppressFBWarnings("EI_EXPOSE_REP2")
@Slf4j
public class MaestroExtensionsConfiguration {

@Bean
public MaestroClient maestroClient(
MaestroExtensionsProperties properties,
@Qualifier(Constants.MAESTRO_QUALIFIER) ObjectMapper objectMapper,
HttpClient httpClient) {
LOG.info("Creating HttpMaestroClient within Spring boot...");
return new HttpMaestroClient(properties.getMaestroBaseUrl(), objectMapper, httpClient);
}

@Bean
public ForeachFlatteningHelper foreachFlatteningHelper() {
return new ForeachFlatteningHelper();
}

@Bean
public MaestroForeachFlattenedDao maestroForeachFlattenedDao(
DataSource maestroDataSource,
@Qualifier(Constants.MAESTRO_QUALIFIER) ObjectMapper objectMapper,
DatabaseConfiguration config,
MaestroMetrics metrics) {
LOG.info("Creating MaestroForeachFlattenedDao within Spring boot...");
return new MaestroForeachFlattenedDao(maestroDataSource, objectMapper, config, metrics);
}

@Bean
public ForeachFlatteningHandler foreachFlatteningHandler(
MaestroClient maestroClient, MaestroForeachFlattenedDao dao, ForeachFlatteningHelper helper) {
LOG.info("Creating ForeachFlatteningHandler within Spring boot...");
return new ForeachFlatteningHandler(maestroClient, dao, helper);
}

@Bean
public StepEventPreprocessor stepEventPreprocessor(
MaestroClient maestroClient, MaestroMetrics metrics) {
LOG.info("Creating StepEventPreprocessor within Spring boot...");
return new StepEventPreprocessor(maestroClient, metrics);
}

@Bean
public MaestroEventProcessor maestroEventProcessor(
StepEventPreprocessor preprocessor,
ForeachFlatteningHandler handler,
MaestroMetrics metrics) {
LOG.info("Creating MaestroEventProcessor within Spring boot...");
return new MaestroEventProcessor(preprocessor, handler, metrics);
}

@Bean
public SqsMaestroEventListener sqsMaestroEventListener(
MaestroEventProcessor processor,
@Qualifier(Constants.MAESTRO_QUALIFIER) ObjectMapper objectMapper) {
LOG.info("Creating SqsMaestroEventListener within Spring boot...");
return new SqsMaestroEventListener(processor, objectMapper);
}
}
Loading