flowserv.controller.serial.workflow.base module

Serial workflow that executes a sequence of workflow steps.

Serial workflows are created either from workflow templates that follow the syntax of the REANA serial workflow specification or explicitly within Python scripts.

class flowserv.controller.serial.workflow.base.SerialWorkflow(steps: Optional[List[flowserv.model.workflow.step.WorkflowStep]] = None, parameters: Optional[List[flowserv.model.parameter.base.Parameter]] = None, workers: Optional[flowserv.controller.worker.manager.WorkerPool] = None)

Bases: object

A serial workflow represents a sequence of flowserv.model.workflow.step.WorkflowStep steps that are executed in order for a given set of input parameters.

At this point we distinguish two types of workflow steps: flowserv.model.workflow.step.CodeStep and flowserv.model.workflow.step.ContainerStep.

A flowserv.model.workflow.step.CodeStep is executed within the same thread and environment as the flowserv engine. A flowserv.model.workflow.step.ContainerStep is executed in a separate container-like environment. The execution environment is represented by a flowserv.controller.worker.base.ContainerWorker that is associated in the flowserv.controller.worker.manager.WorkerPool with the environment identifier of the container step.
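The association between environment identifiers and workers can be pictured as a lookup table. The following is a minimal sketch only, not flowserv's implementation: `Pool`, `local_worker` and `docker_worker` are hypothetical stand-ins, and falling back to a default worker for unmapped identifiers is an assumption made for illustration.

```python
from typing import Callable, Dict, List, Optional

# Hypothetical stand-in: a "worker" is any callable that receives the
# command line statements of a container step.
Worker = Callable[[List[str]], str]


def local_worker(commands: List[str]) -> str:
    return f"local: {len(commands)} command(s)"


def docker_worker(commands: List[str]) -> str:
    return f"docker: {len(commands)} command(s)"


class Pool:
    """Hypothetical stand-in for a worker pool (not flowserv's WorkerPool)."""

    def __init__(
        self,
        workers: Optional[Dict[str, Worker]] = None,
        default: Worker = local_worker,
    ) -> None:
        self.workers = workers or {}
        self.default = default

    def get(self, image: str) -> Worker:
        # Look up the worker by the step's environment identifier.
        # Assumption: unmapped identifiers fall back to the default worker.
        return self.workers.get(image, self.default)


pool = Pool({"python:3.9": docker_worker})
mapped = pool.get("python:3.9")(["python run.py"])
unmapped = pool.get("unmapped-image")(["echo hello", "echo bye"])
```

Here `mapped` is produced by the worker registered for `python:3.9`, while `unmapped` is produced by the default worker.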

add_code_step(identifier: str, func: Callable, arg: Optional[str] = None, varnames: Optional[Dict] = None, inputs: Optional[List[str]] = None, outputs: Optional[List[str]] = None) flowserv.controller.serial.workflow.base.SerialWorkflow

Append a code step to the serial workflow.

Parameters
  • identifier (str) – Unique workflow step identifier.

  • func (callable) – Python function that is executed by the workflow step.

  • arg (string, default=None) – Name of the variable under which the function result is stored in the workflow arguments. If None, the function result is discarded.

  • varnames (dict, default=None) – Mapping of function argument names to names of workflow arguments. This mapping is used when generating the arguments for the executed function. By default it is assumed that the names of arguments for the given function correspond to the names in the argument dictionary for the workflow. This mapping provides the option to map names in the function signature that do not occur in the arguments dictionary to argument names that are in the dictionary.

  • inputs (list of string, default=None) – List of files that are required by the workflow step as inputs.

  • outputs (list of string, default=None) – List of files that are generated by the workflow step as outputs.

Return type

flowserv.controller.serial.workflow.base.SerialWorkflow
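The roles of arg and varnames described above can be sketched as follows. This is an illustration under stated assumptions, not flowserv's implementation: `call_code_step` is a hypothetical helper, and the workflow arguments are modelled as a plain dictionary.

```python
import inspect
from typing import Callable, Dict, Optional


def call_code_step(
    func: Callable,
    context: Dict,
    arg: Optional[str] = None,
    varnames: Optional[Dict[str, str]] = None,
) -> Dict:
    """Hypothetical helper: run func against the workflow arguments."""
    varnames = varnames or {}
    kwargs = {}
    for name in inspect.signature(func).parameters:
        # Use the mapped name if one is given, else the parameter's own name.
        key = varnames.get(name, name)
        if key in context:
            kwargs[name] = context[key]
    result = func(**kwargs)
    if arg is not None:
        # Store the result under the given name; if arg is None it is discarded.
        context[arg] = result
    return context


def mean(values):
    return sum(values) / len(values)


# The function expects 'values' but the workflow argument is named 'scores'.
context = {"scores": [2, 4, 6]}
call_code_step(mean, context, arg="avg", varnames={"values": "scores"})
```

After the call, `context` holds the function result under the key `avg` alongside the original `scores` entry.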

add_container_step(identifier: str, image: str, commands: Optional[List[str]] = None, env: Optional[Dict] = None, inputs: Optional[List[str]] = None, outputs: Optional[List[str]] = None) flowserv.controller.serial.workflow.base.SerialWorkflow

Append a container step to the serial workflow.

Parameters
  • identifier (str) – Unique workflow step identifier.

  • image (str) – Execution environment identifier.

  • commands (list(string), default=None) – List of command line statements.

  • env (dict, default=None) – Environment variables for workflow step execution.

  • inputs (list of string, default=None) – List of files that are required by the workflow step as inputs.

  • outputs (list of string, default=None) – List of files that are generated by the workflow step as outputs.

Return type

flowserv.controller.serial.workflow.base.SerialWorkflow

add_parameter(parameter: flowserv.model.parameter.base.Parameter) flowserv.controller.serial.workflow.base.SerialWorkflow

Add a parameter to the internal index of workflow template parameters.

Parameter identifiers are expected to be unique. If a parameter with the same identifier as the given parameter already exists in the internal parameter index, it is replaced with the given parameter.

Returns a reference to the object itself.

Parameters

parameter (flowserv.model.parameter.base.Parameter) – Workflow template parameter that is added to the internal parameter index.

Return type

flowserv.controller.serial.workflow.base.SerialWorkflow
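The replace-on-duplicate behavior and the self-returning call can be sketched with a dictionary keyed by identifier. `Param` and `ParameterIndex` are hypothetical stand-ins used only for illustration; they are not flowserv classes.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class Param:
    """Hypothetical stand-in for a workflow template parameter."""

    identifier: str
    default: Any = None


class ParameterIndex:
    """Hypothetical stand-in for the internal parameter index."""

    def __init__(self) -> None:
        self.parameters: Dict[str, Param] = {}

    def add_parameter(self, parameter: Param) -> "ParameterIndex":
        # A parameter with an existing identifier replaces the earlier entry.
        self.parameters[parameter.identifier] = parameter
        # Returning the object itself allows calls to be chained.
        return self


index = (
    ParameterIndex()
    .add_parameter(Param("threshold", 0.5))
    .add_parameter(Param("names"))
    .add_parameter(Param("threshold", 0.9))
)
```

The index ends up with two entries, and `threshold` carries the default of the parameter that was added last.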

run(arguments: Dict, workers: Optional[flowserv.controller.worker.manager.WorkerPool] = None, volumes: Optional[flowserv.volume.manager.VolumeManager] = None) flowserv.controller.serial.workflow.result.RunResult

Execute workflow for the given set of input arguments.

Executes workflow steps in sequence. Terminates early if the execution of a workflow step returns a non-zero value. Uses the given worker factory to create workers for steps that are of class flowserv.model.workflow.step.ContainerStep.

Collects results for all executed steps and returns them in the flowserv.controller.serial.workflow.result.RunResult.

Parameters
  • arguments (dict) – User-provided arguments for the workflow run.

  • workers (flowserv.controller.worker.manager.WorkerPool, default=None) – Factory for workers that execute flowserv.model.workflow.step.ContainerStep steps. If None, the default worker is used for all container steps.

  • volumes (flowserv.volume.manager.VolumeManager, default=None) – Manager for storage volumes that are used by the different workers.

Return type

flowserv.controller.serial.workflow.result.RunResult
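The execution loop described for run (steps in sequence, early termination on a non-zero value, results collected for all executed steps) can be sketched as below. This is a simplified illustration, not flowserv's implementation: `run_serial`, `StepResult` and `Result` are hypothetical names, and each step is modelled as a function that receives the argument dictionary and returns an integer code.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Tuple


@dataclass
class StepResult:
    """Hypothetical record of one executed step."""

    step_id: str
    returncode: int


@dataclass
class Result:
    """Hypothetical stand-in for the collected run result."""

    steps: List[StepResult] = field(default_factory=list)


def run_serial(
    steps: List[Tuple[str, Callable[[Dict], int]]], arguments: Dict
) -> Result:
    result = Result()
    context = dict(arguments)  # copy so the caller's dictionary is untouched
    for step_id, execute in steps:
        code = execute(context)
        result.steps.append(StepResult(step_id, code))
        if code != 0:
            # Terminate early; the remaining steps are never executed.
            break
    return result


def load(context: Dict) -> int:
    context["rows"] = 3
    return 0


def validate(context: Dict) -> int:
    # Simulate a failing step.
    return 1


def report(context: Dict) -> int:
    context["reported"] = True
    return 0


result = run_serial(
    [("load", load), ("validate", validate), ("report", report)], {}
)
executed = [(s.step_id, s.returncode) for s in result.steps]
```

In this run `executed` contains entries for `load` and `validate` only, because `validate` returns a non-zero value and `report` is therefore skipped.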