Commit e0337fe

Merge pull request #28 from edmundmiller/execute

Add execute functions

2 parents e503057 + c877be5 commit e0337fe

File tree: 14 files changed, +895 −24 lines changed


Makefile

Lines changed: 7 additions & 0 deletions

````diff
@@ -1,4 +1,5 @@
 config ?= compileClasspath
+version ?= $(shell grep 'Plugin-Version' plugins/nf-sqldb/src/resources/META-INF/MANIFEST.MF | awk '{ print $$2 }')
 
 ifdef module
 mm = :${module}:
@@ -69,3 +70,9 @@ upload-plugins:
 
 publish-index:
 	./gradlew plugins:publishIndex
+
+# Install the plugin into local nextflow plugins dir
+install:
+	./gradlew copyPluginZip
+	rm -rf ${HOME}/.nextflow/plugins/nf-sqldb-${version}
+	cp -r build/plugins/nf-sqldb-${version} ${HOME}/.nextflow/plugins/nf-sqldb-${version}
````
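The new `install` target copies the freshly built plugin into Nextflow's local plugin directory. A minimal sketch of the copy steps it performs, using temporary directories in place of the real `$HOME` and build output, and a hypothetical version `0.5.0`:

```shell
# Simulate the copy performed by `make install` (version and paths are stand-ins).
version=0.5.0
home=$(mktemp -d)
workdir=$(mktemp -d)
cd "$workdir"

# Stand-in for the output produced by `./gradlew copyPluginZip`
mkdir -p "build/plugins/nf-sqldb-${version}"
echo 'stub' > "build/plugins/nf-sqldb-${version}/MANIFEST.MF"

# The two copy steps from the Makefile target
mkdir -p "${home}/.nextflow/plugins"
rm -rf "${home}/.nextflow/plugins/nf-sqldb-${version}"
cp -r "build/plugins/nf-sqldb-${version}" "${home}/.nextflow/plugins/nf-sqldb-${version}"

ls "${home}/.nextflow/plugins"   # → nf-sqldb-0.5.0
```

In the real target, `${version}` is extracted from the plugin's `MANIFEST.MF`, so the installed directory always matches the built plugin version.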

README.md

Lines changed: 91 additions & 12 deletions

````diff
@@ -4,13 +4,13 @@ This plugin provides support for interacting with SQL databases in Nextflow scri
 
 The following databases are currently supported:
 
-* [AWS Athena](https://aws.amazon.com/athena/) (Setup guide [here](docs/aws-athena.md))
-* [DuckDB](https://duckdb.org/)
-* [H2](https://www.h2database.com)
-* [MySQL](https://www.mysql.com/)
-* [MariaDB](https://mariadb.org/)
-* [PostgreSQL](https://www.postgresql.org/)
-* [SQLite](https://www.sqlite.org/index.html)
+- [AWS Athena](https://aws.amazon.com/athena/) (Setup guide [here](docs/aws-athena.md))
+- [DuckDB](https://duckdb.org/)
+- [H2](https://www.h2database.com)
+- [MySQL](https://www.mysql.com/)
+- [MariaDB](https://mariadb.org/)
+- [PostgreSQL](https://www.postgresql.org/)
+- [SQLite](https://www.sqlite.org/index.html)
 
 NOTE: THIS IS A PREVIEW TECHNOLOGY, FEATURES AND CONFIGURATION SETTINGS CAN CHANGE IN FUTURE RELEASES.
````
````diff
@@ -24,7 +24,6 @@ plugins {
 }
 ```
 
-
 ## Configuration
 
 You can configure any number of databases under the `sql.db` configuration scope. For example:
@@ -79,7 +78,7 @@ The following options are available:
 
 `batchSize`
 : Query the data in batches of the given size. This option is recommended for queries that may return a large result set, so that the entire result set is not loaded into memory at once.
-: *NOTE:* this feature requires that the underlying SQL database supports `LIMIT` and `OFFSET`.
+: _NOTE:_ this feature requires that the underlying SQL database supports `LIMIT` and `OFFSET`.
 
 `emitColumns`
 : When `true`, the column names in the `SELECT` statement are emitted as the first tuple in the resulting channel.
@@ -104,7 +103,7 @@ INSERT INTO SAMPLE (NAME, LEN) VALUES ('HELLO', 5);
 INSERT INTO SAMPLE (NAME, LEN) VALUES ('WORLD!', 6);
 ```
 
-*NOTE:* the target table (e.g. `SAMPLE` in the above example) must be created beforehand.
+_NOTE:_ the target table (e.g. `SAMPLE` in the above example) must be created beforehand.
 
 The following options are available:
````
````diff
@@ -125,7 +124,87 @@ The following options are available:
 
 `setup`
 : A SQL statement that is executed before inserting the data, e.g. to create the target table.
-: *NOTE:* the underlying database should support the *create table if not exist* idiom, as the plugin will execute this statement every time the script is run.
+: _NOTE:_ the underlying database should support the _create table if not exist_ idiom, as the plugin will execute this statement every time the script is run.
+
+## SQL Execution Functions
+
+This plugin provides the following function for executing SQL statements that don't return data, such as DDL (Data Definition Language) and DML (Data Manipulation Language) operations.
+
+### sqlExecute
+
+The `sqlExecute` function executes a SQL statement that doesn't return a result set, such as a `CREATE`, `ALTER`, `DROP`, `INSERT`, `UPDATE`, or `DELETE` statement. For DML statements (`INSERT`, `UPDATE`, `DELETE`), it returns a Map with `success: true` and `result` set to the number of affected rows. For DDL statements (`CREATE`, `ALTER`, `DROP`), it returns a Map with `success: true` and `result: null`.
+
+For example:
+
+```nextflow
+include { sqlExecute } from 'plugin/nf-sqldb'
+
+// Create a table (returns a Map with result: null for DDL operations)
+def createResult = sqlExecute(
+    db: 'foo',
+    statement: '''
+        CREATE TABLE IF NOT EXISTS sample_table (
+            id INTEGER PRIMARY KEY,
+            name VARCHAR(100),
+            value DOUBLE
+        )
+    '''
+)
+println "Create result: $createResult" // [success: true, result: null]
+
+// Insert data (returns a Map with result: number of rows inserted)
+def insertResult = sqlExecute(
+    db: 'foo',
+    statement: "INSERT INTO sample_table (id, name, value) VALUES (1, 'alpha', 10.5)"
+)
+println "Insert result: $insertResult" // [success: true, result: 1]
+
+// Update data (returns a Map with result: number of rows updated)
+def updateResult = sqlExecute(
+    db: 'foo',
+    statement: "UPDATE sample_table SET value = 30.5 WHERE name = 'alpha'"
+)
+println "Update result: $updateResult" // [success: true, result: <number>]
+
+// Delete data (returns a Map with result: number of rows deleted)
+def deleteResult = sqlExecute(
+    db: 'foo',
+    statement: "DELETE FROM sample_table WHERE value > 25"
+)
+println "Delete result: $deleteResult" // [success: true, result: <number>]
+```
+
+The following options are available:
+
+`db`
+: The database handle. It must be defined under `sql.db` in the Nextflow configuration.
+
+`statement`
+: The SQL statement to execute. This can be any DDL or DML statement that doesn't return a result set.
+
+## Differences Between Dataflow Operators and the Execution Function
+
+The plugin provides two different ways to interact with databases:
+
+1. **Dataflow operators** (`fromQuery` and `sqlInsert`): These are designed to integrate with Nextflow's dataflow programming model, operating on channels.
+
+   - `fromQuery`: Queries data from a database and returns a channel that emits the results.
+   - `sqlInsert`: Takes data from a channel and inserts it into a database.
+
+2. **Execution function** (`sqlExecute`): This is designed for direct SQL statement execution that doesn't require channel integration.
+
+   - `sqlExecute`: Executes a SQL statement. For DML operations, it returns the count of affected rows; for DDL operations, it returns null.
+
+Use the **dataflow operators** when you need to:
+
+- Query data that will flow into your pipeline processing
+- Insert data from your pipeline processing into a database
+
+Use the **execution function** when you need to:
+
+- Perform database setup (creating tables, schemas, etc.)
+- Execute administrative commands
+- Perform one-off operations (deleting all records, truncating a table, etc.)
+- Execute statements where you don't need the results as part of your dataflow
 
 ## Querying CSV files
 
@@ -159,4 +238,4 @@ The `CSVREAD` function provided by the H2 database engine allows you to query an
 
 Like all dataflow operators in Nextflow, the operators provided by this plugin are executed asynchronously.
 
-In particular, data inserted using the `sqlInsert` operator is *not* guaranteed to be available to any subsequent queries using the `fromQuery` operator, as it is not possible to make a channel factory operation dependent on some upstream operation.
+In particular, data inserted using the `sqlInsert` operator is _not_ guaranteed to be available to any subsequent queries using the `fromQuery` operator, as it is not possible to make a channel factory operation dependent on some upstream operation.
````
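One usage note on the README additions above: `sqlExecute` reports failures through the returned map rather than by throwing, so callers should branch on the `success` key. A minimal hedged sketch in Nextflow (the `demo` database handle and `TEST_TABLE` name are assumptions, not part of the plugin):

```nextflow
include { sqlExecute } from 'plugin/nf-sqldb'

workflow {
    // Hypothetical one-off cleanup; 'demo' must be defined under sql.db
    def res = sqlExecute(
        db: 'demo',
        statement: 'DELETE FROM TEST_TABLE WHERE VALUE IS NULL'
    )
    if (res.success)
        log.info "Rows deleted: ${res.result}"
    else
        log.error "SQL execution failed: ${res.error}"
}
```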

docs/databricks.md

Lines changed: 27 additions & 0 deletions

````markdown
# Databricks integration

## Running Integration Tests

To run the integration tests, you'll need to set up the following environment variables:

### Databricks Integration Tests

For Databricks integration tests, you need to set:

```bash
export DATABRICKS_JDBC_URL="jdbc:databricks://<workspace-url>:443/default;transportMode=http;ssl=1;httpPath=sql/protocolv1/o/<org-id>/<workspace-id>"
export DATABRICKS_TOKEN="<your-databricks-token>"
```

You can get these values from your Databricks workspace:

1. The JDBC URL can be found in the Databricks SQL endpoint connection details
2. The token can be generated from your Databricks user settings

After setting the required environment variables, you can run the integration tests with:

```bash
./gradlew test
```

<!-- Note: Integration tests are skipped by default when running in smoke test mode (when `NXF_SMOKE` environment variable is set). -->
````
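Since the Gradle run fails late and noisily when credentials are missing, it can help to fail fast beforehand using standard shell parameter expansion. A small sketch (the exported values here are stand-ins, not real credentials):

```shell
# Stand-in values so the guard below passes in this self-contained example
export DATABRICKS_JDBC_URL="jdbc:databricks://example.cloud.databricks.com:443/default"
export DATABRICKS_TOKEN="dapi-example-token"

# Abort with a clear message if either variable is unset or empty
: "${DATABRICKS_JDBC_URL:?DATABRICKS_JDBC_URL must be set}"
: "${DATABRICKS_TOKEN:?DATABRICKS_TOKEN must be set}"
echo "Databricks environment looks configured"
```

In a real session you would omit the two `export` lines and let the guard catch a missing configuration before running `./gradlew test`.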

examples/sql-execution/main.nf

Lines changed: 76 additions & 0 deletions

```nextflow
#!/usr/bin/env nextflow

/*
 * Example script demonstrating how to use the SQL sqlExecute function
 */

include { sqlExecute } from 'plugin/nf-sqldb'
include { fromQuery } from 'plugin/nf-sqldb'

// Database configuration is defined in the nextflow.config file:
// sql.db.demo = [url: 'jdbc:h2:mem:demo', driver: 'org.h2.Driver']

workflow {
    log.info """
    =========================================
     SQL Execution Function Example
    =========================================
    """

    // Step 1: Create a table (DDL operation, result is null)
    log.info "Creating a sample table..."
    def createResult = sqlExecute(
        db: 'demo',
        statement: '''
            CREATE TABLE IF NOT EXISTS TEST_TABLE (
                ID INTEGER PRIMARY KEY,
                NAME VARCHAR(100),
                VALUE DOUBLE
            )
        '''
    )
    log.info "Create table result: $createResult"

    // Step 2: Insert some data (DML operation, result is the affected row count)
    log.info "Inserting data..."
    def insertCount = sqlExecute(
        db: 'demo',
        statement: '''
            INSERT INTO TEST_TABLE (ID, NAME, VALUE) VALUES
            (1, 'alpha', 10.5),
            (2, 'beta', 20.7),
            (3, 'gamma', 30.2),
            (4, 'delta', 40.9);
        '''
    )
    log.info "Insert result: $insertCount"

    // Step 3: Update some data (DML operation, result is the affected row count)
    log.info "Updating data..."
    def updateCount = sqlExecute(
        db: 'demo',
        statement: '''
            UPDATE TEST_TABLE
            SET VALUE = VALUE * 2
            WHERE ID = 2;
        '''
    )
    log.info "Update result: $updateCount"

    // Step 4: Delete some data (DML operation, result is the affected row count)
    log.info "Deleting data..."
    def deleteCount = sqlExecute(
        db: 'demo',
        statement: '''
            DELETE FROM TEST_TABLE
            WHERE ID = 4;
        '''
    )
    log.info "Delete result: $deleteCount"

    // Step 5: Query the results
    log.info "Querying results..."
    channel
        .fromQuery("SELECT * FROM TEST_TABLE ORDER BY ID", db: 'demo')
        .view { row -> "ID: ${row[0]}, Name: ${row[1]}, Value: ${row[2]}" }
}
```
Lines changed: 22 additions & 0 deletions

```groovy
/*
 * Configuration file for the SQL execution example script
 */

// Enable the SQL DB plugin
plugins {
    id 'nf-sqldb'
}

// Define a file-based H2 database for the example
sql {
    db {
        demo {
            url = 'jdbc:h2:./demo'
            driver = 'org.h2.Driver'
        }
    }
}

// Silence unnecessary Nextflow logs
process.echo = false
dag.enabled = false
```

plugins/nf-sqldb/src/main/nextflow/sql/ChannelSqlExtension.groovy

Lines changed: 76 additions & 0 deletions

````diff
@@ -24,16 +24,21 @@ import groovyx.gpars.dataflow.DataflowReadChannel
 import groovyx.gpars.dataflow.DataflowWriteChannel
 import groovyx.gpars.dataflow.expression.DataflowExpression
 import nextflow.Channel
+import nextflow.Global
 import nextflow.NF
 import nextflow.Session
 import nextflow.extension.CH
 import nextflow.extension.DataflowHelper
 import nextflow.plugin.extension.Factory
+import nextflow.plugin.extension.Function
 import nextflow.plugin.extension.Operator
 import nextflow.plugin.extension.PluginExtensionPoint
 import nextflow.sql.config.SqlConfig
 import nextflow.sql.config.SqlDataSource
 import nextflow.util.CheckHelper
+import java.sql.Connection
+import java.sql.Statement
+import groovy.sql.Sql
 /**
  * Provide a channel factory extension that allows the execution of Sql queries
  *
@@ -133,4 +138,75 @@ class ChannelSqlExtension extends PluginExtensionPoint {
         return target
     }
 
+    private static final Map EXECUTE_PARAMS = [
+            db: CharSequence,
+            statement: CharSequence
+    ]
+
+    /**
+     * Execute a SQL statement that does not return a result set (DDL/DML statements).
+     * For DML statements (INSERT, UPDATE, DELETE), it returns a result map with the success status and the number of affected rows.
+     * For DDL statements (CREATE, ALTER, DROP), it returns a result map with the success status.
+     *
+     * @param params A map containing 'db' (database alias) and 'statement' (SQL string to execute)
+     * @return A map containing 'success' (boolean), 'result' (rows affected or null) and optionally 'error' (message)
+     */
+    @Function
+    Map sqlExecute(Map params) {
+        CheckHelper.checkParams('sqlExecute', params, EXECUTE_PARAMS)
+
+        final String dbName = params.db as String ?: 'default'
+        final String statement = params.statement as String
+
+        if (!statement)
+            return [success: false, error: "Missing required parameter 'statement'"]
+
+        final sqlConfig = new SqlConfig((Map) session.config.navigate('sql.db'))
+        final SqlDataSource dataSource = sqlConfig.getDataSource(dbName)
+
+        if (dataSource == null) {
+            def msg = "Unknown db name: $dbName"
+            def choices = sqlConfig.getDataSourceNames().closest(dbName) ?: sqlConfig.getDataSourceNames()
+            if (choices?.size() == 1)
+                msg += " - Did you mean: ${choices.get(0)}?"
+            else if (choices)
+                msg += " - Did you mean any of these?\n" + choices.collect { " $it" }.join('\n') + '\n'
+            return [success: false, error: msg]
+        }
+
+        try (Connection conn = groovy.sql.Sql.newInstance(dataSource.toMap()).getConnection()) {
+            try (Statement stm = conn.createStatement()) {
+                String normalizedStatement = normalizeStatement(statement)
+
+                boolean isDDL = normalizedStatement.trim().toLowerCase().matches("^(create|alter|drop|truncate).*")
+
+                if (isDDL) {
+                    stm.execute(normalizedStatement)
+                    return [success: true, result: null]
+                }
+                else {
+                    Integer rowsAffected = stm.executeUpdate(normalizedStatement)
+                    return [success: true, result: rowsAffected]
+                }
+            }
+        }
+        catch (Exception e) {
+            log.error("Error executing SQL statement: ${e.message}", e)
+            return [success: false, error: e.message]
+        }
+    }
+
+    /**
+     * Normalizes a SQL statement by adding a trailing semicolon if needed.
+     *
+     * @param statement The SQL statement to normalize
+     * @return The normalized SQL statement
+     */
+    private static String normalizeStatement(String statement) {
+        if (!statement)
+            return null
+        def result = statement.trim()
+        if (!result.endsWith(';'))
+            result += ';'
+        return result
+    }
 }
````
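The DDL-vs-DML branch in `sqlExecute` hinges on a simple prefix match over the normalized statement. A standalone Java sketch of that classification and semicolon-normalization logic (class and method names here are mine for illustration, not part of the plugin's API):

```java
import java.util.Locale;

public class SqlStatementKind {

    // Mirrors the plugin's check: DDL statements start with one of these keywords.
    static boolean isDdl(String statement) {
        String s = statement.trim().toLowerCase(Locale.ROOT);
        return s.matches("^(create|alter|drop|truncate).*");
    }

    // Mirrors normalizeStatement: trim and append a trailing semicolon if missing.
    static String normalize(String statement) {
        if (statement == null || statement.isEmpty())
            return null;
        String result = statement.trim();
        return result.endsWith(";") ? result : result + ";";
    }

    public static void main(String[] args) {
        System.out.println(isDdl("CREATE TABLE t (id INT)"));   // true  -> stm.execute(), result: null
        System.out.println(isDdl("INSERT INTO t VALUES (1)"));  // false -> stm.executeUpdate(), result: row count
        System.out.println(normalize("  DROP TABLE t "));       // DROP TABLE t;
    }
}
```

Note that the keyword match is the whole decision: anything not starting with `create`/`alter`/`drop`/`truncate` is sent through `executeUpdate`, so a `SELECT` passed to `sqlExecute` would be treated as DML by this logic.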
