From f76192a034b036e6582cfad92af6b7a3c1d737f2 Mon Sep 17 00:00:00 2001
From: Stefán Páll Sturluson
Date: Mon, 14 Mar 2022 11:48:34 +0000
Subject: [PATCH 01/11] Add content_encoding input to table_import_from_s3

Some tools do not properly set the ContentEncoding metadata when writing to S3.
This update adds the ability to specify a custom encoding (e.g. "gzip") as
input to the `table_import_from_s3` function.
---
 aws_s3--0.0.1.sql | 19 +++++++++++--------
 1 file changed, 11 insertions(+), 8 deletions(-)

diff --git a/aws_s3--0.0.1.sql b/aws_s3--0.0.1.sql
index 93ee6d0..0f153c4 100644
--- a/aws_s3--0.0.1.sql
+++ b/aws_s3--0.0.1.sql
@@ -48,7 +48,8 @@ CREATE OR REPLACE FUNCTION aws_s3.table_import_from_s3 (
     access_key text default null,
     secret_key text default null,
     session_token text default null,
-    endpoint_url text default null
+    endpoint_url text default null,
+    content_encoding text default null
 ) RETURNS int
 LANGUAGE plpython3u
 AS $$
@@ -90,10 +91,10 @@

     obj = s3.Object(bucket, file_path)
     response = obj.get()
-    content_encoding = response.get('ContentEncoding')
-    body = response['Body']
+    content_encoding = content_encoding or response.get('ContentEncoding')
     user_content_encoding = response.get('x-amz-meta-content-encoding')
-
+    body = response['Body']
+
     with tempfile.NamedTemporaryFile() as fd:
         if (content_encoding and content_encoding.lower() == 'gzip') or (user_content_encoding and user_content_encoding.lower() == 'gzip'):
             with gzip.GzipFile(fileobj=body) as gzipfile:
@@ -124,14 +125,15 @@ CREATE OR REPLACE FUNCTION aws_s3.table_import_from_s3(
     options text,
     s3_info aws_commons._s3_uri_1,
     credentials aws_commons._aws_credentials_1,
-    endpoint_url text default null
+    endpoint_url text default null,
+    content_encoding text default null
 ) RETURNS INT
 LANGUAGE plpython3u
 AS $$

     plan = plpy.prepare(
-        'SELECT aws_s3.table_import_from_s3($1, $2, $3, $4, $5, $6, $7, $8, $9) AS num_rows',
-        ['TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT']
+        'SELECT aws_s3.table_import_from_s3($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) AS num_rows',
+        ['TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT']
     )
     return plan.execute(
         [
@@ -144,7 +146,8 @@ AS $$
             credentials['access_key'],
             credentials['secret_key'],
             credentials['session_token'],
-            endpoint_url
+            endpoint_url,
+            content_encoding
         ]
     )[0]['num_rows']
 $$;
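A minimal usage sketch of the content_encoding parameter introduced in PATCH 01/11. The table, bucket, key, and region names are placeholders, and the object is assumed to be a gzipped CSV whose S3 metadata lacks ContentEncoding:

```
-- Hypothetical call: force gzip decompression even though the object's
-- ContentEncoding metadata is missing.
SELECT aws_s3.table_import_from_s3(
    'animals',                       -- target table (placeholder)
    '',                              -- column list ('' = all columns)
    '(FORMAT CSV, HEADER true)',     -- options passed through to COPY
    'my-bucket',                     -- bucket (placeholder)
    'imports/animals.csv.gz',        -- object key (placeholder)
    'us-east-1',                     -- region (placeholder)
    content_encoding => 'gzip'
);
```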
From 2b06a64485fbd222ee74ae5d002ab525bef45cc1 Mon Sep 17 00:00:00 2001
From: GRBurst
Date: Tue, 21 Jun 2022 16:44:03 +0200
Subject: [PATCH 02/11] Minor Readme fixes + enhancements.

Signed-off-by: GRBurst
---
 README.md | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/README.md b/README.md
index e0dabab..da27003 100644
--- a/README.md
+++ b/README.md
@@ -89,7 +89,7 @@ table_name | the name of the table
 column_list | list of columns to copy
 options | options passed to the COPY command in Postgres
 s3_info | An aws_commons._s3_uri_1 composite type containing the bucket, file path and region information about the s3 object
-credentials | An aws_commons._aws_credentials_1 composite type containing the access key, secret key, session token credentials
+credentials | An aws_commons._aws_credentials_1 composite type containing the access key, secret key, session token credentials. To omit a value, set it to `null`.
 endpoint_url | optional endpoint to use (e.g., `http://localhost:4566`)

 ##### Example
@@ -282,7 +282,7 @@ Parameter | Description
 ----------|------------
 query | query that returns the data to export
 s3_info | An aws_commons._s3_uri_1 composite type containing the bucket, file path and region information about the s3 object
-credentials | An aws_commons._aws_credentials_1 composite type containing the access key, secret key, session token credentials
+credentials | An aws_commons._aws_credentials_1 composite type containing the access key, secret key, session token credentials. To omit a value, set it to `null`.
 options | options passed to the COPY command in Postgres
 endpoint_url | optional endpoint to use (e.g., `http://localhost:4566`)

@@ -314,7 +314,7 @@ You can omit the credentials.

 ##### Example

-#### Using the function table_import_from_s3 with all the parameters
+#### Using the function query_export_to_s3 with all the parameters
 ```
 aws_s3.query_export_to_s3(
     query text,

From f4b2705d7c7f4444b8069afdcc035c2cb6459db3 Mon Sep 17 00:00:00 2001
From: Tzach Shabtay
Date: Mon, 1 Aug 2022 12:31:27 -0400
Subject: [PATCH 03/11] Fix uploaded record count to ignore header

---
 aws_s3--0.0.1.sql | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/aws_s3--0.0.1.sql b/aws_s3--0.0.1.sql
index 93ee6d0..e284954 100644
--- a/aws_s3--0.0.1.sql
+++ b/aws_s3--0.0.1.sql
@@ -219,6 +219,8 @@ AS $$
             size += len(buffer)
         fd.seek(0)
         s3.upload_fileobj(fd, bucket, file_path)
+	if 'HEADER TRUE' in options.upper():
+		num_lines -= 1
         yield (num_lines, 1, size)
 $$;

From 47ee35614aee778bd30836a2f9036fa9bf7498cf Mon Sep 17 00:00:00 2001
From: Tzach Shabtay
Date: Mon, 1 Aug 2022 13:24:06 -0400
Subject: [PATCH 04/11] fix tabs/spaces

---
 aws_s3--0.0.1.sql | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/aws_s3--0.0.1.sql b/aws_s3--0.0.1.sql
index e284954..cf2ddc3 100644
--- a/aws_s3--0.0.1.sql
+++ b/aws_s3--0.0.1.sql
@@ -219,8 +219,8 @@ AS $$
             size += len(buffer)
         fd.seek(0)
         s3.upload_fileobj(fd, bucket, file_path)
-	if 'HEADER TRUE' in options.upper():
-		num_lines -= 1
+        if 'HEADER TRUE' in options.upper():
+            num_lines -= 1
         yield (num_lines, 1, size)
 $$;

From a051676518f1671bc746ec3831ef97c57db45e44 Mon Sep 17 00:00:00 2001
From: zhangLianZhuang
Date: Fri, 17 Mar 2023 11:36:05 +0800
Subject: [PATCH 05/11] Update README.md

---
 README.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/README.md b/README.md
index e0dabab..4210b4a 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,5 @@
+## chimpler/postgres-aws-s3 is inactive
+
 # postgres-aws-s3

 Starting on Postgres version 11.1, AWS RDS added [support](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_PostgreSQL.S3Import.html#USER_PostgreSQL.S3Import.FileFormats) for S3 import using the extension `aws_s3`. It allows to import data from S3 within Postgres using the function `aws_s3.table_import_from_s3` and export the data to S3 using the function `aws_s3.query_export_to_s3`.
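A usage sketch of the header handling from PATCH 03/11, with placeholder names. When the COPY options include HEADER true, the rows_uploaded value reported by query_export_to_s3 excludes the header line:

```
-- Hypothetical call: rows_uploaded counts data rows only, since the header
-- line is subtracted when 'HEADER TRUE' appears in the options.
SELECT rows_uploaded, files_uploaded, bytes_uploaded
FROM aws_s3.query_export_to_s3(
    'SELECT * FROM animals',
    'my-bucket',
    'exports/animals.csv',
    'us-east-1',
    options => '(FORMAT CSV, HEADER true)'
);
```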
From f64bbe220fc961cc1ba042f339773d3c07a912f8 Mon Sep 17 00:00:00 2001
From: yanbo
Date: Fri, 17 Mar 2023 15:18:19 +0800
Subject: [PATCH 06/11] Add read_timeout param to import/export

---
 .gitignore        |  1 +
 aws_s3--0.0.1.sql | 26 +++++++++++++++++---------
 2 files changed, 18 insertions(+), 9 deletions(-)
 create mode 100644 .gitignore

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..723ef36
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+.idea
\ No newline at end of file
diff --git a/aws_s3--0.0.1.sql b/aws_s3--0.0.1.sql
index 9041d35..ed99227 100644
--- a/aws_s3--0.0.1.sql
+++ b/aws_s3--0.0.1.sql
@@ -49,7 +49,8 @@ CREATE OR REPLACE FUNCTION aws_s3.table_import_from_s3 (
     secret_key text default null,
     session_token text default null,
     endpoint_url text default null,
-    content_encoding text default null
+    content_encoding text default null,
+    read_timeout integer default 60
 ) RETURNS int
 LANGUAGE plpython3u
 AS $$
@@ -86,6 +87,7 @@ AS $$
     s3 = boto3.resource(
         's3',
         region_name=region,
+        config=boto3.session.Config(read_timeout={read_timeout}),
         **aws_settings
     )

@@ -126,14 +128,15 @@ CREATE OR REPLACE FUNCTION aws_s3.table_import_from_s3(
     s3_info aws_commons._s3_uri_1,
     credentials aws_commons._aws_credentials_1,
     endpoint_url text default null,
-    content_encoding text default null
+    content_encoding text default null,
+    read_timeout integer default 60
 ) RETURNS INT
 LANGUAGE plpython3u
 AS $$

     plan = plpy.prepare(
-        'SELECT aws_s3.table_import_from_s3($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) AS num_rows',
-        ['TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT']
+        'SELECT aws_s3.table_import_from_s3($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) AS num_rows',
+        ['TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'INTEGER']
     )
     return plan.execute(
         [
@@ -146,8 +149,9 @@ AS $$
             credentials['access_key'],
             credentials['secret_key'],
             credentials['session_token'],
-            endpoint_url,
-            content_encoding
+            endpoint_url,
+            content_encoding,
+            read_timeout
         ]
     )[0]['num_rows']
 $$;
@@ -162,6 +166,7 @@ CREATE OR REPLACE FUNCTION aws_s3.query_export_to_s3(
     session_token text default null,
     options text default null,
     endpoint_url text default null,
+    read_timeout integer default 60,
     OUT rows_uploaded bigint,
     OUT files_uploaded bigint,
     OUT bytes_uploaded bigint
@@ -199,6 +204,7 @@ AS $$
     s3 = boto3.client(
         's3',
         region_name=region,
+        config=boto3.session.Config(read_timeout={read_timeout}),
         **aws_settings
     )

@@ -233,6 +239,7 @@ CREATE OR REPLACE FUNCTION aws_s3.query_export_to_s3(
     credentials aws_commons._aws_credentials_1 default null,
     options text default null,
     endpoint_url text default null,
+    read_timeout integer default 60,
     OUT rows_uploaded bigint,
     OUT files_uploaded bigint,
     OUT bytes_uploaded bigint
 ) LANGUAGE plpython3u
 AS $$
     plan = plpy.prepare(
-        'SELECT * FROM aws_s3.query_export_to_s3($1, $2, $3, $4, $5, $6, $7, $8, $9)',
-        ['TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT']
+        'SELECT * FROM aws_s3.query_export_to_s3($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)',
+        ['TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'INTEGER']
     )
     return plan.execute(
         [
@@ -253,7 +260,8 @@ AS $$
             credentials.get('secret_key') if credentials else None,
             credentials.get('session_token') if credentials else None,
             options,
-            endpoint_url
+            endpoint_url,
+            read_timeout
         ]
     )
 $$;
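A usage sketch of the read_timeout parameter added in PATCH 06/11, with placeholder names; the value is passed to the boto3 client configuration in seconds and defaults to 60. Note that this patch interpolates it as a set literal ({read_timeout}), which PATCH 09/11 later corrects to read_timeout=read_timeout:

```
-- Hypothetical call: allow up to five minutes per S3 read for a large import.
SELECT aws_s3.table_import_from_s3(
    'animals',
    '',
    '(FORMAT CSV, HEADER true)',
    'my-bucket',
    'imports/animals.csv',
    'us-east-1',
    read_timeout => 300
);
```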
From 1ce00fa90b47b2991872aad953c51287ed6c50d4 Mon Sep 17 00:00:00 2001
From: yanbo
Date: Fri, 17 Mar 2023 15:49:42 +0800
Subject: [PATCH 07/11] table_import_from_s3: support directories and multiple files

---
 aws_s3--0.0.1.sql | 65 ++++++++++++++++++++++++++++++-----------------
 1 file changed, 41 insertions(+), 24 deletions(-)

diff --git a/aws_s3--0.0.1.sql b/aws_s3--0.0.1.sql
index ed99227..85b2f25 100644
--- a/aws_s3--0.0.1.sql
+++ b/aws_s3--0.0.1.sql
@@ -91,30 +91,47 @@ AS $$
         **aws_settings
     )

-    obj = s3.Object(bucket, file_path)
-    response = obj.get()
-    content_encoding = content_encoding or response.get('ContentEncoding')
-    user_content_encoding = response.get('x-amz-meta-content-encoding')
-    body = response['Body']
-
-    with tempfile.NamedTemporaryFile() as fd:
-        if (content_encoding and content_encoding.lower() == 'gzip') or (user_content_encoding and user_content_encoding.lower() == 'gzip'):
-            with gzip.GzipFile(fileobj=body) as gzipfile:
-                while fd.write(gzipfile.read(204800)):
-                    pass
-        else:
-            while fd.write(body.read(204800)):
-                pass
-        fd.flush()
-        formatted_column_list = "({column_list})".format(column_list=column_list) if column_list else ''
-        res = plpy.execute("COPY {table_name} {formatted_column_list} FROM {filename} {options};".format(
-                table_name=table_name,
-                filename=plpy.quote_literal(fd.name),
-                formatted_column_list=formatted_column_list,
-                options=options
-            )
-        )
-        return res.nrows()
+    formatted_column_list = "({column_list})".format(column_list=column_list) if column_list else ''
+    num_rows = 0
+
+    for file_path_item in file_path.split(","):
+        file_path_item = file_path_item.strip()
+        if not file_path_item:
+            continue
+
+        s3_objects = []
+        if file_path_item.endswith("/"):  # Directory
+            bucket_objects = s3.Bucket(bucket).objects.filter(Prefix=file_path_item)
+            s3_objects = [bucket_object for bucket_object in bucket_objects]
+        else:  # File
+            s3_object = s3.Object(bucket, file_path_item)
+            s3_objects = [s3_object]
+
+        for s3_object in s3_objects:
+            response = s3_object.get()
+            content_encoding = content_encoding or response.get('ContentEncoding')
+            user_content_encoding = response.get('x-amz-meta-content-encoding')
+            body = response['Body']
+
+            with tempfile.NamedTemporaryFile() as fd:
+                if (content_encoding and content_encoding.lower() == 'gzip') or (user_content_encoding and user_content_encoding.lower() == 'gzip'):
+                    with gzip.GzipFile(fileobj=body) as gzipfile:
+                        while fd.write(gzipfile.read(204800)):
+                            pass
+                else:
+                    while fd.write(body.read(204800)):
+                        pass
+                fd.flush()
+
+                res = plpy.execute("COPY {table_name} {formatted_column_list} FROM {filename} {options};".format(
+                    table_name=table_name,
+                    filename=plpy.quote_literal(fd.name),
+                    formatted_column_list=formatted_column_list,
+                    options=options
+                    )
+                )
+                num_rows += res.nrows()
+    return num_rows
 $$;

 --
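A usage sketch of the directory and multi-file support added in PATCH 07/11, with placeholder names. A key ending in a slash is treated as a prefix and every object under it is imported; comma-separated keys are imported one after another, and the returned count is the total number of rows copied:

```
-- Hypothetical call: import every object under a prefix (note the trailing slash).
SELECT aws_s3.table_import_from_s3(
    'animals', '', '(FORMAT CSV, HEADER true)',
    'my-bucket', 'imports/animals/', 'us-east-1'
);

-- Hypothetical call: import two specific objects in a single call.
SELECT aws_s3.table_import_from_s3(
    'animals', '', '(FORMAT CSV, HEADER true)',
    'my-bucket', 'imports/animals_2022.csv,imports/animals_2023.csv', 'us-east-1'
);
```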
From 0e364e4cb27fe9851a68864a1f87914d2690d0ba Mon Sep 17 00:00:00 2001
From: yanbo
Date: Fri, 17 Mar 2023 17:02:19 +0800
Subject: [PATCH 08/11] remove content_encoding because it causes an error

---
 aws_s3--0.0.1.sql | 11 ++++-------
 1 file changed, 4 insertions(+), 7 deletions(-)

diff --git a/aws_s3--0.0.1.sql b/aws_s3--0.0.1.sql
index 85b2f25..840f3f4 100644
--- a/aws_s3--0.0.1.sql
+++ b/aws_s3--0.0.1.sql
@@ -49,7 +49,6 @@ CREATE OR REPLACE FUNCTION aws_s3.table_import_from_s3 (
     secret_key text default null,
     session_token text default null,
     endpoint_url text default null,
-    content_encoding text default null,
     read_timeout integer default 60
 ) RETURNS int
 LANGUAGE plpython3u
 AS $$
@@ -109,9 +108,9 @@ AS $$

         for s3_object in s3_objects:
             response = s3_object.get()
-            content_encoding = content_encoding or response.get('ContentEncoding')
-            user_content_encoding = response.get('x-amz-meta-content-encoding')
+            content_encoding = response.get('ContentEncoding')
             body = response['Body']
+            user_content_encoding = response.get('x-amz-meta-content-encoding')

             with tempfile.NamedTemporaryFile() as fd:
                 if (content_encoding and content_encoding.lower() == 'gzip') or (user_content_encoding and user_content_encoding.lower() == 'gzip'):
@@ -145,15 +144,14 @@ CREATE OR REPLACE FUNCTION aws_s3.table_import_from_s3(
     s3_info aws_commons._s3_uri_1,
     credentials aws_commons._aws_credentials_1,
     endpoint_url text default null,
-    content_encoding text default null,
     read_timeout integer default 60
 ) RETURNS INT
 LANGUAGE plpython3u
 AS $$

     plan = plpy.prepare(
-        'SELECT aws_s3.table_import_from_s3($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) AS num_rows',
-        ['TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'INTEGER']
+        'SELECT aws_s3.table_import_from_s3($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) AS num_rows',
+        ['TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'INTEGER']
     )
     return plan.execute(
         [
@@ -167,7 +165,6 @@ AS $$
             credentials['access_key'],
             credentials['secret_key'],
             credentials['session_token'],
             endpoint_url,
-            content_encoding,
             read_timeout
         ]
     )[0]['num_rows']

From 05cd962a8378406e3f9f6c8af96ae78a2e84d1bd Mon Sep 17 00:00:00 2001
From: yanbo
Date: Fri, 17 Mar 2023 17:10:49 +0800
Subject: [PATCH 09/11] fix read_timeout param

---
 aws_s3--0.0.1.sql | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/aws_s3--0.0.1.sql b/aws_s3--0.0.1.sql
index 840f3f4..ee101ab 100644
--- a/aws_s3--0.0.1.sql
+++ b/aws_s3--0.0.1.sql
@@ -86,7 +86,7 @@ AS $$
     s3 = boto3.resource(
         's3',
         region_name=region,
-        config=boto3.session.Config(read_timeout={read_timeout}),
+        config=boto3.session.Config(read_timeout=read_timeout),
         **aws_settings
     )

@@ -218,7 +218,7 @@ AS $$
     s3 = boto3.client(
         's3',
         region_name=region,
-        config=boto3.session.Config(read_timeout={read_timeout}),
+        config=boto3.session.Config(read_timeout=read_timeout),
         **aws_settings
     )


From 5c08c3848bf773b48c0f70b1669f1c38b809f161 Mon Sep 17 00:00:00 2001
From: yanbo
Date: Fri, 17 Mar 2023 17:13:56 +0800
Subject: [PATCH 10/11] set version to v1.0.0

---
 Makefile                               | 2 +-
 aws_s3--0.0.1.sql => aws_s3--1.0.0.sql | 0
 aws_s3.control                         | 2 +-
 3 files changed, 2 insertions(+), 2 deletions(-)
 rename aws_s3--0.0.1.sql => aws_s3--1.0.0.sql (100%)

diff --git a/Makefile b/Makefile
index ca355fc..0bffbae 100644
--- a/Makefile
+++ b/Makefile
@@ -1,5 +1,5 @@
 EXTENSION = aws_s3 # the extensions name
-DATA = aws_s3--0.0.1.sql # script files to install
+DATA = aws_s3--1.0.0.sql # script files to install

 # postgres build stuff
 PG_CONFIG = pg_config
diff --git a/aws_s3--0.0.1.sql b/aws_s3--1.0.0.sql
similarity index 100%
rename from aws_s3--0.0.1.sql
rename to aws_s3--1.0.0.sql
diff --git a/aws_s3.control b/aws_s3.control
index 5a111ce..5431451 100644
--- a/aws_s3.control
+++ b/aws_s3.control
@@ -1,5 +1,5 @@
 # aws_s3 extension
 comment = 'aws s3 wrapper for non rds postgres'
-default_version = '0.0.1'
+default_version = '1.0.0'
 module_pathname = '$libdir/aws_s3'
 relocatable = true
From 97491ca41c369cd1208486e54eb175ce8295cdf5 Mon Sep 17 00:00:00 2001
From: yanbo
Date: Mon, 20 Mar 2023 11:11:34 +0800
Subject: [PATCH 11/11] export: do not override existing files

---
 aws_s3--1.0.0.sql | 22 +++++++++++++++++++++-
 1 file changed, 21 insertions(+), 1 deletion(-)

diff --git a/aws_s3--1.0.0.sql b/aws_s3--1.0.0.sql
index ee101ab..8dfbb74 100644
--- a/aws_s3--1.0.0.sql
+++ b/aws_s3--1.0.0.sql
@@ -199,8 +199,19 @@ AS $$
         module_cache[module_name] = _module
         return _module

+    def file_exists(bucket, file_path, s3_client):
+        try:
+            s3_client.head_object(Bucket=bucket, Key=file_path)
+            return True
+        except:
+            return False
+
+    def get_unique_file_path(base_name, counter, extension):
+        return f"{base_name}_part{counter}{extension}"
+
     boto3 = cache_import('boto3')
     tempfile = cache_import('tempfile')
+    re = cache_import("re")

     plan = plpy.prepare("select name, current_setting('aws_s3.' || name, true) as value from (select unnest(array['access_key_id', 'secret_access_key', 'session_token', 'endpoint_url']) as name) a");
     default_aws_settings = {
@@ -222,6 +233,15 @@ AS $$
         **aws_settings
     )

+    # generate unique file path
+    file_path_parts = re.match(r'^(.*?)(\.[^.]*$|$)', file_path)
+    base_name = file_path_parts.group(1)
+    extension = file_path_parts.group(2)
+    counter = 0
+    while file_exists(bucket, get_unique_file_path(base_name, counter, extension), s3):
+        counter += 1
+    unique_file_path = get_unique_file_path(base_name, counter, extension)
+
     with tempfile.NamedTemporaryFile() as fd:
         plan = plpy.prepare(
             "COPY ({query}) TO '{filename}' {options}".format(
@@ -241,7 +261,7 @@
             num_lines += buffer.count(b'\n')
             size += len(buffer)
         fd.seek(0)
-        s3.upload_fileobj(fd, bucket, file_path)
+        s3.upload_fileobj(fd, bucket, unique_file_path)
         if 'HEADER TRUE' in options.upper():
             num_lines -= 1
         yield (num_lines, 1, size)
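A usage sketch of the non-overwriting export behaviour from PATCH 11/11, with placeholder names. The requested key is split into base name and extension, and the upload goes to the first <base>_part<N><extension> that does not already exist in the bucket:

```
-- Hypothetical call: the first run writes exports/animals_part0.csv; running
-- the same export again writes exports/animals_part1.csv, and so on, rather
-- than overwriting the existing object.
SELECT *
FROM aws_s3.query_export_to_s3(
    'SELECT * FROM animals',
    'my-bucket',
    'exports/animals.csv',
    'us-east-1',
    options => '(FORMAT CSV, HEADER true)'
);
```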