csv_to_parquet.py
#!/usr/bin/env python3
# Copyright © 2021, Oracle and/or its affiliates.
# The Universal Permissive License (UPL), Version 1.0 as shown at https://oss.oracle.com/licenses/upl.
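"""
Convert a CSV dataset to Parquet with PySpark, running either locally or in
OCI Data Flow.

Example invocation (a sketch: the bucket and namespace below are placeholders,
and reading oci:// paths locally assumes the OCI HDFS connector is on the
Spark classpath):

    python3 csv_to_parquet.py \
        --input-path oci://<bucket>@<namespace>/input.csv \
        --output-path oci://<bucket>@<namespace>/output_parquet
"""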
import argparse
import os

from pyspark import SparkConf
from pyspark.sql import SparkSession

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--input-path", required=True)
    parser.add_argument("--output-path", required=True)
    args = parser.parse_args()

    # Set up Spark.
    spark_session = get_dataflow_spark_session()

    # Load our data, treating the first line as a header row.
    input_dataframe = spark_session.read.option("header", "true").csv(args.input_path)

    # Save the results as Parquet.
    input_dataframe.write.mode("overwrite").parquet(args.output_path)

    # Show on the console that something happened.
    print(
        "Successfully converted {} rows to Parquet and wrote to {}.".format(
            input_dataframe.count(), args.output_path
        )
    )

def get_dataflow_spark_session(
    app_name="DataFlow", file_location=None, profile_name=None, spark_config=None
):
    """
    Get a Spark session in a way that supports running locally or in Data Flow.
    """
    if in_dataflow():
        spark_builder = SparkSession.builder.appName(app_name)
    else:
        # Import OCI.
        try:
            import oci
        except ImportError:
            raise Exception(
                "You need to install the OCI python library to test locally"
            )

        # Use defaults for anything unset.
        if file_location is None:
            file_location = oci.config.DEFAULT_LOCATION
        if profile_name is None:
            profile_name = oci.config.DEFAULT_PROFILE

        # Load the config file.
        try:
            oci_config = oci.config.from_file(
                file_location=file_location, profile_name=profile_name
            )
        except Exception as e:
            print("You need to set up your OCI config properly to run locally")
            raise e

        # Point the OCI HDFS connector at the API-key credentials from the config.
        conf = SparkConf()
        conf.set("fs.oci.client.auth.tenantId", oci_config["tenancy"])
        conf.set("fs.oci.client.auth.userId", oci_config["user"])
        conf.set("fs.oci.client.auth.fingerprint", oci_config["fingerprint"])
        conf.set("fs.oci.client.auth.pemfilepath", oci_config["key_file"])
        conf.set(
            "fs.oci.client.hostname",
            "https://objectstorage.{0}.oraclecloud.com".format(oci_config["region"]),
        )
        spark_builder = SparkSession.builder.appName(app_name).config(conf=conf)

    # Add in extra configuration.
    for key, val in (spark_config or {}).items():
        spark_builder.config(key, val)

    # Create the Spark session.
    return spark_builder.getOrCreate()
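
# A minimal sketch of calling the helper with extra Spark settings; the app
# name and shuffle-partition value below are arbitrary example choices:
#
#     spark = get_dataflow_spark_session(
#         app_name="MyApp", spark_config={"spark.sql.shuffle.partitions": "16"}
#     )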

def in_dataflow():
    """
    Determine if we are running in OCI Data Flow by checking the environment.
    """
    return os.environ.get("HOME") == "/home/dataflow"

if __name__ == "__main__":
    main()