This document describes the current stable version of Celery (3.1). For development docs, go here.

Source code for celery.backends.database

# -*- coding: utf-8 -*-
"""
    celery.backends.database
    ~~~~~~~~~~~~~~~~~~~~~~~~

    SQLAlchemy result store backend.

"""
from __future__ import absolute_import

from functools import wraps

from celery import states
from celery.exceptions import ImproperlyConfigured
from celery.five import range
from celery.utils.timeutils import maybe_timedelta

from celery.backends.base import BaseBackend

from .models import Task, TaskSet
from .session import ResultSession

__all__ = ['DatabaseBackend']


def _sqlalchemy_installed():
    """Return the ``sqlalchemy`` module, failing loudly if it is missing.

    :returns: the imported ``sqlalchemy`` module.
    :raises ImproperlyConfigured: if ``sqlalchemy`` cannot be imported.

    """
    try:
        import sqlalchemy
    except ImportError:
        # The two literals are concatenated, so the first one must end
        # with a space to keep the sentences apart in the final message.
        raise ImproperlyConfigured(
            'The database result backend requires SQLAlchemy to be '
            'installed. See http://pypi.python.org/pypi/SQLAlchemy')
    return sqlalchemy
# Run the check at import time so that a missing SQLAlchemy surfaces as
# ImproperlyConfigured before the ``sqlalchemy.exc`` import below is reached.
_sqlalchemy_installed()

from sqlalchemy.exc import DatabaseError, OperationalError


def retry(fun):
    """Decorator re-running ``fun`` when the database raises an error.

    The returned wrapper accepts one extra keyword argument,
    ``max_retries`` (default 3), which is the total number of attempts.
    The argument is consumed by the wrapper and is never forwarded
    to ``fun``.

    A :exc:`DatabaseError` or :exc:`OperationalError` raised by ``fun`` is
    swallowed and the call repeated until the attempts are used up; the
    error from the last attempt is re-raised.  Any other exception
    propagates immediately.

    """

    @wraps(fun)
    def _inner(*args, **kwargs):
        # Always make at least one attempt: with ``max_retries <= 0`` an
        # empty range would skip the call entirely and silently
        # return None, dropping the operation.
        attempts = max(kwargs.pop('max_retries', 3), 1)

        for attempt in range(attempts):
            try:
                return fun(*args, **kwargs)
            except (DatabaseError, OperationalError):
                # Out of attempts: let the caller see the last error.
                if attempt + 1 >= attempts:
                    raise

    return _inner


class DatabaseBackend(BaseBackend):
    """The database result backend.

    Task and group results are read and written through sessions
    obtained from :meth:`ResultSession`; every operation opens its own
    session and closes it when done.

    """
    # ResultSet.iterate should sleep this much between each poll,
    # to not bombard the database with queries.
    subpolling_interval = 0.5

    def __init__(self, dburi=None, expires=None,
                 engine_options=None, url=None, **kwargs):
        """Configure the backend from arguments and the app configuration.

        :keyword dburi: connection string; used when ``url`` is not given.
        :keyword expires: result expiry, passed through
            ``prepare_expires`` and ``maybe_timedelta``.
        :keyword engine_options: mapping of extra options; entries in
            ``CELERY_RESULT_ENGINE_OPTIONS`` override same-named keys.
        :keyword url: connection string; takes precedence over ``dburi``.
        :raises ImproperlyConfigured: if no connection string is found in
            ``url``, ``dburi`` or ``CELERY_RESULT_DBURI``.

        """
        # The `url` argument was added later and is used by
        # the app to set backend by url (celery.backends.get_backend_by_url)
        super(DatabaseBackend, self).__init__(**kwargs)
        conf = self.app.conf
        self.expires = maybe_timedelta(self.prepare_expires(expires))
        self.dburi = url or dburi or conf.CELERY_RESULT_DBURI
        self.engine_options = dict(
            engine_options or {},
            **conf.CELERY_RESULT_ENGINE_OPTIONS or {})
        self.short_lived_sessions = kwargs.get(
            'short_lived_sessions',
            conf.CELERY_RESULT_DB_SHORT_LIVED_SESSIONS,
        )

        # NOTE: the table names are set on the model classes themselves,
        # so this assignment is shared by every user of Task/TaskSet.
        tablenames = conf.CELERY_RESULT_DB_TABLENAMES or {}
        Task.__table__.name = tablenames.get('task', 'celery_taskmeta')
        TaskSet.__table__.name = tablenames.get('group', 'celery_tasksetmeta')

        if not self.dburi:
            raise ImproperlyConfigured(
                'Missing connection string! Do you have '
                'CELERY_RESULT_DBURI set to a real value?')

    def ResultSession(self):
        """Return a session built from this backend's connection settings."""
        return ResultSession(
            dburi=self.dburi,
            short_lived_sessions=self.short_lived_sessions,
            **self.engine_options
        )

    @retry
    def _store_result(self, task_id, result, status,
                      traceback=None, max_retries=3, **kwargs):
        """Store return value and status of an executed task.

        Creates the row for ``task_id`` if it does not exist yet, then
        updates its result, status and traceback.  Returns ``result``.

        A ``max_retries`` keyword passed by the caller is consumed by the
        ``retry`` decorator before this function runs.

        """
        session = self.ResultSession()
        try:
            task = session.query(Task).filter(Task.task_id == task_id).first()
            if not task:
                task = Task(task_id)
                session.add(task)
                session.flush()
            task.result = result
            task.status = status
            task.traceback = traceback
            session.commit()
            return result
        finally:
            session.close()

    @retry
    def _get_task_meta_for(self, task_id):
        """Get task metadata for a task by id.

        When no row exists for ``task_id`` a placeholder with status
        ``PENDING`` and no result is returned instead; the placeholder
        is not added to the session.

        """
        session = self.ResultSession()
        try:
            task = session.query(Task).filter(Task.task_id == task_id).first()
            if task is None:
                task = Task(task_id)
                task.status = states.PENDING
                task.result = None
            return task.to_dict()
        finally:
            session.close()

    @retry
    def _save_group(self, group_id, result):
        """Store the result of an executed group and return ``result``."""
        session = self.ResultSession()
        try:
            group = TaskSet(group_id, result)
            session.add(group)
            session.flush()
            session.commit()
            return result
        finally:
            session.close()

    @retry
    def _restore_group(self, group_id):
        """Get metadata for group by id.

        Returns ``None`` when no group with that id is stored.

        """
        session = self.ResultSession()
        try:
            group = session.query(TaskSet).filter(
                TaskSet.taskset_id == group_id).first()
            if group:
                return group.to_dict()
        finally:
            session.close()

    @retry
    def _delete_group(self, group_id):
        """Delete metadata for group by id."""
        session = self.ResultSession()
        try:
            session.query(TaskSet).filter(
                TaskSet.taskset_id == group_id).delete()
            session.flush()
            session.commit()
        finally:
            session.close()

    @retry
    def _forget(self, task_id):
        """Forget about result."""
        session = self.ResultSession()
        try:
            session.query(Task).filter(Task.task_id == task_id).delete()
            session.commit()
        finally:
            session.close()

    def cleanup(self):
        """Delete expired metadata.

        Removes every task and group row whose ``date_done`` is earlier
        than ``self.app.now() - self.expires``.

        """
        session = self.ResultSession()
        expires = self.expires
        now = self.app.now()
        try:
            session.query(Task).filter(
                Task.date_done < (now - expires)).delete()
            session.query(TaskSet).filter(
                TaskSet.date_done < (now - expires)).delete()
            session.commit()
        finally:
            session.close()

    def __reduce__(self, args=(), kwargs=None):
        """Add this backend's settings to the base class reduce value.

        ``dburi``, ``expires`` and ``engine_options`` are merged into
        ``kwargs`` before delegating to the parent implementation.

        """
        # Work on a copy: a shared ``{}`` default (or the caller's own
        # dict) must not accumulate one instance's settings and leak them
        # into later calls.
        kwargs = dict(kwargs or {})
        kwargs.update(
            dict(dburi=self.dburi,
                 expires=self.expires,
                 engine_options=self.engine_options))
        return super(DatabaseBackend, self).__reduce__(args, kwargs)