flowserv.controller.serial.engine.base module
Implementation for a workflow controller backend that is capable of running serial workflow specifications.
This controller allows execution of workflow steps within separate sub-processes.
All workflow run files will be maintained in a (temporary) directory on the storage volume that is associated with the workflow engine. The base folder for these run files can be configured by setting the environment variable FLOWSERV_SERIAL_RUNSDIR.
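The run-folder configuration described above can be pictured with a small helper. This is a minimal sketch, not flowserv code: `resolve_runs_dir` and `create_run_dir` are hypothetical names, and the fallback to a folder under the system temp directory when FLOWSERV_SERIAL_RUNSDIR is unset is an assumption.

```python
import os
import tempfile
from pathlib import Path
from typing import Mapping, Optional

# Environment variable named in the documentation above.
RUNSDIR_VAR = "FLOWSERV_SERIAL_RUNSDIR"


def resolve_runs_dir(env: Optional[Mapping[str, str]] = None) -> Path:
    """Return the base folder for run files (hypothetical helper).

    The fallback to a "flowserv-runs" folder under the system temp
    directory is an assumption, not documented flowserv behavior.
    """
    env = os.environ if env is None else env
    value = env.get(RUNSDIR_VAR)
    if value:
        return Path(value)
    return Path(tempfile.gettempdir()) / "flowserv-runs"


def create_run_dir(run_id: str, env: Optional[Mapping[str, str]] = None) -> Path:
    """Create (if needed) and return the folder for a single run."""
    rundir = resolve_runs_dir(env) / run_id
    rundir.mkdir(parents=True, exist_ok=True)
    return rundir
```

To point the engine at a different location, set the variable in the environment that runs flowserv (for example `export FLOWSERV_SERIAL_RUNSDIR=/data/flowserv/runs`), presumably before the engine is instantiated.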
- class flowserv.controller.serial.engine.base.SerialWorkflowEngine(service: flowserv.service.api.APIFactory, fs: Optional[flowserv.volume.base.StorageVolume] = None, config: Optional[Dict] = None)
Bases: flowserv.controller.base.WorkflowController
The workflow engine is used to execute workflow templates for a given set of arguments. Each workflow is executed as a serial workflow. The individual workflow steps can be executed in a separate process on request.
- cancel_run(run_id: str)
Request to cancel execution of the given run. This method is usually called by the workflow engine that uses this controller for workflow execution. It is therefore assumed that the state of the workflow run is updated accordingly by the caller.
- Parameters
run_id (string) – Unique run identifier
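A cancellation request of this kind typically amounts to removing the run from the engine's task index under a lock and cancelling the pending task. The sketch below assumes a dictionary-based index and task objects with a `cancel()` method (such as `concurrent.futures.Future`). `TaskIndex` is a hypothetical class that uses `threading.Lock` for illustration, and it deliberately leaves the run state update to the caller, as the description above states.

```python
import threading
from typing import Any, Dict


class TaskIndex:
    """Hypothetical index of running tasks guarded by a lock."""

    def __init__(self) -> None:
        self.lock = threading.Lock()
        self.tasks: Dict[str, Any] = {}

    def add(self, run_id: str, task: Any) -> None:
        """Register the task that executes the given run."""
        with self.lock:
            self.tasks[run_id] = task

    def cancel_run(self, run_id: str) -> bool:
        """Remove the task for run_id from the index and cancel it.

        Returns False if no task is registered for the run. The run state
        itself is not touched; updating it is the caller's job.
        """
        with self.lock:
            task = self.tasks.pop(run_id, None)
        if task is None:
            return False
        task.cancel()
        return True
```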
- exec_workflow(run: flowserv.model.base.RunObject, template: flowserv.model.template.base.WorkflowTemplate, arguments: Dict, staticfs: flowserv.volume.base.StorageVolume, config: Optional[Dict] = None) → Tuple[flowserv.model.workflow.state.WorkflowState, flowserv.volume.base.StorageVolume]
Initiate the execution of a given workflow template for a set of argument values. This will start a new process that executes a serial workflow asynchronously.
The serial workflow engine executes workflows on the local machine and therefore uses the file system to store temporary run files. The storage volume for the run folder is returned as the second value in the result tuple. The first value in the result tuple is the state of the workflow after the process is started. If the workflow is executed asynchronously, the state will be RUNNING; otherwise, the run state should be an inactive state.
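The two cases for the returned state can be illustrated with a simplified sketch. `State` is a stand-in for flowserv's workflow state classes and `start_run` is a hypothetical function; it uses a `concurrent.futures` executor in place of whatever process pool the engine actually uses, and it returns a plain folder path where flowserv returns a storage volume.

```python
import tempfile
from concurrent.futures import Executor, Future
from enum import Enum
from typing import Callable, Optional, Tuple


class State(Enum):
    """Simplified stand-in for flowserv's workflow state classes."""
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    SUCCESS = "SUCCESS"
    ERROR = "ERROR"


def start_run(
    job: Callable[[str], State], executor: Optional[Executor] = None
) -> Tuple[State, str, Optional[Future]]:
    """Run job in a fresh temporary run folder.

    With an executor the job is submitted asynchronously and the returned
    state is RUNNING. Without one the job runs to completion and its final
    (inactive) state is returned.
    """
    rundir = tempfile.mkdtemp(prefix="run-")
    if executor is not None:
        future = executor.submit(job, rundir)
        return State.RUNNING, rundir, future
    return job(rundir), rundir, None
```

Passing a `ProcessPoolExecutor` instead of a thread pool would run the job in a separate process, provided `job` is a picklable module-level function.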
The set of arguments is not further validated. It is assumed that the validation has been performed by the calling code (e.g., the run service manager).
The optional configuration object can be used to override the worker configuration that was provided at object instantiation. Expects a dictionary with an element workers that contains a mapping of container identifier to a container worker configuration object.
If the state of the run handle is not pending, an error is raised.
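The configuration override and the pending-state check might look roughly as follows. Both functions are hypothetical. Merging the override per container identifier (rather than replacing the whole `workers` mapping) is an assumption, as are the use of `RuntimeError` and of plain state strings.

```python
import copy
from typing import Dict, Optional


def merge_worker_config(default: Dict, config: Optional[Dict] = None) -> Dict:
    """Return worker settings where entries in config['workers'] win.

    Both mappings go from container identifiers to worker configuration
    dictionaries. Merging per identifier is an assumption of this sketch.
    The default settings are copied, never modified in place.
    """
    workers = copy.deepcopy(default)
    if config:
        workers.update(copy.deepcopy(config.get("workers", {})))
    return workers


def check_pending(state_type: str) -> None:
    """Raise an error unless the run is still pending (stand-in check)."""
    if state_type != "PENDING":
        raise RuntimeError(f"invalid run state {state_type!r}")
```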
- Parameters
run (flowserv.model.base.RunObject) – Handle for the run that is being executed.
template (flowserv.model.template.base.WorkflowTemplate) – Workflow template containing the parameterized specification and the parameter declarations.
arguments (dict) – Dictionary of argument values for parameters in the template.
staticfs (flowserv.volume.base.StorageVolume) – Storage volume that contains the static files from the workflow template.
config (dict, default=None) – Optional object to override the worker configuration settings.
- Return type
flowserv.model.workflow.state.WorkflowState, flowserv.volume.base.StorageVolume
- flowserv.controller.serial.engine.base.callback_function(result, lock, tasks, service)
Callback function for executed tasks. Removes the task from the task index and updates the run state in the underlying database.
- Parameters
result ((string, dict)) – Tuple of task identifier and serialized state of the workflow run
lock (multiprocessing.Lock) – Lock for concurrency control
tasks (dict) – Task index of the backend
service (contextlib.contextmanager) – Context manager to create an instance of the service API.
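A callback with this signature could be sketched as below, assuming `result` is the documented (task identifier, serialized state) pair and that `service()` yields an object with an `update_run(run_id, state)` method. `task_callback`, `RecordingAPI`, and `update_run` are hypothetical names, and skipping results for tasks that are no longer in the index (for example after cancellation) is an assumption.

```python
from typing import Any, Callable, ContextManager, Dict, List, Tuple


def task_callback(
    result: Tuple[str, Dict],
    lock: Any,
    tasks: Dict[str, Any],
    service: Callable[[], ContextManager[Any]],
) -> None:
    """Remove the finished task from the index and store its state."""
    run_id, state = result
    with lock:
        if run_id not in tasks:
            # Task was cancelled or already handled; ignore the result.
            return
        del tasks[run_id]
    # Open a service API instance only for the duration of the update.
    with service() as api:
        api.update_run(run_id, state)


class RecordingAPI:
    """Stand-in for the service API that records state updates."""

    def __init__(self) -> None:
        self.updates: List[Tuple[str, Dict]] = []

    def update_run(self, run_id: str, state: Dict) -> None:
        self.updates.append((run_id, state))
```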
- flowserv.controller.serial.engine.base.run_workflow(run_id: str, state: flowserv.model.workflow.state.WorkflowState, output_files: List[str], steps: List[flowserv.model.workflow.step.ContainerStep], arguments: Dict, volumes: flowserv.volume.manager.VolumeManager, workers: flowserv.controller.worker.manager.WorkerPool) → Tuple[str, str, Dict]
Execute a list of workflow steps synchronously.
This is the worker function for asynchronous workflow executions. Returns a tuple containing the run identifier, the folder with the run files, and a serialization of the workflow state.
- Parameters
run_id (string) – Unique run identifier
state (flowserv.model.workflow.state.WorkflowState) – Current workflow state (to access the timestamps)
output_files (list(string)) – Relative path of output files that are generated by the workflow run
steps (list of flowserv.model.workflow.step.ContainerStep) – Steps in the serial workflow that are executed in the given context.
arguments (dict) – Dictionary of argument values for parameters in the template.
volumes (flowserv.volume.manager.VolumeManager) – Factory for storage volumes.
workers (flowserv.controller.worker.manager.WorkerPool) – Factory for flowserv.model.workflow.step.ContainerStep steps.
- Return type
(string, string, dict)
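The synchronous step loop inside the worker function can be sketched as follows. Steps are modeled as plain callables instead of ContainerStep objects executed by a WorkerPool, `run_steps` and `Step` are hypothetical names, and the layout of the state dictionary is an assumption; only the shape of the returned (run identifier, run folder, state) tuple follows the documentation above.

```python
from typing import Callable, Dict, List, Tuple

# Hypothetical step type: receives the arguments and the run folder and
# returns a success flag plus any log lines it produced.
Step = Callable[[Dict, str], Tuple[bool, List[str]]]


def run_steps(
    run_id: str, rundir: str, steps: List[Step], arguments: Dict
) -> Tuple[str, str, Dict]:
    """Execute steps one after another and stop at the first failure."""
    logs: List[str] = []
    for index, step in enumerate(steps):
        ok, out = step(arguments, rundir)
        logs.extend(out)
        if not ok:
            # Later steps are skipped once a step fails.
            state = {"state": "ERROR", "failed_step": index, "messages": logs}
            return run_id, rundir, state
    return run_id, rundir, {"state": "SUCCESS", "messages": logs}
```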
- flowserv.controller.serial.engine.base.volume_manager(specs: List[Dict], runstore: flowserv.volume.base.StorageVolume, runfiles: List[str]) → flowserv.volume.manager.VolumeManager
Create an instance of the storage volume manager for a workflow run.
Combines the volume store specifications in the workflow run configuration with the storage volume for the workflow run files.
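One way to picture combining the configured volume specifications with the run store is a registry keyed by volume identifier. This is a hypothetical sketch on plain dictionaries, not the VolumeManager API; `combine_volume_specs`, the `id` key, and the file-to-volume mapping are assumptions.

```python
from typing import Dict, List


def combine_volume_specs(
    specs: List[Dict], runstore: Dict, runfiles: List[str]
) -> Dict:
    """Build a registry of volumes plus the known location of each file.

    Every file in runfiles is recorded as residing on the run store,
    since those files have already been copied there.
    """
    volumes = {spec["id"]: spec for spec in specs}
    if runstore["id"] in volumes:
        raise ValueError(f"duplicate volume id {runstore['id']!r}")
    volumes[runstore["id"]] = runstore
    files = {key: [runstore["id"]] for key in runfiles}
    return {"volumes": volumes, "files": files}
```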
- Parameters
specs (list of dict) – List of specifications (dictionary serializations) for storage volumes.
runstore (flowserv.volume.base.StorageVolume) – Storage volume for run files.
runfiles (list of string) – List of files that have been copied to the run store.
- Return type