@@ -1,4 +1,4 @@
-from typing import List, Tuple, Dict, Any
+from typing import List, Tuple, Dict, Any, Optional
 import logging
 import os

@@ -18,11 +18,7 @@
 class Spark:
     def __init__(self, session):
         self._session = session
-        cpus: int = os.cpu_count()
-        if cpus == 1:
-            self._procs_io_bound: int = 1
-        else:
-            self._procs_io_bound = int(cpus / 2)
+        self._procs_io_bound: int = 1
         logging.info(f"_procs_io_bound: {self._procs_io_bound}")

     def read_csv(self, **args) -> DataFrame:
@@ -61,9 +57,9 @@ def to_redshift(
         table: str,
         iam_role: str,
         diststyle: str = "AUTO",
-        distkey=None,
+        distkey: Optional[str] = None,
         sortstyle: str = "COMPOUND",
-        sortkey=None,
+        sortkey: Optional[str] = None,
         min_num_partitions: int = 200,
         mode: str = "append",
     ) -> None:
@@ -87,7 +83,7 @@ def to_redshift(
         logger.debug(f"Minimum number of partitions: {min_num_partitions}")
         if path[-1] != "/":
             path += "/"
-        self._session.s3.delete_objects(path=path)
+        self._session.s3.delete_objects(path=path, procs_io_bound=self._procs_io_bound)
         spark: SparkSession = self._session.spark_session
         casts: Dict[str, str] = Spark._extract_casts(dataframe.dtypes)
         dataframe = Spark.date2timestamp(dataframe)
@@ -125,9 +121,9 @@ def write(pandas_dataframe: pd.DataFrame) -> pd.DataFrame:
                 cast_columns=casts)
             return pd.DataFrame.from_dict({"objects_paths": paths})

-        df_objects_paths = dataframe.repartition(numPartitions=num_partitions)  # type: ignore
-        df_objects_paths = df_objects_paths.withColumn(par_col_name, spark_partition_id())  # type: ignore
-        df_objects_paths = df_objects_paths.groupby(par_col_name).apply(write)  # type: ignore
+        df_objects_paths: DataFrame = dataframe.repartition(numPartitions=num_partitions)  # type: ignore
+        df_objects_paths: DataFrame = df_objects_paths.withColumn(par_col_name, spark_partition_id())  # type: ignore
+        df_objects_paths: DataFrame = df_objects_paths.groupby(par_col_name).apply(write)  # type: ignore

         objects_paths: List[str] = list(df_objects_paths.toPandas()["objects_paths"])
         dataframe.unpersist()
@@ -155,7 +151,7 @@ def write(pandas_dataframe: pd.DataFrame) -> pd.DataFrame:
             sortkey=sortkey,
             mode=mode,
             cast_columns=casts)
-        self._session.s3.delete_objects(path=path)
+        self._session.s3.delete_objects(path=path, procs_io_bound=self._procs_io_bound)

     def create_glue_table(self,
                           database,
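
For context, a minimal usage sketch of to_redshift after this change. The leading parameters (dataframe, path, connection, schema), the session setup, and every name below are illustrative assumptions; only the parameters visible in the hunks above are confirmed by this commit:

import awswrangler

# `spark` is an existing SparkSession, `df` a pyspark.sql.DataFrame and
# `con` a Redshift connection object -- all assumed for illustration.
session = awswrangler.Session(spark_session=spark)
session.spark.to_redshift(
    dataframe=df,
    path="s3://example-bucket/temp/",  # staging prefix; a trailing "/" is appended if missing
    connection=con,
    schema="public",
    table="example_table",
    iam_role="arn:aws:iam::111111111111:role/example-redshift-role",  # placeholder ARN
    distkey=None,   # now typed Optional[str]
    sortkey=None,   # now typed Optional[str]
    mode="append",
)

With this commit the staged objects under path are deleted with procs_io_bound=self._procs_io_bound, which the constructor now pins to 1 instead of deriving it from os.cpu_count().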