@@ -949,3 +949,153 @@ def test_read_sql_athena_with_nulls(session, bucket, database):
    assert df2.dtypes[1] == "bool"
    assert df2.dtypes[2] == "object"
    session.s3.delete_objects(path=path)


def test_partition_date(session, bucket, database):
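    """Write a Parquet dataset partitioned by a native datetime.date column and
    read it back through Athena.

    The read is retried (up to 10 attempts, one second apart) until the row
    counts match; every column is then expected to come back as object dtype."""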
    df = pd.DataFrame({
        "col1": ["val1", "val2"],
        "datecol": ["2019-11-09", "2019-11-08"],
        "partcol": ["2019-11-09", "2019-11-08"]
    })
    df["datecol"] = pd.to_datetime(df.datecol).dt.date
    df["partcol"] = pd.to_datetime(df.partcol).dt.date
    print(df)
    print(df.dtypes)
    path = f"s3://{bucket}/test/"
    session.pandas.to_parquet(dataframe=df,
                              database=database,
                              path=path,
                              partition_cols=["datecol"],
                              preserve_index=False,
                              mode="overwrite")
    df2 = None
    for counter in range(10):
        df2 = session.pandas.read_sql_athena(sql="select * from test", database=database)
        assert len(list(df.columns)) == len(list(df2.columns))
        if len(df.index) == len(df2.index):
            break
        sleep(1)
    assert len(df.index) == len(df2.index)
    print(df2)
    print(df2.dtypes)
    assert df2.dtypes[0] == "object"
    assert df2.dtypes[1] == "object"
    assert df2.dtypes[2] == "object"
    session.s3.delete_objects(path=path)


def test_partition_cast_date(session, bucket, database):
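    """Write string columns cast to the "date" type through cast_columns,
    partitioned on partcol, and read the table back through Athena.

    The read is retried (up to 10 attempts, one second apart) until the row
    counts match; all three columns are then expected to come back as object dtype."""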
    df = pd.DataFrame({
        "col1": ["val1", "val2"],
        "datecol": ["2019-11-09", "2019-11-08"],
        "partcol": ["2019-11-09", "2019-11-08"]
    })
    print(df)
    print(df.dtypes)
    path = f"s3://{bucket}/test/"
    schema = {
        "col1": "string",
        "datecol": "date",
        "partcol": "date",
    }
    session.pandas.to_parquet(dataframe=df,
                              database=database,
                              path=path,
                              partition_cols=["partcol"],
                              preserve_index=False,
                              cast_columns=schema,
                              mode="overwrite")
    df2 = None
    for counter in range(10):
        df2 = session.pandas.read_sql_athena(sql="select * from test", database=database)
        assert len(list(df.columns)) == len(list(df2.columns))
        if len(df.index) == len(df2.index):
            break
        sleep(1)
    assert len(df.index) == len(df2.index)
    print(df2)
    print(df2.dtypes)
    assert df2.dtypes[0] == "object"
    assert df2.dtypes[1] == "object"
    assert df2.dtypes[2] == "object"
    session.s3.delete_objects(path=path)


def test_partition_cast_timestamp(session, bucket, database):
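    """Write string columns cast to "timestamp" through cast_columns,
    partitioned on partcol, and read the table back through Athena.

    The read is retried (up to 10 attempts, one second apart) until the row
    counts match; the casted columns are then expected to come back as datetime64."""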
    df = pd.DataFrame({
        "col1": ["val1", "val2"],
        "datecol": ["2019-11-09", "2019-11-08"],
        "partcol": ["2019-11-09", "2019-11-08"]
    })
    print(df)
    print(df.dtypes)
    path = f"s3://{bucket}/test/"
    schema = {
        "col1": "string",
        "datecol": "timestamp",
        "partcol": "timestamp",
    }
    session.pandas.to_parquet(dataframe=df,
                              database=database,
                              path=path,
                              partition_cols=["partcol"],
                              preserve_index=False,
                              cast_columns=schema,
                              mode="overwrite")
    df2 = None
    for counter in range(10):
        df2 = session.pandas.read_sql_athena(sql="select * from test", database=database)
        assert len(list(df.columns)) == len(list(df2.columns))
        if len(df.index) == len(df2.index):
            break
        sleep(1)
    assert len(df.index) == len(df2.index)
    print(df2)
    print(df2.dtypes)
    assert str(df2.dtypes[0]) == "object"
    assert str(df2.dtypes[1]).startswith("datetime64")
    assert str(df2.dtypes[2]).startswith("datetime64")
    session.s3.delete_objects(path=path)


def test_partition_cast(session, bucket, database):
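    """Write a dataframe of strings cast to several types (timestamp, double,
    boolean) through cast_columns, partitioned on partcol, and read it back
    through Athena.

    The read is retried (up to 10 attempts, one second apart) until the row
    counts match; the asserted dtype order assumes the partition column is
    returned as the last column."""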
    df = pd.DataFrame({
        "col1": ["val1", "val2"],
        "datecol": ["2019-11-09", "2019-11-08"],
        "partcol": ["2019-11-09", "2019-11-08"],
        "col_double": ["1.0", "1.1"],
        "col_bool": ["True", "False"],
    })
    print(df)
    print(df.dtypes)
    path = f"s3://{bucket}/test/"
    schema = {
        "col1": "string",
        "datecol": "timestamp",
        "partcol": "timestamp",
        "col_double": "double",
        "col_bool": "boolean",
    }
    session.pandas.to_parquet(dataframe=df,
                              database=database,
                              path=path,
                              partition_cols=["partcol"],
                              preserve_index=False,
                              cast_columns=schema,
                              mode="overwrite")
    df2 = None
    for counter in range(10):
        df2 = session.pandas.read_sql_athena(sql="select * from test", database=database)
        assert len(list(df.columns)) == len(list(df2.columns))
        if len(df.index) == len(df2.index):
            break
        sleep(1)
    assert len(df.index) == len(df2.index)
    print(df2)
    print(df2.dtypes)
    assert df2.dtypes[0] == "object"
    assert str(df2.dtypes[1]).startswith("datetime")
    assert str(df2.dtypes[2]).startswith("float")
    assert str(df2.dtypes[3]).startswith("bool")
    assert str(df2.dtypes[4]).startswith("datetime")
    session.s3.delete_objects(path=path)