3. Branching and merging
In this section, we introduce two related workflow patterns: branching and merging. Branching refers to using the output of one component in two or more components. Merging refers to reading the outputs of two or more components in one component.
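To make the two definitions concrete, the following sketch (plain Scala, not Anduril code) represents a hypothetical four-component workflow as a map from each component to the components whose outputs it reads. The component names a, b, c and d and the helpers branchPoints and mergePoints are illustrative assumptions.

```scala
// Hypothetical sketch: a workflow as a map from each component to the
// components whose outputs it reads. Names are illustrative only.
object BranchMergeShapes {
  val deps: Map[String, Set[String]] = Map(
    "a" -> Set.empty,
    "b" -> Set("a"),
    "c" -> Set("a"),
    "d" -> Set("b", "c")
  )

  // Branching: a component whose output is read by two or more components.
  def branchPoints(g: Map[String, Set[String]]): Set[String] =
    g.keySet.filter(n => g.values.count(_.contains(n)) >= 2)

  // Merging: a component that reads the outputs of two or more components.
  def mergePoints(g: Map[String, Set[String]]): Set[String] =
    g.collect { case (n, ins) if ins.size >= 2 => n }.toSet

  def main(args: Array[String]): Unit = {
    println(branchPoints(deps)) // Set(a)
    println(mergePoints(deps))  // Set(d)
  }
}
```

Here a branches into b and c, and d merges their outputs. This is the same shape as the example workflow below.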
Assume we have the following tab-delimited file, data.csv:
Gene Value QualityOK
gene01 1.5 1
gene02 2.7 0
gene03 5.8 0
gene99 3.2 1
Our goal is to remove rows that have QualityOK = 0 (i.e., bad quality measurements), but to keep the row for gene02 even if it has QualityOK = 0. This can be implemented by the following workflow:
#!/usr/bin/env anduril
import anduril.builtin._
import anduril.tools._
import org.anduril.runtime._

object BranchMerge {

  val input = INPUT(path = "data.csv")
  val qualityOK = CSVFilter(input, regexp = "QualityOK=1")
  val gene2 = CSVFilter(input, regexp = "Gene=gene02")
  val joined = CSVJoin(qualityOK, gene2, intersection = false)
}
When executed, the workflow prints the following:
$ ./branch-merge.scala
[INFO <run-workflow>] Current ready queue: input (READY-QUEUE 1)
[INFO input] Executing input (anduril.builtin.INPUT) (SOURCE branch-merge.scala:8) (COMPONENT-STARTED) (2016-04-28 14:10:28)
[INFO input] Component finished with success (COMPONENT-FINISHED-OK) (2016-04-28 14:10:28)
[INFO input] Current ready queue: gene2 qualityOK (READY-QUEUE 2)
[INFO gene2] Executing gene2 (anduril.tools.CSVFilter) (SOURCE branch-merge.scala:10) (COMPONENT-STARTED) (2016-04-28 14:10:28)
[INFO qualityOK] Executing qualityOK (anduril.tools.CSVFilter) (SOURCE branch-merge.scala:9) (COMPONENT-STARTED) (2016-04-28 14:10:28)
[INFO gene2] Component finished with success (COMPONENT-FINISHED-OK) (2016-04-28 14:10:28)
[INFO gene2] Current ready queue: (empty) (READY-QUEUE 0)
[INFO qualityOK] Component finished with success (COMPONENT-FINISHED-OK) (2016-04-28 14:10:28)
[INFO qualityOK] Current ready queue: joined (READY-QUEUE 1)
[INFO joined] Executing joined (anduril.tools.CSVJoin) (SOURCE branch-merge.scala:11) (COMPONENT-STARTED) (2016-04-28 14:10:28)
[INFO joined] Component finished with success (COMPONENT-FINISHED-OK) (2016-04-28 14:10:28)
[INFO joined] Current ready queue: (empty) (READY-QUEUE 0)
[INFO <run-workflow>] Done. No errors occurred.
After execution, the file result_branch-merge/joined/out.csv contains the result we were expecting:
Gene Value QualityOK
gene01 1.5 1
gene02 2.7 0
gene99 3.2 1
Understanding the workflow
Here we used two new components from the tools bundle, CSVFilter and CSVJoin. CSVFilter filters rows based on conditions such as regular expressions (similar to Unix grep), and CSVJoin combines several CSV files into one (similar to Unix cat). Setting intersection = false means that rows are combined as a union rather than an intersection.
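The following plain-Scala sketch approximates what the two CSVFilter branches and the union-mode CSVJoin compute on the example data. It is not how Anduril implements these components. Keying rows by Gene, keeping each key once, and sorting by Gene (to match the out.csv contents shown above) are assumptions of the sketch, and the names Row, union and intersection are hypothetical.

```scala
// Plain-Scala approximation of the example's data flow (not Anduril code).
// Assumptions: rows are keyed by the Gene column, the union keeps each key
// once, and the result is sorted by Gene to match the output shown above.
object FilterUnionSketch {
  final case class Row(gene: String, value: Double, qualityOK: Int)

  val input: List[Row] = List(
    Row("gene01", 1.5, 1),
    Row("gene02", 2.7, 0),
    Row("gene03", 5.8, 0),
    Row("gene99", 3.2, 1)
  )

  // Counterparts of the two CSVFilter branches: keep rows matching a predicate.
  val qualityOK: List[Row] = input.filter(_.qualityOK == 1)
  val gene2: List[Row] = input.filter(_.gene == "gene02")

  // Counterpart of CSVJoin with intersection = false: union of both inputs.
  def union(a: List[Row], b: List[Row]): List[Row] =
    (a ++ b).distinctBy(_.gene).sortBy(_.gene)

  // For comparison: keep only rows whose Gene appears in both inputs.
  def intersection(a: List[Row], b: List[Row]): List[Row] =
    a.filter(r => b.exists(_.gene == r.gene))

  val joined: List[Row] = union(qualityOK, gene2)

  def main(args: Array[String]): Unit =
    joined.foreach(r => println(s"${r.gene}\t${r.value}\t${r.qualityOK}"))
}
```

In this sketch, the intersection of the two branches is empty because gene02 is not in the qualityOK branch. This is why the union is needed here.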
By inspecting the Scala code, we see that input is a dependency of both qualityOK and gene2, and joined requires the outputs of both qualityOK and gene2. This corresponds to the following workflow structure:
Workflow execution proceeds as follows:
- In the beginning, only input is ready to execute.
- input is executed.
- qualityOK and gene2 become available for execution.
- qualityOK and gene2 are launched in parallel.
- gene2 finishes first (for this run). The engine continues to wait.
- qualityOK also finishes. Now joined is available for execution.
- joined is executed.
Anduril automatically executed the independent parts of this workflow (qualityOK and gene2) in parallel. Parallelization increases efficiency and is a key technique for processing large data sets. The Scala code for the workflow does not contain any explicit instructions for parallelization. Rather, the workflow engine infers from the dependency structure which components can be executed in parallel.
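The following sketch illustrates how parallelism can be inferred from dependencies alone. It groups components into waves with a level-by-level topological sort. This is a simplification assumed for illustration, not Anduril's scheduler, and the helper name waves is hypothetical.

```scala
// Sketch of how an engine could infer parallelism from dependencies alone.
// This is a simple level-by-level topological sort, not Anduril's scheduler.
object ReadyQueueSketch {
  // Each component maps to the components whose outputs it requires.
  val deps: Map[String, Set[String]] = Map(
    "input" -> Set.empty,
    "qualityOK" -> Set("input"),
    "gene2" -> Set("input"),
    "joined" -> Set("qualityOK", "gene2")
  )

  // Returns successive waves: every component in a wave has all of its
  // dependencies in earlier waves, so components in one wave may run in parallel.
  def waves(g: Map[String, Set[String]]): List[Set[String]] = {
    @annotation.tailrec
    def loop(done: Set[String], acc: List[Set[String]]): List[Set[String]] = {
      val ready = g.collect { case (n, ins) if !done(n) && ins.subsetOf(done) => n }.toSet
      if (ready.isEmpty) {
        // Nothing left to schedule; fail if some component could never run.
        require(done.size == g.size, "cycle or missing dependency")
        acc.reverse
      } else loop(done ++ ready, ready :: acc)
    }
    loop(Set.empty, Nil)
  }

  def main(args: Array[String]): Unit =
    waves(deps).zipWithIndex.foreach { case (w, i) =>
      println(s"wave $i: ${w.toList.sorted.mkString(" ")}")
    }
}
```

Running main prints wave 0: input, then wave 1: gene2 qualityOK, then wave 2: joined. Components in the same wave do not depend on each other. The log above shows that Anduril instead updates its ready queue each time a component finishes, so joined becomes ready as soon as its last dependency completes rather than at a fixed wave boundary.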
Parallelization makes certain aspects of workflow execution non-deterministic. In our case, the order in which qualityOK and gene2 finish execution is determined by the relative speeds of these components. However, Anduril ensures that both have finished before launching joined.
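The same guarantee can be illustrated with plain Scala Futures. This is an analogy, not Anduril's mechanism: the two branches run concurrently, and Future.zip produces a value only after both have completed. The simplified (gene, QualityOK) rows and the object name MergeBarrierSketch are assumptions of the sketch.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Illustration with plain Scala Futures (not Anduril): two independent
// branches run concurrently, and the merge step starts only after both finish.
object MergeBarrierSketch {
  // Simplified rows: (Gene, QualityOK) from the example data.
  val input: List[(String, Int)] =
    List("gene01" -> 1, "gene02" -> 0, "gene03" -> 0, "gene99" -> 1)

  def run(): List[String] = {
    // The two branches may finish in either order from run to run.
    val qualityOK: Future[List[String]] =
      Future(input.collect { case (g, 1) => g })
    val gene2: Future[List[String]] =
      Future(input.collect { case (g, _) if g == "gene02" => g })

    // zip completes only when both futures have completed, so the merge never
    // sees a partial result, whichever branch happens to finish first.
    val joined: Future[List[String]] =
      qualityOK.zip(gene2).map { case (a, b) => (a ++ b).distinct.sorted }

    Await.result(joined, 10.seconds)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Whichever branch completes first, run() returns List(gene01, gene02, gene99). The finishing order is non-deterministic, but the merged result is not.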