6. Dynamic workflows
The workflows seen so far follow the two-phase execution model, in which the workflow is first constructed and then executed. Anduril also supports a more complex dynamic execution model, in which workflow construction and execution alternate, so there can be several phases of each kind. This makes workflows more flexible, but also increases complexity.
Let’s assume we have metadata about our data files in a CSV file called metadata.csv.
SampleID Filename
sample1 data1.csv
sample2 data2.csv
...
sample99 data99.csv
We want to analyze only certain data files, and we have an external Bash script that selects the correct lines in the metadata. For the purposes of this example, the Bash script is a “black box” that returns some subset of the rows given to it. We can construct a dynamic workflow that incorporates the Bash script:
#!/usr/bin/env anduril
import anduril.builtin._
import anduril.tools._
import org.anduril.runtime._
object Dynamic {
    info("Beginning workflow construction")
    val metadata = INPUT(path = "metadata.csv")
    val selectScript = INPUT(path = "select-samples.sh")
    val filteredMetadata = BashEvaluate(
        command = selectScript,
        var1 = metadata
    )
    for (row <- iterCSV(filteredMetadata.stdOut)) {
        withName(row("SampleID")) {
            val input = INPUT(path = row("Filename"))
            // Further processing omitted
        }
    }
    info("Ending workflow construction")
}
Let’s assume that the Bash script returns only the line for sample1. When executed, the workflow prints:
[INFO <runtime>] Beginning workflow construction
[INFO filteredMetadata] Accessing dynamic contents of port 'stdOut' (CALLBACK) (SOURCE dynamic.scala:12)
[INFO <run-workflow>] Current ready queue: selectScript metadata (READY-QUEUE 2)
[INFO selectScript] Executing selectScript (anduril.builtin.INPUT) (SOURCE dynamic.scala:11) (COMPONENT-STARTED) (2016-04-29 14:10:45)
[INFO metadata] Executing metadata (anduril.builtin.INPUT) (SOURCE dynamic.scala:10) (COMPONENT-STARTED) (2016-04-29 14:10:45)
[INFO selectScript] Component finished with success (COMPONENT-FINISHED-OK) (2016-04-29 14:10:45)
[INFO selectScript] Current ready queue: (empty) (READY-QUEUE 0)
[INFO metadata] Component finished with success (COMPONENT-FINISHED-OK) (2016-04-29 14:10:45)
[INFO metadata] Current ready queue: filteredMetadata (READY-QUEUE 1)
[INFO filteredMetadata] Executing filteredMetadata (anduril.tools.BashEvaluate) (SOURCE dynamic.scala:12) (COMPONENT-STARTED) (2016-04-29 14:10:45)
[INFO filteredMetadata] Component finished with success (COMPONENT-FINISHED-OK) (2016-04-29 14:10:45)
[INFO filteredMetadata] Current ready queue: (empty) (READY-QUEUE 0)
[INFO <runtime>] Ending workflow construction
[INFO <run-workflow>] Current ready queue: sample1-input (READY-QUEUE 1)
[INFO sample1-input] Executing sample1-input (anduril.builtin.INPUT) (SOURCE dynamic.scala:15) (COMPONENT-STARTED) (2016-04-29 14:10:45)
[INFO sample1-input] Component finished with success (COMPONENT-FINISHED-OK) (2016-04-29 14:10:45)
[INFO sample1-input] Current ready queue: (empty) (READY-QUEUE 0)
[INFO <run-workflow>] Done. No errors occurred.
Understanding the workflow
The Scala code looks almost identical to previously seen code. There is one difference that changes the execution model to dynamic: iterCSV(filteredMetadata.stdOut). Instead of iterating over a “static” CSV file (one that is located in the source folder), we iterate over the dynamic product of filteredMetadata. This triggers multi-phase execution, since we must execute filteredMetadata in order to iterate over the CSV file.
Execution flow is as follows:
1. Workflow construction begins by executing Scala code. This prints Beginning workflow construction and inserts metadata and filteredMetadata into the workflow.
2. iterCSV(filteredMetadata.stdOut) triggers the execution phase of the workflow constructed so far.
3. metadata and filteredMetadata are executed.
4. Control resumes in the Scala for-loop and the rest of the Scala file is evaluated.
5. sample1-input is inserted into the workflow. Ending workflow construction is printed.
6. A second execution phase starts to execute the components inserted in steps 4–5.
7. sample1-input is executed.
Discussion
Dynamic execution is powerful, but should be limited to situations where it is needed. Dynamic workflows are harder to understand and debug, and error detection can be delayed. For example, if an error (such as division by zero) occurred in the Scala code after the for-loop, parts of the workflow would be executed before this error is detected.
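To illustrate the point about delayed error detection, here is a minimal sketch (the error placement and variable names are illustrative, not part of the example above): a division by zero placed after the dynamic for-loop is only reached in the second construction phase, after metadata and filteredMetadata have already been executed.
for (row <- iterCSV(filteredMetadata.stdOut)) {
    // components inserted here belong to the second construction phase
}
// Sketch: this division by zero is raised during construction, but only after
// the first execution phase (triggered by iterCSV above) has already run
// metadata and filteredMetadata.
val denominator = 0
val broken = 1 / denominator   // throws ArithmeticException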
The example contained one iterCSV statement that triggered dynamic execution, but in general there can be many such statements. Execution can thus alternate between workflow construction and execution several times. For example, there could be a second dynamic for-loop after the first.
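As a sketch of such alternation (firstPass and secondPass are illustrative names that reuse the components of the main example), each iterCSV over a component output suspends construction, executes whatever is ready, and then resumes construction:
val firstPass = BashEvaluate(command = selectScript, var1 = metadata)
for (row <- iterCSV(firstPass.stdOut)) {
    // components for the first selection are inserted here
}
val secondPass = BashEvaluate(command = selectScript, var1 = metadata)
for (row <- iterCSV(secondPass.stdOut)) {
    // secondPass is executed in a new execution phase before this loop body runs;
    // components for the second selection are inserted here
}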
Dynamic array access
Recall from Array ports that certain components and functions can produce file arrays that map key values to output files. Such arrays can also be accessed dynamically.
In the following example, we create an array containing two CSV files with the keys sample1 and sample2. The makeArray() call creates the array; it could also be created using a component. There are two ways to dynamically access array contents. First, org.anduril.runtime.iterArray takes an array port as argument and iterates over key/file pairs. Second, the port object can be indexed dynamically using the array key.
#!/usr/bin/env anduril
import anduril.builtin._
import anduril.tools._
import org.anduril.runtime._
object DynamicArrayPort {
    val data1 = INPUT(path = "data1.csv")
    val data2 = INPUT(path = "data2.csv")
    val myData = Map(
        "sample1" -> data1,
        "sample2" -> data2
    )
    val array = makeArray(myData)
    for ((key, file) <- iterArray(array)) {
        info("%s = %s".format(key, file))
    }
    val filtered = CSVFilter(
        array("sample1"),
        regexp = "QualityOK=1"
    )
}
In the for-loop, the file values are File objects that provide access to file contents. When executed, the for-loop prints the following:
sample1 = /home/user/data/data1.csv
sample2 = /home/user/data/data2.csv
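Because the loop body runs during workflow construction, the files can already be inspected there. A minimal sketch, assuming the values are ordinary java.io.File handles so that standard Scala I/O applies:
// Sketch: inspecting the contents of each array element during construction.
for ((key, file) <- iterArray(array)) {
    val lines = scala.io.Source.fromFile(file).getLines().size
    info("%s contains %d lines".format(key, lines))
}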
The CSVFilter invocation accesses the array dynamically by indexing the array with a key (sample1). This produces a handle to the file (data1.csv) that can be passed to CSVFilter. This is the correct way of using array contents as inputs to other components: taking the raw File object produced by iterArray can break the dependency chain. Inside the loop, you could maintain proper dependencies using array(key).
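If each array element should feed its own downstream component, the same indexing form works inside the loop. A sketch that continues the example above (the QualityOK filter is reused purely for illustration):
// Sketch: one CSVFilter per array element, indexed via array(key) so that
// the dependency on the array element is tracked correctly.
for ((key, file) <- iterArray(array)) {
    withName(key) {
        val perSample = CSVFilter(array(key), regexp = "QualityOK=1")
        // passing `file` here instead of array(key) could break the dependency chain
    }
}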
Note: In some cases, you need two indexing operations to access a file in an array. If the array is produced by a component (myComponent), you need to first access the output port of the component (myComponent.out), and then the file in the array (myComponent.out("element")).