diff --git a/modules/nf-lineage/src/main/nextflow/lineage/DefaultLinHistoryLog.groovy b/modules/nf-lineage/src/main/nextflow/lineage/DefaultLinHistoryLog.groovy index f7a306f616..745dbab4b8 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/DefaultLinHistoryLog.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/DefaultLinHistoryLog.groovy @@ -21,6 +21,8 @@ import java.nio.file.Path import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import nextflow.extension.FilesEx + +import static nextflow.lineage.fs.LinPath.* /** * File to store a history of the workflow executions and their corresponding LIDs * @@ -39,9 +41,10 @@ class DefaultLinHistoryLog implements LinHistoryLog { } void write(String name, UUID key, String runLid, Date date = null) { + assert name assert key def timestamp = date ?: new Date() - final recordFile = path.resolve(key.toString()) + final recordFile = path.resolve(runLid.substring(LID_PROT.size())) try { recordFile.text = new LinHistoryRecord(timestamp, name, key, runLid).toString() log.trace("Record for $key written in lineage history log ${FilesEx.toUriString(this.path)}") @@ -50,18 +53,6 @@ class DefaultLinHistoryLog implements LinHistoryLog { } } - void updateRunLid(UUID id, String runLid) { - assert id - final recordFile = path.resolve(id.toString()) - try { - def current = LinHistoryRecord.parse(path.resolve(id.toString()).text) - recordFile.text = new LinHistoryRecord(current.timestamp, current.runName, id, runLid).toString() - } - catch (Throwable e) { - log.warn("Can't read session $id file: ${FilesEx.toUriString(recordFile)}", e.message) - } - } - List getRecords(){ List list = new LinkedList() try { @@ -73,15 +64,4 @@ class DefaultLinHistoryLog implements LinHistoryLog { return list.sort {it.timestamp } } - LinHistoryRecord getRecord(UUID id) { - assert id - final recordFile = path.resolve(id.toString()) - try { - return LinHistoryRecord.parse(recordFile.text) - } catch( Throwable e ) { - log.warn("Can't find session $id in file: ${FilesEx.toUriString(recordFile)}", e.message) - return null - } - } - } diff --git a/modules/nf-lineage/src/main/nextflow/lineage/LinHistoryLog.groovy b/modules/nf-lineage/src/main/nextflow/lineage/LinHistoryLog.groovy index d95a110c60..33c31da30f 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/LinHistoryLog.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/LinHistoryLog.groovy @@ -30,14 +30,6 @@ interface LinHistoryLog { */ void write(String name, UUID sessionId, String runLid) - /** - * Updates the run LID for a given session ID. - * - * @param sessionId Workflow session ID. - * @param runLid Workflow run Lineage ID. - */ - void updateRunLid(UUID sessionId, String runLid) - /** * Get the store records in the Lineage History Log. * @@ -45,11 +37,4 @@ interface LinHistoryLog { */ List getRecords() - /** - * Get the record for a given - * @param sessionId Workflow session ID. - * @return LinHistoryRecord for the given ID. - */ - LinHistoryRecord getRecord(UUID sessionId) - } diff --git a/modules/nf-lineage/src/main/nextflow/lineage/LinObserver.groovy b/modules/nf-lineage/src/main/nextflow/lineage/LinObserver.groovy index 8491c044e3..ab46418c39 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/LinObserver.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/LinObserver.groovy @@ -96,11 +96,6 @@ class LinObserver implements TraceObserverV2 { this.store = store } - @Override - void onFlowCreate(Session session) { - this.store.getHistoryLog().write(session.runName, session.uniqueId, '-') - } - @TestOnly String getExecutionHash(){ executionHash } @@ -120,7 +115,7 @@ class LinObserver implements TraceObserverV2 { executionUri, new LinkedList() ) - this.store.getHistoryLog().updateRunLid(session.uniqueId, executionUri) + this.store.getHistoryLog().write(session.runName, session.uniqueId, executionUri) } @Override @@ -152,7 +147,7 @@ class LinObserver implements TraceObserverV2 { final dataPath = new DataPath(normalizer.normalizePath(it.normalize()), Checksum.ofNextflow(it.text)) result.add(dataPath) } - return result + return result.sort{it.path} } protected String storeWorkflowRun(PathNormalizer normalizer) { diff --git a/modules/nf-lineage/src/test/nextflow/lineage/DefaultLinHistoryLogTest.groovy b/modules/nf-lineage/src/test/nextflow/lineage/DefaultLinHistoryLogTest.groovy index bde00b0595..24741f96ad 100644 --- a/modules/nf-lineage/src/test/nextflow/lineage/DefaultLinHistoryLogTest.groovy +++ b/modules/nf-lineage/src/test/nextflow/lineage/DefaultLinHistoryLogTest.groovy @@ -59,65 +59,6 @@ class DefaultLinHistoryLogTest extends Specification { parsedRecord.runLid == runLid } - def "should return correct record for existing session"() { - given: - UUID sessionId = UUID.randomUUID() - String runName = "Run1" - String runLid = "lid://123" - - and: - linHistoryLog.write(runName, sessionId, runLid) - - when: - def record = linHistoryLog.getRecord(sessionId) - then: - record.sessionId == sessionId - record.runName == runName - record.runLid == runLid - } - - def "should return null and warn if session does not exist"() { - expect: - linHistoryLog.getRecord(UUID.randomUUID()) == null - } - - def "update should modify existing Lid for given session"() { - given: - UUID sessionId = UUID.randomUUID() - String runName = "Run1" - String runLidUpdated = "run-lid-updated" - - and: - linHistoryLog.write(runName, sessionId, 'run-lid-initial') - - when: - linHistoryLog.updateRunLid(sessionId, runLidUpdated) - - then: - def files = historyFile.listFiles() - files.size() == 1 - def parsedRecord = LinHistoryRecord.parse(files[0].text) - parsedRecord.runLid == runLidUpdated - } - - def "update should do nothing if session does not exist"() { - given: - UUID existingSessionId = UUID.randomUUID() - UUID nonExistingSessionId = UUID.randomUUID() - String runName = "Run1" - String runLid = "lid://123" - and: - linHistoryLog.write(runName, existingSessionId, runLid) - - when: - linHistoryLog.updateRunLid(nonExistingSessionId, "new-lid") - then: - def files = historyFile.listFiles() - files.size() == 1 - def parsedRecord = LinHistoryRecord.parse(files[0].text) - parsedRecord.runLid == runLid - } - def 'should get records' () { given: UUID sessionId = UUID.randomUUID() diff --git a/modules/nf-lineage/src/test/nextflow/lineage/LinObserverTest.groovy b/modules/nf-lineage/src/test/nextflow/lineage/LinObserverTest.groovy index 4ecea8b6ae..e9e261250e 100644 --- a/modules/nf-lineage/src/test/nextflow/lineage/LinObserverTest.groovy +++ b/modules/nf-lineage/src/test/nextflow/lineage/LinObserverTest.groovy @@ -517,8 +517,10 @@ class LinObserverTest extends Specification { observer.onFlowCreate(session) observer.onFlowBegin() then: 'History file should contain execution hash' - def lid = store.getHistoryLog().getRecord(uniqueId).runLid.substring(LID_PROT.size()) - lid == observer.executionHash + def lid = LinHistoryRecord.parse(folder.resolve(".history/${observer.executionHash}").text) + lid.runLid == asUriString(observer.executionHash) + lid.sessionId == uniqueId + lid.runName == "test_run" when: ' publish output with source file' def outFile1 = outputDir.resolve('foo/file.bam') @@ -554,9 +556,8 @@ class LinObserverTest extends Specification { when: 'Workflow complete' observer.onFlowComplete() - then: 'Check history file is updated and Workflow Result is written in the lid store' - def finalLid = store.getHistoryLog().getRecord(uniqueId).runLid.substring(LID_PROT.size()) - def resultsRetrieved = store.load("${finalLid}#output") as WorkflowOutput + then: 'Check WorkflowOutput is written in the lid store' + def resultsRetrieved = store.load("${observer.executionHash}#output") as WorkflowOutput resultsRetrieved.output == [new Parameter(Path.simpleName, "a", "lid://${observer.executionHash}/foo/file.bam"), new Parameter(Path.simpleName, "b", "lid://${observer.executionHash}/foo/file2.bam")] cleanup: @@ -596,8 +597,7 @@ class LinObserverTest extends Specification { observer.onFlowCreate(session) observer.onFlowBegin() observer.onFlowComplete() - def finalLid = store.getHistoryLog().getRecord(uniqueId).runLid.substring(LID_PROT.size()) - def resultFile = folder.resolve("${finalLid}#output") + def resultFile = folder.resolve("${observer.executionHash}#output") then: !resultFile.exists() }