6. Dynamic workflows

The workflows seen so far follow the two-phase execution model, in which the workflow is first constructed and then executed. Anduril also supports a more complex dynamic execution model, in which workflow construction and execution alternate, so there can be several phases of each kind. This makes workflows more flexible, but also more complex.

Let’s assume we have metadata about our data files in a CSV file called metadata.csv.

SampleID    Filename
sample1     data1.csv
sample2     data2.csv
...
sample99    data99.csv

We want to analyze only certain data files, and we have an external Bash script that selects the correct lines in the metadata. For the purposes of this example, the Bash script is a “black box” that returns some subset of the rows given to it. We can construct a dynamic workflow that incorporates the Bash script:

#!/usr/bin/env anduril

import anduril.builtin._
import anduril.tools._
import org.anduril.runtime._

object Dynamic {
  info("Beginning workflow construction")

  val metadata = INPUT(path = "metadata.csv")
  val selectScript = INPUT(path = "select-samples.sh")
  val filteredMetadata = BashEvaluate(
    command=selectScript, 
    var1=metadata
  )

  for (row <- iterCSV(filteredMetadata.stdOut)) {
    withName(row("SampleID")) {
      val input = INPUT(path = row("Filename"))
      // Further processing omitted
    }
  }

  info("Ending workflow construction")
}

Let’s assume that the Bash script returns only the line for sample1. When executed, the workflow prints:

[INFO  <runtime>] Beginning workflow construction
[INFO filteredMetadata] Accessing dynamic contents of port 'stdOut' (CALLBACK) (SOURCE dynamic.scala:12)
[INFO <run-workflow>] Current ready queue: selectScript metadata (READY-QUEUE 2)
[INFO selectScript] Executing selectScript (anduril.builtin.INPUT) (SOURCE dynamic.scala:11) (COMPONENT-STARTED) (2016-04-29 14:10:45)
[INFO metadata] Executing metadata (anduril.builtin.INPUT) (SOURCE dynamic.scala:10) (COMPONENT-STARTED) (2016-04-29 14:10:45)
[INFO selectScript] Component finished with success (COMPONENT-FINISHED-OK) (2016-04-29 14:10:45)
[INFO selectScript] Current ready queue: (empty) (READY-QUEUE 0)
[INFO metadata] Component finished with success (COMPONENT-FINISHED-OK) (2016-04-29 14:10:45)
[INFO metadata] Current ready queue: filteredMetadata (READY-QUEUE 1)
[INFO filteredMetadata] Executing filteredMetadata (anduril.tools.BashEvaluate) (SOURCE dynamic.scala:12) (COMPONENT-STARTED) (2016-04-29 14:10:45)
[INFO filteredMetadata] Component finished with success (COMPONENT-FINISHED-OK) (2016-04-29 14:10:45)
[INFO filteredMetadata] Current ready queue: (empty) (READY-QUEUE 0)
[INFO  <runtime>] Ending workflow construction
[INFO <run-workflow>] Current ready queue: sample1-input (READY-QUEUE 1)
[INFO sample1-input] Executing sample1-input (anduril.builtin.INPUT) (SOURCE dynamic.scala:15) (COMPONENT-STARTED) (2016-04-29 14:10:45)
[INFO sample1-input] Component finished with success (COMPONENT-FINISHED-OK) (2016-04-29 14:10:45)
[INFO sample1-input] Current ready queue: (empty) (READY-QUEUE 0)
[INFO <run-workflow>] Done. No errors occurred.

Understanding the workflow

The Scala code looks almost identical to previously seen code. There is one difference that changes the execution model to dynamic: iterCSV(filteredMetadata.stdOut). Instead of iterating over a “static” CSV file (one that is located in the source folder), we iterate over the dynamic product of filteredMetadata. This triggers multi-phase execution, since we must execute filteredMetadata in order to iterate over the CSV file. Execution flow is as follows:

  1. Workflow construction begins by executing Scala code. This prints Beginning workflow construction and inserts metadata, selectScript and filteredMetadata into the workflow.
  2. iterCSV(filteredMetadata.stdOut) triggers the execution phase of the workflow constructed so far.
  3. metadata, selectScript and filteredMetadata are executed.
  4. Control resumes in the Scala for-loop and the rest of the Scala file is evaluated.
  5. sample1-input is inserted into the workflow. Ending workflow construction is printed.
  6. A second execution phase starts to execute the components inserted in steps 4–5.
  7. sample1-input is executed.

Discussion

Dynamic execution is powerful, but should be limited to situations where it is needed. Dynamic workflows are harder to understand and debug, and error detection can be delayed. For example, if an error (such as division by zero) occurred in the Scala code after the for-loop, parts of the workflow would be executed before this error is detected.
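As a sketch of this pitfall (hypothetical code reusing the names from the example above, with filteredMetadata assumed to exist as before): the division by zero below is only reached after the first execution phase has already run metadata, selectScript and filteredMetadata.

for (row <- iterCSV(filteredMetadata.stdOut)) {
  withName(row("SampleID")) {
    val input = INPUT(path = row("Filename"))
  }
}
// Hypothetical mistake placed after the dynamic for-loop: the exception is
// raised during the second construction phase, after the components above
// have already been executed.
val denominator = 0
val ratio = 1 / denominator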

The example contained one iterCSV statement that triggered dynamic execution, but in general there can be many such statements. Execution can thus alternate between workflow construction and execution several times. For example, there could be a second dynamic for-loop after the first.
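A minimal sketch of such alternation, assuming a second BashEvaluate component filteredMetadata2 (a hypothetical name) built analogously to filteredMetadata: each iterCSV call triggers an execution phase for everything constructed so far, so the workflow alternates between construction and execution twice.

val filteredMetadata2 = BashEvaluate(
  command = selectScript,
  var1 = metadata
)

// First dynamic loop: triggers execution of filteredMetadata and its inputs.
for (row <- iterCSV(filteredMetadata.stdOut)) {
  withName("first-" + row("SampleID")) {
    val input = INPUT(path = row("Filename"))
  }
}

// Second dynamic loop: triggers another execution phase, which also covers
// filteredMetadata2 and the components inserted by the first loop.
for (row <- iterCSV(filteredMetadata2.stdOut)) {
  withName("second-" + row("SampleID")) {
    val input = INPUT(path = row("Filename"))
  }
}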

Dynamic array access

Recall from Array ports that certain components and functions can produce file arrays that map key values to output files. Such arrays can also be accessed dynamically.

In the following example, we create an array containing two CSV files with the keys sample1 and sample2. The makeArray() call creates the array; it could also be created using a component. There are two ways to dynamically access array contents. First, org.anduril.runtime.iterArray takes an array port as argument and iterates over key/file pairs. Second, the port object can be indexed dynamically using the array key.

#!/usr/bin/env anduril

import anduril.builtin._
import anduril.tools._
import org.anduril.runtime._

object DynamicArrayPort {
  val data1 = INPUT(path = "data1.csv")
  val data2 = INPUT(path = "data2.csv")
  val myData = Map(
    "sample1" -> data1, 
    "sample2" -> data2
  )

  val array = makeArray(myData)

  for ((key, file) <- iterArray(array)) {
    info("%s = %s".format(key, file))
  }

  val filtered = CSVFilter(
    array("sample1"), 
    regexp = "QualityOK=1"
  )
}

In the for-loop, the file values are File objects that provide access to file contents. When executed, the for-loop prints the following:

sample1 = /home/user/data/data1.csv
sample2 = /home/user/data/data2.csv
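Because the loop runs during workflow construction, the file contents can also be inspected directly. A small sketch, for illustration only; it assumes the printed paths above can be opened with standard Scala I/O:

for ((key, file) <- iterArray(array)) {
  // The File values format as plain paths (see the output above), so we
  // open them with scala.io.Source for a quick look at the header line.
  val source = scala.io.Source.fromFile(file.toString)
  try {
    info("%s header: %s".format(key, source.getLines().next()))
  } finally {
    source.close()
  }
}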

The CSVFilter invocation accesses the array dynamically by indexing the array with a key (sample1). This produces a handle to the file (data1.csv) that can be passed to CSVFilter. This is the correct way of using array contents as inputs to other components: taking the raw File object produced by iterArray can break the dependency chain. Inside the loop, you could maintain proper dependencies using array(key).
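For example, a sketch of the loop-based variant (hypothetical, reusing the CSVFilter invocation from above): indexing the array port with the loop key keeps the dependency on array intact, whereas passing the raw file value would not.

for ((key, file) <- iterArray(array)) {
  withName(key) {
    // Pass array(key), not file, so the generated component depends on array.
    val filteredElement = CSVFilter(
      array(key),
      regexp = "QualityOK=1"
    )
  }
}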

Note: In some cases, you need two indexing operations to access a file in an array. If the array is produced by a component (myComponent), you need to first access the output port of the component (myComponent.out), and then the file in the array (myComponent.out("element")).
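A short sketch of this, with myComponent standing in for any component whose output port out is an array (hypothetical names, as in the note above):

// First index: select the array output port; second index: select the element.
val element = myComponent.out("element")
val filteredElement = CSVFilter(element, regexp = "QualityOK=1")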