Commit 0569539

Author: Anand Chandak (committed)

Oracle NoSQL Database Integration
1 parent 4e6f23e commit 0569539

5 files changed, +304 -0 lines changed

README.md

+1
@@ -32,6 +32,7 @@ You must have Set Up Your Tenancy and be able to Access Data Flow
 | CSV to Parquet |This application shows how to use PySpark to convert CSV data stored in OCI Object Store to Apache Parquet format, which is then written back to Object Store. |[CSV to Parquet](./python/csv_to_parquet)| [CSV to Parquet](./java/csv_to_parquet)| [CSV to Parquet](./scala/csv_to_parquet)|
 | Load to ADW |This application shows how to read a file from OCI Object Store, perform some transformation and write the results to an Autonomous Data Warehouse instance. |[Load to ADW](./python/loadadw)| [Load to ADW](./java/loadadw)|[Load to ADW](./scala/loadadw)|
 | Random Forest Regression |This application shows how to build a model and make predictions using Random Forest Regression. |[Random Forest Regression](./python/random_forest_regression)|
+| Oracle NoSQL Database cloud service |This application shows how to interface with Oracle NoSQL Database cloud service. |[Oracle NoSQL Database cloud service](./python/oracle_nosql)|
 
 For step-by-step instructions, see the README files included with each sample.

python/oracle_nosql/.gitignore

+3
@@ -0,0 +1,3 @@
__pycache__
logs
archive.zip

python/oracle_nosql/README.md

+80
@@ -0,0 +1,80 @@
# Overview

This example shows you how to use OCI Data Flow to interface with Oracle NoSQL Database Cloud Service.

## Prerequisites

Before you begin:

1. Ensure your tenant is configured for Data Flow by following these [instructions](https://docs.cloud.oracle.com/en-us/iaas/data-flow/using/dfs_getting_started.htm#set_up_admin).
2. Provision an Oracle NoSQL Database Cloud Service table.
3. Download the Oracle NoSQL Database Python SDK. The project home is [here](https://nosql-python-sdk.readthedocs.io/en/stable/index.html).
The SDK can be installed using pip:

```bash
pip install borneo
```

* See [the installation guide](https://nosql-python-sdk.readthedocs.io/en/stable/installation.html) for additional requirements and alternative install methods. A quick connectivity check is sketched after this list.
4. (Optional, strongly recommended) Install Spark to test your code locally before deploying to Data Flow.
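Once the SDK and your OCI credentials are in place, a short smoke test from a local Python session can confirm you can reach the service. This is a minimal sketch, not part of the sample; it assumes an API signing key in the default OCI config file (```~/.oci/config```), and the region and compartment OCID below are placeholders.

```python
# Minimal connectivity check (sketch): list the NoSQL tables visible in a compartment.
# Assumes an API signing key in ~/.oci/config; region and compartment OCID are placeholders.
from borneo import ListTablesRequest, NoSQLHandle, NoSQLHandleConfig
from borneo.iam import SignatureProvider

provider = SignatureProvider()  # reads the DEFAULT profile from ~/.oci/config
config = NoSQLHandleConfig("us-ashburn-1", provider).set_default_compartment(
    "<compartment_ocid>"
)
handle = NoSQLHandle(config)
try:
    print(handle.list_tables(ListTablesRequest()))
finally:
    handle.close()
```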
## Application Setup

Customize ```nosql_example.py``` with:

* Set COMPARTMENT_ID to the OCID of the compartment that contains your Oracle NoSQL Database Cloud Service table.
* Set ENDPOINT to the region that hosts your Oracle NoSQL Database Cloud Service table, e.g. ``us-ashburn-1``.
* Set TABLE_NAME to the name of the table to create in Oracle NoSQL Database Cloud Service.
* Set INDEX_NAME to the name of the index to create on that table.

Alternatively, the four values can be passed as positional command-line arguments, as sketched below.
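For reference, this is roughly how ```nosql_example.py``` resolves those values: the constants at the top of ```main()``` serve as defaults, and four positional command-line arguments, when supplied, override them. The compartment OCID below is a placeholder.

```python
import sys

# Defaults; edit these in place, or override them with positional arguments:
#   python nosql_example.py <compartment_ocid> <endpoint> <table_name> <index_name>
COMPARTMENT_ID = "<compartment_ocid>"
ENDPOINT = "us-ashburn-1"
TABLE_NAME = "pysparktable"
INDEX_NAME = "pythonindex"
try:
    COMPARTMENT_ID, ENDPOINT, TABLE_NAME, INDEX_NAME = sys.argv[1:5]
except ValueError:
    pass  # fewer than four arguments supplied; keep the defaults
```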
## Testing Locally

Test the application locally (recommended):

```bash
python nosql_example.py
```
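The sample itself only issues DDL and metadata requests (create table, create index, get table, get indexes, table usage). As a hedged sketch of going one step further with the same SDK, a row could be written and read back through the ```handle``` and ```TABLE_NAME``` already set up in ```main()```; the row values below are illustrative.

```python
from borneo import GetRequest, PutRequest

# Write one row to the sample table
# (schema: id integer, sid integer, name string, primary key(shard(sid), id)).
put_request = PutRequest().set_table_name(TABLE_NAME).set_value(
    {"id": 1, "sid": 1, "name": "example"}
)
handle.put(put_request)

# Read the row back by its primary key.
get_request = GetRequest().set_table_name(TABLE_NAME).set_key({"sid": 1, "id": 1})
print(handle.get(get_request).get_value())
```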
## Packaging your Application

* Create the Data Flow Dependencies Archive as follows:

```bash
docker pull phx.ocir.io/oracle/dataflow/dependency-packager:latest
docker run --rm -v $(pwd):/opt/dataflow -it phx.ocir.io/oracle/dataflow/dependency-packager:latest
```

* Confirm you have a file named **archive.zip** with the Oracle NoSQL Database Python SDK in it.
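Note: the dependency packager installs the Python packages listed in this sample's ```requirements.txt``` (``borneo`` and ``oci``) into ```archive.zip```, so no additional packaging configuration should be needed beyond the commands above.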
## Deploy and Run the Application

* Copy ```nosql_example.py``` to object store.
* Copy the ```archive.zip``` generated while packaging the application to object store.
* Create a Data Flow Python application. Be sure to include ```archive.zip``` as the dependency archive.
* Refer to [the Data Flow documentation](https://docs.cloud.oracle.com/en-us/iaas/data-flow/using/dfs_data_flow_library.htm#create_pyspark_app) for more information.
* Run the application.
## Run the Application using OCI Cloud Shell or OCI CLI

Create a bucket. Alternatively you can re-use an existing bucket.

```sh
oci os object put --bucket-name <bucket> --file nosql_example.py
oci os object put --bucket-name <bucket> --file archive.zip
oci data-flow application create \
    --compartment-id <compartment_ocid> \
    --display-name "Oracle NoSQL Example" \
    --driver-shape VM.Standard2.1 \
    --executor-shape VM.Standard2.1 \
    --num-executors 1 \
    --spark-version 2.4.4 \
    --file-uri oci://<bucket>@<namespace>/nosql_example.py \
    --archive-uri oci://<bucket>@<namespace>/archive.zip \
    --language Python
oci data-flow run create \
    --application-id <application_ocid> \
    --compartment-id <compartment_ocid> \
    --display-name "Oracle NoSQL Example"
```

python/oracle_nosql/nosql_example.py

+218
@@ -0,0 +1,218 @@
#!/usr/bin/env python3

import os
import sys

from borneo import (GetIndexesRequest, GetTableRequest, ListTablesRequest,
                    NoSQLHandle, NoSQLHandleConfig, TableLimits, TableRequest,
                    TableUsageRequest)
from borneo.iam import SignatureProvider
from pyspark import SparkConf
from pyspark.sql import SparkSession, SQLContext
def main():
    # You can hard code your own values here if you want.
    COMPARTMENT_ID = "ocid1.compartment.oc1..aaaaaaaati55ggp45kgnterqwveayuyioyhz7hw7f46umo277mn5vecpny6q"
    ENDPOINT = "us-ashburn-1"
    TABLE_NAME = "pysparktable"
    INDEX_NAME = "pythonindex"

    # Positional command-line arguments, when present, override the defaults above.
    try:
        COMPARTMENT_ID = sys.argv[1]
        ENDPOINT = sys.argv[2]
        TABLE_NAME = sys.argv[3]
        INDEX_NAME = sys.argv[4]
    except IndexError:
        pass

    # Set up Spark.
    spark_session = get_dataflow_spark_session()

    # Get our IAM signer.
    token_path = get_delegation_token_path(spark_session)
    signer = get_signer(token_path)

    # The Handle to our table.
    provider = SignatureProvider(provider=signer)

    # XXX: This needs to get fixed.
    provider.region = ENDPOINT.upper().replace("-", "_")

    config = NoSQLHandleConfig(ENDPOINT, provider).set_default_compartment(
        COMPARTMENT_ID
    )

    handle = NoSQLHandle(config)
    try:
        # List any existing tables for this tenant.
        print("Listing tables")
        ltr = ListTablesRequest()
        lr_result = handle.list_tables(ltr)
        print("Existing tables: " + str(lr_result))

        # Create a table.
        statement = (
            "Create table if not exists "
            + TABLE_NAME
            + "(id integer, \
            sid integer, name string, primary key(shard(sid), id))"
        )
        print("Creating table: " + statement)
        request = (
            TableRequest()
            .set_statement(statement)
            .set_table_limits(TableLimits(30, 10, 1))
        )
        handle.do_table_request(request, 40000, 3000)
        print("After create table")

        # Create an index.
        statement = (
            "Create index if not exists " + INDEX_NAME + " on " + TABLE_NAME + "(name)"
        )
        print("Creating index: " + statement)
        request = TableRequest().set_statement(statement)
        handle.do_table_request(request, 40000, 3000)
        print("After create index")

        # Get the table.
        request = GetTableRequest().set_table_name(TABLE_NAME)
        result = handle.get_table(request)
        print("After get table: " + str(result))

        # Get the indexes.
        request = GetIndexesRequest().set_table_name(TABLE_NAME)
        result = handle.get_indexes(request)
        print("The indexes for: " + TABLE_NAME)
        for idx in result.get_indexes():
            print("\t" + str(idx))

        # Get the table usage information.
        request = TableUsageRequest().set_table_name(TABLE_NAME)
        result = handle.get_table_usage(request)
        print("The table usage information for: " + TABLE_NAME)
        for record in result.get_usage_records():
            print("\t" + str(record))
    finally:
        if handle is not None:
            handle.close()
def get_dataflow_spark_session(
    app_name="DataFlow", file_location=None, profile_name=None, spark_config={}
):
    """
    Get a Spark session in a way that supports running locally or in Data Flow.
    """
    if in_dataflow():
        spark_builder = SparkSession.builder.appName(app_name)
    else:
        # Import OCI.
        try:
            import oci
        except ImportError:
            raise Exception(
                "You need to install the OCI python library to test locally"
            )

        # Use defaults for anything unset.
        if file_location is None:
            file_location = oci.config.DEFAULT_LOCATION
        if profile_name is None:
            profile_name = oci.config.DEFAULT_PROFILE

        # Load the config file.
        try:
            oci_config = oci.config.from_file(
                file_location=file_location, profile_name=profile_name
            )
        except Exception as e:
            print("You need to set up your OCI config properly to run locally")
            raise e
        conf = SparkConf()
        conf.set("fs.oci.client.auth.tenantId", oci_config["tenancy"])
        conf.set("fs.oci.client.auth.userId", oci_config["user"])
        conf.set("fs.oci.client.auth.fingerprint", oci_config["fingerprint"])
        conf.set("fs.oci.client.auth.pemfilepath", oci_config["key_file"])
        conf.set(
            "fs.oci.client.hostname",
            "https://objectstorage.{0}.oraclecloud.com".format(oci_config["region"]),
        )
        spark_builder = SparkSession.builder.appName(app_name).config(conf=conf)

    # Add in extra configuration.
    for key, val in spark_config.items():
        spark_builder.config(key, val)

    # Create the Spark session.
    session = spark_builder.getOrCreate()
    return session
def get_signer(token_path, file_location=None, profile_name=None):
    """
    Automatically get a local or delegation token signer.

    Example: get_signer(token_path)
    """
    import oci

    if not in_dataflow():
        # We are running locally, use our API Key.
        if file_location is None:
            file_location = oci.config.DEFAULT_LOCATION
        if profile_name is None:
            profile_name = oci.config.DEFAULT_PROFILE
        config = oci.config.from_file(
            file_location=file_location, profile_name=profile_name
        )
        signer = oci.signer.Signer(
            tenancy=config["tenancy"],
            user=config["user"],
            fingerprint=config["fingerprint"],
            private_key_file_location=config["key_file"],
            pass_phrase=config["pass_phrase"],
        )
    else:
        # We are running in Data Flow, use our Delegation Token.
        with open(token_path) as fd:
            delegation_token = fd.read()
        signer = oci.auth.signers.InstancePrincipalsDelegationTokenSigner(
            delegation_token=delegation_token
        )
    return signer
def in_dataflow():
    """
    Determine if we are running in OCI Data Flow by checking the environment.
    """
    if os.environ.get("HOME") == "/home/dataflow":
        return True
    return False
def get_delegation_token_path(spark):
    """
    Get the delegation token path when we're running in Data Flow.
    """
    if not in_dataflow():
        return None
    token_key = "spark.hadoop.fs.oci.client.auth.delegationTokenPath"
    token_path = spark.sparkContext.getConf().get(token_key)
    if not token_path:
        raise Exception(f"{token_key} is not set")
    return token_path
def get_temporary_directory():
    if in_dataflow():
        return "/opt/spark/work-dir/"
    else:
        import tempfile

        return tempfile.gettempdir()


if __name__ == "__main__":
    main()

python/oracle_nosql/requirements.txt

+2
@@ -0,0 +1,2 @@
borneo
oci
