Skip to content

Data Lineage quick fixes #6045

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import java.nio.file.Path
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.extension.FilesEx

import static nextflow.lineage.fs.LinPath.*
/**
* File to store a history of the workflow executions and their corresponding LIDs
*
Expand All @@ -39,9 +41,10 @@ class DefaultLinHistoryLog implements LinHistoryLog {
}

void write(String name, UUID key, String runLid, Date date = null) {
assert name
assert key
def timestamp = date ?: new Date()
final recordFile = path.resolve(key.toString())
final recordFile = path.resolve(runLid.substring(LID_PROT.size()))
try {
recordFile.text = new LinHistoryRecord(timestamp, name, key, runLid).toString()
log.trace("Record for $key written in lineage history log ${FilesEx.toUriString(this.path)}")
Expand All @@ -50,18 +53,6 @@ class DefaultLinHistoryLog implements LinHistoryLog {
}
}

void updateRunLid(UUID id, String runLid) {
assert id
final recordFile = path.resolve(id.toString())
try {
def current = LinHistoryRecord.parse(path.resolve(id.toString()).text)
recordFile.text = new LinHistoryRecord(current.timestamp, current.runName, id, runLid).toString()
}
catch (Throwable e) {
log.warn("Can't read session $id file: ${FilesEx.toUriString(recordFile)}", e.message)
}
}

List<LinHistoryRecord> getRecords(){
List<LinHistoryRecord> list = new LinkedList<LinHistoryRecord>()
try {
Expand All @@ -73,15 +64,4 @@ class DefaultLinHistoryLog implements LinHistoryLog {
return list.sort {it.timestamp }
}

LinHistoryRecord getRecord(UUID id) {
assert id
final recordFile = path.resolve(id.toString())
try {
return LinHistoryRecord.parse(recordFile.text)
} catch( Throwable e ) {
log.warn("Can't find session $id in file: ${FilesEx.toUriString(recordFile)}", e.message)
return null
}
}

}
15 changes: 0 additions & 15 deletions modules/nf-lineage/src/main/nextflow/lineage/LinHistoryLog.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -30,26 +30,11 @@ interface LinHistoryLog {
*/
void write(String name, UUID sessionId, String runLid)

/**
* Updates the run LID for a given session ID.
*
* @param sessionId Workflow session ID.
* @param runLid Workflow run Lineage ID.
*/
void updateRunLid(UUID sessionId, String runLid)

/**
* Get the store records in the Lineage History Log.
*
* @return List of stored lineage history records.
*/
List<LinHistoryRecord> getRecords()

/**
* Get the record for a given
* @param sessionId Workflow session ID.
* @return LinHistoryRecord for the given ID.
*/
LinHistoryRecord getRecord(UUID sessionId)

}
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,6 @@ class LinObserver implements TraceObserverV2 {
this.store = store
}

@Override
void onFlowCreate(Session session) {
this.store.getHistoryLog().write(session.runName, session.uniqueId, '-')
}

@TestOnly
String getExecutionHash(){ executionHash }

Expand All @@ -120,7 +115,7 @@ class LinObserver implements TraceObserverV2 {
executionUri,
new LinkedList<Parameter>()
)
this.store.getHistoryLog().updateRunLid(session.uniqueId, executionUri)
this.store.getHistoryLog().write(session.runName, session.uniqueId, executionUri)
}

@Override
Expand Down Expand Up @@ -152,7 +147,7 @@ class LinObserver implements TraceObserverV2 {
final dataPath = new DataPath(normalizer.normalizePath(it.normalize()), Checksum.ofNextflow(it.text))
result.add(dataPath)
}
return result
return result.sort{it.path}
}

protected String storeWorkflowRun(PathNormalizer normalizer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,65 +59,6 @@ class DefaultLinHistoryLogTest extends Specification {
parsedRecord.runLid == runLid
}

def "should return correct record for existing session"() {
given:
UUID sessionId = UUID.randomUUID()
String runName = "Run1"
String runLid = "lid://123"

and:
linHistoryLog.write(runName, sessionId, runLid)

when:
def record = linHistoryLog.getRecord(sessionId)
then:
record.sessionId == sessionId
record.runName == runName
record.runLid == runLid
}

def "should return null and warn if session does not exist"() {
expect:
linHistoryLog.getRecord(UUID.randomUUID()) == null
}

def "update should modify existing Lid for given session"() {
given:
UUID sessionId = UUID.randomUUID()
String runName = "Run1"
String runLidUpdated = "run-lid-updated"

and:
linHistoryLog.write(runName, sessionId, 'run-lid-initial')

when:
linHistoryLog.updateRunLid(sessionId, runLidUpdated)

then:
def files = historyFile.listFiles()
files.size() == 1
def parsedRecord = LinHistoryRecord.parse(files[0].text)
parsedRecord.runLid == runLidUpdated
}

def "update should do nothing if session does not exist"() {
given:
UUID existingSessionId = UUID.randomUUID()
UUID nonExistingSessionId = UUID.randomUUID()
String runName = "Run1"
String runLid = "lid://123"
and:
linHistoryLog.write(runName, existingSessionId, runLid)

when:
linHistoryLog.updateRunLid(nonExistingSessionId, "new-lid")
then:
def files = historyFile.listFiles()
files.size() == 1
def parsedRecord = LinHistoryRecord.parse(files[0].text)
parsedRecord.runLid == runLid
}

def 'should get records' () {
given:
UUID sessionId = UUID.randomUUID()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,8 +517,10 @@ class LinObserverTest extends Specification {
observer.onFlowCreate(session)
observer.onFlowBegin()
then: 'History file should contain execution hash'
def lid = store.getHistoryLog().getRecord(uniqueId).runLid.substring(LID_PROT.size())
lid == observer.executionHash
def lid = LinHistoryRecord.parse(folder.resolve(".history/${observer.executionHash}").text)
lid.runLid == asUriString(observer.executionHash)
lid.sessionId == uniqueId
lid.runName == "test_run"

when: ' publish output with source file'
def outFile1 = outputDir.resolve('foo/file.bam')
Expand Down Expand Up @@ -554,9 +556,8 @@ class LinObserverTest extends Specification {

when: 'Workflow complete'
observer.onFlowComplete()
then: 'Check history file is updated and Workflow Result is written in the lid store'
def finalLid = store.getHistoryLog().getRecord(uniqueId).runLid.substring(LID_PROT.size())
def resultsRetrieved = store.load("${finalLid}#output") as WorkflowOutput
then: 'Check WorkflowOutput is written in the lid store'
def resultsRetrieved = store.load("${observer.executionHash}#output") as WorkflowOutput
resultsRetrieved.output == [new Parameter(Path.simpleName, "a", "lid://${observer.executionHash}/foo/file.bam"), new Parameter(Path.simpleName, "b", "lid://${observer.executionHash}/foo/file2.bam")]

cleanup:
Expand Down Expand Up @@ -596,8 +597,7 @@ class LinObserverTest extends Specification {
observer.onFlowCreate(session)
observer.onFlowBegin()
observer.onFlowComplete()
def finalLid = store.getHistoryLog().getRecord(uniqueId).runLid.substring(LID_PROT.size())
def resultFile = folder.resolve("${finalLid}#output")
def resultFile = folder.resolve("${observer.executionHash}#output")
then:
!resultFile.exists()
}
Expand Down