From 0ce1d63193b3f52f115cb2581e9776cf28d51b80 Mon Sep 17 00:00:00 2001 From: Edmund Miller Date: Wed, 9 Apr 2025 19:45:27 -0500 Subject: [PATCH 01/17] build: Add version extraction and local installation target to Makefile Signed-off-by: Edmund Miller --- Makefile | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/Makefile b/Makefile index 4d140c6..3f29b6d 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,5 @@ config ?= compileClasspath +version ?= $(shell grep 'Plugin-Version' plugins/nf-sqldb/src/resources/META-INF/MANIFEST.MF | awk '{ print $$2 }') ifdef module mm = :${module}: @@ -69,3 +70,9 @@ upload-plugins: publish-index: ./gradlew plugins:publishIndex + +# Install the plugin into local nextflow plugins dir +install: + ./gradlew copyPluginZip + rm -rf ${HOME}/.nextflow/plugins/nf-sqldb-${version} + cp -r build/plugins/nf-sqldb-${version} ${HOME}/.nextflow/plugins/nf-sqldb-${version} \ No newline at end of file From d1c18adff041e9acaae8454c6dff64a7a930894f Mon Sep 17 00:00:00 2001 From: Edmund Miller Date: Wed, 9 Apr 2025 20:45:15 -0500 Subject: [PATCH 02/17] fix: Add executeUpdate option to QueryHandler for DDL and UPDATE statements Signed-off-by: Edmund Miller --- .../nextflow/sql/ChannelSqlExtension.groovy | 3 +- .../src/main/nextflow/sql/QueryHandler.groovy | 30 ++++++++++++++++--- 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/plugins/nf-sqldb/src/main/nextflow/sql/ChannelSqlExtension.groovy b/plugins/nf-sqldb/src/main/nextflow/sql/ChannelSqlExtension.groovy index d50f7d6..d5712cc 100644 --- a/plugins/nf-sqldb/src/main/nextflow/sql/ChannelSqlExtension.groovy +++ b/plugins/nf-sqldb/src/main/nextflow/sql/ChannelSqlExtension.groovy @@ -47,7 +47,8 @@ class ChannelSqlExtension extends PluginExtensionPoint { db: CharSequence, emitColumns: Boolean, batchSize: Integer, - batchDelay: Integer + batchDelay: Integer, + executeUpdate: Boolean ] private static final Map INSERT_PARAMS = [ diff --git 
a/plugins/nf-sqldb/src/main/nextflow/sql/QueryHandler.groovy b/plugins/nf-sqldb/src/main/nextflow/sql/QueryHandler.groovy index 680784d..fde3f1f 100644 --- a/plugins/nf-sqldb/src/main/nextflow/sql/QueryHandler.groovy +++ b/plugins/nf-sqldb/src/main/nextflow/sql/QueryHandler.groovy @@ -71,6 +71,7 @@ class QueryHandler implements QueryOp { private Integer batchSize private long batchDelayMillis = 100 private int queryCount + private boolean executeUpdate = false @Override QueryOp withStatement(String stm) { @@ -97,6 +98,8 @@ class QueryHandler implements QueryOp { this.batchSize = opts.batchSize as Integer if( opts.batchDelay ) this.batchDelayMillis = opts.batchDelay as long + if( opts.executeUpdate ) + this.executeUpdate = opts.executeUpdate as boolean return this } @@ -156,10 +159,29 @@ class QueryHandler implements QueryOp { protected void query0(Connection conn) { try { try (Statement stm = conn.createStatement()) { - try( def rs = stm.executeQuery(normalize(statement)) ) { - if( emitColumns ) - emitColumns(rs) - emitRowsAndClose(rs) + final String normalizedStmt = normalize(statement) + // Check if statement is a DDL or UPDATE statement that doesn't return a ResultSet + boolean isUpdateOrDdl = executeUpdate || + normalizedStmt.toUpperCase().startsWith("CREATE ") || + normalizedStmt.toUpperCase().startsWith("ALTER ") || + normalizedStmt.toUpperCase().startsWith("DROP ") || + normalizedStmt.toUpperCase().startsWith("INSERT ") || + normalizedStmt.toUpperCase().startsWith("UPDATE ") || + normalizedStmt.toUpperCase().startsWith("DELETE "); + + if (isUpdateOrDdl) { + // Use executeUpdate for statements that don't return ResultSets + stm.executeUpdate(normalizedStmt) + // Since there's no ResultSet to emit, just close the channel + target.bind(Channel.STOP) + } + else { + // For SELECT and other queries that return ResultSets + try (def rs = stm.executeQuery(normalizedStmt)) { + if (emitColumns) + emitColumns(rs) + emitRowsAndClose(rs) + } } } } From 
3589d9b3044c27b1fc51cf4fe07f803280194d29 Mon Sep 17 00:00:00 2001 From: Edmund Miller Date: Wed, 30 Apr 2025 16:05:31 -0500 Subject: [PATCH 03/17] refactor: Remove executeUpdate option from QueryHandler and simplify SQL execution logic - Removed the executeUpdate parameter from QueryHandler and ChannelSqlExtension. - Simplified SQL execution by directly executing queries without checking for DDL or UPDATE statements. Signed-off-by: Edmund Miller --- .../nextflow/sql/ChannelSqlExtension.groovy | 3 +- .../src/main/nextflow/sql/QueryHandler.groovy | 30 ++++--------------- 2 files changed, 6 insertions(+), 27 deletions(-) diff --git a/plugins/nf-sqldb/src/main/nextflow/sql/ChannelSqlExtension.groovy b/plugins/nf-sqldb/src/main/nextflow/sql/ChannelSqlExtension.groovy index d5712cc..d50f7d6 100644 --- a/plugins/nf-sqldb/src/main/nextflow/sql/ChannelSqlExtension.groovy +++ b/plugins/nf-sqldb/src/main/nextflow/sql/ChannelSqlExtension.groovy @@ -47,8 +47,7 @@ class ChannelSqlExtension extends PluginExtensionPoint { db: CharSequence, emitColumns: Boolean, batchSize: Integer, - batchDelay: Integer, - executeUpdate: Boolean + batchDelay: Integer ] private static final Map INSERT_PARAMS = [ diff --git a/plugins/nf-sqldb/src/main/nextflow/sql/QueryHandler.groovy b/plugins/nf-sqldb/src/main/nextflow/sql/QueryHandler.groovy index fde3f1f..febad90 100644 --- a/plugins/nf-sqldb/src/main/nextflow/sql/QueryHandler.groovy +++ b/plugins/nf-sqldb/src/main/nextflow/sql/QueryHandler.groovy @@ -71,7 +71,6 @@ class QueryHandler implements QueryOp { private Integer batchSize private long batchDelayMillis = 100 private int queryCount - private boolean executeUpdate = false @Override QueryOp withStatement(String stm) { @@ -98,8 +97,6 @@ class QueryHandler implements QueryOp { this.batchSize = opts.batchSize as Integer if( opts.batchDelay ) this.batchDelayMillis = opts.batchDelay as long - if( opts.executeUpdate ) - this.executeUpdate = opts.executeUpdate as boolean return this } @@ -160,28 
+157,11 @@ class QueryHandler implements QueryOp { try { try (Statement stm = conn.createStatement()) { final String normalizedStmt = normalize(statement) - // Check if statement is a DDL or UPDATE statement that doesn't return a ResultSet - boolean isUpdateOrDdl = executeUpdate || - normalizedStmt.toUpperCase().startsWith("CREATE ") || - normalizedStmt.toUpperCase().startsWith("ALTER ") || - normalizedStmt.toUpperCase().startsWith("DROP ") || - normalizedStmt.toUpperCase().startsWith("INSERT ") || - normalizedStmt.toUpperCase().startsWith("UPDATE ") || - normalizedStmt.toUpperCase().startsWith("DELETE "); - - if (isUpdateOrDdl) { - // Use executeUpdate for statements that don't return ResultSets - stm.executeUpdate(normalizedStmt) - // Since there's no ResultSet to emit, just close the channel - target.bind(Channel.STOP) - } - else { - // For SELECT and other queries that return ResultSets - try (def rs = stm.executeQuery(normalizedStmt)) { - if (emitColumns) - emitColumns(rs) - emitRowsAndClose(rs) - } + // Execute the SQL query and get results + try (def rs = stm.executeQuery(normalizedStmt)) { + if (emitColumns) + emitColumns(rs) + emitRowsAndClose(rs) } } } From 54e502d33424851eb5c0e62c2e3f9fc5b579ed34 Mon Sep 17 00:00:00 2001 From: Edmund Miller Date: Wed, 30 Apr 2025 16:21:23 -0500 Subject: [PATCH 04/17] feat: Add SQL execution methods to ChannelSqlExtension - Introduced `execute` and `executeUpdate` methods for executing SQL statements without returning a result set. - Added parameter validation and error handling for database connections. - Implemented SQL statement normalization to ensure proper execution. 
Signed-off-by: Edmund Miller --- .../nextflow/sql/ChannelSqlExtension.groovy | 100 ++++++++++++++++++ 1 file changed, 100 insertions(+) diff --git a/plugins/nf-sqldb/src/main/nextflow/sql/ChannelSqlExtension.groovy b/plugins/nf-sqldb/src/main/nextflow/sql/ChannelSqlExtension.groovy index d50f7d6..a492993 100644 --- a/plugins/nf-sqldb/src/main/nextflow/sql/ChannelSqlExtension.groovy +++ b/plugins/nf-sqldb/src/main/nextflow/sql/ChannelSqlExtension.groovy @@ -24,6 +24,7 @@ import groovyx.gpars.dataflow.DataflowReadChannel import groovyx.gpars.dataflow.DataflowWriteChannel import groovyx.gpars.dataflow.expression.DataflowExpression import nextflow.Channel +import nextflow.Global import nextflow.NF import nextflow.Session import nextflow.extension.CH @@ -34,6 +35,8 @@ import nextflow.plugin.extension.PluginExtensionPoint import nextflow.sql.config.SqlConfig import nextflow.sql.config.SqlDataSource import nextflow.util.CheckHelper +import java.sql.Connection +import java.sql.Statement /** * Provide a channel factory extension that allows the execution of Sql queries * @@ -133,4 +136,101 @@ class ChannelSqlExtension extends PluginExtensionPoint { return target } + private static final Map EXECUTE_PARAMS = [ + db: CharSequence, + statement: CharSequence + ] + + /** + * Execute a SQL statement that does not return a result set (DDL/DML statements) + * + * @param params A map containing 'db' (database alias) and 'statement' (SQL string to execute) + */ + static void execute(Map params) { + CheckHelper.checkParams('execute', params, EXECUTE_PARAMS) + + final String dbName = params.db as String ?: 'default' + final String statement = params.statement as String + + if (!statement) + throw new IllegalArgumentException("Missing required parameter 'statement'") + + final sqlConfig = new SqlConfig((Map) Global.session.config.navigate('sql.db')) + final SqlDataSource dataSource = sqlConfig.getDataSource(dbName) + + if (dataSource == null) { + def msg = "Unknown db name: $dbName" + 
def choices = sqlConfig.getDataSourceNames().closest(dbName) ?: sqlConfig.getDataSourceNames() + if (choices?.size() == 1) + msg += " - Did you mean: ${choices.get(0)}?" + else if (choices) + msg += " - Did you mean any of these?\n" + choices.collect { " $it" }.join('\n') + '\n' + throw new IllegalArgumentException(msg) + } + + try (Connection conn = Sql.newInstance(dataSource.toMap()).getConnection()) { + try (Statement stm = conn.createStatement()) { + stm.execute(normalizeStatement(statement)) + } + } + catch (Exception e) { + log.error("Error executing SQL statement: ${e.message}", e) + throw e + } + } + + /** + * Execute a SQL statement that does not return a result set (DDL/DML statements) + * and returns the number of affected rows + * + * @param params A map containing 'db' (database alias) and 'statement' (SQL string to execute) + * @return The number of rows affected by the SQL statement + */ + static int executeUpdate(Map params) { + CheckHelper.checkParams('executeUpdate', params, EXECUTE_PARAMS) + + final String dbName = params.db as String ?: 'default' + final String statement = params.statement as String + + if (!statement) + throw new IllegalArgumentException("Missing required parameter 'statement'") + + final sqlConfig = new SqlConfig((Map) Global.session.config.navigate('sql.db')) + final SqlDataSource dataSource = sqlConfig.getDataSource(dbName) + + if (dataSource == null) { + def msg = "Unknown db name: $dbName" + def choices = sqlConfig.getDataSourceNames().closest(dbName) ?: sqlConfig.getDataSourceNames() + if (choices?.size() == 1) + msg += " - Did you mean: ${choices.get(0)}?" 
+ else if (choices) + msg += " - Did you mean any of these?\n" + choices.collect { " $it" }.join('\n') + '\n' + throw new IllegalArgumentException(msg) + } + + try (Connection conn = Sql.newInstance(dataSource.toMap()).getConnection()) { + try (Statement stm = conn.createStatement()) { + return stm.executeUpdate(normalizeStatement(statement)) + } + } + catch (Exception e) { + log.error("Error executing SQL update statement: ${e.message}", e) + throw e + } + } + + /** + * Normalizes a SQL statement by adding a semicolon if needed + * + * @param statement The SQL statement to normalize + * @return The normalized SQL statement + */ + private static String normalizeStatement(String statement) { + if (!statement) + throw new IllegalArgumentException("Missing SQL statement") + def result = statement.trim() + if (!result.endsWith(';')) + result += ';' + return result + } } From ce4dcf54d9272df116614bd0bd03e3bfbaaa2a18 Mon Sep 17 00:00:00 2001 From: Edmund Miller Date: Wed, 30 Apr 2025 16:32:52 -0500 Subject: [PATCH 05/17] docs: Enhance README and add examples for SQL execution functions Signed-off-by: Edmund Miller --- README.md | 103 ++++++++++++++++++ examples/sql-execution-example.config | 22 ++++ examples/sql-execution-example.nf | 67 ++++++++++++ .../nextflow/sql/ChannelSqlExtension.groovy | 5 +- 4 files changed, 195 insertions(+), 2 deletions(-) create mode 100644 examples/sql-execution-example.config create mode 100644 examples/sql-execution-example.nf diff --git a/README.md b/README.md index f94306b..c9690a8 100644 --- a/README.md +++ b/README.md @@ -127,6 +127,109 @@ The following options are available: : A SQL statement that is executed before inserting the data, e.g. to create the target table. : *NOTE:* the underlying database should support the *create table if not exist* idiom, as the plugin will execute this statement every time the script is run. 
+## SQL Execution Functions + +This plugin provides the following functions for executing SQL statements that don't return data, such as DDL (Data Definition Language) and DML (Data Manipulation Language) operations. + +### execute + +The `execute` function executes a SQL statement that doesn't return a result set, such as `CREATE`, `ALTER`, `DROP`, `INSERT`, `UPDATE`, or `DELETE` statements. For example: + +```nextflow +include { execute } from 'plugin/nf-sqldb' + +// Create a table +execute( + db: 'foo', + statement: ''' + CREATE TABLE IF NOT EXISTS sample_table ( + id INTEGER PRIMARY KEY, + name VARCHAR(100), + value DOUBLE + ) + ''' +) + +// Insert data +execute( + db: 'foo', + statement: "INSERT INTO sample_table (id, name, value) VALUES (1, 'alpha', 10.5)" +) + +// Delete data +execute( + db: 'foo', + statement: "DELETE FROM sample_table WHERE id = 1" +) +``` + +The following options are available: + +`db` +: The database handle. It must be defined under `sql.db` in the Nextflow configuration. + +`statement` +: The SQL statement to execute. This can be any DDL or DML statement that doesn't return a result set. + +### executeUpdate + +The `executeUpdate` function is similar to `execute`, but it returns the number of rows affected by the SQL statement. This is particularly useful for DML operations like `INSERT`, `UPDATE`, and `DELETE` where you need to know how many rows were affected. 
For example: + +```nextflow +include { executeUpdate } from 'plugin/nf-sqldb' + +// Insert data and get the number of rows inserted +def insertedRows = executeUpdate( + db: 'foo', + statement: "INSERT INTO sample_table (id, name, value) VALUES (2, 'beta', 20.5)" +) +println "Inserted $insertedRows row(s)" + +// Update data and get the number of rows updated +def updatedRows = executeUpdate( + db: 'foo', + statement: "UPDATE sample_table SET value = 30.5 WHERE name = 'beta'" +) +println "Updated $updatedRows row(s)" + +// Delete data and get the number of rows deleted +def deletedRows = executeUpdate( + db: 'foo', + statement: "DELETE FROM sample_table WHERE value > 25" +) +println "Deleted $deletedRows row(s)" +``` + +The following options are available: + +`db` +: The database handle. It must be defined under `sql.db` in the Nextflow configuration. + +`statement` +: The SQL statement to execute. This should be a DML statement that can return a count of affected rows. + +## Differences Between Dataflow Operators and Execution Functions + +The plugin provides two different ways to interact with databases: + +1. **Dataflow Operators** (`fromQuery` and `sqlInsert`): These are designed to integrate with Nextflow's dataflow programming model, operating on channels. + - `fromQuery`: Queries data from a database and returns a channel that emits the results. + - `sqlInsert`: Takes data from a channel and inserts it into a database. + +2. **Execution Functions** (`execute` and `executeUpdate`): These are designed for direct SQL statement execution that doesn't require channel integration. + - `execute`: Executes a SQL statement without returning any data. + - `executeUpdate`: Executes a SQL statement and returns the count of affected rows. 
+ +Use **Dataflow Operators** when you need to: +- Query data that will flow into your pipeline processing +- Insert data from your pipeline processing into a database + +Use **Execution Functions** when you need to: +- Perform database setup (creating tables, schemas, etc.) +- Execute administrative commands +- Perform one-off operations (deleting all records, truncating a table, etc.) +- Execute statements where you don't need the results as part of your dataflow + ## Querying CSV files This plugin supports the [H2](https://www.h2database.com/html/main.html) database engine, which can query CSV files like database tables using SQL statements. diff --git a/examples/sql-execution-example.config b/examples/sql-execution-example.config new file mode 100644 index 0000000..4de67c8 --- /dev/null +++ b/examples/sql-execution-example.config @@ -0,0 +1,22 @@ +/* + * Configuration file for the SQL execution example script + */ + +// Enable the SQL DB plugin +plugins { + id 'nf-sqldb@0.7.0' +} + +// Define an in-memory H2 database for the example +sql { + db { + demo { + url = 'jdbc:h2:mem:demo' + driver = 'org.h2.Driver' + } + } +} + +// Silence unnecessary Nextflow logs +process.echo = false +dag.enabled = false \ No newline at end of file diff --git a/examples/sql-execution-example.nf b/examples/sql-execution-example.nf new file mode 100644 index 0000000..0690aba --- /dev/null +++ b/examples/sql-execution-example.nf @@ -0,0 +1,67 @@ +#!/usr/bin/env nextflow + +/* + * Example script demonstrating how to use the SQL execute and executeUpdate functions + */ + +include { execute; executeUpdate } from 'plugin/nf-sqldb' +include { fromQuery } from 'plugin/nf-sqldb' + +// Define database configuration in nextflow.config file +// sql.db.demo = [url: 'jdbc:h2:mem:demo', driver: 'org.h2.Driver'] + +workflow { + log.info """ + ========================================= + SQL Execution Functions Example + ========================================= + """ + + // Step 1: Create a table + 
log.info "Creating a sample table..." + execute( + db: 'demo', + statement: ''' + CREATE TABLE IF NOT EXISTS test_table ( + id INTEGER PRIMARY KEY, + name VARCHAR(100), + value DOUBLE + ) + ''' + ) + + // Step 2: Insert some data + log.info "Inserting data..." + execute( + db: 'demo', + statement: ''' + INSERT INTO test_table (id, name, value) VALUES + (1, 'alpha', 10.5), + (2, 'beta', 20.7), + (3, 'gamma', 30.2), + (4, 'delta', 40.9) + ''' + ) + + // Step 3: Update data and get affected row count + log.info "Updating data..." + def updatedRows = executeUpdate( + db: 'demo', + statement: "UPDATE test_table SET value = value * 2 WHERE value > 20" + ) + log.info "Updated $updatedRows row(s)" + + // Step 4: Delete data and get affected row count + log.info "Deleting data..." + def deletedRows = executeUpdate( + db: 'demo', + statement: "DELETE FROM test_table WHERE value > 60" + ) + log.info "Deleted $deletedRows row(s)" + + // Step 5: Query the results to verify + log.info "Querying remaining data..." 
+ channel + .fromQuery("SELECT * FROM test_table ORDER BY id", db: 'demo') + .view { row -> "ID: ${row[0]}, Name: ${row[1]}, Value: ${row[2]}" } +} \ No newline at end of file diff --git a/plugins/nf-sqldb/src/main/nextflow/sql/ChannelSqlExtension.groovy b/plugins/nf-sqldb/src/main/nextflow/sql/ChannelSqlExtension.groovy index a492993..da104b0 100644 --- a/plugins/nf-sqldb/src/main/nextflow/sql/ChannelSqlExtension.groovy +++ b/plugins/nf-sqldb/src/main/nextflow/sql/ChannelSqlExtension.groovy @@ -37,6 +37,7 @@ import nextflow.sql.config.SqlDataSource import nextflow.util.CheckHelper import java.sql.Connection import java.sql.Statement +import groovy.sql.Sql /** * Provide a channel factory extension that allows the execution of Sql queries * @@ -168,7 +169,7 @@ class ChannelSqlExtension extends PluginExtensionPoint { throw new IllegalArgumentException(msg) } - try (Connection conn = Sql.newInstance(dataSource.toMap()).getConnection()) { + try (Connection conn = groovy.sql.Sql.newInstance(dataSource.toMap()).getConnection()) { try (Statement stm = conn.createStatement()) { stm.execute(normalizeStatement(statement)) } @@ -208,7 +209,7 @@ class ChannelSqlExtension extends PluginExtensionPoint { throw new IllegalArgumentException(msg) } - try (Connection conn = Sql.newInstance(dataSource.toMap()).getConnection()) { + try (Connection conn = groovy.sql.Sql.newInstance(dataSource.toMap()).getConnection()) { try (Statement stm = conn.createStatement()) { return stm.executeUpdate(normalizeStatement(statement)) } From 9f2eb2bb3fe5736e9e2d7827660da0fc7953aa76 Mon Sep 17 00:00:00 2001 From: Edmund Miller Date: Wed, 30 Apr 2025 16:36:42 -0500 Subject: [PATCH 06/17] test: Add comprehensive SQL execution tests for ChannelSqlExtension - Added tests for DDL and DML operations, including table creation, updates, and deletions. - Implemented error handling tests for invalid SQL statements and database configurations. - Ensured proper handling of statement normalization. 
Signed-off-by: Edmund Miller --- .../test/nextflow/sql/SqlExecutionTest.groovy | 260 ++++++++++++++++++ 1 file changed, 260 insertions(+) create mode 100644 plugins/nf-sqldb/src/test/nextflow/sql/SqlExecutionTest.groovy diff --git a/plugins/nf-sqldb/src/test/nextflow/sql/SqlExecutionTest.groovy b/plugins/nf-sqldb/src/test/nextflow/sql/SqlExecutionTest.groovy new file mode 100644 index 0000000..0a5c4e0 --- /dev/null +++ b/plugins/nf-sqldb/src/test/nextflow/sql/SqlExecutionTest.groovy @@ -0,0 +1,260 @@ +/* + * Copyright 2020-2025, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package nextflow.sql + +import groovy.sql.Sql +import nextflow.Global +import nextflow.Session +import spock.lang.Specification +import spock.lang.Timeout + +/** + * Tests for the SQL execution functionality (execute and executeUpdate methods) + * + * @author Seqera Labs + */ +@Timeout(10) +class SqlExecutionTest extends Specification { + + def setupSpec() { + // Initialize session for tests + Global.session = Mock(Session) + } + + def cleanup() { + Global.session = null + } + + def 'should execute DDL statements successfully'() { + given: + def JDBC_URL = 'jdbc:h2:mem:test_ddl_' + Random.newInstance().nextInt(1_000_000) + def sql = Sql.newInstance(JDBC_URL, 'sa', null) + + and: + Global.session = Mock(Session) { + getConfig() >> [sql: [db: [test: [url: JDBC_URL]]]] + } + + when: 'Creating a table' + ChannelSqlExtension.execute([ + db: 'test', + statement: 'CREATE TABLE test_table(id INT PRIMARY KEY, name VARCHAR(255))' + ]) + + then: 'Table should be created' + sql.rows('SELECT 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = \'TEST_TABLE\'').size() > 0 + + when: 'Altering the table' + ChannelSqlExtension.execute([ + db: 'test', + statement: 'ALTER TABLE test_table ADD COLUMN description VARCHAR(255)' + ]) + + then: 'Column should be added' + sql.rows('SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = \'TEST_TABLE\' AND COLUMN_NAME = \'DESCRIPTION\'').size() > 0 + + when: 'Dropping the table' + ChannelSqlExtension.execute([ + db: 'test', + statement: 'DROP TABLE test_table' + ]) + + then: 'Table should be dropped' + sql.rows('SELECT 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = \'TEST_TABLE\'').size() == 0 + } + + def 'should execute DML statements successfully'() { + given: + def JDBC_URL = 'jdbc:h2:mem:test_dml_' + Random.newInstance().nextInt(1_000_000) + def sql = Sql.newInstance(JDBC_URL, 'sa', null) + + and: 'Create a table' + sql.execute('CREATE TABLE test_dml(id INT PRIMARY KEY, name VARCHAR(255), value INT)') + + 
and: + Global.session = Mock(Session) { + getConfig() >> [sql: [db: [test: [url: JDBC_URL]]]] + } + + when: 'Inserting data' + ChannelSqlExtension.execute([ + db: 'test', + statement: 'INSERT INTO test_dml (id, name, value) VALUES (1, \'item1\', 100)' + ]) + + then: 'Row should be inserted' + sql.rows('SELECT * FROM test_dml').size() == 1 + sql.firstRow('SELECT * FROM test_dml WHERE id = 1').name == 'item1' + + when: 'Updating data' + ChannelSqlExtension.execute([ + db: 'test', + statement: 'UPDATE test_dml SET value = 200 WHERE id = 1' + ]) + + then: 'Row should be updated' + sql.firstRow('SELECT value FROM test_dml WHERE id = 1').value == 200 + + when: 'Deleting data' + ChannelSqlExtension.execute([ + db: 'test', + statement: 'DELETE FROM test_dml WHERE id = 1' + ]) + + then: 'Row should be deleted' + sql.rows('SELECT * FROM test_dml').size() == 0 + } + + def 'should return affected row count with executeUpdate'() { + given: + def JDBC_URL = 'jdbc:h2:mem:test_update_' + Random.newInstance().nextInt(1_000_000) + def sql = Sql.newInstance(JDBC_URL, 'sa', null) + + and: 'Create a table and insert multiple rows' + sql.execute('CREATE TABLE test_update(id INT PRIMARY KEY, name VARCHAR(255), value INT)') + sql.execute('INSERT INTO test_update VALUES (1, \'item1\', 100)') + sql.execute('INSERT INTO test_update VALUES (2, \'item2\', 100)') + sql.execute('INSERT INTO test_update VALUES (3, \'item3\', 100)') + + and: + Global.session = Mock(Session) { + getConfig() >> [sql: [db: [test: [url: JDBC_URL]]]] + } + + when: 'Inserting data with executeUpdate' + def insertCount = ChannelSqlExtension.executeUpdate([ + db: 'test', + statement: 'INSERT INTO test_update (id, name, value) VALUES (4, \'item4\', 100)' + ]) + + then: 'Should return 1 affected row' + insertCount == 1 + sql.rows('SELECT * FROM test_update').size() == 4 + + when: 'Updating multiple rows' + def updateCount = ChannelSqlExtension.executeUpdate([ + db: 'test', + statement: 'UPDATE test_update SET value = 200 
WHERE value = 100' + ]) + + then: 'Should return 4 affected rows' + updateCount == 4 + sql.rows('SELECT * FROM test_update WHERE value = 200').size() == 4 + + when: 'Deleting multiple rows' + def deleteCount = ChannelSqlExtension.executeUpdate([ + db: 'test', + statement: 'DELETE FROM test_update WHERE value = 200' + ]) + + then: 'Should return 4 affected rows' + deleteCount == 4 + sql.rows('SELECT * FROM test_update').size() == 0 + } + + def 'should handle invalid SQL correctly'() { + given: + def JDBC_URL = 'jdbc:h2:mem:test_error_' + Random.newInstance().nextInt(1_000_000) + def sql = Sql.newInstance(JDBC_URL, 'sa', null) + + and: + Global.session = Mock(Session) { + getConfig() >> [sql: [db: [test: [url: JDBC_URL]]]] + } + + when: 'Executing invalid SQL' + ChannelSqlExtension.execute([ + db: 'test', + statement: 'INVALID SQL STATEMENT' + ]) + + then: 'Should throw an exception' + thrown(Exception) + + when: 'Executing query with invalid table name' + ChannelSqlExtension.execute([ + db: 'test', + statement: 'SELECT * FROM non_existent_table' + ]) + + then: 'Should throw an exception' + thrown(Exception) + } + + def 'should handle invalid database configuration correctly'() { + given: + Global.session = Mock(Session) { + getConfig() >> [sql: [db: [test: [url: 'jdbc:h2:mem:test']]]] + } + + when: 'Using non-existent database alias' + ChannelSqlExtension.execute([ + db: 'non_existent_db', + statement: 'SELECT 1' + ]) + + then: 'Should throw an IllegalArgumentException' + thrown(IllegalArgumentException) + + when: 'Missing statement parameter' + ChannelSqlExtension.execute([ + db: 'test' + ]) + + then: 'Should throw an IllegalArgumentException' + thrown(IllegalArgumentException) + + when: 'Empty statement parameter' + ChannelSqlExtension.execute([ + db: 'test', + statement: '' + ]) + + then: 'Should throw an IllegalArgumentException' + thrown(IllegalArgumentException) + } + + def 'should handle statement normalization correctly'() { + given: + def JDBC_URL = 
'jdbc:h2:mem:test_norm_' + Random.newInstance().nextInt(1_000_000) + def sql = Sql.newInstance(JDBC_URL, 'sa', null) + + and: + Global.session = Mock(Session) { + getConfig() >> [sql: [db: [test: [url: JDBC_URL]]]] + } + + when: 'Executing statement without semicolon' + ChannelSqlExtension.execute([ + db: 'test', + statement: 'CREATE TABLE test_norm(id INT PRIMARY KEY)' + ]) + + then: 'Statement should be executed successfully' + sql.rows('SELECT 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = \'TEST_NORM\'').size() > 0 + + when: 'Executing statement with trailing whitespace' + ChannelSqlExtension.execute([ + db: 'test', + statement: 'DROP TABLE test_norm ' + ]) + + then: 'Statement should be executed successfully' + sql.rows('SELECT 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = \'TEST_NORM\'').size() == 0 + } +} \ No newline at end of file From ea1c26cb630ca1ac2d0d1f4e8c15a770b11ca082 Mon Sep 17 00:00:00 2001 From: Edmund Miller Date: Wed, 30 Apr 2025 16:46:34 -0500 Subject: [PATCH 07/17] refactor: Update ChannelSqlExtension methods to instance methods - Changed `execute` and `executeUpdate` methods from static to instance methods, allowing for better integration with session management. - Updated test cases to reflect the new method calls and ensure proper session initialization. 
Signed-off-by: Edmund Miller --- .../nextflow/sql/ChannelSqlExtension.groovy | 11 ++-- .../test/nextflow/sql/SqlExecutionTest.groovy | 56 +++++++++++-------- 2 files changed, 41 insertions(+), 26 deletions(-) diff --git a/plugins/nf-sqldb/src/main/nextflow/sql/ChannelSqlExtension.groovy b/plugins/nf-sqldb/src/main/nextflow/sql/ChannelSqlExtension.groovy index da104b0..20d8eff 100644 --- a/plugins/nf-sqldb/src/main/nextflow/sql/ChannelSqlExtension.groovy +++ b/plugins/nf-sqldb/src/main/nextflow/sql/ChannelSqlExtension.groovy @@ -30,6 +30,7 @@ import nextflow.Session import nextflow.extension.CH import nextflow.extension.DataflowHelper import nextflow.plugin.extension.Factory +import nextflow.plugin.extension.Function import nextflow.plugin.extension.Operator import nextflow.plugin.extension.PluginExtensionPoint import nextflow.sql.config.SqlConfig @@ -147,7 +148,8 @@ class ChannelSqlExtension extends PluginExtensionPoint { * * @param params A map containing 'db' (database alias) and 'statement' (SQL string to execute) */ - static void execute(Map params) { + @Function + void execute(Map params) { CheckHelper.checkParams('execute', params, EXECUTE_PARAMS) final String dbName = params.db as String ?: 'default' @@ -156,7 +158,7 @@ class ChannelSqlExtension extends PluginExtensionPoint { if (!statement) throw new IllegalArgumentException("Missing required parameter 'statement'") - final sqlConfig = new SqlConfig((Map) Global.session.config.navigate('sql.db')) + final sqlConfig = new SqlConfig((Map) session.config.navigate('sql.db')) final SqlDataSource dataSource = sqlConfig.getDataSource(dbName) if (dataSource == null) { @@ -187,7 +189,8 @@ class ChannelSqlExtension extends PluginExtensionPoint { * @param params A map containing 'db' (database alias) and 'statement' (SQL string to execute) * @return The number of rows affected by the SQL statement */ - static int executeUpdate(Map params) { + @Function + int executeUpdate(Map params) { 
CheckHelper.checkParams('executeUpdate', params, EXECUTE_PARAMS) final String dbName = params.db as String ?: 'default' @@ -196,7 +199,7 @@ class ChannelSqlExtension extends PluginExtensionPoint { if (!statement) throw new IllegalArgumentException("Missing required parameter 'statement'") - final sqlConfig = new SqlConfig((Map) Global.session.config.navigate('sql.db')) + final sqlConfig = new SqlConfig((Map) session.config.navigate('sql.db')) final SqlDataSource dataSource = sqlConfig.getDataSource(dbName) if (dataSource == null) { diff --git a/plugins/nf-sqldb/src/test/nextflow/sql/SqlExecutionTest.groovy b/plugins/nf-sqldb/src/test/nextflow/sql/SqlExecutionTest.groovy index 0a5c4e0..cc2ef5c 100644 --- a/plugins/nf-sqldb/src/test/nextflow/sql/SqlExecutionTest.groovy +++ b/plugins/nf-sqldb/src/test/nextflow/sql/SqlExecutionTest.groovy @@ -46,12 +46,14 @@ class SqlExecutionTest extends Specification { def sql = Sql.newInstance(JDBC_URL, 'sa', null) and: - Global.session = Mock(Session) { + def session = Mock(Session) { getConfig() >> [sql: [db: [test: [url: JDBC_URL]]]] } + def sqlExtension = new ChannelSqlExtension() + sqlExtension.init(session) when: 'Creating a table' - ChannelSqlExtension.execute([ + sqlExtension.execute([ db: 'test', statement: 'CREATE TABLE test_table(id INT PRIMARY KEY, name VARCHAR(255))' ]) @@ -60,7 +62,7 @@ class SqlExecutionTest extends Specification { sql.rows('SELECT 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = \'TEST_TABLE\'').size() > 0 when: 'Altering the table' - ChannelSqlExtension.execute([ + sqlExtension.execute([ db: 'test', statement: 'ALTER TABLE test_table ADD COLUMN description VARCHAR(255)' ]) @@ -69,7 +71,7 @@ class SqlExecutionTest extends Specification { sql.rows('SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = \'TEST_TABLE\' AND COLUMN_NAME = \'DESCRIPTION\'').size() > 0 when: 'Dropping the table' - ChannelSqlExtension.execute([ + sqlExtension.execute([ db: 'test', statement: 'DROP TABLE 
test_table' ]) @@ -87,12 +89,14 @@ class SqlExecutionTest extends Specification { sql.execute('CREATE TABLE test_dml(id INT PRIMARY KEY, name VARCHAR(255), value INT)') and: - Global.session = Mock(Session) { + def session = Mock(Session) { getConfig() >> [sql: [db: [test: [url: JDBC_URL]]]] } + def sqlExtension = new ChannelSqlExtension() + sqlExtension.init(session) when: 'Inserting data' - ChannelSqlExtension.execute([ + sqlExtension.execute([ db: 'test', statement: 'INSERT INTO test_dml (id, name, value) VALUES (1, \'item1\', 100)' ]) @@ -102,7 +106,7 @@ class SqlExecutionTest extends Specification { sql.firstRow('SELECT * FROM test_dml WHERE id = 1').name == 'item1' when: 'Updating data' - ChannelSqlExtension.execute([ + sqlExtension.execute([ db: 'test', statement: 'UPDATE test_dml SET value = 200 WHERE id = 1' ]) @@ -111,7 +115,7 @@ class SqlExecutionTest extends Specification { sql.firstRow('SELECT value FROM test_dml WHERE id = 1').value == 200 when: 'Deleting data' - ChannelSqlExtension.execute([ + sqlExtension.execute([ db: 'test', statement: 'DELETE FROM test_dml WHERE id = 1' ]) @@ -132,12 +136,14 @@ class SqlExecutionTest extends Specification { sql.execute('INSERT INTO test_update VALUES (3, \'item3\', 100)') and: - Global.session = Mock(Session) { + def session = Mock(Session) { getConfig() >> [sql: [db: [test: [url: JDBC_URL]]]] } + def sqlExtension = new ChannelSqlExtension() + sqlExtension.init(session) when: 'Inserting data with executeUpdate' - def insertCount = ChannelSqlExtension.executeUpdate([ + def insertCount = sqlExtension.executeUpdate([ db: 'test', statement: 'INSERT INTO test_update (id, name, value) VALUES (4, \'item4\', 100)' ]) @@ -147,7 +153,7 @@ class SqlExecutionTest extends Specification { sql.rows('SELECT * FROM test_update').size() == 4 when: 'Updating multiple rows' - def updateCount = ChannelSqlExtension.executeUpdate([ + def updateCount = sqlExtension.executeUpdate([ db: 'test', statement: 'UPDATE test_update SET value = 
200 WHERE value = 100' ]) @@ -157,7 +163,7 @@ class SqlExecutionTest extends Specification { sql.rows('SELECT * FROM test_update WHERE value = 200').size() == 4 when: 'Deleting multiple rows' - def deleteCount = ChannelSqlExtension.executeUpdate([ + def deleteCount = sqlExtension.executeUpdate([ db: 'test', statement: 'DELETE FROM test_update WHERE value = 200' ]) @@ -173,12 +179,14 @@ class SqlExecutionTest extends Specification { def sql = Sql.newInstance(JDBC_URL, 'sa', null) and: - Global.session = Mock(Session) { + def session = Mock(Session) { getConfig() >> [sql: [db: [test: [url: JDBC_URL]]]] } + def sqlExtension = new ChannelSqlExtension() + sqlExtension.init(session) when: 'Executing invalid SQL' - ChannelSqlExtension.execute([ + sqlExtension.execute([ db: 'test', statement: 'INVALID SQL STATEMENT' ]) @@ -187,7 +195,7 @@ class SqlExecutionTest extends Specification { thrown(Exception) when: 'Executing query with invalid table name' - ChannelSqlExtension.execute([ + sqlExtension.execute([ db: 'test', statement: 'SELECT * FROM non_existent_table' ]) @@ -198,12 +206,14 @@ class SqlExecutionTest extends Specification { def 'should handle invalid database configuration correctly'() { given: - Global.session = Mock(Session) { + def session = Mock(Session) { getConfig() >> [sql: [db: [test: [url: 'jdbc:h2:mem:test']]]] } + def sqlExtension = new ChannelSqlExtension() + sqlExtension.init(session) when: 'Using non-existent database alias' - ChannelSqlExtension.execute([ + sqlExtension.execute([ db: 'non_existent_db', statement: 'SELECT 1' ]) @@ -212,7 +222,7 @@ class SqlExecutionTest extends Specification { thrown(IllegalArgumentException) when: 'Missing statement parameter' - ChannelSqlExtension.execute([ + sqlExtension.execute([ db: 'test' ]) @@ -220,7 +230,7 @@ class SqlExecutionTest extends Specification { thrown(IllegalArgumentException) when: 'Empty statement parameter' - ChannelSqlExtension.execute([ + sqlExtension.execute([ db: 'test', statement: '' ]) @@ 
-235,12 +245,14 @@ class SqlExecutionTest extends Specification { def sql = Sql.newInstance(JDBC_URL, 'sa', null) and: - Global.session = Mock(Session) { + def session = Mock(Session) { getConfig() >> [sql: [db: [test: [url: JDBC_URL]]]] } + def sqlExtension = new ChannelSqlExtension() + sqlExtension.init(session) when: 'Executing statement without semicolon' - ChannelSqlExtension.execute([ + sqlExtension.execute([ db: 'test', statement: 'CREATE TABLE test_norm(id INT PRIMARY KEY)' ]) @@ -249,7 +261,7 @@ class SqlExecutionTest extends Specification { sql.rows('SELECT 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = \'TEST_NORM\'').size() > 0 when: 'Executing statement with trailing whitespace' - ChannelSqlExtension.execute([ + sqlExtension.execute([ db: 'test', statement: 'DROP TABLE test_norm ' ]) From b738fab0f0e23677773defb1381a98e512369666 Mon Sep 17 00:00:00 2001 From: Edmund Miller Date: Wed, 30 Apr 2025 16:46:58 -0500 Subject: [PATCH 08/17] refactor: Update SQL execution example to use file-based H2 database - Changed the database configuration from in-memory to file-based for persistence. - Updated SQL statements to use uppercase for table and column names for consistency. - Enhanced logging to include results of table creation and updates. Signed-off-by: Edmund Miller --- .../main.nf} | 44 +++++++++++-------- .../nextflow.config} | 4 +- 2 files changed, 27 insertions(+), 21 deletions(-) rename examples/{sql-execution-example.nf => sql-execution/main.nf} (57%) rename examples/{sql-execution-example.config => sql-execution/nextflow.config} (77%) diff --git a/examples/sql-execution-example.nf b/examples/sql-execution/main.nf similarity index 57% rename from examples/sql-execution-example.nf rename to examples/sql-execution/main.nf index 0690aba..3c70df4 100644 --- a/examples/sql-execution-example.nf +++ b/examples/sql-execution/main.nf @@ -19,49 +19,55 @@ workflow { // Step 1: Create a table log.info "Creating a sample table..." 
- execute( + def createResult = executeUpdate( db: 'demo', statement: ''' - CREATE TABLE IF NOT EXISTS test_table ( - id INTEGER PRIMARY KEY, - name VARCHAR(100), - value DOUBLE + CREATE TABLE IF NOT EXISTS TEST_TABLE ( + ID INTEGER PRIMARY KEY, + NAME VARCHAR(100), + VALUE DOUBLE ) ''' ) + log.info "Create table result: $createResult" // Step 2: Insert some data log.info "Inserting data..." - execute( + executeUpdate( db: 'demo', statement: ''' - INSERT INTO test_table (id, name, value) VALUES + INSERT INTO TEST_TABLE (ID, NAME, VALUE) VALUES (1, 'alpha', 10.5), (2, 'beta', 20.7), (3, 'gamma', 30.2), - (4, 'delta', 40.9) + (4, 'delta', 40.9); ''' ) - // Step 3: Update data and get affected row count + // Step 3: Update some data log.info "Updating data..." - def updatedRows = executeUpdate( + executeUpdate( db: 'demo', - statement: "UPDATE test_table SET value = value * 2 WHERE value > 20" + statement: ''' + UPDATE TEST_TABLE + SET VALUE = VALUE * 2 + WHERE ID = 2; + ''' ) - log.info "Updated $updatedRows row(s)" - // Step 4: Delete data and get affected row count + // Step 4: Delete some data log.info "Deleting data..." - def deletedRows = executeUpdate( + executeUpdate( db: 'demo', - statement: "DELETE FROM test_table WHERE value > 60" + statement: ''' + DELETE FROM TEST_TABLE + WHERE ID = 4; + ''' ) - log.info "Deleted $deletedRows row(s)" - // Step 5: Query the results to verify - log.info "Querying remaining data..." + // Step 5: Query results + log.info "Querying results..." 
channel - .fromQuery("SELECT * FROM test_table ORDER BY id", db: 'demo') + .fromQuery("SELECT * FROM TEST_TABLE ORDER BY ID", db: 'demo') .view { row -> "ID: ${row[0]}, Name: ${row[1]}, Value: ${row[2]}" } } \ No newline at end of file diff --git a/examples/sql-execution-example.config b/examples/sql-execution/nextflow.config similarity index 77% rename from examples/sql-execution-example.config rename to examples/sql-execution/nextflow.config index 4de67c8..c78361c 100644 --- a/examples/sql-execution-example.config +++ b/examples/sql-execution/nextflow.config @@ -7,11 +7,11 @@ plugins { id 'nf-sqldb@0.7.0' } -// Define an in-memory H2 database for the example +// Define a file-based H2 database for the example sql { db { demo { - url = 'jdbc:h2:mem:demo' + url = 'jdbc:h2:./demo' driver = 'org.h2.Driver' } } From 86881f576ec0eff036f50296ca47896325c63c9a Mon Sep 17 00:00:00 2001 From: Edmund Miller Date: Thu, 1 May 2025 13:19:09 -0500 Subject: [PATCH 09/17] fix: Enhance error handling for database connection and commit operations - Added try-catch blocks around `setAutoCommit` and `commit` methods to handle exceptions for databases that do not support these operations. - Improved logging to provide feedback when default behaviors are used due to unsupported operations. 
Signed-off-by: Edmund Miller --- .../main/nextflow/sql/InsertHandler.groovy | 21 ++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/plugins/nf-sqldb/src/main/nextflow/sql/InsertHandler.groovy b/plugins/nf-sqldb/src/main/nextflow/sql/InsertHandler.groovy index 03a34bb..5a56667 100644 --- a/plugins/nf-sqldb/src/main/nextflow/sql/InsertHandler.groovy +++ b/plugins/nf-sqldb/src/main/nextflow/sql/InsertHandler.groovy @@ -70,7 +70,12 @@ class InsertHandler implements Closeable { if( connection == null ) { connection = Sql.newInstance(ds.toMap()).getConnection() checkCreate(connection) - connection.setAutoCommit(false) + try { + connection.setAutoCommit(false) + } + catch(Exception e) { + log.debug "Database does not support setAutoCommit, continuing with default settings: ${e.message}" + } } return connection } @@ -197,7 +202,12 @@ class InsertHandler implements Closeable { // reset the current batch count batchCount = 0 // make sure to commit the current batch - connection.commit() + try { + connection.commit() + } + catch(Exception e) { + log.debug "Database does not support commit, continuing with default behavior: ${e.message}" + } } } } @@ -238,7 +248,12 @@ class InsertHandler implements Closeable { log.debug("[SQL] flushing and committing open batch") preparedStatement.executeBatch() preparedStatement.close() - connection.commit() + try { + connection.commit() + } + catch(Exception e) { + log.debug "Database does not support commit, continuing with default behavior: ${e.message}" + } } } finally { From 13111a886241f171cd37e9174eaa94dd7ee799ff Mon Sep 17 00:00:00 2001 From: Edmund Miller Date: Fri, 2 May 2025 13:52:05 -0500 Subject: [PATCH 10/17] feat: Add integration testing framework for database backends - Added a new `.envrc` file to manage environment variables for database connections. 
Signed-off-by: Edmund Miller --- .envrc | 6 ++ .../main/nextflow/sql/InsertHandler.groovy | 46 +++++++++++-- .../sql/SqlPluginIntegrationTest.groovy | 65 +++++++++++++++++++ .../src/testResources/testDir/nextflow.config | 19 ++++++ .../src/testResources/testDir/test_sql_db.nf | 37 +++++++++++ 5 files changed, 169 insertions(+), 4 deletions(-) create mode 100644 .envrc create mode 100644 plugins/nf-sqldb/src/test/groovy/nextflow/sql/SqlPluginIntegrationTest.groovy create mode 100644 plugins/nf-sqldb/src/testResources/testDir/nextflow.config create mode 100644 plugins/nf-sqldb/src/testResources/testDir/test_sql_db.nf diff --git a/.envrc b/.envrc new file mode 100644 index 0000000..c0a3d12 --- /dev/null +++ b/.envrc @@ -0,0 +1,6 @@ +source_url "https://github.com/tmatilai/direnv-1password/raw/v1.0.1/1password.sh" \ + "sha256-4dmKkmlPBNXimznxeehplDfiV+CvJiIzg7H1Pik4oqY=" + +# Fetch one secret and export it into the specified environment variable +from_op DATABRICKS_JDBC_URL=op://Employee/e3ynriit7iof45533u3slrnjka/hostname +from_op DATABRICKS_TOKEN=op://Employee/e3ynriit7iof45533u3slrnjka/credential diff --git a/plugins/nf-sqldb/src/main/nextflow/sql/InsertHandler.groovy b/plugins/nf-sqldb/src/main/nextflow/sql/InsertHandler.groovy index 5a56667..1a0572e 100644 --- a/plugins/nf-sqldb/src/main/nextflow/sql/InsertHandler.groovy +++ b/plugins/nf-sqldb/src/main/nextflow/sql/InsertHandler.groovy @@ -73,6 +73,9 @@ class InsertHandler implements Closeable { try { connection.setAutoCommit(false) } + catch(UnsupportedOperationException e) { + log.debug "setAutoCommit is not supported by this driver (likely Databricks), continuing: ${e.message}" + } catch(Exception e) { log.debug "Database does not support setAutoCommit, continuing with default settings: ${e.message}" } @@ -162,7 +165,15 @@ class InsertHandler implements Closeable { for(int i=0; i0 ) { log.debug("[SQL] flushing and committing open batch") - preparedStatement.executeBatch() - preparedStatement.close() + try { + 
preparedStatement.executeBatch() + } + catch(UnsupportedOperationException e) { + log.debug "executeBatch is not supported by this driver (likely Databricks), continuing: ${e.message}" + } + catch(Exception e) { + log.debug "Database does not support executeBatch, continuing with default behavior: ${e.message}" + } + try { + preparedStatement.close() + } + catch(Exception e) { + log.debug "Database does not support preparedStatement.close(), continuing: ${e.message}" + } try { connection.commit() } + catch(UnsupportedOperationException e) { + log.debug "commit is not supported by this driver (likely Databricks), continuing: ${e.message}" + } catch(Exception e) { log.debug "Database does not support commit, continuing with default behavior: ${e.message}" } diff --git a/plugins/nf-sqldb/src/test/groovy/nextflow/sql/SqlPluginIntegrationTest.groovy b/plugins/nf-sqldb/src/test/groovy/nextflow/sql/SqlPluginIntegrationTest.groovy new file mode 100644 index 0000000..05274f3 --- /dev/null +++ b/plugins/nf-sqldb/src/test/groovy/nextflow/sql/SqlPluginIntegrationTest.groovy @@ -0,0 +1,65 @@ +package nextflow.sql + +import spock.lang.Specification +import spock.lang.Requires +import java.nio.file.Path +import java.nio.file.Paths +import java.nio.file.Files + +class SqlPluginIntegrationTest extends Specification { + + static boolean isNextflowAvailable() { + try { + def proc = new ProcessBuilder('nextflow', '--version').start() + proc.waitFor() + return proc.exitValue() == 0 + } catch (Exception e) { + return false + } + } + + static boolean hasDatabricksCredentials() { + def jdbcUrl = System.getenv('DATABRICKS_JDBC_URL') + def token = System.getenv('DATABRICKS_TOKEN') + return jdbcUrl && token && !jdbcUrl.isEmpty() && !token.isEmpty() + } + + @Requires({ isNextflowAvailable() && hasDatabricksCredentials() }) + def 'should run Nextflow pipeline with SQL plugin and Databricks'() { + given: + // Ensure test resources directory exists + def testDir = 
Paths.get('plugins/nf-sqldb/src/testResources/testDir').toAbsolutePath() + def scriptPath = testDir.resolve('test_sql_db.nf') + def configPath = testDir.resolve('nextflow.config') + + // Check if required files exist + assert Files.exists(testDir), "Test directory doesn't exist: $testDir" + assert Files.exists(scriptPath), "Script file doesn't exist: $scriptPath" + assert Files.exists(configPath), "Config file doesn't exist: $configPath" + + def env = [ + 'DATABRICKS_JDBC_URL': System.getenv('DATABRICKS_JDBC_URL'), + 'DATABRICKS_TOKEN': System.getenv('DATABRICKS_TOKEN') + ] + + when: + def pb = new ProcessBuilder('nextflow', 'run', scriptPath.toString(), '-c', configPath.toString()) + pb.directory(testDir.toFile()) + pb.environment().putAll(env) + pb.redirectErrorStream(true) + + def proc = pb.start() + def output = new StringBuilder() + proc.inputStream.eachLine { line -> + println line // Print output in real-time for debugging + output.append(line).append('\n') + } + def exitCode = proc.waitFor() + + then: + exitCode == 0 + output.toString().contains('alpha') // Should see query result in output + output.toString().contains('beta') + output.toString().contains('Updated 1 row(s)') + } +} \ No newline at end of file diff --git a/plugins/nf-sqldb/src/testResources/testDir/nextflow.config b/plugins/nf-sqldb/src/testResources/testDir/nextflow.config new file mode 100644 index 0000000..7b7941a --- /dev/null +++ b/plugins/nf-sqldb/src/testResources/testDir/nextflow.config @@ -0,0 +1,19 @@ +plugins { + id 'nf-sqldb' +} + +sql { + db { + foo { + url = System.getenv('DATABRICKS_JDBC_URL') + user = '' + password = '' + properties { + token = System.getenv('DATABRICKS_TOKEN') + ConnCatalog = "hive_metastore" + ConnSchema = "default" + } + // Add any other required properties for Databricks + } + } +} \ No newline at end of file diff --git a/plugins/nf-sqldb/src/testResources/testDir/test_sql_db.nf b/plugins/nf-sqldb/src/testResources/testDir/test_sql_db.nf new file mode 
100644 index 0000000..1d38121 --- /dev/null +++ b/plugins/nf-sqldb/src/testResources/testDir/test_sql_db.nf @@ -0,0 +1,37 @@ +nextflow.enable.dsl=2 + +include { fromQuery; sqlInsert; execute; executeUpdate } from 'plugin/nf-sqldb' + +workflow { + // Setup: create table + execute( + db: 'foo', + statement: ''' + CREATE TABLE IF NOT EXISTS sample_table ( + id INTEGER PRIMARY KEY, + name VARCHAR(100), + value DOUBLE + ) + ''' + ) + + // Insert data using sqlInsert + Channel + .of([1, 'alpha', 10.5], [2, 'beta', 20.5]) + .sqlInsert( + db: 'foo', + into: 'sample_table', + columns: 'id, name, value' + ) + + // Query data using fromQuery + fromQuery('SELECT * FROM sample_table', db: 'foo') + .view() + + // Update data using executeUpdate + def updated = executeUpdate( + db: 'foo', + statement: "UPDATE sample_table SET value = 30.5 WHERE name = 'beta'" + ) + println "Updated $updated row(s)" +} \ No newline at end of file From 593e4f479efd6cb4a15f98d34b5267ac96246433 Mon Sep 17 00:00:00 2001 From: Edmund Miller Date: Fri, 2 May 2025 15:56:05 -0500 Subject: [PATCH 11/17] refactor: Rename execute function to sqlExecute and update documentation - Renamed the `execute` function to `sqlExecute` for clarity and consistency. - Updated all references in the README, example scripts, and tests to reflect the new function name. - Enhanced documentation to provide clearer usage examples for SQL execution functions. 
Signed-off-by: Edmund Miller Co-authored-by: Edmund Miller Co-authored-by: Paolo Di Tommaso --- README.md | 20 +++++++------- examples/sql-execution/main.nf | 4 +-- .../nextflow/sql/ChannelSqlExtension.groovy | 4 +-- .../src/main/nextflow/sql/QueryHandler.groovy | 6 ++--- .../test/nextflow/sql/SqlExecutionTest.groovy | 26 +++++++++---------- .../src/testResources/testDir/test_sql_db.nf | 6 ++--- 6 files changed, 33 insertions(+), 33 deletions(-) diff --git a/README.md b/README.md index c9690a8..9e628d0 100644 --- a/README.md +++ b/README.md @@ -131,15 +131,15 @@ The following options are available: This plugin provides the following functions for executing SQL statements that don't return data, such as DDL (Data Definition Language) and DML (Data Manipulation Language) operations. -### execute +### sqlExecute -The `execute` function executes a SQL statement that doesn't return a result set, such as `CREATE`, `ALTER`, `DROP`, `INSERT`, `UPDATE`, or `DELETE` statements. For example: +The `sqlExecute` function executes a SQL statement that doesn't return a result set, such as `CREATE`, `ALTER`, `DROP`, `INSERT`, `UPDATE`, or `DELETE` statements. For example: ```nextflow -include { execute } from 'plugin/nf-sqldb' +include { sqlExecute } from 'plugin/nf-sqldb' // Create a table -execute( +sqlExecute( db: 'foo', statement: ''' CREATE TABLE IF NOT EXISTS sample_table ( @@ -151,13 +151,13 @@ execute( ) // Insert data -execute( +sqlExecute( db: 'foo', statement: "INSERT INTO sample_table (id, name, value) VALUES (1, 'alpha', 10.5)" ) // Delete data -execute( +sqlExecute( db: 'foo', statement: "DELETE FROM sample_table WHERE id = 1" ) @@ -173,7 +173,7 @@ The following options are available: ### executeUpdate -The `executeUpdate` function is similar to `execute`, but it returns the number of rows affected by the SQL statement. This is particularly useful for DML operations like `INSERT`, `UPDATE`, and `DELETE` where you need to know how many rows were affected. 
For example: +The `executeUpdate` function is similar to `sqlExecute`, but it returns the number of rows affected by the SQL statement. This is particularly useful for DML operations like `INSERT`, `UPDATE`, and `DELETE` where you need to know how many rows were affected. For example: ```nextflow include { executeUpdate } from 'plugin/nf-sqldb' @@ -216,8 +216,8 @@ The plugin provides two different ways to interact with databases: - `fromQuery`: Queries data from a database and returns a channel that emits the results. - `sqlInsert`: Takes data from a channel and inserts it into a database. -2. **Execution Functions** (`execute` and `executeUpdate`): These are designed for direct SQL statement execution that doesn't require channel integration. - - `execute`: Executes a SQL statement without returning any data. +2. **Execution Functions** (`sqlExecute` and `executeUpdate`): These are designed for direct SQL statement execution that doesn't require channel integration. + - `sqlExecute`: Executes a SQL statement without returning any data. - `executeUpdate`: Executes a SQL statement and returns the count of affected rows. Use **Dataflow Operators** when you need to: @@ -262,4 +262,4 @@ The `CSVREAD` function provided by the H2 database engine allows you to query an Like all dataflow operators in Nextflow, the operators provided by this plugin are executed asynchronously. -In particular, data inserted using the `sqlInsert` operator is *not* guaranteed to be available to any subsequent queries using the `fromQuery` operator, as it is not possible to make a channel factory operation dependent on some upstream operation. +In particular, data inserted using the `sqlInsert` operator is *not* guaranteed to be available to any subsequent queries using the `fromQuery` operator, as it is not possible to make a channel factory operation dependent on some upstream operation. 
\ No newline at end of file diff --git a/examples/sql-execution/main.nf b/examples/sql-execution/main.nf index 3c70df4..e34af92 100644 --- a/examples/sql-execution/main.nf +++ b/examples/sql-execution/main.nf @@ -1,10 +1,10 @@ #!/usr/bin/env nextflow /* - * Example script demonstrating how to use the SQL execute and executeUpdate functions + * Example script demonstrating how to use the SQL sqlExecute and executeUpdate functions */ -include { execute; executeUpdate } from 'plugin/nf-sqldb' +include { sqlExecute; executeUpdate } from 'plugin/nf-sqldb' include { fromQuery } from 'plugin/nf-sqldb' // Define database configuration in nextflow.config file diff --git a/plugins/nf-sqldb/src/main/nextflow/sql/ChannelSqlExtension.groovy b/plugins/nf-sqldb/src/main/nextflow/sql/ChannelSqlExtension.groovy index 20d8eff..5f9e17f 100644 --- a/plugins/nf-sqldb/src/main/nextflow/sql/ChannelSqlExtension.groovy +++ b/plugins/nf-sqldb/src/main/nextflow/sql/ChannelSqlExtension.groovy @@ -149,8 +149,8 @@ class ChannelSqlExtension extends PluginExtensionPoint { * @param params A map containing 'db' (database alias) and 'statement' (SQL string to execute) */ @Function - void execute(Map params) { - CheckHelper.checkParams('execute', params, EXECUTE_PARAMS) + void sqlExecute(Map params) { + CheckHelper.checkParams('sqlExecute', params, EXECUTE_PARAMS) final String dbName = params.db as String ?: 'default' final String statement = params.statement as String diff --git a/plugins/nf-sqldb/src/main/nextflow/sql/QueryHandler.groovy b/plugins/nf-sqldb/src/main/nextflow/sql/QueryHandler.groovy index febad90..b1fc5b1 100644 --- a/plugins/nf-sqldb/src/main/nextflow/sql/QueryHandler.groovy +++ b/plugins/nf-sqldb/src/main/nextflow/sql/QueryHandler.groovy @@ -52,11 +52,11 @@ class QueryHandler implements QueryOp { type_mapping.TINYINT = Byte type_mapping.SMALLINT = Short type_mapping.INTEGER = Integer - type_mapping.BIGINT = Long + type_mapping.BIGINT = Long type_mapping.REAL= Float 
type_mapping.FLOAT= Double - type_mapping.DOUBLE = Double - type_mapping.BINARY = byte[] + type_mapping.DOUBLE = Double + type_mapping.BINARY = byte[] type_mapping.VARBINARY = byte[] type_mapping.LONGVARBINARY= byte[] type_mapping.DATE = java.sql.Date diff --git a/plugins/nf-sqldb/src/test/nextflow/sql/SqlExecutionTest.groovy b/plugins/nf-sqldb/src/test/nextflow/sql/SqlExecutionTest.groovy index cc2ef5c..59f07ca 100644 --- a/plugins/nf-sqldb/src/test/nextflow/sql/SqlExecutionTest.groovy +++ b/plugins/nf-sqldb/src/test/nextflow/sql/SqlExecutionTest.groovy @@ -53,7 +53,7 @@ class SqlExecutionTest extends Specification { sqlExtension.init(session) when: 'Creating a table' - sqlExtension.execute([ + sqlExtension.sqlExecute([ db: 'test', statement: 'CREATE TABLE test_table(id INT PRIMARY KEY, name VARCHAR(255))' ]) @@ -62,7 +62,7 @@ class SqlExecutionTest extends Specification { sql.rows('SELECT 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = \'TEST_TABLE\'').size() > 0 when: 'Altering the table' - sqlExtension.execute([ + sqlExtension.sqlExecute([ db: 'test', statement: 'ALTER TABLE test_table ADD COLUMN description VARCHAR(255)' ]) @@ -71,7 +71,7 @@ class SqlExecutionTest extends Specification { sql.rows('SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = \'TEST_TABLE\' AND COLUMN_NAME = \'DESCRIPTION\'').size() > 0 when: 'Dropping the table' - sqlExtension.execute([ + sqlExtension.sqlExecute([ db: 'test', statement: 'DROP TABLE test_table' ]) @@ -96,7 +96,7 @@ class SqlExecutionTest extends Specification { sqlExtension.init(session) when: 'Inserting data' - sqlExtension.execute([ + sqlExtension.sqlExecute([ db: 'test', statement: 'INSERT INTO test_dml (id, name, value) VALUES (1, \'item1\', 100)' ]) @@ -106,7 +106,7 @@ class SqlExecutionTest extends Specification { sql.firstRow('SELECT * FROM test_dml WHERE id = 1').name == 'item1' when: 'Updating data' - sqlExtension.execute([ + sqlExtension.sqlExecute([ db: 'test', statement: 'UPDATE test_dml 
SET value = 200 WHERE id = 1' ]) @@ -115,7 +115,7 @@ class SqlExecutionTest extends Specification { sql.firstRow('SELECT value FROM test_dml WHERE id = 1').value == 200 when: 'Deleting data' - sqlExtension.execute([ + sqlExtension.sqlExecute([ db: 'test', statement: 'DELETE FROM test_dml WHERE id = 1' ]) @@ -186,7 +186,7 @@ class SqlExecutionTest extends Specification { sqlExtension.init(session) when: 'Executing invalid SQL' - sqlExtension.execute([ + sqlExtension.sqlExecute([ db: 'test', statement: 'INVALID SQL STATEMENT' ]) @@ -195,7 +195,7 @@ class SqlExecutionTest extends Specification { thrown(Exception) when: 'Executing query with invalid table name' - sqlExtension.execute([ + sqlExtension.sqlExecute([ db: 'test', statement: 'SELECT * FROM non_existent_table' ]) @@ -213,7 +213,7 @@ class SqlExecutionTest extends Specification { sqlExtension.init(session) when: 'Using non-existent database alias' - sqlExtension.execute([ + sqlExtension.sqlExecute([ db: 'non_existent_db', statement: 'SELECT 1' ]) @@ -222,7 +222,7 @@ class SqlExecutionTest extends Specification { thrown(IllegalArgumentException) when: 'Missing statement parameter' - sqlExtension.execute([ + sqlExtension.sqlExecute([ db: 'test' ]) @@ -230,7 +230,7 @@ class SqlExecutionTest extends Specification { thrown(IllegalArgumentException) when: 'Empty statement parameter' - sqlExtension.execute([ + sqlExtension.sqlExecute([ db: 'test', statement: '' ]) @@ -252,7 +252,7 @@ class SqlExecutionTest extends Specification { sqlExtension.init(session) when: 'Executing statement without semicolon' - sqlExtension.execute([ + sqlExtension.sqlExecute([ db: 'test', statement: 'CREATE TABLE test_norm(id INT PRIMARY KEY)' ]) @@ -261,7 +261,7 @@ class SqlExecutionTest extends Specification { sql.rows('SELECT 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = \'TEST_NORM\'').size() > 0 when: 'Executing statement with trailing whitespace' - sqlExtension.execute([ + sqlExtension.sqlExecute([ db: 'test', statement: 'DROP 
TABLE test_norm ' ]) diff --git a/plugins/nf-sqldb/src/testResources/testDir/test_sql_db.nf b/plugins/nf-sqldb/src/testResources/testDir/test_sql_db.nf index 1d38121..e16bfb8 100644 --- a/plugins/nf-sqldb/src/testResources/testDir/test_sql_db.nf +++ b/plugins/nf-sqldb/src/testResources/testDir/test_sql_db.nf @@ -1,13 +1,13 @@ nextflow.enable.dsl=2 -include { fromQuery; sqlInsert; execute; executeUpdate } from 'plugin/nf-sqldb' +include { fromQuery; sqlInsert; sqlExecute; executeUpdate } from 'plugin/nf-sqldb' workflow { // Setup: create table - execute( + sqlExecute( db: 'foo', statement: ''' - CREATE TABLE IF NOT EXISTS sample_table ( + CREATE TABLE IF NOT EXISTS testing ( id INTEGER PRIMARY KEY, name VARCHAR(100), value DOUBLE From 3598b14b535d219984e10a091c75b5d05c4335ef Mon Sep 17 00:00:00 2001 From: Edmund Miller Date: Fri, 2 May 2025 19:09:31 -0500 Subject: [PATCH 12/17] refactor: Rewrite execute functions into on one function - Enhanced logging in examples to provide feedback on the number of affected rows and results of DDL operations. 
Signed-off-by: Edmund Miller Co-authored-by: Paolo Di Tommaso --- README.md | 90 +++++++------------ examples/sql-execution/main.nf | 25 +++--- .../nextflow/sql/ChannelSqlExtension.groovy | 59 ++++-------- .../test/nextflow/sql/SqlExecutionTest.groovy | 56 +++++++----- .../src/testResources/testDir/test_sql_db.nf | 11 +-- 5 files changed, 101 insertions(+), 140 deletions(-) diff --git a/README.md b/README.md index 9e628d0..0441611 100644 --- a/README.md +++ b/README.md @@ -4,13 +4,13 @@ This plugin provides support for interacting with SQL databases in Nextflow scri The following databases are currently supported: -* [AWS Athena](https://aws.amazon.com/athena/) (Setup guide [here](docs/aws-athena.md)) -* [DuckDB](https://duckdb.org/) -* [H2](https://www.h2database.com) -* [MySQL](https://www.mysql.com/) -* [MariaDB](https://mariadb.org/) -* [PostgreSQL](https://www.postgresql.org/) -* [SQLite](https://www.sqlite.org/index.html) +- [AWS Athena](https://aws.amazon.com/athena/) (Setup guide [here](docs/aws-athena.md)) +- [DuckDB](https://duckdb.org/) +- [H2](https://www.h2database.com) +- [MySQL](https://www.mysql.com/) +- [MariaDB](https://mariadb.org/) +- [PostgreSQL](https://www.postgresql.org/) +- [SQLite](https://www.sqlite.org/index.html) NOTE: THIS IS A PREVIEW TECHNOLOGY, FEATURES AND CONFIGURATION SETTINGS CAN CHANGE IN FUTURE RELEASES. @@ -24,7 +24,6 @@ plugins { } ``` - ## Configuration You can configure any number of databases under the `sql.db` configuration scope. For example: @@ -79,7 +78,7 @@ The following options are available: `batchSize` : Query the data in batches of the given size. This option is recommended for queries that may return large a large result set, so that the entire result set is not loaded into memory at once. -: *NOTE:* this feature requires that the underlying SQL database supports `LIMIT` and `OFFSET`. +: _NOTE:_ this feature requires that the underlying SQL database supports `LIMIT` and `OFFSET`. 
`emitColumns` : When `true`, the column names in the `SELECT` statement are emitted as the first tuple in the resulting channel. @@ -104,7 +103,7 @@ INSERT INTO SAMPLE (NAME, LEN) VALUES ('HELLO', 5); INSERT INTO SAMPLE (NAME, LEN) VALUES ('WORLD!', 6); ``` -*NOTE:* the target table (e.g. `SAMPLE` in the above example) must be created beforehand. +_NOTE:_ the target table (e.g. `SAMPLE` in the above example) must be created beforehand. The following options are available: @@ -125,21 +124,23 @@ The following options are available: `setup` : A SQL statement that is executed before inserting the data, e.g. to create the target table. -: *NOTE:* the underlying database should support the *create table if not exist* idiom, as the plugin will execute this statement every time the script is run. +: _NOTE:_ the underlying database should support the _create table if not exist_ idiom, as the plugin will execute this statement every time the script is run. ## SQL Execution Functions -This plugin provides the following functions for executing SQL statements that don't return data, such as DDL (Data Definition Language) and DML (Data Manipulation Language) operations. +This plugin provides the following function for executing SQL statements that don't return data, such as DDL (Data Definition Language) and DML (Data Manipulation Language) operations. ### sqlExecute -The `sqlExecute` function executes a SQL statement that doesn't return a result set, such as `CREATE`, `ALTER`, `DROP`, `INSERT`, `UPDATE`, or `DELETE` statements. For example: +The `sqlExecute` function executes a SQL statement that doesn't return a result set, such as `CREATE`, `ALTER`, `DROP`, `INSERT`, `UPDATE`, or `DELETE` statements. For DML statements (`INSERT`, `UPDATE`, `DELETE`), it returns the number of rows affected. For DDL statements (`CREATE`, `ALTER`, `DROP`), it returns `null`. 
+ +For example: ```nextflow include { sqlExecute } from 'plugin/nf-sqldb' -// Create a table -sqlExecute( +// Create a table (returns null for DDL operations) +def createResult = sqlExecute( db: 'foo', statement: ''' CREATE TABLE IF NOT EXISTS sample_table ( @@ -149,51 +150,24 @@ sqlExecute( ) ''' ) +println "Create result: $createResult" // null -// Insert data -sqlExecute( +// Insert data (returns 1 for number of rows affected) +def insertedRows = sqlExecute( db: 'foo', statement: "INSERT INTO sample_table (id, name, value) VALUES (1, 'alpha', 10.5)" ) - -// Delete data -sqlExecute( - db: 'foo', - statement: "DELETE FROM sample_table WHERE id = 1" -) -``` - -The following options are available: - -`db` -: The database handle. It must be defined under `sql.db` in the Nextflow configuration. - -`statement` -: The SQL statement to execute. This can be any DDL or DML statement that doesn't return a result set. - -### executeUpdate - -The `executeUpdate` function is similar to `sqlExecute`, but it returns the number of rows affected by the SQL statement. This is particularly useful for DML operations like `INSERT`, `UPDATE`, and `DELETE` where you need to know how many rows were affected. 
For example: - -```nextflow -include { executeUpdate } from 'plugin/nf-sqldb' - -// Insert data and get the number of rows inserted -def insertedRows = executeUpdate( - db: 'foo', - statement: "INSERT INTO sample_table (id, name, value) VALUES (2, 'beta', 20.5)" -) println "Inserted $insertedRows row(s)" -// Update data and get the number of rows updated -def updatedRows = executeUpdate( +// Update data (returns number of rows updated) +def updatedRows = sqlExecute( db: 'foo', - statement: "UPDATE sample_table SET value = 30.5 WHERE name = 'beta'" + statement: "UPDATE sample_table SET value = 30.5 WHERE name = 'alpha'" ) println "Updated $updatedRows row(s)" -// Delete data and get the number of rows deleted -def deletedRows = executeUpdate( +// Delete data (returns number of rows deleted) +def deletedRows = sqlExecute( db: 'foo', statement: "DELETE FROM sample_table WHERE value > 25" ) @@ -206,25 +180,27 @@ The following options are available: : The database handle. It must be defined under `sql.db` in the Nextflow configuration. `statement` -: The SQL statement to execute. This should be a DML statement that can return a count of affected rows. +: The SQL statement to execute. This can be any DDL or DML statement that doesn't return a result set. -## Differences Between Dataflow Operators and Execution Functions +## Differences Between Dataflow Operators and Execution Function The plugin provides two different ways to interact with databases: 1. **Dataflow Operators** (`fromQuery` and `sqlInsert`): These are designed to integrate with Nextflow's dataflow programming model, operating on channels. + - `fromQuery`: Queries data from a database and returns a channel that emits the results. - `sqlInsert`: Takes data from a channel and inserts it into a database. -2. **Execution Functions** (`sqlExecute` and `executeUpdate`): These are designed for direct SQL statement execution that doesn't require channel integration. 
- - `sqlExecute`: Executes a SQL statement without returning any data. - - `executeUpdate`: Executes a SQL statement and returns the count of affected rows. +2. **Execution Function** (`sqlExecute`): This is designed for direct SQL statement execution that doesn't require channel integration. + - `sqlExecute`: Executes a SQL statement. For DML operations, it returns the count of affected rows. For DDL operations, it returns null. Use **Dataflow Operators** when you need to: + - Query data that will flow into your pipeline processing - Insert data from your pipeline processing into a database -Use **Execution Functions** when you need to: +Use **Execution Function** when you need to: + - Perform database setup (creating tables, schemas, etc.) - Execute administrative commands - Perform one-off operations (deleting all records, truncating a table, etc.) @@ -262,4 +238,4 @@ The `CSVREAD` function provided by the H2 database engine allows you to query an Like all dataflow operators in Nextflow, the operators provided by this plugin are executed asynchronously. -In particular, data inserted using the `sqlInsert` operator is *not* guaranteed to be available to any subsequent queries using the `fromQuery` operator, as it is not possible to make a channel factory operation dependent on some upstream operation. \ No newline at end of file +In particular, data inserted using the `sqlInsert` operator is _not_ guaranteed to be available to any subsequent queries using the `fromQuery` operator, as it is not possible to make a channel factory operation dependent on some upstream operation. 
diff --git a/examples/sql-execution/main.nf b/examples/sql-execution/main.nf index e34af92..b6ca32a 100644 --- a/examples/sql-execution/main.nf +++ b/examples/sql-execution/main.nf @@ -1,10 +1,10 @@ #!/usr/bin/env nextflow /* - * Example script demonstrating how to use the SQL sqlExecute and executeUpdate functions + * Example script demonstrating how to use the SQL sqlExecute function */ -include { sqlExecute; executeUpdate } from 'plugin/nf-sqldb' +include { sqlExecute } from 'plugin/nf-sqldb' include { fromQuery } from 'plugin/nf-sqldb' // Define database configuration in nextflow.config file @@ -13,13 +13,13 @@ include { fromQuery } from 'plugin/nf-sqldb' workflow { log.info """ ========================================= - SQL Execution Functions Example + SQL Execution Function Example ========================================= """ - // Step 1: Create a table + // Step 1: Create a table (DDL operation returns null) log.info "Creating a sample table..." - def createResult = executeUpdate( + def createResult = sqlExecute( db: 'demo', statement: ''' CREATE TABLE IF NOT EXISTS TEST_TABLE ( @@ -31,9 +31,9 @@ workflow { ) log.info "Create table result: $createResult" - // Step 2: Insert some data + // Step 2: Insert some data (DML operation returns affected row count) log.info "Inserting data..." - executeUpdate( + def insertCount = sqlExecute( db: 'demo', statement: ''' INSERT INTO TEST_TABLE (ID, NAME, VALUE) VALUES @@ -43,10 +43,11 @@ workflow { (4, 'delta', 40.9); ''' ) + log.info "Inserted $insertCount rows" - // Step 3: Update some data + // Step 3: Update some data (DML operation returns affected row count) log.info "Updating data..." - executeUpdate( + def updateCount = sqlExecute( db: 'demo', statement: ''' UPDATE TEST_TABLE @@ -54,16 +55,18 @@ workflow { WHERE ID = 2; ''' ) + log.info "Updated $updateCount rows" - // Step 4: Delete some data + // Step 4: Delete some data (DML operation returns affected row count) log.info "Deleting data..." 
- executeUpdate( + def deleteCount = sqlExecute( db: 'demo', statement: ''' DELETE FROM TEST_TABLE WHERE ID = 4; ''' ) + log.info "Deleted $deleteCount rows" // Step 5: Query results log.info "Querying results..." diff --git a/plugins/nf-sqldb/src/main/nextflow/sql/ChannelSqlExtension.groovy b/plugins/nf-sqldb/src/main/nextflow/sql/ChannelSqlExtension.groovy index 5f9e17f..a1343c4 100644 --- a/plugins/nf-sqldb/src/main/nextflow/sql/ChannelSqlExtension.groovy +++ b/plugins/nf-sqldb/src/main/nextflow/sql/ChannelSqlExtension.groovy @@ -145,11 +145,14 @@ class ChannelSqlExtension extends PluginExtensionPoint { /** * Execute a SQL statement that does not return a result set (DDL/DML statements) + * For DML statements (INSERT, UPDATE, DELETE), it returns the number of affected rows + * For DDL statements (CREATE, ALTER, DROP), it returns null * * @param params A map containing 'db' (database alias) and 'statement' (SQL string to execute) + * @return The number of rows affected by the SQL statement for DML operations, null for DDL operations */ @Function - void sqlExecute(Map params) { + Integer sqlExecute(Map params) { CheckHelper.checkParams('sqlExecute', params, EXECUTE_PARAMS) final String dbName = params.db as String ?: 'default' @@ -173,7 +176,18 @@ class ChannelSqlExtension extends PluginExtensionPoint { try (Connection conn = groovy.sql.Sql.newInstance(dataSource.toMap()).getConnection()) { try (Statement stm = conn.createStatement()) { - stm.execute(normalizeStatement(statement)) + String normalizedStatement = normalizeStatement(statement) + + // For DDL statements (CREATE, ALTER, DROP, etc.), execute() returns true if the first result is a ResultSet + // For DML statements (INSERT, UPDATE, DELETE), executeUpdate() returns the number of rows affected + boolean isDDL = normalizedStatement.trim().toLowerCase().matches("^(create|alter|drop|truncate).*") + + if (isDDL) { + stm.execute(normalizedStatement) + return null + } else { + return 
stm.executeUpdate(normalizedStatement) + } } } catch (Exception e) { @@ -182,47 +196,6 @@ class ChannelSqlExtension extends PluginExtensionPoint { } } - /** - * Execute a SQL statement that does not return a result set (DDL/DML statements) - * and returns the number of affected rows - * - * @param params A map containing 'db' (database alias) and 'statement' (SQL string to execute) - * @return The number of rows affected by the SQL statement - */ - @Function - int executeUpdate(Map params) { - CheckHelper.checkParams('executeUpdate', params, EXECUTE_PARAMS) - - final String dbName = params.db as String ?: 'default' - final String statement = params.statement as String - - if (!statement) - throw new IllegalArgumentException("Missing required parameter 'statement'") - - final sqlConfig = new SqlConfig((Map) session.config.navigate('sql.db')) - final SqlDataSource dataSource = sqlConfig.getDataSource(dbName) - - if (dataSource == null) { - def msg = "Unknown db name: $dbName" - def choices = sqlConfig.getDataSourceNames().closest(dbName) ?: sqlConfig.getDataSourceNames() - if (choices?.size() == 1) - msg += " - Did you mean: ${choices.get(0)}?" 
- else if (choices) - msg += " - Did you mean any of these?\n" + choices.collect { " $it" }.join('\n') + '\n' - throw new IllegalArgumentException(msg) - } - - try (Connection conn = groovy.sql.Sql.newInstance(dataSource.toMap()).getConnection()) { - try (Statement stm = conn.createStatement()) { - return stm.executeUpdate(normalizeStatement(statement)) - } - } - catch (Exception e) { - log.error("Error executing SQL update statement: ${e.message}", e) - throw e - } - } - /** * Normalizes a SQL statement by adding a semicolon if needed * diff --git a/plugins/nf-sqldb/src/test/nextflow/sql/SqlExecutionTest.groovy b/plugins/nf-sqldb/src/test/nextflow/sql/SqlExecutionTest.groovy index 59f07ca..46e5258 100644 --- a/plugins/nf-sqldb/src/test/nextflow/sql/SqlExecutionTest.groovy +++ b/plugins/nf-sqldb/src/test/nextflow/sql/SqlExecutionTest.groovy @@ -24,7 +24,7 @@ import spock.lang.Specification import spock.lang.Timeout /** - * Tests for the SQL execution functionality (execute and executeUpdate methods) + * Tests for the SQL execution functionality (sqlExecute method) * * @author Seqera Labs */ @@ -40,7 +40,7 @@ class SqlExecutionTest extends Specification { Global.session = null } - def 'should execute DDL statements successfully'() { + def 'should execute DDL statements successfully and return null'() { given: def JDBC_URL = 'jdbc:h2:mem:test_ddl_' + Random.newInstance().nextInt(1_000_000) def sql = Sql.newInstance(JDBC_URL, 'sa', null) @@ -53,34 +53,37 @@ class SqlExecutionTest extends Specification { sqlExtension.init(session) when: 'Creating a table' - sqlExtension.sqlExecute([ + def createResult = sqlExtension.sqlExecute([ db: 'test', statement: 'CREATE TABLE test_table(id INT PRIMARY KEY, name VARCHAR(255))' ]) - then: 'Table should be created' + then: 'Table should be created and result should be null' sql.rows('SELECT 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = \'TEST_TABLE\'').size() > 0 + createResult == null when: 'Altering the table' - 
sqlExtension.sqlExecute([ + def alterResult = sqlExtension.sqlExecute([ db: 'test', statement: 'ALTER TABLE test_table ADD COLUMN description VARCHAR(255)' ]) - then: 'Column should be added' + then: 'Column should be added and result should be null' sql.rows('SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = \'TEST_TABLE\' AND COLUMN_NAME = \'DESCRIPTION\'').size() > 0 + alterResult == null when: 'Dropping the table' - sqlExtension.sqlExecute([ + def dropResult = sqlExtension.sqlExecute([ db: 'test', statement: 'DROP TABLE test_table' ]) - then: 'Table should be dropped' + then: 'Table should be dropped and result should be null' sql.rows('SELECT 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = \'TEST_TABLE\'').size() == 0 + dropResult == null } - def 'should execute DML statements successfully'() { + def 'should execute DML statements successfully and return affected row count'() { given: def JDBC_URL = 'jdbc:h2:mem:test_dml_' + Random.newInstance().nextInt(1_000_000) def sql = Sql.newInstance(JDBC_URL, 'sa', null) @@ -96,35 +99,38 @@ class SqlExecutionTest extends Specification { sqlExtension.init(session) when: 'Inserting data' - sqlExtension.sqlExecute([ + def insertResult = sqlExtension.sqlExecute([ db: 'test', statement: 'INSERT INTO test_dml (id, name, value) VALUES (1, \'item1\', 100)' ]) - then: 'Row should be inserted' + then: 'Row should be inserted and result should be 1' sql.rows('SELECT * FROM test_dml').size() == 1 sql.firstRow('SELECT * FROM test_dml WHERE id = 1').name == 'item1' + insertResult == 1 when: 'Updating data' - sqlExtension.sqlExecute([ + def updateResult = sqlExtension.sqlExecute([ db: 'test', statement: 'UPDATE test_dml SET value = 200 WHERE id = 1' ]) - then: 'Row should be updated' + then: 'Row should be updated and result should be 1' sql.firstRow('SELECT value FROM test_dml WHERE id = 1').value == 200 + updateResult == 1 when: 'Deleting data' - sqlExtension.sqlExecute([ + def deleteResult = 
sqlExtension.sqlExecute([ db: 'test', statement: 'DELETE FROM test_dml WHERE id = 1' ]) - then: 'Row should be deleted' + then: 'Row should be deleted and result should be 1' sql.rows('SELECT * FROM test_dml').size() == 0 + deleteResult == 1 } - def 'should return affected row count with executeUpdate'() { + def 'should return correct affected row count for multiple row operations'() { given: def JDBC_URL = 'jdbc:h2:mem:test_update_' + Random.newInstance().nextInt(1_000_000) def sql = Sql.newInstance(JDBC_URL, 'sa', null) @@ -142,8 +148,8 @@ class SqlExecutionTest extends Specification { def sqlExtension = new ChannelSqlExtension() sqlExtension.init(session) - when: 'Inserting data with executeUpdate' - def insertCount = sqlExtension.executeUpdate([ + when: 'Inserting data' + def insertCount = sqlExtension.sqlExecute([ db: 'test', statement: 'INSERT INTO test_update (id, name, value) VALUES (4, \'item4\', 100)' ]) @@ -153,7 +159,7 @@ class SqlExecutionTest extends Specification { sql.rows('SELECT * FROM test_update').size() == 4 when: 'Updating multiple rows' - def updateCount = sqlExtension.executeUpdate([ + def updateCount = sqlExtension.sqlExecute([ db: 'test', statement: 'UPDATE test_update SET value = 200 WHERE value = 100' ]) @@ -163,7 +169,7 @@ class SqlExecutionTest extends Specification { sql.rows('SELECT * FROM test_update WHERE value = 200').size() == 4 when: 'Deleting multiple rows' - def deleteCount = sqlExtension.executeUpdate([ + def deleteCount = sqlExtension.sqlExecute([ db: 'test', statement: 'DELETE FROM test_update WHERE value = 200' ]) @@ -252,21 +258,23 @@ class SqlExecutionTest extends Specification { sqlExtension.init(session) when: 'Executing statement without semicolon' - sqlExtension.sqlExecute([ + def result = sqlExtension.sqlExecute([ db: 'test', statement: 'CREATE TABLE test_norm(id INT PRIMARY KEY)' ]) - then: 'Statement should be executed successfully' + then: 'Statement should be executed successfully and result should be null' 
sql.rows('SELECT 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = \'TEST_NORM\'').size() > 0 + result == null when: 'Executing statement with trailing whitespace' - sqlExtension.sqlExecute([ + def dropResult = sqlExtension.sqlExecute([ db: 'test', statement: 'DROP TABLE test_norm ' ]) - then: 'Statement should be executed successfully' + then: 'Statement should be executed successfully and result should be null' sql.rows('SELECT 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = \'TEST_NORM\'').size() == 0 + dropResult == null } } \ No newline at end of file diff --git a/plugins/nf-sqldb/src/testResources/testDir/test_sql_db.nf b/plugins/nf-sqldb/src/testResources/testDir/test_sql_db.nf index e16bfb8..98f596f 100644 --- a/plugins/nf-sqldb/src/testResources/testDir/test_sql_db.nf +++ b/plugins/nf-sqldb/src/testResources/testDir/test_sql_db.nf @@ -1,10 +1,10 @@ nextflow.enable.dsl=2 -include { fromQuery; sqlInsert; sqlExecute; executeUpdate } from 'plugin/nf-sqldb' +include { fromQuery; sqlInsert; sqlExecute } from 'plugin/nf-sqldb' workflow { - // Setup: create table - sqlExecute( + // Setup: create table (DDL operation returns null) + def createResult = sqlExecute( db: 'foo', statement: ''' CREATE TABLE IF NOT EXISTS testing ( @@ -14,6 +14,7 @@ workflow { ) ''' ) + println "Create result: $createResult" // null // Insert data using sqlInsert Channel @@ -28,8 +29,8 @@ workflow { fromQuery('SELECT * FROM sample_table', db: 'foo') .view() - // Update data using executeUpdate - def updated = executeUpdate( + // Update data using sqlExecute (DML operation returns affected row count) + def updated = sqlExecute( db: 'foo', statement: "UPDATE sample_table SET value = 30.5 WHERE name = 'beta'" ) From 343cbca322468db49b20098113c6720065b71654 Mon Sep 17 00:00:00 2001 From: Edmund Miller Date: Fri, 2 May 2025 19:45:37 -0500 Subject: [PATCH 13/17] fix: Update sqlExecute method to return structured result maps - Modified the `sqlExecute` method to return a map containing 
success status, affected rows, and error messages for both DDL and DML operations. - Updated documentation to reflect the new return structure. - Adjusted unit tests to validate the new response format and ensure proper handling of success and error cases. Signed-off-by: Edmund Miller --- .../nextflow/sql/ChannelSqlExtension.groovy | 23 ++-- .../test/nextflow/sql/SqlExecutionTest.groovy | 104 ++++++++++-------- .../src/testResources/testDir/test_sql_db.nf | 21 +++- 3 files changed, 88 insertions(+), 60 deletions(-) diff --git a/plugins/nf-sqldb/src/main/nextflow/sql/ChannelSqlExtension.groovy b/plugins/nf-sqldb/src/main/nextflow/sql/ChannelSqlExtension.groovy index a1343c4..987602c 100644 --- a/plugins/nf-sqldb/src/main/nextflow/sql/ChannelSqlExtension.groovy +++ b/plugins/nf-sqldb/src/main/nextflow/sql/ChannelSqlExtension.groovy @@ -145,21 +145,21 @@ class ChannelSqlExtension extends PluginExtensionPoint { /** * Execute a SQL statement that does not return a result set (DDL/DML statements) - * For DML statements (INSERT, UPDATE, DELETE), it returns the number of affected rows - * For DDL statements (CREATE, ALTER, DROP), it returns null + * For DML statements (INSERT, UPDATE, DELETE), it returns a result map with success status and number of affected rows + * For DDL statements (CREATE, ALTER, DROP), it returns a result map with success status * * @param params A map containing 'db' (database alias) and 'statement' (SQL string to execute) - * @return The number of rows affected by the SQL statement for DML operations, null for DDL operations + * @return A map containing 'success' (boolean), 'result' (rows affected or null) and optionally 'error' (message) */ @Function - Integer sqlExecute(Map params) { + Map sqlExecute(Map params) { CheckHelper.checkParams('sqlExecute', params, EXECUTE_PARAMS) final String dbName = params.db as String ?: 'default' final String statement = params.statement as String if (!statement) - throw new IllegalArgumentException("Missing 
required parameter 'statement'") + return [success: false, error: "Missing required parameter 'statement'"] final sqlConfig = new SqlConfig((Map) session.config.navigate('sql.db')) final SqlDataSource dataSource = sqlConfig.getDataSource(dbName) @@ -171,28 +171,27 @@ class ChannelSqlExtension extends PluginExtensionPoint { msg += " - Did you mean: ${choices.get(0)}?" else if (choices) msg += " - Did you mean any of these?\n" + choices.collect { " $it" }.join('\n') + '\n' - throw new IllegalArgumentException(msg) + return [success: false, error: msg] } try (Connection conn = groovy.sql.Sql.newInstance(dataSource.toMap()).getConnection()) { try (Statement stm = conn.createStatement()) { String normalizedStatement = normalizeStatement(statement) - // For DDL statements (CREATE, ALTER, DROP, etc.), execute() returns true if the first result is a ResultSet - // For DML statements (INSERT, UPDATE, DELETE), executeUpdate() returns the number of rows affected boolean isDDL = normalizedStatement.trim().toLowerCase().matches("^(create|alter|drop|truncate).*") if (isDDL) { stm.execute(normalizedStatement) - return null + return [success: true, result: null] } else { - return stm.executeUpdate(normalizedStatement) + Integer rowsAffected = stm.executeUpdate(normalizedStatement) + return [success: true, result: rowsAffected] } } } catch (Exception e) { log.error("Error executing SQL statement: ${e.message}", e) - throw e + return [success: false, error: e.message] } } @@ -204,7 +203,7 @@ class ChannelSqlExtension extends PluginExtensionPoint { */ private static String normalizeStatement(String statement) { if (!statement) - throw new IllegalArgumentException("Missing SQL statement") + return null def result = statement.trim() if (!result.endsWith(';')) result += ';' diff --git a/plugins/nf-sqldb/src/test/nextflow/sql/SqlExecutionTest.groovy b/plugins/nf-sqldb/src/test/nextflow/sql/SqlExecutionTest.groovy index 46e5258..ae384c7 100644 --- 
a/plugins/nf-sqldb/src/test/nextflow/sql/SqlExecutionTest.groovy +++ b/plugins/nf-sqldb/src/test/nextflow/sql/SqlExecutionTest.groovy @@ -40,7 +40,7 @@ class SqlExecutionTest extends Specification { Global.session = null } - def 'should execute DDL statements successfully and return null'() { + def 'should execute DDL statements successfully and return success map'() { given: def JDBC_URL = 'jdbc:h2:mem:test_ddl_' + Random.newInstance().nextInt(1_000_000) def sql = Sql.newInstance(JDBC_URL, 'sa', null) @@ -58,9 +58,10 @@ class SqlExecutionTest extends Specification { statement: 'CREATE TABLE test_table(id INT PRIMARY KEY, name VARCHAR(255))' ]) - then: 'Table should be created and result should be null' + then: 'Table should be created and result should indicate success' sql.rows('SELECT 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = \'TEST_TABLE\'').size() > 0 - createResult == null + createResult.success == true + createResult.result == null when: 'Altering the table' def alterResult = sqlExtension.sqlExecute([ @@ -68,9 +69,10 @@ class SqlExecutionTest extends Specification { statement: 'ALTER TABLE test_table ADD COLUMN description VARCHAR(255)' ]) - then: 'Column should be added and result should be null' + then: 'Column should be added and result should indicate success' sql.rows('SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = \'TEST_TABLE\' AND COLUMN_NAME = \'DESCRIPTION\'').size() > 0 - alterResult == null + alterResult.success == true + alterResult.result == null when: 'Dropping the table' def dropResult = sqlExtension.sqlExecute([ @@ -78,9 +80,10 @@ class SqlExecutionTest extends Specification { statement: 'DROP TABLE test_table' ]) - then: 'Table should be dropped and result should be null' + then: 'Table should be dropped and result should indicate success' sql.rows('SELECT 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = \'TEST_TABLE\'').size() == 0 - dropResult == null + dropResult.success == true + dropResult.result == 
null } def 'should execute DML statements successfully and return affected row count'() { @@ -104,10 +107,11 @@ class SqlExecutionTest extends Specification { statement: 'INSERT INTO test_dml (id, name, value) VALUES (1, \'item1\', 100)' ]) - then: 'Row should be inserted and result should be 1' + then: 'Row should be inserted and result should indicate success with 1 row affected' sql.rows('SELECT * FROM test_dml').size() == 1 sql.firstRow('SELECT * FROM test_dml WHERE id = 1').name == 'item1' - insertResult == 1 + insertResult.success == true + insertResult.result == 1 when: 'Updating data' def updateResult = sqlExtension.sqlExecute([ @@ -115,9 +119,10 @@ class SqlExecutionTest extends Specification { statement: 'UPDATE test_dml SET value = 200 WHERE id = 1' ]) - then: 'Row should be updated and result should be 1' + then: 'Row should be updated and result should indicate success with 1 row affected' sql.firstRow('SELECT value FROM test_dml WHERE id = 1').value == 200 - updateResult == 1 + updateResult.success == true + updateResult.result == 1 when: 'Deleting data' def deleteResult = sqlExtension.sqlExecute([ @@ -125,9 +130,10 @@ class SqlExecutionTest extends Specification { statement: 'DELETE FROM test_dml WHERE id = 1' ]) - then: 'Row should be deleted and result should be 1' + then: 'Row should be deleted and result should indicate success with 1 row affected' sql.rows('SELECT * FROM test_dml').size() == 0 - deleteResult == 1 + deleteResult.success == true + deleteResult.result == 1 } def 'should return correct affected row count for multiple row operations'() { @@ -149,40 +155,42 @@ class SqlExecutionTest extends Specification { sqlExtension.init(session) when: 'Inserting data' - def insertCount = sqlExtension.sqlExecute([ + def insertResult = sqlExtension.sqlExecute([ db: 'test', statement: 'INSERT INTO test_update (id, name, value) VALUES (4, \'item4\', 100)' ]) - then: 'Should return 1 affected row' - insertCount == 1 + then: 'Should return success with 
1 affected row' + insertResult.success == true + insertResult.result == 1 sql.rows('SELECT * FROM test_update').size() == 4 when: 'Updating multiple rows' - def updateCount = sqlExtension.sqlExecute([ + def updateResult = sqlExtension.sqlExecute([ db: 'test', statement: 'UPDATE test_update SET value = 200 WHERE value = 100' ]) - then: 'Should return 4 affected rows' - updateCount == 4 + then: 'Should return success with 4 affected rows' + updateResult.success == true + updateResult.result == 4 sql.rows('SELECT * FROM test_update WHERE value = 200').size() == 4 when: 'Deleting multiple rows' - def deleteCount = sqlExtension.sqlExecute([ + def deleteResult = sqlExtension.sqlExecute([ db: 'test', statement: 'DELETE FROM test_update WHERE value = 200' ]) - then: 'Should return 4 affected rows' - deleteCount == 4 + then: 'Should return success with 4 affected rows' + deleteResult.success == true + deleteResult.result == 4 sql.rows('SELECT * FROM test_update').size() == 0 } def 'should handle invalid SQL correctly'() { given: def JDBC_URL = 'jdbc:h2:mem:test_error_' + Random.newInstance().nextInt(1_000_000) - def sql = Sql.newInstance(JDBC_URL, 'sa', null) and: def session = Mock(Session) { @@ -192,22 +200,24 @@ class SqlExecutionTest extends Specification { sqlExtension.init(session) when: 'Executing invalid SQL' - sqlExtension.sqlExecute([ + def invalidResult = sqlExtension.sqlExecute([ db: 'test', statement: 'INVALID SQL STATEMENT' ]) - then: 'Should throw an exception' - thrown(Exception) + then: 'Should return failure with error message' + invalidResult.success == false + invalidResult.error != null when: 'Executing query with invalid table name' - sqlExtension.sqlExecute([ + def noTableResult = sqlExtension.sqlExecute([ db: 'test', statement: 'SELECT * FROM non_existent_table' ]) - then: 'Should throw an exception' - thrown(Exception) + then: 'Should return failure with error message' + noTableResult.success == false + noTableResult.error != null } def 'should 
handle invalid database configuration correctly'() { @@ -219,30 +229,36 @@ class SqlExecutionTest extends Specification { sqlExtension.init(session) when: 'Using non-existent database alias' - sqlExtension.sqlExecute([ + def nonExistentDbResult = sqlExtension.sqlExecute([ db: 'non_existent_db', statement: 'SELECT 1' ]) - then: 'Should throw an IllegalArgumentException' - thrown(IllegalArgumentException) + then: 'Should return failure with error message' + nonExistentDbResult.success == false + nonExistentDbResult.error != null + nonExistentDbResult.error.contains('Unknown db name') when: 'Missing statement parameter' - sqlExtension.sqlExecute([ + def missingStatementResult = sqlExtension.sqlExecute([ db: 'test' ]) - then: 'Should throw an IllegalArgumentException' - thrown(IllegalArgumentException) + then: 'Should return failure with error message' + missingStatementResult.success == false + missingStatementResult.error != null + missingStatementResult.error.contains('Missing required parameter') when: 'Empty statement parameter' - sqlExtension.sqlExecute([ + def emptyStatementResult = sqlExtension.sqlExecute([ db: 'test', statement: '' ]) - then: 'Should throw an IllegalArgumentException' - thrown(IllegalArgumentException) + then: 'Should return failure with error message' + emptyStatementResult.success == false + emptyStatementResult.error != null + emptyStatementResult.error.contains('Missing required parameter') } def 'should handle statement normalization correctly'() { @@ -258,14 +274,15 @@ class SqlExecutionTest extends Specification { sqlExtension.init(session) when: 'Executing statement without semicolon' - def result = sqlExtension.sqlExecute([ + def createResult = sqlExtension.sqlExecute([ db: 'test', statement: 'CREATE TABLE test_norm(id INT PRIMARY KEY)' ]) - then: 'Statement should be executed successfully and result should be null' + then: 'Statement should be executed successfully' sql.rows('SELECT 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = 
\'TEST_NORM\'').size() > 0 - result == null + createResult.success == true + createResult.result == null when: 'Executing statement with trailing whitespace' def dropResult = sqlExtension.sqlExecute([ @@ -273,8 +290,9 @@ class SqlExecutionTest extends Specification { statement: 'DROP TABLE test_norm ' ]) - then: 'Statement should be executed successfully and result should be null' + then: 'Statement should be executed successfully' sql.rows('SELECT 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = \'TEST_NORM\'').size() == 0 - dropResult == null + dropResult.success == true + dropResult.result == null } } \ No newline at end of file diff --git a/plugins/nf-sqldb/src/testResources/testDir/test_sql_db.nf b/plugins/nf-sqldb/src/testResources/testDir/test_sql_db.nf index 98f596f..1cb6f61 100644 --- a/plugins/nf-sqldb/src/testResources/testDir/test_sql_db.nf +++ b/plugins/nf-sqldb/src/testResources/testDir/test_sql_db.nf @@ -3,7 +3,7 @@ nextflow.enable.dsl=2 include { fromQuery; sqlInsert; sqlExecute } from 'plugin/nf-sqldb' workflow { - // Setup: create table (DDL operation returns null) + // Setup: create table (DDL operation) def createResult = sqlExecute( db: 'foo', statement: ''' @@ -14,7 +14,13 @@ workflow { ) ''' ) - println "Create result: $createResult" // null + println "Create table success: ${createResult.success}" // Should be true + + // Handle potential failure + if (!createResult.success) { + println "Failed to create table: ${createResult.error}" + return + } // Insert data using sqlInsert Channel @@ -29,10 +35,15 @@ workflow { fromQuery('SELECT * FROM sample_table', db: 'foo') .view() - // Update data using sqlExecute (DML operation returns affected row count) - def updated = sqlExecute( + // Update data using sqlExecute (DML operation returns affected row count in result field) + def updateResult = sqlExecute( db: 'foo', statement: "UPDATE sample_table SET value = 30.5 WHERE name = 'beta'" ) - println "Updated $updated row(s)" + + if 
(updateResult.success) { + println "Updated ${updateResult.result} row(s)" + } else { + println "Update failed: ${updateResult.error}" + } } \ No newline at end of file From 8b3cb8f065ca7e7bdaa9967fc09f16072e779a5c Mon Sep 17 00:00:00 2001 From: Edmund Miller Date: Sat, 3 May 2025 07:58:24 -0500 Subject: [PATCH 14/17] chore: Restructure test directory names Signed-off-by: Edmund Miller --- .../nextflow.config | 0 .../test_sql_db.nf | 0 .../src/testResources/test-h2/nextflow.config | 22 ++++++ .../src/testResources/test-h2/test_sql_db.nf | 76 +++++++++++++++++++ 4 files changed, 98 insertions(+) rename plugins/nf-sqldb/src/testResources/{testDir => test-databricks}/nextflow.config (100%) rename plugins/nf-sqldb/src/testResources/{testDir => test-databricks}/test_sql_db.nf (100%) create mode 100644 plugins/nf-sqldb/src/testResources/test-h2/nextflow.config create mode 100644 plugins/nf-sqldb/src/testResources/test-h2/test_sql_db.nf diff --git a/plugins/nf-sqldb/src/testResources/testDir/nextflow.config b/plugins/nf-sqldb/src/testResources/test-databricks/nextflow.config similarity index 100% rename from plugins/nf-sqldb/src/testResources/testDir/nextflow.config rename to plugins/nf-sqldb/src/testResources/test-databricks/nextflow.config diff --git a/plugins/nf-sqldb/src/testResources/testDir/test_sql_db.nf b/plugins/nf-sqldb/src/testResources/test-databricks/test_sql_db.nf similarity index 100% rename from plugins/nf-sqldb/src/testResources/testDir/test_sql_db.nf rename to plugins/nf-sqldb/src/testResources/test-databricks/test_sql_db.nf diff --git a/plugins/nf-sqldb/src/testResources/test-h2/nextflow.config b/plugins/nf-sqldb/src/testResources/test-h2/nextflow.config new file mode 100644 index 0000000..c78361c --- /dev/null +++ b/plugins/nf-sqldb/src/testResources/test-h2/nextflow.config @@ -0,0 +1,22 @@ +/* + * Configuration file for the SQL execution example script + */ + +// Enable the SQL DB plugin +plugins { + id 'nf-sqldb@0.7.0' +} + +// Define a file-based H2 
database for the example +sql { + db { + demo { + url = 'jdbc:h2:./demo' + driver = 'org.h2.Driver' + } + } +} + +// Silence unnecessary Nextflow logs +process.echo = false +dag.enabled = false \ No newline at end of file diff --git a/plugins/nf-sqldb/src/testResources/test-h2/test_sql_db.nf b/plugins/nf-sqldb/src/testResources/test-h2/test_sql_db.nf new file mode 100644 index 0000000..b6ca32a --- /dev/null +++ b/plugins/nf-sqldb/src/testResources/test-h2/test_sql_db.nf @@ -0,0 +1,76 @@ +#!/usr/bin/env nextflow + +/* + * Example script demonstrating how to use the SQL sqlExecute function + */ + +include { sqlExecute } from 'plugin/nf-sqldb' +include { fromQuery } from 'plugin/nf-sqldb' + +// Define database configuration in nextflow.config file +// sql.db.demo = [url: 'jdbc:h2:mem:demo', driver: 'org.h2.Driver'] + +workflow { + log.info """ + ========================================= + SQL Execution Function Example + ========================================= + """ + + // Step 1: Create a table (DDL operation returns null) + log.info "Creating a sample table..." + def createResult = sqlExecute( + db: 'demo', + statement: ''' + CREATE TABLE IF NOT EXISTS TEST_TABLE ( + ID INTEGER PRIMARY KEY, + NAME VARCHAR(100), + VALUE DOUBLE + ) + ''' + ) + log.info "Create table result: $createResult" + + // Step 2: Insert some data (DML operation returns affected row count) + log.info "Inserting data..." + def insertCount = sqlExecute( + db: 'demo', + statement: ''' + INSERT INTO TEST_TABLE (ID, NAME, VALUE) VALUES + (1, 'alpha', 10.5), + (2, 'beta', 20.7), + (3, 'gamma', 30.2), + (4, 'delta', 40.9); + ''' + ) + log.info "Inserted $insertCount rows" + + // Step 3: Update some data (DML operation returns affected row count) + log.info "Updating data..." 
+ def updateCount = sqlExecute( + db: 'demo', + statement: ''' + UPDATE TEST_TABLE + SET VALUE = VALUE * 2 + WHERE ID = 2; + ''' + ) + log.info "Updated $updateCount rows" + + // Step 4: Delete some data (DML operation returns affected row count) + log.info "Deleting data..." + def deleteCount = sqlExecute( + db: 'demo', + statement: ''' + DELETE FROM TEST_TABLE + WHERE ID = 4; + ''' + ) + log.info "Deleted $deleteCount rows" + + // Step 5: Query results + log.info "Querying results..." + channel + .fromQuery("SELECT * FROM TEST_TABLE ORDER BY ID", db: 'demo') + .view { row -> "ID: ${row[0]}, Name: ${row[1]}, Value: ${row[2]}" } +} \ No newline at end of file From a0df808c1eef49d10211c1494e298912466c3724 Mon Sep 17 00:00:00 2001 From: Edmund Miller Date: Wed, 7 May 2025 09:08:56 -0500 Subject: [PATCH 15/17] docs: Update sqlExecute documentation on outputs Co-authored-by: Paolo Di Tommaso Signed-off-by: Edmund Miller --- README.md | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 0441611..db14c40 100644 --- a/README.md +++ b/README.md @@ -132,14 +132,14 @@ This plugin provides the following function for executing SQL statements that do ### sqlExecute -The `sqlExecute` function executes a SQL statement that doesn't return a result set, such as `CREATE`, `ALTER`, `DROP`, `INSERT`, `UPDATE`, or `DELETE` statements. For DML statements (`INSERT`, `UPDATE`, `DELETE`), it returns the number of rows affected. For DDL statements (`CREATE`, `ALTER`, `DROP`), it returns `null`. +The `sqlExecute` function executes a SQL statement that doesn't return a result set, such as `CREATE`, `ALTER`, `DROP`, `INSERT`, `UPDATE`, or `DELETE` statements. For DML statements (`INSERT`, `UPDATE`, `DELETE`), it returns a Map with `success: true` and `result` set to the number of rows affected. For DDL statements (`CREATE`, `ALTER`, `DROP`), it returns a Map with `success: true` and `result: null`. 
For example: ```nextflow include { sqlExecute } from 'plugin/nf-sqldb' -// Create a table (returns null for DDL operations) +// Create a table (returns Map with result: null for DDL operations) def createResult = sqlExecute( db: 'foo', statement: ''' @@ -150,28 +150,28 @@ def createResult = sqlExecute( ) ''' ) -println "Create result: $createResult" // null +println "Create result: $createResult" // [success: true, result: null] -// Insert data (returns 1 for number of rows affected) +// Insert data (returns Map with result: 1 for number of rows affected) def insertedRows = sqlExecute( db: 'foo', statement: "INSERT INTO sample_table (id, name, value) VALUES (1, 'alpha', 10.5)" ) -println "Inserted $insertedRows row(s)" +println "Inserted ${insertedRows.result} row(s)" // [success: true, result: 1] -// Update data (returns number of rows updated) +// Update data (returns Map with result: number of rows updated) def updatedRows = sqlExecute( db: 'foo', statement: "UPDATE sample_table SET value = 30.5 WHERE name = 'alpha'" ) -println "Updated $updatedRows row(s)" +println "Updated ${updatedRows.result} row(s)" // [success: true, result: <n>] -// Delete data (returns number of rows deleted) +// Delete data (returns Map with result: number of rows deleted) def deletedRows = sqlExecute( db: 'foo', statement: "DELETE FROM sample_table WHERE value > 25" ) -println "Deleted $deletedRows row(s)" +println "Deleted ${deletedRows.result} row(s)" // [success: true, result: <n>] ``` The following options are available: From c19c8e7f77bba1bdcd2581cf9b27742c2313b92b Mon Sep 17 00:00:00 2001 From: Edmund Miller Date: Wed, 7 May 2025 09:09:57 -0500 Subject: [PATCH 16/17] docs: Remove .envrc and make a note about ENV Signed-off-by: Edmund Miller --- .envrc | 6 ------ README.md | 25 +++++++++++++++++++++++++ 2 files changed, 25 insertions(+), 6 deletions(-) delete mode 100644 .envrc diff --git a/.envrc b/.envrc deleted file mode 100644 index c0a3d12..0000000 --- a/.envrc +++ /dev/null @@ -1,6 +0,0 @@ -source_url
"https://github.com/tmatilai/direnv-1password/raw/v1.0.1/1password.sh" \ - "sha256-4dmKkmlPBNXimznxeehplDfiV+CvJiIzg7H1Pik4oqY=" - -# Fetch one secret and export it into the specified environment variable -from_op DATABRICKS_JDBC_URL=op://Employee/e3ynriit7iof45533u3slrnjka/hostname -from_op DATABRICKS_TOKEN=op://Employee/e3ynriit7iof45533u3slrnjka/credential diff --git a/README.md b/README.md index db14c40..59c83ad 100644 --- a/README.md +++ b/README.md @@ -239,3 +239,28 @@ The `CSVREAD` function provided by the H2 database engine allows you to query an Like all dataflow operators in Nextflow, the operators provided by this plugin are executed asynchronously. In particular, data inserted using the `sqlInsert` operator is _not_ guaranteed to be available to any subsequent queries using the `fromQuery` operator, as it is not possible to make a channel factory operation dependent on some upstream operation. + +## Running Integration Tests + +To run the integration tests, you'll need to set up the following environment variables: + +### Databricks Integration Tests + +For Databricks integration tests, you need to set: + +```bash +export DATABRICKS_JDBC_URL="jdbc:databricks://:443/default;transportMode=http;ssl=1;httpPath=sql/protocolv1/o//" +export DATABRICKS_TOKEN="" +``` + +You can get these values from your Databricks workspace: +1. The JDBC URL can be found in the Databricks SQL endpoint connection details +2. 
The token can be generated from your Databricks user settings + +After setting up the required environment variables, you can run the integration tests using: + +```bash +./gradlew test +``` + + \ No newline at end of file From c877be59a6c13b64d538fad86394f65f9ecc9c8c Mon Sep 17 00:00:00 2001 From: Edmund Miller Date: Wed, 7 May 2025 10:25:31 -0500 Subject: [PATCH 17/17] refactor: Clean up integration test structure Signed-off-by: Edmund Miller --- README.md | 25 ----------------- docs/databricks.md | 27 +++++++++++++++++++ .../sql/SqlPluginIntegrationTest.groovy | 2 +- .../{test_sql_db.nf => main.nf} | 0 .../test-h2/{test_sql_db.nf => main.nf} | 0 5 files changed, 28 insertions(+), 26 deletions(-) create mode 100644 docs/databricks.md rename plugins/nf-sqldb/src/test/{groovy => }/nextflow/sql/SqlPluginIntegrationTest.groovy (97%) rename plugins/nf-sqldb/src/testResources/test-databricks/{test_sql_db.nf => main.nf} (100%) rename plugins/nf-sqldb/src/testResources/test-h2/{test_sql_db.nf => main.nf} (100%) diff --git a/README.md b/README.md index 59c83ad..db14c40 100644 --- a/README.md +++ b/README.md @@ -239,28 +239,3 @@ The `CSVREAD` function provided by the H2 database engine allows you to query an Like all dataflow operators in Nextflow, the operators provided by this plugin are executed asynchronously. In particular, data inserted using the `sqlInsert` operator is _not_ guaranteed to be available to any subsequent queries using the `fromQuery` operator, as it is not possible to make a channel factory operation dependent on some upstream operation. 
- -## Running Integration Tests - -To run the integration tests, you'll need to set up the following environment variables: - -### Databricks Integration Tests - -For Databricks integration tests, you need to set: - -```bash -export DATABRICKS_JDBC_URL="jdbc:databricks://:443/default;transportMode=http;ssl=1;httpPath=sql/protocolv1/o//" -export DATABRICKS_TOKEN="" -``` - -You can get these values from your Databricks workspace: -1. The JDBC URL can be found in the Databricks SQL endpoint connection details -2. The token can be generated from your Databricks user settings - -After setting up the required environment variables, you can run the integration tests using: - -```bash -./gradlew test -``` - - \ No newline at end of file diff --git a/docs/databricks.md b/docs/databricks.md new file mode 100644 index 0000000..b3c59ae --- /dev/null +++ b/docs/databricks.md @@ -0,0 +1,27 @@ +# Databricks integration + +## Running Integration Tests + +To run the integration tests, you'll need to set up the following environment variables: + +### Databricks Integration Tests + +For Databricks integration tests, you need to set: + +```bash +export DATABRICKS_JDBC_URL="jdbc:databricks://<workspace-hostname>:443/default;transportMode=http;ssl=1;httpPath=sql/protocolv1/o/<org-id>/<cluster-id>" +export DATABRICKS_TOKEN="<personal-access-token>" +``` + +You can get these values from your Databricks workspace: + +1. The JDBC URL can be found in the Databricks SQL endpoint connection details +2.
The token can be generated from your Databricks user settings + +After setting up the required environment variables, you can run the integration tests using: + +```bash +./gradlew test +``` + + diff --git a/plugins/nf-sqldb/src/test/groovy/nextflow/sql/SqlPluginIntegrationTest.groovy b/plugins/nf-sqldb/src/test/nextflow/sql/SqlPluginIntegrationTest.groovy similarity index 97% rename from plugins/nf-sqldb/src/test/groovy/nextflow/sql/SqlPluginIntegrationTest.groovy rename to plugins/nf-sqldb/src/test/nextflow/sql/SqlPluginIntegrationTest.groovy index 05274f3..184408a 100644 --- a/plugins/nf-sqldb/src/test/groovy/nextflow/sql/SqlPluginIntegrationTest.groovy +++ b/plugins/nf-sqldb/src/test/nextflow/sql/SqlPluginIntegrationTest.groovy @@ -29,7 +29,7 @@ class SqlPluginIntegrationTest extends Specification { given: // Ensure test resources directory exists def testDir = Paths.get('plugins/nf-sqldb/src/testResources/testDir').toAbsolutePath() - def scriptPath = testDir.resolve('test_sql_db.nf') + def scriptPath = testDir.resolve('main.nf') def configPath = testDir.resolve('nextflow.config') // Check if required files exist diff --git a/plugins/nf-sqldb/src/testResources/test-databricks/test_sql_db.nf b/plugins/nf-sqldb/src/testResources/test-databricks/main.nf similarity index 100% rename from plugins/nf-sqldb/src/testResources/test-databricks/test_sql_db.nf rename to plugins/nf-sqldb/src/testResources/test-databricks/main.nf diff --git a/plugins/nf-sqldb/src/testResources/test-h2/test_sql_db.nf b/plugins/nf-sqldb/src/testResources/test-h2/main.nf similarity index 100% rename from plugins/nf-sqldb/src/testResources/test-h2/test_sql_db.nf rename to plugins/nf-sqldb/src/testResources/test-h2/main.nf