This guide shows how to use Lakehouse Tapworks through both the command-line interface (CLI) and the notebook/programmatic interface.
Install the package first, then use the `tapworks` command:

```bash
# Install (from repo root)
pip install -e .

# List available connectors
tapworks --list

# Show connector info (required columns, defaults)
tapworks salesforce --info

# Generate pipelines using settings file
tapworks salesforce --input-config tables.csv --output-dir output --settings settings.json

# Generate pipelines using inline JSON
tapworks sql_server --input-config tables.csv --output-dir output \
  --targets '{"dev": {"workspace_host": "https://..."}}' \
  --default-values '{"project_name": "my_project"}'
```

Use `notebook_runner.py` in Databricks for a single notebook entry point:
```python
# Configuration - edit these values
connector_name = "salesforce"
input_source = "main.config.pipeline_tables"  # Delta table or CSV path
output_dir = "/Workspace/Users/you@company.com/dab_output"

targets = {
    "dev": {"workspace_host": "https://dev.cloud.databricks.com"},
    "prod": {"workspace_host": "https://prod.cloud.databricks.com"},
}

default_values = {"project_name": "my_project", "schedule": "0 */6 * * *"}

# Run pipeline generation
from core.runner import run_pipeline_generation

result_df = run_pipeline_generation(
    connector_name=connector_name,
    input_source=input_source,
    output_dir=output_dir,
    targets=targets,
    default_values=default_values,
    spark_session=spark,
)
display(result_df)
```

| Parameter | Description |
|---|---|
| `targets` | Target environments (dev, prod) with workspace settings |
| `default_values` | Default values for optional columns - fills missing/empty values (supports group-based) |
| `override_input_config` | Force override values for ALL rows (supports group-based) |
| `max_tables_per_pipeline` | Maximum tables per pipeline (default: 250) |
| `max_tables_per_gateway` | Maximum tables per gateway - database connectors only (default: 250) |
Both `default_values` and `override_input_config` support two formats.

Flat format (the same values apply to every pipeline):

```python
default_values = {
    'schedule': '0 */6 * * *',
    'pause_status': 'UNPAUSED',
}
```

Group-based format (values keyed by pipeline group):

```python
default_values = {
    '*': {'schedule': '0 */6 * * *'},         # Global fallback
    'sales': {'schedule': '*/15 * * * *'},    # All sales pipelines
    'sales_2': {'schedule': '*/30 * * * *'},  # Only sales_2 subgroup
    'hr': {'schedule': '0 0 * * *'},          # HR pipelines
}

override_config = {
    '*': {'pause_status': 'UNPAUSED'},
    'finance': {'pause_status': 'PAUSED'},  # Pause finance for audit
}
```

Config keys are matched in this order (most specific wins):
1. `pipeline_group` (prefix_subgroup) - e.g., `'sales_2'`
2. prefix - e.g., `'sales'`
3. `project_name` - e.g., `'my_project'`
4. `'*'` (global fallback)
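To make the precedence concrete, here is a minimal sketch of that lookup. `resolve_group_config` is a hypothetical helper written for illustration, not part of the package:

```python
def resolve_group_config(config: dict, pipeline_group: str, project_name: str) -> dict:
    """Illustrative only: merge group-based config, most specific key winning."""
    prefix = pipeline_group.rsplit("_", 1)[0]  # 'sales_2' -> 'sales'
    merged = {}
    # Apply least specific first so that more specific keys overwrite earlier ones.
    for key in ("*", project_name, prefix, pipeline_group):
        merged.update(config.get(key, {}))
    return merged

# With the group-based default_values above:
# resolve_group_config(default_values, 'sales_2', 'my_project')
# -> {'schedule': '*/30 * * * *'}   ('sales_2' beats 'sales' and '*')
```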
| Parameter | Behavior |
|---|---|
| `default_values` | Fill missing/empty values only |
| `override_config` | Overwrite all values (ignores CSV) |
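The difference on a single input row, as an illustrative sketch (placeholder values; this is not the package's internal code):

```python
# Illustrative only: how the two parameters treat one row of the input config.
row = {"schedule": "", "pause_status": "PAUSED"}  # as read from the CSV
defaults = {"schedule": "0 */6 * * *", "pause_status": "UNPAUSED"}
overrides = {"pause_status": "UNPAUSED"}

# default_values: fill only cells that are missing or empty
for col, value in defaults.items():
    if not row.get(col):
        row[col] = value
# row == {'schedule': '0 */6 * * *', 'pause_status': 'PAUSED'}

# override_input_config: overwrite unconditionally, ignoring the CSV value
row.update(overrides)
# row == {'schedule': '0 */6 * * *', 'pause_status': 'UNPAUSED'}
```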
See `examples/features/group_based_config` (Databricks) for detailed examples.
A settings file (passed with `--settings`) bundles the same options as JSON:

```json
{
  "targets": {
    "dev": {
      "workspace_host": "https://dev.cloud.databricks.com",
      "root_path": "/Shared/pipelines/dev"
    },
    "prod": {
      "workspace_host": "https://prod.cloud.databricks.com",
      "root_path": "/Shared/pipelines/prod"
    }
  },
  "default_values": {
    "project_name": "my_project",
    "schedule": "0 */6 * * *"
  },
  "override_input_config": {
    "pause_status": "PAUSED"
  },
  "max_tables_per_pipeline": 250
}
```

Use `tapworks <connector> --info` to see required columns and defaults for any connector.
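To reuse the same settings file from a notebook, one option is to load it and unpack it into `run_pipeline_generation`. This is a sketch that assumes the JSON keys map one-to-one onto the runner's parameters, as the parameter table above suggests:

```python
import json

from tapworks.core import run_pipeline_generation

# Assumption: every top-level key in settings.json matches a
# run_pipeline_generation keyword argument (targets, default_values,
# override_input_config, max_tables_per_pipeline).
with open("settings.json") as f:
    settings = json.load(f)

result = run_pipeline_generation(
    connector_name="salesforce",
    input_source="tables.csv",
    output_dir="output",
    **settings,
)
```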
Salesforce:
```bash
tapworks salesforce --input-config tables.csv --output-dir output --settings settings.json
```

Required columns: source_database, source_schema, source_table_name, target_catalog, target_schema, target_table_name, connection_name

Optional: include_columns, exclude_columns, primary_keys (comma-separated; supports composite keys)
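For illustration, a minimal `tables.csv` for the Salesforce connector might look like the one written below. All names and values are placeholders, and the quoted `primary_keys` field shows a composite key:

```python
# Illustrative only: a minimal input config with the required Salesforce
# columns plus primary_keys. All values are placeholders.
sample = """\
source_database,source_schema,source_table_name,target_catalog,target_schema,target_table_name,connection_name,primary_keys
sfdc,objects,Account,main,raw,account,sf_connection,Id
sfdc,objects,OpportunityLineItem,main,raw,opportunity_line_item,sf_connection,"OpportunityId,Id"
"""

with open("tables.csv", "w") as f:
    f.write(sample)
```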
Google Analytics 4:
```bash
tapworks google_analytics --input-config tables.csv --output-dir output --settings settings.json
```

Required columns: source_database, source_schema, source_table_name, target_catalog, target_schema, target_table_name, connection_name
ServiceNow:
```bash
tapworks servicenow --input-config tables.csv --output-dir output --settings settings.json
```

Required columns: source_database, source_schema, source_table_name, target_catalog, target_schema, target_table_name, connection_name
Workday Reports:
```bash
tapworks workday_reports --input-config tables.csv --output-dir output --settings settings.json
```

Required columns: source_url, target_catalog, target_schema, target_table_name, connection_name, primary_keys
Database connectors support two-level load balancing with gateways: tables are capped per pipeline and per gateway (see `max_tables_per_pipeline` and `max_tables_per_gateway` in the parameter table above).
SQL Server:
```bash
tapworks sql_server --input-config tables.csv --output-dir output --settings settings.json
```

Required columns: source_database, source_schema, source_table_name, target_catalog, target_schema, target_table_name, connection_name
Optional: gateway_catalog, gateway_schema, gateway_worker_type, gateway_driver_type
PostgreSQL:
```bash
tapworks postgresql --input-config tables.csv --output-dir output --settings settings.json
```

Required columns: source_database, source_schema, source_table_name, target_catalog, target_schema, target_table_name, connection_name
Optional: gateway_catalog, gateway_schema, gateway_worker_type, gateway_driver_type
You can also use connectors directly in Python:

```python
from tapworks.core import get_connector, run_pipeline_generation

# Option 1: Use the unified runner
result = run_pipeline_generation(
    connector_name='salesforce',
    input_source='tables.csv',
    output_dir='output',
    targets={'dev': {'workspace_host': 'https://...'}},
)

# Option 2: Use connector directly
connector = get_connector('salesforce')
result = connector.run_complete_pipeline_generation(
    df=df,
    output_dir='output',
    targets={'dev': {'workspace_host': 'https://...'}},
)
```

Each connector folder contains an `example_notebook.ipynb`:
- `salesforce/example_notebook.ipynb`
- `sql_server/example_notebook.ipynb`
- `postgresql/example_notebook.ipynb`
- `google_analytics/example_notebook.ipynb`
- `servicenow/example_notebook.ipynb`
- `workday_reports/example_notebook.ipynb`