Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion docs/reference/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -1442,7 +1442,10 @@ The following settings are available:
: The compute backend provider type (e.g. `'aws'`, `'local'`). When specified, used together with `region` to select the matching compute environment.

`seqera.executor.region`
: The AWS region for task execution (default: `'eu-central-1'`).
: The cloud region for task execution.

`seqera.executor.computeEnvId`
: The Seqera Platform compute environment ID. When specified, the scheduler resolves the compute environment directly by this ID instead of inferring a suitable compute environment. Used as a fallback when the workflow launch does not include a compute environment reference.

`seqera.executor.autoLabels`
: When `true`, automatically adds workflow metadata labels to the session with the `nextflow.io/` prefix (default: `false`). The following labels are added: `projectName`, `userName`, `runName`, `sessionId`, `resume`, `revision`, `commitId`, `repository`, `manifestName`, `runtimeVersion`. A `seqera.io/runId` label is also added, computed as a SipHash of the session ID and run name.
Expand Down
3 changes: 3 additions & 0 deletions docs/reference/env-vars.md
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,9 @@ The following environment variables control the configuration of the Nextflow ru
`TOWER_REFRESH_TOKEN`
: Specifies the refresh token for maintaining authentication with Seqera Platform. Can also be configured using the `tower.refreshToken` config option.

`TOWER_COMPUTE_ENV_ID`
: Specifies the Seqera Platform compute environment ID. When specified, the scheduler resolves the compute environment directly by this ID instead of inferring a suitable compute environment. Can also be configured using the `tower.computeEnvId` config option.

`TOWER_WORKSPACE_ID`
: Specifies the Seqera Platform workspace ID. Can also be configured using the `tower.workspaceId` config option.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,32 @@ class PlatformHelper {
* @return
*/
static String getWorkspaceId(Map opts, Map<String,String> env) {
final workspaceId = env.get('TOWER_WORKFLOW_ID')
? env.get('TOWER_WORKSPACE_ID')
: opts.workspaceId as Long ?: env.get('TOWER_WORKSPACE_ID') as Long
return workspaceId
try {
final workspaceId = env.get('TOWER_WORKFLOW_ID')
? env.get('TOWER_WORKSPACE_ID')
: opts.workspaceId as Long ?: env.get('TOWER_WORKSPACE_ID') as Long
return workspaceId
}
catch (NumberFormatException e) {
final value = opts.workspaceId ?: env.get('TOWER_WORKSPACE_ID')
throw new IllegalArgumentException("Invalid Seqera Platform workspace ID: '${value}' — workspace ID must be a numeric value. Check the 'tower.workspaceId' setting in your Nextflow configuration or the TOWER_WORKSPACE_ID environment variable")
}
}

/**
* Return the configured Platform compute environment ID: if `TOWER_WORKFLOW_ID` is provided in the environment,
* it means we are running in a Platform-made run and we should ONLY retrieve the value from the environment.
* Otherwise, check the configuration or fallback to the environment. If not found, return null.
*
* @param opts the configuration options for Platform (e.g. `session.config.navigate('tower')`)
* @param env the applicable environment variables
* @return the Platform compute environment ID, or null
*/
static String getComputeEnvId(Map opts, Map<String,String> env) {
final computeEnvId = env.get('TOWER_WORKFLOW_ID')
? env.get('TOWER_COMPUTE_ENV_ID')
: opts.containsKey('computeEnvId') ? opts.computeEnvId as String : env.get('TOWER_COMPUTE_ENV_ID')
return computeEnvId
}

static Map<String,Object> config() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,32 @@ class PlatformHelperTest extends Specification {
cleanup:
SysEnv.pop()
}

def 'should get computeEnvId from config'() {
expect:
PlatformHelper.getComputeEnvId([computeEnvId: 'ce-123'], [:]) == 'ce-123'
}

def 'should get computeEnvId from env'() {
expect:
PlatformHelper.getComputeEnvId([:], [TOWER_COMPUTE_ENV_ID: 'ce-env']) == 'ce-env'
}

def 'should prefer config computeEnvId over env'() {
expect:
PlatformHelper.getComputeEnvId([computeEnvId: 'ce-config'], [TOWER_COMPUTE_ENV_ID: 'ce-env']) == 'ce-config'
}

def 'should use env computeEnvId when TOWER_WORKFLOW_ID is set'() {
expect:
PlatformHelper.getComputeEnvId(
[computeEnvId: 'ce-config'],
[TOWER_WORKFLOW_ID: 'wf-1', TOWER_COMPUTE_ENV_ID: 'ce-env']
) == 'ce-env'
}

def 'should return null computeEnvId when not configured'() {
expect:
PlatformHelper.getComputeEnvId([:], [:]) == null
}
}
16 changes: 15 additions & 1 deletion plugins/nf-seqera/src/main/io/seqera/config/ExecutorOpts.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,14 @@ class ExecutorOpts implements ConfigScope {
""")
final Map<String, String> taskEnvironment

@ConfigOption
@Description("""
The Seqera Platform compute environment ID. When specified, the scheduler resolves
the compute environment directly by this ID instead of listing all workspace CEs.
Used as a fallback when the workflow launch does not include a CE reference.
""")
final String computeEnvId

/* required by config scope -- do not remove */

ExecutorOpts() {}
Expand All @@ -111,7 +119,7 @@ class ExecutorOpts implements ConfigScope {
throw new IllegalArgumentException("Missing Seqera endpoint - make sure to specify 'seqera.executor.endpoint' settings")

this.provider = opts.provider as String
this.region = opts.region as String ?: "eu-central-1"
this.region = opts.region as String
this.keyPairName = opts.keyPairName as String
this.batchFlushInterval = opts.batchFlushInterval
? Duration.of(opts.batchFlushInterval as String)
Expand All @@ -125,6 +133,8 @@ class ExecutorOpts implements ConfigScope {
this.predictionModel = parsePredictionModel(opts.predictionModel as String)
// custom task environment variables
this.taskEnvironment = opts.taskEnvironment as Map<String, String>
// compute environment ID
this.computeEnvId = opts.computeEnvId as String
}

private static final Set<String> VALID_PREDICTION_MODELS = Set.of('qr/v1')
Expand Down Expand Up @@ -180,4 +190,8 @@ class ExecutorOpts implements ConfigScope {
Map<String, String> getTaskEnvironment() {
return taskEnvironment
}

String getComputeEnvId() {
return computeEnvId
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ class SeqeraExecutor extends Executor implements ExtensionPoint {
final towerConfig = session.config.tower as Map ?: Collections.emptyMap()
final workflowId = session.workflowMetadata?.platform?.workflowId
final workflowUrl = session.workflowMetadata?.platform?.workflowUrl
final workspaceId = PlatformHelper.getWorkspaceId(towerConfig, SysEnv.get()) as Long
final computeEnvId = PlatformHelper.getComputeEnvId(towerConfig, SysEnv.get()) ?: seqeraConfig.computeEnvId

final labels = new Labels()
if( seqeraConfig.autoLabels )
labels.withWorkflowMetadata(session.workflowMetadata)
Expand All @@ -116,9 +119,10 @@ class SeqeraExecutor extends Executor implements ExtensionPoint {
.name(session.runName)
.machineRequirement(SchemaMapperUtil.toMachineRequirement(seqeraConfig.machineRequirement))
.labels(labels.entries)
.workspaceId(PlatformHelper.getWorkspaceId(towerConfig, SysEnv.get()) as Long)
.workspaceId(workspaceId)
.pipeline(pipeline)
.predictionModel(predictionModel)
.computeEnvId(computeEnvId)
log.debug "[SEQERA] Creating run: ${request}"
final response = client.createRun(request)
this.runId = response.getRunId()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ class ExecutorOptsTest extends Specification {

then:
config.endpoint == 'https://sched.example.com'
config.region == 'eu-central-1' // default
config.region == null
config.provider == null
config.keyPairName == null
config.batchFlushInterval == Duration.of('1 sec')
config.machineRequirement != null
Expand Down Expand Up @@ -227,6 +228,27 @@ class ExecutorOptsTest extends Specification {
config.taskEnvironment == [:]
}

def 'should create config with computeEnvId' () {
when:
def config = new ExecutorOpts([
endpoint: 'https://sched.example.com',
computeEnvId: 'ce-12345'
])

then:
config.computeEnvId == 'ce-12345'
}

def 'should default computeEnvId to null' () {
when:
def config = new ExecutorOpts([
endpoint: 'https://sched.example.com'
])

then:
config.computeEnvId == null
}

def 'should create config with provider' () {
when:
def config = new ExecutorOpts([
Expand Down
Loading