Source code for flowser.tasks

# Copyright (c) 2012 Memoto AB
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

from flowser import serializing
from flowser import decisions
from flowser.events import Event
from flowser.exceptions import LastPage


[docs]class WorkflowExecution(object): """Wrapper for the API data type. See http://docs.amazonwebservices.com/amazonswf/latest/apireference/API_WorkflowExecution.html. """ def __init__(self, result, caller): self.run_id = result['runId'] self.workflow_id = result['workflowId'] self._caller = caller self._domain = caller._domain def __str__(self): return "run_id(%s) workflow_id(%s)" % (self.run_id, self.workflow_id) def __repr__(self): return "<WorkflowExecution %s>" % self
[docs] def complete(self, result, context=None): """Complete workflow execution. This can only be called from a decision task. """ dec, attrs = decisions.skeleton("CompleteWorkflowExecution") attrs['result'] = serializing.dumps(result) self._caller._decisions.append(dec) self._caller.complete(context=context)
[docs] def request_cancel(self): self._domain.conn.request_cancel(self._domain.name, self.workflow_id, run_id=self.run_id)
[docs] def signal(self, name, input=None): serialized_input = None if input is not None: serialized_input = serializing.dumps(input) self._domain.conn.signal_workflow_execution( self._domain.name, name, self.workflow_id, input=serialized_input, run_id=self.run_id)
[docs] def terminate(self, details=None, reason=None): self._terminate('TERMINATE', details, reason)
[docs] def abandon(self, details=None, reason=None): self._terminate('ABANDON', details, reason)
[docs] def terminate_request_cancel(self, details=None, reason=None): # XXX sp: How is this different from RequestCancelWorkflowExecution? # See: # # * http://docs.amazonwebservices.com/amazonswf/latest/apireference/API_RequestCancelWorkflowExecution.html # * http://docs.amazonwebservices.com/amazonswf/latest/apireference/API_TerminateWorkflowExecution.html # # It seems that the RequestCancelelWorkflowExecution API call is # preferable because it allows the workflow to gracefully close whereas # the terminate call does not. self._terminate('REQUEST_CANCEL', details, reason)
def _terminate(self, child_policy, details, reason): self._domain.conn.terminate_workflow_execution( self._domain.name, self.workflow_id, child_policy=child_policy, details=details, reason=reason, run_id=self.run_id)
[docs]class WorkflowType(object): """Wrapper for the API data type. See http://docs.amazonwebservices.com/amazonswf/latest/apireference/API_WorkflowType.html. """ def __init__(self, result): self.name = result['name'] self.version = result['version'] def __str__(self): return "%s (%s)" % (self.name, self.version) def __repr__(self): return "<WorkflowType %s>" % self
[docs]class ActivityType(object): """Wrapper for the API data type. See http://docs.amazonwebservices.com/amazonswf/latest/apireference/API_ActivityType.html. """ def __init__(self, result): self.name = result['name'] self.version = result['version'] def __str__(self): return "%s (%s)" % (self.name, self.version) def __repr__(self): return "<ActivityType %s>" % self
[docs]class Decision(object): """Wrapper for "PollForDecisionTask" results. See http://docs.amazonwebservices.com/amazonswf/latest/apireference/API_PollForDecisionTask.html. This class assumes that history events are in reverse order (most recent first). """ def __init__(self, result, caller): """ :param result: Result structure from the API. :param caller: Caller object (subclass of ``types.Type``). """ self._decisions = [] self._caller = caller self._domain = caller._domain self._events = result['events'] self.next_page_token = self._get_next_page_token(result) self.previous_started_event_id = result['previousStartedEventId'] self.started_event_id = result['startedEventId'] self.task_token = result['taskToken'] self.workflow_execution = WorkflowExecution( result['workflowExecution'], self) self.workflow_type = WorkflowType(result['workflowType']) def __repr__(self): return "<Decision workflow_type(%s) %s>" % ( self.workflow_type, self.workflow_execution) def _get_next_page_token(self, result): return result.get('nextPageToken', None) @property
[docs] def events(self): # First go through what we got. This list may have been extended # from previous calls. After that, fetch new pages until no more are # available. for r in self._events: yield Event(r) try: while True: for r in self._next_page(): yield Event(r) except LastPage: raise StopIteration
def _next_page(self): """Get next page of history events. This method updates ``self.next_page_token`` and extends ``self._events`` behind the curtains. :raises: LastPage """ if self.next_page_token is None: raise LastPage next_result = self._caller._poll_for_decision_task( next_page_token=self.next_page_token, reverse_order=True) self.next_page_token = self._get_next_page_token(next_result) self._events.extend(next_result['events']) return next_result['events']
[docs] def most_recent(self, event_type): for event in self.events: if event.type == event_type: return event return None
[docs] def filter(self, event_type): return filter(lambda ev: ev.type == event_type, self.events)
@property
[docs] def start_input(self): """Get start input as a python object. This method iterates over the event history to find the WorkflowExecutionStarted event and unserializes its input attribute. The result is cached. """ if not hasattr(self, '_start_input'): started_event = self.most_recent('WorkflowExecutionStarted') input_attr = started_event.attrs['input'] self._start_input = serializing.loads(input_attr) return self._start_input
[docs] def mark(self, name, details=None): """Adds a RecordMarker decision. """ dec, attrs = decisions.skeleton("RecordMarker") attrs['markerName'] = name if details: attrs['details'] = details self._decisions.append(dec) return self
[docs] def schedule(self, activity_type, *args, **kwargs): """Schedule activity. Internally, this method calls the schedule classmethod on the activity type with the given args and kwargs. :param activity_type: Subclass of ``types.Activity``. """ dec = activity_type.schedule(*args, **kwargs) self._decisions.append(dec) return self
[docs] def start_child(self, workflow_type, *args, **kwargs): """Start child workflow. Internally, this method calls the start_child classmethod on the workflow type with the given args and kwargs. :param workflow_type: Subclass of ``types.Workflow``. """ dec = workflow_type.start_child(*args, **kwargs) self._decisions.append(dec) return self
[docs] def complete(self, context=None): execution_context = None if context is not None: execution_context = serializing.dumps(context) self._domain.conn.respond_decision_task_completed( self.task_token, decisions=self._decisions, execution_context=execution_context)
[docs] def fail(self, details=None, reason=None): self._domain.conn.respond_decision_task_failed( self.task_token, details=details, reason=reason)
[docs]class Activity(object): """Wrapper for "PollForActivityTask" results. See http://docs.amazonwebservices.com/amazonswf/latest/apireference/API_PollForActivityTask.html. """ def __init__(self, result, caller): """ :param result: Result structure from the API. :param domain: A domain instance (optional). Needed for responses. """ self._caller = caller self._domain = caller._domain self.activity_id = result['activityId'] self.activity_type = ActivityType(result['activityType']) self.input = serializing.loads(result['input']) self.started_event_id = result['startedEventId'] self.task_token = result['taskToken'] self.workflow_execution = WorkflowExecution( result['workflowExecution'], self) def __repr__(self): return "<Activity activity_type(%s) %s>" % ( self.activity_type, self.workflow_execution)
[docs] def complete(self, result=None): serialized_result = None if result is not None: serialized_result = serializing.dumps(result) return self._domain.conn.respond_activity_task_completed( self.task_token, result=serialized_result)
[docs] def fail(self, details=None, reason=None): self._domain.conn.respond_activity_task_failed( self.task_token, details=details, reason=reason)
[docs] def cancel(self, details=None): self._domain.conn.respond_activity_task_canceled( self.task_token, details=details)

Related Topics