flowserv.model.run module

The run manager is used to create, delete, query, and update information about workflow runs in an underlying database.

class flowserv.model.run.RunManager(session: sqlalchemy.orm.session.Session, fs: flowserv.volume.base.StorageVolume)

Bases: object

The run manager maintains workflow runs. It provides methods the create, delete, and retrieve runs. the manager also provides the functionality to update the state of workflow runs.

create_run(workflow=None, group=None, arguments=None, runs=None)

Create a new entry for a run that is in pending state. Returns a handle for the created run.

A run is either created for a group (i.e., a grop submission run) or for a workflow (i.e., a post-processing run). Only one of the two parameters is expected to be None.

Parameters
  • workflow (flowserv.model.base.WorkflowObject, default=None) – Workflow handle if this is a post-processing run.

  • group (flowserv.model.base.GroupObject) – Group handle if this is a group sumbission run.

  • arguments (list) – List of argument values for parameters in the template.

  • runs (list(string), default=None) – List of run identifier that define the input for a post-processing run.

Return type

flowserv.model.base.RunObject

Raises
delete_obsolete_runs(date: str, state: Optional[str] = None) int

Delete all workflow runs that were created before the given date. The optional state parameter allows to further restrict the list of deleted runs to those that were created before the given date and that are in the give state.

Parameters
  • date (string) – Filter for run creation date.

  • state (string, default=None) – Filter for run state.

Return type

int

delete_run(run_id)

Delete the entry for the given run from the underlying database.

Parameters

run_id (string) – Unique run identifier

Raises

flowserv.error.UnknownRunError

get_run(run_id: str) flowserv.model.base.RunObject

Get handle for the given run from the underlying database. Raises an error if the run does not exist.

Parameters

run_id (string) – Unique run identifier

Return type

flowserv.model.base.RunObject

Raises

flowserv.error.UnknownRunError

get_runarchive(run_id: str) flowserv.model.files.FileHandle

Get tar archive containing all result files for a given workflow run. Raises UnknownRunError if the run is not in SUCCESS state.

Parameters

run_id (string) – Unique run identifier.

Return type

flowserv.model.files.FileHandle

Raises

flowserv.error.UnknownRunError

get_runfile(run_id: str, file_id: Optional[str] = None, key: Optional[str] = None) flowserv.model.files.FileHandle

Get handle and file object for a given run result file. The file is either identified by the unique file identifier or the file key. Raises an error if the specified file does not exist.

Parameters
  • run_id (string) – Unique run identifier.

  • file_id (string) – Unique file identifier.

Return type

flowserv.model.files.FileHandle

Raises
list_obsolete_runs(date: str, state: Optional[str] = None) List[flowserv.model.base.RunObject]

List all workflow runs that were created before the given date. The optional state parameter allows to further restrict the list of returned runs to those that were created before the given date and that are in the give state.

Parameters
  • date (string) – Filter for run creation date.

  • state (string, default=None) – Filter for run state.

Return type

list(flowserv.model.base.RunObject)

list_runs(group_id, state=None)

Get list of run handles for all runs that are associated with a given workflow group.

Parameters
  • group_id (string, optional) – Unique workflow group identifier

  • state (string or list(string), default=None) – Run state query. If given, only those runs that are in the given state(s) will be returned.

Return type

list(flowserv.model.base.RunObject)

update_run(run_id: str, state: flowserv.model.workflow.state.WorkflowState, runstore: Optional[flowserv.volume.base.StorageVolume] = None)

Update the state of the given run. This method does check if the state transition is valid. Transitions are valid for active workflows, if the transition is (a) from pending to running or (b) to an inactive state. Invalid state transitions will raise an error.

For successful runs a reference to the storage volume and the run directory containing the result files has to be given.

Parameters
  • run_id (string) – Unique identifier for the run

  • state (flowserv.model.workflow.state.WorkflowState) – New workflow state

  • runstore (flowserv.volume.base.StorageVolume, default=None) – Storage volume containing the run (result) files for a successful workflow run.

Return type

flowserv.model.base.RunObject

Raises
flowserv.model.run.read_run_results(run: flowserv.model.base.RunObject, schema: flowserv.model.template.schema.ResultSchema, runstore: flowserv.volume.base.StorageVolume)

Read the run results from the result file that is specified in the workflow result schema. If the file is not found we currently do not raise an error.

Parameters
  • run (flowserv.model.base.RunObject) – Handle for a workflow run.

  • schema (flowserv.model.template.schema.ResultSchema) – Workflow result schema specification that contains the reference to the result file key.

  • runstore (flowserv.volume.base.StorageVolume) – Storage volume containing the run (result) files for a successful workflow run.

flowserv.model.run.store_run_files(run: flowserv.model.base.RunObject, files: List[str], source: flowserv.volume.base.StorageVolume, target: flowserv.volume.base.StorageVolume) List[flowserv.model.base.RunFile]

Create list of output files for a successful run. The list of files depends on whether files are specified in the workflow specification or not. If files are specified only those files are included in the returned lists. Otherwise, all result files that are listed in the run state are returned.

Parameters
  • run (flowserv.model.base.RunObject) – Handle for a workflow run.

  • files (list of string) – List of result files for a successful workflow run.

  • source (flowserv.volume.base.StorageVolume) – Storage volume containing the run (result) files for a successful workflow run.

  • target (flowserv.volume.base.StorageVolume) – Storage volume for persiting run result files.

Return type

list of RunObject, list of string

flowserv.model.run.validate_state_transition(current_state: str, target_state: str, valid_states: List[str])

Validate that a transition from current state to target state is permitted. The list of valid state identifier determines the current states that are permitted to transition to the target state. If an invalid transition is detected an error is raised.

Parameters
  • current_state (str) – Identifier for the current run state.

  • target_state (str) – Identifier for the target workflow state.

  • valid_states (list of string) – List of valid source states for the anticipated target state.