Running workflows

A workflow is a set of tasks whose relationships are described with a directed acyclic graph (DAG). In a DAG, a parent task can be connected to one or more child tasks, with each edge directed from the parent task to a child task. A child task processes the output data of its parent task. Depending on the use case, each child task can be configured to start either after the parent task has produced its entire output data or as soon as the parent has produced part of it. In the latter case, the parent and child tasks run in parallel for a while, which reduces the total execution time of the workflow. Currently, tasks have to be PanDA tasks, but future versions will support more backend systems, such as local batch systems, the production system, Kubernetes-based resources, and other workflow management systems, to run some tasks very quickly or to outsource sub-workflows.

The user describes a workflow using a yaml-based language, Common Workflow Language (CWL), or a Python-based language, Snakemake, and submits it to PanDA using pchain. This page explains how to use pchain as well as how to describe workflows.

Remark: It could be a bit cumbersome and error-prone to edit CWL or snakemake files. We are developing a handy tool/interface that translates graphical diagrams to workflow descriptions written in python or yaml on behalf of users. However, you have to play with CWL or snakemake files for now.



Workflow examples with CWL

Simple task chain

The following cwl code shows a parent-child chain of two prun tasks.

[Figure: workflow DAG (pchain_dag_simple.png)]
simple_chain.cwl
cwlVersion: v1.0
class: Workflow

inputs: []

outputs:
  outDS:
    type: string
    outputSource: bottom/outDS


steps:
  top:
    run: prun
    in:
      opt_exec:
        default: "echo %RNDM:10 > seed.txt"
      opt_args:
        default: "--outputs seed.txt --nJobs 3 --avoidVP"
    out: [outDS]

  bottom:
    run: prun
    in:
      opt_inDS: top/outDS
      opt_exec:
        default: "echo %IN > results.root"
      opt_args:
        default: "--outputs results.root --forceStaged --avoidVP"
    out: [outDS]

The class field must be Workflow to indicate that this code describes a workflow. There are two prun tasks in the workflow, defined as the top and bottom steps in the steps section. The inputs section is empty since the workflow doesn’t take any input data. The outputs section describes the output parameters of the workflow; here it contains a single string-type parameter with an arbitrary name. The outputSource connects the output parameter of the bottom step to the workflow output parameter.

In the steps section, each step represents a task with an arbitrary task name, such as top and bottom. The run field of a prun task is prun. The in section specifies a set of parameters corresponding to command-line options of prun.

Here is a list of parameters in the in section to run a prun task.

Parameter               Corresponding prun option
opt_inDS                --inDS (string)
opt_inDsType            No correspondence. Type of inDS (string)
opt_secondaryDSs        --secondaryDSs (a list of strings)
opt_secondaryDsTypes    No correspondence. Types of secondaryDSs (a string array)
opt_exec                --exec (string)
opt_useAthenaPackages   --useAthenaPackages (bool)
opt_containerImage      --containerImage (string)
opt_args                all other prun options except those listed above (string)

All options opt_xyz except opt_args and opt_xyzDsTypes can be mapped to --xyz of prun. opt_args specifies all other prun options such as --outputs, --nFilesPerJob, and --nJobs. Essentially,

run: prun
in:
  opt_exec:
    default: "echo %RNDM:10 > seed.txt"
  opt_args:
    default: "--outputs seed.txt --nJobs 3"

corresponds to

prun --exec "echo %RNDM:10 > seed.txt" --outputs seed.txt --nJobs 3

%IN in opt_exec is expanded to a list of filenames in the input dataset specified in opt_inDS. It is possible to use %{DSn} in opt_exec or opt_args as a placeholder for the input dataset name.

The out section specifies the task output with an arbitrary string surrounded by brackets. Note that it is always a single string even if the task produces multiple outputs. The output of the top task is passed to opt_inDS of the bottom task. The bottom task starts processing once the top task produces enough output data, waits if all currently available data has been processed but the top task is still running, and finishes once all data from the top task is processed.

The user can submit the workflow to PanDA using pchain, which is included in panda-client. First, create a file called simple_chain.cwl containing the cwl code above. Next, create an empty yaml file, since cwl files work together with yaml files that describe the workflow inputs. This example doesn’t take an input, so the yaml file can be empty.

$ touch dummy.yaml
$ pchain --cwl simple_chain.cwl --yaml dummy.yaml --outDS user.<your_nickname>.blah

pchain automatically sends local *.cwl, *.yaml, and *.json files to PanDA together with the workflow. --outDS is the basename of the datasets for output and log files. Once the workflow is submitted, the cwl and yaml files are parsed on the server side to generate tasks with sequential numbers in the workflow. The system uses a combination of the sequential number and the task name, such as 000_top and 001_bottom, as a unique identifier for each task. The actual output dataset name is a combination of --outDS, the unique identifier, and --outputs in opt_args. For example, the output dataset name of the top task is user.<your_nickname>.blah_000_top_seed.txt and that of the bottom task is user.<your_nickname>.blah_001_bottom_results.root. If --outputs is a comma-separated output list, one dataset is created for each output type.
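For instance, if the top task were defined with --outputs seed.txt,log.txt (a hypothetical variant of the example above), the same naming rule would produce two output datasets:

user.<your_nickname>.blah_000_top_seed.txt
user.<your_nickname>.blah_000_top_log.txt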

To see all options of pchain

pchain --helpGroup ALL


More complicated chain

The following cwl example describes a more complicated chain, as shown in the picture below.

[Figure: workflow DAG (pchain_dag_combine.png)]
sig_bg_comb.cwl
cwlVersion: v1.0
class: Workflow

requirements:
  MultipleInputFeatureRequirement: {}

inputs:
  signal: string
  background: string

outputs:
  outDS:
    type: string
    outputSource: combine/outDS
  

steps:
  make_signal:
    run: prun
    in:
      opt_inDS: signal
      opt_containerImage:
        default: docker://busybox
      opt_exec:
        default: "echo %IN > abc.dat; echo 123 > def.zip"
      opt_args:
        default: "--outputs abc.dat,def.zip --nFilesPerJob 5"
    out: [outDS]

  make_background_1:
    run: prun
    in:
      opt_inDS: background
      opt_exec:
        default: "echo %IN > opq.root; echo %IN > xyz.pool"
      opt_args:
        default: "--outputs opq.root,xyz.pool --nGBPerJob 10"
    out: [outDS]

  premix:
    run: prun
    in:
      opt_inDS: make_signal/outDS
      opt_inDsType:
        default: def.zip
      opt_secondaryDSs: [make_background_1/outDS]
      opt_secondaryDsTypes:
        default: [xyz.pool]
      opt_exec:
        default: "echo %IN %IN2 > klm.root"
      opt_args:
        default: "--outputs klm.root --secondaryDSs IN2:2:%{SECDS1}"
    out: [outDS]

  generate_some:
    run: prun
    in:
      opt_exec:
        default: "echo %RNDM:10 > gen.root"
      opt_args:
        default: "--outputs gen.root --nJobs 10"
    out: [outDS]
    
  make_background_2:
    run: prun
    in:
      opt_inDS: background
      opt_containerImage:
        default: docker://alpine
      opt_secondaryDSs: [generate_some/outDS]
      opt_secondaryDsTypes:
        default: [gen.root]
      opt_exec:
        default: "echo %IN > ooo.root; echo %IN2 > jjj.txt"
      opt_args:
        default: "--outputs ooo.root,jjj.txt --secondaryDSs IN2:2:%{SECDS1}"
    out: [outDS]

  combine:
    run: prun
    in:
      opt_inDS: make_signal/outDS
      opt_inDsType:
        default: abc.dat
      opt_secondaryDSs: [premix/outDS, make_background_2/outDS]
      opt_secondaryDsTypes:
        default: [klm.root, ooo.root]
      opt_exec:
        default: "echo %IN %IN2 %IN3 > aaa.root"
      opt_args:
        default: "--outputs aaa.root --secondaryDSs IN2:2:%{SECDS1},IN3:5:%{SECDS2}"
    out: [outDS]

The workflow takes two inputs, signal and background. The signal is used as input for the make_signal task, while the background is used as input for the make_background_1 and make_background_2 tasks. The make_signal task runs in the busybox container as specified in opt_containerImage, to produce two types of output data, abc.dat and def.zip, as specified in opt_args. If the parent task produces multiple types of output data and the child task uses some of them, their types need to be specified in opt_inDsType. The premix task takes def.zip from the make_signal task and xyz.pool from the make_background_1 task.

Output data of parent tasks can be passed to a child task as secondary inputs. In this case, they are specified in opt_secondaryDSs and their types in opt_secondaryDsTypes. Note that the stream name, the number of files per job, etc., for each secondary input are specified using --secondaryDSs in opt_args, where %{SECDSn} can be used as a placeholder for the n-th secondary dataset name. MultipleInputFeatureRequirement is required if opt_secondaryDSs takes multiple input datasets.
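As a reminder of the prun convention (a rough sketch; see prun's own help for the authoritative syntax), each entry in --secondaryDSs has the form streamName:nFilesPerJob:datasetName, so the premix step above reads two files per job from the first secondary dataset under the %IN2 stream:

--secondaryDSs IN2:2:%{SECDS1}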

The workflow inputs are described in a yaml file. E.g.,

cat inputs.yaml

signal: mc16_valid:mc16_valid.900248.PG_singlepion_flatPt2to50.simul.HITS.e8312_s3238_tid26378578_00
background: mc16_5TeV.361238.Pythia8EvtGen_A3NNPDF23LO_minbias_inelastic_low.merge.HITS.e6446_s3238_s3250/

Then submit the workflow.

pchain --cwl sig_bg_comb.cwl --yaml inputs.yaml --outDS user.<your_nickname>.blah

If you need to run the workflow with different input data, it is enough to submit it with a different yaml file.
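For example, another submission only needs a new yaml file with other dataset names (the names below are placeholders) and possibly a new --outDS:

cat other_inputs.yaml

signal: <another_signal_dataset>
background: <another_background_dataset>

pchain --cwl sig_bg_comb.cwl --yaml other_inputs.yaml --outDS user.<your_nickname>.blah2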


Sub-workflow and parallel execution with scatter

A workflow can be used as a step in another workflow. The following cwl example uses the above sig_bg_comb.cwl in the many_sig_bg_comb step.

[Figure: workflow DAG (pchain_dag_scatter.png)]
merge_many.cwl
cwlVersion: v1.0
class: Workflow

requirements:
  ScatterFeatureRequirement: {}
  SubworkflowFeatureRequirement: {}

inputs:
  signals: string[]
  backgrounds: string[]

outputs:
  outDS:
    type: string
    outputSource: merge/outDS


steps:
  many_sig_bg_comb:
    run: sig_bg_comb.cwl
    scatter: [signal, background]
    scatterMethod: dotproduct
    in:
      signal: signals
      background: backgrounds
    out: [outDS]

  merge:
    run: prun
    in:
      opt_inDS: many_sig_bg_comb/outDS
      opt_exec:
        default: "python merge.py --type aaa --level 3 %IN"
      opt_args:
        default: "--outputs merged.root"
    out: [outDS]

Note that sub-workflows require SubworkflowFeatureRequirement.

It is possible to run a task or sub-workflow multiple times over a list of inputs using ScatterFeatureRequirement. A popular use-case is to perform the same analysis step on different samples in a single workflow. The step takes the input(s) as an array and will run on each element of the array as if it were a single input. The many_sig_bg_comb step above takes two string arrays, signals and backgrounds, and specifies in the scatter field that it loops over those arrays. Output data from all many_sig_bg_comb tasks are fed into the merge task to produce the final output.
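With scatterMethod: dotproduct, the arrays are paired element by element, so for the inputs2.yaml shown below the sig_bg_comb sub-workflow is executed twice:

# dotproduct pairing (illustration)
#   iteration 0: signal = signals[0], background = backgrounds[0]
#   iteration 1: signal = signals[1], background = backgrounds[1]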

The workflow inputs are string arrays like

cat inputs2.yaml

signals:
  - mc16_valid:mc16_valid.900248.PG_singlepion_flatPt2to50.simul.HITS.e8312_s3238_tid26378578_00
  - valid1.427080.Pythia8EvtGen_A14NNPDF23LO_flatpT_Zprime.simul.HITS.e5362_s3718_tid26356243_00

backgrounds:
  - mc16_5TeV.361238.Pythia8EvtGen_A3NNPDF23LO_minbias_inelastic_low.merge.HITS.e6446_s3238_s3250/
  - mc16_5TeV:mc16_5TeV.361239.Pythia8EvtGen_A3NNPDF23LO_minbias_inelastic_high.merge.HITS.e6446_s3238_s3250/

Then submit the workflow.

pchain --cwl merge_many.cwl --yaml inputs2.yaml --outDS user.<your_nickname>.blah


Using Athena

One or more tasks in a single workflow can use Athena as shown in the example below.

athena.cwl
cwlVersion: v1.0
class: Workflow

inputs: []

outputs:
  outDS:
    type: string
    outputSource: third/outDS


steps:
  first:
    run: prun
    in:
      opt_exec:
        default: "Gen_tf.py --maxEvents=1000 --skipEvents=0 --ecmEnergy=5020 --firstEvent=1 --jobConfig=860059 --outputEVNTFile=evnt.pool.root --randomSeed=4 --runNumber=860059 --AMITag=e8201"
      opt_args:
        default: "--outputs evnt.pool.root --nJobs 3"
      opt_useAthenaPackages:
        default: true
    out: [outDS]

  second:
    run: prun
    in:
      opt_inDS: first/outDS
      opt_exec:
        default: "echo %IN > results.txt"
      opt_args:
        default: "--outputs results.txt"
    out: [outDS]

  third:
    run: prun
    in:
      opt_inDS: second/outDS
      opt_exec:
        default: "echo %IN > poststep.txt"
      opt_args:
        default: "--outputs poststep.txt --athenaTag AnalysisBase,21.2.167"
      opt_useAthenaPackages:
        default: true
    out: [outDS]

opt_useAthenaPackages corresponds to --useAthenaPackages of prun to remotely set up Athena with your locally-built packages. You can use a different Athena version by specifying --athenaTag in opt_args.

To submit the workflow, first set up Athena on your local computer, then execute pchain with --useAthenaPackages, which automatically collects various Athena-related information from environment variables and uploads a sandbox file made from your locally-built packages.

pchain --cwl athena.cwl --yaml inputs.yaml --outDS user.<your_nickname>.blah --useAthenaPackages
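A typical session might look like the following; the exact setup commands depend on your local environment and Athena release, so treat this as a sketch:

setupATLAS
asetup AnalysisBase,21.2.167
lsetup panda
pchain --cwl athena.cwl --yaml inputs.yaml --outDS user.<your_nickname>.blah --useAthenaPackages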


Using Conda

You can build a container image to set up a conda environment and use it in a workflow. The following Dockerfile installs Miniconda and creates an environment with PyROOT and PyYAML.

Dockerfile with conda
FROM docker.io/almalinux:9

RUN yum update -y
RUN yum install -y wget

RUN mkdir -p /opt/miniconda3 && \
    wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh -O /tmp/miniconda.sh && \
    bash /tmp/miniconda.sh -b -u -p /opt/miniconda3 && \
    rm -rf /tmp/miniconda.sh

RUN adduser auser
RUN groupadd zp
RUN usermod -a -G zp auser
USER auser
RUN /opt/miniconda3/bin/conda init bash && . ~/.bashrc && conda create -n testenv -y -c conda-forge root pyyaml

RUN yum clean all

CMD ["/bin/bash"]

You need to upload your container image to a container registry, such as the GitLab container registry, and specify the registry path in opt_containerImage as shown in the example below.
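For example, the image built from the Dockerfile above could be pushed roughly as follows (the registry path is a hypothetical example; adapt it to your own registry and project):

docker build -t gitlab-registry.cern.ch/<your_user>/<your_project>/conda-image:latest .
docker login gitlab-registry.cern.ch
docker push gitlab-registry.cern.ch/<your_user>/<your_project>/conda-image:latest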

conda.cwl
cwlVersion: v1.0
class: Workflow

inputs: []

outputs:
  outDS:
    type: string
    outputSource: bottom/outDS


steps:
  top:
    run: prun
    in:
      opt_exec:
        default: "echo %RNDM:10 > seed.txt"
      opt_args:
        default: "--outputs seed.txt --nJobs 3 --avoidVP"
    out: [outDS]

  bottom:
    run: prun
    in:
      opt_inDS: top/outDS
      opt_exec:
        default: "export HOME=/home/auser; . /home/auser/.bashrc; conda activate testenv; root --version; ls -la"
      opt_args:
        default: "--forceStaged --avoidVP"
      opt_containerImage:
        default: "docker://ghcr.io/someone/test-actions:main"
    out: [outDS]

As specified in opt_exec, the bottom task first initializes and activates the conda environment created in the build step of the container image, and then executes root and ls as a demonstration.
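The same pattern can run your own script inside the environment. For instance, the in section of the bottom step could be changed as sketched below, where analysis.py is a hypothetical Python script placed in your working directory so that prun ships it with the job, and out.txt is a hypothetical output:

      opt_exec:
        default: "export HOME=/home/auser; . /home/auser/.bashrc; conda activate testenv; python analysis.py %IN > out.txt"
      opt_args:
        default: "--outputs out.txt"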


Conditional workflow

Workflows can contain conditional steps that are executed based on their input. This allows a workflow to delay the execution of subsequent tasks until previous tasks are done, and to skip subsequent tasks depending on the results of previous tasks. The following example contains conditional branching based on the result of the first step. Note that conditional branching requires InlineJavascriptRequirement and CWL version 1.2 or higher.

[Figure: workflow DAG (pchain_dag_cond.png)]
cond.cwl
cwlVersion: v1.2
class: Workflow

requirements:
  InlineJavascriptRequirement: {}

inputs: []

outputs:
  outDS_OK:
    type: string
    outputSource: bottom_OK/outDS
  outDS_NG:
    type: string
    outputSource: bottom_NG/outDS


steps:
  top:
    run: prun
    in:
      opt_exec:
        default: "echo %RNDM:10 > seed.txt"
      opt_args:
        default: "--outputs seed.txt --nJobs 2"
    out: [outDS]

  bottom_OK:
    run: prun
    in:
      opt_inDS: top/outDS
      opt_exec:
        default: "echo %IN > results.txt"
      opt_args:
        default: "--outputs results.txt --forceStaged"
    out: [outDS]
    when: $(self.opt_inDS)

  bottom_NG:
    run: prun
    in:
      opt_inDS: top/outDS
      opt_exec:
        default: "echo hard luck > bad.txt"
      opt_args:
        default: "--outputs bad.txt"
    out: [outDS]
    when: $(!self.opt_inDS)
 

Both the bottom_OK and bottom_NG steps take output data of the top step as input. The new property when specifies the condition expression, which is interpreted as JavaScript. self.blah in the expression represents the input parameter blah of the step, which is connected to output data of the parent step. If the parent step is successful, self.blah gives True, while !self.blah gives False. It is possible to build more complicated expressions using logical operators (&& for AND and || for OR) and parentheses. The step is executed when the whole expression gives True.

The bottom_NG step is executed when the top step fails and $(!self.opt_inDS) gives True. Note that in this case output data from the top step is empty and the prun task in the bottom_NG step is executed without --inDS.
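For example, a step with two upstream inputs (the step and parameter names below are hypothetical) could be gated on both parents having produced output:

  summarize:
    run: prun
    in:
      opt_inDS: top/outDS
      opt_secondaryDSs: [side/outDS]
      opt_exec:
        default: "echo %IN %IN2 > summary.txt"
      opt_args:
        default: "--outputs summary.txt --secondaryDSs IN2:1:%{SECDS1}"
    out: [outDS]
    when: $(self.opt_inDS && self.opt_secondaryDSs)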


Involving hyperparameter optimization

It is possible to run Hyperparameter Optimization (HPO) and chain it with other tasks in the workflow. The following example shows a chain of HPO and prun tasks.

hpo.cwl
cwlVersion: v1.0
class: Workflow

inputs: []

outputs:
  outDS:
    type: string
    outputSource: post_proc/outDS


steps:
  pre_proc:
    run: prun
    in:
      opt_exec:
        default: "echo %RNDM:10 > seed.txt"
      opt_args:
        default: "--outputs seed.txt --nJobs 2 --avoidVP"
    out: [outDS]

  main_hpo:
    run: phpo
    in:
      opt_trainingDS: pre_proc/outDS
      opt_args:
        default: "--loadJson conf.json"
    out: [outDS]
    when: $(self.opt_trainingDS)

  post_proc:
    run: prun
    in:
      opt_inDS: main_hpo/outDS
      opt_exec:
        default: "echo %IN > anal.txt"
      opt_args:
        default: "--outputs anal.txt --nFilesPerJob 100 --avoidVP"
    out: [outDS]
    when: $(self.opt_inDS)

Here, the output data of the pre_proc step is used as the training data for the main_hpo step, and the output data metrics.tgz of the main_hpo step is used as the input for the post_proc step. Both the main_hpo and post_proc steps specify when since they wait until the upstream step is done.

The run field of a phpo task is phpo. Here is a list of parameters in the in section to run a phpo task.

Parameter            Corresponding phpo option
opt_trainingDS       --trainingDS (string)
opt_trainingDsType   No correspondence. Type of trainingDS (string)
opt_args             all other phpo options except those listed above (string)

opt_trainingDS can be omitted if the HPO task doesn’t take a training dataset. Note that you can put most phpo options in a json file and specify the json filename in --loadJson in opt_args, rather than constructing a complicated string in opt_args.

cat config.json
{
  "evaluationContainer": "docker://gitlab-registry.cern.ch/zhangruihpc/evaluationcontainer:mlflow",
  "evaluationExec": "bash ./exec_in_container.sh",
  "evaluationMetrics": "metrics.tgz",
  "steeringExec": "run --rm -v \"$(pwd)\":/HPOiDDS gitlab-registry.cern.ch/zhangruihpc/steeringcontainer:0.0.1 /bin/bash -c \"hpogrid generate --n_point=%NUM_POINTS --max_point=%MAX_POINTS --infile=/HPOiDDS/%IN  --outfile=/HPOiDDS/%OUT -l nevergrad\""
}
pchain --cwl hpo.cwl --yaml dummy.yaml --outDS user.<your_nickname>.blah


Loops in workflows

Users can have loops in their workflows. Each loop is represented as a sub-workflow with a parameter dictionary. All tasks in the sub-workflow share the dictionary to generate actual steps. There is a special type of task, called a junction, which reads outputs from upstream tasks and produces a json file to update the parameter dictionary and/or decide to exit from the sub-workflow. The sub-workflow is iterated until one of the junctions decides to exit. Each new iteration is executed with the updated values in the parameter dictionary, so that each iteration can produce different results.

The run field of a junction is junction. Essentially, a junction is a simplified prun task that processes all input files in a single job to produce a json file, so there are only a few parameters in the in section of a junction, as shown below.

Parameter            Corresponding prun option
opt_inDS             Input datasets (a list of strings)
opt_inDsType         Types of input datasets (a list of strings, optional)
opt_exec             The execution string
opt_containerImage   Container image name (string, optional)
opt_args             To define additional output files in --outputs

For example, the following pseudo-code snippet has a single loop

out1 = work_start()
xxx = 123
yyy = 0.456
while True:
    out2 = inner_work_top(out1, xxx)
    out3 = inner_work_bottom(out2, yyy)
    xxx = out2 + 1
    yyy *= out3
    if yyy > xxx:
        break
out4 = work_end(out3)

The code is described using CWL as follows.

[Figure: workflow DAG (pchain_dag_loop.png)]
loop.cwl
cwlVersion: v1.0
class: Workflow

requirements:
  SubworkflowFeatureRequirement: {}

inputs: []

outputs:
  outDS:
    type: string
    outputSource: work_end/outDS


steps:
  work_start:
    run: prun
    in:
      opt_exec:
        default: "echo %RNDM:10 > seed.txt"
      opt_args:
        default: "--outputs seed.txt --nJobs 2 --avoidVP"
    out: [outDS]

  work_loop:
    run: loop_body.cwl
    in:
      dataset: work_start/outDS
    out: [outDS]
    hints:
      - loop

  work_end:
    run: prun
    in:
      opt_inDS: work_loop/outDS
      opt_exec:
        default: "echo %IN > results.root"
      opt_args:
        default: "--outputs results.root --forceStaged"
    out: [outDS]

The work_loop step describes the body of the while block in the pseudo-code snippet. It runs a separate CWL file, loop_body.cwl, and has loop in the hints section to iterate.

loop_body.cwl
cwlVersion: v1.0
class: Workflow

inputs:
  dataset:
    type: string
  param_xxx:
    type: int
    default: 123
  param_yyy:
    type: float
    default: 0.456

outputs:
  outDS:
    type: string
    outputSource: inner_work_bottom/outDS


steps:
  inner_work_top:
    run: prun
    in:
      opt_inDS: dataset
      opt_exec:
        default: "echo %IN %{xxx} %{i} > seed.txt"
      opt_args:
        default: "--outputs seed.txt --avoidVP"
    out: [outDS]

  inner_work_bottom:
    run: prun
    in:
      opt_inDS: inner_work_top/outDS
      opt_exec:
        default: "echo %IN %{yyy} > results.root"
      opt_args:
        default: "--outputs results.root --forceStaged"
    out: [outDS]


  checkpoint:
    run: junction
    in:
      opt_inDS:
        - inner_work_top/outDS
        - inner_work_bottom/outDS
      opt_exec:
        default: "echo %{DS1} %{DS2} aaaa; echo '{\"x\": 456, \"to_continue\": false}' > results.json"
    out: []

The local variables in the loop, like xxx and yyy, are defined in the inputs section of loop_body.cwl with the param_ prefix and their initial values. They are internally translated to a parameter dictionary shared by all tasks in the sub-workflow. During execution, %{blah} in opt_exec or opt_args is replaced with the actual value in the dictionary. A loop count is incremented for each iteration and is inserted into the output dataset names, like user.<your_nickname>.blah_<loop_count>_<output>, so tasks always produce unique output datasets. It is possible to refer to the loop count explicitly in opt_exec using %{i}. In other words, param_i is reserved and thus cannot be used as a user-defined local variable.

In each iteration, the checkpoint step runs a junction to read outputs from the inner_work_top and inner_work_bottom steps and produce a json file that updates values in the parameter dictionary and/or makes a decision to exit from the loop. The actual dataset names are passed to the execution string through the placeholder %{DSn} in opt_exec, which represents the n-th dataset name. The json filename must be results.json. The file contains key-value pairs to update the local variables in the parameter dictionary. It can also contain a special key-value pair, to_continue: True, to repeat the loop. If to_continue is False, the key-value pair is missing, or results.json is not produced, the workflow exits from the loop and proceeds to subsequent steps outside of the loop. It is possible to specify additional output files via --outputs in opt_args if the junction step produces other output files in addition to results.json. Subsequent steps can use those output files as input.
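For the loop_body.cwl example above, a results.json that updates both local variables and requests another iteration could look like this (a sketch; the keys correspond to the variable names without the param_ prefix):

{
  "xxx": 124,
  "yyy": 0.912,
  "to_continue": true
}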


Loop + scatter

A loop is a sequential iteration of a sub-workflow, while a scatter is a horizontal parallelization of independent sub-workflows. They can be combined to describe complex workflows.

Running multiple loops in parallel

The following example runs multiple loops in parallel.

[Figure: workflow DAG (pchain_dag_mloop.png)]
multi_loop.cwl
cwlVersion: v1.0
class: Workflow

requirements:
  ScatterFeatureRequirement: {}
  SubworkflowFeatureRequirement: {}

inputs:
  signals: string[]

outputs:
  outDS:
    type: string
    outputSource: work_end/outDS


steps:
  work_loop:
    run: loop_body.cwl
    scatter: [dataset]
    scatterMethod: dotproduct
    in:
      dataset: signals
    out: [outDS]
    hints:
      - loop

  merge:
    run: prun
    in:
      opt_inDS: work_loop/outDS
      opt_exec:
        default: "echo %IN > results.root"
      opt_args:
        default: "--outputs results.root --forceStaged"
    out: [outDS]

The work_loop step has the loop hint and is scattered over the list of inputs.

Parallel execution of multiple sub-workflows in a single loop

Here is another example of the loop+scatter combination that has a single loop where multiple sub-workflows are executed in parallel.

[Figure: workflow DAG (pchain_dag_sloop.png)]
sequential_loop.cwl
cwlVersion: v1.0
class: Workflow

requirements:
  ScatterFeatureRequirement: {}
  SubworkflowFeatureRequirement: {}

inputs:
  signal: string

outputs:
  outDS:
    type: string
    outputSource: seq_loop/outDS


steps:
  seq_loop:
    run: scatter_body.cwl
    in:
      dataset: signal
    out: [outDS]
    hints:
      - loop

The seq_loop step iterates scatter_body.cwl, which defines arrays of parameters with the param_ prefix and their initial values: param_xxx and param_yyy. The parallel_work step is scattered over the arrays. The arrays are vertically sliced so that each execution of loop_main.cwl gets only one parameter set. The checkpoint step takes all outputs from the parallel_work step to update the arrays and make a decision to continue the loop.

scatter_body.cwl
cwlVersion: v1.0
class: Workflow

requirements:
  ScatterFeatureRequirement: {}
  SubworkflowFeatureRequirement: {}

inputs:
  dataset: string
  param_xxx:
    type: int[]
    default: [123, 456]
  param_yyy:
    type: float[]
    default: [0.456, 0.866]


outputs:
  outDS:
    type: string
    outputSource: parallel_work/outDS


steps:
  parallel_work:
    run: loop_main.cwl
    scatter: [param_sliced_x, param_sliced_y]
    scatterMethod: dotproduct
    in:
      dataset: dataset
      param_sliced_x: param_xxx
      param_sliced_y: param_yyy
    out: [outDS]


  checkpoint:
    run: junction
    in:
      opt_inDS: parallel_work/outDS
      opt_exec:
        default: " echo '{\"xxx\": [345, 678], \"yyy\": [0.321, 0.567], \"to_continue\": false}' > results.json"
    out: []

The sliced values are passed to loop_main.cwl as the local variables param_sliced_x and param_sliced_y in the sub-workflow. In each iteration, the checkpoint step above updates the values in the arrays, so that %{blah} in opt_exec or opt_args is replaced with the updated value when the task is actually executed.

loop_main.cwl
cwlVersion: v1.0
class: Workflow

inputs:
  dataset:
    type: string
  param_sliced_x:
    type: int
  param_sliced_y:
    type: float

outputs:
  outDS:
    type: string
    outputSource: core/outDS


steps:
  core:
    run: prun
    in:
      opt_inDS: dataset
      opt_exec:
        default: "echo %IN %{sliced_x} %{sliced_y} > seed.txt"
      opt_args:
        default: "--outputs seed.txt --avoidVP"
    out: [outDS]


Using REANA

The following example offloads a part of the workflow to REANA.

reana.cwl
cwlVersion: v1.0
class: Workflow

inputs: []

outputs:
  outDS:
    type: string
    outputSource: drai/outDS


steps:
  ain:
    run: prun
    in:
      opt_exec:
        default: "echo %RNDM:10 > seed_ain.txt"
      opt_args:
        default: "--outputs seed_ain.txt --avoidVP --nJobs 2"
    out: [outDS]

  twai:
    run: prun
    in:
      opt_exec:
        default: "echo %RNDM:10 > seed_twai.txt"
      opt_args:
        default: "--outputs seed_twai.txt --avoidVP --nJobs 3"
    out: [outDS]

  drai:
    run: reana
    in:
      opt_inDS:
        - ain/outDS
        - twai/outDS
      opt_exec:
        default: "echo %{DS1} %{DS2} > results.root"
      opt_args:
        default: "--outputs results.root"
      opt_containerImage:
        default: docker://busybox
    out: [outDS]

The ain and twai steps are executed on PanDA, while the drai step reads outputs from those steps and produces the final output on REANA. The run field of a REANA step is reana. A REANA step is a simplified prun task composed of a single job that passes the input dataset names to the payload through the execution string. The payload dynamically customizes the sub-workflow description to process files in those datasets, submits it to REANA using secrets, and downloads the results from REANA.

Similarly to junctions, there are only a few parameters in the in section of a REANA step, as shown below.

Parameter            Corresponding prun option
opt_inDS             Input datasets (a list of strings)
opt_inDsType         Types of input datasets (a list of strings, optional)
opt_exec             The execution string
opt_containerImage   Container image name (string, optional)
opt_args             all other prun options except those listed above (string)

The actual dataset names are passed to the execution string through the placeholder %{DSn} in opt_exec, which represents the n-th dataset name. Note that the container image in opt_containerImage is used to submit the sub-workflow description to REANA, so it is generally not the container image that processes the input files. REANA steps are internally executed as prun tasks in PanDA, so all prun options can be specified in opt_args.


Using Gitlab CI Pipelines

It is possible to integrate Gitlab CI pipelines in workflows. The following example executes the un step on PanDA, and then feeds the output to the deux step to trigger a Gitlab CI pipeline.

gitlab.cwl
cwlVersion: v1.0
class: Workflow

inputs: []

outputs:
  outDS:
    type: string
    outputSource: deux/outDS


steps:
  un:
    run: prun
    in:
      opt_exec:
        default: "echo 1 > out.txt"
      opt_args:
        default: "--outputs out.txt"
    out: [outDS]

  deux:
    run: gitlab
    in:
      opt_inDS:
        - un/outDS
      opt_site:
        default: "CERN"
      opt_ref:
        default: "master"
      opt_api:
        default: "https://gitlab.cern.ch/api/v4/projects"
      opt_projectID:
        default: 165337
      opt_triggerToken:
        default: "MY_T_TOKEN"
      opt_accessToken:
        default: "MY_A_TOKEN"
      opt_input_location:
        default: "/eos/atlas/atlascerngroupdisk/blah"
    out: [outDS]

A Gitlab CI step, defined with gitlab in the run field, specifies the following parameters in the in section.

Parameter            Description
opt_inDS             Input datasets (a list of strings)
opt_input_location   The directory name where input files are downloaded (string, optional)
opt_site             The site name where the Gitlab CI is reachable (string)
opt_api              The API of the Gitlab projects (string, e.g. https://<hostname>/api/v4/projects)
opt_projectID        The project ID of the gitlab repository where pipelines run (int)
opt_ref              The branch name in the repository (string, e.g. master)
opt_triggerToken     The key of the trigger token uploaded to PanDA secrets (string)
opt_accessToken      The key of the access token uploaded to PanDA secrets (string)

To create trigger and access tokens, see the Gitlab documentation pages on how to create a trigger token and how to create an access token. Access tokens require only the read_api scope with the Guest role. Note that trigger and access tokens need to be uploaded to PanDA secrets, with the keys specified in opt_triggerToken and opt_accessToken, before submitting workflows.
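The tokens themselves are registered as PanDA secrets. With recent panda-client versions this is typically done from pbook; the snippet below is a sketch under that assumption, so check the pbook documentation for the exact interface:

$ pbook
>>> set_secret('MY_T_TOKEN', '<trigger token value>')
>>> set_secret('MY_A_TOKEN', '<access token value>')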



Workflow examples with Snakemake

Snakemake allows users to describe workflows with Python language syntax and JSON configuration files.


Common JSON configuration file for following examples

config.json
{
  "output": "outDS",
  "signal": "mc16_valid:mc16_valid.900248.PG_singlepion_flatPt2to50.simul.HITS.e8312_s3238_tid26378578_00",
  "background": "mc16_5TeV.361238.Pythia8EvtGen_A3NNPDF23LO_minbias_inelastic_low.merge.HITS.e6446_s3238_s3250/",
  "signals": [
    "mc16_valid:mc16_valid.900248.PG_singlepion_flatPt2to50.simul.HITS.e8312_s3238_tid26378578_00",
    "valid1.427080.Pythia8EvtGen_A14NNPDF23LO_flatpT_Zprime.simul.HITS.e5362_s3718_tid26356243_00"
  ],
  "backgrounds": [
    "mc16_5TeV.361238.Pythia8EvtGen_A3NNPDF23LO_minbias_inelastic_low.merge.HITS.e6446_s3238_s3250/",
    "mc16_5TeV:mc16_5TeV.361239.Pythia8EvtGen_A3NNPDF23LO_minbias_inelastic_high.merge.HITS.e6446_s3238_s3250/"
  ]
}


Simple task chain

simple_chain/Snakefile
# noinspection PyUnresolvedReferences
configfile: "../config.json"

# noinspection SmkRuleRedeclaration
rule all:
    input: config["output"]

# noinspection SmkRuleRedeclaration
rule top:
    params:
        opt_exec="echo %RNDM:10 > seed.txt",
        opt_args="--outputs seed.txt --nJobs 3 --avoidVP"
    output: f"{config['output']}.top"
    shell: "prun"

# noinspection SmkRuleRedeclaration
rule bottom:
    params:
        opt_inDS=rules.top.output[0],
        opt_exec="echo %IN > results.root",
        opt_args="--outputs results.root --forceStaged --avoidVP"
    input: rules.top.output
    output: config["output"]
    shell: "prun"

The # noinspection directives are useful if a workflow is edited in the PyCharm IDE with the SnakeCharm plugin. Otherwise, these directives can be omitted. To use a parent's output as input, specify rules.${parent_rule_name}.output[0] as the value of the corresponding parameter (opt_inDS) and rules.${parent_rule_name}.output for the input keyword.


More complicated chain

sig_bg_comb/Snakefile
# noinspection PyUnresolvedReferences
configfile: "../config.json"

# noinspection PyUnresolvedReferences
from pandaserver.workflow.snakeparser.utils import param_of, param_exp

# noinspection SmkRuleRedeclaration
rule all:
    params:
        signal=config["signal"],
        background=config["background"]
    input: config["output"]

rule make_signal:
    params:
        opt_inDS=param_of("signal",source=rules.all),
        opt_containerImage="docker://busybox",
        opt_exec="echo %IN > abc.dat; echo 123 > def.zip",
        opt_args="--outputs abc.dat,def.zip --nFilesPerJob 5"
    output: f"{config['output']}.signal"
    shell: "prun"

rule make_background_1:
    params:
        opt_inDS=param_of("background",source=rules.all),
        opt_exec="echo %IN > opq.root; echo %IN > xyz.pool",
        opt_args="--outputs opq.root,xyz.pool --nGBPerJob 10"
    output: f"{config['output']}.background"
    shell: "prun"

rule premix:
    params:
        opt_inDS=rules.make_signal.output[0],
        opt_inDsType="def.zip",
        opt_secondaryDSs=[rules.make_background_1.output[0]],
        opt_secondaryDsTypes=["xyz.pool"],
        opt_exec="echo %IN %IN2 > klm.root",
        opt_args=param_exp("--outputs klm.root --secondaryDSs IN2:2:%{SECDS1}")
    input: rules.make_signal.output,rules.make_background_1.output
    output: f"{config['output']}.premix"
    shell: "prun"

rule generate_some:
    params:
        opt_exec="echo %RNDM:10 > gen.root",
        opt_args="--outputs gen.root --nJobs 10"
    output: f"{config['output']}.generate_some"
    shell: "prun"

rule make_background_2:
    params:
        opt_inDS=param_of("background",source=rules.all),
        opt_containerImage="docker://alpine",
        opt_secondaryDSs=[rules.generate_some.output[0]],
        opt_secondaryDsTypes=["gen.root"],
        opt_exec="echo %IN > ooo.root; echo %IN2 > jjj.txt",
        opt_args=param_exp("--outputs ooo.root,jjj.txt --secondaryDSs IN2:2:%{SECDS1}")
    input: rules.generate_some.output
    output: f"{config['output']}.background_2"
    shell: "prun"

rule combine:
    params:
        opt_inDS=rules.make_signal.output[0],
        opt_inDsType="abc.dat",
        opt_secondaryDSs=[rules.premix.output[0], rules.make_background_2.output[0]],
        opt_secondaryDsTypes=["klm.root", "ooo.root"],
        opt_exec="echo %IN %IN2 %IN3 > aaa.root",
        opt_args=param_exp("--outputs aaa.root --secondaryDSs IN2:2:%{SECDS1},IN3:5:%{SECDS2}")
    input: rules.make_signal.output,rules.premix.output,rules.make_background_2.output
    output: config["output"]
    shell: "prun"

To use a parameter from another step, use the helper function param_of from the pandaserver.workflow.snakeparser.utils library, e.g. param_of("${name_of_parameter}", source=rules.all), where ${name_of_parameter} should be replaced by the actual parameter name and source refers to the step that owns the parameter. If a step parameter value contains patterns like %{SECDS1}, the helper function param_exp should be used. To use multiple inputs (multiple parents), the input keyword should list all references separated by commas.


Sub-workflow and parallel execution with scatter

merge_many/Snakefile
# noinspection PyUnresolvedReferences
configfile: "../config.json"

# noinspection PyUnresolvedReferences
from pandaserver.workflow.snakeparser.utils import param_of

# noinspection SmkRuleRedeclaration
rule all:
    params:
        signals=config["signals"],
        backgrounds=config["backgrounds"]
    input: config["output"]

# noinspection SmkUnrecognizedSection
rule many_sig_bg_comb:
    params:
        signal=param_of("signals",source=rules.all),
        background=param_of("backgrounds",source=rules.all)
    scatter: [param_of("signal"), param_of("background")]
    output: f"{config['output']}.many_sig_bg_comb"
    shell: "../sig_bg_comb/Snakefile"

# noinspection SmkRuleRedeclaration
rule merge:
    params:
        opt_inDS=rules.many_sig_bg_comb.output[0],
        opt_exec="python merge.py --type aaa --level 3 %IN",
        opt_args="--outputs merged.root"
    input: rules.many_sig_bg_comb.output
    output: config["output"]
    shell: "prun"


Using Athena

athena/Snakefile
# noinspection PyUnresolvedReferences
configfile: "../config.json"

# noinspection SmkRuleRedeclaration
rule all:
    input: config["output"]

rule first:
    params:
        opt_exec="Gen_tf.py --maxEvents=1000 --skipEvents=0 --ecmEnergy=5020 --firstEvent=1 --jobConfig=860059 " +
                 "--outputEVNTFile=evnt.pool.root --randomSeed=4 --runNumber=860059 --AMITag=e8201",
        opt_args="--outputs evnt.pool.root --nJobs 3",
        opt_useAthenaPackages=True
    output: f"{config['output']}.first"
    shell: "prun"

rule second:
    params:
        opt_inDS=rules.first.output[0],
        opt_exec="echo %IN > results.txt",
        opt_args="--outputs results.txt"
    input: rules.first.output
    output: f"{config['output']}.second"
    shell: "prun"

rule third:
    params:
        opt_inDS=rules.second.output[0],
        opt_exec="echo %IN > poststep.txt",
        opt_args="--outputs poststep.txt --athenaTag AnalysisBase,21.2.167",
        opt_useAthenaPackages=True
    input: rules.second.output
    output: config["output"]
    shell: "prun"


Conditional workflow

cond/Snakefile
# noinspection PyUnresolvedReferences
configfile: "../config.json"

# noinspection PyUnresolvedReferences
from pandaserver.workflow.snakeparser.utils import param_of

# noinspection SmkRuleRedeclaration
rule all:
    input: f"{config['output']}.bottom_ok",f"{config['output']}.bottom_ng"

# noinspection SmkRuleRedeclaration
rule top:
    params:
        opt_exec="echo %RNDM:10 > seed.txt",
        opt_args="--outputs seed.txt --nJobs 2"
    output: f"{config['output']}.top"
    shell: "prun"

# noinspection SmkRuleRedeclaration, SmkUnrecognizedSection
rule bottom_OK:
    params:
        opt_inDS=rules.top.output[0],
        opt_exec="echo %IN > results.txt",
        opt_args="--outputs results.txt --forceStaged"
    input: rules.top.output
    output: f"{config['output']}.bottom_ok"
    when: str(param_of("opt_inDS"))
    shell: "prun"

# noinspection SmkRuleRedeclaration, SmkUnrecognizedSection
rule bottom_NG:
    params:
        opt_inDS=rules.top.output[0],
        opt_exec="echo hard luck > bad.txt",
        opt_args="--outputs bad.txt"
    input: rules.top.output
    output: f"{config['output']}.bottom_ng"
    when: f"!{param_of('opt_inDS')}"
    shell: "prun"

The when keyword was added as a Snakemake language extension to define step conditions.


Involving hyperparameter optimization

hpo/Snakefile
# noinspection PyUnresolvedReferences
configfile: "../config.json"

# noinspection PyUnresolvedReferences
from pandaserver.workflow.snakeparser.utils import param_of

# noinspection SmkRuleRedeclaration
rule all:
    input: config["output"]

rule pre_proc:
    params:
        opt_exec="echo %RNDM:10 > seed.txt",
        opt_args="--outputs seed.txt --nJobs 2 --avoidVP"
    output: f"{config['output']}.pre_proc"
    shell: "prun"

# noinspection SmkUnrecognizedSection
rule main_hpo:
    params:
        opt_trainingDS=rules.pre_proc.output[0],
        opt_args="--loadJson conf.json"
    input: rules.pre_proc.output
    output: f"{config['output']}.main_hpo"
    when: str(param_of("opt_trainingDS"))
    shell: "prun"

# noinspection SmkUnrecognizedSection
rule post_proc:
    params:
        opt_inDS=rules.main_hpo.output[0],
        opt_exec="echo %IN > anal.txt",
        opt_args="--outputs anal.txt --nFilesPerJob 100 --avoidVP"
    input: rules.main_hpo.output
    output: config["output"]
    when: str(param_of("opt_inDS"))
    shell: "prun"


Loops in workflows

loop/Snakefile
# noinspection PyUnresolvedReferences
configfile: "../config.json"

# noinspection SmkRuleRedeclaration
rule all:
    input: config["output"]

rule work_start:
    params:
        opt_exec="echo %RNDM:10 > seed.txt",
        opt_args="--outputs seed.txt --nJobs 2 --avoidVP"
    output: f"{config['output']}.work_start"
    shell: "prun"

# noinspection SmkUnrecognizedSection, SmkRuleRedeclaration
rule work_loop:
    params:
        dataset=rules.work_start.output[0]
    input: rules.work_start.output
    loop: True
    output: f'{config["output"]}.work_loop'
    shell: "../loop_body/Snakefile"

rule work_end:
    params:
        opt_inDS=rules.work_loop.output[0],
        opt_exec="echo %IN > results.root",
        opt_args="--outputs results.root --forceStaged"
    input: rules.work_loop.output
    output: config["output"]
    shell: "prun"
loop_body/Snakefile
# noinspection PyUnresolvedReferences
configfile: "../config.json"

# noinspection PyUnresolvedReferences
from pandaserver.workflow.snakeparser.utils import param_of, param_exp

# noinspection SmkRuleRedeclaration
rule all:
    params:
        dataset=None,
        param_xxx=123,
        param_yyy=0.456
    input: config["output"]

rule inner_work_top:
    params:
        opt_inDS=param_of('dataset',source=rules.all),
        opt_exec=param_exp("echo %IN %{xxx} %{i} > seed.txt"),
        opt_args="--outputs seed.txt --avoidVP"
    output: f"{config['output']}.inner_work_top"
    shell: "prun"

rule inner_work_bottom:
    params:
        opt_inDS=rules.inner_work_top.output[0],
        opt_exec=param_exp("echo %IN %{yyy} > results.root"),
        opt_args="--outputs results.root --forceStaged"
    input: rules.inner_work_top.output
    output: f"{config['output']}.inner_work_bottom"
    shell: "prun"

# noinspection SmkRuleRedeclaration
rule checkpoint:
    params:
        opt_inDS=[rules.inner_work_top.output[0], rules.inner_work_bottom.output[0]],
        opt_exec=param_exp("echo %{DS1} %{DS2} aaaa; echo '{\"x\": 456, \"to_continue\": false}' > results.json")
    input: rules.inner_work_top.output,rules.inner_work_bottom.output
    output: config["output"]
    shell: "junction"

The loop keyword was added as a Snakemake language extension to define a step as a loop.


Loop + scatter

Running multiple loops in parallel

multi_loop/Snakefile
# noinspection PyUnresolvedReferences
configfile: "../config.json"

# noinspection PyUnresolvedReferences
from pandaserver.workflow.snakeparser.utils import param_of

# noinspection SmkRuleRedeclaration
rule all:
    params:
        signals=[]
    input: config["output"]

# noinspection SmkUnrecognizedSection, SmkRuleRedeclaration
rule work_loop:
    params:
        dataset=param_of('signals',source=rules.all)
    scatter: [param_of('dataset')]
    loop: True
    output: f'{config["output"]}.work_loop'
    shell: "../loop_body/Snakefile"

# noinspection SmkRuleRedeclaration
rule merge:
    params:
        opt_inDS=rules.work_loop.output[0],
        opt_exec="echo %IN > results.root",
        opt_args="--outputs results.root --forceStaged"
    output: config["output"]
    shell: "prun"

The sub-workflow filename should be specified in the shell keyword.


Parallel execution of multiple sub-workflows in a single loop

sequential_loop/Snakefile
# noinspection PyUnresolvedReferences
configfile: "../config.json"

# noinspection PyUnresolvedReferences
from pandaserver.workflow.snakeparser.utils import param_of

# noinspection SmkRuleRedeclaration
rule all:
    params:
        signal=None
    input: config["output"]

# noinspection SmkUnrecognizedSection
rule seq_loop:
    params:
        dataset=param_of('signal',source=rules.all)
    loop: True
    output: config["output"]
    shell: "../scatter_body/Snakefile"
scatter_body/Snakefile
# noinspection PyUnresolvedReferences
configfile: "../config.json"

# noinspection PyUnresolvedReferences
from pandaserver.workflow.snakeparser.utils import param_of, param_exp

# noinspection SmkRuleRedeclaration
rule all:
    params:
        dataset=None,
        param_xxx=[123, 456],
        param_yyy=[0.456, 0.866]
    input: config["output"]

# noinspection SmkRuleRedeclaration, SmkUnrecognizedSection
rule parallel_work:
    params:
        dataset=param_of('dataset',source=rules.all),
        param_sliced_x=param_of('param_xxx',source=rules.all),
        param_sliced_y=param_of('param_yyy',source=rules.all)
    scatter: [param_of('param_sliced_x'), param_of('param_sliced_y')]
    output: f'{config["output"]}.parallel_work'
    shell: "../loop_main/Snakefile"

# noinspection SmkRuleRedeclaration
rule checkpoint:
    params:
        opt_inDS=rules.parallel_work.output[0],
        opt_exec=param_exp(
            "echo '{\"xxx\": [345, 678], \"yyy\": [0.321, 0.567], \"to_continue\": false}' > results.json"
        )
    output: config["output"]
    input: rules.parallel_work.output
    shell: "junction"
loop_main/Snakefile
# noinspection PyUnresolvedReferences
configfile: "../config.json"

# noinspection PyUnresolvedReferences
from pandaserver.workflow.snakeparser.utils import param_of

# noinspection SmkRuleRedeclaration
rule all:
    params:
        dataset=None,
        param_sliced_x=None,
        param_sliced_y=None
    input: config["output"]

rule core:
    params:
        opt_inDS=param_of("dataset",source=rules.all),
        opt_exec=lambda wildcards: "echo %IN %{sliced_x} %{sliced_y} > seed.txt",
        opt_args="--outputs seed.txt --avoidVP"
    output: config["output"]
    shell: "prun"

Using REANA

reana/Snakefile
# noinspection PyUnresolvedReferences
configfile: "../config.json"

# noinspection PyUnresolvedReferences
from pandaserver.workflow.snakeparser.utils import param_exp

# noinspection SmkRuleRedeclaration
rule all:
    input: config["output"]

rule ain:
    params:
        opt_exec="echo %RNDM:10 > seed_ain.txt",
        opt_args="--outputs seed_ain.txt --avoidVP --nJobs 2"
    output: f"{config['output']}.ain"
    shell: "prun"

rule twai:
    params:
        opt_exec="echo %RNDM:10 > seed_twai.txt",
        opt_args="--outputs seed_twai.txt --avoidVP --nJobs 3"
    output: f"{config['output']}.twai"
    shell: "prun"

rule drai:
    params:
        opt_inDS=[rules.ain.output[0], rules.twai.output[0]],
        opt_exec=param_exp("echo %{DS1} %{DS2} > results.root"),
        opt_args="--outputs results.root",
        opt_containerImage="docker://busybox"
    input: rules.ain.output,rules.twai.output
    output: config["output"]
    shell: "reana"


Debugging locally

Workflow descriptions can be error-prone, so it is better to check them before submission. pchain has the --check option to verify the workflow description locally; just add --check when running pchain. For example,

pchain --cwl test.cwl --yaml dummy.yaml --outDS user.<your_nickname>.blah --check

which should give a message like

INFO : uploading workflow sandbox
INFO : check workflow user.tmaeno.c63e2e54-df9e-402a-8d7b-293b587c4559
INFO : messages from the server

internally converted as follows

ID:0 Name:top Type:prun
  Parent:
  Input:
     opt_args: --outputs seed.txt --nJobs 2 --avoidVP
     opt_exec: echo %RNDM:10 > seed.txt
  Output:
...

INFO : Successfully verified workflow description

Your workflow description is sent to the server to check whether the options in opt_args are correct, dependencies among steps are valid, and input and output data are properly resolved.



Monitoring



Bookkeeping

pbook provides the following commands for workflow bookkeeping

show_workflow
kill_workflow
retry_workflow
finish_workflow
pause_workflow
resume_workflow

Please refer to the pbook documentation for their details.
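For illustration only, a minimal interactive session might look like the following, assuming each command takes the workflow's request ID as its argument (an assumption; check the pbook documentation for the actual signatures):

$ pbook
>>> show_workflow(1234)
>>> pause_workflow(1234)
>>> resume_workflow(1234)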