Arithmetic Example

Utilities

IDs for workflows and activities are created behind the scenes by inspecting their inputs. The get_id_from_input function below assumes that the input has an “id” key and joins the class name of the workflow or activity with that value, separated by a dot:

import flowser

@classmethod
def get_id_from_input(cls, input):
    """Return an id made of the class name and the input's ``'id'`` value.

    The function is wrapped in ``classmethod`` at module level so that it can
    be assigned onto a class and then called as ``SomeClass.get_id_from_input``.

    ``input`` must support ``input['id']``. The two parts are joined with a
    ``.`` using ``str.join``, so both ``cls.name`` and the id must be strings.
    """
    id_parts = (cls.name, input['id'])
    return ".".join(id_parts)

The following class decorator sets the version and task_list attributes on decorated classes. It also makes those classes use the get_id_from_input function defined earlier:

def auto_configured(cls):
    """Class decorator that fills in shared settings on a workflow/activity.

    Sets ``cls.version`` to ``'1.0.0'``, derives ``cls.task_list`` as
    ``'<name>-<version>'`` from ``cls.name``, and installs the module-level
    ``get_id_from_input`` on the class. The same class object is returned.
    """
    version = '1.0.0'
    cls.version = version
    # The task list name combines the type name and its version.
    cls.task_list = '-'.join((cls.name, version))
    cls.get_id_from_input = get_id_from_input
    return cls

Domain, Workflow and Activities

Definition of a workflow that performs arithmetic:

@auto_configured
class ArithmeticWorkflow(flowser.types.Workflow):
    """Workflow type that performs arithmetic operations."""
    name = 'ArithmeticWorkflow'

    # Policy for child workflows; the value is handed to flowser unchanged.
    child_policy = 'TERMINATE'

    # Timeouts are given as strings (presumably seconds -- confirm against
    # the flowser / SWF documentation).
    task_start_to_close_timeout = '120'
    execution_start_to_close_timeout = '600'

Definition of two activities. These will be scheduled from arithmetic workflows:

@auto_configured
class MultiplyActivity(flowser.types.Activity):
    """Activity type for multiplication operations."""
    name = 'MultiplyActivity'

    # All four timeouts share the same value; they are strings (presumably
    # seconds -- confirm against the flowser / SWF documentation).
    schedule_to_start_timeout = '60'
    start_to_close_timeout = '60'
    schedule_to_close_timeout = '60'
    heartbeat_timeout = '60'

@auto_configured
class SumActivity(flowser.types.Activity):
    """Activity type for summation operations."""
    name = 'SumActivity'

    # All four timeouts share the same value; they are strings (presumably
    # seconds -- confirm against the flowser / SWF documentation).
    schedule_to_start_timeout = '60'
    start_to_close_timeout = '60'
    schedule_to_close_timeout = '60'
    heartbeat_timeout = '60'

Definition of a math domain:

class MathDomain(flowser.Domain):
    """Domain named ``'math'`` that lists the arithmetic workflow and activities."""
    activity_types = [MultiplyActivity, SumActivity]
    workflow_types = [ArithmeticWorkflow]
    name = 'math'

Register domain, workflows and activities:

import boto
# Open an SWF connection through boto and bind the math domain to it, then
# register the domain.
_swf_connection = boto.connect_swf()
domain = MathDomain(_swf_connection)
domain.register()

Implementations of worker threads for our workflow and activities:

import threading

class ArithmeticWorkflowDecider(threading.Thread):
    """Decider thread for ``ArithmeticWorkflow`` executions.

    On the first decision task of an execution it schedules one activity per
    entry of the ``"operations"`` start input. On later decision tasks it
    gathers the results of completed activities and completes the workflow
    execution once every operation id has a result.
    """

    def run(self):
        """Handle each decision task yielded by ``domain.decisions``."""
        activity_by_operation = {
            'multiply': MultiplyActivity,
            'sum': SumActivity,
        }
        for decision in domain.decisions(ArithmeticWorkflow):
            # Exactly one 'DecisionTaskScheduled' event is treated as the
            # first decision task of the execution.
            first_decision = len(decision.filter('DecisionTaskScheduled')) == 1
            operations = decision.start_input['operations']

            if first_decision:
                # Fan out: each entry is (operation id, operation name, operands).
                for op_id, op_name, operands in operations:
                    activity = activity_by_operation[op_name]
                    payload = {
                        'id': decision.start_input['id'],
                        'operation': [op_id, operands],
                    }
                    decision.schedule(activity, payload)
                decision.complete()
                continue

            expected_ids = set(entry[0] for entry in operations)

            # Each activity result is indexed as [operation id, value].
            outcomes = (
                event.attrs['result']
                for event in decision.filter('ActivityTaskCompleted')
            )
            results = dict((outcome[0], outcome[1]) for outcome in outcomes)

            if expected_ids == set(results):
                # Every operation has reported back: finish the workflow.
                decision.workflow_execution.complete(results)
            else:
                # Still waiting for some activities; close this decision task.
                decision.complete()


class WorkerThread(threading.Thread):
    """Base thread that passes activity tasks to ``handle_task``.

    ``activity_class`` and ``handle_task(task)`` are read from ``self`` but
    are not defined on this class; subclasses are expected to provide them.
    """

    def run(self):
        """Pass every task yielded by ``domain.activities`` to ``handle_task``."""
        activity_tasks = domain.activities(self.activity_class)
        for activity_task in activity_tasks:
            self.handle_task(activity_task)


class MultiplyWorker(WorkerThread):
    """Worker that multiplies the operands of ``MultiplyActivity`` tasks."""

    activity_class = MultiplyActivity

    def handle_task(self, task):
        """Complete ``task`` with ``[operation id, product of the operands]``.

        ``task.input['operation']`` is indexed as ``[operation id, operands]``.
        An empty operand sequence yields the multiplicative identity ``1``
        instead of raising ``TypeError``.
        """
        # ``reduce`` is a builtin only on Python 2; importing it from
        # functools works on both Python 2.6+ and Python 3.
        from functools import reduce
        from operator import mul

        op = task.input['operation']
        product = reduce(mul, op[1], 1)
        task.complete([op[0], product])


class SumWorker(WorkerThread):
    """Worker that adds up the operands of ``SumActivity`` tasks."""

    activity_class = SumActivity

    def handle_task(self, task):
        """Complete ``task`` with ``[operation id, sum of the operands]``.

        ``task.input['operation']`` is indexed as ``[operation id, operands]``.
        """
        operation = task.input['operation']
        op_id = operation[0]
        total = sum(operation[1])
        task.complete([op_id, total])

Starting a Workflow Execution

Start workers and an execution:

# Start the two activity workers first, then the decider.
for _thread_class in (MultiplyWorker, SumWorker, ArithmeticWorkflowDecider):
    _thread_class().start()

import uuid

# Each operation is [operation id, operation name, operands]. The random
# 'id' value is what get_id_from_input reads from the input.
arithmetic_input = dict(
    id=str(uuid.uuid4()),
    operations=[
        ['mult_id', 'multiply', [1, 2, 3]],
        ['sum_id', 'sum', [1, 2, 3, 4]],
    ],
)
domain.start(ArithmeticWorkflow, arithmetic_input)

Table Of Contents

Related Topics

This Page