Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/Session.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import nextflow.plugin.Plugins
import nextflow.processor.ErrorStrategy
import nextflow.processor.TaskFault
import nextflow.processor.TaskHandler
import nextflow.processor.hash.TaskHasherFactory
import nextflow.processor.TaskProcessor
import nextflow.script.BaseScript
import nextflow.script.ProcessFactory
Expand Down Expand Up @@ -179,6 +180,11 @@ class Session implements ISession {
*/
String runName

/**
* The task hash strategy version
*/
TaskHasherFactory.Version hashStrategy

/**
* Enable stub run mode
*/
Expand Down Expand Up @@ -385,6 +391,9 @@ class Session implements ISession {
this.runName = config.runName ?: NameGenerator.next()
log.debug "Run name: $runName"

// -- hash strategy
this.hashStrategy = TaskHasherFactory.Version.DEFAULT()

// -- dry run
this.stubRun = config.stubRun

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ class TaskConfig extends LazyMap implements Cloneable {
}


protected TaskClosure getStubBlock() {
TaskClosure getStubBlock() {
final code = target.get(NextflowDSLImpl.PROCESS_STUB)
if( !code )
return null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ import nextflow.script.types.Record
import nextflow.script.types.Types
import nextflow.trace.TraceRecord
import nextflow.util.Escape
import nextflow.processor.hash.TaskHasherFactory
import nextflow.util.HashBuilder
import nextflow.util.LockManager
import nextflow.util.TestOnly
Expand Down Expand Up @@ -668,7 +669,7 @@ class TaskProcessor {
// -- download foreign files
session.filePorter.transfer(foreignFiles)

final hash = new TaskHasher(task).compute()
final hash = TaskHasherFactory.create(task).compute()
checkCachedOrLaunchTask(task, hash, resumable)
}

Expand Down
37 changes: 37 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -1010,6 +1010,43 @@ class TaskRun implements Cloneable {
return result
}

/**
* Get the mapping of global variables referenced by the task script,
* including {@code task.ext.*} directive variables.
*/
Map<String,Object> getTaskGlobalVars() {
    // Start from the global variables resolved against the owner script binding
    final scriptBinding = processor.getOwnerScript().getBinding()
    final globals = getGlobalVars(scriptBinding)
    final names = getVariableNames()
    final cfg = config
    // Add every referenced `task.ext.*` variable, evaluated through the task config
    for( final name : names ) {
        if( name.startsWith('task.ext.') ) {
            // drop the leading 'task.' (5 chars) so the config is queried with 'ext.<name>'
            globals.put(name, cfg.eval(name.substring(5)))
        }
    }
    return globals
}

/**
* Scan the task command string looking for invocations of scripts
* defined in the project bin folder.
*
* @param script The task command string
* @return The list of paths of scripts in the project bin folder referenced in the task command
*/
List<Path> getTaskBinEntries(String script) {
    final binScripts = processor.session.binEntries
    final List<Path> matches = []
    // Split the command on whitespace and common shell punctuation, then
    // keep each token that is a key in the session bin entries
    final tokens = new StringTokenizer(script, " \t\n\r\f()[]{};&|<>`")
    while( tokens.hasMoreTokens() ) {
        final match = binScripts.get(tokens.nextToken())
        if( match )
            matches.add(match)
    }
    return matches
}

/**
* Create a {@link TaskBean} from this task run.
*
* @return a new {@link TaskBean} constructed with this instance
*/
TaskBean toTaskBean() {
return new TaskBean(this)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package nextflow.processor
package nextflow.processor.hash

import java.nio.file.Path

Expand All @@ -24,29 +24,41 @@ import groovy.transform.Memoized
import groovy.util.logging.Slf4j
import nextflow.Session
import nextflow.exception.UnexpectedException
import nextflow.processor.TaskProcessor
import nextflow.processor.TaskRun
import nextflow.util.CacheHelper
import nextflow.util.HashBuilder
/**
* Implement task hash computation
* Common logic for task hash computation strategies.
*
* Subclasses implement {@link #createHashBuilder()} to configure
* version-specific hashing behaviour.
*
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
*/
@Slf4j
@CompileStatic
class TaskHasher {
abstract class AbstractTaskHasher implements TaskHasher {

private TaskRun task
protected TaskRun task

private TaskProcessor processor
protected TaskProcessor processor

private Session session
protected Session session

public TaskHasher(TaskRun task) {
AbstractTaskHasher(TaskRun task) {
this.task = task
this.processor = task.processor
this.session = task.processor.session
}

public HashCode compute() {
/**
* Create a {@link HashBuilder} configured for this hashing strategy.
*
* This class calls it each time a hash is needed: for the full list of keys
* and for each single entry when the hash entries are dumped. The caller then
* applies the hash mode and the content to hash on the returned builder.
*
* @return the {@link HashBuilder} used to compute the hash
*/
abstract protected HashBuilder createHashBuilder()

@Override
HashCode compute() {

final keys = new ArrayList<Object>()

Expand Down Expand Up @@ -141,94 +153,39 @@ class TaskHasher {

/**
* Compute a deterministic string representation of eval output commands for cache hashing.
* This method creates a consistent hash key based on the semantic names and command values
* of eval outputs, ensuring cache invalidation when eval outputs change.
*
* @param outEvals Map of eval parameter names to their command strings
* @return A concatenated string of "name=command" pairs, sorted for deterministic hashing
*/
protected static String computeEvalOutputCommands(Map<String, String> outEvals) {
    // Precondition: callers only invoke this method when there is at least one eval output
    assert outEvals != null && !outEvals.isEmpty(), "Eval outputs should not be null or empty"

    // Order the entries by name so the resulting string does not depend on the
    // iteration order of the map, which would otherwise make the cache key unstable
    final ordered = new ArrayList<Map.Entry<String,String>>(outEvals.entrySet())
    ordered.sort { Map.Entry<String,String> a, Map.Entry<String,String> b -> a.key.compareTo(b.key) }

    // Render each entry as "name=command"; both parts are included because a change
    // to either one should produce a different string
    final pairs = ordered.collect { Map.Entry<String,String> e -> "${e.key}=${e.value}" }

    // One pair per line
    return pairs.join('\n')
}

/**
* Get the mapping of global variables that were referenced by
* the task script, including the `task.ext.*` directive variables.
*/
Map<String,Object> getTaskGlobalVars() {
    // Global variables resolved against the binding of the script owning the process
    final scriptBinding = task.processor.getOwnerScript().getBinding()
    final globals = task.getGlobalVars(scriptBinding)
    // Merge in the `task.ext.*` directive variables referenced by the task
    globals.putAll(getTaskExtensionDirectiveVars())
    return globals
}

protected Map<String,Object> getTaskExtensionDirectiveVars() {
final variableNames = task.getVariableNames()
final result = new HashMap(variableNames.size())
final taskConfig = task.config
for( final key : variableNames ) {
if( !key.startsWith('task.ext.') )
continue
final value = taskConfig.eval(key.substring(5))
result.put(key, value)
}

return result
protected Map<String,Object> getTaskGlobalVars() {
return task.getTaskGlobalVars()
}

/**
* This method scans the task command string looking for invocations of scripts
* defined in the project bin folder.
*
* @param script The task command string
* @return The list of paths of scripts in the project bin folder referenced in the task command
*/
@Memoized
List<Path> getTaskBinEntries(String script) {
List<Path> result = []
final tokenizer = new StringTokenizer(script, " \t\n\r\f()[]{};&|<>`")
while( tokenizer.hasMoreTokens() ) {
final token = tokenizer.nextToken()
final path = session.binEntries.get(token)
if( path )
result.add(path)
}
return result
protected List<Path> getTaskBinEntries(String script) {
return task.getTaskBinEntries(script)
}

/**
 * Resolve a printable name for the given task without failing on a null argument.
 *
 * @param task The task to name; may be {@code null}
 * @return the task lazy name, or the name of this hasher's processor when {@code task} is null
 */
private String safeTaskName(TaskRun task) {
    // The parameter shadows the `task` field, so it cannot be dereferenced in the
    // null branch; use the processor captured by the constructor instead
    return task != null ? task.lazyName() : processor.name
}

private HashCode computeHash(List keys, CacheHelper.HashMode mode) {
protected HashCode computeHash(List keys, CacheHelper.HashMode mode) {
try {
return CacheHelper.hasher(keys, mode).hash()
return createHashBuilder().withMode(mode).with(keys).build()
}
catch (Throwable e) {
final msg = "Something went wrong while creating task hash for process '${task.processor.name}' -- Offending keys: ${ keys.collect { k -> "\n - type=${k.getClass().getName()} value=$k" } }"
Expand All @@ -238,7 +195,7 @@ class TaskHasher {

private void dumpHashEntriesJson(TaskRun task, List entries, CacheHelper.HashMode mode, hash) {
final collector = (item) -> [
hash: CacheHelper.hasher(item, mode).hash().toString(),
hash: createHashBuilder().withMode(mode).with(item).build().toString(),
type: item?.getClass()?.getName(),
value: item?.toString()
]
Expand All @@ -250,7 +207,7 @@ class TaskHasher {
final buffer = new StringBuilder()
buffer.append("[${safeTaskName(task)}] cache hash: ${hash}; mode: $mode; entries: \n")
for( final entry : entries ) {
buffer.append( " ${CacheHelper.hasher(entry, mode).hash()} [${entry?.getClass()?.getName()}] $entry \n")
buffer.append( " ${createHashBuilder().withMode(mode).with(entry).build()} [${entry?.getClass()?.getName()}] $entry \n")
}
log.info(buffer.toString())
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2013-2026, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package nextflow.processor.hash

import com.google.common.hash.HashCode
import groovy.transform.CompileStatic
/**
* Define the interface for task hash computation
*
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
*/
@CompileStatic
interface TaskHasher {

/**
* Compute the task hash.
*
* @return the resulting {@link HashCode}
*/
HashCode compute()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2013-2026, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package nextflow.processor.hash

import groovy.transform.CompileStatic
import nextflow.SysEnv
import nextflow.processor.TaskRun
/**
* Factory for creating versioned {@link TaskHasher} instances.
*
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
*/
@CompileStatic
class TaskHasherFactory {

    /**
     * The supported task hashing strategy versions, each identified
     * by a string value such as {@code std/v2}.
     */
    enum Version {
        STD_V1('std/v1'),
        STD_V2('std/v2')

        /** The string identifier of this version */
        final String value

        Version(String value) {
            this.value = value
        }

        /** @return the string identifier of this version */
        @Override
        String toString() { value }

        /**
         * Look up a version by its string identifier.
         *
         * @param val The version identifier, e.g. {@code std/v1}
         * @return the matching {@link Version}
         * @throws IllegalArgumentException when no version has the given identifier
         */
        static Version of(String val) {
            for( Version v : values() ) {
                if( v.value == val )
                    return v
            }
            throw new IllegalArgumentException("Unknown task hasher version: ${val}")
        }

        /**
         * Resolve the version to use when none is set explicitly.
         *
         * @return the version named by the {@code NXF_TASK_HASH_VER} environment
         *      variable when it holds a non-empty value, otherwise {@link #STD_V2}
         * @throws IllegalArgumentException when the variable holds an unknown identifier
         */
        static Version DEFAULT() {
            final val = SysEnv.get('NXF_TASK_HASH_VER')
            return val ? of(val) : STD_V2
        }
    }

    /**
     * Create the {@link TaskHasher} matching the hash strategy of the task session.
     *
     * @param task The task for which the hasher is created
     * @return a {@link TaskHasherV1} or {@link TaskHasherV2} instance bound to the task
     * @throws IllegalArgumentException when the strategy is not handled by this factory
     */
    static TaskHasher create(TaskRun task) {
        // The session may not have a strategy assigned; use the default version
        // instead of failing with "Unknown task hasher version: null"
        final version = task.processor.session.hashStrategy ?: Version.DEFAULT()
        switch( version ) {
            case Version.STD_V1:
                return new TaskHasherV1(task)
            case Version.STD_V2:
                return new TaskHasherV2(task)
            default:
                throw new IllegalArgumentException("Unknown task hasher version: ${version}")
        }
    }
}
Loading
Loading