Constructing workflows (skbio.workflow)

Construct arbitrarily complex workflows in which the specific methods run are determined at runtime. This module supports short circuiting a workflow if an item fails, supports ordering methods, callbacks for processed items, and deciding what methods are executed based on state or runtime options.


Workflow(state[, short_circuit, debug, options]) Arbitrary workflow support structure


requires([option, values, state]) Decorator that executes a function if requirements are met
method([priority]) Decorate a function to indicate it is a workflow method


>>> from skbio.workflow import Workflow

As an example of the Workflow object, let’s construct a sequence processor that will filter sequences that are < 10 nucleotides, reverse the sequence if the runtime options indicate to, and truncate if a specific nucleotide pattern is observed. The Workflow object will only short circuit, and evaluate requirements on methods decorated by method. Developers are free to define as many methods as they’d like within the object definition, and which can be called from workflow methods, but they will not be subjected directly to workflow checks.

>>> nuc_pattern = 'AATTG'
>>> has_nuc_pattern = lambda s: s[:len(nuc_pattern)] == nuc_pattern
>>> class SequenceProcessor(Workflow):
...    def initialize_state(self, item):
...        # Setup the state for a new item (e.g., a new sequence)
...        self.state = item
...    @method(priority=100)
...    def check_length(self):
...        # Always make sure the sequence is at least 10 nucleotides
...        if len(self.state) < 10:
...            self.failed = True
...    @method(priority=90)
...    @requires(state=has_nuc_pattern)
...    def truncate(self):
...        # Truncate if a specific starting nucleotide pattern is observed
...        self.state = self.state[len(nuc_pattern):]
...    @method(priority=80)
...    @requires(option='reverse', values=True)
...    def reverse(self):
...        # Reverse the sequence if indicatd at runtime
...        self.state = self.state[::-1]

An instance of a Workflow must be passed a state object and any runtime options. There are a few other useful parameters that can be specfied but are out of scope for the purposes of this example. We also do not need to provide a state object as our initialize_state method overrides self.state. Now, let’s create the instance.

>>> wf = SequenceProcessor(state=None, options={'reverse=': False})

To run items through the SequenceProcessor, we need to pass in an iterable. So, lets create a list of sequences.


Before we run these sequences through, we’re going to also define callbacks that are applied to the result of an single pass through the Workflow. Callbacks are optional – by default, a success will simply yield the state member variable while failures are ignored – but, depending on your workflow, it can be useful to handle failures or potentially do something fun and exciting on success.

>>> def success_f(obj):
...     return "SUCCESS: %s" % obj.state
>>> def fail_f(obj):
...     return "FAIL: %s" % obj.state

Now, lets process some data!

>>> for result in wf(seqs, success_callback=success_f, fail_callback=fail_f):
...     print result

A few things of note just happened. First off, none of the sequences were reversed as the SequenceProcessor did not have option “reverse” set to True. Second, you’ll notice that the 3rd sequence was truncated, which is expected as it matched our nucleotide pattern of interest. Finally, of the sequences we processed, only a single sequence failed.

To assist in constructing workflows, debug information is available but it must be turned on at instantiation. Let’s do that, and while we’re at it, let’s go ahead and enable the reversal method. This time through though, were going to walk through an item at a time so we can examine the debug information.

>>> wf = SequenceProcessor(state=None, options={'reverse':True}, debug=True)
>>> gen = wf(seqs, fail_callback=lambda x: x.state)
>>> print wf.failed
>>> print wf.debug_trace
set([('check_length', 0), ('reverse', 2)])

The debug_trace specifies the methods executed, and the order of their execution where closer to zero indicates earlier in the execution order. Gaps indicate there was a method evaluated but not executed. Each of the items in the debug_trace is a key into a few other dict of debug information which we’ll discuss in a moment. Did you see that the sequence was reversed this time through the workflow?

Now, let’s take a look at the next item, which on our prior run through the workflow was a failed item.

>>> print wf.failed
>>> print wf.debug_trace
set([('check_length', 0)])

What we can see is that the failed sequence only executed the check_length method. Since the sequence didn’t pass our length filter of 10 nucleotides, it was marked as failed within the check_length method. As a result, none of the other methods were evaluated (note: this short circuiting behavior can be disabled if desired).

This third item previously matched our nucleotide pattern of interest for truncation. Let’s see what that looks like in the debug output.

>>> #
>>> print wf.failed
>>> wf.debug_trace
set([('check_length', 0), ('truncate', 1), ('reverse', 2)])

In this last example, we can see that the truncate method was executed prior to the reverse method and following the check_length method. This is as anticipated given the priorities we specified for these methods. Since the truncate method is doing something interesting, let’s take a closer look at how the state is changing. First, we’re going to dump out the state of the workflow prior to the call to truncate and then we’re going to dump out the state following the call to truncate, which will allow us to rapidly what is going on.

>>> wf.debug_pre_state[('truncate', 1)]
>>> wf.debug_post_state[('truncate', 1)]

As we expect, we have our original sequence going into truncate, and following the application of truncate, our sequence is missing our nucleotide pattern of interest. Awesome, right?

There is one final piece of debug output, wf.debug_runtime, which can be useful when diagnosing the amount of time required for individual methods on a particular piece of state (as opposed to the aggregate as provided by cProfile).

Three final components of the workflow that are quite handy are objects that allow you to indicate anything as an option value, anything that is not_none, and a mechanism to define a range of valid values.

>>> from skbio.workflow import not_none, anything
>>> class Ex(Workflow):
...     @method()
...     @requires(option='foo', values=not_none)
...     def do_something(self):
...         pass
...     @method()
...     @requires(option='bar', values=anything)
...     def do_something_else(self):
...         pass
...     @method()
...     @requires(option='foobar', values=[1,2,3])
...     def do_something_awesome(self):
...         pass