3. Branching and merging

In this section, we introduce two related workflow patterns: branching and merging. Branching refers to using the output of one component in two or more components. Merging refers to reading the outputs of two or more components in one component.

Assume we have the following tab-delimited file (data.csv):

Gene    Value   QualityOK
gene01  1.5     1
gene02  2.7     0
gene03  5.8     0
gene99  3.2     1

Our goal is to remove rows that have QualityOK = 0 (i.e., bad quality measurements), but to keep the row for gene02 even if it has QualityOK = 0. This can be implemented by the following workflow:

#!/usr/bin/env anduril

import anduril.builtin._
import anduril.tools._
import org.anduril.runtime._

object BranchMerge {
  val input = INPUT(path = "data.csv")
  val qualityOK = CSVFilter(input, regexp = "QualityOK=1")
  val gene2 = CSVFilter(input, regexp = "Gene=gene02")
  val joined = CSVJoin(qualityOK, gene2, intersection = false)
}

When executed, the workflow prints the following:

$ ./branch-merge.scala
[INFO <run-workflow>] Current ready queue: input (READY-QUEUE 1)
[INFO input] Executing input (anduril.builtin.INPUT) (SOURCE branch-merge.scala:8) (COMPONENT-STARTED) (2016-04-28 14:10:28)
[INFO input] Component finished with success (COMPONENT-FINISHED-OK) (2016-04-28 14:10:28)
[INFO input] Current ready queue: gene2 qualityOK (READY-QUEUE 2)
[INFO gene2] Executing gene2 (anduril.tools.CSVFilter) (SOURCE branch-merge.scala:10) (COMPONENT-STARTED) (2016-04-28 14:10:28)
[INFO qualityOK] Executing qualityOK (anduril.tools.CSVFilter) (SOURCE branch-merge.scala:9) (COMPONENT-STARTED) (2016-04-28 14:10:28)
[INFO gene2] Component finished with success (COMPONENT-FINISHED-OK) (2016-04-28 14:10:28)
[INFO gene2] Current ready queue: (empty) (READY-QUEUE 0)
[INFO qualityOK] Component finished with success (COMPONENT-FINISHED-OK) (2016-04-28 14:10:28)
[INFO qualityOK] Current ready queue: joined (READY-QUEUE 1)
[INFO joined] Executing joined (anduril.tools.CSVJoin) (SOURCE branch-merge.scala:11) (COMPONENT-STARTED) (2016-04-28 14:10:28)
[INFO joined] Component finished with success (COMPONENT-FINISHED-OK) (2016-04-28 14:10:28)
[INFO joined] Current ready queue: (empty) (READY-QUEUE 0)
[INFO <run-workflow>] Done. No errors occurred.

After execution, the file result_branch-merge/joined/out.csv contains the result we were expecting:

Gene    Value   QualityOK
gene01  1.5     1
gene02  2.7     0
gene99  3.2     1

Understanding the workflow

Here we used two new components from the tools bundle, CSVFilter and CSVJoin. CSVFilter filters rows based on conditions such as regular expressions (similar to Unix grep), and CSVJoin combines several CSV files into one (similar to Unix cat); intersection = false means that rows are combined as a union rather than an intersection.
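To make the filter and union semantics concrete, the following is a minimal plain-Scala model of the example. It does not use Anduril and is not how CSVFilter or CSVJoin are implemented; the names FilterJoinSketch, filterRows and unionByGene are hypothetical, and the full-match filter and the keyed, sorted union are assumptions chosen to reproduce the result shown above.

```scala
// Plain-Scala model of the example workflow; this is not Anduril code.
object FilterJoinSketch {
  type Row = Map[String, String]

  val header = List("Gene", "Value", "QualityOK")

  // The contents of data.csv as in-memory rows.
  val rows: List[Row] = List(
    List("gene01", "1.5", "1"),
    List("gene02", "2.7", "0"),
    List("gene03", "5.8", "0"),
    List("gene99", "3.2", "1")
  ).map(values => header.zip(values).toMap)

  // Assumption: keep rows whose value in `column` fully matches `pattern`.
  def filterRows(input: List[Row], column: String, pattern: String): List[Row] =
    input.filter(row => row(column).matches(pattern))

  // Assumption: union of both inputs, one row per gene, sorted by gene name.
  def unionByGene(a: List[Row], b: List[Row]): List[Row] =
    (a ++ b).groupBy(_("Gene")).values.map(_.head).toList.sortBy(_("Gene"))

  // Branching: both filters read the same input.
  val qualityOK = filterRows(rows, "QualityOK", "1")
  val gene2 = filterRows(rows, "Gene", "gene02")

  // Merging: one step reads the outputs of both filters.
  val joined = unionByGene(qualityOK, gene2)

  def main(args: Array[String]): Unit =
    joined.foreach(row => println(header.map(row).mkString("\t")))
}
```

In this model, joined holds the rows for gene01, gene02 and gene99, which matches the contents of out.csv above.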

By inspecting the Scala code, we see that input is a dependency of both qualityOK and gene2, and joined requires the outputs of both qualityOK and gene2. This corresponds to a diamond-shaped workflow structure: input branches into qualityOK and gene2, whose outputs merge in joined.

Workflow execution proceeds as follows:

  1. In the beginning, only input is ready to execute.
  2. input is executed.
  3. qualityOK and gene2 become available for execution.
  4. qualityOK and gene2 are launched in parallel.
  5. gene2 finishes first (for this run). The engine continues to wait, because joined also needs the output of qualityOK.
  6. qualityOK also finishes. Now joined is available for execution.
  7. joined is executed.

Anduril automatically executed the independent components of this workflow in parallel. Parallelization increases efficiency and is a key technique for processing large data sets. The Scala code for the workflow does not contain any explicit instructions for parallelization. Rather, the workflow engine infers from the dependency structure which components can be executed in parallel.
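The idea of inferring parallelism from dependencies can be sketched in a few lines of plain Scala. This is a simplified illustration, not Anduril's scheduler: the names ReadyQueueSketch and waves are hypothetical, and grouping components into whole "waves" is an assumption made for brevity (a real engine can launch each component as soon as its own dependencies have finished).

```scala
// Simplified model of dependency-driven scheduling; this is not Anduril code.
object ReadyQueueSketch {
  // Component name -> names of the components whose outputs it reads.
  val dependencies: Map[String, Set[String]] = Map(
    "input" -> Set.empty[String],
    "qualityOK" -> Set("input"),
    "gene2" -> Set("input"),
    "joined" -> Set("qualityOK", "gene2")
  )

  // Groups components into successive waves. Members of one wave do not
  // depend on each other, so they could be executed in parallel.
  @annotation.tailrec
  def waves(deps: Map[String, Set[String]],
            finished: Set[String] = Set.empty,
            acc: List[List[String]] = Nil): List[List[String]] = {
    val pending = deps.keySet -- finished
    // A component is ready once everything it depends on has finished.
    val ready = pending.filter(name => deps(name).subsetOf(finished)).toList.sorted
    if (ready.isEmpty) {
      require(pending.isEmpty, s"cyclic or missing dependencies: $pending")
      acc.reverse
    } else waves(deps, finished ++ ready, ready :: acc)
  }

  def main(args: Array[String]): Unit =
    waves(dependencies).foreach(wave => println(wave.mkString(" ")))
}
```

For the example workflow, the waves are input, then gene2 and qualityOK together, then joined, mirroring the ready queues in the log above.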

Parallelization makes certain aspects of workflow execution non-deterministic. In our case, the order in which qualityOK and gene2 finish execution is determined by the relative speeds of these components. However, Anduril ensures that both have finished before launching joined.
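The same guarantee can be illustrated with standard Scala futures. This is a sketch under stated assumptions, not Anduril code: ForkJoinSketch and component are hypothetical names, and the sleep durations merely stand in for real work. The two middle components may finish in either order, but joined never starts before both are done.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Fork/join with futures; illustrates the ordering guarantee only.
object ForkJoinSketch {
  // Runs the four simulated components and returns their completion order.
  def run(): List[String] = {
    val finished = ListBuffer.empty[String]
    def record(name: String): Unit = finished.synchronized { finished += name }

    // A simulated component: sleeps for a while, then records that it is done.
    def component(name: String, millis: Long): Future[String] = Future {
      Thread.sleep(millis)
      record(name)
      name
    }

    val input = component("input", 10)
    // Branching: both components start once input has finished.
    val qualityOK = input.flatMap(_ => component("qualityOK", 20))
    val gene2 = input.flatMap(_ => component("gene2", 20))
    // Merging: joined starts only after both branches have finished.
    val joined = qualityOK.zip(gene2).flatMap(_ => component("joined", 10))

    Await.result(joined, 5.seconds)
    finished.synchronized { finished.toList }
  }

  def main(args: Array[String]): Unit =
    println(run().mkString(" "))
}
```

Repeated runs may print qualityOK and gene2 in different orders, while input always comes first and joined always comes last.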