Skip to content

Commit f3cee76

Browse files
jorgeebentsherman
andauthored
Data Lineage quick fixes (#6045) [ci fast]
Signed-off-by: jorgee <[email protected]> Co-authored-by: Ben Sherman <[email protected]>
1 parent 7e931e8 commit f3cee76

File tree

5 files changed

+13
-112
lines changed

5 files changed

+13
-112
lines changed

modules/nf-lineage/src/main/nextflow/lineage/DefaultLinHistoryLog.groovy

+4-24
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import java.nio.file.Path
2121
import groovy.transform.CompileStatic
2222
import groovy.util.logging.Slf4j
2323
import nextflow.extension.FilesEx
24+
25+
import static nextflow.lineage.fs.LinPath.*
2426
/**
2527
* File to store a history of the workflow executions and their corresponding LIDs
2628
*
@@ -39,9 +41,10 @@ class DefaultLinHistoryLog implements LinHistoryLog {
3941
}
4042

4143
void write(String name, UUID key, String runLid, Date date = null) {
44+
assert name
4245
assert key
4346
def timestamp = date ?: new Date()
44-
final recordFile = path.resolve(key.toString())
47+
final recordFile = path.resolve(runLid.substring(LID_PROT.size()))
4548
try {
4649
recordFile.text = new LinHistoryRecord(timestamp, name, key, runLid).toString()
4750
log.trace("Record for $key written in lineage history log ${FilesEx.toUriString(this.path)}")
@@ -50,18 +53,6 @@ class DefaultLinHistoryLog implements LinHistoryLog {
5053
}
5154
}
5255

53-
void updateRunLid(UUID id, String runLid) {
54-
assert id
55-
final recordFile = path.resolve(id.toString())
56-
try {
57-
def current = LinHistoryRecord.parse(path.resolve(id.toString()).text)
58-
recordFile.text = new LinHistoryRecord(current.timestamp, current.runName, id, runLid).toString()
59-
}
60-
catch (Throwable e) {
61-
log.warn("Can't read session $id file: ${FilesEx.toUriString(recordFile)}", e.message)
62-
}
63-
}
64-
6556
List<LinHistoryRecord> getRecords(){
6657
List<LinHistoryRecord> list = new LinkedList<LinHistoryRecord>()
6758
try {
@@ -73,15 +64,4 @@ class DefaultLinHistoryLog implements LinHistoryLog {
7364
return list.sort {it.timestamp }
7465
}
7566

76-
LinHistoryRecord getRecord(UUID id) {
77-
assert id
78-
final recordFile = path.resolve(id.toString())
79-
try {
80-
return LinHistoryRecord.parse(recordFile.text)
81-
} catch( Throwable e ) {
82-
log.warn("Can't find session $id in file: ${FilesEx.toUriString(recordFile)}", e.message)
83-
return null
84-
}
85-
}
86-
8767
}

modules/nf-lineage/src/main/nextflow/lineage/LinHistoryLog.groovy

-15
Original file line numberDiff line numberDiff line change
@@ -30,26 +30,11 @@ interface LinHistoryLog {
3030
*/
3131
void write(String name, UUID sessionId, String runLid)
3232

33-
/**
34-
* Updates the run LID for a given session ID.
35-
*
36-
* @param sessionId Workflow session ID.
37-
* @param runLid Workflow run Lineage ID.
38-
*/
39-
void updateRunLid(UUID sessionId, String runLid)
40-
4133
/**
4234
* Get the store records in the Lineage History Log.
4335
*
4436
* @return List of stored lineage history records.
4537
*/
4638
List<LinHistoryRecord> getRecords()
4739

48-
/**
49-
* Get the record for a given
50-
* @param sessionId Workflow session ID.
51-
* @return LinHistoryRecord for the given ID.
52-
*/
53-
LinHistoryRecord getRecord(UUID sessionId)
54-
5540
}

modules/nf-lineage/src/main/nextflow/lineage/LinObserver.groovy

+2-7
Original file line numberDiff line numberDiff line change
@@ -96,11 +96,6 @@ class LinObserver implements TraceObserverV2 {
9696
this.store = store
9797
}
9898

99-
@Override
100-
void onFlowCreate(Session session) {
101-
this.store.getHistoryLog().write(session.runName, session.uniqueId, '-')
102-
}
103-
10499
@TestOnly
105100
String getExecutionHash(){ executionHash }
106101

@@ -120,7 +115,7 @@ class LinObserver implements TraceObserverV2 {
120115
executionUri,
121116
new LinkedList<Parameter>()
122117
)
123-
this.store.getHistoryLog().updateRunLid(session.uniqueId, executionUri)
118+
this.store.getHistoryLog().write(session.runName, session.uniqueId, executionUri)
124119
}
125120

126121
@Override
@@ -152,7 +147,7 @@ class LinObserver implements TraceObserverV2 {
152147
final dataPath = new DataPath(normalizer.normalizePath(it.normalize()), Checksum.ofNextflow(it.text))
153148
result.add(dataPath)
154149
}
155-
return result
150+
return result.sort{it.path}
156151
}
157152

158153
protected String storeWorkflowRun(PathNormalizer normalizer) {

modules/nf-lineage/src/test/nextflow/lineage/DefaultLinHistoryLogTest.groovy

-59
Original file line numberDiff line numberDiff line change
@@ -59,65 +59,6 @@ class DefaultLinHistoryLogTest extends Specification {
5959
parsedRecord.runLid == runLid
6060
}
6161

62-
def "should return correct record for existing session"() {
63-
given:
64-
UUID sessionId = UUID.randomUUID()
65-
String runName = "Run1"
66-
String runLid = "lid://123"
67-
68-
and:
69-
linHistoryLog.write(runName, sessionId, runLid)
70-
71-
when:
72-
def record = linHistoryLog.getRecord(sessionId)
73-
then:
74-
record.sessionId == sessionId
75-
record.runName == runName
76-
record.runLid == runLid
77-
}
78-
79-
def "should return null and warn if session does not exist"() {
80-
expect:
81-
linHistoryLog.getRecord(UUID.randomUUID()) == null
82-
}
83-
84-
def "update should modify existing Lid for given session"() {
85-
given:
86-
UUID sessionId = UUID.randomUUID()
87-
String runName = "Run1"
88-
String runLidUpdated = "run-lid-updated"
89-
90-
and:
91-
linHistoryLog.write(runName, sessionId, 'run-lid-initial')
92-
93-
when:
94-
linHistoryLog.updateRunLid(sessionId, runLidUpdated)
95-
96-
then:
97-
def files = historyFile.listFiles()
98-
files.size() == 1
99-
def parsedRecord = LinHistoryRecord.parse(files[0].text)
100-
parsedRecord.runLid == runLidUpdated
101-
}
102-
103-
def "update should do nothing if session does not exist"() {
104-
given:
105-
UUID existingSessionId = UUID.randomUUID()
106-
UUID nonExistingSessionId = UUID.randomUUID()
107-
String runName = "Run1"
108-
String runLid = "lid://123"
109-
and:
110-
linHistoryLog.write(runName, existingSessionId, runLid)
111-
112-
when:
113-
linHistoryLog.updateRunLid(nonExistingSessionId, "new-lid")
114-
then:
115-
def files = historyFile.listFiles()
116-
files.size() == 1
117-
def parsedRecord = LinHistoryRecord.parse(files[0].text)
118-
parsedRecord.runLid == runLid
119-
}
120-
12162
def 'should get records' () {
12263
given:
12364
UUID sessionId = UUID.randomUUID()

modules/nf-lineage/src/test/nextflow/lineage/LinObserverTest.groovy

+7-7
Original file line numberDiff line numberDiff line change
@@ -517,8 +517,10 @@ class LinObserverTest extends Specification {
517517
observer.onFlowCreate(session)
518518
observer.onFlowBegin()
519519
then: 'History file should contain execution hash'
520-
def lid = store.getHistoryLog().getRecord(uniqueId).runLid.substring(LID_PROT.size())
521-
lid == observer.executionHash
520+
def lid = LinHistoryRecord.parse(folder.resolve(".history/${observer.executionHash}").text)
521+
lid.runLid == asUriString(observer.executionHash)
522+
lid.sessionId == uniqueId
523+
lid.runName == "test_run"
522524

523525
when: ' publish output with source file'
524526
def outFile1 = outputDir.resolve('foo/file.bam')
@@ -554,9 +556,8 @@ class LinObserverTest extends Specification {
554556

555557
when: 'Workflow complete'
556558
observer.onFlowComplete()
557-
then: 'Check history file is updated and Workflow Result is written in the lid store'
558-
def finalLid = store.getHistoryLog().getRecord(uniqueId).runLid.substring(LID_PROT.size())
559-
def resultsRetrieved = store.load("${finalLid}#output") as WorkflowOutput
559+
then: 'Check WorkflowOutput is written in the lid store'
560+
def resultsRetrieved = store.load("${observer.executionHash}#output") as WorkflowOutput
560561
resultsRetrieved.output == [new Parameter(Path.simpleName, "a", "lid://${observer.executionHash}/foo/file.bam"), new Parameter(Path.simpleName, "b", "lid://${observer.executionHash}/foo/file2.bam")]
561562

562563
cleanup:
@@ -596,8 +597,7 @@ class LinObserverTest extends Specification {
596597
observer.onFlowCreate(session)
597598
observer.onFlowBegin()
598599
observer.onFlowComplete()
599-
def finalLid = store.getHistoryLog().getRecord(uniqueId).runLid.substring(LID_PROT.size())
600-
def resultFile = folder.resolve("${finalLid}#output")
600+
def resultFile = folder.resolve("${observer.executionHash}#output")
601601
then:
602602
!resultFile.exists()
603603
}

0 commit comments

Comments
 (0)