diff --git a/typescript/airflow-lambda-dynamodb-approval/.gitignore b/typescript/airflow-lambda-dynamodb-approval/.gitignore new file mode 100644 index 000000000..a7d381914 --- /dev/null +++ b/typescript/airflow-lambda-dynamodb-approval/.gitignore @@ -0,0 +1,68 @@ +*.js +!jest.config.js +*.d.ts +node_modules + +# CDK asset staging directory +.cdk.staging +cdk.out + +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +!lib/*.ts +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +*.manifest +*.spec + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ +.pytest_cache/ + +# Virtual environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# IDE +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# OS +.DS_Store +Thumbs.db diff --git a/typescript/airflow-lambda-dynamodb-approval/.npmignore b/typescript/airflow-lambda-dynamodb-approval/.npmignore new file mode 100644 index 000000000..c1d6d45dc --- /dev/null +++ b/typescript/airflow-lambda-dynamodb-approval/.npmignore @@ -0,0 +1,6 @@ +*.ts +!*.d.ts + +# CDK asset staging directory +.cdk.staging +cdk.out diff --git a/typescript/airflow-lambda-dynamodb-approval/README.md b/typescript/airflow-lambda-dynamodb-approval/README.md new file mode 100644 index 000000000..0b1a4b30e --- /dev/null +++ b/typescript/airflow-lambda-dynamodb-approval/README.md @@ -0,0 +1,123 @@ + +--- + +![Stability: Stable](https://img.shields.io/badge/stability-Stable-success.svg?style=for-the-badge) + +> **This is a stable example.** It should successfully build out of the box + +--- + + +# AWS MWAA (Managed Workflows for Apache Airflow) with CDK + +This sample demonstrates how to deploy AWS Managed Workflows for Apache Airflow (MWAA) using AWS CDK with example DAGs for basic workflows, Lambda integration, and human approval processes. + +## Overview + +This CDK project creates: +- **VPC** with public/private subnets and NAT Gateway +- **S3 bucket** for DAG storage with automatic deployment +- **MWAA environment** with proper IAM roles and security groups +- **DynamoDB table** for human approval workflows +- **Lambda function** for integration testing +- **Three example DAGs** demonstrating different patterns + +### DAG Workflows + +```mermaid +graph TD + subgraph "example_dag" + A1[Start] --> A2[Print Hello] + A2 --> A3[Print Goodbye] + A3 --> A4[End] + end + + subgraph "lambda_invoke_dag" + B1[List Lambda Functions] --> B2[Invoke Demo Lambda] + B2 --> B3[Process Response] + end + + subgraph "ddb_approval_dag" + C1[Create Approval Request] --> C5[(DynamoDB Table)] + C1 --> C2[Wait for Human Approval] + C5 --> C2 + C2 --> C3[Process Transaction] + C3 --> C4[Complete] + + C6[Manual Approval via AWS Console] --> C5 + end +``` + +**Example DAGs included:** +1. **`example_dag.py`** - Basic Airflow workflow with simple tasks +2. **`lambda_invoke_dag.py`** - Demonstrates Lambda function invocation from Airflow +3. **`ddb_approval_dag.py`** - Human approval workflow using DynamoDB sensors + +## Build + +```bash +npm install +npm run build +``` + +## Deploy + +```bash +cdk deploy +``` + +**Note the outputs** after deployment: +- **MwaaWebServerUrl** - Access the Airflow web interface +- **S3BucketName** - Where your DAGs are stored +- **ApprovalTableName** - DynamoDB table for approval workflows + +## Usage + +### Access Airflow Web UI +1. 
**Login to AWS Console first** - Ensure you're logged into the AWS Console in your browser +2. **Use the deployment output URL** - Copy the `MwaaWebServerUrl` from the deployment outputs +3. **Access Airflow** - Open the URL in the same browser where you're logged into AWS Console + +**Note:** MWAA requires AWS authentication even with `PUBLIC_ONLY` access mode. You must be logged into the AWS Console to access the Airflow web interface. + +### Test Basic Workflow +1. Trigger the `example_dag` DAG from the Airflow UI +2. Watch it execute the simple "Hello" and "Goodbye" tasks +3. Verify successful completion in the DAG run logs + +### Test Lambda Integration +1. Trigger the `lambda_invoke_example` DAG from the Airflow UI +2. View logs to see Lambda function listing and invocation results + +### Test Human Approval Workflow +1. Trigger the `dynamodb_human_approval_pipeline` DAG from the Airflow UI +2. Go to AWS Console → DynamoDB → Tables → `mwaa-approval-table-{region}` +3. Find your process record and change `approval_status` from `PENDING` to `APPROVED` +4. Watch the workflow complete automatically + +## Clean up + +```bash +cdk destroy +``` + +All resources are configured with `RemovalPolicy.DESTROY` for easy cleanup. + +## Architecture + +- **Environment Class**: `mw1.small` (1-2 workers) +- **Airflow Version**: 2.7.2 +- **Web Access**: Public (configure private access for production) +- **Logging**: All log types enabled at INFO level + +## Security Notes + +This sample uses demo-friendly settings. For production: + +- **Web Access**: Change from `PUBLIC_ONLY` to `PRIVATE_ONLY` for MWAA environment +- **IAM Permissions**: Replace wildcard (`'*'`) permissions with specific resource ARNs +- **S3 Encryption**: Enable server-side encryption for the DAGs bucket +- **VPC Endpoints**: Add endpoints for S3, DynamoDB, and Lambda to avoid internet traffic +- **Resource Policies**: Use `RETAIN` instead of `DESTROY` for production resources +- **DynamoDB**: Enable point-in-time recovery (currently disabled) +- **Monitoring**: Enable CloudTrail and CloudWatch alarms for security monitoring diff --git a/typescript/airflow-lambda-dynamodb-approval/bin/aws-mwaa-cdk.ts b/typescript/airflow-lambda-dynamodb-approval/bin/aws-mwaa-cdk.ts new file mode 100644 index 000000000..c1284d730 --- /dev/null +++ b/typescript/airflow-lambda-dynamodb-approval/bin/aws-mwaa-cdk.ts @@ -0,0 +1,6 @@ +#!/usr/bin/env node +import * as cdk from 'aws-cdk-lib'; +import { AwsMwaaCdkStack } from '../lib/aws-mwaa-cdk-stack'; + +const app = new cdk.App(); +new AwsMwaaCdkStack(app, 'AwsMwaaCdkStack'); diff --git a/typescript/airflow-lambda-dynamodb-approval/cdk.json b/typescript/airflow-lambda-dynamodb-approval/cdk.json new file mode 100644 index 000000000..d1cb713b8 --- /dev/null +++ b/typescript/airflow-lambda-dynamodb-approval/cdk.json @@ -0,0 +1,99 @@ +{ + "app": "npx ts-node --prefer-ts-exts bin/aws-mwaa-cdk.ts", + "watch": { + "include": [ + "**" + ], + "exclude": [ + "README.md", + "cdk*.json", + "**/*.d.ts", + "**/*.js", + "tsconfig.json", + "package*.json", + "yarn.lock", + "node_modules", + "test" + ] + }, + "context": { + "@aws-cdk/aws-lambda:recognizeLayerVersion": true, + "@aws-cdk/core:checkSecretUsage": true, + "@aws-cdk/core:target-partitions": [ + "aws", + "aws-cn" + ], + "@aws-cdk-containers/ecs-service-extensions:enableDefaultLogDriver": true, + "@aws-cdk/aws-ec2:uniqueImdsv2TemplateName": true, + "@aws-cdk/aws-ecs:arnFormatIncludesClusterName": true, + "@aws-cdk/aws-iam:minimizePolicies": true, + 
"@aws-cdk/core:validateSnapshotRemovalPolicy": true, + "@aws-cdk/aws-codepipeline:crossAccountKeyAliasStackSafeResourceName": true, + "@aws-cdk/aws-s3:createDefaultLoggingPolicy": true, + "@aws-cdk/aws-sns-subscriptions:restrictSqsDescryption": true, + "@aws-cdk/aws-apigateway:disableCloudWatchRole": true, + "@aws-cdk/core:enablePartitionLiterals": true, + "@aws-cdk/aws-events:eventsTargetQueueSameAccount": true, + "@aws-cdk/aws-ecs:disableExplicitDeploymentControllerForCircuitBreaker": true, + "@aws-cdk/aws-iam:importedRoleStackSafeDefaultPolicyName": true, + "@aws-cdk/aws-s3:serverAccessLogsUseBucketPolicy": true, + "@aws-cdk/aws-route53-patters:useCertificate": true, + "@aws-cdk/customresources:installLatestAwsSdkDefault": false, + "@aws-cdk/aws-rds:databaseProxyUniqueResourceName": true, + "@aws-cdk/aws-codedeploy:removeAlarmsFromDeploymentGroup": true, + "@aws-cdk/aws-apigateway:authorizerChangeDeploymentLogicalId": true, + "@aws-cdk/aws-ec2:launchTemplateDefaultUserData": true, + "@aws-cdk/aws-secretsmanager:useAttachedSecretResourcePolicyForSecretTargetAttachments": true, + "@aws-cdk/aws-redshift:columnId": true, + "@aws-cdk/aws-stepfunctions-tasks:enableEmrServicePolicyV2": true, + "@aws-cdk/aws-ec2:restrictDefaultSecurityGroup": true, + "@aws-cdk/aws-apigateway:requestValidatorUniqueId": true, + "@aws-cdk/aws-kms:aliasNameRef": true, + "@aws-cdk/aws-kms:applyImportedAliasPermissionsToPrincipal": true, + "@aws-cdk/aws-autoscaling:generateLaunchTemplateInsteadOfLaunchConfig": true, + "@aws-cdk/core:includePrefixInUniqueNameGeneration": true, + "@aws-cdk/aws-efs:denyAnonymousAccess": true, + "@aws-cdk/aws-opensearchservice:enableOpensearchMultiAzWithStandby": true, + "@aws-cdk/aws-lambda-nodejs:useLatestRuntimeVersion": true, + "@aws-cdk/aws-efs:mountTargetOrderInsensitiveLogicalId": true, + "@aws-cdk/aws-rds:auroraClusterChangeScopeOfInstanceParameterGroupWithEachParameters": true, + "@aws-cdk/aws-appsync:useArnForSourceApiAssociationIdentifier": true, + "@aws-cdk/aws-rds:preventRenderingDeprecatedCredentials": true, + "@aws-cdk/aws-codepipeline-actions:useNewDefaultBranchForCodeCommitSource": true, + "@aws-cdk/aws-cloudwatch-actions:changeLambdaPermissionLogicalIdForLambdaAction": true, + "@aws-cdk/aws-codepipeline:crossAccountKeysDefaultValueToFalse": true, + "@aws-cdk/aws-codepipeline:defaultPipelineTypeToV2": true, + "@aws-cdk/aws-kms:reduceCrossAccountRegionPolicyScope": true, + "@aws-cdk/aws-eks:nodegroupNameAttribute": true, + "@aws-cdk/aws-ec2:ebsDefaultGp3Volume": true, + "@aws-cdk/aws-ecs:removeDefaultDeploymentAlarm": true, + "@aws-cdk/custom-resources:logApiResponseDataPropertyTrueDefault": false, + "@aws-cdk/aws-s3:keepNotificationInImportedBucket": false, + "@aws-cdk/core:explicitStackTags": true, + "@aws-cdk/aws-ecs:enableImdsBlockingDeprecatedFeature": false, + "@aws-cdk/aws-ecs:disableEcsImdsBlocking": true, + "@aws-cdk/aws-ecs:reduceEc2FargateCloudWatchPermissions": true, + "@aws-cdk/aws-dynamodb:resourcePolicyPerReplica": true, + "@aws-cdk/aws-ec2:ec2SumTImeoutEnabled": true, + "@aws-cdk/aws-appsync:appSyncGraphQLAPIScopeLambdaPermission": true, + "@aws-cdk/aws-rds:setCorrectValueForDatabaseInstanceReadReplicaInstanceResourceId": true, + "@aws-cdk/core:cfnIncludeRejectComplexResourceUpdateCreatePolicyIntrinsics": true, + "@aws-cdk/aws-lambda-nodejs:sdkV3ExcludeSmithyPackages": true, + "@aws-cdk/aws-stepfunctions-tasks:fixRunEcsTaskPolicy": true, + "@aws-cdk/aws-ec2:bastionHostUseAmazonLinux2023ByDefault": true, + 
"@aws-cdk/aws-route53-targets:userPoolDomainNameMethodWithoutCustomResource": true, + "@aws-cdk/aws-elasticloadbalancingV2:albDualstackWithoutPublicIpv4SecurityGroupRulesDefault": true, + "@aws-cdk/aws-iam:oidcRejectUnauthorizedConnections": true, + "@aws-cdk/core:enableAdditionalMetadataCollection": true, + "@aws-cdk/aws-lambda:createNewPoliciesWithAddToRolePolicy": false, + "@aws-cdk/aws-s3:setUniqueReplicationRoleName": true, + "@aws-cdk/aws-events:requireEventBusPolicySid": true, + "@aws-cdk/core:aspectPrioritiesMutating": true, + "@aws-cdk/aws-dynamodb:retainTableReplica": true, + "@aws-cdk/aws-stepfunctions:useDistributedMapResultWriterV2": true, + "@aws-cdk/s3-notifications:addS3TrustKeyPolicyForSnsSubscriptions": true, + "@aws-cdk/aws-ec2:requirePrivateSubnetsForEgressOnlyInternetGateway": true, + "@aws-cdk/aws-s3:publicAccessBlockedByDefault": true, + "@aws-cdk/aws-lambda:useCdkManagedLogGroup": true + } +} diff --git a/typescript/airflow-lambda-dynamodb-approval/dags/ddb_approval_dag.py b/typescript/airflow-lambda-dynamodb-approval/dags/ddb_approval_dag.py new file mode 100644 index 000000000..6ad662d6a --- /dev/null +++ b/typescript/airflow-lambda-dynamodb-approval/dags/ddb_approval_dag.py @@ -0,0 +1,186 @@ +import pendulum +import os +import boto3 +from datetime import datetime +from airflow.models.dag import DAG +from airflow.providers.amazon.aws.sensors.dynamodb import DynamoDBValueSensor +from airflow.operators.bash import BashOperator +from airflow.operators.python import PythonOperator + +def get_approval_table_name(): + """Get the DynamoDB approval table name created by CDK""" + # Use the same naming pattern as CDK: mwaa-approval-table-${region} + try: + session = boto3.Session() + region = session.region_name or 'us-west-2' # Default region + return f"mwaa-approval-table-{region}" + except: + # Final fallback + return "mwaa-approval-table-us-west-2" + +def create_approval_request(**context): + """Create an approval request in DynamoDB that needs human approval""" + import boto3 + import json + from datetime import datetime + + # Get process ID from DAG run config or use default + dag_conf = context.get('dag_run').conf or {} + process_id = dag_conf.get('process_id', "finance-run-12345") + + # Get approval details from config + approval_context = { + 'id': process_id, # DynamoDB partition key must be 'id' + 'process_id': process_id, # Keep for backwards compatibility + 'transaction_amount': dag_conf.get('amount', 25000), + 'requester': dag_conf.get('requester', 'requester-123'), + 'department': dag_conf.get('department', 'finance'), + 'reason': dag_conf.get('reason', 'reason-123'), + 'urgency': dag_conf.get('urgency', 'HIGH'), + 'created_at': datetime.now().isoformat(), + 'approval_status': 'PENDING', + 'dag_run_id': context['dag_run'].run_id, + 'dag_id': context['dag'].dag_id + } + + table_name = get_approval_table_name() + print(f"📝 Creating approval request in DynamoDB table: {table_name}") + print(f" Process ID: {process_id}") + print(f" Amount: ${approval_context['transaction_amount']:,}") + print(f" Requester: {approval_context['requester']}") + print(f" Department: {approval_context['department']}") + print(f" Urgency: {approval_context['urgency']}") + + try: + dynamodb = boto3.resource('dynamodb') + table = dynamodb.Table(table_name) + + # Create the approval request + response = table.put_item(Item=approval_context) + + print("✅ Approval request created successfully") + print(f" Waiting for human approval on process: {process_id}") + print(f" Approvers should set 
'approval_status' to 'APPROVED' in DynamoDB") + + # Store process ID for the sensor + context['task_instance'].xcom_push(key='process_id', value=process_id) + + return approval_context + + except Exception as e: + print(f"❌ Error creating approval request: {e}") + raise + +def process_approved_transaction(**context): + """Process the transaction after human approval""" + + # Get process ID from XCom + process_id = context['task_instance'].xcom_pull( + task_ids='create_approval_request', + key='process_id' + ) + + # Get the approved record from DynamoDB + table_name = get_approval_table_name() + + try: + dynamodb = boto3.resource('dynamodb') + table = dynamodb.Table(table_name) + + response = table.get_item(Key={'id': process_id}) + + if 'Item' in response: + approved_item = response['Item'] + + print("🎉 TRANSACTION APPROVED AND PROCESSING") + print("=" * 50) + print(f" Process ID: {approved_item.get('process_id', 'unknown')}") + print(f" Amount: ${approved_item.get('transaction_amount', 0):,}") + print(f" Requester: {approved_item.get('requester', 'unknown')}") + print(f" Department: {approved_item.get('department', 'unknown')}") + print(f" Approved At: {approved_item.get('approval_status', 'unknown')}") + print("=" * 50) + + # Simulate transaction processing + processing_steps = [ + "Validating approval authority", + "Checking budget allocation", + "Processing payment", + "Updating financial records", + "Sending confirmation" + ] + + for step in processing_steps: + print(f" ✓ {step}") + + # Update the record to show it's been processed + table.update_item( + Key={'id': process_id}, + UpdateExpression='SET approval_status = :status, processed_at = :processed_at', + ExpressionAttributeValues={ + ':status': 'PROCESSED', + ':processed_at': datetime.now().isoformat() + } + ) + + print("✅ Transaction processing completed successfully!") + + return approved_item + else: + print(f"⚠️ Warning: Could not find approved record for process {process_id}") + return None + + except Exception as e: + print(f"❌ Error processing approved transaction: {e}") + raise + +# Get the DynamoDB table name from CDK +APPROVAL_TABLE = get_approval_table_name() + +with DAG( + dag_id="dynamodb_human_approval_pipeline", + start_date=pendulum.datetime(2025, 1, 1), + schedule=None, + tags=["dynamodb", "human-approval", "demo"], + catchup=False, + description="Human approval workflow using DynamoDB sensor - waits for manual approval in DDB table", +) as dag: + + # Task 1: Create approval request in DynamoDB + create_request = PythonOperator( + task_id="create_approval_request", + python_callable=create_approval_request, + ) + + # Task 2: Wait for human approval in DynamoDB + wait_for_approval = DynamoDBValueSensor( + task_id="wait_for_approval", + aws_conn_id="aws_default", + table_name=APPROVAL_TABLE, + # Key for the item - will be the process_id from the previous task + partition_key_name="id", + partition_key_value="{{ ti.xcom_pull(task_ids='create_approval_request', key='process_id') }}", + # Check for approval_status = 'APPROVED' + attribute_name="approval_status", + attribute_value="APPROVED", + # How often to check (in seconds) + poke_interval=30, + # Maximum time to wait (1 day) + timeout=60 * 60 * 24, + mode="reschedule", + ) + + # Task 3: Process the approved transaction + process_transaction = PythonOperator( + task_id="process_approved_transaction", + python_callable=process_approved_transaction, + ) + + # Task 4: Final confirmation + completion_message = BashOperator( + task_id="approval_workflow_complete", + 
bash_command='echo "🎉 Human approval workflow completed successfully! Transaction processed."' + ) + + # Define task flow + create_request >> wait_for_approval >> process_transaction >> completion_message diff --git a/typescript/airflow-lambda-dynamodb-approval/dags/example_dag.py b/typescript/airflow-lambda-dynamodb-approval/dags/example_dag.py new file mode 100644 index 000000000..1e5a60d07 --- /dev/null +++ b/typescript/airflow-lambda-dynamodb-approval/dags/example_dag.py @@ -0,0 +1,59 @@ +from datetime import datetime, timedelta +from airflow import DAG +from airflow.operators.empty import EmptyOperator +from airflow.operators.python import PythonOperator + +def print_hello(): + print("Hello!") + return "Hello!" + +def print_goodbye(): + print("Goodbye!") + return "Goodbye!" + +# Default arguments for the DAG +default_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'start_date': datetime(2023, 1, 1), + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 1, + 'retry_delay': timedelta(minutes=5), +} + +# Define the DAG +dag = DAG( + 'example_dag', + default_args=default_args, + description='A simple example DAG for MWAA', + schedule_interval=timedelta(hours=1), + catchup=False, + tags=['example'], +) + +# Define tasks +start_task = EmptyOperator( + task_id='start', + dag=dag, +) + +hello_task = PythonOperator( + task_id='print_hello', + python_callable=print_hello, + dag=dag, +) + +goodbye_task = PythonOperator( + task_id='print_goodbye', + python_callable=print_goodbye, + dag=dag, +) + +end_task = EmptyOperator( + task_id='end', + dag=dag, +) + +# Define task dependencies +start_task >> hello_task >> goodbye_task >> end_task diff --git a/typescript/airflow-lambda-dynamodb-approval/dags/lambda_invoke_dag.py b/typescript/airflow-lambda-dynamodb-approval/dags/lambda_invoke_dag.py new file mode 100644 index 000000000..e81a262d1 --- /dev/null +++ b/typescript/airflow-lambda-dynamodb-approval/dags/lambda_invoke_dag.py @@ -0,0 +1,97 @@ +from datetime import datetime, timedelta +from airflow import DAG +from airflow.operators.python import PythonOperator +import boto3 +import json + +def check_lambda_functions(): + """Python function to list available Lambda functions""" + lambda_client = boto3.client('lambda') + + try: + response = lambda_client.list_functions() + functions = response.get('Functions', []) + + print(f"Found {len(functions)} Lambda functions:") + for func in functions[:5]: # Show first 5 functions + print(f"- {func['FunctionName']} (Runtime: {func['Runtime']})") + + return f"Successfully listed {len(functions)} Lambda functions" + except Exception as e: + print(f"Error listing Lambda functions: {e}") + return f"Error: {e}" + +def invoke_lambda_with_boto3(**context): + """Example of invoking Lambda using boto3 client directly""" + lambda_client = boto3.client('lambda') + + # Example payload - modify according to your Lambda function + payload = { + "message": "Hello from Airflow!", + "timestamp": datetime.now().isoformat(), + "dag_id": context['dag'].dag_id, + "task_id": context['task'].task_id + } + + function_name = "mwaa-demo-function" # Demo Lambda function created by CDK + + try: + response = lambda_client.invoke( + FunctionName=function_name, + InvocationType='RequestResponse', # Synchronous invocation + Payload=json.dumps(payload) + ) + + # Read the response + response_payload = json.loads(response['Payload'].read()) + print(f"Lambda response: {response_payload}") + + return response_payload + except 
lambda_client.exceptions.ResourceNotFoundException: + print(f"Lambda function '{function_name}' not found. Make sure the CDK stack has been deployed in this region.") + print("To use this task with a different Lambda function:") + print("1. Create or identify a Lambda function in your AWS account") + print("2. Update the function_name variable in this DAG with that function's name") + print("3. Redeploy your DAGs to S3") + return "Demo completed - Lambda function not found" + except Exception as e: + print(f"Error invoking Lambda function '{function_name}': {e}") + return f"Error: {e}" + +# Default arguments for the DAG +default_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'start_date': datetime(2023, 1, 1), + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 1, + 'retry_delay': timedelta(minutes=5), +} + +# Define the DAG +dag = DAG( + 'lambda_invoke_example', + default_args=default_args, + description='Example DAG showing Lambda function invocation', + schedule_interval=None, # Manual trigger only + catchup=False, + tags=['lambda', 'example', 'aws'], +) + +# Task 1: List available Lambda functions +list_functions_task = PythonOperator( + task_id='list_lambda_functions', + python_callable=check_lambda_functions, + dag=dag, +) + +# Task 2: Invoke Lambda using boto3 (works for any Lambda function) +invoke_with_boto3_task = PythonOperator( + task_id='invoke_lambda_boto3', + python_callable=invoke_lambda_with_boto3, + dag=dag, +) + +# Define task dependencies +list_functions_task >> invoke_with_boto3_task diff --git a/typescript/airflow-lambda-dynamodb-approval/jest.config.js b/typescript/airflow-lambda-dynamodb-approval/jest.config.js new file mode 100644 index 000000000..08263b895 --- /dev/null +++ b/typescript/airflow-lambda-dynamodb-approval/jest.config.js @@ -0,0 +1,8 @@ +module.exports = { + testEnvironment: 'node', + roots: ['<rootDir>/test'], + testMatch: ['**/*.test.ts'], + transform: { + '^.+\\.tsx?$': 'ts-jest' + } +}; diff --git a/typescript/airflow-lambda-dynamodb-approval/lib/aws-mwaa-cdk-stack.ts b/typescript/airflow-lambda-dynamodb-approval/lib/aws-mwaa-cdk-stack.ts new file mode 100644 index 000000000..52a9eafd7 --- /dev/null +++ b/typescript/airflow-lambda-dynamodb-approval/lib/aws-mwaa-cdk-stack.ts @@ -0,0 +1,365 @@ +import { Stack, StackProps, RemovalPolicy, CfnOutput, Duration } from 'aws-cdk-lib'; +import * as s3 from 'aws-cdk-lib/aws-s3'; +import * as s3deploy from 'aws-cdk-lib/aws-s3-deployment'; +import * as iam from 'aws-cdk-lib/aws-iam'; +import * as mwaa from 'aws-cdk-lib/aws-mwaa'; +import * as ec2 from 'aws-cdk-lib/aws-ec2'; +import * as lambda from 'aws-cdk-lib/aws-lambda'; +import * as dynamodb from 'aws-cdk-lib/aws-dynamodb'; +import { Construct } from 'constructs'; + +export class AwsMwaaCdkStack extends Stack { + constructor(scope: Construct, id: string, props?: StackProps) { + super(scope, id, props); + + // 1. Create VPC for MWAA (required) + const vpc = new ec2.Vpc(this, 'MwaaVpc', { + maxAzs: 2, + natGateways: 1, + subnetConfiguration: [ + { + cidrMask: 24, + name: 'public', + subnetType: ec2.SubnetType.PUBLIC, + }, + { + cidrMask: 24, + name: 'private', + subnetType: ec2.SubnetType.PRIVATE_WITH_EGRESS, + } + ] + }); + + // 2. Create S3 bucket for MWAA DAGs + const dagsBucket = new s3.Bucket(this, 'MwaaDagsBucket', { + bucketName: `mwaa-dags-${this.account}-${this.region}`, + versioned: true, + removalPolicy: RemovalPolicy.DESTROY, // For demo purposes + autoDeleteObjects: true, // For demo purposes + }); + + // 3. 
Upload local dags folder to the S3 bucket + new s3deploy.BucketDeployment(this, 'DeployDags', { + sources: [s3deploy.Source.asset('./dags')], + destinationBucket: dagsBucket, + destinationKeyPrefix: 'dags', + }); + + // 4. Create Demo Lambda Function for MWAA testing (original simple demo) + const demoLambdaFunction = new lambda.Function(this, 'MwaaTestLambda', { + functionName: 'mwaa-demo-function', + runtime: lambda.Runtime.PYTHON_3_9, + handler: 'index.lambda_handler', + code: lambda.Code.fromInline(` +import json +import datetime + +def lambda_handler(event, context): + """ + Demo Lambda function for MWAA integration testing + Processes data sent from Airflow and returns a response + """ + + # Log the incoming event + print(f"Received event: {json.dumps(event)}") + + # Extract Airflow context if present + airflow_context = { + 'dag_id': event.get('dag_id', 'unknown'), + 'task_id': event.get('task_id', 'unknown'), + 'timestamp': event.get('timestamp', 'unknown'), + 'message': event.get('message', 'Hello from Lambda!') + } + + # Process the data (simple example) + processed_data = { + 'status': 'success', + 'processed_at': datetime.datetime.now().isoformat(), + 'original_message': airflow_context['message'], + 'airflow_context': airflow_context, + 'lambda_function_name': context.function_name, + 'lambda_request_id': context.aws_request_id, + 'processed_by': 'mwaa-demo-lambda' + } + + # Return response + response = { + 'statusCode': 200, + 'body': json.dumps(processed_data) + } + + print(f"Returning response: {json.dumps(response)}") + return response + `), + timeout: Duration.seconds(30), + memorySize: 128, + description: 'Demo Lambda function for MWAA Airflow integration testing' + }); + + // 4. Create DynamoDB Approval Table for Human Approval Workflows + const approvalTable = new dynamodb.Table(this, 'ApprovalTable', { + tableName: `mwaa-approval-table-${this.region}`, + partitionKey: { + name: 'id', + type: dynamodb.AttributeType.STRING + }, + billingMode: dynamodb.BillingMode.PAY_PER_REQUEST, + removalPolicy: RemovalPolicy.DESTROY, // For demo purposes + pointInTimeRecovery: false, // For demo purposes + encryption: dynamodb.TableEncryption.AWS_MANAGED, + }); + + // Add GSI for querying by status and timestamp + approvalTable.addGlobalSecondaryIndex({ + indexName: 'approval_status-index', + partitionKey: { + name: 'approval_status', + type: dynamodb.AttributeType.STRING + }, + sortKey: { + name: 'created_at', + type: dynamodb.AttributeType.STRING + } + }); + + // 5. 
Create MWAA Execution Role + const mwaaExecutionRole = new iam.Role(this, 'MwaaExecutionRole', { + assumedBy: new iam.ServicePrincipal('airflow-env.amazonaws.com'), + inlinePolicies: { + MwaaExecutionRolePolicy: new iam.PolicyDocument({ + statements: [ + // S3 permissions for DAG bucket + new iam.PolicyStatement({ + effect: iam.Effect.ALLOW, + actions: [ + 's3:GetObject*', + 's3:GetBucket*', + 's3:List*' + ], + resources: [ + dagsBucket.bucketArn, + `${dagsBucket.bucketArn}/*` + ] + }), + // CloudWatch Logs permissions - Comprehensive MWAA logging access + new iam.PolicyStatement({ + effect: iam.Effect.ALLOW, + actions: [ + 'logs:CreateLogStream', + 'logs:CreateLogGroup', + 'logs:PutLogEvents', + 'logs:GetLogEvents', + 'logs:GetLogRecord', + 'logs:GetLogGroupFields', + 'logs:GetQueryResults', + 'logs:DescribeLogGroups', + 'logs:DescribeLogStreams', + 'logs:DescribeDestinations', + 'logs:DescribeExportTasks', + 'logs:DescribeMetricFilters', + 'logs:DescribeQueries', + 'logs:DescribeResourcePolicies', + 'logs:DescribeSubscriptionFilters', + 'logs:FilterLogEvents', + 'logs:StartQuery', + 'logs:StopQuery' + ], + resources: [ + `arn:aws:logs:${this.region}:${this.account}:log-group:airflow-*`, + `arn:aws:logs:${this.region}:${this.account}:log-group:/aws/mwaa/*`, + `arn:aws:logs:${this.region}:${this.account}:log-group:*airflow*`, + `arn:aws:logs:${this.region}:${this.account}:*` + ] + }), + // CloudWatch Metrics permissions - Comprehensive access + new iam.PolicyStatement({ + effect: iam.Effect.ALLOW, + actions: [ + 'cloudwatch:PutMetricData', + 'cloudwatch:GetMetricStatistics', + 'cloudwatch:GetMetricData', + 'cloudwatch:ListMetrics', + 'cloudwatch:DescribeAlarms', + 'cloudwatch:DescribeAlarmsForMetric', + 'cloudwatch:GetDashboard', + 'cloudwatch:ListDashboards' + ], + resources: ['*'] + }), + // SQS permissions for Airflow + new iam.PolicyStatement({ + effect: iam.Effect.ALLOW, + actions: [ + 'sqs:ChangeMessageVisibility', + 'sqs:DeleteMessage', + 'sqs:GetQueueAttributes', + 'sqs:GetQueueUrl', + 'sqs:ReceiveMessage', + 'sqs:SendMessage' + ], + resources: [ + `arn:aws:sqs:${this.region}:*:airflow-celery-*` + ] + }), + // KMS permissions for encryption + new iam.PolicyStatement({ + effect: iam.Effect.ALLOW, + actions: [ + 'kms:Decrypt', + 'kms:DescribeKey', + 'kms:GenerateDataKey*', + 'kms:Encrypt' + ], + resources: ['*'], + conditions: { + StringEquals: { + 'kms:ViaService': [ + `sqs.${this.region}.amazonaws.com`, + `s3.${this.region}.amazonaws.com` + ] + } + } + }) + ] + }), + LambdaExecutionPolicy: new iam.PolicyDocument({ + statements: [ + new iam.PolicyStatement({ + effect: iam.Effect.ALLOW, + actions: [ + 'lambda:InvokeFunction', + 'lambda:GetFunction', + 'lambda:ListFunctions' + ], + resources: ['*'], // Allows access to all Lambda functions in the account + sid: 'LambdaInvokePermissions' + }) + ] + }), + DynamoDBAccessPolicy: new iam.PolicyDocument({ + statements: [ + new iam.PolicyStatement({ + effect: iam.Effect.ALLOW, + actions: [ + 'dynamodb:GetItem', + 'dynamodb:PutItem', + 'dynamodb:UpdateItem', + 'dynamodb:DeleteItem', + 'dynamodb:Query', + 'dynamodb:Scan', + 'dynamodb:BatchGetItem', + 'dynamodb:BatchWriteItem', + 'dynamodb:DescribeTable' + ], + resources: [ + approvalTable.tableArn, + `${approvalTable.tableArn}/index/*` + ], + sid: 'DynamoDBAccessPermissions' + }) + ] + }) + } + }); + + // Grant S3 permissions to the role + dagsBucket.grantReadWrite(mwaaExecutionRole); + + // 5. 
Create Security Group for MWAA + const mwaaSecurityGroup = new ec2.SecurityGroup(this, 'MwaaSecurityGroup', { + vpc, + description: 'Security group for MWAA environment', + allowAllOutbound: true, + }); + + // Allow inbound traffic within the security group + mwaaSecurityGroup.addIngressRule( + mwaaSecurityGroup, + ec2.Port.allTraffic(), + 'Allow all traffic within security group' + ); + + // 6. Create MWAA Environment + const mwaaEnvironment = new mwaa.CfnEnvironment(this, 'MwaaEnvironment', { + name: 'MyMWAAEnvironment', + dagS3Path: 'dags', + executionRoleArn: mwaaExecutionRole.roleArn, + sourceBucketArn: dagsBucket.bucketArn, + + // Network configuration + networkConfiguration: { + subnetIds: vpc.privateSubnets.map(subnet => subnet.subnetId), + securityGroupIds: [mwaaSecurityGroup.securityGroupId], + }, + + // Environment configuration + environmentClass: 'mw1.small', // Smallest size for cost efficiency + maxWorkers: 2, + minWorkers: 1, + + // Airflow configuration + airflowVersion: '2.7.2', + webserverAccessMode: 'PUBLIC_ONLY', + + // Environment variables for DAGs + airflowConfigurationOptions: { + 'core.default_timezone': 'UTC', + 'webserver.expose_config': 'True' + }, + + // Resource names are automatically discovered by DAGs + // DAGs use the same naming patterns as CDK to find resources automatically + // No manual configuration of Airflow Variables required + + // Logging configuration + loggingConfiguration: { + dagProcessingLogs: { + enabled: true, + logLevel: 'INFO' + }, + schedulerLogs: { + enabled: true, + logLevel: 'INFO' + }, + taskLogs: { + enabled: true, + logLevel: 'INFO' + }, + webserverLogs: { + enabled: true, + logLevel: 'INFO' + }, + workerLogs: { + enabled: true, + logLevel: 'INFO' + } + } + }); + + // 7. Output important information + new CfnOutput(this, 'S3BucketName', { + value: dagsBucket.bucketName, + description: 'Name of the S3 bucket containing DAG files' + }); + + new CfnOutput(this, 'MwaaEnvironmentName', { + value: mwaaEnvironment.name!, + description: 'Name of the MWAA environment' + }); + + new CfnOutput(this, 'MwaaWebServerUrl', { + value: `https://${mwaaEnvironment.attrWebserverUrl}`, + description: 'MWAA Airflow Web Server URL' + }); + + new CfnOutput(this, 'DemoLambdaFunctionName', { + value: demoLambdaFunction.functionName, + description: 'Name of the demo Lambda function for MWAA testing' + }); + + new CfnOutput(this, 'ApprovalTableName', { + value: approvalTable.tableName, + description: 'Name of the DynamoDB approval table for human approval workflows' + }); + } +} diff --git a/typescript/airflow-lambda-dynamodb-approval/package.json b/typescript/airflow-lambda-dynamodb-approval/package.json new file mode 100644 index 000000000..171a668e4 --- /dev/null +++ b/typescript/airflow-lambda-dynamodb-approval/package.json @@ -0,0 +1,26 @@ +{ + "name": "aws-mwaa-cdk", + "version": "0.1.0", + "bin": { + "aws-mwaa-cdk": "bin/aws-mwaa-cdk.js" + }, + "scripts": { + "build": "tsc", + "watch": "tsc -w", + "test": "jest", + "cdk": "cdk" + }, + "devDependencies": { + "@types/jest": "^29.5.14", + "@types/node": "22.7.9", + "jest": "^29.7.0", + "ts-jest": "^29.2.5", + "aws-cdk": "2.1025.0", + "ts-node": "^10.9.2", + "typescript": "~5.6.3" + }, + "dependencies": { + "aws-cdk-lib": "2.208.0", + "constructs": "^10.0.0" + } +} \ No newline at end of file diff --git a/typescript/airflow-lambda-dynamodb-approval/test/aws-mwaa-cdk.test.ts b/typescript/airflow-lambda-dynamodb-approval/test/aws-mwaa-cdk.test.ts new file mode 100644 index 000000000..e35d4ccb7 --- 
/dev/null +++ b/typescript/airflow-lambda-dynamodb-approval/test/aws-mwaa-cdk.test.ts @@ -0,0 +1,103 @@ +import * as cdk from 'aws-cdk-lib'; +import { Template } from 'aws-cdk-lib/assertions'; +import * as AwsMwaaCdk from '../lib/aws-mwaa-cdk-stack'; + +test('MWAA Stack Resources Created', () => { + const app = new cdk.App(); + // WHEN + const stack = new AwsMwaaCdk.AwsMwaaCdkStack(app, 'MyTestStack'); + // THEN + const template = Template.fromStack(stack); + + // Test S3 bucket for DAGs is created + template.resourceCountIs('AWS::S3::Bucket', 1); + + // Test MWAA environment is created + template.resourceCountIs('AWS::MWAA::Environment', 1); + + // Test VPC is created + template.resourceCountIs('AWS::EC2::VPC', 1); + + // Test DynamoDB table for approval workflow is created + template.resourceCountIs('AWS::DynamoDB::Table', 1); + + // Test our demo Lambda function is created (CDK creates additional Lambda functions for infrastructure) + template.hasResourceProperties('AWS::Lambda::Function', { + FunctionName: 'mwaa-demo-function' + }); + + // Test IAM role for MWAA execution is created (CDK creates additional roles for infrastructure) + template.hasResourceProperties('AWS::IAM::Role', { + AssumeRolePolicyDocument: { + Statement: [ + { + Effect: 'Allow', + Principal: { + Service: 'airflow-env.amazonaws.com' + }, + Action: 'sts:AssumeRole' + } + ] + } + }); + + // Test security group is created + template.resourceCountIs('AWS::EC2::SecurityGroup', 1); +}); + +test('MWAA Environment Configuration', () => { + const app = new cdk.App(); + const stack = new AwsMwaaCdk.AwsMwaaCdkStack(app, 'MyTestStack'); + const template = Template.fromStack(stack); + + // Test MWAA environment has correct configuration + template.hasResourceProperties('AWS::MWAA::Environment', { + Name: 'MyMWAAEnvironment', + EnvironmentClass: 'mw1.small', + MinWorkers: 1, + MaxWorkers: 2, + AirflowVersion: '2.7.2', + WebserverAccessMode: 'PUBLIC_ONLY' + }); +}); + +test('DynamoDB Table Configuration', () => { + const app = new cdk.App(); + const stack = new AwsMwaaCdk.AwsMwaaCdkStack(app, 'MyTestStack'); + const template = Template.fromStack(stack); + + // Test DynamoDB table configuration for approval workflow + template.hasResourceProperties('AWS::DynamoDB::Table', { + BillingMode: 'PAY_PER_REQUEST', + KeySchema: [{ + AttributeName: 'id', + KeyType: 'HASH' + }] + }); +}); + +test('Lambda Function Configuration', () => { + const app = new cdk.App(); + const stack = new AwsMwaaCdk.AwsMwaaCdkStack(app, 'MyTestStack'); + const template = Template.fromStack(stack); + + // Test Lambda function configuration + template.hasResourceProperties('AWS::Lambda::Function', { + FunctionName: 'mwaa-demo-function', + Runtime: 'python3.9', + Handler: 'index.lambda_handler' + }); +}); + +test('S3 Bucket Configuration', () => { + const app = new cdk.App(); + const stack = new AwsMwaaCdk.AwsMwaaCdkStack(app, 'MyTestStack'); + const template = Template.fromStack(stack); + + // Test S3 bucket has versioning enabled + template.hasResourceProperties('AWS::S3::Bucket', { + VersioningConfiguration: { + Status: 'Enabled' + } + }); +}); diff --git a/typescript/airflow-lambda-dynamodb-approval/tsconfig.json b/typescript/airflow-lambda-dynamodb-approval/tsconfig.json new file mode 100644 index 000000000..28bb557fa --- /dev/null +++ b/typescript/airflow-lambda-dynamodb-approval/tsconfig.json @@ -0,0 +1,31 @@ +{ + "compilerOptions": { + "target": "ES2022", + "module": "NodeNext", + "moduleResolution": "NodeNext", + "lib": [ + "es2022" + ], + "declaration": 
true, + "strict": true, + "noImplicitAny": true, + "strictNullChecks": true, + "noImplicitThis": true, + "alwaysStrict": true, + "noUnusedLocals": false, + "noUnusedParameters": false, + "noImplicitReturns": true, + "noFallthroughCasesInSwitch": false, + "inlineSourceMap": true, + "inlineSources": true, + "experimentalDecorators": true, + "strictPropertyInitialization": false, + "typeRoots": [ + "./node_modules/@types" + ] + }, + "exclude": [ + "node_modules", + "cdk.out" + ] +}