Ids for workflows and activities are created behind the scenes by inspecting inputs. get_id_from_input beneath assumes that the input has an “id” key and appends this value to the class name of the workflow or activity:
import flowser
@classmethod
def get_id_from_input(cls, input):
return ".".join([cls.name, input['id']])
This function sets version and task list properties on decorated classes. It also makes classes use our get_id_from_input function defined earlier:
def auto_configured(cls):
"""Class decorator for workflows and activities. """
cls.version = '1.0.0'
cls.task_list = '-'.join([cls.name, cls.version])
cls.get_id_from_input = get_id_from_input
return cls
Definition of a workflow that performs arithmetic:
@auto_configured
class ArithmeticWorkflow(flowser.types.Workflow):
"""Workflow for performing arithmetics. """
name = 'ArithmeticWorkflow'
execution_start_to_close_timeout = '600'
task_start_to_close_timeout = '120'
child_policy = 'TERMINATE'
Definition of two activities. These will be scheduled from arithmetic workflows:
@auto_configured
class MultiplyActivity(flowser.types.Activity):
name = 'MultiplyActivity'
heartbeat_timeout = '60'
schedule_to_close_timeout = '60'
schedule_to_start_timeout = '60'
start_to_close_timeout = '60'
@auto_configured
class SumActivity(flowser.types.Activity):
name = 'SumActivity'
heartbeat_timeout = '60'
schedule_to_close_timeout = '60'
schedule_to_start_timeout = '60'
start_to_close_timeout = '60'
Definition of a math domain:
class MathDomain(flowser.Domain):
name = 'math'
workflow_types = [ArithmeticWorkflow]
activity_types = [MultiplyActivity, SumActivity]
Register domain, workflows and activities:
import boto
domain = MathDomain(boto.connect_swf())
domain.register()
Implementations of worker threads for our workflow and activities:
import threading
class ArithmeticWorkflowDecider(threading.Thread):
def run(self):
op_to_activity = {
'multiply': MultiplyActivity,
'sum': SumActivity,
}
for task in domain.decisions(ArithmeticWorkflow):
is_new = len(task.filter('DecisionTaskScheduled')) == 1
if is_new:
# Schedule tasks from "operations" input.
for op_id, op, input in task.start_input['operations']:
activity = op_to_activity[op]
task.schedule(activity, {
'id': task.start_input['id'],
'operation': [op_id, input],
})
task.complete()
else:
op_ids = set([x[0] for x in task.start_input['operations']])
results = {}
for ev in task.filter('ActivityTaskCompleted'):
result = ev.attrs['result']
results[result[0]] = result[1]
result_ids = set(results.keys())
got_all = op_ids == result_ids
if got_all:
# Got results for all operations. Complete workflow.
task.workflow_execution.complete(results)
else:
task.complete()
class WorkerThread(threading.Thread):
def run(self):
for task in domain.activities(self.activity_class):
self.handle_task(task)
class MultiplyWorker(WorkerThread):
activity_class = MultiplyActivity
def handle_task(self, task):
op = task.input['operation']
result = reduce(lambda a, b: a * b, op[1])
task.complete([op[0], result])
class SumWorker(WorkerThread):
activity_class = SumActivity
def handle_task(self, task):
op = task.input['operation']
task.complete([op[0], sum(op[1])])
Start workers and an execution:
MultiplyWorker().start()
SumWorker().start()
ArithmeticWorkflowDecider().start()
import uuid
arithmetic_input = {
'id': str(uuid.uuid4()),
'operations': [
['mult_id', 'multiply', [1, 2, 3]],
['sum_id', 'sum', [1, 2, 3, 4]],
],
}
domain.start(ArithmeticWorkflow, arithmetic_input)