flowserv.controller.worker.manager module

Factory for workers that implement the flowserv.controller.worker.base.Worker class. Workers are used to initiate and control the execution of workflow steps using different execution backends and implementations.

Instances of worker classes are created from configuration specifications that follow this schema:

definitions:
  keyValuePair:
    description: Key-value pair object.
    properties:
      key:
        description: Value key.
        type: string
      value:
        anyOf:
        - type: integer
        - type: string
        description: Scalar value associated with the key.
    required:
    - key
    - value
    type: object
  workerSpec:
    description: Specification for a worker engine instance.
    properties:
      env:
        description: Key-value pairs for environment variables.
        items:
          $ref: '#/definitions/keyValuePair'
        type: array
      name:
        description: Unique worker identifier.
        type: string
      type:
        description: Worker type identifier
        enum:
        - code
        - docker
        - subprocess
        type: string
      vars:
        description: Key-value pairs for template string variables.
        items:
          $ref: '#/definitions/keyValuePair'
        type: array
      volume:
        description: Storage volume the worker has access to.
        type: string
    required:
    - name
    - type
    type: object
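
As an illustration of the schema above, the following sketch builds a dictionary that follows the workerSpec definition and checks it with a small hand-written validator. The helpers `is_key_value_pair` and `check_worker_spec` and all sample values are assumptions made for this example; they are not part of flowserv, which may validate specifications differently.

```python
from typing import Any

# Worker types listed in the enum of the workerSpec schema.
WORKER_TYPES = {"code", "docker", "subprocess"}


def is_key_value_pair(obj: Any) -> bool:
    """Check an object against the keyValuePair definition."""
    if not isinstance(obj, dict):
        return False
    key, value = obj.get("key"), obj.get("value")
    # bool is a subclass of int in Python but is not a schema integer.
    is_scalar = isinstance(value, (int, str)) and not isinstance(value, bool)
    return isinstance(key, str) and is_scalar


def check_worker_spec(doc: Any) -> bool:
    """Hand-written check against the workerSpec definition (illustration only)."""
    if not isinstance(doc, dict):
        return False
    # 'name' and 'type' are the only required properties.
    if not isinstance(doc.get("name"), str):
        return False
    worker_type = doc.get("type")
    if not isinstance(worker_type, str) or worker_type not in WORKER_TYPES:
        return False
    # 'env' and 'vars' are optional arrays of key-value pairs.
    for label in ("env", "vars"):
        if label in doc:
            items = doc[label]
            if not isinstance(items, list):
                return False
            if not all(is_key_value_pair(item) for item in items):
                return False
    # 'volume' is an optional string.
    return "volume" not in doc or isinstance(doc["volume"], str)


# Sample specification; every value here is made up for the example.
spec = {
    "name": "worker-1",
    "type": "docker",
    "env": [{"key": "PYTHONPATH", "value": "/app"}],
    "vars": [{"key": "threads", "value": 4}],
    "volume": "shared",
}
print(check_worker_spec(spec))
```

Only name and type are required, so a specification such as {"name": "w", "type": "code"} also passes this check.
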
flowserv.controller.worker.manager.Code(identifier: Optional[str] = None, volume: Optional[str] = None) → Dict

Get base configuration serialization for a code worker.

Parameters
  • identifier (string, default=None) – Unique worker identifier. If no identifier is given, a new unique identifier will be generated.

  • volume (string, default=None) – Identifier for the storage volume that the worker has access to.

Return type

dict

flowserv.controller.worker.manager.Docker(identifier: Optional[str] = None, variables: Optional[Dict] = None, env: Optional[Dict] = None, volume: Optional[str] = None) → Dict

Get base configuration for a Docker worker with the given optional arguments.

Parameters
  • identifier (string, default=None) – Unique worker identifier. If no identifier is given, a new unique identifier will be generated.

  • variables (dict, default=None) – Mapping with default values for placeholders in command template strings.

  • env (dict, default=None) – Default settings for environment variables when executing workflow steps. These settings can get overridden by step-specific settings.

  • volume (string, default=None) – Identifier for the storage volume that the worker has access to.

Return type

dict

flowserv.controller.worker.manager.Notebook(identifier: Optional[str] = None, volume: Optional[str] = None) → Dict

Get base configuration serialization for a notebook worker.

Parameters
  • identifier (string, default=None) – Unique worker identifier. If no identifier is given, a new unique identifier will be generated.

  • volume (string, default=None) – Identifier for the storage volume that the worker has access to.

Return type

dict

flowserv.controller.worker.manager.Subprocess(identifier: Optional[str] = None, variables: Optional[Dict] = None, env: Optional[Dict] = None, volume: Optional[str] = None) → Dict

Get base configuration for a subprocess worker with the given optional arguments.

Parameters
  • identifier (string, default=None) – Unique worker identifier. If no identifier is given, a new unique identifier will be generated.

  • variables (dict, default=None) – Mapping with default values for placeholders in command template strings.

  • env (dict, default=None) – Default settings for environment variables when executing workflow steps. These settings can get overridden by step-specific settings.

  • volume (string, default=None) – Identifier for the storage volume that the worker has access to.

Return type

dict

class flowserv.controller.worker.manager.WorkerPool(workers: Optional[List[Dict]] = [], managers: Optional[Dict] = None)

Bases: object

Manager for a pool of worker instances. Workers are responsible for the initiation and control of the execution of steps in a serial workflow.

Workers are instantiated from dictionary serializations that follow the workerSpec schema defined in the schema.json file.

get(step: flowserv.model.workflow.step.WorkflowStep) → flowserv.controller.worker.base.Worker

Get the instance of the worker that is associated with the given workflow step.

If no worker specification exists for the given step, a default worker is returned. The type of the default worker depends on the type of the workflow step. For code steps, only one type of worker currently exists. For container steps, a sub-process worker is used as the default.

Parameters

step (flowserv.model.workflow.step.WorkflowStep) – Step in a serial workflow.

Return type

flowserv.controller.worker.base.Worker
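
The lookup-with-fallback behavior described for get() can be sketched as follows. This is a minimal illustration under stated assumptions, not the WorkerPool implementation: the classes `SketchStep`, `SketchWorker`, and `SketchPool` are hypothetical stand-ins, and keying workers directly by step identifier is an assumption about how steps are associated with workers.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class SketchStep:
    """Hypothetical stand-in for a workflow step."""
    identifier: str
    step_type: str  # assumed to be either "code" or "container"


@dataclass
class SketchWorker:
    """Hypothetical stand-in for a worker instance."""
    name: str
    kind: str


class SketchPool:
    """Illustrative pool that falls back to a default worker per step type."""

    def __init__(self, workers: Optional[Dict[str, SketchWorker]] = None):
        # Copy the mapping so the pool never shares state with the caller.
        self.workers = dict(workers) if workers else {}
        # One default per step type: container steps use a sub-process worker.
        self._defaults = {
            "code": SketchWorker("default-code", "code"),
            "container": SketchWorker("default-subprocess", "subprocess"),
        }

    def get(self, step: SketchStep) -> SketchWorker:
        """Return the worker registered for the step, else the default."""
        worker = self.workers.get(step.identifier)
        if worker is not None:
            return worker
        return self.get_default_worker(step)

    def get_default_worker(self, step: SketchStep) -> SketchWorker:
        """Return the default worker for the type of the given step."""
        try:
            return self._defaults[step.step_type]
        except KeyError:
            raise ValueError(f"unknown step type '{step.step_type}'")


pool = SketchPool({"train": SketchWorker("gpu-docker", "docker")})
print(pool.get(SketchStep("train", "container")).name)
print(pool.get(SketchStep("report", "container")).name)
```

In this sketch the step "train" resolves to its registered worker, while "report" has no entry and falls back to the sub-process default.
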

get_default_worker(step: flowserv.model.workflow.step.WorkflowStep) → flowserv.controller.worker.base.Worker

Return the default worker depending on the type of the given workflow step.

Parameters

step (flowserv.model.workflow.step.WorkflowStep) – Step in a serial workflow.

Return type

flowserv.controller.worker.base.Worker

flowserv.controller.worker.manager.WorkerSpec(worker_type: str, identifier: Optional[str] = None, variables: Optional[Dict] = None, env: Optional[Dict] = None, volume: Optional[str] = None) → Dict

Get a serialization for a worker specification.

Parameters
  • worker_type (string) – Unique worker type identifier.

  • identifier (string, default=None) – Unique worker identifier. If no identifier is given, a new unique identifier will be generated.

  • variables (dict, default=None) – Mapping with default values for placeholders in command template strings.

  • env (dict, default=None) – Default settings for environment variables when executing workflow steps. These settings can get overridden by step-specific settings.

  • volume (string, default=None) – Identifier for the storage volume that the worker has access to.

Return type

dict
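
To show how the parameters above could map onto the workerSpec schema, here is a minimal sketch of a serialization helper. The function `make_worker_spec` is hypothetical and is not the flowserv implementation; in particular, omitting empty vars and env entries and generating identifiers with uuid4 are assumptions made for this example.

```python
import uuid
from typing import Dict, List, Optional


def to_pairs(mapping: Dict) -> List[Dict]:
    """Convert a mapping into a list of keyValuePair objects."""
    return [{"key": key, "value": value} for key, value in mapping.items()]


def make_worker_spec(
    worker_type: str,
    identifier: Optional[str] = None,
    variables: Optional[Dict] = None,
    env: Optional[Dict] = None,
    volume: Optional[str] = None,
) -> Dict:
    """Build a dictionary that follows the workerSpec schema (illustration)."""
    doc = {
        # Assumption: a random hex string serves as the generated identifier.
        "name": identifier if identifier is not None else uuid.uuid4().hex,
        "type": worker_type,
    }
    # Optional properties are only added when a value was given.
    if variables:
        doc["vars"] = to_pairs(variables)
    if env:
        doc["env"] = to_pairs(env)
    if volume is not None:
        doc["volume"] = volume
    return doc


doc = make_worker_spec(
    "subprocess",
    identifier="w1",
    variables={"python": "python3"},
    env={"OMP_NUM_THREADS": 2},
    volume="shared",
)
print(doc)
```

The dictionaries passed as variables and env become arrays of key-value pairs, which is the shape the schema requires for vars and env.
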

flowserv.controller.worker.manager.create_worker(doc: Dict) → flowserv.controller.worker.base.Worker

Factory pattern for workers.

Create an instance of a worker implementation from a given worker serialization.

Parameters

doc (dict) – Dictionary serialization for a worker.

Return type

flowserv.controller.worker.base.Worker
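
The factory pattern named above can be sketched as a dispatch on the type field of the serialization. Everything in this block is a hypothetical stand-in: the classes `SketchWorker`, `SketchCodeWorker`, `SketchDockerWorker`, and `SketchSubprocessWorker`, the `REGISTRY` mapping, and the function `create_worker_sketch` are assumptions for illustration and do not reproduce the flowserv classes or their constructors.

```python
from typing import Dict, List, Optional, Type


class SketchWorker:
    """Hypothetical base class standing in for a worker implementation."""

    def __init__(self, identifier: str, variables: Dict, env: Dict,
                 volume: Optional[str] = None):
        self.identifier = identifier
        self.variables = variables
        self.env = env
        self.volume = volume


class SketchCodeWorker(SketchWorker):
    pass


class SketchDockerWorker(SketchWorker):
    pass


class SketchSubprocessWorker(SketchWorker):
    pass


# Map each worker type identifier from the schema enum to a class.
REGISTRY: Dict[str, Type[SketchWorker]] = {
    "code": SketchCodeWorker,
    "docker": SketchDockerWorker,
    "subprocess": SketchSubprocessWorker,
}


def from_pairs(pairs: List[Dict]) -> Dict:
    """Convert a list of keyValuePair objects back into a mapping."""
    return {pair["key"]: pair["value"] for pair in pairs}


def create_worker_sketch(doc: Dict) -> SketchWorker:
    """Instantiate the class registered for the worker type in the document."""
    worker_type = doc["type"]
    cls = REGISTRY.get(worker_type)
    if cls is None:
        raise ValueError(f"unknown worker type '{worker_type}'")
    return cls(
        identifier=doc["name"],
        variables=from_pairs(doc.get("vars", [])),
        env=from_pairs(doc.get("env", [])),
        volume=doc.get("volume"),
    )


worker = create_worker_sketch({
    "name": "w1",
    "type": "docker",
    "env": [{"key": "TZ", "value": "UTC"}],
})
print(type(worker).__name__)
```

Keeping the type-to-class mapping in one dictionary means that supporting a new worker type only requires a new registry entry, with no change to the factory function itself.
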

flowserv.controller.worker.manager.default_container_worker = <flowserv.controller.worker.subprocess.SubprocessWorker object>

Default worker for container steps.