Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Celery 4 support #67

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ before_install:
- export DJANGO_SETTINGS_MODULE=celery_haystack.test_settings
install:
- pip install -e .
- pip install -r requirements/v2.txt $DJANGO
- pip install -r requirements/v2.txt $DJANGO $CELERY
before_script:
- flake8 celery_haystack --ignore=E501,E731
script:
Expand All @@ -19,6 +19,8 @@ env:
- DJANGO="Django>=1.9,<1.10"
- DJANGO="Django>=1.10,<1.11"
- DJANGO="Django>=1.11,<1.12"
- CELERY="celery>=3.1,<4.0"
- CELERY="celery>=4.0,<5.0"

notifications:
irc: "irc.freenode.org#haystack"
2 changes: 1 addition & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Requirements

* Django 1.8+
* Haystack_ `2.X`_
* Celery_ 3.X
* Celery_ 3.X / 4.X

You also need to install your choice of one of the supported search engines
for Haystack and one of the supported backends for Celery.
Expand Down
4 changes: 3 additions & 1 deletion celery_haystack/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ class CeleryHaystack(AppConf):
#: The number of retries that are done
MAX_RETRIES = 1
#: The default Celery task class
DEFAULT_TASK = 'celery_haystack.tasks.CeleryHaystackSignalHandler'
DEFAULT_TASK = 'celery_haystack.tasks.haystack_signal_handler'
#: The update handler class
HANDLER = 'celery_haystack.handler.CeleryHaystackSignalHandler'
#: The name of the celery queue to use, or None for default
QUEUE = None
#: Whether the task should be handled transaction safe
Expand Down
35 changes: 35 additions & 0 deletions celery_haystack/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import six


class CeleryHaystackException(Exception):
pass


@six.python_2_unicode_compatible
class IndexOperationException(CeleryHaystackException):
def __init__(self, index, reason):
self.index = index
self.reason = reason

def __str__(self):
return "Failed to update index %s. %s" % (self.index, self.reason)


@six.python_2_unicode_compatible
class InstanceNotFoundException(CeleryHaystackException):
def __init__(self, model_class, pk, reason):
self.model_class = model_class
self.pk = pk
self.reason = reason

def __str__(self):
return "Unable to load instance %s with pk=%s. %s" % (self.model_class, self.pk, self.reason)


@six.python_2_unicode_compatible
class UnrecognizedActionException(CeleryHaystackException):
def __init__(self, action):
self.action = action

def __str__(self):
return "Unrecognized action '%s'" % self.action
129 changes: 129 additions & 0 deletions celery_haystack/handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import logging
from django.apps import apps
from django.core.exceptions import ImproperlyConfigured
from haystack import connection_router, connections
from haystack.exceptions import NotHandled as IndexNotFoundException

from celery_haystack import exceptions


logger = logging.getLogger(__name__)


class CeleryHaystackSignalHandler(object):
def __init__(self, identifier):
self.identifier = identifier

# First get the object path and pk (e.g. ('notes.note', 23))
self.object_path, self.pk = self.split_identifier(identifier)

@staticmethod
def split_identifier(identifier, **kwargs):
"""
Break down the identifier representing the instance.

Converts 'notes.note.23' into ('notes.note', 23).
"""
bits = identifier.split('.')

if len(bits) < 2:
raise ValueError("Unable to parse object identifer '%s'" % identifier)

pk = bits[-1]
# In case Django ever handles full paths...
object_path = '.'.join(bits[:-1])
return object_path, pk

def get_model_class(self):
"""
Fetch the model's class in a standarized way.
"""
bits = self.object_path.split('.')
app_name = '.'.join(bits[:-1])
classname = bits[-1]
model_class = apps.get_model(app_name, classname)

if model_class is None:
raise ImproperlyConfigured("Could not load model '%s'." %
self.object_path)
return model_class

@staticmethod
def get_instance(model_class, pk):
"""
Fetch the instance in a standarized way.
"""
try:
instance = model_class._default_manager.get(pk=pk)
except (model_class.DoesNotExist, model_class.MultipleObjectsReturned) as exc:
raise exceptions.InstanceNotFoundException(model_class, pk, reason=exc)

return instance

@staticmethod
def get_indexes(model_class):
"""
Fetch the model's registered ``SearchIndex`` in a standarized way.
"""
try:
using_backends = connection_router.for_write(**{'models': [model_class]})
for using in using_backends:
index_holder = connections[using].get_unified_index()
yield index_holder.get_index(model_class), using
except IndexNotFoundException:
raise ImproperlyConfigured("Couldn't find a SearchIndex for %s." % model_class)

@staticmethod
def get_index_name(index):
"""
Get index name
"""
return ".".join([index.__class__.__module__,
index.__class__.__name__])

def handle_delete(self, current_index, using, model_class):
# If the object is gone, we'll use just the identifier
# against the index.
try:
current_index.remove_object(self.identifier, using=using)
except Exception as exc:
raise exceptions.IndexOperationException(index=current_index, reason=exc)
else:

msg = ("Deleted '%s' (with %s)" %
(self.identifier, self.get_index_name(current_index)))
logger.debug(msg)

def handle_update(self, current_index, using, model_class):
# and the instance of the model class with the pk

instance = self.get_instance(model_class, self.pk)

# Call the appropriate handler of the current index and
# handle exception if neccessary
try:
current_index.update_object(instance, using=using)
except Exception as exc:
raise exceptions.IndexOperationException(index=current_index, reason=exc)
else:
msg = ("Updated '%s' (with %s)" %
(self.identifier, self.get_index_name(current_index)))
logger.debug(msg)

def handle(self, action):
"""
Trigger the actual index handler depending on the
given action ('update' or 'delete').
"""

# Then get the model class for the object path
model_class = self.get_model_class()

for current_index, using in self.get_indexes(model_class):

if action == 'delete':
self.handle_delete(current_index, using, model_class)
elif action == 'update':
self.handle_update(current_index, using, model_class)
else:
raise exceptions.UnrecognizedActionException(action)
181 changes: 37 additions & 144 deletions celery_haystack/tasks.py
Original file line number Diff line number Diff line change
@@ -1,155 +1,48 @@
from django.core.exceptions import ImproperlyConfigured
from celery.app import shared_task
from celery.utils.log import get_task_logger
from django.core.management import call_command
from django.apps import apps

from .utils import get_handler
from . import exceptions
from .conf import settings

from haystack import connections, connection_router
from haystack.exceptions import NotHandled as IndexNotFoundException

from celery.task import Task # noqa
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)


class CeleryHaystackSignalHandler(Task):
using = settings.CELERY_HAYSTACK_DEFAULT_ALIAS
max_retries = settings.CELERY_HAYSTACK_MAX_RETRIES
default_retry_delay = settings.CELERY_HAYSTACK_RETRY_DELAY

def split_identifier(self, identifier, **kwargs):
"""
Break down the identifier representing the instance.

Converts 'notes.note.23' into ('notes.note', 23).
"""
bits = identifier.split('.')

if len(bits) < 2:
logger.error("Unable to parse object "
"identifer '%s'. Moving on..." % identifier)
return (None, None)

pk = bits[-1]
# In case Django ever handles full paths...
object_path = '.'.join(bits[:-1])
return (object_path, pk)

def get_model_class(self, object_path, **kwargs):
"""
Fetch the model's class in a standarized way.
"""
bits = object_path.split('.')
app_name = '.'.join(bits[:-1])
classname = bits[-1]
model_class = apps.get_model(app_name, classname)

if model_class is None:
raise ImproperlyConfigured("Could not load model '%s'." %
object_path)
return model_class

def get_instance(self, model_class, pk, **kwargs):
"""
Fetch the instance in a standarized way.
"""
instance = None
try:
instance = model_class._default_manager.get(pk=pk)
except model_class.DoesNotExist:
logger.error("Couldn't load %s.%s.%s. Somehow it went missing?" %
(model_class._meta.app_label.lower(),
model_class._meta.object_name.lower(), pk))
except model_class.MultipleObjectsReturned:
logger.error("More than one object with pk %s. Oops?" % pk)
return instance

def get_indexes(self, model_class, **kwargs):
"""
Fetch the model's registered ``SearchIndex`` in a standarized way.
"""
try:
using_backends = connection_router.for_write(**{'models': [model_class]})
for using in using_backends:
index_holder = connections[using].get_unified_index()
yield index_holder.get_index(model_class), using
except IndexNotFoundException:
raise ImproperlyConfigured("Couldn't find a SearchIndex for %s." %
model_class)

def run(self, action, identifier, **kwargs):
"""
Trigger the actual index handler depending on the
given action ('update' or 'delete').
"""
# First get the object path and pk (e.g. ('notes.note', 23))
object_path, pk = self.split_identifier(identifier, **kwargs)
if object_path is None or pk is None:
msg = "Couldn't handle object with identifier %s" % identifier
logger.error(msg)
raise ValueError(msg)

# Then get the model class for the object path
model_class = self.get_model_class(object_path, **kwargs)
for current_index, using in self.get_indexes(model_class, **kwargs):
current_index_name = ".".join([current_index.__class__.__module__,
current_index.__class__.__name__])

if action == 'delete':
# If the object is gone, we'll use just the identifier
# against the index.
try:
current_index.remove_object(identifier, using=using)
except Exception as exc:
logger.exception(exc)
self.retry(exc=exc)
else:
msg = ("Deleted '%s' (with %s)" %
(identifier, current_index_name))
logger.debug(msg)
elif action == 'update':
# and the instance of the model class with the pk
instance = self.get_instance(model_class, pk, **kwargs)
if instance is None:
logger.debug("Failed updating '%s' (with %s)" %
(identifier, current_index_name))
raise ValueError("Couldn't load object '%s'" % identifier)

# Call the appropriate handler of the current index and
# handle exception if neccessary
try:
current_index.update_object(instance, using=using)
except Exception as exc:
logger.exception(exc)
self.retry(exc=exc)
else:
msg = ("Updated '%s' (with %s)" %
(identifier, current_index_name))
logger.debug(msg)
else:
logger.error("Unrecognized action '%s'. Moving on..." % action)
raise ValueError("Unrecognized action %s" % action)


class CeleryHaystackUpdateIndex(Task):
@shared_task(bind=True,
using=settings.CELERY_HAYSTACK_DEFAULT_ALIAS,
max_retries=settings.CELERY_HAYSTACK_MAX_RETRIES,
default_retry_delay=settings.CELERY_HAYSTACK_RETRY_DELAY)
def haystack_signal_handler(self, action, identifier, **kwargs):
try:
get_handler()(identifier).handle(action)
except exceptions.IndexOperationException as exc:
logger.exception(exc)
self.retry(exc=exc)
except exceptions.InstanceNotFoundException as exc:
logger.error(exc)
except exceptions.UnrecognizedActionException as exc:
logger.error("%s. Moving on..." % action)


@shared_task()
def celery_haystack_update_index(apps=None, **kwargs):
"""
A celery task class to be used to call the update_index management
command from Celery.
"""
def run(self, apps=None, **kwargs):
defaults = {
'batchsize': settings.CELERY_HAYSTACK_COMMAND_BATCH_SIZE,
'age': settings.CELERY_HAYSTACK_COMMAND_AGE,
'remove': settings.CELERY_HAYSTACK_COMMAND_REMOVE,
'using': [settings.CELERY_HAYSTACK_DEFAULT_ALIAS],
'workers': settings.CELERY_HAYSTACK_COMMAND_WORKERS,
'verbosity': settings.CELERY_HAYSTACK_COMMAND_VERBOSITY,
}
defaults.update(kwargs)
if apps is None:
apps = settings.CELERY_HAYSTACK_COMMAND_APPS
# Run the update_index management command
logger.info("Starting update index")
call_command('update_index', *apps, **defaults)
logger.info("Finishing update index")
defaults = {
'batchsize': settings.CELERY_HAYSTACK_COMMAND_BATCH_SIZE,
'age': settings.CELERY_HAYSTACK_COMMAND_AGE,
'remove': settings.CELERY_HAYSTACK_COMMAND_REMOVE,
'using': [settings.CELERY_HAYSTACK_DEFAULT_ALIAS],
'workers': settings.CELERY_HAYSTACK_COMMAND_WORKERS,
'verbosity': settings.CELERY_HAYSTACK_COMMAND_VERBOSITY,
}
defaults.update(kwargs)
if apps is None:
apps = settings.CELERY_HAYSTACK_COMMAND_APPS
# Run the update_index management command
logger.info("Starting update index")
call_command('update_index', *apps, **defaults)
logger.info("Finishing update index")
Loading