diff --git a/Makefile b/Makefile index 4d140c6..3f29b6d 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,5 @@ config ?= compileClasspath +version ?= $(shell grep 'Plugin-Version' plugins/nf-sqldb/src/resources/META-INF/MANIFEST.MF | awk '{ print $$2 }') ifdef module mm = :${module}: @@ -69,3 +70,9 @@ upload-plugins: publish-index: ./gradlew plugins:publishIndex + +# Install the plugin into local nextflow plugins dir +install: + ./gradlew copyPluginZip + rm -rf ${HOME}/.nextflow/plugins/nf-sqldb-${version} + cp -r build/plugins/nf-sqldb-${version} ${HOME}/.nextflow/plugins/nf-sqldb-${version} \ No newline at end of file diff --git a/README.md b/README.md index f94306b..db14c40 100644 --- a/README.md +++ b/README.md @@ -4,13 +4,13 @@ This plugin provides support for interacting with SQL databases in Nextflow scri The following databases are currently supported: -* [AWS Athena](https://aws.amazon.com/athena/) (Setup guide [here](docs/aws-athena.md)) -* [DuckDB](https://duckdb.org/) -* [H2](https://www.h2database.com) -* [MySQL](https://www.mysql.com/) -* [MariaDB](https://mariadb.org/) -* [PostgreSQL](https://www.postgresql.org/) -* [SQLite](https://www.sqlite.org/index.html) +- [AWS Athena](https://aws.amazon.com/athena/) (Setup guide [here](docs/aws-athena.md)) +- [DuckDB](https://duckdb.org/) +- [H2](https://www.h2database.com) +- [MySQL](https://www.mysql.com/) +- [MariaDB](https://mariadb.org/) +- [PostgreSQL](https://www.postgresql.org/) +- [SQLite](https://www.sqlite.org/index.html) NOTE: THIS IS A PREVIEW TECHNOLOGY, FEATURES AND CONFIGURATION SETTINGS CAN CHANGE IN FUTURE RELEASES. @@ -24,7 +24,6 @@ plugins { } ``` - ## Configuration You can configure any number of databases under the `sql.db` configuration scope. For example: @@ -79,7 +78,7 @@ The following options are available: `batchSize` : Query the data in batches of the given size. 
This option is recommended for queries that may return a large result set, so that the entire result set is not loaded into memory at once. -: *NOTE:* this feature requires that the underlying SQL database supports `LIMIT` and `OFFSET`. +: _NOTE:_ this feature requires that the underlying SQL database supports `LIMIT` and `OFFSET`. `emitColumns` : When `true`, the column names in the `SELECT` statement are emitted as the first tuple in the resulting channel. @@ -104,7 +103,7 @@ INSERT INTO SAMPLE (NAME, LEN) VALUES ('HELLO', 5); INSERT INTO SAMPLE (NAME, LEN) VALUES ('WORLD!', 6); ``` -*NOTE:* the target table (e.g. `SAMPLE` in the above example) must be created beforehand. +_NOTE:_ the target table (e.g. `SAMPLE` in the above example) must be created beforehand. The following options are available: @@ -125,7 +124,87 @@ The following options are available: `setup` : A SQL statement that is executed before inserting the data, e.g. to create the target table. -: *NOTE:* the underlying database should support the *create table if not exist* idiom, as the plugin will execute this statement every time the script is run. +: _NOTE:_ the underlying database should support the _create table if not exist_ idiom, as the plugin will execute this statement every time the script is run. + +## SQL Execution Functions + +This plugin provides the following function for executing SQL statements that don't return data, such as DDL (Data Definition Language) and DML (Data Manipulation Language) operations. + +### sqlExecute + +The `sqlExecute` function executes a SQL statement that doesn't return a result set, such as `CREATE`, `ALTER`, `DROP`, `INSERT`, `UPDATE`, or `DELETE` statements. For DML statements (`INSERT`, `UPDATE`, `DELETE`), it returns a Map with `success: true` and `result` set to the number of rows affected. For DDL statements (`CREATE`, `ALTER`, `DROP`), it returns a Map with `success: true` and `result: null`.
+ + For example: + +```nextflow +include { sqlExecute } from 'plugin/nf-sqldb' + +// Create a table (returns Map with result: null for DDL operations) +def createResult = sqlExecute( + db: 'foo', + statement: ''' + CREATE TABLE IF NOT EXISTS sample_table ( + id INTEGER PRIMARY KEY, + name VARCHAR(100), + value DOUBLE + ) + ''' +) +println "Create result: $createResult" // [success: true, result: null] + +// Insert data (returns Map with result: 1 for number of rows affected) +def insertResult = sqlExecute( + db: 'foo', + statement: "INSERT INTO sample_table (id, name, value) VALUES (1, 'alpha', 10.5)" +) +println "Inserted ${insertResult.result} row(s)" // [success: true, result: 1] + +// Update data (returns Map with result: number of rows updated) +def updateResult = sqlExecute( + db: 'foo', + statement: "UPDATE sample_table SET value = 30.5 WHERE name = 'alpha'" +) +println "Updated ${updateResult.result} row(s)" // [success: true, result: 1] + +// Delete data (returns Map with result: number of rows deleted) +def deleteResult = sqlExecute( + db: 'foo', + statement: "DELETE FROM sample_table WHERE value > 25" +) +println "Deleted ${deleteResult.result} row(s)" // [success: true, result: 1] +``` + +The following options are available: + +`db` +: The database handle. It must be defined under `sql.db` in the Nextflow configuration. + +`statement` +: The SQL statement to execute. This can be any DDL or DML statement that doesn't return a result set. + +## Differences Between Dataflow Operators and Execution Function + +The plugin provides two different ways to interact with databases: + +1. **Dataflow Operators** (`fromQuery` and `sqlInsert`): These are designed to integrate with Nextflow's dataflow programming model, operating on channels. + + - `fromQuery`: Queries data from a database and returns a channel that emits the results. + - `sqlInsert`: Takes data from a channel and inserts it into a database. + +2.
**Execution Function** (`sqlExecute`): This is designed for direct SQL statement execution that doesn't require channel integration. + - `sqlExecute`: Executes a SQL statement and returns a result Map. For DML operations, the `result` field holds the count of affected rows; for DDL operations, `result` is `null`. + +Use the **Dataflow Operators** when you need to: + +- Query data that will flow into your pipeline processing +- Insert data from your pipeline processing into a database + +Use the **Execution Function** when you need to: + +- Perform database setup (creating tables, schemas, etc.) +- Execute administrative commands +- Perform one-off operations (deleting all records, truncating a table, etc.) +- Execute statements where you don't need the results as part of your dataflow ## Querying CSV files @@ -159,4 +238,4 @@ The `CSVREAD` function provided by the H2 database engine allows you to query an Like all dataflow operators in Nextflow, the operators provided by this plugin are executed asynchronously. -In particular, data inserted using the `sqlInsert` operator is *not* guaranteed to be available to any subsequent queries using the `fromQuery` operator, as it is not possible to make a channel factory operation dependent on some upstream operation. +In particular, data inserted using the `sqlInsert` operator is _not_ guaranteed to be available to any subsequent queries using the `fromQuery` operator, as it is not possible to make a channel factory operation dependent on some upstream operation.
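The two styles above can be combined in one script. The following sketch is illustrative only — the `foo` database handle and the `RESULTS` table are assumptions, not part of the plugin — and it sets up a table with `sqlExecute` before streaming rows into it with the `sqlInsert` operator:

```nextflow
include { sqlExecute; sqlInsert } from 'plugin/nf-sqldb'

workflow {
    // one-off setup with the execution function, before any dataflow runs
    def setup = sqlExecute(
        db: 'foo',
        statement: 'CREATE TABLE IF NOT EXISTS RESULTS (NAME VARCHAR(100), LEN INT)'
    )
    assert setup.success

    // dataflow: stream tuples from a channel into the table
    Channel
        .of(['HELLO', 5], ['WORLD!', 6])
        .sqlInsert(db: 'foo', into: 'RESULTS', columns: 'NAME, LEN')
}
```

Per the caveat above, a `fromQuery` on `RESULTS` in the same run is not guaranteed to observe the rows inserted by `sqlInsert`.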
diff --git a/docs/databricks.md b/docs/databricks.md new file mode 100644 index 0000000..b3c59ae --- /dev/null +++ b/docs/databricks.md @@ -0,0 +1,27 @@ +# Databricks integration + +## Running Integration Tests + +To run the integration tests, you'll need to set up the following environment variables: + +### Databricks Integration Tests + +For Databricks integration tests, you need to set: + +```bash +export DATABRICKS_JDBC_URL="jdbc:databricks://<server-hostname>:443/default;transportMode=http;ssl=1;httpPath=sql/protocolv1/o/<workspace-id>/<cluster-id>" +export DATABRICKS_TOKEN="<personal-access-token>" +``` + +You can get these values from your Databricks workspace: + +1. The JDBC URL can be found in the Databricks SQL endpoint connection details +2. The token can be generated from your Databricks user settings + +After setting up the required environment variables, you can run the integration tests using: + +```bash +./gradlew test +``` + + diff --git a/examples/sql-execution/main.nf b/examples/sql-execution/main.nf new file mode 100644 index 0000000..b6ca32a --- /dev/null +++ b/examples/sql-execution/main.nf @@ -0,0 +1,76 @@ +#!/usr/bin/env nextflow + +/* + * Example script demonstrating how to use the SQL sqlExecute function + */ + +include { sqlExecute } from 'plugin/nf-sqldb' +include { fromQuery } from 'plugin/nf-sqldb' + +// Define database configuration in nextflow.config file +// sql.db.demo = [url: 'jdbc:h2:mem:demo', driver: 'org.h2.Driver'] + +workflow { + log.info """ + ========================================= + SQL Execution Function Example + ========================================= + """ + + // Step 1: Create a table (DDL operation returns result: null) + log.info "Creating a sample table..."
+ def createResult = sqlExecute( + db: 'demo', + statement: ''' + CREATE TABLE IF NOT EXISTS TEST_TABLE ( + ID INTEGER PRIMARY KEY, + NAME VARCHAR(100), + VALUE DOUBLE + ) + ''' + ) + log.info "Create table result: $createResult" + + // Step 2: Insert some data (DML operation returns affected row count) + log.info "Inserting data..." + def insertCount = sqlExecute( + db: 'demo', + statement: ''' + INSERT INTO TEST_TABLE (ID, NAME, VALUE) VALUES + (1, 'alpha', 10.5), + (2, 'beta', 20.7), + (3, 'gamma', 30.2), + (4, 'delta', 40.9); + ''' + ) + log.info "Inserted ${insertCount.result} rows" + + // Step 3: Update some data (DML operation returns affected row count) + log.info "Updating data..." + def updateCount = sqlExecute( + db: 'demo', + statement: ''' + UPDATE TEST_TABLE + SET VALUE = VALUE * 2 + WHERE ID = 2; + ''' + ) + log.info "Updated ${updateCount.result} rows" + + // Step 4: Delete some data (DML operation returns affected row count) + log.info "Deleting data..." + def deleteCount = sqlExecute( + db: 'demo', + statement: ''' + DELETE FROM TEST_TABLE + WHERE ID = 4; + ''' + ) + log.info "Deleted ${deleteCount.result} rows" + + // Step 5: Query results + log.info "Querying results..."
+ channel + .fromQuery("SELECT * FROM TEST_TABLE ORDER BY ID", db: 'demo') + .view { row -> "ID: ${row[0]}, Name: ${row[1]}, Value: ${row[2]}" } +} \ No newline at end of file diff --git a/examples/sql-execution/nextflow.config b/examples/sql-execution/nextflow.config new file mode 100644 index 0000000..c78361c --- /dev/null +++ b/examples/sql-execution/nextflow.config @@ -0,0 +1,22 @@ +/* + * Configuration file for the SQL execution example script + */ + +// Enable the SQL DB plugin +plugins { + id 'nf-sqldb@0.7.0' +} + +// Define a file-based H2 database for the example +sql { + db { + demo { + url = 'jdbc:h2:./demo' + driver = 'org.h2.Driver' + } + } +} + +// Silence unnecessary Nextflow logs +process.echo = false +dag.enabled = false \ No newline at end of file diff --git a/plugins/nf-sqldb/src/main/nextflow/sql/ChannelSqlExtension.groovy b/plugins/nf-sqldb/src/main/nextflow/sql/ChannelSqlExtension.groovy index d50f7d6..987602c 100644 --- a/plugins/nf-sqldb/src/main/nextflow/sql/ChannelSqlExtension.groovy +++ b/plugins/nf-sqldb/src/main/nextflow/sql/ChannelSqlExtension.groovy @@ -24,16 +24,21 @@ import groovyx.gpars.dataflow.DataflowReadChannel import groovyx.gpars.dataflow.DataflowWriteChannel import groovyx.gpars.dataflow.expression.DataflowExpression import nextflow.Channel +import nextflow.Global import nextflow.NF import nextflow.Session import nextflow.extension.CH import nextflow.extension.DataflowHelper import nextflow.plugin.extension.Factory +import nextflow.plugin.extension.Function import nextflow.plugin.extension.Operator import nextflow.plugin.extension.PluginExtensionPoint import nextflow.sql.config.SqlConfig import nextflow.sql.config.SqlDataSource import nextflow.util.CheckHelper +import java.sql.Connection +import java.sql.Statement +import groovy.sql.Sql /** * Provide a channel factory extension that allows the execution of Sql queries * @@ -133,4 +138,75 @@ class ChannelSqlExtension extends PluginExtensionPoint { return target } + private 
static final Map EXECUTE_PARAMS = [ + db: CharSequence, + statement: CharSequence + ] + + /** + * Execute a SQL statement that does not return a result set (DDL/DML statements) + * For DML statements (INSERT, UPDATE, DELETE), it returns a result map with success status and number of affected rows + * For DDL statements (CREATE, ALTER, DROP), it returns a result map with success status + * + * @param params A map containing 'db' (database alias) and 'statement' (SQL string to execute) + * @return A map containing 'success' (boolean), 'result' (rows affected or null) and optionally 'error' (message) + */ + @Function + Map sqlExecute(Map params) { + CheckHelper.checkParams('sqlExecute', params, EXECUTE_PARAMS) + + final String dbName = params.db as String ?: 'default' + final String statement = params.statement as String + + if (!statement) + return [success: false, error: "Missing required parameter 'statement'"] + + final sqlConfig = new SqlConfig((Map) session.config.navigate('sql.db')) + final SqlDataSource dataSource = sqlConfig.getDataSource(dbName) + + if (dataSource == null) { + def msg = "Unknown db name: $dbName" + def choices = sqlConfig.getDataSourceNames().closest(dbName) ?: sqlConfig.getDataSourceNames() + if (choices?.size() == 1) + msg += " - Did you mean: ${choices.get(0)}?" 
+ else if (choices) + msg += " - Did you mean any of these?\n" + choices.collect { " $it" }.join('\n') + '\n' + return [success: false, error: msg] + } + + try (Connection conn = groovy.sql.Sql.newInstance(dataSource.toMap()).getConnection()) { + try (Statement stm = conn.createStatement()) { + String normalizedStatement = normalizeStatement(statement) + + boolean isDDL = normalizedStatement.trim().toLowerCase().matches("^(create|alter|drop|truncate).*") + + if (isDDL) { + stm.execute(normalizedStatement) + return [success: true, result: null] + } else { + Integer rowsAffected = stm.executeUpdate(normalizedStatement) + return [success: true, result: rowsAffected] + } + } + } + catch (Exception e) { + log.error("Error executing SQL statement: ${e.message}", e) + return [success: false, error: e.message] + } + } + + /** + * Normalizes a SQL statement by adding a semicolon if needed + * + * @param statement The SQL statement to normalize + * @return The normalized SQL statement + */ + private static String normalizeStatement(String statement) { + if (!statement) + return null + def result = statement.trim() + if (!result.endsWith(';')) + result += ';' + return result + } } diff --git a/plugins/nf-sqldb/src/main/nextflow/sql/InsertHandler.groovy b/plugins/nf-sqldb/src/main/nextflow/sql/InsertHandler.groovy index 03a34bb..1a0572e 100644 --- a/plugins/nf-sqldb/src/main/nextflow/sql/InsertHandler.groovy +++ b/plugins/nf-sqldb/src/main/nextflow/sql/InsertHandler.groovy @@ -70,7 +70,15 @@ class InsertHandler implements Closeable { if( connection == null ) { connection = Sql.newInstance(ds.toMap()).getConnection() checkCreate(connection) - connection.setAutoCommit(false) + try { + connection.setAutoCommit(false) + } + catch(UnsupportedOperationException e) { + log.debug "setAutoCommit is not supported by this driver (likely Databricks), continuing: ${e.message}" + } + catch(Exception e) { + log.debug "Database does not support setAutoCommit, continuing with default settings: 
${e.message}" + } + } return connection } @@ -157,7 +165,15 @@ class InsertHandler implements Closeable { if( batchCount>0 ) { log.debug("[SQL] flushing and committing open batch") - preparedStatement.executeBatch() - preparedStatement.close() - connection.commit() + try { + preparedStatement.executeBatch() + } + catch(UnsupportedOperationException e) { + log.debug "executeBatch is not supported by this driver (likely Databricks), continuing: ${e.message}" + } + catch(Exception e) { + log.debug "Database does not support executeBatch, continuing with default behavior: ${e.message}" + } + try { + preparedStatement.close() + } + catch(Exception e) { + log.debug "Database does not support preparedStatement.close(), continuing: ${e.message}" + } + try { + connection.commit() + } + catch(UnsupportedOperationException e) { + log.debug "commit is not supported by this driver (likely Databricks), continuing: ${e.message}" + } + catch(Exception e) { + log.debug "Database does not support commit, continuing with default behavior: ${e.message}" + } } } finally { diff --git a/plugins/nf-sqldb/src/main/nextflow/sql/QueryHandler.groovy b/plugins/nf-sqldb/src/main/nextflow/sql/QueryHandler.groovy index 680784d..b1fc5b1 100644 --- a/plugins/nf-sqldb/src/main/nextflow/sql/QueryHandler.groovy +++ b/plugins/nf-sqldb/src/main/nextflow/sql/QueryHandler.groovy @@ -52,11 +52,11 @@ class QueryHandler implements QueryOp { type_mapping.TINYINT = Byte type_mapping.SMALLINT = Short type_mapping.INTEGER = Integer - type_mapping.BIGINT = Long + type_mapping.BIGINT = Long type_mapping.REAL= Float type_mapping.FLOAT= Double - type_mapping.DOUBLE = Double - type_mapping.BINARY = byte[] + type_mapping.DOUBLE = Double + type_mapping.BINARY = byte[] type_mapping.VARBINARY = byte[] type_mapping.LONGVARBINARY= byte[] type_mapping.DATE = java.sql.Date @@ -156,8 +156,10 @@ class QueryHandler implements QueryOp { protected void query0(Connection conn) { try { try (Statement stm = conn.createStatement()) { -
try( def rs = stm.executeQuery(normalize(statement)) ) { - if( emitColumns ) + final String normalizedStmt = normalize(statement) + // Execute the SQL query and get results + try (def rs = stm.executeQuery(normalizedStmt)) { + if (emitColumns) emitColumns(rs) emitRowsAndClose(rs) } diff --git a/plugins/nf-sqldb/src/test/nextflow/sql/SqlExecutionTest.groovy b/plugins/nf-sqldb/src/test/nextflow/sql/SqlExecutionTest.groovy new file mode 100644 index 0000000..ae384c7 --- /dev/null +++ b/plugins/nf-sqldb/src/test/nextflow/sql/SqlExecutionTest.groovy @@ -0,0 +1,298 @@ +/* + * Copyright 2020-2025, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package nextflow.sql + +import groovy.sql.Sql +import nextflow.Global +import nextflow.Session +import spock.lang.Specification +import spock.lang.Timeout + +/** + * Tests for the SQL execution functionality (sqlExecute method) + * + * @author Seqera Labs + */ +@Timeout(10) +class SqlExecutionTest extends Specification { + + def setup() { + // Initialize session for each test (mocks cannot be created in setupSpec) + Global.session = Mock(Session) + } + + def cleanup() { + Global.session = null + } + + def 'should execute DDL statements successfully and return success map'() { + given: + def JDBC_URL = 'jdbc:h2:mem:test_ddl_' + Random.newInstance().nextInt(1_000_000) + def sql = Sql.newInstance(JDBC_URL, 'sa', null) + + and: + def session = Mock(Session) { + getConfig() >> [sql: [db: [test: [url: JDBC_URL]]]] + } + def sqlExtension = new ChannelSqlExtension() + sqlExtension.init(session) + + when: 'Creating a table' + def createResult = sqlExtension.sqlExecute([ + db: 'test', + statement: 'CREATE TABLE test_table(id INT PRIMARY KEY, name VARCHAR(255))' + ]) + + then: 'Table should be created and result should indicate success' + sql.rows('SELECT 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = \'TEST_TABLE\'').size() > 0 + createResult.success == true + createResult.result == null + + when: 'Altering the table' + def alterResult = sqlExtension.sqlExecute([ + db: 'test', + statement: 'ALTER TABLE test_table ADD COLUMN description VARCHAR(255)' + ]) + + then: 'Column should be added and result should indicate success' + sql.rows('SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = \'TEST_TABLE\' AND COLUMN_NAME = \'DESCRIPTION\'').size() > 0 + alterResult.success == true + alterResult.result == null + + when: 'Dropping the table' + def dropResult = sqlExtension.sqlExecute([ + db: 'test', + statement: 'DROP TABLE test_table' + ]) + + then: 'Table should be dropped and result should indicate success' + sql.rows('SELECT 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME =
\'TEST_TABLE\'').size() == 0 + dropResult.success == true + dropResult.result == null + } + + def 'should execute DML statements successfully and return affected row count'() { + given: + def JDBC_URL = 'jdbc:h2:mem:test_dml_' + Random.newInstance().nextInt(1_000_000) + def sql = Sql.newInstance(JDBC_URL, 'sa', null) + + and: 'Create a table' + sql.execute('CREATE TABLE test_dml(id INT PRIMARY KEY, name VARCHAR(255), value INT)') + + and: + def session = Mock(Session) { + getConfig() >> [sql: [db: [test: [url: JDBC_URL]]]] + } + def sqlExtension = new ChannelSqlExtension() + sqlExtension.init(session) + + when: 'Inserting data' + def insertResult = sqlExtension.sqlExecute([ + db: 'test', + statement: 'INSERT INTO test_dml (id, name, value) VALUES (1, \'item1\', 100)' + ]) + + then: 'Row should be inserted and result should indicate success with 1 row affected' + sql.rows('SELECT * FROM test_dml').size() == 1 + sql.firstRow('SELECT * FROM test_dml WHERE id = 1').name == 'item1' + insertResult.success == true + insertResult.result == 1 + + when: 'Updating data' + def updateResult = sqlExtension.sqlExecute([ + db: 'test', + statement: 'UPDATE test_dml SET value = 200 WHERE id = 1' + ]) + + then: 'Row should be updated and result should indicate success with 1 row affected' + sql.firstRow('SELECT value FROM test_dml WHERE id = 1').value == 200 + updateResult.success == true + updateResult.result == 1 + + when: 'Deleting data' + def deleteResult = sqlExtension.sqlExecute([ + db: 'test', + statement: 'DELETE FROM test_dml WHERE id = 1' + ]) + + then: 'Row should be deleted and result should indicate success with 1 row affected' + sql.rows('SELECT * FROM test_dml').size() == 0 + deleteResult.success == true + deleteResult.result == 1 + } + + def 'should return correct affected row count for multiple row operations'() { + given: + def JDBC_URL = 'jdbc:h2:mem:test_update_' + Random.newInstance().nextInt(1_000_000) + def sql = Sql.newInstance(JDBC_URL, 'sa', null) + + and: 
'Create a table and insert multiple rows' + sql.execute('CREATE TABLE test_update(id INT PRIMARY KEY, name VARCHAR(255), value INT)') + sql.execute('INSERT INTO test_update VALUES (1, \'item1\', 100)') + sql.execute('INSERT INTO test_update VALUES (2, \'item2\', 100)') + sql.execute('INSERT INTO test_update VALUES (3, \'item3\', 100)') + + and: + def session = Mock(Session) { + getConfig() >> [sql: [db: [test: [url: JDBC_URL]]]] + } + def sqlExtension = new ChannelSqlExtension() + sqlExtension.init(session) + + when: 'Inserting data' + def insertResult = sqlExtension.sqlExecute([ + db: 'test', + statement: 'INSERT INTO test_update (id, name, value) VALUES (4, \'item4\', 100)' + ]) + + then: 'Should return success with 1 affected row' + insertResult.success == true + insertResult.result == 1 + sql.rows('SELECT * FROM test_update').size() == 4 + + when: 'Updating multiple rows' + def updateResult = sqlExtension.sqlExecute([ + db: 'test', + statement: 'UPDATE test_update SET value = 200 WHERE value = 100' + ]) + + then: 'Should return success with 4 affected rows' + updateResult.success == true + updateResult.result == 4 + sql.rows('SELECT * FROM test_update WHERE value = 200').size() == 4 + + when: 'Deleting multiple rows' + def deleteResult = sqlExtension.sqlExecute([ + db: 'test', + statement: 'DELETE FROM test_update WHERE value = 200' + ]) + + then: 'Should return success with 4 affected rows' + deleteResult.success == true + deleteResult.result == 4 + sql.rows('SELECT * FROM test_update').size() == 0 + } + + def 'should handle invalid SQL correctly'() { + given: + def JDBC_URL = 'jdbc:h2:mem:test_error_' + Random.newInstance().nextInt(1_000_000) + + and: + def session = Mock(Session) { + getConfig() >> [sql: [db: [test: [url: JDBC_URL]]]] + } + def sqlExtension = new ChannelSqlExtension() + sqlExtension.init(session) + + when: 'Executing invalid SQL' + def invalidResult = sqlExtension.sqlExecute([ + db: 'test', + statement: 'INVALID SQL STATEMENT' + ]) + + then: 
'Should return failure with error message' + invalidResult.success == false + invalidResult.error != null + + when: 'Executing query with invalid table name' + def noTableResult = sqlExtension.sqlExecute([ + db: 'test', + statement: 'SELECT * FROM non_existent_table' + ]) + + then: 'Should return failure with error message' + noTableResult.success == false + noTableResult.error != null + } + + def 'should handle invalid database configuration correctly'() { + given: + def session = Mock(Session) { + getConfig() >> [sql: [db: [test: [url: 'jdbc:h2:mem:test']]]] + } + def sqlExtension = new ChannelSqlExtension() + sqlExtension.init(session) + + when: 'Using non-existent database alias' + def nonExistentDbResult = sqlExtension.sqlExecute([ + db: 'non_existent_db', + statement: 'SELECT 1' + ]) + + then: 'Should return failure with error message' + nonExistentDbResult.success == false + nonExistentDbResult.error != null + nonExistentDbResult.error.contains('Unknown db name') + + when: 'Missing statement parameter' + def missingStatementResult = sqlExtension.sqlExecute([ + db: 'test' + ]) + + then: 'Should return failure with error message' + missingStatementResult.success == false + missingStatementResult.error != null + missingStatementResult.error.contains('Missing required parameter') + + when: 'Empty statement parameter' + def emptyStatementResult = sqlExtension.sqlExecute([ + db: 'test', + statement: '' + ]) + + then: 'Should return failure with error message' + emptyStatementResult.success == false + emptyStatementResult.error != null + emptyStatementResult.error.contains('Missing required parameter') + } + + def 'should handle statement normalization correctly'() { + given: + def JDBC_URL = 'jdbc:h2:mem:test_norm_' + Random.newInstance().nextInt(1_000_000) + def sql = Sql.newInstance(JDBC_URL, 'sa', null) + + and: + def session = Mock(Session) { + getConfig() >> [sql: [db: [test: [url: JDBC_URL]]]] + } + def sqlExtension = new ChannelSqlExtension() + 
sqlExtension.init(session) + + when: 'Executing statement without semicolon' + def createResult = sqlExtension.sqlExecute([ + db: 'test', + statement: 'CREATE TABLE test_norm(id INT PRIMARY KEY)' + ]) + + then: 'Statement should be executed successfully' + sql.rows('SELECT 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = \'TEST_NORM\'').size() > 0 + createResult.success == true + createResult.result == null + + when: 'Executing statement with trailing whitespace' + def dropResult = sqlExtension.sqlExecute([ + db: 'test', + statement: 'DROP TABLE test_norm ' + ]) + + then: 'Statement should be executed successfully' + sql.rows('SELECT 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = \'TEST_NORM\'').size() == 0 + dropResult.success == true + dropResult.result == null + } +} \ No newline at end of file diff --git a/plugins/nf-sqldb/src/test/nextflow/sql/SqlPluginIntegrationTest.groovy b/plugins/nf-sqldb/src/test/nextflow/sql/SqlPluginIntegrationTest.groovy new file mode 100644 index 0000000..184408a --- /dev/null +++ b/plugins/nf-sqldb/src/test/nextflow/sql/SqlPluginIntegrationTest.groovy @@ -0,0 +1,65 @@ +package nextflow.sql + +import spock.lang.Specification +import spock.lang.Requires +import java.nio.file.Path +import java.nio.file.Paths +import java.nio.file.Files + +class SqlPluginIntegrationTest extends Specification { + + static boolean isNextflowAvailable() { + try { + def proc = new ProcessBuilder('nextflow', '--version').start() + proc.waitFor() + return proc.exitValue() == 0 + } catch (Exception e) { + return false + } + } + + static boolean hasDatabricksCredentials() { + def jdbcUrl = System.getenv('DATABRICKS_JDBC_URL') + def token = System.getenv('DATABRICKS_TOKEN') + return jdbcUrl && token && !jdbcUrl.isEmpty() && !token.isEmpty() + } + + @Requires({ isNextflowAvailable() && hasDatabricksCredentials() }) + def 'should run Nextflow pipeline with SQL plugin and Databricks'() { + given: + // Ensure test resources directory exists + def testDir = 
Paths.get('plugins/nf-sqldb/src/testResources/test-databricks').toAbsolutePath() + def scriptPath = testDir.resolve('main.nf') + def configPath = testDir.resolve('nextflow.config') + + // Check if required files exist + assert Files.exists(testDir), "Test directory doesn't exist: $testDir" + assert Files.exists(scriptPath), "Script file doesn't exist: $scriptPath" + assert Files.exists(configPath), "Config file doesn't exist: $configPath" + + def env = [ + 'DATABRICKS_JDBC_URL': System.getenv('DATABRICKS_JDBC_URL'), + 'DATABRICKS_TOKEN': System.getenv('DATABRICKS_TOKEN') + ] + + when: + def pb = new ProcessBuilder('nextflow', 'run', scriptPath.toString(), '-c', configPath.toString()) + pb.directory(testDir.toFile()) + pb.environment().putAll(env) + pb.redirectErrorStream(true) + + def proc = pb.start() + def output = new StringBuilder() + proc.inputStream.eachLine { line -> + println line // Print output in real-time for debugging + output.append(line).append('\n') + } + def exitCode = proc.waitFor() + + then: + exitCode == 0 + output.toString().contains('alpha') // Should see query result in output + output.toString().contains('beta') + output.toString().contains('Updated 1 row(s)') + } +} \ No newline at end of file diff --git a/plugins/nf-sqldb/src/testResources/test-databricks/main.nf b/plugins/nf-sqldb/src/testResources/test-databricks/main.nf new file mode 100644 index 0000000..1cb6f61 --- /dev/null +++ b/plugins/nf-sqldb/src/testResources/test-databricks/main.nf @@ -0,0 +1,49 @@ +nextflow.enable.dsl=2 + +include { fromQuery; sqlInsert; sqlExecute } from 'plugin/nf-sqldb' + +workflow { + // Setup: create table (DDL operation) + def createResult = sqlExecute( + db: 'foo', + statement: ''' + CREATE TABLE IF NOT EXISTS sample_table ( + id INTEGER PRIMARY KEY, + name VARCHAR(100), + value DOUBLE + ) + ''' + ) + println "Create table success: ${createResult.success}" // Should be true + + // Handle potential failure + if (!createResult.success) { + println "Failed to create
table: ${createResult.error}" + return + } + + // Insert data using sqlInsert + Channel + .of([1, 'alpha', 10.5], [2, 'beta', 20.5]) + .sqlInsert( + db: 'foo', + into: 'sample_table', + columns: 'id, name, value' + ) + + // Query data using fromQuery + fromQuery('SELECT * FROM sample_table', db: 'foo') + .view() + + // Update data using sqlExecute (DML operation returns affected row count in result field) + def updateResult = sqlExecute( + db: 'foo', + statement: "UPDATE sample_table SET value = 30.5 WHERE name = 'beta'" + ) + + if (updateResult.success) { + println "Updated ${updateResult.result} row(s)" + } else { + println "Update failed: ${updateResult.error}" + } +} \ No newline at end of file diff --git a/plugins/nf-sqldb/src/testResources/test-databricks/nextflow.config b/plugins/nf-sqldb/src/testResources/test-databricks/nextflow.config new file mode 100644 index 0000000..7b7941a --- /dev/null +++ b/plugins/nf-sqldb/src/testResources/test-databricks/nextflow.config @@ -0,0 +1,19 @@ +plugins { + id 'nf-sqldb' +} + +sql { + db { + foo { + url = System.getenv('DATABRICKS_JDBC_URL') + user = '' + password = '' + properties { + token = System.getenv('DATABRICKS_TOKEN') + ConnCatalog = "hive_metastore" + ConnSchema = "default" + } + // Add any other required properties for Databricks + } + } +} \ No newline at end of file diff --git a/plugins/nf-sqldb/src/testResources/test-h2/main.nf b/plugins/nf-sqldb/src/testResources/test-h2/main.nf new file mode 100644 index 0000000..b6ca32a --- /dev/null +++ b/plugins/nf-sqldb/src/testResources/test-h2/main.nf @@ -0,0 +1,76 @@ +#!/usr/bin/env nextflow + +/* + * Example script demonstrating how to use the SQL sqlExecute function + */ + +include { sqlExecute } from 'plugin/nf-sqldb' +include { fromQuery } from 'plugin/nf-sqldb' + +// Define database configuration in nextflow.config file +// sql.db.demo = [url: 'jdbc:h2:mem:demo', driver: 'org.h2.Driver'] + +workflow { + log.info """ + ========================================= 
+ SQL Execution Function Example + ========================================= + """ + + // Step 1: Create a table (DDL operation returns result: null) + log.info "Creating a sample table..." + def createResult = sqlExecute( + db: 'demo', + statement: ''' + CREATE TABLE IF NOT EXISTS TEST_TABLE ( + ID INTEGER PRIMARY KEY, + NAME VARCHAR(100), + VALUE DOUBLE + ) + ''' + ) + log.info "Create table result: $createResult" + + // Step 2: Insert some data (DML operation returns affected row count) + log.info "Inserting data..." + def insertCount = sqlExecute( + db: 'demo', + statement: ''' + INSERT INTO TEST_TABLE (ID, NAME, VALUE) VALUES + (1, 'alpha', 10.5), + (2, 'beta', 20.7), + (3, 'gamma', 30.2), + (4, 'delta', 40.9); + ''' + ) + log.info "Inserted ${insertCount.result} rows" + + // Step 3: Update some data (DML operation returns affected row count) + log.info "Updating data..." + def updateCount = sqlExecute( + db: 'demo', + statement: ''' + UPDATE TEST_TABLE + SET VALUE = VALUE * 2 + WHERE ID = 2; + ''' + ) + log.info "Updated ${updateCount.result} rows" + + // Step 4: Delete some data (DML operation returns affected row count) + log.info "Deleting data..." + def deleteCount = sqlExecute( + db: 'demo', + statement: ''' + DELETE FROM TEST_TABLE + WHERE ID = 4; + ''' + ) + log.info "Deleted ${deleteCount.result} rows" + + // Step 5: Query results + log.info "Querying results..."
+ channel + .fromQuery("SELECT * FROM TEST_TABLE ORDER BY ID", db: 'demo') + .view { row -> "ID: ${row[0]}, Name: ${row[1]}, Value: ${row[2]}" } +} \ No newline at end of file diff --git a/plugins/nf-sqldb/src/testResources/test-h2/nextflow.config b/plugins/nf-sqldb/src/testResources/test-h2/nextflow.config new file mode 100644 index 0000000..c78361c --- /dev/null +++ b/plugins/nf-sqldb/src/testResources/test-h2/nextflow.config @@ -0,0 +1,22 @@ +/* + * Configuration file for the SQL execution example script + */ + +// Enable the SQL DB plugin +plugins { + id 'nf-sqldb@0.7.0' +} + +// Define a file-based H2 database for the example +sql { + db { + demo { + url = 'jdbc:h2:./demo' + driver = 'org.h2.Driver' + } + } +} + +// Silence unnecessary Nextflow logs +process.echo = false +dag.enabled = false \ No newline at end of file