# This listing documents Celery 3.1; for the development version, see the current Celery documentation.
# -*- coding: utf-8 -*-
"""
celery.backends.redis
~~~~~~~~~~~~~~~~~~~~~
Redis result store backend.
"""
from __future__ import absolute_import
from kombu.utils import cached_property
from kombu.utils.url import _parse_url
from celery.exceptions import ImproperlyConfigured
from .base import KeyValueStoreBackend
# redis-py is an optional dependency: importing this module must not fail
# when it is absent.  RedisBackend checks for the ``None`` placeholder and
# raises ImproperlyConfigured at construction time instead.
try:
    import redis
    from redis.exceptions import ConnectionError
except ImportError:  # pragma: no cover
    redis = None  # noqa
    ConnectionError = None  # noqa
#: Names exported by a star-import of this module.
__all__ = ['RedisBackend']
#: Message used for the ImproperlyConfigured error raised when the
#: redis client library could not be imported.
REDIS_MISSING = (
    'You need to install the redis library in order to use '
    'the Redis result store backend.'
)
class RedisBackend(KeyValueStoreBackend):
    """Redis task result store.

    Connection settings are resolved per attribute, first truthy value
    wins: the component parsed from ``url``, then the explicit keyword
    argument, then the ``CELERY_REDIS_*`` / ``REDIS_*`` configuration
    keys, and finally the class-level defaults declared below.
    """

    #: redis-py client module.
    redis = redis

    #: default Redis server hostname (`localhost`).
    host = 'localhost'

    #: default Redis server port (6379)
    port = 6379

    #: default Redis db number (0)
    db = 0

    #: default Redis password (:const:`None`)
    password = None

    #: Maximum number of connections in the pool.
    max_connections = None

    supports_autoexpire = True
    supports_native_join = True
    implements_incr = True

    def __init__(self, host=None, port=None, db=None, password=None,
                 expires=None, max_connections=None, url=None, **kwargs):
        """Resolve the connection settings for this backend.

        :param host: server hostname.  If it contains ``://`` it is
            treated as a URL and used in place of ``url``.
        :param port: server port; the resolved value is coerced to int.
        :param db: database number.
        :param password: server password.
        :param expires: passed to ``self.prepare_expires(..., type=int)``
            and stored as ``self.expires``.
        :param max_connections: connection pool size limit.
        :param url: connection URL; its components take precedence over
            the individual arguments above.
        :param kwargs: forwarded to the parent class initializer.
        :raises ImproperlyConfigured: if the redis library is missing.
        """
        super(RedisBackend, self).__init__(**kwargs)
        conf = self.app.conf
        if self.redis is None:
            raise ImproperlyConfigured(REDIS_MISSING)

        # For compatibility with the old REDIS_* configuration keys.
        def _get(key):
            for prefix in 'CELERY_REDIS_{0}', 'REDIS_{0}':
                try:
                    return conf[prefix.format(key)]
                except KeyError:
                    pass
            # Neither spelling is configured.
            return None

        # A URL given through ``host`` is accepted as an alias for ``url``.
        if host and '://' in host:
            url, host = host, None
        self.url = url

        uhost = uport = upass = udb = None
        if url:
            _, uhost, uport, _, upass, udb, _ = _parse_url(url)
            udb = udb.strip('/') if udb else 0

        # ``or`` chains: any falsy value (None, 0, '') falls through to
        # the next source, ending at the class-level default.
        self.host = uhost or host or _get('HOST') or self.host
        self.port = int(uport or port or _get('PORT') or self.port)
        self.db = udb or db or _get('DB') or self.db
        self.password = upass or password or _get('PASSWORD') or self.password
        self.expires = self.prepare_expires(expires, type=int)
        self.max_connections = (max_connections
                                or _get('MAX_CONNECTIONS')
                                or self.max_connections)

    def set(self, key, value):
        """Store ``value`` under ``key`` and publish it on channel ``key``.

        When ``self.expires`` is truthy the key is written with ``setex``
        using that expiry; otherwise it is written with a plain ``set``.
        """
        client = self.client
        if self.expires:
            # NOTE(review): (key, value, time) is the argument order of the
            # legacy ``redis.Redis`` client; newer redis-py releases appear
            # to expect (name, time, value) -- confirm against the
            # redis-py version this deployment pins.
            client.setex(key, value, self.expires)
        else:
            client.set(key, value)
        client.publish(key, value)

    @cached_property
    def client(self):
        """Redis client backed by a connection pool built from this
        backend's resolved host, port, db, password and max_connections."""
        pool = self.redis.ConnectionPool(host=self.host, port=self.port,
                                         db=self.db, password=self.password,
                                         max_connections=self.max_connections)
        return self.redis.Redis(connection_pool=pool)

    def __reduce__(self, args=(), kwargs=None):
        """Pickle support: add this backend's connection settings to the
        keyword arguments handed to the parent ``__reduce__``.

        :param args: positional arguments forwarded unchanged.
        :param kwargs: optional extra keyword arguments; the connection
            settings below override keys of the same name.
        """
        # Work on a copy so that neither a shared default nor the caller's
        # mapping is mutated by this call.
        state = dict(kwargs or {})
        state.update(
            host=self.host,
            port=self.port,
            db=self.db,
            password=self.password,
            expires=self.expires,
            max_connections=self.max_connections,
        )
        return super(RedisBackend, self).__reduce__(args, state)