From 778d8252518479b026e07a20b773e688bc65b988 Mon Sep 17 00:00:00 2001 From: psteinroe Date: Tue, 13 Jan 2026 09:01:21 +0100 Subject: [PATCH] feat(sink): add AWS SQS sink for queue messaging --- Cargo.lock | 622 +++++++++++++++++++++++++++++++++--- Cargo.toml | 4 + src/config/sink.rs | 8 + src/core.rs | 18 ++ src/sink/mod.rs | 13 + src/sink/sqs.rs | 288 +++++++++++++++++ src/test_utils/container.rs | 41 +++ tests/sqs_sink_tests.rs | 216 +++++++++++++ 8 files changed, 1165 insertions(+), 45 deletions(-) create mode 100644 src/sink/sqs.rs create mode 100644 tests/sqs_sink_tests.rs diff --git a/Cargo.lock b/Cargo.lock index 3bcd855..f35f78e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -304,7 +304,7 @@ dependencies = [ "thiserror 1.0.69", "time", "tokio", - "tokio-rustls", + "tokio-rustls 0.26.4", "tokio-util", "tokio-websockets", "tracing", @@ -362,6 +362,48 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "aws-config" +version = "1.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96571e6996817bf3d58f6b569e4b9fd2e9d2fcf9f7424eed07b2ce9bb87535e5" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-sdk-sso", + "aws-sdk-ssooidc", + "aws-sdk-sts", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand 2.3.0", + "hex", + "http 1.4.0", + "ring", + "time", + "tokio", + "tracing", + "url", + "zeroize", +] + +[[package]] +name = "aws-credential-types" +version = "1.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3cd362783681b15d136480ad555a099e82ecd8e2d10a841e14dfd0078d67fee3" +dependencies = [ + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "zeroize", +] + [[package]] name = "aws-lc-rs" version = "1.15.2" @@ -384,6 +426,321 @@ dependencies = [ "fs_extra", ] +[[package]] +name = "aws-runtime" +version = "1.5.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d81b5b2898f6798ad58f484856768bca817e3cd9de0974c24ae0f1113fe88f1b" +dependencies = [ + "aws-credential-types", + "aws-sigv4", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand 2.3.0", + "http 0.2.12", + "http-body 0.4.6", + "percent-encoding", + "pin-project-lite", + "tracing", + "uuid", +] + +[[package]] +name = "aws-sdk-sqs" +version = "1.91.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e007a58d1e4be0e611a664d6b1cba4523011f67589619048c1902becef3e891f" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand 2.3.0", + "http 0.2.12", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-sso" +version = "1.91.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ee6402a36f27b52fe67661c6732d684b2635152b676aa2babbfb5204f99115d" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand 2.3.0", + "http 0.2.12", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-ssooidc" +version = "1.93.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a45a7f750bbd170ee3677671ad782d90b894548f4e4ae168302c57ec9de5cb3e" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand 2.3.0", + "http 0.2.12", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-sts" +version = "1.95.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55542378e419558e6b1f398ca70adb0b2088077e79ad9f14eb09441f2f7b2164" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-query", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "fastrand 2.3.0", + "http 0.2.12", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sigv4" +version = "1.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69e523e1c4e8e7e8ff219d732988e22bfeae8a1cafdbe6d9eca1546fa080be7c" +dependencies = [ + "aws-credential-types", + "aws-smithy-http", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "form_urlencoded", + "hex", + "hmac", + "http 0.2.12", + "http 1.4.0", + "percent-encoding", + "sha2", + "time", + "tracing", +] + +[[package]] +name = "aws-smithy-async" +version = "1.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ee19095c7c4dda59f1697d028ce704c24b2d33c6718790c7f1d5a3015b4107c" +dependencies = [ + "futures-util", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "aws-smithy-http" +version = "0.62.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "826141069295752372f8203c17f28e30c464d22899a43a0c9fd9c458d469c88b" +dependencies = [ + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "bytes-utils", + "futures-core", + "futures-util", + "http 0.2.12", + "http 1.4.0", + "http-body 0.4.6", + "percent-encoding", + "pin-project-lite", + "pin-utils", + "tracing", +] + +[[package]] +name = "aws-smithy-http-client" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59e62db736db19c488966c8d787f52e6270be565727236fd5579eaa301e7bc4a" +dependencies = [ + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "h2 0.3.27", + "h2 0.4.13", + "http 0.2.12", + "http 1.4.0", + "http-body 0.4.6", + "hyper 0.14.32", + "hyper 1.8.1", + "hyper-rustls 0.24.2", + "hyper-rustls 0.27.7", + "hyper-util", + "pin-project-lite", + "rustls 0.21.12", + "rustls 0.23.36", + "rustls-native-certs 0.8.3", + "rustls-pki-types", + "tokio", + "tokio-rustls 0.26.4", + "tower", + "tracing", +] + +[[package]] +name = "aws-smithy-json" +version = "0.61.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49fa1213db31ac95288d981476f78d05d9cbb0353d22cdf3472cc05bb02f6551" +dependencies = [ + "aws-smithy-types", +] + +[[package]] +name = "aws-smithy-observability" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17f616c3f2260612fe44cede278bafa18e73e6479c4e393e2c4518cf2a9a228a" +dependencies = [ + "aws-smithy-runtime-api", +] + +[[package]] +name = "aws-smithy-query" +version = "0.60.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae5d689cf437eae90460e944a58b5668530d433b4ff85789e69d2f2a556e057d" +dependencies = [ + "aws-smithy-types", + "urlencoding", +] + +[[package]] +name = "aws-smithy-runtime" +version = "1.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a392db6c583ea4a912538afb86b7be7c5d8887d91604f50eb55c262ee1b4a5f5" +dependencies = [ + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-http-client", + "aws-smithy-observability", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "fastrand 2.3.0", + "http 0.2.12", + "http 1.4.0", + "http-body 0.4.6", + "http-body 1.0.1", + "pin-project-lite", + "pin-utils", + "tokio", + "tracing", +] + +[[package]] +name = "aws-smithy-runtime-api" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab0d43d899f9e508300e587bf582ba54c27a452dd0a9ea294690669138ae14a2" +dependencies = [ + "aws-smithy-async", + "aws-smithy-types", + "bytes", + "http 0.2.12", + "http 1.4.0", + "pin-project-lite", + "tokio", + "tracing", + "zeroize", +] + +[[package]] +name = "aws-smithy-types" +version = "1.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "905cb13a9895626d49cf2ced759b062d913834c7482c38e49557eac4e6193f01" +dependencies = [ + "base64-simd", + "bytes", + "bytes-utils", + "futures-core", + "http 0.2.12", + "http 1.4.0", + "http-body 0.4.6", + "http-body 1.0.1", + "http-body-util", + "itoa", + "num-integer", + "pin-project-lite", + "pin-utils", + "ryu", + "serde", + "time", + "tokio", + "tokio-util", +] + +[[package]] +name = "aws-smithy-xml" +version = "0.60.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11b2f670422ff42bf7065031e72b45bc52a3508bd089f743ea90731ca2b6ea57" +dependencies = [ + "xmlparser", +] + +[[package]] +name = "aws-types" +version = "1.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d980627d2dd7bfc32a3c025685a033eeab8d365cc840c631ef59d1b8f428164" +dependencies = [ + "aws-credential-types", + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "rustc_version", + "tracing", +] + [[package]] name = "backon" version = "1.6.0" @@ -405,6 +762,16 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" +[[package]] +name = "base64-simd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "339abbe78e73178762e23bea9dfd08e697eb3f3301cd4be981c0f78ba5859195" +dependencies = [ + "outref", + "vsimd", +] + [[package]] name = "base64ct" version = "1.8.3" @@ -470,16 +837,16 @@ dependencies = [ "futures-util", "hex", "home", - "http", + "http 1.4.0", "http-body-util", - "hyper", + "hyper 1.8.1", "hyper-named-pipe", - "hyper-rustls", + "hyper-rustls 0.27.7", "hyper-util", "hyperlocal", "log", "pin-project-lite", - "rustls", + "rustls 0.23.36", "rustls-native-certs 0.8.3", "rustls-pemfile", "rustls-pki-types", @@ -528,6 +895,16 @@ dependencies = [ "serde", ] +[[package]] +name = "bytes-utils" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dafe3a8757b027e2be6e4e5601ed563c55989fcf1546e933c66c8eb3a058d35" +dependencies = [ + "bytes", + "either", +] + [[package]] name = "cbc" version = "0.1.2" @@ -1052,12 +1429,12 @@ dependencies = [ "pg_escape", "pin-project-lite", "postgres-replication", - "rustls", + "rustls 0.23.36", "serde_json", "sqlx", "tokio", "tokio-postgres", - "tokio-rustls", + "tokio-rustls 0.26.4", "tracing", "uuid", "x509-cert", @@ -1401,6 +1778,25 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "h2" +version = "0.3.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0beca50380b1fc32983fc1cb4587bfa4bb9e78fc259aad4a0032d2080309222d" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 0.2.12", + "indexmap 2.13.0", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "h2" version = "0.4.13" @@ -1412,7 +1808,7 @@ dependencies = [ "fnv", "futures-core", "futures-sink", - "http", + "http 1.4.0", "indexmap 2.13.0", "slab", "tokio", @@ -1525,6 +1921,17 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "http" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http" version = "1.4.0" @@ -1535,6 +1942,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http-body" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" +dependencies = [ + "bytes", + "http 0.2.12", + "pin-project-lite", +] + [[package]] name = "http-body" version = "1.0.1" @@ -1542,7 +1960,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" dependencies = [ "bytes", - "http", + "http 1.4.0", ] [[package]] @@ -1553,8 +1971,8 @@ checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a" dependencies = [ "bytes", "futures-core", - "http", - "http-body", + "http 1.4.0", + "http-body 1.0.1", "pin-project-lite", ] @@ -1570,6 +1988,30 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "hyper" +version = "0.14.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41dfc780fdec9373c01bae43289ea34c972e40ee3c9f6b3c8801a35f35586ce7" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "h2 0.3.27", + "http 0.2.12", + "http-body 0.4.6", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2 0.5.10", + "tokio", + "tower-service", + "tracing", + "want", +] + [[package]] name = "hyper" version = "1.8.1" @@ -1580,9 +2022,9 @@ dependencies = [ "bytes", "futures-channel", "futures-core", - "h2", - "http", - "http-body", + "h2 0.4.13", + "http 1.4.0", + "http-body 1.0.1", "httparse", "httpdate", "itoa", @@ -1600,7 +2042,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73b7d8abf35697b81a825e386fc151e0d503e8cb5fcb93cc8669c376dfd6f278" dependencies = [ "hex", - "hyper", + "hyper 1.8.1", "hyper-util", "pin-project-lite", "tokio", @@ -1608,19 +2050,35 @@ dependencies = [ "winapi", ] +[[package]] +name = "hyper-rustls" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" +dependencies = [ + "futures-util", + "http 0.2.12", + "hyper 0.14.32", + "log", + "rustls 0.21.12", + "tokio", + "tokio-rustls 0.24.1", +] + [[package]] name = "hyper-rustls" version = "0.27.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3c93eb611681b207e1fe55d5a71ecf91572ec8a6705cdb6857f7d8d5242cf58" dependencies = [ - "http", - "hyper", + "http 1.4.0", + "hyper 1.8.1", "hyper-util", - "rustls", + "rustls 0.23.36", + "rustls-native-certs 0.8.3", "rustls-pki-types", "tokio", - "tokio-rustls", + "tokio-rustls 0.26.4", "tower-service", "webpki-roots 1.0.5", ] @@ -1636,9 +2094,9 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "http", - "http-body", - "hyper", + "http 1.4.0", + "http-body 1.0.1", + "hyper 1.8.1", "ipnet", "libc", "percent-encoding", @@ -1657,7 +2115,7 @@ checksum = "986c5ce3b994526b3cd75578e62554abd09f0899d6206de48b3e96ab34ccc8c7" dependencies = [ "hex", "http-body-util", - "hyper", + "hyper 1.8.1", "hyper-util", "pin-project-lite", "tokio", @@ -2046,7 +2504,7 @@ checksum = "2b166dea96003ee2531cf14833efedced545751d800f03535801d833313f8c15" dependencies = [ "base64 0.22.1", "http-body-util", - "hyper", + "hyper 1.8.1", "hyper-util", "indexmap 2.13.0", "ipnet", @@ -2246,6 +2704,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f50d9b3dabb09ecd771ad0aa242ca6894994c130308ca3d7684634df8037391" +[[package]] +name = "outref" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a80800c0488c3a21695ea981a54918fbb37abf04f4d0720c453632255e2ff0e" + [[package]] name = "p12-keystore" version = "0.1.5" @@ -2596,6 +3060,8 @@ version = "0.1.0" dependencies = [ "anyhow", "async-nats", + "aws-config", + "aws-sdk-sqs", "chrono", "config", "const-oid", @@ -2611,7 +3077,7 @@ dependencies = [ "redis", "reqwest", "ring", - "rustls", + "rustls 0.23.36", "rustls-pemfile", "secrecy", "serde", @@ -2627,7 +3093,7 @@ dependencies = [ "tikv-jemallocator", "tokio", "tokio-postgres", - "tokio-rustls", + "tokio-rustls 0.26.4", "tracing", "tracing-subscriber", "uuid", @@ -2718,7 +3184,7 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustc-hash", - "rustls", + "rustls 0.23.36", "socket2 0.6.1", "thiserror 2.0.17", "tokio", @@ -2738,7 +3204,7 @@ dependencies = [ "rand 0.9.2", "ring", "rustc-hash", - "rustls", + "rustls 0.23.36", "rustls-pki-types", "slab", "thiserror 2.0.17", @@ -2997,6 +3463,12 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "regex-lite" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d942b98df5e658f56f20d592c7f868833fe38115e65c33003d8cd224b0155da" + [[package]] name = "regex-syntax" version = "0.8.8" @@ -3012,25 +3484,25 @@ dependencies = [ "base64 0.22.1", "bytes", "futures-core", - "http", - "http-body", + "http 1.4.0", + "http-body 1.0.1", "http-body-util", - "hyper", - "hyper-rustls", + "hyper 1.8.1", + "hyper-rustls 0.27.7", "hyper-util", "js-sys", "log", "percent-encoding", "pin-project-lite", "quinn", - "rustls", + "rustls 0.23.36", "rustls-pki-types", "serde", "serde_json", "serde_urlencoded", "sync_wrapper", "tokio", - "tokio-rustls", + "tokio-rustls 0.26.4", "tower", "tower-http", "tower-service", @@ -3126,6 +3598,18 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "rustls" +version = "0.21.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" +dependencies = [ + "log", + "ring", + "rustls-webpki 0.101.7", + "sct", +] + [[package]] name = "rustls" version = "0.23.36" @@ -3149,7 +3633,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "70cc376c6ba1823ae229bacf8ad93c136d93524eab0e4e5e0e4f96b9c4e5b212" dependencies = [ "log", - "rustls", + "rustls 0.23.36", "rustls-native-certs 0.7.3", "rustls-pki-types", "rustls-webpki 0.103.8", @@ -3199,6 +3683,16 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-webpki" +version = "0.101.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "rustls-webpki" version = "0.102.8" @@ -3293,6 +3787,16 @@ dependencies = [ "sha2", ] +[[package]] +name = "sct" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "secrecy" version = "0.10.3" @@ -3646,7 +4150,7 @@ dependencies = [ "memchr", "once_cell", "percent-encoding", - "rustls", + "rustls 0.23.36", "serde", "serde_json", "sha2", @@ -4150,13 +4654,23 @@ dependencies = [ "whoami", ] +[[package]] +name = "tokio-rustls" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" +dependencies = [ + "rustls 0.21.12", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.26.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1729aa945f29d91ba541258c8df89027d5792d85a8841fb65e8bf0f4ede4ef61" dependencies = [ - "rustls", + "rustls 0.23.36", "tokio", ] @@ -4209,14 +4723,14 @@ dependencies = [ "bytes", "futures-core", "futures-sink", - "http", + "http 1.4.0", "httparse", "rand 0.8.5", "ring", "rustls-native-certs 0.8.3", "rustls-pki-types", "tokio", - "tokio-rustls", + "tokio-rustls 0.26.4", "tokio-util", ] @@ -4274,8 +4788,8 @@ dependencies = [ "bitflags 2.10.0", "bytes", "futures-util", - "http", - "http-body", + "http 1.4.0", + "http-body 1.0.1", "iri-string", "pin-project-lite", "tower", @@ -4416,6 +4930,12 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "urlencoding" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" + [[package]] name = "utf8_iter" version = "1.0.4" @@ -4445,6 +4965,12 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "vsimd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" + [[package]] name = "waker-fn" version = "1.2.0" @@ -4910,9 +5436,9 @@ dependencies = [ "base64 0.22.1", "deadpool", "futures", - "http", + "http 1.4.0", "http-body-util", - "hyper", + "hyper 1.8.1", "hyper-util", "log", "once_cell", @@ -4973,6 +5499,12 @@ dependencies = [ "rustix 1.1.3", ] +[[package]] +name = "xmlparser" +version = "0.13.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4" + [[package]] name = "yaml-rust2" version = "0.8.1" @@ -5089,6 +5621,6 @@ dependencies = [ [[package]] name = "zmij" -version = "1.0.13" +version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac93432f5b761b22864c774aac244fa5c0fd877678a4c37ebf6cf42208f9c9ec" +checksum = "bd8f3f50b848df28f887acb68e41201b5aea6bc8a8dacc00fb40635ff9a72fea" diff --git a/Cargo.toml b/Cargo.toml index 59ee93e..f766cd0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ sink-nats = ["dep:async-nats"] sink-rabbitmq = ["dep:lapin"] sink-redis-streams = ["dep:redis"] sink-redis-strings = ["dep:redis"] +sink-sqs = ["dep:aws-sdk-sqs", "dep:aws-config"] sink-webhook = ["dep:reqwest"] test-utils = ["dep:ctor", "dep:testcontainers", "dep:testcontainers-modules"] @@ -54,6 +55,8 @@ uuid = { version = "1.19.0", default-features = false, features = ["v4"] # Optional sink dependencies. async-nats = { version = "0.38", optional = true } +aws-config = { version = "1.5", optional = true } +aws-sdk-sqs = { version = "1.60", optional = true } lapin = { version = "2.5", optional = true } rdkafka = { version = "0.36", optional = true, default-features = false, features = ["tokio", "cmake-build"] } redis = { version = "0.27", default-features = false, features = [ @@ -79,6 +82,7 @@ testcontainers-modules = { version = "0.11", optional = true, features = [ "nats", "rabbitmq", "kafka", + "elasticmq", "blocking", ] } diff --git a/src/config/sink.rs b/src/config/sink.rs index 19b7bb0..63c4717 100644 --- a/src/config/sink.rs +++ b/src/config/sink.rs @@ -18,6 +18,9 @@ use crate::sink::rabbitmq::RabbitmqSinkConfig; #[cfg(feature = "sink-webhook")] use crate::sink::webhook::WebhookSinkConfig; +#[cfg(feature = "sink-sqs")] +use crate::sink::sqs::SqsSinkConfig; + /// Sink destination configuration. /// /// Determines where replicated events are sent. @@ -56,4 +59,9 @@ pub enum SinkConfig { #[cfg(feature = "sink-kafka")] #[serde(rename = "kafka")] Kafka(KafkaSinkConfig), + + /// AWS SQS sink for queue messaging. + #[cfg(feature = "sink-sqs")] + #[serde(rename = "sqs")] + Sqs(SqsSinkConfig), } diff --git a/src/core.rs b/src/core.rs index 5d728fe..19333e3 100644 --- a/src/core.rs +++ b/src/core.rs @@ -155,6 +155,19 @@ async fn run_pipeline(config: &PipelineConfig) -> EtlResult<()> { })?; AnySink::Kafka(s) } + + #[cfg(feature = "sink-sqs")] + SinkConfig::Sqs(cfg) => { + use crate::sink::sqs::SqsSink; + let s = SqsSink::new(cfg.clone()).await.map_err(|e| { + etl::etl_error!( + etl::error::ErrorKind::InvalidData, + "Failed to create SQS sink", + e.to_string() + ) + })?; + AnySink::Sqs(s) + } }; // Create PgStream as an ETL destination @@ -230,6 +243,11 @@ fn log_sink_config(config: &SinkConfig) { SinkConfig::Kafka(_cfg) => { debug!("using kafka sink"); } + + #[cfg(feature = "sink-sqs")] + SinkConfig::Sqs(_cfg) => { + debug!("using sqs sink"); + } } } diff --git a/src/sink/mod.rs b/src/sink/mod.rs index 0af9a4f..f759290 100644 --- a/src/sink/mod.rs +++ b/src/sink/mod.rs @@ -19,6 +19,9 @@ pub mod webhook; #[cfg(feature = "sink-kafka")] pub mod kafka; +#[cfg(feature = "sink-sqs")] +pub mod sqs; + pub use base::Sink; use etl::error::EtlResult; @@ -42,6 +45,9 @@ use webhook::WebhookSink; #[cfg(feature = "sink-kafka")] use kafka::KafkaSink; +#[cfg(feature = "sink-sqs")] +use sqs::SqsSink; + use crate::types::TriggeredEvent; /// Wrapper enum for all supported sink types. @@ -76,6 +82,10 @@ pub enum AnySink { /// Kafka sink for Apache Kafka messaging. #[cfg(feature = "sink-kafka")] Kafka(KafkaSink), + + /// AWS SQS sink for queue messaging. + #[cfg(feature = "sink-sqs")] + Sqs(SqsSink), } impl Sink for AnySink { @@ -104,6 +114,9 @@ impl Sink for AnySink { #[cfg(feature = "sink-kafka")] AnySink::Kafka(sink) => sink.publish_events(events).await, + + #[cfg(feature = "sink-sqs")] + AnySink::Sqs(sink) => sink.publish_events(events).await, } } } diff --git a/src/sink/sqs.rs b/src/sink/sqs.rs new file mode 100644 index 0000000..b545432 --- /dev/null +++ b/src/sink/sqs.rs @@ -0,0 +1,288 @@ +//! AWS SQS sink for publishing events to an Amazon SQS queue. +//! +//! Sends each event's payload as JSON to a queue URL determined by: +//! 1. `queue_url` key in event metadata (from subscription's metadata/metadata_extensions) +//! 2. Fallback to `queue_url` in sink config +//! +//! # Dynamic Routing +//! +//! The target queue URL can be configured per-event using metadata_extensions: +//! +//! ```sql +//! metadata_extensions = '[ +//! {"json_path": "queue_url", "expression": "''https://sqs.us-east-1.amazonaws.com/123456789/'' || queue_name"} +//! ]' +//! ``` + +use aws_sdk_sqs::Client; +use aws_sdk_sqs::types::SendMessageBatchRequestEntry; +use etl::error::EtlResult; +use futures::future::try_join_all; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::sync::Arc; + +use crate::sink::Sink; +use crate::types::TriggeredEvent; + +/// Maximum number of messages per SQS batch request. +const SQS_MAX_BATCH_SIZE: usize = 10; + +/// Configuration for the AWS SQS sink. +/// +/// This intentionally does not implement [`Serialize`] to avoid accidentally +/// leaking secrets (AWS credentials, endpoint URLs) in serialized forms. +#[derive(Clone, Debug, Deserialize)] +pub struct SqsSinkConfig { + /// SQS queue URL to send messages to. Optional if provided via event metadata. + pub queue_url: Option, + + /// AWS region (e.g., "us-east-1"). + pub region: String, + + /// Optional custom endpoint URL for LocalStack or ElasticMQ testing. + #[serde(default)] + pub endpoint_url: Option, + + /// Optional AWS access key ID (uses default credentials chain if not set). + #[serde(default)] + pub access_key_id: Option, + + /// Optional AWS secret access key (uses default credentials chain if not set). + #[serde(default)] + pub secret_access_key: Option, +} + +/// Configuration for the AWS SQS sink without sensitive data. +/// +/// Safe to serialize and log. Use this for debugging and metrics. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct SqsSinkConfigWithoutSecrets { + /// SQS queue URL to send messages to (if configured). + pub queue_url: Option, + + /// AWS region. + pub region: String, + + /// Whether a custom endpoint is configured. + pub has_custom_endpoint: bool, + + /// Whether explicit credentials are configured. + pub has_explicit_credentials: bool, +} + +impl From for SqsSinkConfigWithoutSecrets { + fn from(config: SqsSinkConfig) -> Self { + Self { + queue_url: config.queue_url, + region: config.region, + has_custom_endpoint: config.endpoint_url.is_some(), + has_explicit_credentials: config.access_key_id.is_some(), + } + } +} + +impl From<&SqsSinkConfig> for SqsSinkConfigWithoutSecrets { + fn from(config: &SqsSinkConfig) -> Self { + Self { + queue_url: config.queue_url.clone(), + region: config.region.clone(), + has_custom_endpoint: config.endpoint_url.is_some(), + has_explicit_credentials: config.access_key_id.is_some(), + } + } +} + +/// Sink that sends events to an AWS SQS queue. +/// +/// Each event's payload is sent as JSON to the configured queue. +/// The sink uses the AWS SDK with automatic retry handling. +#[derive(Clone)] +pub struct SqsSink { + /// AWS SQS client. + client: Arc, + + /// Default queue URL. Can be overridden per-event via metadata. + queue_url: Option, +} + +impl SqsSink { + /// Creates a new SQS sink from configuration. + /// + /// # Errors + /// + /// Returns an error if the AWS client cannot be configured. + pub async fn new( + config: SqsSinkConfig, + ) -> Result> { + let region = aws_sdk_sqs::config::Region::new(config.region); + + let mut sdk_config_loader = + aws_config::defaults(aws_config::BehaviorVersion::latest()).region(region); + + // Configure custom endpoint if provided (for LocalStack/ElasticMQ). + if let Some(endpoint) = &config.endpoint_url { + sdk_config_loader = sdk_config_loader.endpoint_url(endpoint); + } + + // Configure explicit credentials if provided. + if let (Some(access_key), Some(secret_key)) = + (&config.access_key_id, &config.secret_access_key) + { + sdk_config_loader = + sdk_config_loader.credentials_provider(aws_sdk_sqs::config::Credentials::new( + access_key.clone(), + secret_key.clone(), + None, + None, + "pgstream", + )); + } + + let sdk_config = sdk_config_loader.load().await; + let client = Client::new(&sdk_config); + + Ok(Self { + client: Arc::new(client), + queue_url: config.queue_url, + }) + } + + /// Resolves the queue URL for an event from metadata or config. + fn resolve_queue_url<'a>(&'a self, event: &'a TriggeredEvent) -> Option<&'a str> { + // First check event metadata for dynamic queue URL + if let Some(ref metadata) = event.metadata { + if let Some(queue_url) = metadata.get("queue_url").and_then(|v| v.as_str()) { + return Some(queue_url); + } + } + // Fall back to config queue URL + self.queue_url.as_deref() + } +} + +impl Sink for SqsSink { + fn name() -> &'static str { + "sqs" + } + + async fn publish_events(&self, events: Vec) -> EtlResult<()> { + if events.is_empty() { + return Ok(()); + } + + // Group events by queue URL for batch sending. + let mut events_by_queue: HashMap> = HashMap::new(); + + for (idx, event) in events.into_iter().enumerate() { + let queue_url = self.resolve_queue_url(&event).ok_or_else(|| { + etl::etl_error!( + etl::error::ErrorKind::ConfigError, + "No queue URL configured", + "Queue URL must be provided in sink config or event metadata" + ) + })?; + + events_by_queue + .entry(queue_url.to_string()) + .or_default() + .push((idx, event)); + } + + // Prepare all batch requests. + let mut batch_futures = Vec::new(); + + for (queue_url, mut queue_events) in events_by_queue { + // Process in chunks of SQS_MAX_BATCH_SIZE (SQS limit is 10 per batch). + while !queue_events.is_empty() { + let chunk_size = queue_events.len().min(SQS_MAX_BATCH_SIZE); + let chunk: Vec<_> = queue_events.drain(..chunk_size).collect(); + + let mut entries = Vec::with_capacity(chunk.len()); + + for (idx, event) in chunk { + let entry = SendMessageBatchRequestEntry::builder() + .id(idx.to_string()) + .message_body(event.payload.to_string()) + .build() + .map_err(|e| { + etl::etl_error!( + etl::error::ErrorKind::InvalidData, + "Failed to build batch entry", + e.to_string() + ) + })?; + + entries.push(entry); + } + + // Create future for this batch. + let client = self.client.clone(); + let queue_url = queue_url.clone(); + batch_futures.push(async move { + let result = client + .send_message_batch() + .queue_url(&queue_url) + .set_entries(Some(entries)) + .send() + .await + .map_err(|e| { + etl::etl_error!( + etl::error::ErrorKind::DestinationError, + "Failed to send batch to SQS", + e.to_string() + ) + })?; + + // Check for partial failures. + if !result.failed.is_empty() { + let failed_ids: Vec<_> = result.failed.iter().map(|f| f.id()).collect(); + return Err(etl::etl_error!( + etl::error::ErrorKind::DestinationError, + "Some messages failed to send to SQS", + format!("Failed message IDs: {:?}", failed_ids) + )); + } + + Ok(()) + }); + } + } + + // Send all batches concurrently. + try_join_all(batch_futures).await?; + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_sink_name() { + assert_eq!(SqsSink::name(), "sqs"); + } + + #[test] + fn test_config_without_secrets() { + let config = SqsSinkConfig { + queue_url: Some("https://sqs.us-east-1.amazonaws.com/123456789/my-queue".to_string()), + region: "us-east-1".to_string(), + endpoint_url: Some("http://localhost:9324".to_string()), + access_key_id: Some("AKIAIOSFODNN7EXAMPLE".to_string()), + secret_access_key: Some("secret".to_string()), + }; + + let without_secrets: SqsSinkConfigWithoutSecrets = (&config).into(); + + assert_eq!( + without_secrets.queue_url, + Some("https://sqs.us-east-1.amazonaws.com/123456789/my-queue".to_string()) + ); + assert_eq!(without_secrets.region, "us-east-1"); + assert!(without_secrets.has_custom_endpoint); + assert!(without_secrets.has_explicit_credentials); + } +} diff --git a/src/test_utils/container.rs b/src/test_utils/container.rs index 976b5d7..e686b74 100644 --- a/src/test_utils/container.rs +++ b/src/test_utils/container.rs @@ -2,6 +2,7 @@ use ctor::dtor; use etl::config::{PgConnectionConfig, TlsConfig}; use std::sync::{Mutex, OnceLock}; use testcontainers::{ContainerRequest, ImageExt, runners::SyncRunner}; +use testcontainers_modules::elasticmq::ElasticMq; use testcontainers_modules::kafka::Kafka; use testcontainers_modules::nats::Nats; use testcontainers_modules::postgres::Postgres; @@ -14,6 +15,7 @@ static REDIS_PORT: OnceLock = OnceLock::new(); static NATS_PORT: OnceLock = OnceLock::new(); static RABBITMQ_PORT: OnceLock = OnceLock::new(); static KAFKA_PORT: OnceLock = OnceLock::new(); +static ELASTICMQ_PORT: OnceLock = OnceLock::new(); // Using Mutex> so we can take ownership for cleanup. static POSTGRES_CONTAINER: OnceLock>>> = @@ -23,6 +25,8 @@ static NATS_CONTAINER: OnceLock>>> static RABBITMQ_CONTAINER: OnceLock>>> = OnceLock::new(); static KAFKA_CONTAINER: OnceLock>>> = OnceLock::new(); +static ELASTICMQ_CONTAINER: OnceLock>>> = + OnceLock::new(); /// Cleanup function that runs at program exit to stop and remove the postgres container. #[dtor] @@ -84,6 +88,18 @@ fn cleanup_kafka_container() { } } +/// Cleanup function that runs at program exit to stop and remove the ElasticMQ container. +#[dtor] +fn cleanup_elasticmq_container() { + if let Some(mutex) = ELASTICMQ_CONTAINER.get() { + if let Ok(mut guard) = mutex.lock() { + if let Some(container) = guard.take() { + let _ = container.rm(); + } + } + } +} + pub async fn ensure_postgres() -> u16 { // Use get_or_init to handle concurrent initialization attempts *POSTGRES_PORT.get_or_init(|| { @@ -229,3 +245,28 @@ pub async fn ensure_kafka() -> u16 { .expect("Failed to join kafka container startup thread") }) } + +/// Ensures an ElasticMQ container is running and returns its port. +/// +/// Uses singleton pattern to reuse the same container across tests. +pub async fn ensure_elasticmq() -> u16 { + *ELASTICMQ_PORT.get_or_init(|| { + std::thread::spawn(|| { + let container: ContainerRequest = ElasticMq::default().into(); + + let container = container + .start() + .expect("Failed to start elasticmq container"); + + let port = container + .get_host_port_ipv4(9324) + .expect("Failed to get elasticmq container port"); + + let _ = ELASTICMQ_CONTAINER.set(Mutex::new(Some(container))); + + port + }) + .join() + .expect("Failed to join elasticmq container startup thread") + }) +} diff --git a/tests/sqs_sink_tests.rs b/tests/sqs_sink_tests.rs new file mode 100644 index 0000000..50a2a94 --- /dev/null +++ b/tests/sqs_sink_tests.rs @@ -0,0 +1,216 @@ +//! Integration tests for the AWS SQS sink. + +#![cfg(feature = "sink-sqs")] + +use postgres_stream::sink::Sink; +use postgres_stream::sink::sqs::{SqsSink, SqsSinkConfig}; +use postgres_stream::test_utils::ensure_elasticmq; +use postgres_stream::types::{EventIdentifier, StreamId, TriggeredEvent}; + +use aws_sdk_sqs::Client; +use chrono::Utc; + +/// Creates a test event with the given ID. +fn make_test_event(id: &str) -> TriggeredEvent { + TriggeredEvent { + id: EventIdentifier::new(id.to_string(), Utc::now()), + payload: serde_json::json!({ + "test_id": id, + "message": format!("Test event {}", id), + }), + metadata: Some(serde_json::json!({ "source": "test" })), + stream_id: StreamId::from(1u64), + lsn: Some("0/16B3748".parse().unwrap()), + } +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_sqs_sink_publishes_events() { + let elasticmq_port = ensure_elasticmq().await; + let endpoint_url = format!("http://127.0.0.1:{elasticmq_port}"); + let region = "us-east-1"; + + // Create SQS client for test setup. + let sdk_config = aws_config::defaults(aws_config::BehaviorVersion::latest()) + .region(aws_sdk_sqs::config::Region::new(region)) + .endpoint_url(&endpoint_url) + .credentials_provider(aws_sdk_sqs::config::Credentials::new( + "x", "x", None, None, "test", + )) + .load() + .await; + let client = Client::new(&sdk_config); + + // Create a test queue. + let queue_name = "pgstream-test-queue"; + let create_result = client + .create_queue() + .queue_name(queue_name) + .send() + .await + .expect("Failed to create queue"); + let queue_url = create_result.queue_url().expect("Queue URL not returned"); + + // Create the sink. + let config = SqsSinkConfig { + queue_url: Some(queue_url.to_string()), + region: region.to_string(), + endpoint_url: Some(endpoint_url), + access_key_id: Some("x".to_string()), + secret_access_key: Some("x".to_string()), + }; + + let sink = SqsSink::new(config) + .await + .expect("Failed to create SQS sink"); + + // Publish test events. + let events = vec![ + make_test_event("sqs-event-1"), + make_test_event("sqs-event-2"), + ]; + sink.publish_events(events) + .await + .expect("Failed to publish events"); + + // Receive messages to verify. + let receive_result = client + .receive_message() + .queue_url(queue_url) + .max_number_of_messages(10) + .send() + .await + .expect("Failed to receive messages"); + + let messages = receive_result.messages(); + assert_eq!(messages.len(), 2); + + // Verify message content - only payload is sent now. + for message in messages { + let body = message.body().expect("Message has no body"); + let payload: serde_json::Value = + serde_json::from_str(body).expect("Failed to parse message"); + + // Payload fields from make_test_event + assert!(payload.get("test_id").is_some()); + assert!(payload.get("message").is_some()); + } +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_sqs_sink_empty_batch() { + let elasticmq_port = ensure_elasticmq().await; + let endpoint_url = format!("http://127.0.0.1:{elasticmq_port}"); + let region = "us-east-1"; + + // Create SQS client for test setup. + let sdk_config = aws_config::defaults(aws_config::BehaviorVersion::latest()) + .region(aws_sdk_sqs::config::Region::new(region)) + .endpoint_url(&endpoint_url) + .credentials_provider(aws_sdk_sqs::config::Credentials::new( + "x", "x", None, None, "test", + )) + .load() + .await; + let client = Client::new(&sdk_config); + + // Create a test queue. + let queue_name = "pgstream-empty-queue"; + let create_result = client + .create_queue() + .queue_name(queue_name) + .send() + .await + .expect("Failed to create queue"); + let queue_url = create_result.queue_url().expect("Queue URL not returned"); + + let config = SqsSinkConfig { + queue_url: Some(queue_url.to_string()), + region: region.to_string(), + endpoint_url: Some(endpoint_url), + access_key_id: Some("x".to_string()), + secret_access_key: Some("x".to_string()), + }; + + let sink = SqsSink::new(config) + .await + .expect("Failed to create SQS sink"); + + // Empty batch should succeed without error. + sink.publish_events(vec![]) + .await + .expect("Empty batch should succeed"); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_sqs_sink_queue_url_from_metadata() { + let elasticmq_port = ensure_elasticmq().await; + let endpoint_url = format!("http://127.0.0.1:{elasticmq_port}"); + let region = "us-east-1"; + + // Create SQS client for test setup. + let sdk_config = aws_config::defaults(aws_config::BehaviorVersion::latest()) + .region(aws_sdk_sqs::config::Region::new(region)) + .endpoint_url(&endpoint_url) + .credentials_provider(aws_sdk_sqs::config::Credentials::new( + "x", "x", None, None, "test", + )) + .load() + .await; + let client = Client::new(&sdk_config); + + // Create a test queue. + let queue_name = "pgstream-metadata-queue"; + let create_result = client + .create_queue() + .queue_name(queue_name) + .send() + .await + .expect("Failed to create queue"); + let queue_url = create_result.queue_url().expect("Queue URL not returned"); + + // Create the sink with no queue URL - it must come from metadata. + let config = SqsSinkConfig { + queue_url: None, + region: region.to_string(), + endpoint_url: Some(endpoint_url), + access_key_id: Some("x".to_string()), + secret_access_key: Some("x".to_string()), + }; + + let sink = SqsSink::new(config) + .await + .expect("Failed to create SQS sink"); + + // Create event with queue_url in metadata. + let event = TriggeredEvent { + id: EventIdentifier::new("metadata-queue-event".to_string(), Utc::now()), + payload: serde_json::json!({ "test": "data" }), + metadata: Some(serde_json::json!({ + "queue_url": queue_url + })), + stream_id: StreamId::from(1u64), + lsn: None, + }; + + sink.publish_events(vec![event]) + .await + .expect("Failed to publish event with metadata queue URL"); + + // Verify message was sent. + let receive_result = client + .receive_message() + .queue_url(queue_url) + .max_number_of_messages(1) + .send() + .await + .expect("Failed to receive messages"); + + let messages = receive_result.messages(); + assert_eq!(messages.len(), 1); +} + +#[test] +fn test_sink_name() { + assert_eq!(SqsSink::name(), "sqs"); +}