Skip to content

Commit 378badb

Browse files
committed
Now to_redshift() will create the table if it does not exist yet, even for append or upsert mode.
1 parent d022396 commit 378badb

File tree

2 files changed

+97
-9
lines changed

2 files changed

+97
-9
lines changed

awswrangler/redshift.py

Lines changed: 46 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -242,16 +242,32 @@ def load_table(dataframe,
242242
varchar_default_length=varchar_default_length,
243243
varchar_lengths=varchar_lengths)
244244
table_name = f"{schema_name}.{table_name}"
245-
elif mode == "upsert":
246-
guid: str = pa.compat.guid()
247-
temp_table_name = f"temp_redshift_{guid}"
248-
final_table_name = table_name
249-
table_name = temp_table_name
250-
sql: str = f"CREATE TEMPORARY TABLE {temp_table_name} (LIKE {schema_name}.{final_table_name})"
251-
logger.debug(sql)
252-
cursor.execute(sql)
253245
else:
254-
table_name = f"{schema_name}.{table_name}"
246+
if Redshift.does_table_exists(cursor=cursor, schema=schema_name, table=table_name) is False:
247+
Redshift._create_table(cursor=cursor,
248+
dataframe=dataframe,
249+
dataframe_type=dataframe_type,
250+
schema_name=schema_name,
251+
table_name=table_name,
252+
diststyle=diststyle,
253+
distkey=distkey,
254+
sortstyle=sortstyle,
255+
sortkey=sortkey,
256+
primary_keys=primary_keys,
257+
preserve_index=preserve_index,
258+
cast_columns=cast_columns,
259+
varchar_default_length=varchar_default_length,
260+
varchar_lengths=varchar_lengths)
261+
if mode == "upsert":
262+
guid: str = pa.compat.guid()
263+
temp_table_name = f"temp_redshift_{guid}"
264+
final_table_name = table_name
265+
table_name = temp_table_name
266+
sql: str = f"CREATE TEMPORARY TABLE {temp_table_name} (LIKE {schema_name}.{final_table_name})"
267+
logger.debug(sql)
268+
cursor.execute(sql)
269+
else:
270+
table_name = f"{schema_name}.{table_name}"
255271

256272
sql = ("-- AWS DATA WRANGLER\n"
257273
f"COPY {table_name} FROM '{manifest_path}'\n"
@@ -513,3 +529,24 @@ def to_parquet(self,
513529
logger.debug(f"Waiting path: {p}")
514530
self._session.s3.wait_object_exists(path=p, timeout=30.0)
515531
return paths
532+
533+
@staticmethod
def does_table_exists(cursor, schema: str, table: str) -> bool:
    """
    Check if the table exists.

    Runs a lookup against INFORMATION_SCHEMA.TABLES through the given cursor.

    :param cursor: A PEP 249 compatible cursor (A valid connection can be generated with Redshift.generate_connection())
    :param schema: Schema name
    :param table: Table name
    :return: True if the lookup returns a truthy row, False otherwise
    """
    # Double any single quote so that a name containing one cannot close the
    # SQL string literal early and break (or alter) the statement.
    # NOTE(review): this is plain literal escaping, not bound parameters;
    # the names are still expected to come from a trusted caller.
    schema_literal: str = schema.replace("'", "''")
    table_literal: str = table.replace("'", "''")
    cursor.execute(f"SELECT true WHERE EXISTS ("
                   f"SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE "
                   f"TABLE_SCHEMA = '{schema_literal}' AND TABLE_NAME = '{table_literal}'"
                   f");")
    rows = cursor.fetchall()
    # The query yields a single `true` row on a match and no rows otherwise.
    return bool(rows) and bool(rows[0][0])

testing/test_awswrangler/test_redshift.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -947,3 +947,54 @@ def test_to_redshift_int_na(bucket, redshift_parameters):
947947
rows = cursor.fetchall()
948948
assert rows[0][0] == 3
949949
conn.close()
950+
951+
952+
def test_does_table_exists():
    """Expect True for information_schema.tables and False for a made-up table name."""
    conn = wr.glue.get_connection("aws-data-wrangler-redshift")
    try:
        with conn.cursor() as cursor:
            res = wr.redshift.does_table_exists(cursor=cursor, schema="information_schema", table="tables")
            assert res is True
            res = wr.redshift.does_table_exists(cursor=cursor, schema="information_schema", table="adjwbdjawbjd")
            assert res is False
    finally:
        # Release the connection even when an assertion above fails.
        conn.close()
960+
961+
962+
def test_append_create_table(bucket, redshift_parameters):
    """Load a DataFrame with mode="append" into a table dropped beforehand and verify every row."""
    df = pd.DataFrame({
        "id": [1, 2, 3],
        "decimal_2": [Decimal((0, (1, 9, 9), -2)), None, Decimal((0, (1, 9, 0), -2))],
        "decimal_5": [Decimal((0, (1, 9, 9, 9, 9, 9), -5)), None,
                      Decimal((0, (1, 9, 0, 0, 0, 0), -5))],
    })
    con = wr.redshift.get_connection("aws-data-wrangler-redshift")
    try:
        # Make sure the target table is absent before the append is issued.
        cursor = con.cursor()
        cursor.execute("DROP TABLE IF EXISTS public.test_append_create_table")
        cursor.close()
        path = f"s3://{bucket}/test_append_create_table/"
        wr.pandas.to_redshift(
            dataframe=df,
            path=path,
            schema="public",
            table="test_append_create_table",
            connection=con,
            iam_role=redshift_parameters.get("RedshiftRole"),
            mode="append",
            preserve_index=False,
        )
        cursor = con.cursor()
        cursor.execute("SELECT * from public.test_append_create_table")
        rows = cursor.fetchall()
        cursor.close()
    finally:
        # Close the connection even if the load or the query raises.
        con.close()
    assert len(df.index) == len(rows)
    assert len(list(df.columns)) == len(list(rows[0]))
    for row in rows:
        # Column 0 is "id"; every branch must key on it so that all three
        # rows are actually checked.
        if row[0] == 1:
            assert row[1] == Decimal((0, (1, 9, 9), -2))
            assert row[2] == Decimal((0, (1, 9, 9, 9, 9, 9), -5))
        elif row[0] == 2:
            assert row[1] is None
            assert row[2] is None
        elif row[0] == 3:
            assert row[1] == Decimal((0, (1, 9, 0), -2))
            assert row[2] == Decimal((0, (1, 9, 0, 0, 0, 0), -5))

0 commit comments

Comments
 (0)