Skip to content

CDK Seed/stepfunction event logger #11

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 24 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
7bee2be
initial commit for stepfunction logger
leothomas Nov 12, 2020
5c828b2
added base lamdba code
leothomas Nov 12, 2020
0c40d09
Merge branch 'main' into seed/stepfunction-event-logger
leothomas Nov 12, 2020
63194c1
finalized dynamodb version of event logger
leothomas Nov 16, 2020
fde747e
removed commented code
leothomas Nov 16, 2020
25b8a69
added permissions for SQS Message processor lambda to access dynamodb…
leothomas Nov 16, 2020
c3570e0
add build scripts
markdboyd Nov 16, 2020
8ee92e5
refactor TS code to remove ! expressions
markdboyd Nov 16, 2020
3bcca1f
moved statement granting lambda access to SQS to outside the if block
leothomas Nov 16, 2020
28522e0
move env vars
markdboyd Nov 16, 2020
529016d
move datastore creation to a method
markdboyd Nov 16, 2020
1e9a85b
use Datastore ENUM
markdboyd Nov 16, 2020
ca75224
update type for eventLoggingLevel
markdboyd Nov 16, 2020
bf9d980
figured out typechecking
leothomas Nov 16, 2020
ff6f02c
added typeguard, big thanks to @alukach and @mbody!
leothomas Nov 16, 2020
6610daa
removed unused typecheck
leothomas Nov 16, 2020
453aec9
finalized MVP
leothomas Nov 19, 2020
cd40de9
updated event logger
leothomas Nov 19, 2020
60ad9f0
fixed issue with custom error handling lambda not being assigned to t…
leothomas Nov 19, 2020
cc1956a
updated lambda to contain correct env var key
leothomas Nov 23, 2020
57d72c4
initialized infrastructure for unit tests
leothomas Nov 25, 2020
f5e26b5
added preliminary unit tests
leothomas Nov 25, 2020
97c57d1
inlcuded .vscode/* in .gitignore
leothomas Nov 25, 2020
7b95881
working on unit tests
leothomas Nov 25, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
node_modules/
npm-debug.log
lerna-debug.log
*.vscode*
24 changes: 24 additions & 0 deletions packages/stepfunction-event-logger/Pipfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true

[dev-packages]
pytest = "6.1.2"
black = "20.8b1"
flake8 = "3.8.4"
moto = "1.3.16"
boto3 = "1.16.25"

[packages]

[scripts]
unit-tests = "pytest lib/lambda/unit_tests"

[requires]
python_version = "3.8"


[pipenv]
allow_prereleases = true

708 changes: 708 additions & 0 deletions packages/stepfunction-event-logger/Pipfile.lock

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions packages/stepfunction-event-logger/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# `@cdk-seed/stepfunction-event-logger`

> TODO: description
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to see this README fleshed out a bit more


## Usage

```
const stepfunctionEventLogger = require('@cdk-seed/stepfunction-event-logger');

// TODO: DEMONSTRATE API
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
'use strict';

const stepfunctionEventLogger = require('..');

describe('@cdk-seed/stepfunction-event-logger', () => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, if implemented, the tests would demonstrate that CDK code creates the right infrastructure? This whole thing of using tests to valid your infra-as-code is wild to me.

If so, is there any reason we're not implementing that?

it('needs tests');
});
223 changes: 223 additions & 0 deletions packages/stepfunction-event-logger/lib/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
import { Construct, Duration } from "@aws-cdk/core";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get this warning from a build of this package (both packages in fact)

lib/index.ts:32:1 - warning JSII5: The type "constructs.Construct" is exposed in the public API of this module. Therefore, the module "constructs" must also be defined under "peerDependencies". This will be auto-corrected unless --no-fix-peer-dependencies was specified.

It seems like it's just saying you need to add this module as a peer dependency: https://www.npmjs.com/package/constructs?activeTab=versions

import * as sqs from "@aws-cdk/aws-sqs";
import * as events from "@aws-cdk/aws-events";
import * as events_targets from "@aws-cdk/aws-events-targets";
import * as lambda from "@aws-cdk/aws-lambda";
import * as lambda_event_sources from "@aws-cdk/aws-lambda-event-sources";
import * as dynamodb from "@aws-cdk/aws-dynamodb";
import * as path from "path";
import { StateMachine } from "@aws-cdk/aws-stepfunctions";

export enum EventLoggingLevel {
FULL = "FULL",
SUMMARY = "SUMMARY"
}
export enum Datastore {
DYNAMODB = "Dynamodb",
POSTGRES = "Postgres"
}

export interface EventLoggerBaseProps {
readonly stepfunctions: Array<StateMachine>
}
export interface EventLoggerCustomLambdaProps extends EventLoggerBaseProps {
readonly lambda: lambda.Function
}
export interface EventLoggerStandardLambdaProps extends EventLoggerBaseProps {
readonly eventLoggingLevel: EventLoggingLevel
readonly datastore: Datastore
// readonly dynamodbSettings?: DynamodbSettings
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a personal preference, but I prefer to delete commented code that isn't needed yet

}

export class StepFunctionEventLogger extends Construct {
constructor(scope: Construct, id: string, props: EventLoggerStandardLambdaProps | EventLoggerCustomLambdaProps) {
super(scope, id);

function isCustomLambda(props: EventLoggerStandardLambdaProps | EventLoggerCustomLambdaProps): props is EventLoggerCustomLambdaProps {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very well done

return (props as EventLoggerCustomLambdaProps).lambda !== undefined
};

const deadLetterQueue = new sqs.Queue(
this, "EventLoggerDeadLetterQueue", {
retentionPeriod: Duration.days(14)
});

const mainQueue = new sqs.Queue(
this, "EventLoggingQueue", {
deadLetterQueue: {
maxReceiveCount: 6,
queue: deadLetterQueue
},
retentionPeriod: Duration.days(1),
visibilityTimeout: Duration.minutes(1)
});

var stepfunctionArns = Array<string>();

props.stepfunctions.forEach(sf => {
stepfunctionArns.push(sf.stateMachineArn)
});
Comment on lines +55 to +59
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm nitpicking a bit here, but var is pretty much obsolete with const and let in ES6:

https://blog.usejournal.com/awesome-javascript-no-more-var-working-title-999428999994

Also, I'd recommend using a functional style operator when building an array from another array, as you're doing here. You get the immutability benefits of FP and I think it's a bit more expressive. So in this case:

       const stepfunctionArns: Array<string>() = props.stepfunctions.map(sf => sf.stateMachineArn);


new events.Rule(
this, "EventNotificationToSQSRule",
{
ruleName: "SendNewStepFunctionEventToQueue",
description: "Captures status changed events from StepFunction and creates a message in SQS",
eventPattern: {
detail: {
status: ["SUCCEEDED", "FAILED", "ABORTED", "TIMED_OUT"],
stateMachineArn: stepfunctionArns
},
detailType: ["Step Functions Execution Status Change"],
source: ["aws.states"],
},
targets: [new events_targets.SqsQueue(mainQueue)]
}
)
const SQSMessageProcessorFunction = isCustomLambda(props) ? props.lambda : this.createMessageProcessorFunction(props.eventLoggingLevel, props.datastore)

// grants message processor lambda permission to consume messages from SQS Queue
mainQueue.grantConsumeMessages(SQSMessageProcessorFunction)

SQSMessageProcessorFunction.addEventSource(new lambda_event_sources.SqsEventSource(mainQueue))

// grants message processor lambda permsisions to read stepfunction execution history
props.stepfunctions.forEach(sf => { sf.grantRead(SQSMessageProcessorFunction) });
}


createMessageProcessorFunction(
eventLoggingLevel: EventLoggingLevel,
datastore: Datastore
) {
const SQSMessageProcessorFunction = new lambda.Function(
this, "SQSMessageProcessorFunction",
{
runtime: lambda.Runtime.PYTHON_3_8,
code: lambda.Code.fromAsset(path.join(__dirname, "lambda", "src")),
handler: "event_logger.handler",
timeout: Duration.minutes(1),

}
);

SQSMessageProcessorFunction.addEnvironment(
"EVENT_LOGGING_LEVEL", eventLoggingLevel
)

SQSMessageProcessorFunction.addEnvironment(
"DATASTORE_TYPE", datastore
)

this.createDatastore(SQSMessageProcessorFunction, datastore, eventLoggingLevel);

return SQSMessageProcessorFunction;
}

createDatastore(
SQSMessageProcessorFunction: lambda.Function,
datastore: Datastore,
loglevel: EventLoggingLevel
) {
if (datastore === Datastore.DYNAMODB) {
const dynamodb_datastore = new dynamodb.Table(
this, "EventsDatastore", {
partitionKey: {
name: "execution_id",
type: dynamodb.AttributeType.STRING
},
sortKey: {
name: "step_id",
type: dynamodb.AttributeType.STRING
},
});
// TODO: make capacity and indexes configurable
dynamodb_datastore.autoScaleReadCapacity({
minCapacity: 5,
maxCapacity: 10000
})
dynamodb_datastore.autoScaleWriteCapacity({
minCapacity: 5,
maxCapacity: 10000
})


if (loglevel === EventLoggingLevel.FULL) {
dynamodb_datastore.addGlobalSecondaryIndex(
{
indexName: "EventType-Timestamp-Index",
partitionKey: {
name: "type",
type: dynamodb.AttributeType.STRING
},
sortKey: {
name: "timestamp",
type: dynamodb.AttributeType.STRING
}
}
)


dynamodb_datastore.autoScaleGlobalSecondaryIndexReadCapacity(
"EventType-Timestamp-Index",
{
minCapacity: 5,
maxCapacity: 10000
}
)
dynamodb_datastore.autoScaleGlobalSecondaryIndexWriteCapacity(
"EventType-Timestamp-Index",
{
minCapacity: 5,
maxCapacity: 10000
}
)
} else if (loglevel === EventLoggingLevel.SUMMARY) {
dynamodb_datastore.addGlobalSecondaryIndex(
{
indexName: "ExecutionStatus-Timestamp-Index",
partitionKey: {
name: "Status",
type: dynamodb.AttributeType.STRING
},
sortKey: {
name: "timestamp",
type: dynamodb.AttributeType.STRING
}
}
)


dynamodb_datastore.autoScaleGlobalSecondaryIndexReadCapacity(
"ExecutionStatus-Timestamp-Index",
{
minCapacity: 5,
maxCapacity: 10000
}
)
dynamodb_datastore.autoScaleGlobalSecondaryIndexWriteCapacity(
"ExecutionStatus-Timestamp-Index",
{
minCapacity: 5,
maxCapacity: 10000
}
)
}


// grants message processor lambda permission to write to DynamoDB
dynamodb_datastore.grantWriteData(SQSMessageProcessorFunction)

SQSMessageProcessorFunction.addEnvironment(
"DATASTORE_ARN", dynamodb_datastore.tableArn
)
} else if (datastore === Datastore.POSTGRES) {
// TODO:
// const postgres_datastore = new ...

// TODO: grant lambda access to postgres_datastore
// datastoreArn = postgres_datastore.tableArn;
};
}

}
Empty file.
Empty file.
98 changes: 98 additions & 0 deletions packages/stepfunction-event-logger/lib/lambda/src/event_logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import json
import os
from decimal import Decimal
from datetime import datetime
from typing import List
from .utils.types import (
ExecutionDetails,
CloudwatchEvent,
SqsMessage,
StepFunctionHistoryEvent,
)
from .utils.services import stepfunction_client as sfn, dynamodb_resource as ddb

TIMESTAMP_FMT = "%Y-%m-%dT%H:%M%S.%f%z"


def get_steps_details(execution_arn: str) -> List[StepFunctionHistoryEvent]:

response = sfn().get_execution_history(executionArn=execution_arn)
execution_history: List[StepFunctionHistoryEvent] = response["events"]
while "nextToken" in response:
response = sfn().get_execution_history(executionArn=execution_arn)
execution_history.extend(response["events"])

return [
{
"execution_id": execution_arn.split(":")[-1],
"stepfunction_name": execution_arn.split(":")[-2],
"step_id": f"{i['timestamp'].strftime(TIMESTAMP_FMT)}_{i['id']}",
"step_number": i["id"],
**{k: v for k, v in i.items() if k != "id"},
}
for i in execution_history
]


def generate_table_records(sqs_message: SqsMessage):
"""
Handler to process an SQS message generated by a StepFunction FAILED or SUCCEEDED
EventBridge Rule. First the StepFunction execution history is queried and
appropriate DynamoDB records are generated to store the successfull ingestion or
the failure.

"""
msg: CloudwatchEvent = json.loads(sqs_message["body"])
detail: ExecutionDetails = msg["detail"]

execution_id = detail["executionArn"].split(":")[-1]
stepfunction_name = detail["executionArn"].split(":")[-2]

items = [
{
"execution_id": execution_id,
"step_id": f"{datetime.fromtimestamp(detail['startDate'] / 1000).strftime(TIMESTAMP_FMT)}_summary",
"stepfunction_name": stepfunction_name,
"status": detail["status"],
"input": json.loads(detail["input"]),
"output": json.loads(detail["output"]) if detail["output"] else "",
"startDate": datetime.fromtimestamp(detail["startDate"] / 1000).strftime(
TIMESTAMP_FMT
),
"stopDate": datetime.fromtimestamp(detail["stopDate"] / 1000).strftime(
TIMESTAMP_FMT
),
"startDate_raw": detail["startDate"],
"stopDate_raw": detail["stopDate"],
}
]

if os.environ.get("EVENT_LOGGING_LEVEL", "") == "SUMMARY":
return items

items.extend(get_steps_details(detail["executionArn"]))
return items


def handle_dynamodb(event):
table_name = os.environ["DATASTORE_ARN"].split("/")[-1]
table = ddb().Table(table_name)

with table.batch_writer() as batch:
for sqs_message in event["Records"]:
for item in generate_table_records(sqs_message):
batch.put_item(
Item=json.loads(json.dumps(item, default=str), parse_float=Decimal)
)


def handle_postgres(event):
# TODO - implement
raise NotImplementedError


def handler(event, context):
if os.environ["DATASTORE_TYPE"] == "Dynamodb":
return handle_dynamodb(event)
if os.environ["DATASTORE_TYPE"] == "Postgres":
return handle_postgres(event)
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
def stepfunction_client():
import boto3
from botocore.config import Config

return boto3.client(
"stepfunctions", config=Config(retries={"max_attempts": 10, "mode": "standard"})
)


def dynamodb_resource():
import boto3

return boto3.resource("dynamodb")
Loading