from django.db import models
from django.utils.translation import gettext_lazy as _
from django.utils.safestring import mark_safe
from django.template.defaultfilters import slugify
from django.template.defaultfilters import date as _date
from django.urls import reverse
from django.core.files import File
from django.utils import timezone
from django.contrib.postgres.search import TrigramSimilarity
from django.db.models import Q, Count

from django_better_admin_arrayfield.models.fields import ArrayField
from colorfield.fields import ColorField
from ckeditor.fields import RichTextField
from location_field.models.plain import PlainLocationField
import recurrence.fields
import recurrence

from urllib.parse import urlparse
import urllib.request
import os
import copy
import unicodedata
import logging

from datetime import datetime, date, time, timedelta

from .calendar import CalendarList, CalendarDay

logger = logging.getLogger(__name__)

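
# Strip diacritics from a string (e.g. "Fête" -> "Fete"); used below for
# accent-insensitive matching in the categorisation rules.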
def remove_accents(input_str):
    nfkd_form = unicodedata.normalize('NFKD', input_str)
    return "".join([c for c in nfkd_form if not unicodedata.combining(c)])


class StaticContent(models.Model):

    name = models.CharField(verbose_name=_('Name'), help_text=_('Name of the static content'), max_length=512, unique=True)
    text = RichTextField(verbose_name=_('Content'), help_text=_('Text as shown to the visitors'))
    url_path = models.CharField(verbose_name=_('URL path'), help_text=_('URL path where the content is included.'))

    class Meta:
        verbose_name = _('Static content')
        verbose_name_plural = _('Static contents')

    def __str__(self):
        return self.name

    def get_absolute_url(self):
        return self.url_path


class Category(models.Model):

    default_name = "Sans catégorie"
    default_alt_name = "Événements non catégorisés"
    default_codename = "∅"
    default_css_class = "cat-nocat"
    default_color = "#aaaaaa"

    COLOR_PALETTE = [
        ("#ea5545", "color 1"),
        ("#f46a9b", "color 2"),
        ("#ef9b20", "color 3"),
        ("#edbf33", "color 4"),
        ("#ede15b", "color 5"),
        ("#bdcf32", "color 6"),
        ("#87bc45", "color 7"),
        ("#27aeef", "color 8"),
        ("#b33dc6", "color 9"),
    ]

    name = models.CharField(verbose_name=_('Name'), help_text=_('Category name'), max_length=512)
    alt_name = models.CharField(verbose_name=_('Alternative Name'), help_text=_('Alternative name used with a time period'), max_length=512)
    codename = models.CharField(verbose_name=_('Short name'), help_text=_('Short name of the category'), max_length=3)
    color = ColorField(verbose_name=_('Color'), help_text=_('Color used as background for the category'), blank=True, null=True)
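
    # if no color is given, pick the first palette color not already used
    # by another category, falling back to grey when none is free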
    def save(self, *args, **kwargs):
        if self.color is None:
            existing_colors = [c.color for c in Category.objects.all()]
            if len(existing_colors) > len(Category.COLOR_PALETTE):
                self.color = "#CCCCCC"
            else:
                for c, n in Category.COLOR_PALETTE:
                    if c not in existing_colors:
                        self.color = c
                        break
                # the whole palette is in use
                if self.color is None:
                    self.color = "#CCCCCC"

        super().save(*args, **kwargs)

    def get_default_category():
        try:
            default, created = Category.objects.get_or_create(name=Category.default_name,
                                                              alt_name=Category.default_alt_name,
                                                              codename=Category.default_codename,
                                                              color=Category.default_color)
            return default
        except Exception:
            return None

    def get_default_category_id():
        cat = Category.get_default_category()
        if cat:
            return cat.id
        else:
            return None

    def css_class(self):
        return "cat-" + str(self.id)

    def __str__(self):
        return self.name

    class Meta:
        verbose_name = _('Category')
        verbose_name_plural = _('Categories')


class DuplicatedEvents(models.Model):

    class Meta:
        verbose_name = _('Duplicated events')
        verbose_name_plural = _('Duplicated events')

    def nb_duplicated(self):
        return Event.objects.filter(possibly_duplicated=self).count()

    def get_duplicated(self):
        return Event.objects.filter(possibly_duplicated=self)

    def merge_into(self, other):
        # for all events associated with this group
        for e in Event.objects.filter(possibly_duplicated=self):
            # change their group membership
            e.possibly_duplicated = other
            # save them
            e.save()
        # then delete the empty group
        self.delete()

    def merge_groups(groups):
        if len(groups) == 0:
            return None
        elif len(groups) == 1:
            return groups[0]
        else:
            result = groups[0]
            for g in groups[1:]:
                g.merge_into(result)
            return result

    def get_items_comparison(self):
        return Event.get_comparison(self.get_duplicated())
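
    # remove groups that contain at most one event, since they carry no
    # duplication information; returns the number of deleted groups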
    def remove_singletons():
        singletons = DuplicatedEvents.objects.annotate(nb_events=Count("event")).filter(nb_events__lte=1)
        nb = len(singletons)
        if nb > 0:
            logger.warning("Removing: " + str(nb) + " empty or singleton duplicated")
            singletons.delete()
        return nb

    def remove_similar_entries():
        to_be_removed = []
        duplicates = DuplicatedEvents.objects.all()
        for d in duplicates:
            comp = d.get_items_comparison()
            # the group is only removed if all compared fields are similar
            similar = len([c for c in comp if not c["similar"]]) == 0
            if similar:
                to_be_removed.append(d)

        nb = len(to_be_removed)
        if nb > 0:
            logger.warning("Removing: " + str(nb) + " similar duplicated")
            for s in to_be_removed:
                s.delete()
        return nb


class Place(models.Model):

    name = models.CharField(verbose_name=_('Name'), help_text=_('Name of the place'))
    address = models.CharField(verbose_name=_('Address'), help_text=_('Address of this place (without city name)'), blank=True, null=True)
    city = models.CharField(verbose_name=_('City'), help_text=_('City name'))
    location = PlainLocationField(based_fields=['name', 'address', 'city'], zoom=12)

    aliases = ArrayField(models.CharField(max_length=512), verbose_name=_('Alternative names'), help_text=_("Alternative names or addresses used to match a place with the free-form location of an event."), blank=True, null=True)

    class Meta:
        verbose_name = _('Place')
        verbose_name_plural = _('Places')

    def __str__(self):
        if self.address:
            return self.name + ", " + self.address + ", " + self.city
        else:
            return self.name + ", " + self.city

    def get_absolute_url(self):
        return reverse("view_place", kwargs={"pk": self.pk})

    def nb_events(self):
        return Event.objects.filter(exact_location=self).count()

    def match(self, event):
        if self.aliases:
            return event.location in self.aliases
        else:
            return False
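
    # attach this place to every event without an exact location whose
    # free-form location matches one of the aliases; returns the number
    # of updated events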
    def associate_matching_events(self):
        u_events = Event.objects.filter(exact_location__isnull=True)

        to_be_updated = []
        # try to find matches
        for ue in u_events:
            if self.match(ue):
                ue.exact_location = self
                to_be_updated.append(ue)
        # update events with a location
        Event.objects.bulk_update(to_be_updated, fields=["exact_location"])
        return len(to_be_updated)

    def get_all_cities():
        try:
            tags = [p["city"] for p in Place.objects.values("city").distinct().order_by("city")]
        except Exception:
            tags = []
        return tags


class Event(models.Model):

    class STATUS(models.TextChoices):
        PUBLISHED = "published", _("Published")
        DRAFT = "draft", _("Draft")
        TRASH = "trash", _("Trash")

    created_date = models.DateTimeField(editable=False)
    imported_date = models.DateTimeField(blank=True, null=True)
    modified_date = models.DateTimeField(blank=True, null=True)
    moderated_date = models.DateTimeField(blank=True, null=True)

    recurrence_dtstart = models.DateTimeField(editable=False, blank=True, null=True)
    recurrence_dtend = models.DateTimeField(editable=False, blank=True, null=True)

    title = models.CharField(verbose_name=_('Title'), help_text=_('Short title'), max_length=512)

    status = models.CharField(_("Status"), max_length=20, choices=STATUS.choices, default=STATUS.DRAFT)

    category = models.ForeignKey(Category, verbose_name=_('Category'), help_text=_('Category of the event'), null=True, default=Category.get_default_category_id(), on_delete=models.SET_DEFAULT)

    start_day = models.DateField(verbose_name=_('Day of the event'), help_text=_('Day of the event'))
    start_time = models.TimeField(verbose_name=_('Starting time'), help_text=_('Starting time'), blank=True, null=True)

    end_day = models.DateField(verbose_name=_('End day of the event'), help_text=_('End day of the event, only required if different from the start day.'), blank=True, null=True)
    end_time = models.TimeField(verbose_name=_('Final time'), help_text=_('Final time'), blank=True, null=True)

    recurrences = recurrence.fields.RecurrenceField(verbose_name=_("Recurrence"), include_dtstart=False, blank=True, null=True)

    exact_location = models.ForeignKey(Place, verbose_name=_('Location'), help_text=_('Address of the event'), null=True, on_delete=models.SET_NULL, blank=True)
    location = models.CharField(verbose_name=_('Location (free form)'), help_text=_("Address of the event in case it's not available in the already known places (free form)"), max_length=512, default="")

    description = models.TextField(verbose_name=_('Description'), help_text=_('General description of the event'), blank=True, null=True)

    local_image = models.ImageField(verbose_name=_('Illustration (local image)'), help_text=_("Illustration image stored in the agenda server"), max_length=1024, blank=True, null=True)

    image = models.URLField(verbose_name=_('Illustration'), help_text=_("URL of the illustration image"), max_length=1024, blank=True, null=True)
    image_alt = models.CharField(verbose_name=_('Illustration description'), help_text=_('Alternative text used by screen readers for the image'), blank=True, null=True, max_length=1024)

    import_sources = ArrayField(models.CharField(max_length=512), verbose_name=_('Importation source'), help_text=_("Importation source used to detect removed entries."), blank=True, null=True)
    uuids = ArrayField(models.CharField(max_length=512), verbose_name=_('UUIDs'), help_text=_("UUIDs from import to detect duplicated entries."), blank=True, null=True)
    reference_urls = ArrayField(models.URLField(max_length=512), verbose_name=_('URLs'), help_text=_("List of all the urls where this event can be found."), blank=True, null=True)

    tags = ArrayField(models.CharField(max_length=64), verbose_name=_('Tags'), help_text=_("A list of tags that describe the event."), blank=True, null=True)

    possibly_duplicated = models.ForeignKey(DuplicatedEvents, verbose_name=_('Possibly duplicated'), on_delete=models.SET_NULL, null=True, blank=True)
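
    # end day as it should be shown to visitors: an event that spans
    # midnight but ends early in the morning (before 8am) is "intuitively"
    # considered to end the previous day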
    def get_consolidated_end_day(self, intuitive=True):
        if intuitive:
            end_day = self.get_consolidated_end_day(False)
            if end_day != self.start_day and self.end_time and self.end_time < time(8):
                return end_day + timedelta(days=-1)
            else:
                return end_day
        else:
            return self.end_day if self.end_day else self.start_day

    def get_dates(self):
        first = self.start_day
        last = self.get_consolidated_end_day()
        return [first + timedelta(n) for n in range(int((last - first).days) + 1)]

    def get_nb_events_same_dates(self):
        first = self.start_day
        last = self.get_consolidated_end_day()
        calendar = CalendarList(first, last, exact=True)
        return [(len(d.events), d.date) for dstr, d in calendar.calendar_days.items()]

    def is_single_day(self, intuitive=True):
        return self.start_day == self.get_consolidated_end_day(intuitive)

    def contains_date(self, d, intuitive=True):
        return d >= self.start_day and d <= self.get_consolidated_end_day(intuitive)

    def get_absolute_url(self):
        return reverse("view_event", kwargs={"year": self.start_day.year,
                                             "month": self.start_day.month,
                                             "day": self.start_day.day,
                                             "pk": self.pk, "extra": slugify(self.title)})

    def __str__(self):
        return _date(self.start_day) + ": " + self.title

    class Meta:
        verbose_name = _('Event')
        verbose_name_plural = _('Events')
        permissions = [("set_duplicated_event", "Can set an event as duplicated")]

    def get_all_tags():
        try:
            tags = list(Event.objects.values_list('tags', flat=True))
        except Exception:
            tags = []
        uniq_tags = set()
        for t in tags:
            if t is not None:
                uniq_tags = uniq_tags | set(t)
        return list(uniq_tags)

    def is_draft(self):
        return self.status == Event.STATUS.DRAFT

    def is_published(self):
        return self.status == Event.STATUS.PUBLISHED

    def is_trash(self):
        return self.status == Event.STATUS.TRASH

    # an event is considered as modified when its modification date is more
    # than one second away from its creation date
    def modified(self):
        return self.modified_date is None or abs((self.modified_date - self.created_date).total_seconds()) > 1

    def nb_draft_events():
        return Event.objects.filter(status=Event.STATUS.DRAFT).count()

    def download_image(self):
        # first download the file
        a = urlparse(self.image)
        basename = os.path.basename(a.path)

        try:
            tmpfile, headers = urllib.request.urlretrieve(self.image)
        except Exception:
            return None

        # if the download is ok, then create the corresponding file object
        self.local_image = File(name=basename, file=open(tmpfile, "rb"))

    def set_skip_duplicate_check(self):
        self.skip_duplicate_check = True

    def is_skip_duplicate_check(self):
        return hasattr(self, "skip_duplicate_check")

    def is_in_importation_process(self):
        return hasattr(self, "in_importation_process")

    def set_in_importation_process(self):
        self.in_importation_process = True
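
    # update the creation/importation/modification timestamps depending on
    # whether the event is new and whether it is being imported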
    def update_modification_dates(self):
        now = timezone.now()
        if not self.id:
            self.created_date = now
        if self.is_in_importation_process():
            self.imported_date = now
        if self.modified_date is None or not self.is_in_importation_process():
            self.modified_date = now

    def get_recurrence_at_date(self, year, month, day):
        dtstart = timezone.make_aware(datetime(year, month, day, 0, 0), timezone.get_default_timezone())
        recurrences = self.get_recurrences_between(dtstart, dtstart)
        if len(recurrences) == 0:
            return self
        else:
            return recurrences[0]

    # return a copy of the current object for each recurrence between first and last date (included)
    def get_recurrences_between(self, firstdate, lastdate):
        if not self.has_recurrences():
            return [self]
        else:
            result = []
            dtstart = timezone.make_aware(datetime.combine(self.start_day, time()), timezone.get_default_timezone())
            self.recurrences.dtstart = dtstart
            for d in self.recurrences.between(firstdate, lastdate, inc=True, dtstart=dtstart):
                c = copy.deepcopy(self)
                c.start_day = d.date()
                if c.end_day is not None:
                    # shift the end day together with the start day
                    shift = d.date() - self.start_day
                    c.end_day += shift
                result.append(c)

            return result

    def has_recurrences(self):
        # TODO: see https://forge.chapril.org/jmtrivial/agenda_culturel/issues/65
        return self.recurrences is not None and len(self.recurrences.rrules) != 0
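
    # recompute the denormalised recurrence_dtstart/recurrence_dtend fields
    # used to query events by date range; for recurring events,
    # recurrence_dtend is the last computed occurrence (None for endless
    # recurrences)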
    def update_recurrence_dtstartend(self):
        sday = date.fromisoformat(self.start_day) if isinstance(self.start_day, str) else self.start_day
        eday = date.fromisoformat(self.end_day) if isinstance(self.end_day, str) else self.end_day
        stime = time.fromisoformat(self.start_time) if isinstance(self.start_time, str) else time() if self.start_time is None else self.start_time
        etime = time.fromisoformat(self.end_time) if isinstance(self.end_time, str) else time() if self.end_time is None else self.end_time

        self.recurrence_dtstart = datetime.combine(sday, stime)
        if not self.has_recurrences():
            if self.end_day is None:
                self.recurrence_dtend = None
            else:
                self.recurrence_dtend = datetime.combine(eday, etime)
        else:
            if self.recurrences.rrules[0].until is None and self.recurrences.rrules[0].count is None:
                self.recurrence_dtend = None
            else:
                self.recurrences.dtstart = datetime.combine(sday, time())
                occurrence = self.recurrences.occurrences()
                try:
                    self.recurrence_dtend = occurrence[-1]
                    if self.recurrences.dtend is not None and self.recurrences.dtstart is not None:
                        self.recurrence_dtend += self.recurrences.dtend - self.recurrences.dtstart
                except Exception:
                    self.recurrence_dtend = self.recurrence_dtstart

    def prepare_save(self):
        self.update_modification_dates()

        self.update_recurrence_dtstartend()

        # if the image is defined but not locally downloaded
        if self.image and not self.local_image:
            self.download_image()

        if self.is_in_importation_process():
            # try to detect category
            CategorisationRule.apply_rules(self)
            # try to detect location
            if not self.exact_location:
                for p in Place.objects.all():
                    if p.match(self):
                        self.exact_location = p
                        break

    def save(self, *args, **kwargs):

        self.prepare_save()

        # on creation only: look for similar events, unless a duplication
        # group is already known or the check is explicitly skipped
        if self.pk is None and self.possibly_duplicated is None and not self.is_skip_duplicate_check():
            # and if this is not an importation process
            if not self.is_in_importation_process():
                similar_events = self.find_similar_events()

                # if similar events exist, add this relation to the event
                if len(similar_events) != 0:
                    self.set_possibly_duplicated(similar_events)

        # delete the duplication group if it only contains one element
        if self.possibly_duplicated is not None and self.possibly_duplicated.nb_duplicated() == 1:
            self.possibly_duplicated.delete()
            self.possibly_duplicated = None

        super().save(*args, **kwargs)
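
    # build an Event from a dictionary produced by an import module,
    # normalising the optional keys (category, published, url_human,
    # last_modified, times, recurrences, ...) to the model fields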
    def from_structure(event_structure, import_source=None):
        if "category" in event_structure and event_structure["category"] is not None:
            event_structure["category"] = Category.objects.get(name=event_structure["category"])

        if "published" in event_structure and event_structure["published"] is not None:
            if event_structure["published"]:
                event_structure["status"] = Event.STATUS.PUBLISHED
            else:
                event_structure["status"] = Event.STATUS.DRAFT
            del event_structure["published"]
        else:
            event_structure["status"] = Event.STATUS.DRAFT

        if "url_human" in event_structure and event_structure["url_human"] is not None:
            event_structure["reference_urls"] = [event_structure["url_human"]]
            del event_structure["url_human"]

        if "last_modified" in event_structure and event_structure["last_modified"] is not None:
            d = datetime.fromisoformat(event_structure["last_modified"])
            if d.year == 2024 and d.month > 2:
                logger.warning("last modified {}".format(d))
            if d.tzinfo is None or d.tzinfo.utcoffset(d) is None:
                d = timezone.make_aware(d, timezone.get_default_timezone())
            event_structure["modified_date"] = d
            del event_structure["last_modified"]
        else:
            event_structure["modified_date"] = None

        if "start_time" in event_structure:
            event_structure["start_time"] = time.fromisoformat(event_structure["start_time"])

        if "end_time" in event_structure:
            event_structure["end_time"] = time.fromisoformat(event_structure["end_time"])

        if "location" not in event_structure or event_structure["location"] is None:
            event_structure["location"] = ""

        if "description" in event_structure and event_structure["description"] is None:
            event_structure["description"] = ""

        if "recurrences" in event_structure and event_structure["recurrences"] is not None:
            event_structure["recurrences"] = recurrence.deserialize(event_structure["recurrences"])
            # exception and extra dates are day-level information
            event_structure["recurrences"].exdates = [e.replace(hour=0, minute=0, second=0) for e in event_structure["recurrences"].exdates]
            event_structure["recurrences"].rdates = [e.replace(hour=0, minute=0, second=0) for e in event_structure["recurrences"].rdates]
        else:
            event_structure["recurrences"] = None

        if import_source is not None:
            event_structure["import_sources"] = [import_source]

        return Event(**event_structure)
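
    # events are considered similar when they share the same day, start
    # within one hour of each other, and have close enough title and
    # location (trigram similarity)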
    def find_similar_events(self):
        start_time_test = Q(start_time=self.start_time)

        if self.start_time is not None:
            # convert str start_time to time
            if isinstance(self.start_time, str):
                self.start_time = time.fromisoformat(self.start_time)
            # also accept events starting within one hour of this event
            interval = (time(self.start_time.hour - 1, self.start_time.minute) if self.start_time.hour >= 1 else time(0, 0),
                        time(self.start_time.hour + 1, self.start_time.minute) if self.start_time.hour < 23 else time(23, 59))
            start_time_test = start_time_test | Q(start_time__range=interval)

        return Event.objects.annotate(similarity_title=TrigramSimilarity("title", self.title)). \
            annotate(similarity_location=TrigramSimilarity("location", self.location)). \
            filter(Q(start_day=self.start_day) & start_time_test & Q(similarity_title__gt=0.5) & Q(similarity_location__gt=0.3))

    def find_same_events_by_uuid(self):
        return None if self.uuids is None or len(self.uuids) == 0 else Event.objects.filter(uuids__contains=self.uuids)
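
    # split a versioned uuid into (root, version): a trailing ":<number>"
    # is the version, e.g. "source:42:3" -> ("source:42", 3) and
    # "source:42" -> ("source:42", 0)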
    def split_uuid(uuid):
        els = uuid.split(':')
        if len(els) == 1:
            return els[0], 0
        else:
            if els[-1].isdigit():
                return ":".join(els[0:-1]), int(els[-1])
            else:
                return ":".join(els), 0

    def is_ancestor_uuid(uuid1, uuid2):
        root1, version1 = Event.split_uuid(uuid1)
        root2, version2 = Event.split_uuid(uuid2)
        return root1 == root2 and version1 < version2

    def is_ancestor_by_uuid(self, event):
        if self.uuids is None or event.uuids is None:
            return False

        for s_uuid in self.uuids:
            for e_uuid in event.uuids:
                if Event.is_ancestor_uuid(s_uuid, e_uuid):
                    return True
        return False

    def get_possibly_duplicated(self):
        if self.possibly_duplicated is None:
            return []
        else:
            return Event.objects.filter(possibly_duplicated=self.possibly_duplicated).exclude(pk=self.pk)
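
    # compare the given events field by field; for each data field the
    # result contains the key, whether all values are equal ("similar"),
    # and the value(s)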
    def get_comparison(events):
        result = []
        for attr in Event.data_fields(all=True, local_img=False):
            values = [getattr(e, attr) for e in events]
            values = ["" if v is None else v for v in values]

            if len(set([str(v) for v in values])) == 1:
                result.append({"similar": True, "key": attr, "values": values[0]})
            else:
                result.append({"similar": False, "key": attr, "values": values})
        return result

    def similar(self, event):
        res = Event.get_comparison([self, event])
        for r in res:
            if not r["similar"]:
                return False
        return True

    def set_possibly_duplicated(self, events):

        # get existing groups
        groups = list(set([e.possibly_duplicated for e in events] + [self.possibly_duplicated]))
        groups = [g for g in groups if g is not None]

        # do we have to create a new group?
        if len(groups) == 0:
            group = DuplicatedEvents.objects.create()
        else:
            # otherwise merge existing groups
            group = DuplicatedEvents.merge_groups(groups)
            group.save()

        # set the possibly duplicated group for the current object
        self.possibly_duplicated = group

        # and for the other events
        for e in events:
            e.possibly_duplicated = group

        # finally update all events (including the current one if already created)
        elist = list(events) + ([self] if self.pk is not None else [])
        Event.objects.bulk_update(elist, fields=["possibly_duplicated"])

    def data_fields(all=False, local_img=True):
        if all:
            result = ["category"]
        else:
            result = []

        result += ["title", "location", "exact_location", "start_day", "start_time", "end_day", "end_time", "description", "image"]
        if all and local_img:
            result += ["local_image"]
        result += ["image_alt", "reference_urls", "recurrences"]
        if all:
            result += ["tags"]
        return result

    def same_event_by_data(self, other):
        for attr in Event.data_fields():
            if str(getattr(self, attr)) != str(getattr(other, attr)):
                return False
        return True

    def find_same_event_by_data_in_list(self, events):
        return [e for e in events if self.same_event_by_data(e)]

    def find_last_imported(events):
        events = [e for e in events if e.imported_date is not None]
        if len(events) == 0:
            return None
        else:
            events.sort(key=lambda e: e.imported_date, reverse=True)
            return events[0]

    def find_last_imported_not_modified(events):
        events = [e for e in events if e.imported_date is not None and (e.modified_date is None or e.modified_date <= e.imported_date)]
        if len(events) == 0:
            return None
        else:
            events.sort(key=lambda e: e.imported_date, reverse=True)
            return events[0]
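
    # overwrite this event's data fields with those of the other event,
    # keeping the freshest modification date and merging the uuid lists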
    def update(self, other):
        # TODO: what about category, tags?
        # set attributes
        for attr in Event.data_fields():
            setattr(self, attr, getattr(other, attr))

        # adjust modified date if required
        if other.modified_date and self.modified_date < other.modified_date:
            self.modified_date = other.modified_date

        # set status according to the input status
        if other.status is not None:
            self.status = other.status

        # add possibly missing uuids
        if self.uuids is None:
            self.uuids = []
        for uuid in other.uuids or []:
            if uuid not in self.uuids:
                self.uuids.append(uuid)
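
    # import a batch of events coming from a single source: new events are
    # bulk-created, events already known (by uuid or by identical data) are
    # updated, and previously imported events missing from the batch can be
    # moved to trash; returns (imported events, number updated, number removed)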
    # Limitation: the given events should not be considered similar one to another...
    def import_events(events, remove_missing_from_source=None):
        to_import = []
        to_update = []

        min_date = timezone.now().date()
        max_date = None
        uuids = set()

        # for each event, check if it's a new one, or one to be updated
        for event in events:
            sdate = date.fromisoformat(event.start_day)
            if event.end_day:
                edate = date.fromisoformat(event.end_day)
            else:
                edate = sdate
            if min_date is None or min_date > sdate:
                min_date = sdate
            if max_date is None or max_date < sdate:
                max_date = sdate
            if max_date is None or (event.end_day is not None and max_date < edate):
                max_date = edate
            if event.uuids and len(event.uuids) > 0:
                uuids |= set(event.uuids)

            # imported events should be updated
            event.set_in_importation_process()
            event.prepare_save()

            # check if the event has already been imported (using uuid)
            same_events = event.find_same_events_by_uuid()

            if same_events is not None and len(same_events) != 0:
                # check if one event of this list has been imported and not modified since
                same_imported = Event.find_last_imported_not_modified(same_events)
                same_events_not_similar = [e for e in same_events if not e.similar(event)]
                if same_imported or len(same_events_not_similar) == 0:
                    if not same_imported:
                        same_imported = Event.find_last_imported(same_events)

                    # if this event exists, it will be updated with new data only if the data is fresher
                    if same_imported.modified_date < event.modified_date:
                        same_imported.update(event)
                        same_imported.set_in_importation_process()
                        same_imported.prepare_save()
                        to_update.append(same_imported)
                else:
                    # otherwise, the new event is possibly a duplication of the remaining others
                    event.set_possibly_duplicated(same_events)
                    # it will be imported
                    to_import.append(event)
            else:
                # if the uuid is unique (or not available), check for similar events
                similar_events = event.find_similar_events()

                # if similar events exist, add this relation to the event
                if len(similar_events) != 0:

                    # check if an event from the list is exactly the same as the new one (using data)
                    same_events = event.find_same_event_by_data_in_list(similar_events)
                    if same_events is not None and len(same_events) > 0:
                        # merge with the first one
                        same_events[0].update(event)
                        same_events[0].set_in_importation_process()
                        same_events[0].prepare_save()
                        to_update.append(same_events[0])
                    else:
                        # the event is possibly a duplication of the others
                        event.set_possibly_duplicated(similar_events)
                        to_import.append(event)
                else:
                    # import this new event
                    to_import.append(event)

        # then import all the new events
        imported = Event.objects.bulk_create(to_import)
        nb_updated = Event.objects.bulk_update(to_update, fields=Event.data_fields() + ["imported_date", "modified_date", "uuids", "status"])

        nb_removed = 0
        if remove_missing_from_source is not None and max_date is not None:
            # published events from the same source that overlap the imported
            # interval but are missing from the import are moved to trash

            in_interval = Event.objects.filter(((Q(end_day__isnull=True) & Q(start_day__gte=min_date) & Q(start_day__lte=max_date)) |
                                                (Q(end_day__isnull=False) & ~(Q(start_day__gt=max_date) | Q(end_day__lt=min_date)))) & Q(import_sources__contains=[remove_missing_from_source]) & Q(status=Event.STATUS.PUBLISHED) & Q(uuids__len__gt=0))

            to_remove = []
            for e in in_interval:
                if len(uuids.intersection(e.uuids)) == 0:
                    e.status = Event.STATUS.TRASH
                    e.prepare_save()
                    to_remove.append(e)

            nb_removed = Event.objects.bulk_update(to_remove, fields=["status"])

        return imported, nb_updated, nb_removed

    def set_current_date(self, date):
        self.current_date = date
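
    # compute the effective start/end datetimes of this event for the given
    # day, clamping multi-day events to the bounds of that day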
    def get_start_end_datetimes(self, day):
        if self.start_day == day:
            if self.start_time is None:
                dtstart = datetime.combine(self.start_day, time.min)
            else:
                dtstart = datetime.combine(self.start_day, self.start_time)
        else:
            dtstart = datetime.combine(day, time.min)

        end_day = self.get_consolidated_end_day()

        if end_day == day:
            if self.end_time is None:
                dtend = datetime.combine(end_day, time.max)
            else:
                dtend = datetime.combine(end_day, self.end_time)
        else:
            dtend = datetime.combine(day, time.max)

        return dtstart, dtend

    def get_concurrent_events(self):
        day = self.current_date if hasattr(self, "current_date") else self.start_day
        day_events = CalendarDay(day).get_events()
        return [e for e in day_events if e != self and self.is_concurrent_event(e, day) and e.status == Event.STATUS.PUBLISHED]

    def is_concurrent_event(self, e, day):
        dtstart, dtend = self.get_start_end_datetimes(day)
        e_dtstart, e_dtend = e.get_start_end_datetimes(day)

        # two events are concurrent if one starts while the other is running
        return (dtstart <= e_dtstart <= dtend) or (e_dtstart <= dtstart <= e_dtend)


class ContactMessage(models.Model):

    class Meta:
        verbose_name = _('Contact message')
        verbose_name_plural = _('Contact messages')

    subject = models.CharField(verbose_name=_('Subject'), help_text=_('The subject of your message'), max_length=512)
    name = models.CharField(verbose_name=_('Name'), help_text=_('Your name'), max_length=512, blank=True, null=True)
    email = models.EmailField(verbose_name=_('Email address'), help_text=_('Your email address'), max_length=254, blank=True, null=True)
    message = RichTextField(verbose_name=_('Message'), help_text=_('Your message'))

    date = models.DateTimeField(auto_now_add=True)

    closed = models.BooleanField(verbose_name=_('Closed'), help_text=_('This message has been processed and no longer needs to be handled'), default=False)
    comments = RichTextField(verbose_name=_('Comments'), help_text=_('Comments on the message from the moderation team'), default="", blank=True, null=True)

    def nb_open_contactmessages():
        return ContactMessage.objects.filter(closed=False).count()


class RecurrentImport(models.Model):

    class Meta:
        verbose_name = _('Recurrent import')
        verbose_name_plural = _('Recurrent imports')
        permissions = [("run_recurrentimport", "Can run a recurrent import")]

    class PROCESSOR(models.TextChoices):
        ICAL = "ical", _("ical")
        ICALNOBUSY = "icalnobusy", _("ical no busy")
        ICALNOVC = "icalnovc", _("ical no VC")
        LACOOPE = "lacoope", _('lacoope.org')
        LACOMEDIE = "lacomedie", _('la comédie')
        LEFOTOMAT = "lefotomat", _('le fotomat')
        LAPUCEALOREILLE = "lapucealoreille", _("la puce à l'oreille")

    class DOWNLOADER(models.TextChoices):
        SIMPLE = "simple", _("simple")
        CHROMIUMHEADLESS = "chromium headless", _("Headless Chromium")

    class RECURRENCE(models.TextChoices):
        DAILY = "daily", _("daily")
        WEEKLY = "weekly", _("weekly")

    name = models.CharField(verbose_name=_('Name'), help_text=_("Recurrent import name. Be careful to choose a name that is easy to understand, as it will be public and displayed on the site's About page."), max_length=512, default="")
    processor = models.CharField(_("Processor"), max_length=20, choices=PROCESSOR.choices, default=PROCESSOR.ICAL)
    downloader = models.CharField(_("Downloader"), max_length=20, choices=DOWNLOADER.choices, default=DOWNLOADER.SIMPLE)

    recurrence = models.CharField(_("Import recurrence"), max_length=10, choices=RECURRENCE.choices, default=RECURRENCE.DAILY)

    source = models.URLField(verbose_name=_('Source'), help_text=_("URL of the source document"), max_length=1024)
    browsable_url = models.URLField(verbose_name=_('Browsable url'), help_text=_("URL of the corresponding document that will be shown to visitors."), max_length=1024, blank=True, null=True)

    defaultPublished = models.BooleanField(verbose_name=_('Published'), help_text=_('Status of each imported event (published or draft)'), default=True)
    defaultLocation = models.CharField(verbose_name=_('Location'), help_text=_('Address for each imported event'), max_length=512, null=True, blank=True)
    defaultCategory = models.ForeignKey(Category, verbose_name=_('Category'), help_text=_('Category of each imported event'), default=Category.get_default_category_id(), on_delete=models.SET_DEFAULT)
    defaultTags = ArrayField(models.CharField(max_length=64), verbose_name=_('Tags for each imported event'), help_text=_("A list of tags that describe each imported event."), blank=True, null=True)

    def nb_imports(self):
        return BatchImportation.objects.filter(recurrentImport=self).count()

    def nb_events(self):
        return Event.objects.filter(import_sources__contains=[self.source]).count()

    def get_absolute_url(self):
        return reverse("view_rimport", kwargs={"pk": self.pk})

    def last_import(self):
        return BatchImportation.objects.filter(recurrentImport=self).order_by("-created_date").first()


class BatchImportation(models.Model):

    class STATUS(models.TextChoices):
        RUNNING = "running", _("Running")
        CANCELED = "canceled", _("Canceled")
        SUCCESS = "success", _("Success")
        FAILED = "failed", _("Failed")

    class Meta:
        verbose_name = _('Batch importation')
        verbose_name_plural = _('Batch importations')
        permissions = [("run_batchimportation", "Can run a batch importation")]

    created_date = models.DateTimeField(auto_now_add=True)

    recurrentImport = models.ForeignKey(RecurrentImport, verbose_name=_('Recurrent import'), help_text=_('Reference to the recurrent import processing'), blank=True, null=True, on_delete=models.SET_NULL, editable=False)

    status = models.CharField(_("Status"), max_length=20, choices=STATUS.choices, default=STATUS.RUNNING)

    error_message = models.CharField(verbose_name=_('Error message'), max_length=512, blank=True, null=True)

    nb_initial = models.PositiveIntegerField(verbose_name=_('Number of collected events'), default=0)
    nb_imported = models.PositiveIntegerField(verbose_name=_('Number of imported events'), default=0)
    nb_updated = models.PositiveIntegerField(verbose_name=_('Number of updated events'), default=0)
    nb_removed = models.PositiveIntegerField(verbose_name=_('Number of removed events'), default=0)

    celery_id = models.CharField(max_length=128, default="")


class CategorisationRule(models.Model):

    weight = models.IntegerField(verbose_name=_('Weight'), help_text=_("The lower the weight, the earlier the rule is applied"), default=0)

    category = models.ForeignKey(Category, verbose_name=_('Category'), help_text=_('Category applied to the event'), on_delete=models.CASCADE)

    title_contains = models.CharField(verbose_name=_('Contained in the title'), help_text=_('Text contained in the event title'), max_length=512, blank=True, null=True)
    title_exact = models.BooleanField(verbose_name=_('Exact title extract'), help_text=_("If checked, the extract will be searched for in the title using the exact form (capitals, accents)."), default=False)

    description_contains = models.CharField(verbose_name=_('Contained in the description'), help_text=_('Text contained in the description'), max_length=512, blank=True, null=True)
    desc_exact = models.BooleanField(verbose_name=_('Exact description extract'), help_text=_("If checked, the extract will be searched for in the description using the exact form (capitals, accents)."), default=False)

    location_contains = models.CharField(verbose_name=_('Contained in the location'), help_text=_('Text contained in the event location'), max_length=512, blank=True, null=True)
    loc_exact = models.BooleanField(verbose_name=_('Exact location extract'), help_text=_("If checked, the extract will be searched for in the location using the exact form (capitals, accents)."), default=False)

    class Meta:
        verbose_name = _('Categorisation rule')
        verbose_name_plural = _('Categorisation rules')
        permissions = [("apply_categorisationrules", "Apply a categorisation rule")]

    # rules are tried in ascending weight order; the first matching rule
    # sets the event category
    def apply_rules(event):
        rules = CategorisationRule.objects.all().order_by("weight", "pk")

        for rule in rules:
            if rule.match(event):
                event.category = rule.category
                return 1

        return 0

    def match_rules(event):
        rules = CategorisationRule.objects.all().order_by("weight", "pk")

        for rule in rules:
            if rule.match(event):
                return rule.category

        return None
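
    # an event matches the rule when every non-empty *_contains extract is
    # found in the corresponding event field, either verbatim (exact) or
    # case- and accent-insensitively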
    def match(self, event):

        if self.description_contains and self.description_contains != "":
            # the description may be empty
            description = event.description or ""
            if self.desc_exact:
                result = self.description_contains in description
            else:
                result = remove_accents(self.description_contains).lower() in remove_accents(description).lower()
            if not result:
                return False

        if self.title_contains and self.title_contains != "":
            if self.title_exact:
                result = self.title_contains in event.title
            else:
                result = remove_accents(self.title_contains).lower() in remove_accents(event.title).lower()
            if not result:
                return False

        if self.location_contains and self.location_contains != "":
            if self.loc_exact:
                result = self.location_contains in event.location
            else:
                result = remove_accents(self.location_contains).lower() in remove_accents(event.location).lower()
            if not result:
                return False

        return True


class ModerationQuestion(models.Model):

    question = models.CharField(verbose_name=_('Question'), help_text=_('Text that will be shown to moderators'), max_length=512, unique=True)

    class Meta:
        verbose_name = _('Moderation question')
        verbose_name_plural = _('Moderation questions')
        permissions = [("use_moderation_question", "Can use a moderation question to tag an event")]

    def __str__(self):
        char_limit = 30
        return (self.question[:char_limit] + "...") if char_limit < len(self.question) else self.question

    def get_absolute_url(self):
        return reverse("view_mquestion", kwargs={"pk": self.pk})

    def complete_id(self):
        return "question_" + str(self.pk)


class ModerationAnswer(models.Model):

    question = models.ForeignKey(ModerationQuestion, related_name="answers", verbose_name=_('Question'), help_text=_('Associated question from moderation'), on_delete=models.CASCADE)

    answer = models.CharField(verbose_name=_('Answer'), help_text=_('Text that will be shown to moderators'), max_length=512)

    adds_tags = ArrayField(models.CharField(max_length=64), verbose_name=_('Adds tags'), help_text=_("A list of tags that will be added if you choose this answer."), blank=True, null=True)
    removes_tags = ArrayField(models.CharField(max_length=64), verbose_name=_('Removes tags'), help_text=_("A list of tags that will be removed if you choose this answer."), blank=True, null=True)

    def complete_id(self):
        return "answer_" + str(self.question.pk) + '_' + str(self.pk)

    def html_description(self):
        result = self.answer + '<br /><span class="helptext">'
        if self.adds_tags:
            result += ' '.join(['<span role="button" class="small-cat">' + a + '</span>' for a in self.adds_tags])
        if self.removes_tags:
            result += ' '.join(['<span role="button" class="small-cat strike">' + a + '</span>' for a in self.removes_tags])
        result += "</span>"
        return mark_safe(result)
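
    # check whether the event already reflects this answer: all added tags
    # are present and no removed tag remains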
    def valid_event(self, event):
        if event.tags:
            if self.adds_tags:
                for t in self.adds_tags:
                    if t not in event.tags:
                        return False

            if self.removes_tags:
                for t in self.removes_tags:
                    if t in event.tags:
                        return False

            return True
        else:
            return not self.adds_tags or len(self.adds_tags) == 0

    def apply_answer(self, event):
        if not self.adds_tags:
            self.adds_tags = []
        if not self.removes_tags:
            self.removes_tags = []

        if event.tags:
            event.tags = list((set(event.tags) | set(self.adds_tags)) - set(self.removes_tags))
        else:
            # copy so the event does not share the answer's list instance
            event.tags = list(self.adds_tags)