Add execute functions #28


Merged (17 commits, May 7, 2025)
Commits
0ce1d63
build: Add version extraction and local installation target to Makefile
edmundmiller Apr 10, 2025
d1c18ad
fix: Add executeUpdate option to QueryHandler for DDL and UPDATE stat…
edmundmiller Apr 10, 2025
3589d9b
refactor: Remove executeUpdate option from QueryHandler and simplify …
edmundmiller Apr 30, 2025
54e502d
feat: Add SQL execution methods to ChannelSqlExtension
edmundmiller Apr 30, 2025
ce4dcf5
docs: Enhance README and add examples for SQL execution functions
edmundmiller Apr 30, 2025
9f2eb2b
test: Add comprehensive SQL execution tests for ChannelSqlExtension
edmundmiller Apr 30, 2025
ea1c26c
refactor: Update ChannelSqlExtension methods to instance methods
edmundmiller Apr 30, 2025
b738fab
refactor: Update SQL execution example to use file-based H2 database
edmundmiller Apr 30, 2025
86881f5
fix: Enhance error handling for database connection and commit operat…
edmundmiller May 1, 2025
13111a8
feat: Add integration testing framework for database backends
edmundmiller May 2, 2025
593e4f4
refactor: Rename execute function to sqlExecute and update documentation
edmundmiller May 2, 2025
3598b14
refactor: Rewrite execute functions into on one function
edmundmiller May 3, 2025
343cbca
fix: Update sqlExecute method to return structured result maps
edmundmiller May 3, 2025
8b3cb8f
chore: Restructure test directory names
edmundmiller May 3, 2025
a0df808
docs: Update sqlExecute documentation on outputs
edmundmiller May 7, 2025
c19c8e7
docs: Remove .envrc and make a note about ENV
edmundmiller May 7, 2025
c877be5
refactor: Clean up integration test structure
edmundmiller May 7, 2025
7 changes: 7 additions & 0 deletions Makefile
@@ -1,4 +1,5 @@
config ?= compileClasspath
version ?= $(shell grep 'Plugin-Version' plugins/nf-sqldb/src/resources/META-INF/MANIFEST.MF | awk '{ print $$2 }')

ifdef module
mm = :${module}:
@@ -69,3 +70,9 @@ upload-plugins:

publish-index:
./gradlew plugins:publishIndex

# Install the plugin into local nextflow plugins dir
install:
./gradlew copyPluginZip
rm -rf ${HOME}/.nextflow/plugins/nf-sqldb-${version}
cp -r build/plugins/nf-sqldb-${version} ${HOME}/.nextflow/plugins/nf-sqldb-${version}
103 changes: 91 additions & 12 deletions README.md
@@ -4,13 +4,13 @@ This plugin provides support for interacting with SQL databases in Nextflow scri

The following databases are currently supported:

* [AWS Athena](https://aws.amazon.com/athena/) (Setup guide [here](docs/aws-athena.md))
* [DuckDB](https://duckdb.org/)
* [H2](https://www.h2database.com)
* [MySQL](https://www.mysql.com/)
* [MariaDB](https://mariadb.org/)
* [PostgreSQL](https://www.postgresql.org/)
* [SQLite](https://www.sqlite.org/index.html)
- [AWS Athena](https://aws.amazon.com/athena/) (Setup guide [here](docs/aws-athena.md))
- [DuckDB](https://duckdb.org/)
- [H2](https://www.h2database.com)
- [MySQL](https://www.mysql.com/)
- [MariaDB](https://mariadb.org/)
- [PostgreSQL](https://www.postgresql.org/)
- [SQLite](https://www.sqlite.org/index.html)

NOTE: THIS IS A PREVIEW TECHNOLOGY, FEATURES AND CONFIGURATION SETTINGS CAN CHANGE IN FUTURE RELEASES.

@@ -24,7 +24,6 @@ plugins {
}
```


## Configuration

You can configure any number of databases under the `sql.db` configuration scope. For example:
@@ -79,7 +78,7 @@ The following options are available:

`batchSize`
: Query the data in batches of the given size. This option is recommended for queries that may return a large result set, so that the entire result set is not loaded into memory at once.
: *NOTE:* this feature requires that the underlying SQL database supports `LIMIT` and `OFFSET`.
: _NOTE:_ this feature requires that the underlying SQL database supports `LIMIT` and `OFFSET`.

`emitColumns`
: When `true`, the column names in the `SELECT` statement are emitted as the first tuple in the resulting channel.
@@ -104,7 +103,7 @@ INSERT INTO SAMPLE (NAME, LEN) VALUES ('HELLO', 5);
INSERT INTO SAMPLE (NAME, LEN) VALUES ('WORLD!', 6);
```

*NOTE:* the target table (e.g. `SAMPLE` in the above example) must be created beforehand.
_NOTE:_ the target table (e.g. `SAMPLE` in the above example) must be created beforehand.

The following options are available:

@@ -125,7 +124,87 @@ The following options are available:

`setup`
: A SQL statement that is executed before inserting the data, e.g. to create the target table.
: *NOTE:* the underlying database should support the *create table if not exist* idiom, as the plugin will execute this statement every time the script is run.
: _NOTE:_ the underlying database should support the _create table if not exist_ idiom, as the plugin will execute this statement every time the script is run.

## SQL Execution Functions

This plugin provides the following function for executing SQL statements that don't return data, such as DDL (Data Definition Language) and DML (Data Manipulation Language) operations.

### sqlExecute

The `sqlExecute` function executes a SQL statement that doesn't return a result set, such as `CREATE`, `ALTER`, `DROP`, `INSERT`, `UPDATE`, or `DELETE` statements. For DML statements (`INSERT`, `UPDATE`, `DELETE`), it returns a Map with `success: true` and `result` set to the number of rows affected. For DDL statements (`CREATE`, `ALTER`, `DROP`), it returns a Map with `success: true` and `result: null`.

For example:

```nextflow
include { sqlExecute } from 'plugin/nf-sqldb'

// Create a table (returns Map with result: null for DDL operations)
def createResult = sqlExecute(
db: 'foo',
statement: '''
CREATE TABLE IF NOT EXISTS sample_table (
id INTEGER PRIMARY KEY,
name VARCHAR(100),
value DOUBLE
)
'''
)
println "Create result: $createResult" // [success: true, result: null]

// Insert data (returns Map with result: 1 for number of rows affected)
def insertedRows = sqlExecute(
db: 'foo',
statement: "INSERT INTO sample_table (id, name, value) VALUES (1, 'alpha', 10.5)"
)
println "Insert result: $insertedRows" // [success: true, result: 1]

// Update data (returns Map with result: number of rows updated)
def updatedRows = sqlExecute(
db: 'foo',
statement: "UPDATE sample_table SET value = 30.5 WHERE name = 'alpha'"
)
println "Update result: $updatedRows" // [success: true, result: <number>]

// Delete data (returns Map with result: number of rows deleted)
def deletedRows = sqlExecute(
db: 'foo',
statement: "DELETE FROM sample_table WHERE value > 25"
)
println "Delete result: $deletedRows" // [success: true, result: <number>]
```

The following options are available:

`db`
: The database handle. It must be defined under `sql.db` in the Nextflow configuration.

`statement`
: The SQL statement to execute. This can be any DDL or DML statement that doesn't return a result set.

## Differences Between Dataflow Operators and Execution Function

The plugin provides two different ways to interact with databases:

1. **Dataflow Operators** (`fromQuery` and `sqlInsert`): These are designed to integrate with Nextflow's dataflow programming model, operating on channels.

- `fromQuery`: Queries data from a database and returns a channel that emits the results.
- `sqlInsert`: Takes data from a channel and inserts it into a database.

2. **Execution Function** (`sqlExecute`): This is designed for direct SQL statement execution that doesn't require channel integration.
- `sqlExecute`: Executes a SQL statement and returns a result map. For DML operations, the map's `result` field holds the count of affected rows; for DDL operations, `result` is `null`.

Use **Dataflow Operators** when you need to:

- Query data that will flow into your pipeline processing
- Insert data from your pipeline processing into a database

Use **Execution Function** when you need to:

- Perform database setup (creating tables, schemas, etc.)
- Execute administrative commands
- Perform one-off operations (deleting all records, truncating a table, etc.)
- Execute statements where you don't need the results as part of your dataflow
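The result-map contract described above can be sketched outside Nextflow. The following Java fragment (hypothetical helper and class names, not the plugin's actual API) illustrates the shape a caller can rely on: `success` is always present, `result` carries the affected-row count for DML and `null` for DDL, and `error` appears only on failure.

```java
import java.util.HashMap;
import java.util.Map;

public class SqlResultSketch {
    // Hypothetical helper mirroring the map shape documented for sqlExecute.
    static Map<String, Object> result(boolean success, Integer rows, String error) {
        Map<String, Object> m = new HashMap<>();
        m.put("success", success);
        m.put("result", rows);           // row count for DML, null for DDL
        if (error != null) m.put("error", error);
        return m;
    }

    public static void main(String[] args) {
        Map<String, Object> ddl = result(true, null, null);  // e.g. CREATE TABLE
        Map<String, Object> dml = result(true, 3, null);     // e.g. UPDATE touching 3 rows
        Map<String, Object> bad = result(false, null, "table not found");

        // Callers should branch on 'success' before reading 'result'.
        System.out.println(ddl.get("success") + " " + ddl.get("result")); // true null
        System.out.println(dml.get("result"));                            // 3
        System.out.println(bad.get("error"));                             // table not found
    }
}
```

In a pipeline script the same branching applies: check `success` (and surface `error`) before trusting `result`.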

## Querying CSV files

@@ -159,4 +238,4 @@ The `CSVREAD` function provided by the H2 database engine allows you to query an

Like all dataflow operators in Nextflow, the operators provided by this plugin are executed asynchronously.

In particular, data inserted using the `sqlInsert` operator is *not* guaranteed to be available to any subsequent queries using the `fromQuery` operator, as it is not possible to make a channel factory operation dependent on some upstream operation.
In particular, data inserted using the `sqlInsert` operator is _not_ guaranteed to be available to any subsequent queries using the `fromQuery` operator, as it is not possible to make a channel factory operation dependent on some upstream operation.
27 changes: 27 additions & 0 deletions docs/databricks.md
@@ -0,0 +1,27 @@
# Databricks integration

## Running Integration Tests

To run the integration tests, you'll need to set up the following environment variables:

### Databricks Integration Tests

For Databricks integration tests, you need to set:

```bash
export DATABRICKS_JDBC_URL="jdbc:databricks://<workspace-url>:443/default;transportMode=http;ssl=1;httpPath=sql/protocolv1/o/<org-id>/<workspace-id>"
export DATABRICKS_TOKEN="<your-databricks-token>"
```

You can get these values from your Databricks workspace:

1. The JDBC URL can be found in the Databricks SQL endpoint connection details
2. The token can be generated from your Databricks user settings

After setting up the required environment variables, you can run the integration tests using:

```bash
./gradlew test
```

<!-- Note: Integration tests are skipped by default when running in smoke test mode (when `NXF_SMOKE` environment variable is set). -->
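As a quick sanity check before running `./gradlew test`, the two variables can be validated locally. This is an illustrative sketch only; the URL pattern is an assumption based on the example above, not an official Databricks format:

```java
import java.util.regex.Pattern;

public class DatabricksEnvCheck {
    // Assumed URL shape, derived from the example above; real URLs may vary.
    static final Pattern URL = Pattern.compile("^jdbc:databricks://[^;]+(;[^;=]+=[^;]*)*$");

    static boolean looksValid(String url, String token) {
        return url != null && token != null
            && !token.isEmpty()
            && URL.matcher(url).matches();
    }

    public static void main(String[] args) {
        String url = System.getenv("DATABRICKS_JDBC_URL");
        String token = System.getenv("DATABRICKS_TOKEN");
        System.out.println(looksValid(url, token)
            ? "Databricks env looks set"
            : "Missing or malformed DATABRICKS_JDBC_URL / DATABRICKS_TOKEN");
    }
}
```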
76 changes: 76 additions & 0 deletions examples/sql-execution/main.nf
@@ -0,0 +1,76 @@
#!/usr/bin/env nextflow

/*
* Example script demonstrating how to use the SQL sqlExecute function
*/

include { sqlExecute } from 'plugin/nf-sqldb'
include { fromQuery } from 'plugin/nf-sqldb'

// Define database configuration in nextflow.config file
// sql.db.demo = [url: 'jdbc:h2:mem:demo', driver: 'org.h2.Driver']

workflow {
log.info """
=========================================
SQL Execution Function Example
=========================================
"""

// Step 1: Create a table (DDL operations return a map with result: null)
log.info "Creating a sample table..."
def createResult = sqlExecute(
db: 'demo',
statement: '''
CREATE TABLE IF NOT EXISTS TEST_TABLE (
ID INTEGER PRIMARY KEY,
NAME VARCHAR(100),
VALUE DOUBLE
)
'''
)
log.info "Create table result: $createResult"

// Step 2: Insert some data (DML operations return a map with the affected row count)
log.info "Inserting data..."
def insertCount = sqlExecute(
db: 'demo',
statement: '''
INSERT INTO TEST_TABLE (ID, NAME, VALUE) VALUES
(1, 'alpha', 10.5),
(2, 'beta', 20.7),
(3, 'gamma', 30.2),
(4, 'delta', 40.9);
'''
)
log.info "Inserted ${insertCount.result} rows"

// Step 3: Update some data (DML operations return a map with the affected row count)
log.info "Updating data..."
def updateCount = sqlExecute(
db: 'demo',
statement: '''
UPDATE TEST_TABLE
SET VALUE = VALUE * 2
WHERE ID = 2;
'''
)
log.info "Updated ${updateCount.result} rows"

// Step 4: Delete some data (DML operations return a map with the affected row count)
log.info "Deleting data..."
def deleteCount = sqlExecute(
db: 'demo',
statement: '''
DELETE FROM TEST_TABLE
WHERE ID = 4;
'''
)
log.info "Deleted ${deleteCount.result} rows"

// Step 5: Query results
log.info "Querying results..."
channel
.fromQuery("SELECT * FROM TEST_TABLE ORDER BY ID", db: 'demo')
.view { row -> "ID: ${row[0]}, Name: ${row[1]}, Value: ${row[2]}" }
}
22 changes: 22 additions & 0 deletions examples/sql-execution/nextflow.config
@@ -0,0 +1,22 @@
/*
* Configuration file for the SQL execution example script
*/

// Enable the SQL DB plugin
plugins {
id '[email protected]'
}

// Define a file-based H2 database for the example
sql {
db {
demo {
url = 'jdbc:h2:./demo'
driver = 'org.h2.Driver'
}
}
}

// Silence unnecessary Nextflow logs
process.echo = false
dag.enabled = false
76 changes: 76 additions & 0 deletions plugins/nf-sqldb/src/main/nextflow/sql/ChannelSqlExtension.groovy
@@ -24,16 +24,21 @@ import groovyx.gpars.dataflow.DataflowReadChannel
import groovyx.gpars.dataflow.DataflowWriteChannel
import groovyx.gpars.dataflow.expression.DataflowExpression
import nextflow.Channel
import nextflow.Global
import nextflow.NF
import nextflow.Session
import nextflow.extension.CH
import nextflow.extension.DataflowHelper
import nextflow.plugin.extension.Factory
import nextflow.plugin.extension.Function
import nextflow.plugin.extension.Operator
import nextflow.plugin.extension.PluginExtensionPoint
import nextflow.sql.config.SqlConfig
import nextflow.sql.config.SqlDataSource
import nextflow.util.CheckHelper
import java.sql.Connection
import java.sql.Statement
import groovy.sql.Sql
/**
* Provide a channel factory extension that allows the execution of Sql queries
*
@@ -133,4 +138,75 @@ class ChannelSqlExtension extends PluginExtensionPoint {
return target
}

private static final Map EXECUTE_PARAMS = [
db: CharSequence,
statement: CharSequence
]

/**
* Execute a SQL statement that does not return a result set (DDL/DML statements)
* For DML statements (INSERT, UPDATE, DELETE), it returns a result map with success status and number of affected rows
* For DDL statements (CREATE, ALTER, DROP), it returns a result map with success status
*
* @param params A map containing 'db' (database alias) and 'statement' (SQL string to execute)
* @return A map containing 'success' (boolean), 'result' (rows affected or null) and optionally 'error' (message)
*/
@Function
Map sqlExecute(Map params) {
CheckHelper.checkParams('sqlExecute', params, EXECUTE_PARAMS)

final String dbName = params.db as String ?: 'default'
final String statement = params.statement as String

if (!statement)
return [success: false, error: "Missing required parameter 'statement'"]

final sqlConfig = new SqlConfig((Map) session.config.navigate('sql.db'))
final SqlDataSource dataSource = sqlConfig.getDataSource(dbName)

if (dataSource == null) {
def msg = "Unknown db name: $dbName"
def choices = sqlConfig.getDataSourceNames().closest(dbName) ?: sqlConfig.getDataSourceNames()
if (choices?.size() == 1)
msg += " - Did you mean: ${choices.get(0)}?"
else if (choices)
msg += " - Did you mean any of these?\n" + choices.collect { " $it" }.join('\n') + '\n'
return [success: false, error: msg]
}

try (Connection conn = groovy.sql.Sql.newInstance(dataSource.toMap()).getConnection()) {
try (Statement stm = conn.createStatement()) {
String normalizedStatement = normalizeStatement(statement)

boolean isDDL = normalizedStatement.trim().toLowerCase().matches("^(create|alter|drop|truncate).*")

if (isDDL) {
stm.execute(normalizedStatement)
return [success: true, result: null]
} else {
Integer rowsAffected = stm.executeUpdate(normalizedStatement)
return [success: true, result: rowsAffected]
}
}
}
catch (Exception e) {
log.error("Error executing SQL statement: ${e.message}", e)
return [success: false, error: e.message]
}
}

/**
* Normalizes a SQL statement by adding a semicolon if needed
*
* @param statement The SQL statement to normalize
* @return The normalized SQL statement
*/
private static String normalizeStatement(String statement) {
if (!statement)
return null
def result = statement.trim()
if (!result.endsWith(';'))
result += ';'
return result
}
}
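
The two private rules above (keyword-based DDL detection and semicolon normalization) are easy to exercise in isolation. A minimal Java analogue of the same two rules, as a sketch rather than the plugin code itself:

```java
public class StatementRules {
    // Mirrors the Groovy regex: statements starting with these keywords take
    // Statement.execute(); everything else goes through executeUpdate().
    static boolean isDdl(String sql) {
        return sql.trim().toLowerCase().matches("^(create|alter|drop|truncate).*");
    }

    // Mirrors normalizeStatement: trim the input and ensure a trailing semicolon.
    static String normalize(String sql) {
        if (sql == null || sql.isEmpty()) return null;
        String result = sql.trim();
        return result.endsWith(";") ? result : result + ";";
    }

    public static void main(String[] args) {
        System.out.println(isDdl("CREATE TABLE t (id INT)"));  // true
        System.out.println(isDdl("INSERT INTO t VALUES (1)")); // false
        System.out.println(normalize("SELECT 1"));             // SELECT 1;
    }
}
```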