Painting on a Distributed Canvas

A guide to Celery Workflows

by David Gouldin / @dgouldin

celery-workflows.herokuapp.com

But First

Some design principles

Tasks should be…

Atomic

“An operation ... is atomic ... if it appears to the rest of the system to occur instantaneously. Atomicity is a guarantee of isolation from concurrent processes. Additionally, atomic operations commonly have a succeed-or-fail definition — they either successfully change the state of the system, or have no apparent effect.”

Wikipedia (Atomic Operation)

Idempotent

“Idempotence is the property of certain operations in mathematics and computer science, that can be applied multiple times without changing the result beyond the initial application.”

Wikipedia (Idempotence)
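As a plain-Python sketch (not from the talk), an idempotent operation leaves the system in the same state whether it runs once or many times:

```python
# Hypothetical example: setting a flag is idempotent;
# appending to a log, by contrast, would not be.
def mark_verified(user):
    user["verified"] = True   # same resulting state after 1 or N calls
    return user

user = {"name": "dgouldin", "verified": False}
first = mark_verified(dict(user))
second = mark_verified(mark_verified(dict(user)))
assert first == second        # repeated application changes nothing
```

A task built this way is safe to retry, which matters once workers can crash mid-execution.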

Composable

“A highly composable system provides recombinant components that can be selected and assembled in various combinations to satisfy specific user requirements. ... The essential features that make a component composable are that it be ... self-contained (modular) [and] stateless.”

Wikipedia (Composability)

Result Backends

2 concerns

Tracking task state

PENDING · STARTED · SUCCESS · FAILURE · RETRY · REVOKED

>>> import tasks
>>> result = tasks.add.delay(1, 2)
>>> result.state
'PENDING'
>>> result.get()
3
>>> result.state
u'SUCCESS'
>>>

Storing results

>>> import tasks
>>> result = tasks.add.delay(1, 2)
>>> result.result
3

Which backend?

Not RabbitMQ!

“The RabbitMQ result backend (amqp) is special as it does not actually store the states, but rather sends them as messages. This is an important difference as it means that a result can only be retrieved once; If you have two processes waiting for the same result, one of the processes will never receive the result!”

Celery Documentation

Database

  • Polling is expensive
  • Transaction isolation can cause problems

Memcache

  • Fast
  • Supports expiry
  • Atomic counters
  • No persistence

Redis

  • Fast
  • Supports expiry
  • Atomic counters
  • (Mostly) persistent
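Pointing Celery at Redis for both the broker and the result backend is a short configuration (a sketch; the URLs are placeholders for your own instance):

```python
from celery import Celery

# broker and result backend on a local Redis instance,
# using separate databases for each
app = Celery('tasks',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/1')
```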

Canvas Building Blocks

Celery v3.2

Signature

Everything is a signature

Serializable

>>> import json
>>> from celery import signature
>>> s = signature('path.to.task', args=(1, 2, 3), kwargs={'foo': 'bar'})
>>> s
path.to.task(1, 2, 3, foo='bar')
>>> serialized = json.dumps(s)
>>> serialized
'{"task": "path.to.task", "subtask_type": null, "kwargs": {"foo": "bar"}, "args": [1, 2, 3], "options": {}, "immutable": false}'
>>> rehydrated = signature(json.loads(serialized))
>>> rehydrated
path.to.task(1, 2, 3, foo=u'bar')

Convenience Methods

>>> from celery import signature
>>> import tasks
>>> s = signature('tasks.add', args=(1, 2, 3))
>>> s() # same as tasks.add(1, 2, 3)
6
>>> s.delay() # same as tasks.add.delay(1, 2, 3)
<AsyncResult: ca5f7d1c-50f0-412d-8ba9-7bf16a959a0f>
>>> tasks.add.signature((1, 2, 3))
tasks.add(1, 2, 3)
>>> tasks.add.s(1, 2, 3) # shorthand
tasks.add(1, 2, 3)
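A signature created with only some of its arguments behaves much like a partial: arguments supplied later (at call or delay time) are appended. As a rough stdlib analogy (plain Python, not Celery):

```python
from functools import partial

def add(x, y, z):
    return x + y + z

# tasks.add.s(1, 2) is roughly partial(add, 1, 2):
# the remaining argument arrives at call time
s = partial(add, 1, 2)
assert s(3) == 6
```

This partial behavior is what makes callbacks work: the parent's result becomes the missing argument.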

Once we can serialize, we can begin to compose!

Callback

Run a task once another has finished

Parent result passed to child

>>> import tasks
>>> s = tasks.add.s(1, 2)
>>> s2 = tasks.add.s(3)
>>> s.link(s2)
tasks.add(3)
>>> dict(s)
{'task': 'tasks.add', 'subtask_type': None, 'kwargs': {}, 'args': (1, 2), 'options': {'link': [tasks.add(3)]}, 'immutable': False}
>>> result = s.apply_async()
>>> result.get()
3
>>> result.children
[<AsyncResult: fe715103-8e49-4512-b0c2-93a88ddd340c>]
>>> result.children[0].result
6

Chain

Multiple tasks run in series

Tasks chained as callbacks

>>> from celery import chain
>>> import tasks
>>> c = chain(tasks.add.s(1, 2), tasks.add.s(3))
>>> dict(c)
{'task': 'celery.chain', 'subtask_type': 'chain', 'kwargs': {'tasks': (tasks.add(1, 2), tasks.add(3))}, 'args': (), 'options': {}, 'immutable': False}
>>> result = c()
>>> result
<AsyncResult: cd36054d-755a-4745-bd32-0133ef026a04>
>>> result.task_name, result.result
('tasks.add', 6)
>>> result.parent.task_name, result.parent.result
('tasks.add', 3)

Under the hood

class chain(Signature):
    def run(self):
        tasks, results = self.prepare_steps(self.args, self.tasks)
        tasks[0].apply_async(**self.options)

    def prepare_steps(self, args, tasks):
        steps = deque(tasks)
        prev_task = prev_res = None
        tasks, results = [], []
        while steps:
            task = steps.popleft()
            res = task.freeze()
            if prev_task:
                # each task is linked as a callback of the one before it
                prev_task.link(task)
                if not res.parent:
                    res.parent = prev_res
            tasks.append(task)
            results.append(res)
            prev_task, prev_res = task, res
        return tasks, results

Group

Multiple tasks run in parallel

Result per task

>>> from celery import group
>>> import tasks
>>> g = group(tasks.add.s(1, 2), tasks.add.s(3, 4), tasks.add.s(5, 6))
>>> dict(g)
{'task': 'celery.group', 'subtask_type': 'group', 'kwargs': {'tasks': (tasks.add(1, 2), tasks.add(3, 4), tasks.add(5, 6))}, 'args': (), 'options': {}, 'immutable': False}
>>> result = g()
>>> result
<GroupResult: 5272882a-3538-419a-915b-c28f7d119e8d [3f49e363-635e-46de-8034-8c07fe6bc302, 54d49c87-953a-4cec-b65e-c30883cc533f, d77ccfff-877b-4c3d-94b4-9a735ee989ba]>
>>> [r.result for r in result.children]
[3, 7, 11]

Under the hood

class group(Signature):
    def _apply_tasks(self, tasks):
        # fire off every task in the group, collecting a result per task
        for sig, res in tasks:
            sig.apply_async()
            yield res

    def apply_async(self, tasks, group_id):
        result = self.app.GroupResult(group_id,
            list(self._apply_tasks(tasks)))
        return result

Chord

A group with a callback

>>> from celery import chord
>>> import tasks
>>> c = chord([tasks.add.s(1, 2), tasks.add.s(3, 4)], tasks.tsum.s())
>>> dict(c)
{'task': 'celery.chord', 'subtask_type': 'chord', 'kwargs': {'body': tasks.tsum(), 'header': [tasks.add(1, 2), tasks.add(3, 4)]}, 'args': (), 'options': {}, 'immutable': False}
>>> result = c()
>>> result.task_name, result.result
('tasks.tsum', 10)
>>> result.parent
<GroupResult: 9309d954-669f-449c-8522-9247fb907faa [ff3d818a-bfd6-4139-a75d-95a2fcc8ea99, 5d748b16-8870-47eb-bd63-657f4f6a3350]>
>>> [r.result for r in result.parent.children]
[3, 7]

Under the hood

class chord(Signature):
    def run(self, header, body):
        if 'chord_size' not in body:
            body['chord_size'] = self.__length_hint__()
        # partial_args and group_id elided for brevity
        parent = self.app.backend.apply_chord(header, partial_args,
                                              group_id, body)
        body.parent = parent
        return body

Redis/Memcache backend support

  • Launch the header group
  • Increment a counter upon each success
  • Launch the callback once counter ≥ chord_size
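A backend-agnostic sketch of that counting strategy (plain Python, not Celery's actual implementation — the backend does this with an atomic INCR per success):

```python
class ChordCounter:
    """Fire a callback exactly when `chord_size` results have arrived."""
    def __init__(self, chord_size, callback):
        self.chord_size = chord_size
        self.callback = callback
        self.results = []

    def on_task_success(self, result):
        self.results.append(result)
        # the moral equivalent of the backend's atomic INCR + compare
        if len(self.results) >= self.chord_size:
            self.callback(self.results)

collected = []
counter = ChordCounter(3, collected.extend)
for r in (3, 7, 11):
    counter.on_task_success(r)
assert collected == [3, 7, 11]
```

In Redis and Memcache the increment is atomic, so concurrent workers cannot both think they were the last to finish.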

Other backends

  • Launch the header group
  • Launch a polling task to periodically check group state
  • Launch the callback once the group is "ready"

Weave

A custom Signature subclass

Split, parallel-process, & join

>>> range.s(1, 7) | weave(multiply.s(2), 3)

    [1, 2, 3, 4, 5, 6]
            ↓
         weave(3)  →  [1, 2, 3]    [4, 5, 6]
                          ↓            ↓
                     multiply(2)  multiply(2)
                          ↓            ↓
                      [2, 4, 6]   [8, 10, 12] 

                          ⤷   join()  ⤶
                                ↓
                       [2, 4, 6, 8, 10, 12]
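The split step relies on a chunking helper; a minimal stand-in for the `chunks` call used in the code below (splitting an iterator into lists of at most n items) could be:

```python
from itertools import islice

def chunks(it, n):
    """Split an iterator into lists of at most n items."""
    it = iter(it)
    while True:
        chunk = list(islice(it, n))
        if not chunk:
            return
        yield chunk

assert list(chunks(range(1, 7), 3)) == [[1, 2, 3], [4, 5, 6]]
```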

Under the hood

@Signature.register_type
class weave(Signature):
    def __init__(self, task, n, **options):
        Signature.__init__(self, 'weave', (),
                           {'task': task, 'n': n}, **options)
        self.subtask_type = 'weave'

    def apply_async(self, args=None, kwargs=None, **options):
        args, kwargs, _ = self._merge(args=args, kwargs=kwargs)
        it, = args
        task, n = self._unpack_args(kwargs)
        task = maybe_signature(task)
        body = join.s()
        body.freeze(self.freeze().task_id)

        tasks = [task.clone(args=[p]) for p in chunks(iter(it), n)]
        return chord(tasks, body).apply_async()

Putting it all together

With twitter of course

Twitter tasks

@app.task
def friend_ids(screen_name, cursor='-1', count=5000):
    return call('friends/ids', kwargs={
        'cursor': cursor,
        'count': count,
        'screen_name': screen_name,
    })['ids']

@app.task
def usernames(user_ids):
    response = call('users/lookup', kwargs={
        'user_id': ','.join([unicode(u) for u in user_ids]),
    })
    return [u['screen_name'] for u in response]

My friends' usernames

>>> from tasks import weave, friend_ids, usernames
>>> result = (friend_ids.s('dgouldin') |
              weave(usernames.s(), 100)).apply_async()
>>> result.result
[u'BiRiteCreamery', u'woodsbeer', u'AlJavieera', u'markasaurus', u'bayviewheight', u'chrisdrackett', u'MikkellerBarSF', u'BeerHallSF', u'mdegerne', u'SimpsonsQOTD', u'PorcellinoSF', ...]
>>> len(result.parent.children[0].results)
5

When tasks fail

Parallel

>>> from celery import group
>>> import tasks
>>> result = group(tasks.fail.s(), tasks.succeed.s()).delay()
>>> result.ready()
True
>>> [r.state for r in result.results]
[u'FAILURE', u'SUCCESS']
>>> [(r.state, r.result) for r in result.results]
[(u'FAILURE', ZeroDivisionError(u'integer division or modulo by zero',)), (u'SUCCESS', u'success')]

Series

>>> import tasks
>>> result = (tasks.fail.s() | tasks.succeed.s()).delay()
>>> result.state
'PENDING'
>>> result.parent.state
'FAILURE'

The chain's own result stays PENDING because the failed parent never invoked it: failures do not propagate forward automatically.

Rules of thumb

Never block on another task

# BAD
@app.task
def get_follower_names(user_id):
    result = get_follower_ids.delay(user_id)
    follower_ids = result.get() # <-- do not do this
    return [u['name'] for u in twitter.users(follower_ids)]

# GOOD
@app.task
def get_follower_names(follower_ids):
    return [u['name'] for u in twitter.users(follower_ids)]

chain = get_follower_ids.s(user_id) | get_follower_names.s()
result = chain.delay()

Smallest (useful) unit of work

# BAD
@app.task
def get_follower_names(user_id):
    follower_ids = twitter.follower_ids(user_id)
    return [u['name'] for u in twitter.users(follower_ids)]

# GOOD
@app.task
def get_follower_names(follower_ids):
    return [u['name'] for u in twitter.users(follower_ids)]

chain = get_follower_ids.s(user_id) | get_follower_names.s()
result = chain.delay()

Set soft & hard time limits

app.conf.update(CELERYD_TASK_SOFT_TIME_LIMIT=5,
                CELERYD_TASK_TIME_LIMIT=8)

@app.task
def sleep(seconds):
    time.sleep(seconds)

@app.task
def block(seconds):
    thread = threading.Thread(target=time.sleep, args=(seconds,))
    thread.start()
    thread.join()

>>> import tasks
>>> tasks.sleep.delay(10) # hits soft timeout
>>> tasks.block.delay(10) # hits hard timeout

The soft limit is raised inside the task as a catchable SoftTimeLimitExceeded exception; a task stuck in thread.join() never surfaces it, so the hard limit kills the worker process instead.

Use acks_late when possible

“When enabled messages for this task will be acknowledged after the task has been executed, and not just before which is the default behavior.

Please note that this means the task may be executed twice if the worker crashes mid execution (which may be acceptable for some applications).”

Celery Documentation
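It can be enabled per task or globally (a configuration sketch, using the same 3.x setting style as the time-limit example above):

```python
# per task
@app.task(acks_late=True)
def process(item):
    ...

# or globally
app.conf.update(CELERY_ACKS_LATE=True)
```

Pairing acks_late with idempotent tasks is what makes the "may run twice" caveat acceptable.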

Questions

(we're hiring)
jobs.heroku.com