Sample to read CSV from Object Storage, write to and read from OCI metastore/ADW #32

5 changes: 5 additions & 0 deletions java/metastore/.gitignore
@@ -0,0 +1,5 @@
.classpath
.project
.settings
spark-warehouse
target
63 changes: 63 additions & 0 deletions java/metastore/README.md
@@ -0,0 +1,63 @@
# Convert CSV data to Parquet

The most common first step in data processing applications is to take data from some source and get it into a format suitable for reporting and other forms of analytics. In a database, you would load a flat file into the database and create indexes. In Spark, your first step is usually to clean and convert data from a text format into Parquet format. Parquet is an optimized binary format supporting efficient reads, making it ideal for reporting and analytics.
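The conversion itself takes only a few lines of Spark. Here is a minimal, self-contained sketch of the idea (the bucket, namespace, and paths are placeholders; the full sample, `MetastoreToADW.java`, reads CSV with a header row the same way):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CsvToParquet {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("CsvToParquet").getOrCreate();
        // Read the raw CSV, treating the first line as column headers.
        Dataset<Row> df = spark.read().option("header", "true")
                .csv("oci://<bucket>@<namespace>/input/data.csv");
        // Rewrite the same rows as Parquet, which is far cheaper to scan.
        df.write().mode("overwrite").parquet("oci://<bucket>@<namespace>/output/");
        spark.stop();
    }
}
```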

Before you begin:

* Ensure your tenancy is configured according to the [set up administration](https://docs.cloud.oracle.com/en-us/iaas/data-flow/using/dfs_getting_started.htm#set_up_admin) instructions.
* Know your object store namespace.
* Know the OCID of a compartment where you want to load your data and create applications.
* (Optional, strongly recommended): Install Spark to test your code locally before deploying.

## Instructions

1. Upload a sample CSV file to object store (a scripted alternative is sketched after this list).
2. Customize (if required) ```src/main/java/example/adw/MetastoreToADW.java``` with the OCI path to your CSV data. The format is ```oci://<bucket>@<namespace>/path```\
2a. Don't know what your namespace is? Run ```oci os ns get```\
2b. Don't have the OCI CLI installed? See the [installation guide](https://docs.cloud.oracle.com/en-us/iaas/Content/API/SDKDocs/cliinstall.htm).
3. Customize ```src/main/java/example/adw/MetastoreToADW.java``` with the OCI path where you would like to save output data.
4. Compile with Maven to generate the JAR file ```metastore-1.0-SNAPSHOT.jar```.
5. Recommended: run the sample locally (for example, from your code editor) to test it.
6. Upload the JAR file ```metastore-1.0-SNAPSHOT.jar``` to an object store bucket.
7. Create a Java Data Flow application pointing to the JAR file ```metastore-1.0-SNAPSHOT.jar```.\
7a. Refer to [Create Java App](https://docs.oracle.com/en-us/iaas/data-flow/using/dfs_data_flow_library.htm#create_java_app).
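
If you prefer to script the upload in step 1, here is a minimal sketch using the OCI Java SDK (already a dependency in this sample's ```pom.xml```). The bucket, namespace, and file names are placeholders, and authentication is assumed to come from a local ```~/.oci/config``` profile:

```java
import com.oracle.bmc.auth.ConfigFileAuthenticationDetailsProvider;
import com.oracle.bmc.objectstorage.ObjectStorageClient;
import com.oracle.bmc.objectstorage.requests.PutObjectRequest;

import java.io.FileInputStream;

public class UploadSampleCsv {
    public static void main(String[] args) throws Exception {
        // Authenticate with the DEFAULT profile from ~/.oci/config.
        ConfigFileAuthenticationDetailsProvider provider =
                new ConfigFileAuthenticationDetailsProvider("DEFAULT");
        try (FileInputStream body = new FileInputStream("sample.csv")) {
            ObjectStorageClient client = ObjectStorageClient.builder().build(provider);
            // Upload sample.csv into the target bucket.
            client.putObject(PutObjectRequest.builder()
                    .namespaceName("<namespace>")
                    .bucketName("<bucket>")
                    .objectName("sample.csv")
                    .putObjectBody(body)
                    .build());
            client.close();
        }
    }
}
```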

## To Compile

```sh
mvn clean package
```

## To Test Locally

```sh
spark-submit \
  --properties-file spark-properties.conf \
  --jars "<ojdbc8-21.7.0.0,oraclepki-21.7.0.0,osdt_cert-21.7.0.0,osdt_core-21.7.0.0,ucp-21.7.0.0>" \
  --conf spark.driver.extraJavaOptions="-Djava.io.tmpdir=<anyTempLocationWithReadAndWritePermission>" \
  --conf spark.executor.extraJavaOptions="-Djava.io.tmpdir=<anyTempLocationWithReadAndWritePermission>" \
  --conf spark.oracle.datasource.enabled=true \
  --conf spark.sql.warehouse.dir=<warehouseDir> \
  --conf spark.hadoop.oracle.dcat.metastore.id=<metastoreId> \
  --conf spark.hadoop.OCI_TENANT_METADATA=<tenantId> \
  --conf spark.hadoop.OCI_USER_METADATA=<userId> \
  --conf spark.hadoop.OCI_FINGERPRINT_METADATA=<fingerPrint> \
  --conf spark.hadoop.OCI_PVT_KEY_FILE_PATH=<privateKeyPemFile> \
  --conf spark.hadoop.fs.oci.client.auth.tenantId=<tenantId> \
  --conf spark.hadoop.fs.oci.client.auth.userId=<userId> \
  --conf spark.hadoop.fs.oci.client.auth.fingerprint=<fingerPrint> \
  --conf spark.hadoop.fs.oci.client.auth.pemfilepath=<privateKeyPemFile> \
  --conf spark.hadoop.OCI_REGION_METADATA=<region> \
  --conf spark.hadoop.fs.oci.client.hostname=<hostName> \
  --conf spark.hadoop.oci.metastore.uris=<metastore_uri> \
  --class example.adw.MetastoreToADW \
  metastore-1.0-SNAPSHOT.jar \
  <csv_file_path> <csv_file_name> <object_storage_output_path> <database_name> <table_name> <object_storage_adb_wallet_path/adbID> <adb_user> <adb_connection_id> <adb_password>
```
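
The ```--properties-file``` flag above points at a ```spark-properties.conf``` that is not included in this sample; any of the ```--conf``` settings can live there instead. A minimal sketch, assuming you move the metastore and warehouse settings into the file:

```
spark.sql.warehouse.dir                <warehouseDir>
spark.hadoop.oracle.dcat.metastore.id  <metastoreId>
spark.oracle.datasource.enabled        true
```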

## To use OCI CLI to run the Java Application

Create a bucket, or re-use an existing one, then upload the JAR and create a Data Flow application:

```sh
oci os bucket create --name <bucket> --compartment-id <compartment_ocid>
oci os object put --bucket-name <bucket> --file metastore-1.0-SNAPSHOT.jar
oci data-flow application create \
    --compartment-id <compartment_ocid> \
    --display-name "Metastore to ADW" \
    --driver-shape VM.Standard2.1 \
    --executor-shape VM.Standard2.1 \
    --num-executors 1 \
    --spark-version 3.2.1 \
    --file-uri oci://<bucket>@<namespace>/metastore-1.0-SNAPSHOT.jar \
    --arguments '["<csv_file_path>", "<csv_file_name>", "<object_storage_output_path>", "<database_name>", "<table_name>", "<object_storage_adb_wallet_path/adbID>", "<adb_user>", "<adb_connection_id>", "<adb_password>"]' \
    --language Java \
    --class-name example.adw.MetastoreToADW
```

Make note of the Application ID produced.

```sh
oci data-flow run create \
--compartment-id <compartment_ocid> \
--application-id <application_ocid> \
--display-name "Metastore to ADW"
```
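
The run executes asynchronously; make note of the run OCID in the output and poll it until the lifecycle state reaches a terminal value. A sketch (```<run_ocid>``` comes from the ```run create``` output):

```sh
oci data-flow run get --run-id <run_ocid> --query 'data."lifecycle-state"' --raw-output
```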
158 changes: 158 additions & 0 deletions java/metastore/pom.xml
@@ -0,0 +1,158 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>example</groupId>
<artifactId>metastore</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<oci-java-sdk-version>2.12.0</oci-java-sdk-version>
</properties>
<dependencies>

<!-- scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.15</version>
</dependency>

<!-- Spark -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.2.1</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.2.1</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.oracle.oci.sdk</groupId>
<artifactId>oci-java-sdk-core</artifactId>
<version>${oci-java-sdk-version}</version>
</dependency>
<dependency>
<groupId>com.oracle.oci.sdk</groupId>
<artifactId>oci-java-sdk-objectstorage</artifactId>
<version>${oci-java-sdk-version}</version>
</dependency>
<dependency>
<groupId>com.oracle.oci.sdk</groupId>
<artifactId>oci-java-sdk-secrets</artifactId>
<version>${oci-java-sdk-version}</version>
</dependency>
</dependencies>

<build>
<defaultGoal>install</defaultGoal>
<sourceDirectory>src/main/java</sourceDirectory>
<testSourceDirectory>src/test/java</testSourceDirectory>

<plugins>

<!-- the Maven compiler plugin will compile Java source files -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
<configuration>
<encoding>UTF-8</encoding>
</configuration>
</plugin>

<!-- the Maven Scala plugin will compile Scala source files -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>

<!-- package dependency jar -->

<!--<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>com.oracle.delta.Main</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id> (this is used for inheritance merges)
<phase>package</phase> (bind to the packaging phase)
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<artifactSet>
<excludes>
<exclude>org.bouncycastle:bcpkix-jdk15on</exclude>
<exclude>org.bouncycastle:bcprov-jdk15on</exclude>
<!-- Including jsr305 in the shaded jar causes a SecurityException
due to signer mismatch for class "javax.annotation.Nonnull" -->
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.glassfish.jersey.core:jersey-common</exclude>
</excludes>
</artifactSet>
</configuration>
</plugin>
</plugins>
</build>

</project>
147 changes: 147 additions & 0 deletions java/metastore/src/main/java/example/adw/MetastoreToADW.java
@@ -0,0 +1,147 @@
package example.adw;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;

public class MetastoreToADW {

// Expected positional arguments (see the README's spark-submit example):
//   args[0] OS bucket URI oci://<bucket>@<namespace>/
//   args[1] input CSV path relative to the bucket
//   args[2] output path relative to the bucket
//   args[3] database name, args[4] table name
//   args[5] ADW wallet path or ADB OCID, args[6] ADW user
//   args[7] connection (TNS) name, args[8] ADW password
public static void main(String[] args) {
SparkSession spark = SparkSession.builder().enableHiveSupport().getOrCreate();
String OS_BUCKET = "oci://<bucket>@<namespace>/";
String relativeInputPath = "<object storage location>";
String relativeOutputPath = "<object storage location>";
String databaseName = "<dbName>";
String tableName = "<table Name>";
System.out.println("Received args -> ");
System.out.println(String.join(" ", args));
ADWDetails adwDetailsObj = null;
if (args.length > 0) {
OS_BUCKET = args[0].trim();
relativeInputPath = args[1].trim();
relativeOutputPath = args[2].trim();
if (args.length > 3) {
databaseName = args[3].trim();
tableName = args[4].trim();
}
if (args.length > 5) {
adwDetailsObj = new ADWDetails();
adwDetailsObj.walletPath = args[5].trim();
adwDetailsObj.user = args[6].trim();
adwDetailsObj.tnsName = args[7].trim();
adwDetailsObj.secretValue = args[8].trim();
System.out.println("ADW details received: " + adwDetailsObj.toString());
}
}
System.out.println("OS_BUCKET -> " + OS_BUCKET);
if (!OS_BUCKET.endsWith("/")) {
OS_BUCKET = OS_BUCKET + "/";
}
// Use Case 1: Read csv from object storage
System.out.println("Step 1: Read csv from object storage");
Dataset<Row> df = spark.read().option("header", "true").csv(OS_BUCKET + relativeInputPath);
System.out.println("Reading data from object storage !");
df.show(false);
System.out.println(
"================================================================================================");
// Use Case 2: Write csv data into Metastore
System.out.println("Step 2: Write csv data into Metastore");
spark.sql("show databases").show(30, false);
Dataset<Row> databasesDf = spark.sql("show databases");
if (databasesDf.filter(functions.col("namespace")
.contains(functions.lit(databaseName))).count() > 0) {
System.out.println("Database: " + databaseName + " present !");
} else {
System.out.println("Database: " + databaseName + " absent, creating !");
spark.sql("create database IF NOT EXISTS " + databaseName);
System.out.println("Successfully created database: " + databaseName);
System.out.println("List of databases -> ");
databasesDf.show(false);
}
spark.sql("use " + databaseName);
spark.sql("show tables").show(30, false);
df.write().mode("overwrite").saveAsTable(databaseName + "." + tableName);
System.out.println("Wrote data in Database: " + databaseName + " ; table: " + tableName);
System.out.println(
"================================================================================================");
// Use Case 3: Read data from Metastore and write into ADW
System.out.println("Step 3: Read data from Metastore and write into ADW");
Dataset<Row> tableDf = spark.sql("select * from " + databaseName + "." + tableName);
System.out.println("Reading data from metastore !");
tableDf.show(false);
// Skip the ADW steps unless the Oracle datasource is enabled and ADW details were supplied.
if (adwDetailsObj == null
|| !spark.conf().getAll().contains("spark.oracle.datasource.enabled")
|| spark.conf().get("spark.oracle.datasource.enabled").equalsIgnoreCase("false")) {
return;
}
System.out.println("Writing data into ADW");
tableDf.write().format("oracle")
//.option("walletUri", adwDetailsObj.getWalletPath())
.option("adbId", adwDetailsObj.getWalletPath())
.option("connectionId", adwDetailsObj.getTnsName())
.option("dbtable", "sample")
.option("user", adwDetailsObj.getUser())
.option("password", adwDetailsObj.getSecretValue())
.mode("Overwrite").save();
System.out.println("Reading data from ADW -> ");
Dataset<Row> adwDf = spark.read().format("oracle")
//.option("walletUri", adwDetailsObj.getWalletPath())
.option("adbId", adwDetailsObj.getWalletPath())
.option("connectionId", adwDetailsObj.getTnsName())
.option("dbtable", "sample")
.option("user", adwDetailsObj.getUser())
.option("password", adwDetailsObj.getSecretValue())
.load();
adwDf.show(false);
}

static class ADWDetails {

public ADWDetails() {
}

public String getWalletPath() {
return walletPath;
}

public void setWalletPath(String walletPath) {
this.walletPath = walletPath;
}

public String getTnsName() {
return tnsName;
}

public void setTnsName(String tnsName) {
this.tnsName = tnsName;
}

public String getUser() {
return user;
}

public void setUser(String user) {
this.user = user;
}

public String getSecretValue() {
return secretValue;
}

public void setSecretValue(String secretValue) {
this.secretValue = secretValue;
}

String walletPath;
String tnsName;
String user;
String secretValue;
}
}
9 changes: 9 additions & 0 deletions java/metastore/src/main/resources/log4j.properties
@@ -0,0 +1,9 @@
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=WARN
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=WARN