Skip to content

Add version to the Data Lineage metadata JSON serialization #6037

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -166,15 +166,15 @@
* Shape shape = gson.fromJson(json, Shape.class);
* }</pre>
*/
public final class RuntimeTypeAdapterFactory<T> implements TypeAdapterFactory {
public class RuntimeTypeAdapterFactory<T> implements TypeAdapterFactory {
private final Class<?> baseType;
private final String typeFieldName;
private final Map<String, Class<?>> labelToSubtype = new LinkedHashMap<>();
private final Map<Class<?>, String> subtypeToLabel = new LinkedHashMap<>();
private final boolean maintainType;
private boolean recognizeSubtypes;

private RuntimeTypeAdapterFactory(Class<?> baseType, String typeFieldName, boolean maintainType) {
protected RuntimeTypeAdapterFactory(Class<?> baseType, String typeFieldName, boolean maintainType) {
if (typeFieldName == null || baseType == null) {
throw new NullPointerException();
}
Expand Down
5 changes: 1 addition & 4 deletions modules/nf-lineage/src/main/nextflow/lineage/LinStore.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,8 @@ package nextflow.lineage

import com.google.common.annotations.Beta
import groovy.transform.CompileStatic
import nextflow.lineage.serde.LinSerializable
import nextflow.lineage.config.LineageConfig

import java.nio.file.Path

import nextflow.lineage.serde.LinSerializable
/**
* Interface for the lineage store
*
Expand Down
3 changes: 2 additions & 1 deletion modules/nf-lineage/src/main/nextflow/lineage/LinUtils.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import nextflow.lineage.model.TaskRun
import nextflow.lineage.model.WorkflowRun
import nextflow.lineage.serde.LinEncoder
import nextflow.lineage.serde.LinSerializable
import nextflow.lineage.serde.LinTypeAdapterFactory
import nextflow.serde.gson.GsonEncoder
/**
* Utils class for Lineage IDs.
Expand Down Expand Up @@ -258,7 +259,7 @@ class LinUtils {
return new GsonEncoder<Object>() {}
.withPrettyPrint(prettyPrint)
.withSerializeNulls(true)
.withTypeAdapterFactory(LinEncoder.newTypeAdapterFactory())
.withTypeAdapterFactory(new LinTypeAdapterFactory())
.encode(output)
}
}
Expand Down
26 changes: 26 additions & 0 deletions modules/nf-lineage/src/main/nextflow/lineage/model/LinModel.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2013-2025, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.lineage.model

/**
* Marker interface holding lineage model common definitions
*
* @author Paolo Di Tommaso <[email protected]>
*/
interface LinModel {
static final public String VERSION = 'lineage/v1beta1'
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,7 @@
package nextflow.lineage.serde

import groovy.transform.CompileStatic
import nextflow.lineage.model.FileOutput
import nextflow.lineage.model.TaskOutput
import nextflow.lineage.model.TaskRun
import nextflow.lineage.model.Workflow
import nextflow.lineage.model.WorkflowOutput
import nextflow.lineage.model.WorkflowRun
import nextflow.serde.gson.GsonEncoder
import nextflow.serde.gson.RuntimeTypeAdapterFactory

/**
* Implements a JSON encoder for lineage model objects
Expand All @@ -35,19 +28,9 @@ import nextflow.serde.gson.RuntimeTypeAdapterFactory
class LinEncoder extends GsonEncoder<LinSerializable> {

LinEncoder() {
withTypeAdapterFactory(newTypeAdapterFactory())
withTypeAdapterFactory(new LinTypeAdapterFactory())
// enable rendering of null values
withSerializeNulls(true)
}

static RuntimeTypeAdapterFactory newTypeAdapterFactory(){
RuntimeTypeAdapterFactory.of(LinSerializable.class, "type")
.registerSubtype(WorkflowRun, WorkflowRun.simpleName)
.registerSubtype(WorkflowOutput, WorkflowOutput.simpleName)
.registerSubtype(Workflow, Workflow.simpleName)
.registerSubtype(TaskRun, TaskRun.simpleName)
.registerSubtype(TaskOutput, TaskOutput.simpleName)
.registerSubtype(FileOutput, FileOutput.simpleName)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright 2013-2025, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package nextflow.lineage.serde

import com.google.gson.Gson
import com.google.gson.JsonElement
import com.google.gson.JsonObject
import com.google.gson.JsonParseException
import com.google.gson.JsonParser
import com.google.gson.TypeAdapter
import com.google.gson.reflect.TypeToken
import com.google.gson.stream.JsonReader
import com.google.gson.stream.JsonWriter

import groovy.transform.CompileStatic
import nextflow.lineage.model.FileOutput
import nextflow.lineage.model.LinModel
import nextflow.lineage.model.TaskOutput
import nextflow.lineage.model.TaskRun
import nextflow.lineage.model.Workflow
import nextflow.lineage.model.WorkflowOutput
import nextflow.lineage.model.WorkflowRun
import nextflow.serde.gson.RuntimeTypeAdapterFactory

/**
* Class to serialize LiSerializable objects including the Lineage model version.
*
* @author Jorge Ejarque <[email protected]>
*/
@CompileStatic
class LinTypeAdapterFactory<T> extends RuntimeTypeAdapterFactory<T> {
public static final String VERSION_FIELD = 'version'
public static final String CURRENT_VERSION = LinModel.VERSION

LinTypeAdapterFactory() {
super(LinSerializable.class, "type", false)
this.registerSubtype(WorkflowRun, WorkflowRun.simpleName)
.registerSubtype(WorkflowOutput, WorkflowOutput.simpleName)
.registerSubtype(Workflow, Workflow.simpleName)
.registerSubtype(TaskRun, TaskRun.simpleName)
.registerSubtype(TaskOutput, TaskOutput.simpleName)
.registerSubtype(FileOutput, FileOutput.simpleName)

}

@Override
<R> TypeAdapter<R> create(Gson gson, TypeToken<R> type) {
if (!LinSerializable.class.isAssignableFrom(type.rawType)) {
return null
}

def delegate = super.create(gson, type as TypeToken<T>)
if (delegate == null) {
return null
}

return new TypeAdapter<R>() {
@Override
void write(JsonWriter out, R value) throws IOException {
def json = delegate.toJsonTree(value)
if (json instanceof JsonObject) {
json = addVersion(json)
}
gson.toJson(json, out)
}

@Override
R read(JsonReader reader) throws IOException {
def json = JsonParser.parseReader(reader)
if (json instanceof JsonObject) {
def obj = (JsonObject) json
def versionEl = obj.get(VERSION_FIELD)
if (versionEl == null || versionEl.asString != CURRENT_VERSION) {
throw new JsonParseException("Invalid or missing version")
}
obj.remove(VERSION_FIELD)
}
return delegate.fromJsonTree(json)
}
}
}

private static JsonObject addVersion(JsonObject json){
if( json.has(VERSION_FIELD) )
throw new JsonParseException("object already defines a field named ${VERSION_FIELD}")

JsonObject clone = new JsonObject();
clone.addProperty(VERSION_FIELD, CURRENT_VERSION)
for (Map.Entry<String, JsonElement> e : json.entrySet()) {
clone.add(e.getKey(), e.getValue());
}
return clone
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,9 @@ class LinCommandImplTest extends Specification{
def expectedOutput = '''diff --git 12345 67890
--- 12345
+++ 67890
@@ -1,15 +1,15 @@
@@ -1,16 +1,16 @@
{
"version": "lineage/v1beta1",
"type": "FileOutput",
- "path": "path/to/file",
+ "path": "path/to/file2",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@

package nextflow.lineage.fs

import nextflow.lineage.DefaultLinStore
import spock.lang.Shared

import java.nio.ByteBuffer
import java.nio.channels.NonWritableChannelException
import java.nio.file.AccessDeniedException
Expand All @@ -32,8 +29,9 @@ import java.nio.file.attribute.BasicFileAttributes

import nextflow.Global
import nextflow.Session
import nextflow.lineage.DefaultLinStore
import spock.lang.Shared
import spock.lang.Specification

/**
* LID File system provider tests
* @author Jorge Ejarque <[email protected]>
Expand Down Expand Up @@ -125,7 +123,7 @@ class LinFileSystemProviderTest extends Specification {
def output = data.resolve("output.txt")
output.text = "Hello, World!"
outputMeta.mkdirs()
outputMeta.resolve(".data.json").text = '{"type":"FileOutput","path":"'+output.toString()+'"}'
outputMeta.resolve(".data.json").text = '{"version":"lineage/v1beta1","type":"FileOutput","path":"'+output.toString()+'"}'

Global.session = Mock(Session) { getConfig()>>config }
and:
Expand Down Expand Up @@ -181,7 +179,7 @@ class LinFileSystemProviderTest extends Specification {
def config = [lineage:[store:[location:wdir.toString()]]]
def outputMeta = wdir.resolve("12345")
outputMeta.mkdirs()
outputMeta.resolve(".data.json").text = '{"type":"WorkflowRun","sessionId":"session","name":"run_name","params":[{"type":"String","name":"param1","value":"value1"}]}'
outputMeta.resolve(".data.json").text = '{"version":"lineage/v1beta1","type":"WorkflowRun","sessionId":"session","name":"run_name","params":[{"type":"String","name":"param1","value":"value1"}]}'

Global.session = Mock(Session) { getConfig()>>config }
and:
Expand Down Expand Up @@ -240,7 +238,7 @@ class LinFileSystemProviderTest extends Specification {
def output = data.resolve("output.txt")
output.text = "Hello, World!"
outputMeta.mkdirs()
outputMeta.resolve(".data.json").text = '{"type":"FileOutput","path":"'+output.toString()+'"}'
outputMeta.resolve(".data.json").text = '{"version":"lineage/v1beta1","type":"FileOutput","path":"'+output.toString()+'"}'

Global.session = Mock(Session) { getConfig()>>config }
and:
Expand Down Expand Up @@ -280,8 +278,8 @@ class LinFileSystemProviderTest extends Specification {
output1.resolve('file3.txt').text = 'file3'
wdir.resolve('12345/output1').mkdirs()
wdir.resolve('12345/output2').mkdirs()
wdir.resolve('12345/.data.json').text = '{"type":"TaskRun"}'
wdir.resolve('12345/output1/.data.json').text = '{"type":"FileOutput", "path": "' + output1.toString() + '"}'
wdir.resolve('12345/.data.json').text = '{"version":"lineage/v1beta1","type":"TaskRun"}'
wdir.resolve('12345/output1/.data.json').text = '{"version":"lineage/v1beta1","type":"FileOutput", "path": "' + output1.toString() + '"}'

and:
def config = [lineage:[store:[location:wdir.toString()]]]
Expand Down Expand Up @@ -405,7 +403,7 @@ class LinFileSystemProviderTest extends Specification {
output.resolve('abc').text = 'file1'
output.resolve('.foo').text = 'file2'
wdir.resolve('12345/output').mkdirs()
wdir.resolve('12345/output/.data.json').text = '{"type":"FileOutput", "path": "' + output.toString() + '"}'
wdir.resolve('12345/output/.data.json').text = '{"version":"lineage/v1beta1","type":"FileOutput", "path": "' + output.toString() + '"}'
and:
def provider = new LinFileSystemProvider()
def lid1 = provider.getPath(LinPath.asUri('lid://12345/output/abc'))
Expand All @@ -425,7 +423,8 @@ class LinFileSystemProviderTest extends Specification {
def file = data.resolve('abc')
file.text = 'Hello'
wdir.resolve('12345/abc').mkdirs()
wdir.resolve('12345/abc/.data.json').text = '{"type":"FileOutput", "path": "' + file.toString() + '"}'
wdir.resolve('12345/abc/.data.json').text = '{"version":"lineage/v1beta1","type":"FileOutput", "path":"' + file.toString() + '"}'
and:
Global.session = Mock(Session) { getConfig()>>config }
and:
def provider = new LinFileSystemProvider()
Expand Down
23 changes: 10 additions & 13 deletions modules/nf-lineage/src/test/nextflow/lineage/fs/LinPathTest.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,25 @@

package nextflow.lineage.fs

import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.ProviderMismatchException
import java.time.OffsetDateTime

import nextflow.lineage.LinUtils
import nextflow.lineage.model.Checksum
import nextflow.lineage.model.FileOutput
import nextflow.lineage.model.Parameter
import nextflow.lineage.model.Workflow
import nextflow.lineage.model.WorkflowOutput
import nextflow.lineage.model.FileOutput
import nextflow.lineage.model.WorkflowRun
import nextflow.lineage.serde.LinEncoder
import nextflow.util.CacheHelper
import org.junit.Rule
import test.OutputCapture

import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.ProviderMismatchException

import spock.lang.Shared
import spock.lang.Specification
import spock.lang.Unroll

import java.time.OffsetDateTime

import test.OutputCapture
/**
* LID Path Tests
* @author Jorge Ejarque <[email protected]>
Expand Down Expand Up @@ -164,9 +161,9 @@ class LinPathTest extends Specification {

wdir.resolve('12345/output1').mkdirs()
wdir.resolve('12345/path/to/file2.txt').mkdirs()
wdir.resolve('12345/.data.json').text = '{"type":"TaskRun"}'
wdir.resolve('12345/output1/.data.json').text = '{"type":"FileOutput", "path": "' + outputFolder.toString() + '"}'
wdir.resolve('12345/path/to/file2.txt/.data.json').text = '{"type":"FileOutput", "path": "' + outputFile.toString() + '"}'
wdir.resolve('12345/.data.json').text = '{"version":"lineage/v1beta1","type":"TaskRun"}'
wdir.resolve('12345/output1/.data.json').text = '{"version":"lineage/v1beta1","type":"FileOutput", "path": "' + outputFolder.toString() + '"}'
wdir.resolve('12345/path/to/file2.txt/.data.json').text = '{"version":"lineage/v1beta1","type":"FileOutput", "path": "' + outputFile.toString() + '"}'
def time = OffsetDateTime.now()
def wfResultsMetadata = new LinEncoder().withPrettyPrint(true).encode(new WorkflowOutput(time, "lid://1234", [new Parameter( "Path", "a", "lid://1234/a.txt")]))
wdir.resolve('5678/').mkdirs()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,19 @@

package nextflow.lineage.serde

import java.time.OffsetDateTime

import nextflow.lineage.model.Checksum
import nextflow.lineage.model.DataPath
import nextflow.lineage.model.Parameter
import nextflow.lineage.model.FileOutput
import nextflow.lineage.model.Parameter
import nextflow.lineage.model.TaskOutput
import nextflow.lineage.model.TaskRun
import nextflow.lineage.model.Workflow
import nextflow.lineage.model.WorkflowOutput
import nextflow.lineage.model.WorkflowRun
import spock.lang.Specification

import java.time.OffsetDateTime

class LinEncoderTest extends Specification{

def 'should encode and decode Outputs'(){
Expand Down Expand Up @@ -163,7 +163,7 @@ class LinEncoderTest extends Specification{
def encoded = encoder.encode(wfResults)
def object = encoder.decode(encoded)
then:
encoded == '{"type":"WorkflowOutput","createdAt":null,"workflowRun":"lid://1234","output":null}'
encoded == '{"version":"lineage/v1beta1","type":"WorkflowOutput","createdAt":null,"workflowRun":"lid://1234","output":null}'
def result = object as WorkflowOutput
result.createdAt == null

Expand Down