Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 6 additions & 10 deletions docs/module.md
Original file line number Diff line number Diff line change
Expand Up @@ -253,13 +253,13 @@ baseDir

Modules can define binary scripts that are locally scoped to the processes defined in the module.

To enable this feature, set the following flag in your pipeline script or configuration file:
The binary scripts can be placed in any of the following directories within the module:

```nextflow
nextflow.enable.moduleBinaries = true
```
- `<module-dir>/resources/bin`
- `<module-dir>/resources/usr/bin`
- `<module-dir>/resources/usr/local/bin`

The binary scripts must be placed in the module directory named `<module-dir>/resources/usr/bin`:
For example:

```
<module-dir>
Expand All @@ -271,11 +271,7 @@ The binary scripts must be placed in the module directory named `<module-dir>/re
└── another-module-script2.py
```

Those scripts will be made accessible like any other command in the task environment, provided they have been granted the Linux execute permissions.

:::{note}
This feature requires the use of a local or shared file system for the pipeline work directory, or {ref}`wave-page` when using cloud-based executors.
:::
These scripts are made available like any other command in the task environment, provided they have been granted Linux execute permissions. Module binaries work with all executors, including cloud-based executors.

## Sharing modules

Expand Down
4 changes: 3 additions & 1 deletion docs/reference/feature-flags.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ Feature flags with the `nextflow.preview` prefix can cause pipelines run with ne
: Defines the DSL version to use (`1` or `2`).

`nextflow.enable.moduleBinaries`
: When `true`, enables the use of modules with binary scripts. See {ref}`module-binaries` for more information.
: :::{deprecated} 25.04.0
:::
: Module binaries are now enabled by default. This flag is no longer required. See {ref}`module-binaries` for more information.

`nextflow.enable.strict`
: :::{deprecated} 26.04.0
Expand Down
4 changes: 4 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/NF.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ class NF {
NextflowMeta.instance.isModuleBinariesEnabled()
}

/**
 * Report whether module binaries are flagged as disabled.
 *
 * @return the value of {@code isModuleBinariesDisabled()} on the shared {@code NextflowMeta} instance
 */
static boolean isModuleBinariesDisabled() {
    final meta = NextflowMeta.instance
    return meta.isModuleBinariesDisabled()
}

static boolean isRecurseEnabled() {
NextflowMeta.instance.preview.recursion
}
Expand Down
4 changes: 4 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/NextflowMeta.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,12 @@ class NextflowMeta {
return enable.moduleBinaries
}

boolean moduleBinariesDisabled

/**
 * Switch the module binaries feature on or off.
 *
 * Stores the requested mode in {@code enable.moduleBinaries} and keeps
 * {@code moduleBinariesDisabled} as the exact opposite of the most recent request,
 * so that enabling the feature after it was disabled clears the disabled flag.
 *
 * @param mode {@code true} to enable module binaries, {@code false} to disable them
 */
void moduleBinaries(boolean mode) {
    enable.moduleBinaries = mode
    // always mirror the latest request: previously the flag was only ever raised,
    // so `moduleBinaries(false)` followed by `moduleBinaries(true)` left it stuck on `true`
    moduleBinariesDisabled = !mode
}

}
10 changes: 8 additions & 2 deletions modules/nextflow/src/main/groovy/nextflow/cli/CmdRun.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -486,11 +486,17 @@ class CmdRun extends CmdBase implements HubOptions {
}

/**
 * Apply the {@code nextflow.enable.moduleBinaries} configuration setting.
 *
 * The value is looked up in the given config; a {@code null} value returns without
 * touching {@link NextflowMeta}. Otherwise a warning is logged and
 * {@code NextflowMeta.instance.moduleBinaries(..)} is invoked with {@code true} or
 * {@code false} according to the setting.
 *
 * @param config the configuration map to read the setting from
 */
static void detectModuleBinaryFeature(ConfigMap config) {
// NOTE(review): two consecutive declarations of `moduleBinaries` -- this looks like the old
// (default `false`) and new (no default) variants of the same line; confirm which one is intended
final moduleBinaries = config.navigate('nextflow.enable.moduleBinaries', false)
final moduleBinaries = config.navigate('nextflow.enable.moduleBinaries')
// setting not specified: nothing to do
if( moduleBinaries == null )
return
if( moduleBinaries ) {
// NOTE(review): both a debug and a warn message are emitted here -- presumably only one should remain
log.debug "Enabling module binaries"
log.warn "Configuration `nextflow.enable.moduleBinaries` is no longer needed -- module binaries are now enabled by default"
NextflowMeta.instance.moduleBinaries(true)
}
else {
// explicit opt-out: warn, then record it on the NextflowMeta instance
log.warn "Configuration `nextflow.enable.moduleBinaries = false` is deprecated and will be ignored in a future version -- module binaries are now always enabled"
NextflowMeta.instance.moduleBinaries(false)
}
}

static void detectStrictFeature(ConfigMap config, Map sysEnv) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ class BashWrapperBuilder {
binding.trace_cmd = getTraceCommand(interpreter)
binding.launch_cmd = getLaunchCommand(interpreter,env)
binding.stage_cmd = getStageCommand()
binding.module_bin_path = binFilesStaged ? getBinPathScript() : null
binding.unstage_cmd = getUnstageCommand()
binding.unstage_controls = changeDir || shouldUnstageControls() ? getUnstageControls() : null

Expand Down Expand Up @@ -700,9 +701,6 @@ class BashWrapperBuilder {
if( stageInMode != 'copy' && allowContainerMounts )
builder.addMountForInputs(inputFiles)

if( allowContainerMounts )
builder.addMounts(binDirs)

if(this.containerMount)
builder.addMount(containerMount)

Expand Down Expand Up @@ -775,6 +773,12 @@ class BashWrapperBuilder {
p != -1 ? "nxf_module_load ${name.substring(0,p)} ${name.substring(p+1)}" : "nxf_module_load ${name}"
}

/**
 * Build the two-line shell snippet that exposes the staged bin scripts to the task.
 *
 * The first line runs {@code chmod +x} on every entry matched by {@code <BIN_DIR>/*};
 * the second line prepends {@code $PWD/<BIN_DIR>} to {@code PATH}.
 *
 * @return the shell snippet, lines separated by a newline character
 */
protected String getBinPathScript() {
    final dir = TaskRun.BIN_DIR
    final grantExec = "chmod +x ${dir}/*"
    final extendPath = "export PATH=\"\$PWD/${dir}:\$PATH\""
    return [grantExec, extendPath].join('\n')
}

protected String getStageCommand() { 'nxf_stage' }

protected String getUnstageCommand() { 'nxf_unstage' }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import java.nio.file.Path

import groovy.transform.CompileStatic
import groovy.transform.PackageScope
import groovy.util.logging.Slf4j
import nextflow.container.ContainerConfig
import nextflow.executor.BashWrapperBuilder
import nextflow.executor.TaskArrayExecutor
Expand All @@ -30,6 +31,7 @@ import nextflow.util.MemoryUnit
*
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
*/
@Slf4j
@CompileStatic
class TaskBean implements Serializable, Cloneable {

Expand Down Expand Up @@ -117,6 +119,8 @@ class TaskBean implements Serializable, Cloneable {

Boolean stageFileEnabled

boolean binFilesStaged

@PackageScope
TaskBean() {
shell = BashWrapperBuilder.BASH
Expand Down Expand Up @@ -166,6 +170,17 @@ class TaskBean implements Serializable, Cloneable {
this.statsEnabled = task.getProcessor().getSession().statsEnabled

this.inputFiles = task.getInputFilesMap()
final moduleBinFiles = task.getProcessor().getModuleBinFiles() ?: Collections.<String,Path>emptyMap()
final projectBinFiles = task.getProcessor().getReferencedProjectBinFiles(task.source) ?: Collections.<String,Path>emptyMap()
this.binFilesStaged = !moduleBinFiles.isEmpty() || !projectBinFiles.isEmpty()
if( binFilesStaged ) {
this.inputFiles.putAll(moduleBinFiles)
for( Map.Entry<String,Path> e : projectBinFiles ) {
if( moduleBinFiles.containsKey(e.key) )
log.warn "Project bin script '${e.key.substring(TaskRun.BIN_DIR.length() + 1)}' overrides module bin script with the same name"
this.inputFiles.put(e.key, e.value)
}
}
this.outputFiles = task.getOutputFilesNames()
this.binDirs = task.getProcessor().getBinDirs()
this.stageInMode = task.config.getStageInMode()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ class TaskHasher {
keys.addAll(binEntries)
}

final moduleBinFiles = processor.getModuleBinFiles()
if( moduleBinFiles ) {
log.trace "Task: ${task.processor.name} > Adding module bin files: ${-> moduleBinFiles.values().join('; ')}"
keys.addAll(moduleBinFiles.values())
}

// add environment modules (`module` directive)
final modules = task.getConfig().getModule()
if( modules ) {
Expand Down Expand Up @@ -212,7 +218,7 @@ class TaskHasher {
@Memoized
List<Path> getTaskBinEntries(String script) {
List<Path> result = []
final tokenizer = new StringTokenizer(script, " \t\n\r\f()[]{};&|<>`")
final tokenizer = new StringTokenizer(script, TaskProcessor.SCRIPT_TOKEN_DELIMITERS)
while( tokenizer.hasMoreTokens() ) {
final token = tokenizer.nextToken()
final path = session.binEntries.get(token)
Expand Down
136 changes: 109 additions & 27 deletions modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import static nextflow.processor.ErrorStrategy.*

import java.nio.file.FileSystems
import java.nio.file.Path
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicIntegerArray
Expand Down Expand Up @@ -64,6 +65,7 @@ import nextflow.executor.Executor
import nextflow.executor.StoredTaskHandler
import nextflow.extension.CH
import nextflow.extension.DataflowHelper
import nextflow.extension.FilesEx
import nextflow.file.FileHelper
import nextflow.file.FileHolder
import nextflow.file.FilePorter
Expand Down Expand Up @@ -1568,16 +1570,114 @@ class TaskProcessor {
return meta?.isModule() ? meta.getModuleBundle() : null
}

@Memoized
/**
* Legacy accessor for the bin directories of this process.
*
* @deprecated Bin dirs are no longer bind-mounted. Bin scripts are now staged
* as input files under {@link TaskRun#BIN_DIR}. Kept for backward compatibility
* with plugins (e.g. nf-k8s) that reference this method.
*
* @return always an empty list
*/
@Deprecated
protected List<Path> getBinDirs() {
// NOTE(review): `result` is populated below but never returned -- these statements look like
// leftovers of the previous implementation; confirm they can be dropped
final result = new ArrayList(10)
// module bundle bin dir have priority, add before
final bundle = session.enableModuleBinaries() ? getModuleBundle() : null
if( bundle!=null )
result.addAll(bundle.getBinDirs())
// then add project bin dir
if( executor.binDir )
result.add(executor.binDir)
return Collections.<Path>emptyList()
}

/**
 * Resolve the module-level bin scripts to be staged into the task work directory
 * under {@link TaskRun#BIN_DIR}.
 *
 * An empty map is returned when module binaries are disabled, when the process has
 * no module bundle, or when the bundle holds no bin files. When the work directory
 * is not local the files go through {@link #uploadBinFiles} first.
 *
 * @return A map of staging name to file path, e.g. {@code '.bin/myscript.sh' -> /path/to/file}
 */
@Memoized
Map<String,Path> getModuleBinFiles() {
    final Map<String,Path> none = Collections.<String,Path>emptyMap()
    if( NF.isModuleBinariesDisabled() )
        return none

    final moduleBundle = getModuleBundle()
    if( moduleBundle == null )
        return none

    final bundleFiles = moduleBundle.getBinFiles()
    if( bundleFiles.isEmpty() )
        return none

    // remote work dirs need the files copied next to the work dir before staging
    if( isLocalWorkDir() )
        return prefixBinFiles(bundleFiles)
    return prefixBinFiles(uploadBinFiles(bundleFiles))
}

/**
 * Resolve the project-level bin scripts (the session {@code binEntries}) to be staged
 * into the task work directory under {@link TaskRun#BIN_DIR}.
 *
 * When the work directory is not local the files go through {@link #uploadBinFiles} first.
 *
 * @return A map of staging name to file path, e.g. {@code '.bin/myscript.sh' -> /path/to/file};
 *      empty when the session has no bin entries
 */
@Memoized
Map<String,Path> getProjectBinFiles() {
    final projectEntries = session.binEntries
    if( !projectEntries )
        return Collections.<String,Path>emptyMap()

    if( isLocalWorkDir() )
        return prefixBinFiles(projectEntries)
    return prefixBinFiles(uploadBinFiles(projectEntries))
}

static final String SCRIPT_TOKEN_DELIMITERS = " \t\n\r\f()[]{};&|<>`"

/**
 * Select the project bin files whose name appears as a token in the given script.
 *
 * The script is split on {@link #SCRIPT_TOKEN_DELIMITERS}; each token, prefixed with
 * {@link TaskRun#BIN_DIR}, is looked up among the project bin files.
 * The result is memoized per script text.
 *
 * @param script the task script source
 * @return the referenced entries, keyed by staging name; empty when there are no
 *      project bin files or the script is null/empty
 */
@Memoized
Map<String,Path> getReferencedProjectBinFiles(String script) {
    final candidates = getProjectBinFiles()
    if( !candidates || !script )
        return Collections.<String,Path>emptyMap()

    final matches = new LinkedHashMap<String,Path>(candidates.size())
    for( String token : script.tokenize(SCRIPT_TOKEN_DELIMITERS) ) {
        final stageName = TaskRun.BIN_DIR + '/' + token
        final path = candidates.get(stageName)
        if( path )
            matches.put(stageName, path)
    }
    return matches
}

/**
 * Return a copy of the given map in which every key is prepended with
 * {@code TaskRun.BIN_DIR + '/'}; values and iteration order are kept.
 *
 * @param files file name to path
 * @return the re-keyed map, or an empty map when {@code files} is empty
 */
private static Map<String,Path> prefixBinFiles(Map<String,Path> files) {
    if( files.isEmpty() )
        return Collections.<String,Path>emptyMap()

    final stagePrefix = TaskRun.BIN_DIR + '/'
    final prefixed = new LinkedHashMap<String,Path>(files.size())
    for( String name : files.keySet() ) {
        prefixed.put(stagePrefix + name, files.get(name))
    }
    return prefixed
}

private static final ConcurrentHashMap<Path,Path> uploadedBinFiles = new ConcurrentHashMap<>()

/**
 * Empty the shared cache of uploaded bin files. Intended for tests only.
 */
@TestOnly
static void resetBinFileUploadCache() {
    uploadedBinFiles.clear()
}

/**
 * Copy bin files next to the executor work directory so they can be staged by
 * remote (e.g. cloud) copy strategies.
 *
 * Files are copied to {@code {workDir}/.nextflow/bin/<name>}. A shared cache, keyed by
 * destination path, remembers which source was last copied to each destination, so
 * processors sharing a work directory copy each file only once, while a different
 * work directory still receives its own copy.
 *
 * @param files bin files to upload, keyed by file name
 * @return the same names mapped to the uploaded copies
 */
@PackageScope
Map<String,Path> uploadBinFiles(Map<String,Path> files) {
    final stageDir = executor.workDir.resolve('.nextflow/bin')
    FilesEx.mkdirs(stageDir)
    final result = new LinkedHashMap<String,Path>(files.size())
    for( Map.Entry<String,Path> e : files ) {
        final Path source = e.value
        final Path target = stageDir.resolve(e.key)
        // `compute` is atomic per key: concurrent callers asking for the same destination wait
        // until the copy has completed, and a failed copy leaves no cache entry so it is retried.
        // Note: the copy runs while the map entry is locked.
        uploadedBinFiles.compute(target) { Path dest, Path previous ->
            // copy when nothing was uploaded to this destination yet, or when a different
            // source with the same name was uploaded before (last writer wins)
            if( !source.equals(previous) )
                FileHelper.copyPath(source, dest)
            return source
        }
        result.put(e.key, target)
    }
    return result
}

Expand All @@ -1604,24 +1704,6 @@ class TaskProcessor {
log.debug "Invalid 'session.config.env' object: ${session.config.env?.class?.name}"
}

// append the 'bin' folder to the task environment
List<Path> paths
if( isLocalWorkDir() && (paths=getBinDirs()) ) {
for( Path it : paths ) {
if( result.containsKey('PATH') ) {
// note: do not escape potential blanks in the bin path because the PATH
// variable is enclosed in `"` when in rendered in the launcher script -- see #630
result['PATH'] = "${result['PATH']}:${it}".toString()
}
else {
// note: append custom bin path *after* the system PATH
// to prevent unnecessary network round-trip for each command
// when the added path is a shared file system directory
result['PATH'] = "\$PATH:${it}".toString()
}
}
}

return Collections.unmodifiableMap(result)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,11 @@ class TaskRun implements Cloneable {
static final public String CMD_TRACE = '.command.trace'
static final public String CMD_ENV = '.command.env'

/**
* The directory name used for staging bin scripts in the task work directory
*/
static final public String BIN_DIR = '.bin'


String toString( ) {
"id: $id; name: $name; type: $type; exit: ${exitStatus==Integer.MAX_VALUE ? '-' : exitStatus}; error: $error; workDir: $workDir"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ class ResourcesBundle {
}

final private static List<String> BIN_PATHS = ['bin','usr/bin','usr/local/bin']
final private static List<String> BIN_PREFIXES = BIN_PATHS.collect { it + '/' }

List<Path> getBinDirs() {
final result = new ArrayList<Path>(10)
Expand All @@ -213,4 +214,27 @@ class ResourcesBundle {
Collections.sort(result)
return result
}

/**
 * Collect the regular files of this bundle whose entry key starts with one of the
 * bin path prefixes (see {@code hasBinPrefix}).
 *
 * Entries are keyed by the bare file name, so a later entry with the same file name
 * replaces an earlier one.
 *
 * @return A map of filename to file path, e.g. {@code 'myscript.sh' -> /path/to/resources/bin/myscript.sh}
 */
Map<String,Path> getBinFiles() {
    final binFiles = new LinkedHashMap<String,Path>(10)
    for( Map.Entry<String,Path> entry : content ) {
        if( !hasBinPrefix(entry.key) )
            continue
        final Path file = entry.value
        if( !Files.isRegularFile(file) )
            continue
        binFiles.put(file.getFileName().toString(), file)
    }
    return binFiles
}

/**
 * Check whether the given bundle entry key starts with any of {@code BIN_PREFIXES}.
 *
 * @param key the bundle entry key
 * @return {@code true} when at least one prefix matches, {@code false} otherwise
 */
private static boolean hasBinPrefix(String key) {
    return BIN_PREFIXES.any { String prefix -> key.startsWith(prefix) }
}
}
Loading
Loading