diff --git a/src/agenda_culturel/celery.py b/src/agenda_culturel/celery.py
index ce23b7b..05b888a 100644
--- a/src/agenda_culturel/celery.py
+++ b/src/agenda_culturel/celery.py
@@ -4,6 +4,10 @@ import json
 from celery import Celery, Task
 from celery.schedules import crontab
 from celery.utils.log import get_task_logger
+import time as time_
+
+
+from contextlib import contextmanager
 
 from .import_tasks.downloader import *
 from .import_tasks.extractor import *
@@ -18,6 +22,8 @@ os.environ.setdefault("DJANGO_SETTINGS_MODULE", f"agenda_culturel.settings.{APP_
 
 app = Celery("agenda_culturel")
 
+from django.core.cache import cache
+
 logger = get_task_logger(__name__)
 
 
@@ -30,6 +36,26 @@ app.config_from_object("django.conf:settings", namespace="CELERY")
 # Load task modules from all registered Django apps.
 app.autodiscover_tasks()
 
+LOCK_EXPIRE = 60 * 10  # Lock expires in 10 minutes
+
+@contextmanager
+def memcache_chromium_lock(oid):
+    lock_id = "chromium-lock"
+    timeout_at = time_.monotonic() + LOCK_EXPIRE - 3
+    # cache.add fails if the key already exists
+    status = cache.add(lock_id, oid, LOCK_EXPIRE)
+    try:
+        yield status
+    finally:
+        # memcache delete is very slow, but we have to use it to take
+        # advantage of using add() for atomic locking
+        if time_.monotonic() < timeout_at and status:
+            # don't release the lock if we exceeded the timeout
+            # to lessen the chance of releasing an expired lock
+            # owned by someone else
+            # also don't release the lock if we didn't acquire it
+            cache.delete(lock_id)
+
 
 def close_import_task(taskid, success, error_message, importer):
     from agenda_culturel.models import BatchImportation
@@ -84,33 +110,21 @@ class ChromiumTask(Task):
         return self._chm
 
 
-@app.task(base=ChromiumTask, bind=True)
-def run_recurrent_import(self, pk):
+def run_recurrent_import_internal(rimport, downloader, req_id):
     from agenda_culturel.models import RecurrentImport, BatchImportation
     from .db_importer import DBImporterEvents
 
-    logger.info("Run recurrent import: {}".format(self.request.id))
+    logger.info("Run recurrent import: {}".format(req_id))
 
-    # get the recurrent import
-    rimport = RecurrentImport.objects.get(pk=pk)
     # create a batch importation
-    importation = BatchImportation(recurrentImport=rimport, celery_id=self.request.id)
+    importation = BatchImportation(recurrentImport=rimport, celery_id=req_id)
     # save batch importation
     importation.save()
 
     # create an importer
-    importer = DBImporterEvents(self.request.id)
+    importer = DBImporterEvents(req_id)
 
-    # prepare downloading and extracting processes
-    if rimport.downloader == RecurrentImport.DOWNLOADER.SIMPLE:
-        downloader = SimpleDownloader()
-    elif rimport.downloader == RecurrentImport.DOWNLOADER.CHROMIUMHEADLESS:
-        downloader = self.chromiumDownloader
-        downloader.pause = False
-    else:
-        downloader = self.chromiumDownloader
-        downloader.pause = True
 
     if rimport.processor == RecurrentImport.PROCESSOR.ICAL:
         extractor = ICALExtractor()
@@ -164,10 +178,44 @@ def run_recurrent_import(self, pk):
             success, error_message = importer.import_events(json_events)
 
         # finally, close task
-        close_import_task(self.request.id, success, error_message, importer)
+        close_import_task(req_id, success, error_message, importer)
     except Exception as e:
         logger.error(e)
-        close_import_task(self.request.id, False, e, importer)
+        close_import_task(req_id, False, e, importer)
+
+    return
+
+
+@app.task(base=ChromiumTask, bind=True)
+def run_recurrent_import(self, pk):
+    from agenda_culturel.models import RecurrentImport
+
+    # get the recurrent import
+    rimport = RecurrentImport.objects.get(pk=pk)
+
+    # prepare downloading and extracting processes
+    if rimport.downloader == RecurrentImport.DOWNLOADER.SIMPLE:
+        downloader = SimpleDownloader()
+    elif rimport.downloader == RecurrentImport.DOWNLOADER.CHROMIUMHEADLESS:
+        downloader = self.chromiumDownloader
+        downloader.pause = False
+    else:
+        downloader = self.chromiumDownloader
+        downloader.pause = True
+
+    # only one thread using Chromium can run at a time,
+    # to prevent from errors (including strange Facebook errors)
+    if rimport.downloader in [RecurrentImport.DOWNLOADER.CHROMIUMHEADLESS, RecurrentImport.DOWNLOADER.CHROMIUMHEADLESSPAUSE]:
+        with memcache_chromium_lock(self.app.oid) as acquired:
+            if acquired:
+                return run_recurrent_import_internal(rimport, downloader, self.request.id)
+    else:
+        return run_recurrent_import_internal(rimport, downloader, self.request.id)
+
+    # if chromium is locked, we wait 30 seconds before retrying
+    raise self.retry(countdown=30)
+
+
 
 
 @app.task(bind=True)
@@ -224,53 +272,61 @@ def import_events_from_url(self, url, cat):
     from agenda_culturel.models import RecurrentImport, BatchImportation
     from agenda_culturel.models import Event, Category
 
-
-    logger.info("URL import: {}".format(self.request.id))
+    with memcache_chromium_lock(self.app.oid) as acquired:
+        if acquired:
 
-    # clean url
-    url = Extractor.clean_url(url)
+            logger.info("URL import: {}".format(self.request.id))
 
-    # we check if the url is known
-    existing = Event.objects.filter(uuids__contains=[url])
-    # if it's unknown
-    if len(existing) == 0:
-        # create an importer
-        importer = DBImporterEvents(self.request.id)
+            # clean url
+            url = Extractor.clean_url(url)
 
-        # create a batch importation
-        importation = BatchImportation(url_source=url, celery_id=self.request.id)
-        # save batch importation
-        importation.save()
+            # we check if the url is known
+            existing = Event.objects.filter(uuids__contains=[url])
+            # if it's unknown
+            if len(existing) == 0:
 
-        try:
-            ## create loader
-            u2e = URL2Events(ChromiumHeadlessDownloader(), single_event=True)
-            # set default values
-            values = {}
-            if cat is not None:
-                values = {"category": cat}
+                # create an importer
+                importer = DBImporterEvents(self.request.id)
 
-            # get event
-            events = u2e.process(
-                url, published=False, default_values=values
-            )
+                # create a batch importation
+                importation = BatchImportation(url_source=url, celery_id=self.request.id)
+                # save batch importation
+                importation.save()
 
-            if events:
-                # convert it to json
-                json_events = json.dumps(events, default=str)
+                try:
+                    ## create loader
+                    u2e = URL2Events(ChromiumHeadlessDownloader(), single_event=True)
+                    # set default values
+                    values = {}
+                    if cat is not None:
+                        values = {"category": cat}
 
-                # import events (from json)
-                success, error_message = importer.import_events(json_events)
+                    # get event
+                    events = u2e.process(
+                        url, published=False, default_values=values
+                    )
 
-                # finally, close task
-                close_import_task(self.request.id, success, error_message, importer)
-            else:
-                close_import_task(self.request.id, False, "Cannot find any event", importer)
-        except Exception as e:
-            logger.error(e)
-            close_import_task(self.request.id, False, e, importer)
+                    if events:
+                        # convert it to json
+                        json_events = json.dumps(events, default=str)
+
+                        # import events (from json)
+                        success, error_message = importer.import_events(json_events)
+
+                        # finally, close task
+                        close_import_task(self.request.id, success, error_message, importer)
+                    else:
+                        close_import_task(self.request.id, False, "Cannot find any event", importer)
+                except Exception as e:
+                    logger.error(e)
+                    close_import_task(self.request.id, False, e, importer)
+
+            return
+
+    # if chromium is locked, we wait 30 seconds before retrying
+    raise self.retry(countdown=30)
 
 
 @app.task(base=ChromiumTask, bind=True)
diff --git a/src/agenda_culturel/templates/agenda_culturel/rimports.html b/src/agenda_culturel/templates/agenda_culturel/rimports.html
index 366f1b6..185399a 100644
--- a/src/agenda_culturel/templates/agenda_culturel/rimports.html
+++ b/src/agenda_culturel/templates/agenda_culturel/rimports.html
@@ -3,6 +3,7 @@
 {% block title %}{% block og_title %}Importations récurrentes{% endblock %}{% endblock %}
 
 {% load utils_extra %}
+{% load rimports_extra %}
 {% load cat_extra %}
 {% block entete_header %}
     {% css_categories %}
@@ -13,8 +14,10 @@

Importations récurrentes

diff --git a/src/agenda_culturel/templatetags/rimports_extra.py b/src/agenda_culturel/templatetags/rimports_extra.py
index fa8aa59..b64829e 100644
--- a/src/agenda_culturel/templatetags/rimports_extra.py
+++ b/src/agenda_culturel/templatetags/rimports_extra.py
@@ -12,6 +12,10 @@ from .utils_extra import picto_from_name
 register = template.Library()
 
+@register.simple_tag
+def has_failed_rimports():
+    return BatchImportation.objects.filter(status=BatchImportation.STATUS.FAILED).count() != 0
+
 
 @register.simple_tag
 def show_badge_failed_rimports(placement="top"):
     newest = BatchImportation.objects.filter(recurrentImport=OuterRef("pk")).order_by(
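
Note on the locking approach used in celery.py: memcache_chromium_lock follows the familiar cache.add() recipe (the one described in the Celery documentation for ensuring a task only runs one at a time). cache.add() succeeds only when the key does not exist yet, so it works as an atomic test-and-set, and the key auto-expires after LOCK_EXPIRE seconds if a worker dies while holding it. This only gives real mutual exclusion when the Django CACHES setting points at a backend shared by all workers (memcached, Redis, or the database cache); the default per-process LocMemCache would not protect concurrent workers from each other. Below is a minimal standalone sketch of the same pattern, outside this patch; the names single_instance_lock and do_chromium_work are illustrative only.

import time
from contextlib import contextmanager

from django.core.cache import cache

LOCK_EXPIRE = 60 * 10  # seconds; the key expires on its own after this delay


@contextmanager
def single_instance_lock(lock_id, owner_id):
    # deadline after which we no longer try to release the key ourselves,
    # because it may already have expired and been re-acquired by another worker
    timeout_at = time.monotonic() + LOCK_EXPIRE - 3
    # cache.add() returns True only if the key was not already present: atomic acquire
    acquired = cache.add(lock_id, owner_id, LOCK_EXPIRE)
    try:
        yield acquired
    finally:
        if acquired and time.monotonic() < timeout_at:
            # release only a lock we own and that has not expired yet
            cache.delete(lock_id)


# Typical use inside a bound Celery task (sketch):
#
#     with single_instance_lock("chromium-lock", self.app.oid) as acquired:
#         if acquired:
#             return do_chromium_work()
#     # lock is held by another worker: retry later
#     raise self.retry(countdown=30)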