Skip to content

Commit 9b55196

Browse files
authored
Merge pull request #7 from beam-pyio/feature/integration-testing
Feature/integration testing
2 parents b891182 + fd460a7 commit 9b55196

File tree

3 files changed

+178
-18
lines changed

3 files changed

+178
-18
lines changed

tests/boto3_client_test.py

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
from dynamodb_pyio.boto3_client import DynamoDBClient, DynamoDBClientError
2626

2727

28-
def set_client(service_name="dynamodb"):
28+
def create_client(service_name="dynamodb"):
2929
options = {
3030
"service_name": service_name,
3131
"aws_access_key_id": "testing",
@@ -35,12 +35,8 @@ def set_client(service_name="dynamodb"):
3535
return boto3.session.Session().client(**options)
3636

3737

38-
def describe_table(**kwargs):
39-
return set_client().describe_table(**kwargs)
40-
41-
4238
def create_table(params):
43-
return set_client().create_table(**params)
39+
return create_client().create_table(**params)
4440

4541

4642
def to_int_if_decimal(v):
@@ -54,7 +50,7 @@ def to_int_if_decimal(v):
5450

5551

5652
def scan_table(**kwargs):
57-
paginator = set_client().get_paginator("scan")
53+
paginator = create_client().get_paginator("scan")
5854
page_iterator = paginator.paginate(**kwargs)
5955
items = []
6056
for page in page_iterator:
@@ -135,7 +131,7 @@ def test_put_items_batch_with_wrong_data_types(self):
135131
)
136132

137133
def test_put_items_batch_duplicate_records_without_dedup_keys(self):
138-
records = [{"pk": str(1), "sk": 1} for i in range(20)]
134+
records = [{"pk": str(1), "sk": 1} for _ in range(20)]
139135
self.assertRaises(
140136
DynamoDBClientError,
141137
self.dynamodb_client.put_items_batch,
@@ -144,7 +140,7 @@ def test_put_items_batch_duplicate_records_without_dedup_keys(self):
144140
)
145141

146142
def test_put_items_batch_duplicate_records_with_dedup_keys(self):
147-
records = [{"pk": str(1), "sk": 1} for i in range(20)]
143+
records = [{"pk": str(1), "sk": 1} for _ in range(20)]
148144
self.dynamodb_client.put_items_batch(
149145
records, self.table_name, dedup_pkeys=["pk", "sk"]
150146
)

tests/io_it_test.py

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
import decimal
19+
import unittest
20+
import pytest
21+
import docker
22+
import boto3
23+
from boto3.dynamodb.types import TypeDeserializer
24+
import apache_beam as beam
25+
from apache_beam.transforms.util import BatchElements
26+
from apache_beam import GroupIntoBatches
27+
from apache_beam.options import pipeline_options
28+
from apache_beam.testing.test_pipeline import TestPipeline
29+
from localstack_utils.localstack import startup_localstack, stop_localstack
30+
31+
from dynamodb_pyio.io import WriteToDynamoDB
32+
33+
34+
def create_client(service_name="dynamodb"):
    """Build a boto3 client that talks to the local test endpoint.

    Args:
        service_name: Name of the AWS service to create the client for.

    Returns:
        The object returned by ``boto3.client`` configured with dummy
        credentials, region ``us-east-1`` and endpoint
        ``http://localhost:4566``.
    """
    client_kwargs = {
        "service_name": service_name,
        "aws_access_key_id": "test",
        "aws_secret_access_key": "test",
        "region_name": "us-east-1",
        "endpoint_url": "http://localhost:4566",
    }
    return boto3.client(**client_kwargs)
42+
43+
44+
def create_table(params):
    """Create a table by forwarding ``params`` as keyword arguments.

    Args:
        params: Mapping of keyword arguments for the client's
            ``create_table`` call.

    Returns:
        Whatever the client's ``create_table`` call returns.
    """
    client = create_client()
    return client.create_table(**params)
46+
47+
48+
def to_int_if_decimal(v):
    """Convert a ``decimal.Decimal`` to ``int``; return anything else as is.

    Args:
        v: Any value.

    Returns:
        ``int(v)`` when ``v`` is a ``decimal.Decimal`` that can be converted
        (fractional parts are truncated by ``int``); otherwise ``v`` itself.
    """
    if not isinstance(v, decimal.Decimal):
        return v
    try:
        return int(v)
    except (ValueError, OverflowError):
        # int() raises ValueError for NaN and OverflowError for Infinity;
        # keep the best-effort contract and hand the value back unchanged.
        return v
56+
57+
58+
def scan_table(**kwargs):
    """Scan a table page by page and return its items as plain dicts.

    Args:
        **kwargs: Keyword arguments forwarded to the ``scan`` paginator's
            ``paginate`` call (for example ``TableName``).

    Returns:
        A list of dicts, one per scanned item, with every attribute value
        deserialized and passed through ``to_int_if_decimal``. The list is
        sorted by each item's ``"sk"`` value.

    Raises:
        KeyError: If a scanned item has no ``"sk"`` attribute.
    """
    # One deserializer is enough; the original built a new one per value.
    deserializer = TypeDeserializer()
    paginator = create_client().get_paginator("scan")

    items = []
    for page in paginator.paginate(**kwargs):
        items.extend(
            {
                k: to_int_if_decimal(deserializer.deserialize(v))
                for k, v in document.items()
            }
            for document in page["Items"]
        )
    return sorted(items, key=lambda d: d["sk"])
71+
72+
73+
@pytest.mark.integration
class TestWriteToSqs(unittest.TestCase):
    """Integration tests for ``WriteToDynamoDB`` against a local endpoint.

    Each test starts LocalStack, creates a table keyed by ``pk`` (string
    hash key) and ``sk`` (numeric range key), runs a pipeline with the
    options built in ``setUp`` and checks the table via ``scan_table``.

    NOTE(review): the class name mentions SQS although every test writes to
    DynamoDB. It is kept unchanged here so existing test IDs stay stable.
    """

    table_name = "test-table"

    def setUp(self):
        """Start LocalStack, create the test table and build pipeline options."""
        startup_localstack()

        # Create resources.
        params = {
            "TableName": self.table_name,
            "KeySchema": [
                {"AttributeName": "pk", "KeyType": "HASH"},
                {"AttributeName": "sk", "KeyType": "RANGE"},
            ],
            "AttributeDefinitions": [
                {"AttributeName": "pk", "AttributeType": "S"},
                {"AttributeName": "sk", "AttributeType": "N"},
            ],
            "BillingMode": "PAY_PER_REQUEST",
        }
        create_table(params)

        # The original list passed "--aws_access_key_id testing" twice; the
        # redundant second pair is dropped, the effective options are the same.
        self.pipeline_opts = pipeline_options.PipelineOptions(
            [
                "--runner",
                "FlinkRunner",
                "--parallelism",
                "1",
                "--aws_access_key_id",
                "testing",
                "--aws_secret_access_key",
                "testing",
                "--region_name",
                "us-east-1",
                "--endpoint_url",
                "http://localhost:4566",
            ]
        )

    def tearDown(self):
        """Stop LocalStack and prune stopped Docker containers."""
        stop_localstack()
        docker_client = docker.from_env()
        # NOTE(review): prune() is not limited to the LocalStack container;
        # confirm that removing every stopped container is acceptable.
        docker_client.containers.prune()
        return super().tearDown()

    def test_write_to_dynamodb(self):
        """A single element holding 500 distinct records is written in full."""
        records = [{"pk": str(i), "sk": i} for i in range(500)]
        with TestPipeline(options=self.pipeline_opts) as p:
            (p | beam.Create([records]) | WriteToDynamoDB(table_name=self.table_name))
        self.assertListEqual(records, scan_table(TableName=self.table_name))

    def test_write_to_dynamodb_duplicate_records_without_dedup_keys(self):
        """Duplicate keys without ``dedup_pkeys`` make the pipeline fail."""
        records = [{"pk": str(1), "sk": 1} for _ in range(500)]
        with self.assertRaises(RuntimeError):
            with TestPipeline(options=self.pipeline_opts) as p:
                (
                    p
                    | beam.Create([records])
                    | WriteToDynamoDB(table_name=self.table_name)
                )

    def test_write_to_dynamodb_duplicate_records_with_dedup_keys(self):
        """Duplicate keys with ``dedup_pkeys`` leave one item in the table."""
        records = [{"pk": str(1), "sk": 1} for _ in range(500)]
        with TestPipeline(options=self.pipeline_opts) as p:
            (
                p
                | beam.Create([records])
                | WriteToDynamoDB(table_name=self.table_name, dedup_pkeys=["pk", "sk"])
            )
        self.assertListEqual(records[:1], scan_table(TableName=self.table_name))

    def test_write_to_dynamodb_with_batch_elements(self):
        """Records batched by ``BatchElements`` are all written."""
        records = [{"pk": str(i), "sk": i} for i in range(500)]
        with TestPipeline(options=self.pipeline_opts) as p:
            (
                p
                | beam.Create(records)
                | BatchElements(min_batch_size=50, max_batch_size=100)
                | WriteToDynamoDB(table_name=self.table_name)
            )
        self.assertListEqual(records, scan_table(TableName=self.table_name))

    def test_write_to_dynamodb_with_group_into_batches(self):
        """Keyed records batched by ``GroupIntoBatches`` are all written."""
        records = [(i % 2, {"pk": str(i), "sk": i}) for i in range(500)]
        with TestPipeline(options=self.pipeline_opts) as p:
            (
                p
                | beam.Create(records)
                | GroupIntoBatches(batch_size=100)
                | WriteToDynamoDB(table_name=self.table_name)
            )
        self.assertListEqual(
            [r[1] for r in records], scan_table(TableName=self.table_name)
        )

tests/io_test.py

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
from dynamodb_pyio.io import WriteToDynamoDB, _DynamoDBWriteFn
3333

3434

35-
def set_client(service_name="dynamodb"):
35+
def create_client(service_name="dynamodb"):
3636
options = {
3737
"service_name": service_name,
3838
"aws_access_key_id": "testing",
@@ -42,12 +42,8 @@ def set_client(service_name="dynamodb"):
4242
return boto3.session.Session().client(**options)
4343

4444

45-
def describe_table(**kwargs):
46-
return set_client().describe_table(**kwargs)
47-
48-
4945
def create_table(params):
50-
return set_client().create_table(**params)
46+
return create_client().create_table(**params)
5147

5248

5349
def to_int_if_decimal(v):
@@ -61,7 +57,7 @@ def to_int_if_decimal(v):
6157

6258

6359
def scan_table(**kwargs):
64-
paginator = set_client().get_paginator("scan")
60+
paginator = create_client().get_paginator("scan")
6561
page_iterator = paginator.paginate(**kwargs)
6662
items = []
6763
for page in page_iterator:
@@ -154,7 +150,7 @@ def test_write_to_dynamodb_with_wrong_data_type(self):
154150
)
155151

156152
def test_write_to_dynamodb_duplicate_records_without_dedup_keys(self):
157-
records = [{"pk": str(1), "sk": 1} for i in range(20)]
153+
records = [{"pk": str(1), "sk": 1} for _ in range(20)]
158154
with self.assertRaises(DynamoDBClientError):
159155
with TestPipeline(options=self.pipeline_opts) as p:
160156
(
@@ -164,7 +160,7 @@ def test_write_to_dynamodb_duplicate_records_without_dedup_keys(self):
164160
)
165161

166162
def test_write_to_dynamodb_duplicate_records_with_dedup_keys(self):
167-
records = [{"pk": str(1), "sk": 1} for i in range(20)]
163+
records = [{"pk": str(1), "sk": 1} for _ in range(20)]
168164
with TestPipeline(options=self.pipeline_opts) as p:
169165
(
170166
p

0 commit comments

Comments
 (0)