Skip to content

Data lineage programmatic API #6003

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Merged
merged 24 commits into from
May 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
bbc21d8
add fromLineage channel factory
jorgee Apr 24, 2025
fbdbeef
include query factory
jorgee Apr 24, 2025
3aa4f0f
add published files in output, support queries in fromPath
jorgee Apr 25, 2025
cdd9e89
rename fromLinageQuery to queryLineage
jorgee Apr 25, 2025
62a2cfa
lineage API refactor and remove other implementations
jorgee Apr 29, 2025
3e37728
Correct lineage function comment
jorgee Apr 29, 2025
ba309cd
Merge branch 'master' into lineage-factory
jorgee Apr 29, 2025
6abae3c
Convert lineage from operator to function and add documentation
jorgee Apr 29, 2025
00f7f2e
Merge branch 'master' into lineage-factory
jorgee Apr 30, 2025
584c4ee
remove que query in view and fromPath
jorgee Apr 30, 2025
4974bee
improve error message [ci fast]
jorgee Apr 30, 2025
b5a2671
Merge branch 'master' into lineage-factory
bentsherman Apr 30, 2025
117ef30
Update docs
bentsherman Apr 30, 2025
321c02a
cleanup
bentsherman Apr 30, 2025
224d92d
change queryLineage to return file outputs
jorgee May 2, 2025
9136ccc
update docs
jorgee May 2, 2025
6bf2c75
Merge branch 'master' into lineage-factory
jorgee May 2, 2025
689cdcc
fixes from merge
jorgee May 2, 2025
9d6b77a
fix LinPath getFileName bug
jorgee May 2, 2025
e77a9c0
cleanup
bentsherman May 2, 2025
064349a
Update migration notes
bentsherman May 2, 2025
251c06e
Rename queryLineage -> fromLineage
bentsherman May 2, 2025
8b4ca1e
Rnemae "metadata object" -> "lineage record"
bentsherman May 2, 2025
880461c
Merge branch 'master' into lineage-factory
bentsherman May 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/migrations/25-04.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ The third preview of workflow outputs introduces the following breaking changes

See {ref}`workflow-output-def` to learn more about the workflow output definition.

<h3>Data lineage</h3>

This release introduces built-in provenance tracking, also known as *data lineage*. When `lineage.enabled` is set to `true` in your configuration, Nextflow will record every workflow run, task execution, output file, and the links between them.

You can explore this lineage from the command line using the {ref}`cli-lineage` command. Additionally, you can refer to files in the lineage store from a Nextflow script using the `lid://` path prefix as well as the {ref}`channel-from-lineage` channel factory.

See the {ref}`cli-lineage` command and {ref}`config-lineage` config scope for details.

## Enhancements

<h3>Improved <code>inspect</code> command</h3>
Expand Down
31 changes: 31 additions & 0 deletions docs/reference/channel.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,37 @@ But when more than one argument is provided, they are always managed as *single*
channel.from( [1, 2], [5,6], [7,9] )
```

(channel-from-lineage)=

## fromLineage

:::{versionadded} 25.04.0
:::

:::{warning} *Experimental: may change in a future release.*
:::

The `channel.fromLineage` factory creates a channel that emits files from the {ref}`cli-lineage` store that match the given key-value params:

```nextflow
channel
.fromLineage(workflowRun: 'lid://0d1d1622ced3e4edc690bec768919b45', labels: ['alpha', 'beta'])
.view()
```

The above snippet emits files published by the given workflow run that are labeled as `alpha` and `beta`.

Available options:

`labels`
: List of labels associated with the desired files.

`taskRun`
: LID of the task run that produced the desired files.

`workflowRun`
: LID of the workflow run that produced the desired files.

(channel-fromlist)=

## fromList
Expand Down
20 changes: 12 additions & 8 deletions docs/reference/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ $ nextflow lineage SUBCOMMAND [arg ..]

**Description**

The `lineage` command is used to inspect lineage metadata.
The `lineage` command is used to inspect lineage metadata. Data lineage can be enabled by setting `lineage.enabled` to `true` in your Nextflow configuration (see the {ref}`config-lineage` config scope for details).

**Options**

Expand All @@ -720,31 +720,35 @@ TIMESTAMP RUN NAME SESSION ID
2025-04-22 14:45:43 backstabbing_heyrovsky 21bc4fad-e8b8-447d-9410-388f926a711f lid://c914d714877cc5c882c55a5428b510b1
```

View a metadata description.
View a lineage record.

```console
$ nextflow lineage view <lid>
```

View a metadata description fragment. A fragment can be a property of a metadata description (e.g., `output` or `params`) or a set of nested properties separated by a `.` (e.g., `workflow.repository`).
The output of a workflow run can be shown by appending `#output` to the workflow run LID:

```console
$ nextflow lineage view <lid#fragment>
$ nextflow lineage view lid://c914d714877cc5c882c55a5428b510b1#output
```

Find a specific metadata description that matches a URL-like query string. The query string consists of `key=value` statements separated by `&`, where keys are defined similarly to the `fragments` used in the `view` command.
:::{tip}
You can use the [jq](https://jqlang.org/) command-line tool to apply further queries and transformations on the resulting lineage record.
:::

Find all lineage records that match a set of key-value pairs:

```console
$ nextflow lineage find "<query-string>"
$ nextflow lineage find <key-1>=<value-1> <key-2>=<value-2> ...
```

Display a git-style diff between two metadata descriptions.
Display a git-style diff between two lineage records.

```console
$ nextflow lineage diff <lid-1> <lid-2>
```

Render the lineage graph for a workflow or task output in an HTML file. (default file path: `./lineage.html`).
Render the lineage graph for a workflow or task output as an HTML file. (default file path: `./lineage.html`).

```console
$ nextflow lineage render <lid> [html-file-path]
Expand Down
2 changes: 1 addition & 1 deletion docs/reference/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -1123,7 +1123,7 @@ See the {ref}`k8s-page` page for more details.

## `lineage`

The `lineage` scope controls the generation of lineage metadata.
The `lineage` scope controls the generation of {ref}`cli-lineage` metadata.

The following settings are available:

Expand Down
21 changes: 21 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/Channel.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,15 @@ import nextflow.datasource.SraExplorer
import nextflow.exception.AbortOperationException
import nextflow.extension.CH
import nextflow.extension.GroupTupleOp
import nextflow.extension.LinExtension
import nextflow.extension.MapOp
import nextflow.file.DirListener
import nextflow.file.DirWatcher
import nextflow.file.DirWatcherV2
import nextflow.file.FileHelper
import nextflow.file.FilePatternSplitter
import nextflow.file.PathVisitor
import nextflow.plugin.Plugins
import nextflow.plugin.extension.PluginExtensionProvider
import nextflow.util.Duration
import nextflow.util.TestOnly
Expand Down Expand Up @@ -657,4 +659,23 @@ class Channel {
fromPath0Future = future.exceptionally(Channel.&handlerException)
}

static DataflowWriteChannel fromLineage(Map<String,?> params) {
checkParams('fromLineage', params, LinExtension.PARAMS)
final result = CH.create()
if( NF.isDsl2() ) {
session.addIgniter { fromLineage0(result, params) }
}
else {
fromLineage0(result, params )
}
return result
}

private static void fromLineage0(DataflowWriteChannel channel, Map<String,?> params) {
final linExt = Plugins.getExtension(LinExtension)
if( !linExt )
throw new IllegalStateException("Unable to load lineage extensions.")
final future = CompletableFuture.runAsync(() -> linExt.fromLineage(session, channel, params))
future.exceptionally(this.&handlerException)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2013-2025, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.extension

import groovyx.gpars.dataflow.DataflowWriteChannel
import nextflow.Session

/**
* Interface for nf-lineage extensions.
*
* @author Jorge Ejarque <jorge.ejarque@seqera.io
*/
interface LinExtension {

static final Map PARAMS = [
labels: List,
taskRun: [String,GString],
workflowRun: [String,GString],
]

/**
* Query Lineage metadata to get files produced by tasks, workflows or annotations.
*
* @param session Nextflow Session
* @param channel Channel to publish the Lineage Ids matching the query params
* @param params Parameters for the lineage metadata query
*/
abstract void fromLineage(Session session, DataflowWriteChannel channel, Map<String,?> params)
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ class CmdLineageTest extends Specification {

then:
stdout.size() == 1
stdout[0] == "Error loading lid://12345 - Lineage object 12345 not found"
stdout[0] == "Error loading lid://12345 - Lineage record 12345 not found"

cleanup:
folder?.deleteDir()
Expand Down Expand Up @@ -280,45 +280,10 @@ class CmdLineageTest extends Specification {
def entry = new FileOutput("path/to/file",new Checksum("45372qe","nextflow","standard"),
"lid://123987/file.bam", "lid://12345", "lid://123987/", 1234, time, time, null)
def jsonSer = encoder.encode(entry)
def expectedOutput = jsonSer
lidFile.text = jsonSer
when:
def lidCmd = new CmdLineage(launcher: launcher, args: ["view", "lid:///?type=FileOutput"])
lidCmd.run()
def stdout = capture
.toString()
.readLines()// remove the log part
.findResults { line -> !line.contains('DEBUG') ? line : null }
.findResults { line -> !line.contains('INFO') ? line : null }
.findResults { line -> !line.contains('plugin') ? line : null }

then:
stdout.size() == expectedOutput.readLines().size()
stdout.join('\n') == expectedOutput

cleanup:
folder?.deleteDir()
}

def 'should show query results'(){
given:
def folder = Files.createTempDirectory('test').toAbsolutePath()
def configFile = folder.resolve('nextflow.config')
configFile.text = "lineage.enabled = true\nlineage.store.location = '$folder'".toString()
def lidFile = folder.resolve("12345/.data.json")
Files.createDirectories(lidFile.parent)
def launcher = Mock(Launcher){
getOptions() >> new CliOptions(config: [configFile.toString()])
}
def encoder = new LinEncoder().withPrettyPrint(true)
def time = OffsetDateTime.now()
def entry = new FileOutput("path/to/file",new Checksum("45372qe","nextflow","standard"),
"lid://123987/file.bam", "lid://12345", "lid://123987/", 1234, time, time, null)
def jsonSer = encoder.encode(entry)
def expectedOutput = jsonSer
def expectedOutput = '[\n "lid://12345"\n]'
lidFile.text = jsonSer
when:
def lidCmd = new CmdLineage(launcher: launcher, args: ["view", "lid:///?type=FileOutput"])
def lidCmd = new CmdLineage(launcher: launcher, args: ["find", "type=FileOutput"])
lidCmd.run()
def stdout = capture
.toString()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public interface ChannelFactory {

Channel fromFilePairs(Map<String,?> opts, String pattern, Closure grouping);

Channel<Path> fromLineage(Map<String,?> opts);

<E> Channel<E> fromList(Collection<E> values);

Channel<Path> fromPath(Map<String,?> opts, String pattern);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,7 @@ class DefaultLinStore implements LinStore {
void close() throws IOException { }

@Override
Map<String, LinSerializable> search(String queryString) {
def params = null
if (queryString) {
params = LinUtils.parseQuery(queryString)
}
return searchAllFiles(params)
}

private Map<String, LinSerializable> searchAllFiles(Map<String,List<String>> params) {
Map<String, LinSerializable> search(Map<String,List<String>> params) {
final results = new HashMap<String, LinSerializable>()

Files.walkFileTree(location, new FileVisitor<Path>() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 2013-2025, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.lineage

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import groovyx.gpars.dataflow.DataflowWriteChannel
import nextflow.Channel
import nextflow.Session
import nextflow.extension.LinExtension
import nextflow.lineage.fs.LinPathFactory
import nextflow.lineage.model.FileOutput
import nextflow.lineage.serde.LinSerializable

import static nextflow.lineage.fs.LinPath.*

/**
* Lineage channel extensions
*
* @author Jorge Ejarque <jorge.ejarque@seqera.io>
*/
@CompileStatic
@Slf4j
class LinExtensionImpl implements LinExtension {

@Override
void fromLineage(Session session, DataflowWriteChannel channel, Map<String,?> opts) {
final queryParams = buildQueryParams(opts)
log.trace("Querying lineage with params: $queryParams")
new LinPropertyValidator().validateQueryParams(queryParams.keySet())
final store = getStore(session)
emitSearchResults(channel, store.search(queryParams))
channel.bind(Channel.STOP)
}

private static Map<String, List<String>> buildQueryParams(Map<String,?> opts) {
final queryParams = [type: [FileOutput.class.simpleName] ]
if( opts.workflowRun )
queryParams['workflowRun'] = [opts.workflowRun as String]
if( opts.taskRun )
queryParams['taskRun'] = [opts.taskRun as String]
if( opts.labels )
queryParams['labels'] = opts.labels as List<String>
return queryParams
}

protected LinStore getStore(Session session) {
final store = LinStoreFactory.getOrCreate(session)
if( !store ) {
throw new Exception("Lineage store not found - Check Nextflow configuration")
}
return store
}

private void emitSearchResults(DataflowWriteChannel channel, Map<String, LinSerializable> results) {
if( !results ) {
return
}
results.keySet().forEach { channel.bind( LinPathFactory.create( asUriString(it) ) ) }
}
}
6 changes: 3 additions & 3 deletions modules/nf-lineage/src/main/nextflow/lineage/LinStore.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ interface LinStore extends Closeable {

/**
* Search for lineage entries.
* @queryString Json-path like query string. (Only simple and nested field operators are supported(No array, wildcards,etc.)
* @return Key-lineage entry pairs fulfilling the queryString
* @param params Map of query params
* @return Key-lineage entry pairs fulfilling the query params
*/
Map<String,LinSerializable> search(String queryString)
Map<String,LinSerializable> search(Map<String, List<String>> params)

}
Loading