Skip to content

Commit b891182

Browse files
authored
Merge pull request #6 from beam-pyio/feature/implement-ptransform
Feature/implement ptransform
2 parents 3f66e6e + a7eecba commit b891182

File tree

10 files changed

+1484
-318
lines changed

10 files changed

+1484
-318
lines changed

.github/workflows/doc.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ jobs:
2525
uses: ammaraskar/[email protected]
2626

2727
- name: Upload artifacts
28-
uses: actions/upload-artifact@v1
28+
uses: actions/upload-artifact@v4
2929
with:
3030
name: html-docs
3131
path: docs/_build/html/
@@ -34,4 +34,4 @@ jobs:
3434
uses: peaceiris/actions-gh-pages@v3
3535
with:
3636
github_token: ${{ secrets.GITHUB_TOKEN }}
37-
publish_dir: docs/_build/html
37+
publish_dir: docs/_build/html

.github/workflows/test.yml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ on:
66

77
jobs:
88
test:
9-
109
runs-on: ${{ matrix.os }}
1110

1211
strategy:
@@ -30,9 +29,9 @@ jobs:
3029
run: poetry install
3130

3231
- name: Test with pytest
33-
run: poetry run pytest tests/ --cov=dynamodb_pyio --cov-report=xml
32+
run: poetry run pytest tests/ -svv --cov=dynamodb_pyio --cov-report=xml -m "not integration"
3433

3534
- name: Use Codecov to track coverage
3635
uses: codecov/codecov-action@v3
3736
with:
38-
files: ./coverage.xml # coverage report
37+
files: ./coverage.xml # coverage report

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
.vscode
2+
13
# Byte-compiled / optimized / DLL files
24
__pycache__/
35
*.py[cod]

poetry.lock

Lines changed: 832 additions & 307 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ apache-beam = {extras = ["aws"], version = ">=2.19.0"}
2828
pytest = ">=7.1.2,<8.0"
2929
pytest-cov = "^5.0.0"
3030
apache-beam = {extras = ["test"], version = ">=2.19.0"}
31-
moto = {extras = ["firehose", "s3", "sts"], version = "^5.0.9"}
31+
moto = {extras = ["all"], version = "^5.0.9"}
3232
localstack-utils = ">=1.0.0"
3333

3434
[tool.pytest.ini_options]

src/dynamodb_pyio/boto3_client.py

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
import typing
19+
import boto3
20+
from apache_beam.options import pipeline_options
21+
22+
from dynamodb_pyio.options import DynamoDBOptions
23+
24+
__all__ = ["DynamoDBClient", "DynamoDBClientError"]
25+
26+
27+
def get_http_error_code(exc):
    """Return the HTTP status code attached to an exception, if there is one.

    The code is looked up at ``exc.response["ResponseMetadata"]["HTTPStatusCode"]``.

    Args:
        exc: Any exception object.

    Returns:
        The value stored under ``HTTPStatusCode``, or None when the exception has
        no ``response`` attribute, the response is not dict-like, or the metadata
        is missing.
    """
    try:
        return exc.response.get("ResponseMetadata", {}).get("HTTPStatusCode")
    except (AttributeError, TypeError):
        # This helper is called while another error is being handled, so it must
        # never raise itself: a missing/None/non-mapping ``response`` (or a None
        # ``ResponseMetadata``) simply means "no status code available".
        return None
31+
32+
33+
class DynamoDBClientError(Exception):
    """Error raised for failures in the DynamoDB client wrapper.

    Args:
        message: Human-readable description of the failure. Stored on
            ``self.message`` and used as the exception text.
        code: Optional error code (the callers in this module pass an HTTP
            status code or None). Stored on ``self.code``.
    """

    def __init__(self, message=None, code=None):
        # Forward only the message to Exception so that str(err) is the message
        # itself rather than a "(message, code)" tuple repr, and is not empty
        # when the error is built with keyword arguments.
        if message is None:
            super().__init__()
        else:
            super().__init__(message)
        self.message = message
        self.code = code
37+
38+
39+
class DynamoDBClient(object):
    """
    Wrapper for boto3 library.

    Builds a boto3 DynamoDB service resource from the supplied options and
    exposes it as ``self.resource``.
    """

    def __init__(self, options: typing.Union[DynamoDBOptions, dict]):
        """Constructor of the DynamoDBClient.

        Args:
            options (Union[DynamoDBOptions, dict]): Options to create a boto3 DynamoDB client.
                Beam pipeline options are viewed as DynamoDBOptions and read by
                attribute; any other value is read with ``.get()`` using the same
                option names.
        """
        assert boto3 is not None, "Missing boto3 requirement"
        if isinstance(options, pipeline_options.PipelineOptions):
            options = options.view_as(DynamoDBOptions)
            access_key_id = options.aws_access_key_id
            secret_access_key = options.aws_secret_access_key
            session_token = options.aws_session_token
            endpoint_url = options.endpoint_url
            use_ssl = not options.disable_ssl
            region_name = options.region_name
            api_version = options.api_version
            verify = options.verify
        else:
            access_key_id = options.get("aws_access_key_id")
            secret_access_key = options.get("aws_secret_access_key")
            session_token = options.get("aws_session_token")
            endpoint_url = options.get("endpoint_url")
            # The option is expressed as "disable", boto3 expects "use".
            use_ssl = not options.get("disable_ssl", False)
            region_name = options.get("region_name")
            api_version = options.get("api_version")
            verify = options.get("verify")

        session = boto3.session.Session()
        self.resource = session.resource(
            service_name="dynamodb",
            region_name=region_name,
            api_version=api_version,
            use_ssl=use_ssl,
            verify=verify,
            endpoint_url=endpoint_url,
            aws_access_key_id=access_key_id,
            aws_secret_access_key=secret_access_key,
            aws_session_token=session_token,
        )

    def put_items_batch(
        self,
        records: list,
        table_name: str,
        dedup_pkeys: typing.Optional[list] = None,
    ):
        """Put records to an Amazon DynamoDB table using a batch writer object.

        Args:
            records (list): Records to put into the Amazon DynamoDB table.
            table_name (str): Amazon DynamoDB table name.
            dedup_pkeys (list, Optional): List of keys to be used for de-duplicating items in buffer.

        Raises:
            DynamoDBClientError: If ``records`` is not a list, or if any error is
                raised while writing. In the latter case the original exception
                is chained as the cause and its HTTP status code, when available,
                is passed as the error's ``code``.
        """
        if not isinstance(records, list):
            raise DynamoDBClientError("Records should be a list.")
        try:
            table = self.resource.Table(table_name)
            # None means "no de-duplication"; the batch writer is given an empty list.
            with table.batch_writer(overwrite_by_pkeys=dedup_pkeys or []) as batch:
                for record in records:
                    batch.put_item(Item=record)
        except Exception as e:
            raise DynamoDBClientError(str(e), get_http_error_code(e)) from e

src/dynamodb_pyio/io.py

Lines changed: 101 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,101 @@
1-
def my_fn():
2-
return 1
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
import logging
19+
import typing
20+
import apache_beam as beam
21+
from apache_beam import metrics
22+
from apache_beam.pvalue import PCollection
23+
24+
from dynamodb_pyio.boto3_client import DynamoDBClient
25+
from dynamodb_pyio.options import DynamoDBOptions
26+
27+
28+
__all__ = ["WriteToDynamoDB"]
29+
30+
31+
class _DynamoDBWriteFn(beam.DoFn):
    """DoFn that puts batches of records into an Amazon DynamoDB table.

    Each input element is handed as a whole to ``DynamoDBClient.put_items_batch``.
    If the element is a tuple, only its second item is written.

    Args:
        table_name (str): Amazon DynamoDB table name.
        dedup_pkeys (list): List of keys to be used for de-duplicating records in buffer.
        options (Union[DynamoDBOptions, dict]): Options to create a boto3 dynamodb client.
    """

    # Counts the number of records handed to the client across all elements.
    total_elements_count = metrics.Metrics.counter(
        "_DynamoDBWriteFn", "total_elements_count"
    )

    def __init__(
        self,
        table_name: str,
        dedup_pkeys: list,
        options: typing.Union[DynamoDBOptions, dict],
    ):
        """Constructor of _DynamoDBWriteFn

        Args:
            table_name (str): Amazon DynamoDB table name.
            dedup_pkeys (list): List of keys to be used for de-duplicating records in buffer.
            options (Union[DynamoDBOptions, dict]): Options to create a boto3 dynamodb client.
        """
        super().__init__()
        self.table_name = table_name
        self.dedup_pkeys = dedup_pkeys
        self.options = options

    def start_bundle(self):
        """Create the DynamoDB client from the stored options."""
        self.client = DynamoDBClient(self.options)

    def process(self, element):
        """Write one element (a batch of records) to the table.

        Args:
            element: The records to write, or a tuple whose second item holds
                the records.
        """
        if isinstance(element, tuple):
            # Keyed element: drop the key, keep the value.
            element = element[1]
        self.client.put_items_batch(element, self.table_name, self.dedup_pkeys)
        num_records = len(element)
        self.total_elements_count.inc(num_records)
        # Lazy %-formatting: the message is only built if INFO is enabled.
        logging.info("total %s elements processed...", num_records)
71+
72+
73+
class WriteToDynamoDB(beam.PTransform):
    """Transform that writes the records of a PCollection to an Amazon DynamoDB table.

    Every element of the input PCollection is put in batch through the boto3
    package. See the `Boto3 Documentation <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/table/batch_writer.html>`__
    for details on the batch writer.

    For a keyed stream (elements that are tuples), the key is ignored and only
    the value is used to put records in batch.

    Args:
        table_name (str): Amazon DynamoDB table name.
        dedup_pkeys (list, Optional): List of keys to be used for de-duplicating records in buffer.
    """

    def __init__(self, table_name: str, dedup_pkeys: list = None):
        """Store the target table name and the optional de-duplication keys.

        Args:
            table_name (str): Amazon DynamoDB table name.
            dedup_pkeys (list, Optional): List of keys to be used for de-duplicating items in buffer.
        """
        super().__init__()
        self.dedup_pkeys = dedup_pkeys
        self.table_name = table_name

    def expand(self, pcoll: PCollection):
        """Apply the DynamoDB write DoFn to the input PCollection.

        The client options are taken from the pipeline that owns ``pcoll``,
        viewed as DynamoDBOptions.
        """
        dynamodb_options = pcoll.pipeline.options.view_as(DynamoDBOptions)
        write_fn = _DynamoDBWriteFn(
            self.table_name, self.dedup_pkeys, dynamodb_options
        )
        return pcoll | beam.ParDo(write_fn)

src/dynamodb_pyio/options.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
from apache_beam.options.pipeline_options import PipelineOptions
19+
20+
21+
class DynamoDBOptions(PipelineOptions):
    """Pipeline options holding the settings used to create the boto3 DynamoDB client."""

    @classmethod
    def _add_argparse_args(cls, parser):
        """Register the DynamoDB client command-line options on ``parser``.

        Args:
            parser: Argument parser the options are added to.
        """
        # These options are passed to the DynamoDB PyIO Client
        parser.add_argument(
            "--aws_access_key_id",
            default=None,
            help="The access key to use when creating the boto3 client.",
        )
        parser.add_argument(
            "--aws_secret_access_key",
            default=None,
            help="The secret key to use when creating the boto3 client.",
        )
        parser.add_argument(
            "--aws_session_token",
            default=None,
            help="The session token to use when creating the boto3 client.",
        )
        parser.add_argument(
            "--endpoint_url",
            default=None,
            help="The complete URL to use for the constructed boto3 client.",
        )
        parser.add_argument(
            "--region_name",
            default=None,
            help="The name of the region associated with the boto3 client.",
        )
        parser.add_argument(
            "--api_version",
            default=None,
            help="The API version to use with the boto3 client.",
        )
        parser.add_argument(
            "--verify",
            default=None,
            help="Whether or not to verify SSL certificates with the boto3 client.",
        )
        # Flag is phrased as "disable" so that SSL stays on unless explicitly turned off.
        parser.add_argument(
            "--disable_ssl",
            default=False,
            action="store_true",
            help=(
                "Whether or not to use SSL with the boto3 client. "
                "By default, SSL is used."
            ),
        )

0 commit comments

Comments
 (0)