diff --git a/java/metastore/.gitignore b/java/metastore/.gitignore
new file mode 100644
index 0000000..d34e216
--- /dev/null
+++ b/java/metastore/.gitignore
@@ -0,0 +1,5 @@
+.classpath
+.project
+.settings
+spark-warehouse
+target
diff --git a/java/metastore/README.md b/java/metastore/README.md
new file mode 100644
index 0000000..bac6afe
--- /dev/null
+++ b/java/metastore/README.md
@@ -0,0 +1,63 @@
+# Convert CSV data to Parquet
+
+The most common first step in data processing applications is to take data from some source and get it into a format that is suitable for reporting and other forms of analytics. In a database, you would load a flat file into the database and create indexes. In Spark, your first step is usually to clean and convert data from a text format into Parquet format. Parquet is an optimized binary format supporting efficient reads, making it ideal for reporting and analytics. This sample reads CSV data from object storage, writes it to a managed table in the Metastore, and then copies that table into an Autonomous Data Warehouse (ADW).
+
+Before you begin:
+
+* Ensure your tenant is configured according to the instructions to [set up an admin](https://docs.cloud.oracle.com/en-us/iaas/data-flow/using/dfs_getting_started.htm#set_up_admin).
+* Know your object store namespace.
+* Know the OCID of a compartment where you want to load your data and create applications.
+* (Optional, but strongly recommended) Install Spark so you can test your code locally before deploying.
+
+## Instructions
+
+1. Upload a sample CSV file to object store.
+2. Customize (if required) ```src/main/java/example/adw/MetastoreToADW.java``` with the OCI path to your CSV data. The format is ```oci://<bucket>@<namespace>/path```.\
+   2a. Don't know what your namespace is? Run ```oci os ns get```.\
+   2b. Don't have the OCI CLI installed? [See the install instructions](https://docs.cloud.oracle.com/en-us/iaas/Content/API/SDKDocs/cliinstall.htm).
+3. Customize ```src/main/java/example/adw/MetastoreToADW.java``` with the OCI path where you would like to save output data.
+4. Compile with Maven to generate the JAR file ```metastore-1.0-SNAPSHOT.jar```.
+5. Recommended: run the sample locally or from your code editor to test it.
+6. Upload the JAR file ```metastore-1.0-SNAPSHOT.jar``` to an object store bucket.
+7. Create a Java Data Flow application pointing to the JAR file ```metastore-1.0-SNAPSHOT.jar```.\
+   7a. Refer to [Create Java App](https://docs.oracle.com/en-us/iaas/data-flow/using/dfs_data_flow_library.htm#create_java_app).
+
+## To Compile
+
+```sh
+mvn clean package
+```
+
+## To Test Locally
+
+```sh
+spark-submit --properties-file spark-properties.conf \
+    --jars "<path to dependency JARs>" \
+    --conf spark.driver.extraJavaOptions="-Djava.io.tmpdir=<temp dir>" \
+    --conf spark.executor.extraJavaOptions="-Djava.io.tmpdir=<temp dir>" \
+    --conf spark.oracle.datasource.enabled=true \
+    --conf spark.sql.warehouse.dir=<warehouse dir> \
+    --conf spark.hadoop.oracle.dcat.metastore.id=<metastore OCID> \
+    --conf spark.hadoop.OCI_TENANT_METADATA=<tenancy OCID> \
+    --conf spark.hadoop.OCI_USER_METADATA=<user OCID> \
+    --conf spark.hadoop.OCI_FINGERPRINT_METADATA=<API key fingerprint> \
+    --conf spark.hadoop.OCI_PVT_KEY_FILE_PATH=<private key path> \
+    --conf spark.hadoop.fs.oci.client.auth.tenantId=<tenancy OCID> \
+    --conf spark.hadoop.fs.oci.client.auth.userId=<user OCID> \
+    --conf spark.hadoop.fs.oci.client.auth.fingerprint=<API key fingerprint> \
+    --conf spark.hadoop.fs.oci.client.auth.pemfilepath=<private key path> \
+    --conf spark.hadoop.OCI_REGION_METADATA=<region> \
+    --conf spark.hadoop.fs.oci.client.hostname=<object storage endpoint> \
+    --conf spark.hadoop.oci.metastore.uris=<metastore URI> \
+    --class example.adw.MetastoreToADW metastore-1.0-SNAPSHOT.jar
+```
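+
+The application reads its settings from positional arguments placed after the JAR name: the object store bucket URI, the input and output paths relative to that bucket, and optionally a Metastore database name, a table name, and ADW connection details (wallet path or autonomous database OCID, user, TNS name, and password). As a sketch, using the same options as above with hypothetical bucket, namespace, and path values, the end of the command might look like:
+
+```sh
+spark-submit ... --class example.adw.MetastoreToADW metastore-1.0-SNAPSHOT.jar \
+    oci://sample-bucket@sample-namespace/ input/people.csv output/ sample_db sample_table
+```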
+
+## To use OCI CLI to run the Java Application
+
+Create a bucket. Alternatively, you can re-use an existing bucket.
+
+```sh
+oci os bucket create --name <bucket-name> --compartment-id <compartment-ocid>
+oci os object put --bucket-name <bucket-name> --file metastore-1.0-SNAPSHOT.jar
+oci data-flow application create \
+    --compartment-id <compartment-ocid> \
+    --display-name "Metastore to ADW" \
+    --driver-shape VM.Standard2.1 \
+    --executor-shape VM.Standard2.1 \
+    --num-executors 1 \
+    --spark-version 3.2.1 \
+    --file-uri oci://<bucket>@<namespace>/metastore-1.0-SNAPSHOT.jar \
+    --arguments "<application arguments>" \
+    --language Java \
+    --class-name example.adw.MetastoreToADW
+```
+
+Make note of the Application ID produced.
+
+```sh
+oci data-flow run create \
+    --compartment-id <compartment-ocid> \
+    --application-id <application-ocid> \
+    --display-name "Metastore to ADW"
+```
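+
+The create command returns the new run's OCID; you can then poll the run's status with the CLI, for example:
+
+```sh
+oci data-flow run get --run-id <run-ocid>
+```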
diff --git a/java/metastore/pom.xml b/java/metastore/pom.xml
new file mode 100644
index 0000000..285394b
--- /dev/null
+++ b/java/metastore/pom.xml
@@ -0,0 +1,158 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>example</groupId>
+    <artifactId>metastore</artifactId>
+    <version>1.0-SNAPSHOT</version>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+        <oci-java-sdk-version>2.12.0</oci-java-sdk-version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>2.12.15</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.12</artifactId>
+            <version>3.2.1</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.12</artifactId>
+            <version>3.2.1</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.oracle.oci.sdk</groupId>
+            <artifactId>oci-java-sdk-core</artifactId>
+            <version>${oci-java-sdk-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.oracle.oci.sdk</groupId>
+            <artifactId>oci-java-sdk-objectstorage</artifactId>
+            <version>${oci-java-sdk-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.oracle.oci.sdk</groupId>
+            <artifactId>oci-java-sdk-secrets</artifactId>
+            <version>${oci-java-sdk-version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <defaultGoal>install</defaultGoal>
+        <sourceDirectory>src/main/java</sourceDirectory>
+        <testSourceDirectory>src/test/java</testSourceDirectory>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.8.0</version>
+                <configuration>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-resources-plugin</artifactId>
+                <version>3.0.2</version>
+                <configuration>
+                    <encoding>UTF-8</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <version>3.2.2</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>compile</goal>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.1.1</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <artifactSet>
+                                <excludes>
+                                    <exclude>org.bouncycastle:bcpkix-jdk15on</exclude>
+                                    <exclude>org.bouncycastle:bcprov-jdk15on</exclude>
+                                    <exclude>com.google.code.findbugs:jsr305</exclude>
+                                    <exclude>org.glassfish.jersey.core:jersey-common</exclude>
+                                </excludes>
+                            </artifactSet>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/java/metastore/src/main/java/example/adw/MetastoreToADW.java b/java/metastore/src/main/java/example/adw/MetastoreToADW.java
new file mode 100644
index 0000000..4574f82
--- /dev/null
+++ b/java/metastore/src/main/java/example/adw/MetastoreToADW.java
@@ -0,0 +1,147 @@
+package example.adw;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+public class MetastoreToADW {
+
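+    /**
+     * Positional arguments (a summary of the parsing in main below; only the
+     * first three are required):
+     *   args[0] object store bucket URI, e.g. oci://<bucket>@<namespace>/
+     *   args[1] input CSV path, relative to the bucket
+     *   args[2] output path, relative to the bucket
+     *   args[3] Metastore database name
+     *   args[4] Metastore table name
+     *   args[5] ADW wallet path or autonomous database OCID
+     *   args[6] ADW user name
+     *   args[7] ADW TNS name (connection identifier)
+     *   args[8] ADW password (secret value)
+     */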
+    public static void main(String[] args) {
+        SparkSession spark = SparkSession.builder().enableHiveSupport().getOrCreate();
+        String OS_BUCKET = "oci://<bucket>@<namespace>/";
+        String relativeInputPath = "";
+        String relativeOutputPath = "";
+        String databaseName = "";
+        String tableName = "";
+        System.out.println("Received args -> ");
+        System.out.println(String.join(" ", args));
+        ADWDetails adwDetailsObj = null;
+        if (args.length > 0) {
+            OS_BUCKET = args[0].trim();
+            relativeInputPath = args[1].trim();
+            relativeOutputPath = args[2].trim();
+            if (args.length > 4) {
+                databaseName = args[3].trim();
+                tableName = args[4].trim();
+            }
+            if (args.length > 8) {
+                adwDetailsObj = new ADWDetails();
+                adwDetailsObj.walletPath = args[5].trim();
+                adwDetailsObj.user = args[6].trim();
+                adwDetailsObj.tnsName = args[7].trim();
+                adwDetailsObj.secretValue = args[8].trim();
+                System.out.println("ADW details received: " + adwDetailsObj.toString());
+            }
+        }
+        System.out.println("OS_BUCKET -> " + OS_BUCKET);
+        if (!OS_BUCKET.endsWith("/")) {
+            OS_BUCKET = OS_BUCKET + "/";
+        }
+
+        // Use Case 1: read CSV data from object storage.
+        System.out.println("Step 1: Read csv from object storage");
+        Dataset<Row> df = spark.read().option("header", "true").csv(OS_BUCKET + relativeInputPath);
+        System.out.println("Reading data from object storage !");
+        df.show(false);
+        System.out.println("================================================================================================");
+
+        // Use Case 2: write the CSV data into the Metastore as a managed table,
+        // creating the target database first if it does not exist yet.
+        System.out.println("Step 2: Write csv data into Metastore");
+        spark.sql("show databases").show(30, false);
+        Dataset<Row> databasesDf = spark.sql("show databases");
+        if (databasesDf.filter(org.apache.spark.sql.functions.col("namespace")
+                .contains(org.apache.spark.sql.functions.lit(databaseName))).count() > 0) {
+            System.out.println("Database: " + databaseName + " present !");
+        } else {
+            System.out.println("Database: " + databaseName + " absent, creating !");
+            spark.sql("create database IF NOT EXISTS " + databaseName);
+            System.out.println("Successfully created database: " + databaseName);
+            System.out.println("List of databases -> ");
+            databasesDf.show(false);
+        }
+        spark.sql("use " + databaseName);
+        spark.sql("show tables").show(30, false);
+        df.write().mode("overwrite").saveAsTable(databaseName + "." + tableName);
+        System.out.println("Wrote data in Database: " + databaseName + " ; table: " + tableName);
+        System.out.println("================================================================================================");
+
+        // Use Case 3: read the table back from the Metastore and copy it into ADW.
+        System.out.println("Step 3: Read data from Metastore and write into ADW");
+        Dataset<Row> tableDf = spark.sql("select * from " + databaseName + "." + tableName);
+        System.out.println("Reading data from metastore !");
+        tableDf.show(false);
+        // The ADW steps need both the Oracle Spark datasource and ADW connection details.
+        if (adwDetailsObj == null
+                || !spark.conf().getAll().contains("spark.oracle.datasource.enabled")
+                || spark.conf().get("spark.oracle.datasource.enabled").equalsIgnoreCase("false")) {
+            System.out.println("Skipping ADW steps: datasource disabled or ADW details not supplied.");
+            return;
+        }
+        System.out.println("Writing data into ADW");
+        tableDf.write().format("oracle")
+                //.option("walletUri", adwDetailsObj.getWalletPath())
+                .option("adbId", adwDetailsObj.getWalletPath())
+                .option("connectionId", adwDetailsObj.getTnsName())
+                .option("dbtable", "sample")
+                .option("user", adwDetailsObj.getUser())
+                .option("password", adwDetailsObj.getSecretValue())
+                .mode("Overwrite").save();
+        System.out.println("Reading data from ADW -> ");
+        Dataset<Row> adwDf = spark.read().format("oracle")
+                //.option("walletUri", adwDetailsObj.getWalletPath())
+                .option("adbId", adwDetailsObj.getWalletPath())
+                .option("connectionId", adwDetailsObj.getTnsName())
+                .option("dbtable", "sample")
+                .option("user", adwDetailsObj.getUser())
+                .option("password", adwDetailsObj.getSecretValue())
+                .load();
+        adwDf.show(false);
+    }
+
+    static class ADWDetails {
+
+        String walletPath;
+        String tnsName;
+        String user;
+        String secretValue;
+
+        public ADWDetails() {
+        }
+
+        public String getWalletPath() {
+            return walletPath;
+        }
+
+        public void setWalletPath(String walletPath) {
+            this.walletPath = walletPath;
+        }
+
+        public String getTnsName() {
+            return tnsName;
+        }
+
+        public void setTnsName(String tnsName) {
+            this.tnsName = tnsName;
+        }
+
+        public String getUser() {
+            return user;
+        }
+
+        public void setUser(String user) {
+            this.user = user;
+        }
+
+        public String getSecretValue() {
+            return secretValue;
+        }
+
+        public void setSecretValue(String secretValue) {
+            this.secretValue = secretValue;
+        }
+    }
+}
\ No newline at end of file
diff --git a/java/metastore/src/main/resources/log4j.properties b/java/metastore/src/main/resources/log4j.properties
new file mode 100644
index 0000000..5333973
--- /dev/null
+++ b/java/metastore/src/main/resources/log4j.properties
@@ -0,0 +1,9 @@
+log4j.rootCategory=WARN, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+
+log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=WARN
+log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=WARN