import os
import json
from celery import Celery, Task, chain
from celery.schedules import crontab
from celery.utils.log import get_task_logger
from celery.exceptions import MaxRetriesExceededError
import time as time_
from contextlib import contextmanager
from .import_tasks.downloader import *
from .import_tasks.extractor import *
from .import_tasks.importer import *
from .import_tasks.extractor_ical import *
from .import_tasks.custom_extractors import *
# Set the default Django settings module for the 'celery' program.
APP_ENV = os.getenv("APP_ENV", "dev")
os.environ.setdefault("DJANGO_SETTINGS_MODULE", f"agenda_culturel.settings.{APP_ENV}")
app = Celery("agenda_culturel")
from django.core.cache import cache
logger = get_task_logger(__name__)
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object("django.conf:settings", namespace="CELERY")
# Load task modules from all registered Django apps.
app.autodiscover_tasks()
LOCK_EXPIRE = 60 * 10 # Lock expires in 10 minutes
@contextmanager
def memcache_chromium_lock(oid):
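    """Global lock guarding Chromium usage, built on cache.add().

    cache.add() only succeeds if the key is absent, which makes it usable
    as an atomic test-and-set; the lock auto-expires after LOCK_EXPIRE
    seconds so a crashed worker cannot hold it forever.
    """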
    lock_id = "chromium-lock"
    timeout_at = time_.monotonic() + LOCK_EXPIRE - 3
    # cache.add fails if the key already exists
    status = cache.add(lock_id, oid, LOCK_EXPIRE)
    try:
        yield status
    finally:
        # memcache delete is very slow, but we have to use it to take
        # advantage of using add() for atomic locking
        if time_.monotonic() < timeout_at and status:
            # don't release the lock if we exceeded the timeout,
            # to lessen the chance of releasing an expired lock
            # owned by someone else;
            # also don't release the lock if we didn't acquire it
            cache.delete(lock_id)
def close_import_task(taskid, success, error_message, importer):
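    """Mark the BatchImportation identified by taskid as finished,
    recording the event counts from the importer and, on failure, the
    error message."""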
    from agenda_culturel.models import BatchImportation

    task = BatchImportation.objects.get(celery_id=taskid)
    task.status = (
        BatchImportation.STATUS.SUCCESS if success else BatchImportation.STATUS.FAILED
    )
    task.nb_initial = importer.get_nb_events()
    task.nb_imported = importer.get_nb_imported_events()
    task.nb_updated = importer.get_nb_updated_events()
    task.nb_removed = importer.get_nb_removed_events()
    fields = ["status", "nb_initial", "nb_updated", "nb_imported", "nb_removed"]
    if not success:
        task.error_message = error_message
        fields.append("error_message")
    task.save(update_fields=fields)
@app.task(bind=True)
def import_events_from_json(self, json_content):
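    """Import a batch of events provided as a JSON string, tracking
    progress in a new BatchImportation row."""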
    from agenda_culturel.models import BatchImportation
    from .db_importer import DBImporterEvents

    # create and save a batch importation
    importation = BatchImportation(celery_id=self.request.id)
    importation.save()

    logger.info("Import events from json: {}".format(self.request.id))
    importer = DBImporterEvents(self.request.id)

    success, error_message = importer.import_events(json_content)

    # finally, close task
    close_import_task(self.request.id, success, error_message, importer)
class ChromiumTask(Task):
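    """Celery Task base class that lazily instantiates a single
    ChromiumHeadlessDownloader per worker process and reuses it
    across task invocations."""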
    _chm = None

    @property
    def chromiumDownloader(self):
        if self._chm is None:
            self._chm = ChromiumHeadlessDownloader()
        return self._chm
def run_recurrent_import_internal(rimport, downloader, req_id):
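    """Run a single recurrent import: build the extractor matching the
    import's processor, fetch and extract events from the source URL,
    then import them into the database."""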
    from agenda_culturel.models import RecurrentImport, BatchImportation
    from .db_importer import DBImporterEvents

    logger.info("Run recurrent import: {}".format(req_id))

    # create and save a batch importation
    importation = BatchImportation(recurrentImport=rimport, celery_id=req_id)
    importation.save()

    # create an importer
    importer = DBImporterEvents(req_id)
    if rimport.processor == RecurrentImport.PROCESSOR.ICAL:
        extractor = ICALExtractor()
    elif rimport.processor == RecurrentImport.PROCESSOR.ICALNOBUSY:
        extractor = ICALNoBusyExtractor()
    elif rimport.processor == RecurrentImport.PROCESSOR.ICALNOVC:
        extractor = ICALNoVCExtractor()
    elif rimport.processor == RecurrentImport.PROCESSOR.LACOOPE:
        extractor = lacoope.CExtractor()
    elif rimport.processor == RecurrentImport.PROCESSOR.LACOMEDIE:
        extractor = lacomedie.CExtractor()
    elif rimport.processor == RecurrentImport.PROCESSOR.LEFOTOMAT:
        extractor = lefotomat.CExtractor()
    elif rimport.processor == RecurrentImport.PROCESSOR.LAPUCEALOREILLE:
        extractor = lapucealoreille.CExtractor()
    elif rimport.processor == RecurrentImport.PROCESSOR.MECWORDPRESS:
        extractor = wordpress_mec.CExtractor()
    elif rimport.processor == RecurrentImport.PROCESSOR.FBEVENTS:
        extractor = fbevents.CExtractor()
    elif rimport.processor == RecurrentImport.PROCESSOR.C3C:
        extractor = c3c.CExtractor()
    elif rimport.processor == RecurrentImport.PROCESSOR.ARACHNEE:
        extractor = arachnee.CExtractor()
    elif rimport.processor == RecurrentImport.PROCESSOR.LERIO:
        extractor = lerio.CExtractor()
    else:
        extractor = None
    if extractor is None:
        logger.error("Unknown extractor")
        close_import_task(req_id, False, "Unknown extractor", importer)
        return
    # set parameters
    u2e = URL2Events(downloader, extractor)
    url = rimport.source
    browsable_url = rimport.browsable_url
    category = str(rimport.defaultCategory)
    location = rimport.defaultLocation
    tags = rimport.defaultTags
    published = rimport.defaultPublished
    organisers = (
        [] if rimport.defaultOrganiser is None else [rimport.defaultOrganiser.pk]
    )
    try:
        # get events from website
        events = u2e.process(
            url,
            browsable_url,
            default_values={
                "category": category,
                "location": location,
                "tags": tags,
                "organisers": organisers,
            },
            published=published,
        )

        # convert them to json
        json_events = json.dumps(events, default=str)

        # import events (from json)
        success, error_message = importer.import_events(json_events)

        # finally, close task
        close_import_task(req_id, success, error_message, importer)
    except Exception as e:
        logger.error(e)
        close_import_task(req_id, False, str(e), importer)
@app.task(base=ChromiumTask, bind=True)
def run_recurrent_import(self, pklist):
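    """Celery task wrapping run_recurrent_import_internal.

    pklist is either a single RecurrentImport pk or a list of pks; given a
    list, only the first element is processed and the remaining pks are
    returned so chained tasks can consume them one by one. Chromium-based
    downloads are serialized behind a cache lock, and the task retries
    while the lock is held by another worker.
    """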
    from agenda_culturel.models import RecurrentImport

    if isinstance(pklist, list):
        pk = pklist[0]
        is_list = True
    else:
        is_list = False
        pk = pklist

    # get the recurrent import
    rimport = RecurrentImport.objects.get(pk=pk)

    # prepare downloading and extracting processes
    if rimport.downloader == RecurrentImport.DOWNLOADER.SIMPLE:
        downloader = SimpleDownloader()
    elif rimport.downloader == RecurrentImport.DOWNLOADER.CHROMIUMHEADLESS:
        downloader = self.chromiumDownloader
        downloader.pause = False
    else:
        downloader = self.chromiumDownloader
        downloader.pause = True

    # only one thread using Chromium can run at a time,
    # to prevent errors (including strange Facebook errors)
    if rimport.downloader in [
        RecurrentImport.DOWNLOADER.CHROMIUMHEADLESS,
        RecurrentImport.DOWNLOADER.CHROMIUMHEADLESSPAUSE,
    ]:
        with memcache_chromium_lock(self.app.oid) as acquired:
            if acquired:
                run_recurrent_import_internal(rimport, downloader, self.request.id)
                return pklist[1:] if is_list else True
    else:
        run_recurrent_import_internal(rimport, downloader, self.request.id)
        return pklist[1:] if is_list else True
    try:
        # if chromium is locked, we wait before retrying
        raise self.retry(countdown=120)
    except MaxRetriesExceededError as e:
        logger.error(e)
        # the lock was never acquired, so no BatchImportation exists for
        # this run; report the failure and pass along the remaining pks
        return pklist[1:] if is_list else False
def run_recurrent_imports_from_list(pklist):
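    """Chain one run_recurrent_import task per pk: the first task receives
    the full list and each subsequent task receives the remaining pks
    returned by its predecessor."""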
    tasks = chain(
        run_recurrent_import.s(pklist) if i == 0 else run_recurrent_import.s()
        for i in range(len(pklist))
    )
    tasks.delay()
@app.task(bind=True)
def daily_imports(self):
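    """Launch all recurrent imports with DAILY recurrence."""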
    from agenda_culturel.models import RecurrentImport

    logger.info("Everyday imports")
    imports = RecurrentImport.objects.filter(
        recurrence=RecurrentImport.RECURRENCE.DAILY
    ).order_by("pk")

    run_recurrent_imports_from_list([imp.pk for imp in imports])
@app.task(bind=True)
def run_all_recurrent_imports(self):
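    """Launch every recurrent import, regardless of recurrence."""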
    from agenda_culturel.models import RecurrentImport

    logger.info("Run all imports")
    imports = RecurrentImport.objects.all().order_by("pk")

    run_recurrent_imports_from_list([imp.pk for imp in imports])
@app.task(bind=True)
def run_all_recurrent_imports_failed(self):
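    """Relaunch only the recurrent imports whose last run failed."""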
    from agenda_culturel.models import RecurrentImport, BatchImportation

    logger.info("Run only failed imports")
    imports = RecurrentImport.objects.all().order_by("pk")

    run_recurrent_imports_from_list(
        [
            imp.pk
            for imp in imports
            if imp.last_import().status == BatchImportation.STATUS.FAILED
        ]
    )
@app.task(bind=True)
def run_all_recurrent_imports_canceled(self):
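    """Relaunch only the recurrent imports whose last run was canceled."""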
    from agenda_culturel.models import RecurrentImport, BatchImportation

    logger.info("Run only canceled imports")
    imports = RecurrentImport.objects.all().order_by("pk")

    run_recurrent_imports_from_list(
        [
            imp.pk
            for imp in imports
            if imp.last_import().status == BatchImportation.STATUS.CANCELED
        ]
    )
@app.task(bind=True)
def weekly_imports(self):
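    """Launch all recurrent imports with WEEKLY recurrence."""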
    from agenda_culturel.models import RecurrentImport

    logger.info("Weekly imports")
    imports = RecurrentImport.objects.filter(
        recurrence=RecurrentImport.RECURRENCE.WEEKLY
    ).order_by("pk")

    run_recurrent_imports_from_list([imp.pk for imp in imports])
@app.task(base=ChromiumTask, bind=True)
def import_events_from_url(self, url, cat, tags, force=False, user_id=None):
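    """Import events from a single URL.

    Unless force is set, a URL already attached to an existing event is
    skipped. Runs under the Chromium lock and retries while the lock is
    held by another worker.
    """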
    from .db_importer import DBImporterEvents
    from agenda_culturel.models import BatchImportation, Event

    with memcache_chromium_lock(self.app.oid) as acquired:
        if acquired:
            logger.info("URL import: {}".format(self.request.id))

            # clean url
            url = Extractor.clean_url(url)

            # check if the url is already known
            existing = None if force else Event.objects.filter(uuids__contains=[url])

            # if it's unknown (or a forced import)
            if force or len(existing) == 0:
                # create an importer
                importer = DBImporterEvents(self.request.id)

                # create and save a batch importation
                importation = BatchImportation(
                    url_source=url, celery_id=self.request.id
                )
                importation.save()

                try:
                    # create loader
                    u2e = URL2Events(ChromiumHeadlessDownloader(), single_event=True)

                    # set default values
                    values = {}
                    if cat is not None:
                        values = {"category": cat, "tags": tags}

                    # get the event
                    events = u2e.process(url, published=False, default_values=values)

                    if events:
                        # convert them to json
                        json_events = json.dumps(events, default=str)

                        # import events (from json)
                        success, error_message = importer.import_events(
                            json_events, user_id
                        )

                        # finally, close task
                        close_import_task(
                            self.request.id, success, error_message, importer
                        )
                    else:
                        close_import_task(
                            self.request.id, False, "Cannot find any event", importer
                        )
                except Exception as e:
                    logger.error(e)
                    close_import_task(self.request.id, False, str(e), importer)
            return

    # if chromium is locked, we wait 30 seconds before retrying
    raise self.retry(countdown=30)
@app.task(base=ChromiumTask, bind=True)
def import_events_from_urls(self, urls_cat_tags, user_id=None):
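    """Spawn one import_events_from_url task per (url, category, tags)
    triple."""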
    for ucat in urls_cat_tags:
        if ucat is not None:
            url, cat, tags = ucat
            import_events_from_url.delay(url, cat, tags, user_id=user_id)
app.conf.beat_schedule = {
    "daily_imports": {
        "task": "agenda_culturel.celery.daily_imports",
        # Daily imports at 3:14 a.m.
        "schedule": crontab(hour=3, minute=14),
    },
    "weekly_imports": {
        "task": "agenda_culturel.celery.weekly_imports",
        # Weekly imports on Mondays at 2:22 a.m.
        "schedule": crontab(hour=2, minute=22, day_of_week="mon"),
    },
}
app.conf.timezone = "Europe/Paris"