Skip to content

Commit adb29cf

Browse files
authored
Jdbc workers (#878)
* JDBC workers * Documentation update
1 parent 1590e6f commit adb29cf

119 files changed

Lines changed: 3442 additions & 1164 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

ai/JDBC_CONFIGURATION.md

Lines changed: 257 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,257 @@
1+
# JDBC Configuration
2+
3+
This document describes the configuration format for JDBC database connections in Conductor.
4+
5+
## Overview
6+
7+
Conductor supports configuring **multiple named JDBC instances** for use by the `JDBC` worker task. This allows you to:
8+
9+
- Connect to multiple databases (MySQL, PostgreSQL, Oracle, etc.)
10+
- Separate environments (prod, dev, staging)
11+
- Use different connection pool settings per use case (read-heavy vs write-heavy)
12+
13+
## Configuration Format
14+
15+
JDBC instances are configured using a list-based approach under `conductor.jdbc.instances`:
16+
17+
```yaml
18+
conductor:
19+
jdbc:
20+
instances:
21+
- name: "instance-name" # Unique identifier for this instance
22+
connection: # Connection configuration
23+
datasourceURL: "jdbc:..." # JDBC connection URL
24+
jdbcDriver: "..." # JDBC driver class (optional, auto-detected from URL)
25+
user: "..." # Database username
26+
password: "..." # Database password
27+
# ... pool settings
28+
```
29+
30+
## Configuration Examples
31+
32+
### Single MySQL Instance
33+
34+
```yaml
35+
conductor:
36+
jdbc:
37+
instances:
38+
- name: "mysql-prod"
39+
connection:
40+
datasourceURL: "jdbc:mysql://prod-db:3306/myapp"
41+
jdbcDriver: "com.mysql.cj.jdbc.Driver"
42+
user: "conductor"
43+
password: "secret"
44+
maximumPoolSize: 20
45+
minimumIdle: 5
46+
```
47+
48+
### Multiple Instances
49+
50+
```yaml
51+
conductor:
52+
jdbc:
53+
instances:
54+
- name: "mysql-prod"
55+
connection:
56+
datasourceURL: "jdbc:mysql://prod-db:3306/myapp"
57+
jdbcDriver: "com.mysql.cj.jdbc.Driver"
58+
user: "conductor"
59+
password: "prod-secret"
60+
maximumPoolSize: 20
61+
62+
- name: "postgres-analytics"
63+
connection:
64+
datasourceURL: "jdbc:postgresql://analytics-db:5432/warehouse"
65+
user: "analyst"
66+
password: "analytics-secret"
67+
maximumPoolSize: 10
68+
69+
- name: "mysql-staging"
70+
connection:
71+
datasourceURL: "jdbc:mysql://staging-db:3306/myapp"
72+
jdbcDriver: "com.mysql.cj.jdbc.Driver"
73+
user: "conductor"
74+
password: "staging-secret"
75+
maximumPoolSize: 5
76+
minimumIdle: 1
77+
```
78+
79+
## Usage in Workflows
80+
81+
When using the JDBC task in your workflows, reference the instance by its configured name using `connectionId`:
82+
83+
```json
84+
{
85+
"name": "query_users",
86+
"taskReferenceName": "query_users_ref",
87+
"type": "JDBC",
88+
"inputParameters": {
89+
"connectionId": "mysql-prod",
90+
"type": "SELECT",
91+
"statement": "SELECT id, name, email FROM users WHERE status = ?",
92+
"parameters": ["active"]
93+
}
94+
}
95+
```
96+
97+
### SELECT Example
98+
99+
```json
100+
{
101+
"name": "find_orders",
102+
"taskReferenceName": "find_orders_ref",
103+
"type": "JDBC",
104+
"inputParameters": {
105+
"connectionId": "postgres-analytics",
106+
"type": "SELECT",
107+
"statement": "SELECT order_id, total FROM orders WHERE customer_id = ?",
108+
"parameters": ["${workflow.input.customerId}"]
109+
}
110+
}
111+
```
112+
113+
Output:
114+
```json
115+
{
116+
"result": [
117+
{"order_id": 101, "total": 49.99},
118+
{"order_id": 205, "total": 129.50}
119+
]
120+
}
121+
```
122+
123+
### UPDATE Example
124+
125+
```json
126+
{
127+
"name": "update_status",
128+
"taskReferenceName": "update_status_ref",
129+
"type": "JDBC",
130+
"inputParameters": {
131+
"connectionId": "mysql-prod",
132+
"type": "UPDATE",
133+
"statement": "UPDATE orders SET status = ? WHERE order_id = ?",
134+
"parameters": ["shipped", "${workflow.input.orderId}"],
135+
"expectedUpdateCount": 1
136+
}
137+
}
138+
```
139+
140+
Output:
141+
```json
142+
{
143+
"update_count": 1
144+
}
145+
```
146+
147+
If the actual update count does not match `expectedUpdateCount`, the transaction is rolled back and the task fails.
148+
149+
## Connection Configuration Options
150+
151+
| Property | Type | Default | Description |
152+
|----------|------|---------|-------------|
153+
| `datasourceURL` | String | Required | JDBC connection URL |
154+
| `jdbcDriver` | String | Auto-detected | JDBC driver class name |
155+
| `user` | String | Optional | Database username |
156+
| `password` | String | Optional | Database password |
157+
| `maximumPoolSize` | Integer | 32 | Maximum connections in the pool |
158+
| `minimumIdle` | Integer | 2 | Minimum idle connections |
159+
| `idleTimeoutMs` | Long | 30000 | Idle connection timeout (ms) |
160+
| `connectionTimeout` | Long | 30000 | Connection acquisition timeout (ms) |
161+
| `leakDetectionThreshold` | Long | 60000 | Leak detection threshold (ms) |
162+
| `maxLifetime` | Long | 1800000 | Maximum connection lifetime (ms) |
163+
164+
## Migration from Old Configuration
165+
166+
### Old Format
167+
168+
```properties
169+
conductor.worker.jdbc.connectionIds=mysql,postgres
170+
conductor.worker.jdbc.mysql.connectionURL=jdbc:mysql://localhost:3306/db
171+
conductor.worker.jdbc.mysql.driverClassName=com.mysql.cj.jdbc.Driver
172+
conductor.worker.jdbc.mysql.username=root
173+
conductor.worker.jdbc.mysql.password=secret
174+
conductor.worker.jdbc.mysql.maximum-pool-size=10
175+
176+
conductor.worker.jdbc.postgres.connectionURL=jdbc:postgresql://localhost:5432/db
177+
conductor.worker.jdbc.postgres.driverClassName=org.postgresql.Driver
178+
conductor.worker.jdbc.postgres.username=pguser
179+
conductor.worker.jdbc.postgres.password=pgpass
180+
```
181+
182+
### New Format
183+
184+
```yaml
185+
conductor:
186+
jdbc:
187+
instances:
188+
- name: "mysql"
189+
connection:
190+
datasourceURL: "jdbc:mysql://localhost:3306/db"
191+
jdbcDriver: "com.mysql.cj.jdbc.Driver"
192+
user: "root"
193+
password: "secret"
194+
maximumPoolSize: 10
195+
196+
- name: "postgres"
197+
connection:
198+
datasourceURL: "jdbc:postgresql://localhost:5432/db"
199+
jdbcDriver: "org.postgresql.Driver"
200+
user: "pguser"
201+
password: "pgpass"
202+
```
203+
204+
**Note:** The old `conductor.worker.jdbc.*` format is still supported for backwards compatibility. If no `conductor.jdbc.instances` are configured, the system automatically falls back to reading the legacy format. The old and new formats are mutually exclusive -- if new-format instances are found, the legacy format is ignored.
205+
206+
### Property Name Mapping
207+
208+
| Old Property | New Property |
209+
|---|---|
210+
| `connectionURL` | `datasourceURL` |
211+
| `driverClassName` | `jdbcDriver` |
212+
| `username` | `user` |
213+
| `password` | `password` |
214+
| `maximum-pool-size` | `maximumPoolSize` |
215+
| `idle-timeout-ms` | `idleTimeoutMs` |
216+
| `minimum-idle` | `minimumIdle` |
217+
218+
## Best Practices
219+
220+
1. **Use descriptive names**: Choose instance names that clearly indicate their purpose (e.g., `mysql-prod`, `postgres-analytics`, `oracle-reporting`)
221+
222+
2. **Separate read/write pools**: For high-throughput systems, configure separate instances for read and write operations with appropriate pool sizes
223+
224+
3. **Right-size connection pools**: Set `maximumPoolSize` based on your database capacity and workload. A common formula is `connections = (core_count * 2) + effective_spindle_count`
225+
226+
4. **Enable leak detection**: The default `leakDetectionThreshold` of 60 seconds logs warnings for connections held longer than expected
227+
228+
5. **Use parameterized queries**: Always use `?` placeholders with the `parameters` list instead of string concatenation to prevent SQL injection
229+
230+
6. **Set expectedUpdateCount**: For critical UPDATE/INSERT/DELETE operations, set `expectedUpdateCount` to automatically rollback if the affected row count doesn't match
231+
232+
## Troubleshooting
233+
234+
### Instance Not Found
235+
236+
If you see "JDBC instance not found: xyz", check:
237+
238+
1. The `connectionId` in your workflow matches the configured `name` exactly
239+
2. The instance is properly configured in your application.yml/properties
240+
3. The application has been restarted after configuration changes
241+
242+
### Connection Timeout
243+
244+
If connections are timing out:
245+
246+
1. Verify network connectivity to the database
247+
2. Check `connectionTimeout` value (default 30 seconds)
248+
3. Ensure the connection pool is not exhausted (increase `maximumPoolSize` if needed)
249+
4. Check database max connections limit
250+
251+
### Connection Leaks
252+
253+
If you see leak detection warnings:
254+
255+
1. Ensure all connections are properly closed (the JDBC worker handles this automatically)
256+
2. If using custom integrations, wrap connection usage in try-with-resources
257+
3. Review `leakDetectionThreshold` setting

ai/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,10 @@ conductor.ai.payload-store-location=/tmp/conductor-ai
429429

430430
Vector databases support multiple named instances. For detailed configuration options and examples, see [Vector Database Configuration](VECTORDB_CONFIGURATION.md).
431431

432+
### JDBC Configuration
433+
434+
JDBC connections support multiple named instances for the `JDBC` worker task. For detailed configuration options, migration guide, and examples, see [JDBC Configuration](JDBC_CONFIGURATION.md).
435+
432436
### Provider-Specific Configuration (LLM)
433437

434438
#### OpenAI

ai/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,5 +57,6 @@ dependencies {
5757
testImplementation "org.testcontainers:postgresql:${revTestContainer}"
5858
testImplementation "org.postgresql:postgresql:${revPostgres}"
5959
testImplementation "org.apache.commons:commons-compress:${revCommonsCompress}"
60+
testImplementation "com.h2database:h2:2.2.224"
6061

6162
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright 2026 Conductor Authors.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
* <p>
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* <p>
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
package org.conductoross.conductor.ai.sql;
14+
15+
import javax.sql.DataSource;
16+
17+
import com.zaxxer.hikari.HikariDataSource;
18+
import lombok.AllArgsConstructor;
19+
import lombok.Data;
20+
import lombok.NoArgsConstructor;
21+
22+
@Data
23+
@NoArgsConstructor
24+
@AllArgsConstructor
25+
public class JDBCConnectionConfig {
26+
27+
private String datasourceURL;
28+
29+
private String jdbcDriver;
30+
31+
private String user;
32+
33+
private String password;
34+
35+
// Hikari pool settings with defaults
36+
private Integer maximumPoolSize = 32;
37+
38+
private Long idleTimeoutMs = 30000L;
39+
40+
private Integer minimumIdle = 2;
41+
42+
private Long leakDetectionThreshold = 60000L;
43+
44+
private Long connectionTimeout = 30000L;
45+
46+
private Long maxLifetime = 1800000L;
47+
48+
/**
49+
* Creates a configured HikariCP DataSource from this configuration.
50+
*
51+
* @param name Pool name for identification and logging
52+
* @return A configured DataSource
53+
*/
54+
public DataSource createDataSource(String name) {
55+
HikariDataSource ds = new HikariDataSource();
56+
ds.setPoolName(name);
57+
ds.setJdbcUrl(datasourceURL);
58+
if (jdbcDriver != null && !jdbcDriver.isBlank()) {
59+
ds.setDriverClassName(jdbcDriver);
60+
}
61+
if (user != null) {
62+
ds.setUsername(user);
63+
}
64+
if (password != null) {
65+
ds.setPassword(password);
66+
}
67+
ds.setMaximumPoolSize(maximumPoolSize != null ? maximumPoolSize : 32);
68+
ds.setIdleTimeout(idleTimeoutMs != null ? idleTimeoutMs : 30000L);
69+
ds.setMinimumIdle(minimumIdle != null ? minimumIdle : 2);
70+
ds.setLeakDetectionThreshold(
71+
leakDetectionThreshold != null ? leakDetectionThreshold : 60000L);
72+
ds.setConnectionTimeout(connectionTimeout != null ? connectionTimeout : 30000L);
73+
ds.setMaxLifetime(maxLifetime != null ? maxLifetime : 1800000L);
74+
return ds;
75+
}
76+
}

0 commit comments

Comments
 (0)