1910 lines
62 KiB
Python

from django.db import models
from django_better_admin_arrayfield.models.fields import ArrayField
from django.utils.translation import gettext_lazy as _
from django.utils.safestring import mark_safe
from django.template.defaultfilters import slugify
from django.urls import reverse
from colorfield.fields import ColorField
from django_ckeditor_5.fields import CKEditor5Field
from urllib.parse import urlparse
import urllib.request
import os
from django.core.files import File
from django.utils import timezone
from django.contrib.postgres.search import TrigramSimilarity
from django.db.models import Q, Count, F
import recurrence.fields
import recurrence
import copy
import unicodedata
from collections import defaultdict
from .import_tasks.extractor_facebook import FacebookEventExtractor
from django.template.defaultfilters import date as _date
from datetime import time, timedelta, date
from django.utils.timezone import datetime
from django.utils import timezone
from location_field.models.spatial import LocationField
from django.contrib.gis.geos import Point
from .calendar import CalendarList, CalendarDay
from icalendar import Calendar as icalCal
from icalendar import Event as icalEvent
import logging
logger = logging.getLogger(__name__)
def remove_accents(input_str):
    """Return ``input_str`` without diacritics.

    The text is decomposed with NFKD normalization and every combining
    character is dropped. ``None`` is returned unchanged.
    """
    if input_str is None:
        return None
    decomposed = unicodedata.normalize("NFKD", input_str)
    return "".join(ch for ch in decomposed if not unicodedata.combining(ch))
class StaticContent(models.Model):
    """Rich-text content identified by a unique name and tied to a URL path."""

    name = models.CharField(
        max_length=512,
        unique=True,
        verbose_name=_("Name"),
        help_text=_("Category name"),
    )
    text = CKEditor5Field(
        blank=True,
        verbose_name=_("Content"),
        help_text=_("Text as shown to the visitors"),
    )
    url_path = models.CharField(
        verbose_name=_("URL path"),
        help_text=_("URL path where the content is included."),
    )

    class Meta:
        verbose_name = _("Static content")
        verbose_name_plural = _("Static contents")

    def __str__(self):
        """Use the content name as display string."""
        return self.name

    def get_absolute_url(self):
        """Return the path this content is attached to."""
        return self.url_path
class Category(models.Model):
    """Category of events, with a display color and an optional pictogram."""

    # Attributes describing the fallback category (see get_default_category).
    default_name = "Sans catégorie"
    default_alt_name = "Événements non catégorisés"
    default_codename = ""
    default_css_class = "cat-nocat"
    default_color = "#aaaaaa"

    # Colors handed out, in this order, to categories saved without a color.
    COLOR_PALETTE = [
        ("#ea5545", "color 1"),
        ("#f46a9b", "color 2"),
        ("#ef9b20", "color 3"),
        ("#edbf33", "color 4"),
        ("#ede15b", "color 5"),
        ("#bdcf32", "color 6"),
        ("#87bc45", "color 7"),
        ("#27aeef", "color 8"),
        ("#b33dc6", "color 9"),
    ]

    name = models.CharField(
        verbose_name=_("Name"), help_text=_("Category name"), max_length=512
    )
    color = ColorField(
        verbose_name=_("Color"),
        help_text=_("Color used as background for the category"),
        blank=True,
        null=True,
    )
    pictogram = models.FileField(
        verbose_name=_("Pictogram"),
        help_text=_("Pictogram of the category (svg format)"),
        max_length=1024,
        blank=True,
        null=True,
    )
    position = models.IntegerField(
        verbose_name=_("Position for ordering categories"), default=0
    )

    def save(self, *args, **kwargs):
        """Save the category, choosing a color first when none is set.

        The first palette color not used by an existing category is picked.
        ``#CCCCCC`` is used when there are more categories than palette
        entries, or when every palette color is already taken.
        """
        if self.color is None:
            fallback = "#CCCCCC"
            used = list(Category.objects.values_list("color", flat=True))
            if len(used) > len(Category.COLOR_PALETTE):
                self.color = fallback
            else:
                self.color = next(
                    (c for c, _label in Category.COLOR_PALETTE if c not in used),
                    fallback,
                )
        super().save(*args, **kwargs)

    def get_default_category():
        """Return the fallback category, creating it if it does not exist.

        Called on the class: ``Category.get_default_category()``.
        """
        default = Category.objects.filter(name=Category.default_name).first()
        if default is None:
            # Only fields declared on the model are passed: the model has no
            # alt_name/codename field, so passing them would raise.
            default = Category.objects.create(
                name=Category.default_name,
                color=Category.default_color,
            )
        return default

    def get_default_category_id():
        """Return the primary key of the fallback category (or None)."""
        cat = Category.get_default_category()
        return cat.id if cat else None

    def css_class(self):
        """Return the CSS class name associated with this category."""
        return "cat-" + str(self.id)

    def get_absolute_url(self):
        """Return the home URL filtered on this category."""
        return reverse('home') + '?category=' + str(self.pk)

    def __str__(self):
        return self.name

    class Meta:
        verbose_name = _("Category")
        verbose_name_plural = _("Categories")
class Tag(models.Model):
    """Named tag with an optional description and an optional parent category."""

    name = models.CharField(
        max_length=512,
        unique=True,
        verbose_name=_("Name"),
        help_text=_("Tag name"),
    )
    description = CKEditor5Field(
        blank=True,
        null=True,
        verbose_name=_("Description"),
        help_text=_("Description of the tag"),
    )
    principal = models.BooleanField(
        default=True,
        verbose_name=_("Principal"),
        help_text=_("This tag is highlighted as a main tag for visitors, particularly in the filter."),
    )
    category = models.ForeignKey(
        Category,
        null=True,
        blank=True,
        default=None,
        on_delete=models.SET_NULL,
        verbose_name=_("Category"),
        help_text=_("This tags corresponds to a sub-category of the given category"),
    )

    def get_absolute_url(self):
        """Return the URL of the page listing this tag, keyed by its name."""
        url_kwargs = {"t": self.name}
        return reverse("view_tag", kwargs=url_kwargs)
class DuplicatedEvents(models.Model):
    """Group of events that are versions of one another.

    Events reference their group through ``Event.other_versions`` (reverse
    accessor ``event_set``). A group with a ``representative`` is "fixed".
    """

    representative = models.ForeignKey(
        "Event",
        verbose_name=_("Representative event"),
        help_text=_("This event is the representative event of the duplicated events group"),
        null=True,
        default=None,
        on_delete=models.SET_DEFAULT,
    )

    class Meta:
        verbose_name = _("Duplicated events")
        verbose_name_plural = _("Duplicated events")

    def __init__(self, *args, **kwargs):
        # per-instance cache filled lazily by get_duplicated()
        self.events = None
        super().__init__(*args, **kwargs)

    def nb_duplicated(self):
        """Return the number of events attached to this group."""
        return self.event_set.count()

    def get_duplicated(self):
        """Return the events of the group ordered by creation date (cached)."""
        if self.events is None:
            self.events = self.event_set.order_by("created_date").all()
        return self.events

    def get_absolute_url(self):
        return reverse("view_duplicate", kwargs={"pk": self.pk})

    def get_one_event(self):
        """Return the representative event (may be None)."""
        return self.representative

    def fixed(self):
        """Tell whether a representative has been chosen."""
        return self.representative is not None

    def is_published(self):
        """Tell whether at least one event of the group is published."""
        return any(e.is_published() for e in self.get_duplicated())

    def has_modified(self):
        """Tell whether at least one event of the group reports modified()."""
        return any(e.modified() for e in self.get_duplicated())

    def has_local_version(self):
        """Tell whether at least one event of the group is a local version."""
        return any(e.local_version() for e in self.get_duplicated())

    def get_local_version(self):
        """Return the local version of the group, or None.

        The representative wins when it is a local version; otherwise the
        most recently modified local version is returned.
        """
        if self.representative and self.representative.local_version():
            return self.representative
        local_versions = [e for e in self.get_duplicated() if e.local_version()]
        if not local_versions:
            return None
        # max() returns the first of equal candidates, like a stable
        # descending sort followed by [0]
        return max(local_versions, key=lambda e: e.modified_date)

    def merge_into(self, other):
        """Move every event of this group into ``other`` and delete this group.

        ``other`` loses its representative in the process.
        """
        # for all objects associated to this group
        for e in self.get_duplicated():
            # change their group membership
            e.other_versions = other
            # save them without updating modified date
            e.set_no_modification_date_changed()
            e.save()
        other.representative = None
        other.save()
        # then delete the empty group
        self.delete()

    def fix(self, event=None):
        """Fix the group by choosing ``event`` as its representative.

        If no event is given, the last one (by creation date) is selected.
        The chosen event is marked as published. Returns the number of
        events in the group.
        """
        events = self.get_duplicated()
        if event is None:
            events = events.order_by("-created_date")
            # iterating fills the queryset cache, so the selected instance is
            # one of the objects later handed to bulk_update()
            event = next(iter(events), None)
        if event is not None:
            event.status = Event.STATUS.PUBLISHED
            self.representative = event
            # NOTE(review): when `event` is supplied by the caller it may be a
            # different instance than the one held in `events`; in that case
            # bulk_update() would not write its new status. Confirm that
            # callers save the event themselves.
            Event.objects.bulk_update(events, fields=["status"])
            self.save()
        return len(events)

    def merge_groups(groups):
        """Merge all ``groups`` into the first one and return it.

        Returns None for an empty list. Called on the class.
        """
        if len(groups) == 0:
            return None
        result = groups[0]
        for g in groups[1:]:
            g.merge_into(result)
        return result

    def get_items_comparison(self):
        """Return the field-by-field comparison of the events of the group."""
        return Event.get_comparison(self.get_duplicated())

    def remove_singletons():
        """Delete groups holding at most one event; return the deletion count."""
        singletons = DuplicatedEvents.objects.annotate(nb_events=Count("event")).filter(
            nb_events__lte=1
        )
        nb, _details = singletons.delete()
        return nb

    def not_fixed_qs(qs=None, fixed=False):
        """Return the groups that are not fixed (or fixed, if ``fixed``).

        A group counts as fixed when it has a representative or when at most
        one of its events is outside the trash. ``qs`` restricts the search;
        all groups are considered when it is None.
        """
        # `is None` rather than truthiness: testing a queryset evaluates it,
        # and an empty queryset must not fall back to "all groups"
        if qs is None:
            qs = DuplicatedEvents.objects
        qs = qs.annotate(nb_no_trash=Count("event", filter=~Q(event__status=Event.STATUS.TRASH)))
        q = ~Q(representative__isnull=True) | Q(nb_no_trash__lte=1)
        return qs.filter(q) if fixed else qs.exclude(q)

    def save(self, *args, **kwargs):
        """Save the group, dropping a representative that is not a member."""
        if self.representative and self.representative not in self.event_set.all():
            self.representative = None
        super().save(*args, **kwargs)
class ReferenceLocation(models.Model):
    """Named geographic point; ``main`` locations are flagged to be shown first."""

    name = models.CharField(
        unique=True,
        null=False,
        verbose_name=_("Name"),
        help_text=_("Name of the location"),
    )
    location = LocationField(
        based_fields=["name"],
        zoom=12,
        default=Point(3.08333, 45.783329),
        srid=4326,
    )
    main = models.BooleanField(
        default=False,
        verbose_name=_("Main"),
        help_text=_("This location is one of the main locations (shown first)."),
    )

    class Meta:
        verbose_name = _("Reference location")
        verbose_name_plural = _("Reference locations")

    def __str__(self):
        """Use the location name as display string."""
        return self.name
class Place(models.Model):
    """Known place that events can be attached to via ``Event.exact_location``."""

    name = models.CharField(verbose_name=_("Name"), help_text=_("Name of the place"))
    address = models.CharField(
        verbose_name=_("Address"),
        help_text=_("Address of this place (without city name)"),
        blank=True,
        null=True,
    )
    city = models.CharField(verbose_name=_("City"), help_text=_("City name"))
    location = LocationField(based_fields=["name", "address", "city"], zoom=12, default=Point(3.08333, 45.783329))
    aliases = ArrayField(
        models.CharField(max_length=512),
        verbose_name=_("Alternative names"),
        help_text=_(
            "Alternative names or addresses used to match a place with the free-form location of an event."
        ),
        blank=True,
        null=True,
    )

    class Meta:
        verbose_name = _("Place")
        verbose_name_plural = _("Places")
        ordering = ["name"]

    def __str__(self):
        """Return "name, address, city", skipping an empty address."""
        if self.address:
            return self.name + ", " + self.address + ", " + self.city
        else:
            return self.name + ", " + self.city

    def get_absolute_url(self):
        return reverse("view_place_fullname", kwargs={"pk": self.pk, "extra": slugify(self.name)})

    def nb_events(self):
        """Return the number of events located in this place."""
        return Event.objects.filter(exact_location=self).count()

    def nb_events_future(self):
        """Return the number of events in this place starting today or later."""
        return Event.objects.filter(start_day__gte=datetime.now()).filter(exact_location=self).count()

    def match(self, event):
        """Tell whether the free-form location of ``event`` is one of the aliases.

        Returns False when the place has no alias or when the event has no
        free-form location (the field is nullable).
        """
        if not self.aliases or event.location is None:
            return False
        return event.location.strip() in self.aliases

    def associate_matching_events(self):
        """Attach this place to every event without place that matches it.

        Returns the number of updated events.
        """
        candidates = Event.objects.filter(exact_location__isnull=True)
        matched = []
        for candidate in candidates:
            if self.match(candidate):
                candidate.exact_location = self
                matched.append(candidate)
        # a single query updates every matched event
        Event.objects.bulk_update(matched, fields=["exact_location"])
        return len(matched)

    def get_all_cities():
        """Return the sorted list of distinct city names (called on the class).

        Any error raised while querying results in an empty list.
        """
        try:
            return list(
                Place.objects.values_list("city", flat=True).distinct().order_by("city")
            )
        except Exception:
            # NOTE(review): presumably guards against the table not being
            # available yet (e.g. before migrations) - confirm
            return []
# Event model: header and field declarations (methods follow).
class Event(models.Model):
# Publication status of an event.
class STATUS(models.TextChoices):
PUBLISHED = "published", _("Published")
DRAFT = "draft", _("Draft")
TRASH = "trash", _("Trash")
# Bookkeeping timestamps, maintained by update_modification_dates().
created_date = models.DateTimeField(editable=False)
imported_date = models.DateTimeField(blank=True, null=True)
modified_date = models.DateTimeField(blank=True, null=True)
moderated_date = models.DateTimeField(blank=True, null=True)
# Bounds computed by update_recurrence_dtstartend().
recurrence_dtstart = models.DateTimeField(editable=False, blank=True, null=True)
recurrence_dtend = models.DateTimeField(editable=False, blank=True, null=True)
title = models.CharField(
verbose_name=_("Title"), help_text=_("Short title"), max_length=512
)
status = models.CharField(
_("Status"), max_length=20, choices=STATUS.choices, default=STATUS.DRAFT
)
category = models.ForeignKey(
Category,
verbose_name=_("Category"),
help_text=_("Category of the event"),
null=True,
default=None,
on_delete=models.SET_DEFAULT,
)
# Start/end day and time; only start_day is mandatory.
start_day = models.DateField(
verbose_name=_("Day of the event"), help_text=_("Day of the event")
)
start_time = models.TimeField(
verbose_name=_("Starting time"),
help_text=_("Starting time"),
blank=True,
null=True,
)
end_day = models.DateField(
verbose_name=_("End day of the event"),
help_text=_(
"End day of the event, only required if different from the start day."
),
blank=True,
null=True,
)
end_time = models.TimeField(
verbose_name=_("Final time"), help_text=_("Final time"), blank=True, null=True
)
recurrences = recurrence.fields.RecurrenceField(
verbose_name=_("Recurrence"), include_dtstart=False, blank=True, null=True
)
# Known place of the event; `location` below is the free-form alternative.
exact_location = models.ForeignKey(
Place,
verbose_name=_("Location"),
help_text=_("Address of the event"),
null=True,
on_delete=models.SET_NULL,
blank=True,
)
location = models.CharField(
verbose_name=_("Location (free form)"),
help_text=_(
"Address of the event in case its not available in the already known places (free form)"
),
max_length=512,
default="",
null=True,
blank=True
)
description = models.TextField(
verbose_name=_("Description"),
help_text=_("General description of the event"),
blank=True,
null=True,
)
# Illustration: local copy, remote URL and alternative text.
local_image = models.ImageField(
verbose_name=_("Illustration (local image)"),
help_text=_("Illustration image stored in the agenda server"),
max_length=1024,
blank=True,
null=True,
)
image = models.URLField(
verbose_name=_("Illustration"),
help_text=_("URL of the illustration image"),
max_length=1024,
blank=True,
null=True,
)
image_alt = models.CharField(
verbose_name=_("Illustration description"),
help_text=_("Alternative text used by screen readers for the image"),
blank=True,
null=True,
max_length=1024,
)
# Import bookkeeping: sources and uuids of the imported entries.
import_sources = ArrayField(
models.CharField(max_length=512),
verbose_name=_("Importation source"),
help_text=_("Importation source used to detect removed entries."),
blank=True,
null=True,
)
uuids = ArrayField(
models.CharField(max_length=512),
verbose_name=_("UUIDs"),
help_text=_("UUIDs from import to detect duplicated entries."),
blank=True,
null=True,
)
reference_urls = ArrayField(
models.URLField(max_length=512),
verbose_name=_("URLs"),
help_text=_("List of all the urls where this event can be found."),
blank=True,
null=True,
)
tags = ArrayField(
models.CharField(max_length=64),
verbose_name=_("Tags"),
help_text=_("A list of tags that describe the event."),
blank=True,
null=True,
)
# Group of possibly duplicated events this event belongs to (if any).
other_versions = models.ForeignKey(
DuplicatedEvents,
verbose_name=_("Other versions"),
on_delete=models.SET_NULL,
null=True,
blank=True,
)
def get_consolidated_end_day(self, intuitive=True):
    """Return the end day of the event, defaulting to its start day.

    With ``intuitive`` set, an event ending before 8:00 on a day other than
    its start day is reported as ending the day before.
    """
    if not intuitive:
        return self.end_day if self.end_day else self.start_day

    raw_end = self.get_consolidated_end_day(False)
    ends_in_early_morning = (
        raw_end != self.start_day and self.end_time and self.end_time < time(8)
    )
    if ends_in_early_morning:
        return raw_end + timedelta(days=-1)
    return raw_end
def get_dates(self):
    """Return every day from the start day to the consolidated end day."""
    first = self.start_day
    nb_days = int((self.get_consolidated_end_day() - first).days) + 1
    return [first + timedelta(offset) for offset in range(nb_days)]
def get_nb_events_same_dates(self, remove_same_dup=True):
    """Return ``(number of events, date)`` for each day spanned by the event.

    When ``remove_same_dup`` is set, the event's own duplicate group is
    passed to the calendar as ``ignore_dup``.
    """
    ignored_group = self.other_versions if remove_same_dup else None
    calendar = CalendarList(
        self.start_day,
        self.get_consolidated_end_day(),
        exact=True,
        ignore_dup=ignored_group,
    )
    return [
        (len(day.events), day.date)
        for day in calendar.get_calendar_days().values()
    ]
def is_single_day(self, intuitive=True):
    """Tell whether the event starts and ends on the same day."""
    end_day = self.get_consolidated_end_day(intuitive)
    return self.start_day == end_day
def contains_date(self, d, intuitive=True):
    """Tell whether day ``d`` lies between the start and end days (inclusive)."""
    return self.start_day <= d <= self.get_consolidated_end_day(intuitive)
def get_absolute_url(self):
    """Return the URL of the event page, built from its start day, pk and title slug."""
    day = self.start_day
    url_kwargs = {
        "year": day.year,
        "month": day.month,
        "day": day.day,
        "pk": self.pk,
        "extra": slugify(self.title),
    }
    return reverse("view_event", kwargs=url_kwargs)
def get_import_sources(self):
    """Return a ``(url, label)`` pair for each import source of the event.

    When a source matches a recurrent import, its URL and name are used;
    otherwise the raw source string serves as both URL and label.
    """
    if not self.import_sources:
        return []
    result = []
    for source in self.import_sources:
        # filter().first() returns None for an unknown source, whereas get()
        # would raise and make the fallback below unreachable
        recurrent_import = RecurrentImport.objects.filter(source=source).first()
        if recurrent_import is not None:
            result.append(
                (recurrent_import.get_absolute_url(), recurrent_import.name)
            )
        else:
            result.append((source, source))
    return result
def __str__(self):
    """Return the formatted start day followed by the title."""
    formatted_day = _date(self.start_day)
    return formatted_day + ": " + self.title
class Meta:
    verbose_name = _("Event")
    verbose_name_plural = _("Events")
    # custom permission declared in addition to the model defaults
    permissions = [
        ("set_duplicated_event", "Can set an event as duplicated"),
    ]
def get_all_tags():
    """Return the list of distinct tags used by events (unordered).

    Any error raised while querying results in an empty list.
    """
    try:
        tag_lists = list(Event.objects.values_list("tags", flat=True))
    except Exception:
        tag_lists = []
    uniq_tags = set()
    for tags in tag_lists:
        # events without tags store None
        if tags is not None:
            uniq_tags.update(tags)
    return list(uniq_tags)
def is_draft(self):
    """Tell whether the event status is DRAFT."""
    return Event.STATUS.DRAFT == self.status
def is_published(self):
    """Tell whether the event status is PUBLISHED."""
    return Event.STATUS.PUBLISHED == self.status
def is_trash(self):
    """Tell whether the event status is TRASH."""
    return Event.STATUS.TRASH == self.status
def modified(self):
    """Tell whether the event was modified after its creation.

    A pure import is never considered modified. Otherwise the event is
    modified when its modification date is more than one second after its
    creation date. Events missing one of the two dates are reported as not
    modified.
    """
    if self.pure_import():
        return False
    # both dates are nullable (e.g. on an unsaved event)
    if self.modified_date is None or self.created_date is None:
        return False
    return (self.modified_date - self.created_date).total_seconds() > 1
def pure_import(self):
    """Tell whether the event was imported and not modified since.

    True when the event has an import date and either no modification date
    or a modification date not later than the import date.
    """
    if self.imported_date is None:
        return False
    if self.modified_date is None:
        return True
    elapsed = self.modified_date - self.imported_date
    return elapsed.total_seconds() <= 0
def local_version(self):
    """Tell whether the event was never imported or was modified."""
    if self.imported_date is None:
        return True
    return self.modified()
def get_reference_urls(self):
    """Return the sorted, de-duplicated reference URLs of the event.

    URLs of the published events of the same duplicate group are included.
    The ``reference_urls`` attribute of the event itself is left untouched.
    """
    # work on a fresh set: extending self.reference_urls in place would
    # leak the URLs of the other versions into this event
    urls = set(self.reference_urls or [])
    if self.other_versions:
        for other in self.other_versions.get_duplicated():
            if other.status == Event.STATUS.PUBLISHED and other.reference_urls is not None:
                urls.update(other.reference_urls)
    return sorted(urls)
def get_local_version(self):
    """Return the local version of this event, or None.

    An event that is not a pure import is its own local version. Otherwise
    the lookup is delegated to its duplicate group, if any.
    """
    if not self.pure_import():
        return self
    group = self.other_versions
    return None if group is None else group.get_local_version()
def nb_draft_events():
    """Return the number of events whose status is DRAFT."""
    drafts = Event.objects.filter(status=Event.STATUS.DRAFT)
    return drafts.count()
def get_qs_events_with_unkwnon_place():
    """Return the queryset of events that have no known place.

    Trashed events are left out, and only events without duplicate group or
    that are the representative of their group are kept.
    """
    without_place = Event.objects.filter(exact_location__isnull=True)
    not_trashed = without_place.filter(~Q(status=Event.STATUS.TRASH))
    alone_or_representative = (
        Q(other_versions=None) | Q(other_versions__representative=F('pk'))
    )
    return not_trashed.filter(alone_or_representative)
def is_representative(self):
    """Tell whether the event has no duplicate group or represents its group."""
    if self.other_versions is None:
        return True
    return self.other_versions.representative == self
def download_image(self):
    """Download ``self.image`` and attach the result as ``self.local_image``.

    The download is best effort: on any failure a warning is logged,
    ``local_image`` is left unchanged and None is returned.
    """
    try:
        # name the local file after the last segment of the URL path
        basename = os.path.basename(urlparse(self.image).path)
        tmpfile, _headers = urllib.request.urlretrieve(self.image)
    except Exception:
        logger.warning("Unable to download image %s", self.image, exc_info=True)
        return None
    # NOTE(review): the handle is left open on purpose; presumably the
    # storage backend reads it when the model is saved - confirm.
    self.local_image = File(name=basename, file=open(tmpfile, "rb"))
def set_skip_duplicate_check(self):
    """Flag this instance so that save() does not look for similar events."""
    setattr(self, "skip_duplicate_check", True)
def is_skip_duplicate_check(self):
    """Tell whether the skip-duplicate-check flag was set on this instance."""
    flag_name = "skip_duplicate_check"
    return hasattr(self, flag_name)
def is_in_importation_process(self):
    """Tell whether the importation flag was set on this instance."""
    flag_name = "in_importation_process"
    return hasattr(self, flag_name)
def set_in_importation_process(self):
    """Flag this instance as being handled by an importation."""
    setattr(self, "in_importation_process", True)
def is_no_modification_date_changed(self):
    """Tell whether the keep-modification-date flag was set on this instance."""
    flag_name = "no_modification_date_changed"
    return hasattr(self, flag_name)
def set_no_modification_date_changed(self):
    """Flag this instance so that its modification date is kept on save."""
    setattr(self, "no_modification_date_changed", True)
def update_modification_dates(self):
    """Refresh the creation, import and modification dates before a save.

    The creation date is set on events without id, the import date during
    an importation, and the modification date unless it already exists and
    the keep-modification-date flag is set.
    """
    now = timezone.now()
    if not self.id:
        self.created_date = now
    if self.is_in_importation_process():
        self.imported_date = now
    keep_modified_date = (
        self.modified_date is not None and self.is_no_modification_date_changed()
    )
    if not keep_modified_date:
        self.modified_date = now
def get_recurrence_at_date(self, year, month, day):
    """Return the occurrence of the event on the given day.

    The first result of get_recurrences_between() for that day is returned,
    or the event itself when there is none.
    """
    midnight = timezone.make_aware(
        datetime(year, month, day, 0, 0), timezone.get_default_timezone()
    )
    occurrences = self.get_recurrences_between(midnight, midnight)
    return self if len(occurrences) == 0 else occurrences[0]
def get_image_url(self):
    """Return the URL of the illustration, or None.

    The local image is preferred; when it is missing or its URL cannot be
    obtained, the remote ``image`` URL is used instead.
    """
    if self.local_image:
        try:
            # reading .url inside the try also covers objects without a
            # usable url, which hasattr() would let escape unless the error
            # is an AttributeError
            return self.local_image.url
        except Exception:
            # fall back to the remote image below
            pass
    return self.image if self.image else None
def has_image_url(self):
    """Tell whether get_image_url() yields a URL."""
    url = self.get_image_url()
    return url is not None
def get_recurrences_between(self, firstdate, lastdate):
    """Return one copy of the event per recurrence between two dates (included).

    An event without recurrences is returned as is, in a one-element list.
    Each copy gets the day of its occurrence as start day, and its end day
    (when defined) is shifted by the same amount.
    """
    if not self.has_recurrences():
        return [self]

    dtstart = timezone.make_aware(
        datetime.combine(self.start_day, time()),
        timezone.get_default_timezone(),
    )
    self.recurrences.dtstart = dtstart

    occurrences = []
    for moment in self.recurrences.between(
        firstdate, lastdate, inc=True, dtstart=dtstart
    ):
        clone = copy.deepcopy(self)
        clone.start_day = moment.date()
        if clone.end_day is not None:
            clone.end_day += moment.date() - self.start_day
        occurrences.append(clone)
    return occurrences
def has_recurrences(self):
    """Tell whether the event defines at least one recurrence rule."""
    # TODO: see https://forge.chapril.org/jmtrivial/agenda_culturel/issues/65
    if self.recurrences is None:
        return False
    return len(self.recurrences.rrules) != 0
def update_recurrence_dtstartend(self):
    """Recompute ``recurrence_dtstart`` and ``recurrence_dtend``.

    Days and times given as ISO strings are parsed first; missing times
    default to midnight. Without recurrences, the end is the end day and
    time, or None when there is no end day. With recurrences, the end is
    None when the first rule has neither ``until`` nor ``count``; otherwise
    it is the last occurrence, extended by the duration stored on the
    recurrence object when available.
    """
    sday = self.start_day
    if isinstance(sday, str):
        sday = date.fromisoformat(sday)

    eday = self.end_day
    if isinstance(eday, str):
        eday = date.fromisoformat(eday)

    if isinstance(self.start_time, str):
        stime = time.fromisoformat(self.start_time)
    elif self.start_time is None:
        stime = time()
    else:
        stime = self.start_time

    if isinstance(self.end_time, str):
        etime = time.fromisoformat(self.end_time)
    elif self.end_time is None:
        etime = time()
    else:
        etime = self.end_time

    self.recurrence_dtstart = datetime.combine(sday, stime)

    if not self.has_recurrences():
        if self.end_day is None:
            # reset the stored value so that a removed end day is reflected
            self.recurrence_dtend = None
        else:
            self.recurrence_dtend = datetime.combine(eday, etime)
        return

    first_rule = self.recurrences.rrules[0]
    if first_rule.until is None and first_rule.count is None:
        # unbounded recurrence
        self.recurrence_dtend = None
        return

    self.recurrences.dtstart = datetime.combine(sday, time())
    occurrence = self.recurrences.occurrences()
    try:
        self.recurrence_dtend = occurrence[-1]
        if (
            self.recurrences.dtend is not None
            and self.recurrences.dtstart is not None
        ):
            self.recurrence_dtend += (
                self.recurrences.dtend - self.recurrences.dtstart
            )
    except Exception:
        # the last occurrence could not be computed: fall back to the start
        self.recurrence_dtend = self.recurrence_dtstart
def prepare_save(self):
    """Update the derived data of the event before it is written.

    Refreshes dates and recurrence bounds, downloads the remote image when
    no local copy exists and replaces "/" by "-" in tags. During an
    importation it also looks for a matching place and applies the
    categorisation rules to events without a proper category.
    """
    self.update_modification_dates()
    self.update_recurrence_dtstartend()

    # the image is defined but not locally downloaded
    if self.image and not self.local_image:
        self.download_image()

    # remove "/" from tags
    if self.tags:
        self.tags = [tag.replace('/', '-') for tag in self.tags]

    # the remaining steps only apply to the importation process
    if not self.is_in_importation_process():
        return

    # try to detect location: the first matching place wins
    if not self.exact_location:
        place = next((p for p in Place.objects.all() if p.match(self)), None)
        if place is not None:
            self.exact_location = place

    # try to detect category
    if not self.category or self.category.name == Category.default_name:
        CategorisationRule.apply_rules(self)
# Save the event.
# Before the actual save: derived data is refreshed (prepare_save), a newly
# created event that is not part of an importation is attached to a group of
# similar events when some are found, and a group holding a single event is
# deleted. After the save: an event created inside a group (a "clone")
# becomes the representative of that group.
def save(self, *args, **kwargs):
self.prepare_save()
# check for similar events if no duplicated is known and only if the event is created
if (
self.pk is None
and self.other_versions is None
and not self.is_skip_duplicate_check()
):
# and if this is not an importation process
if not self.is_in_importation_process():
similar_events = self.find_similar_events()
# if it exists similar events, add this relation to the event
if len(similar_events) != 0:
self.set_other_versions(similar_events)
# check if it's a clone (that will become representative)
# (evaluated before the save, while pk is still None for a new event)
clone = self.pk is None and not self.other_versions is None
# check if we need to clean the other_versions
if (
not clone and
self.pk and
self.other_versions is not None
and self.other_versions.nb_duplicated() == 1
):
self.other_versions.delete()
self.other_versions = None
# first save the current object
super().save(*args, **kwargs)
# then if its a clone, update the representative
if clone:
self.other_versions.representative = self
self.other_versions.save()
def from_structure(event_structure, import_source=None):
    """Build an unsaved Event from an import dictionary.

    ``event_structure`` is modified in place: import-specific keys
    (``published``, ``url_human``, ``last_modified``) are converted to model
    fields and removed, ISO strings are parsed, and missing values get
    defaults. ``import_source``, when given, becomes the only import source.
    """
    # category: look it up by (unaccented) name, default category otherwise
    category_name = event_structure.get("category")
    category = None
    if category_name is not None:
        # first() instead of get(): several categories may match
        category = Category.objects.filter(
            name__unaccent__icontains=remove_accents(category_name.lower())
        ).first()
    if category is None:
        category = Category.get_default_category()
    event_structure["category"] = category

    # the keys below are popped even when None, since they are not Event
    # fields and would be rejected by the constructor
    published = event_structure.pop("published", None)
    if published:
        event_structure["status"] = Event.STATUS.PUBLISHED
    else:
        event_structure["status"] = Event.STATUS.DRAFT

    url_human = event_structure.pop("url_human", None)
    if url_human is not None:
        event_structure["reference_urls"] = [url_human]

    last_modified = event_structure.pop("last_modified", None)
    if last_modified is not None:
        d = datetime.fromisoformat(last_modified)
        # naive dates are interpreted in the default timezone
        if d.tzinfo is None or d.tzinfo.utcoffset(d) is None:
            d = timezone.make_aware(d, timezone.get_default_timezone())
        event_structure["modified_date"] = d
    else:
        event_structure["modified_date"] = None

    # times are given as ISO strings; None values are kept as is
    for key in ("start_time", "end_time"):
        if event_structure.get(key) is not None:
            event_structure[key] = time.fromisoformat(event_structure[key])

    if "location" not in event_structure or event_structure["location"] is None:
        event_structure["location"] = ""

    if "description" in event_structure and event_structure["description"] is None:
        event_structure["description"] = ""

    if event_structure.get("recurrences") is not None:
        recurrences = recurrence.deserialize(event_structure["recurrences"])
        # excluded and added dates are aligned on midnight
        recurrences.exdates = [
            e.replace(hour=0, minute=0, second=0) for e in recurrences.exdates
        ]
        recurrences.rdates = [
            e.replace(hour=0, minute=0, second=0) for e in recurrences.rdates
        ]
        event_structure["recurrences"] = recurrences
    else:
        event_structure["recurrences"] = None

    if import_source is not None:
        event_structure["import_sources"] = [import_source]

    return Event(**event_structure)
def find_similar_events(self):
    """Return the queryset of events that look like this one.

    Candidates start the same day, at the same time or within one hour of
    it, and have a title trigram similarity above 0.5. When this event has a
    free-form location, candidates must also have a location similarity
    above 0.3 or no free-form location at all.
    """
    start_time_test = Q(start_time=self.start_time)

    if self.start_time is not None:
        # convert str start_time to time
        if isinstance(self.start_time, str):
            self.start_time = time.fromisoformat(self.start_time)
        hour, minute = self.start_time.hour, self.start_time.minute
        # one hour before and after, clamped to the same day
        earliest = time(hour - 1, minute) if hour >= 1 else time(0, 0)
        latest = time(hour + 1, minute) if hour < 23 else time(23, 59)
        start_time_test = start_time_test | Q(start_time__range=(earliest, latest))

    criteria = (
        Q(start_day=self.start_day)
        & start_time_test
        & Q(similarity_title__gt=0.5)
    )
    # the location can only discriminate when this event has one; candidates
    # without free-form location are kept since they cannot be compared
    if self.location and self.location.strip():
        criteria &= (
            Q(similarity_location__gt=0.3)
            | Q(location__isnull=True)
            | Q(location="")
        )

    return (
        Event.objects.annotate(
            similarity_title=TrigramSimilarity("title", self.title)
        )
        .annotate(similarity_location=TrigramSimilarity("location", self.location))
        .filter(criteria)
    )
def find_same_events_by_uuid(self):
    """Return the events whose uuids contain those of this event.

    Returns None (not an empty queryset) when this event has no uuid.
    """
    if self.uuids is None or len(self.uuids) == 0:
        return None
    return Event.objects.filter(uuids__contains=self.uuids)
def get_updateable_uuid(self):
    """Return the first uuid recognised by the Facebook extractor, or None."""
    if not (self.uuids and len(self.uuids) > 0):
        return None
    for candidate in self.uuids:
        if FacebookEventExtractor.is_known_url(candidate):
            return candidate
    return None
def is_updateable(self):
    """Tell whether the event has a uuid it can be updated from."""
    uuid = self.get_updateable_uuid()
    return uuid is not None
def split_uuid(uuid):
    """Split a uuid into ``(root, version)``.

    The version is the integer after the last ":" when that suffix is made
    of digits; otherwise the whole uuid is the root and the version is 0.
    """
    root, separator, suffix = uuid.rpartition(":")
    if separator and suffix.isdigit():
        return root, int(suffix)
    # no ":" at all, or a non-numeric suffix: the uuid carries no version
    return uuid, 0
def is_ancestor_uuid(uuid1, uuid2):
    """Tell whether ``uuid1`` is an older version of ``uuid2`` (same root)."""
    first_root, first_version = Event.split_uuid(uuid1)
    second_root, second_version = Event.split_uuid(uuid2)
    if first_root != second_root:
        return False
    return first_version < second_version
def is_ancestor_by_uuid(self, event):
    """Tell whether one of this event's uuids is an ancestor of one of ``event``'s."""
    if self.uuids is None or event.uuids is None:
        return False
    return any(
        Event.is_ancestor_uuid(own, theirs)
        for own in self.uuids
        for theirs in event.uuids
    )
def same_uuid(self, event):
    """Tell whether this event and ``event`` share at least one uuid."""
    if self.uuids is None or event.uuids is None:
        return False
    return any(
        own == theirs for own in self.uuids for theirs in event.uuids
    )
def get_other_not_trash_versions(self):
    """Return the other events of the duplicate group that are not in the trash."""
    if self.other_versions is None:
        return []
    others = []
    for version in self.other_versions.get_duplicated():
        if version.pk != self.pk and version.status != Event.STATUS.TRASH:
            others.append(version)
    return others
def get_other_versions(self):
    """Return the other events of the duplicate group (empty without group)."""
    group = self.other_versions
    if group is None:
        return []
    return [version for version in group.get_duplicated() if version.pk != self.pk]
def masked(self):
    """Tell whether another event represents the group of this event.

    The result is falsy (the ``other_versions`` value itself) when the event
    belongs to no group.
    """
    group = self.other_versions
    return group and group.representative != self
def get_comparison(events, all=True):
    """Compare the given events field by field.

    Returns one dict per compared field with the keys ``similar``, ``key``
    and ``values``: the shared value when all events agree (after string
    conversion and stripping), the list of values otherwise.
    """
    result = []
    for attr in Event.data_fields(all=all, local_img=False, exact_location=False):
        values = []
        for event in events:
            value = getattr(event, attr)
            if value is None:
                value = ""
            if attr == "tags" and value == "":
                value = []
            values.append(value)

        # only consider fixed part of Facebook urls
        if attr == "image":
            values = [v.split("?")[0] if "fbcdn.net" in v else v for v in values]
        # line endings are not significant in descriptions
        if attr == "description":
            values = [v.replace("\r\n", "\n") for v in values]

        distinct = {str(v).strip() for v in values}
        if len(distinct) == 1:
            result.append({"similar": True, "key": attr, "values": values[0]})
        else:
            result.append({"similar": False, "key": attr, "values": values})
    return result
def similar(self, event, all=True):
    """Tell whether every compared field of the two events is similar."""
    comparison = Event.get_comparison([self, event], all)
    # the `all` parameter shadows the builtin, hence not any(not ...)
    return not any(not item["similar"] for item in comparison)
def set_other_versions(self, events, force_non_fixed=False):
    """Put this event and ``events`` in the same duplicate group.

    Existing groups of the involved events are merged; a new group is
    created when there is none. With ``force_non_fixed`` the representative
    of a merged group is removed. The given events, and this event if it
    already has a primary key, are updated in database.
    """
    # get existing groups (without repetition, ignoring missing ones)
    candidate_groups = set([e.other_versions for e in events] + [self.other_versions])
    groups = [g for g in candidate_groups if g is not None]

    if len(groups) == 0:
        # no known group: create a new one
        group = DuplicatedEvents.objects.create()
    else:
        # otherwise merge existing groups
        group = DuplicatedEvents.merge_groups(groups)
        if force_non_fixed:
            group.representative = None
            group.save()

    # set the possibly duplicated group for the current object
    self.other_versions = group

    # and for the other events
    for e in events:
        e.other_versions = group

    # finally update all events (including current if already created)
    to_update = list(events)
    if self.pk is not None:
        to_update.append(self)
    Event.objects.bulk_update(to_update, fields=["other_versions"])
def data_fields(local_img=True, exact_location=True, all=True):
    """Return the names of the fields that hold the data of an event.

    With ``all`` unset, ``category``, ``tags``, ``local_image`` and
    ``exact_location`` are left out. ``local_img`` and ``exact_location``
    control the presence of the corresponding fields when ``all`` is set.
    Each field name appears once.
    """
    result = []
    if all:
        result += ["category", "tags"]
    result += [
        "title",
        "location",
        "start_day",
        "start_time",
        "end_day",
        "end_time",
        "description",
        "image",
    ]
    if all and local_img:
        result.append("local_image")
    if all and exact_location:
        result.append("exact_location")
    result += ["image_alt", "reference_urls", "recurrences"]
    return result
def find_last_imported(events):
    """Return the event with the latest import date, or None if none was imported."""
    imported = [e for e in events if e.imported_date is not None]
    if not imported:
        return None
    # max() keeps the first of equal candidates, like a stable descending sort
    return max(imported, key=lambda e: e.imported_date)
def find_last_pure_import(events):
    """Return the pure import with the latest import date, or None."""
    pure_imports = [e for e in events if e.pure_import()]
    if not pure_imports:
        return None
    # max() keeps the first of equal candidates, like a stable descending sort
    return max(pure_imports, key=lambda e: e.imported_date)
def update(self, other):
    """Copy the data of ``other`` into this event.

    Every data field is overwritten, the modification date is moved forward
    when ``other`` is more recent, and uuids of ``other`` that are missing
    here are added.
    """
    # set attributes
    for attr in Event.data_fields():
        setattr(self, attr, getattr(other, attr))

    # adjust modified date if required (both dates are nullable)
    if other.modified_date and (
        self.modified_date is None or self.modified_date < other.modified_date
    ):
        self.modified_date = other.modified_date

    # add possibly missing uuids
    if self.uuids is None:
        self.uuids = []
    for uuid in other.uuids or []:
        if uuid not in self.uuids:
            self.uuids.append(uuid)
def import_events(events, remove_missing_from_source=None):
    """Import a list of unsaved events, creating or updating database entries.

    Limitation: the given events should not be considered similar one to
    another.

    Each event is matched against existing ones by uuid, then by similarity.
    Matching pure imports are updated, other events are created and attached
    to the duplicate group of their look-alikes. When
    ``remove_missing_from_source`` is given, published events of that source
    lying in the imported date interval whose uuids are absent from the
    import are moved to the trash.

    Returns ``(created events, number of updated events, number of trashed
    events)``.
    """
    to_import = []
    to_update = []

    min_date = timezone.now().date()
    max_date = None
    uuids = set()

    # for each event, check if it's a new one, or one to be updated
    for event in events:
        # days are expected as ISO strings at this stage
        sdate = date.fromisoformat(event.start_day)
        if event.end_day:
            edate = date.fromisoformat(event.end_day)
        else:
            edate = sdate
        if min_date is None or min_date > sdate:
            min_date = sdate
        if max_date is None or max_date < sdate:
            max_date = sdate
        if max_date is None or (event.end_day is not None and max_date < edate):
            max_date = edate
        if event.uuids and len(event.uuids) > 0:
            uuids |= set(event.uuids)

        # imported events should be updated
        event.set_in_importation_process()
        event.prepare_save()

        # check if the event has already been imported (using uuid);
        # the lookup returns None for an event without uuid
        same_events = event.find_same_events_by_uuid()

        if same_events is not None and len(same_events) != 0:
            # check if one event has been imported and not modified in this list
            same_imported = Event.find_last_pure_import(same_events)

            # if not, we check if it does not match exactly with another
            if not same_imported:
                for e in same_events:
                    if event.similar(e, False):
                        same_imported = e
                        break

            if same_imported:
                # reopen DuplicatedEvents if required
                if not event.similar(same_imported, False) and same_imported.other_versions:
                    if same_imported.status != Event.STATUS.TRASH:
                        if same_imported.other_versions.is_published():
                            if same_imported.other_versions.representative != same_imported:
                                same_imported.other_versions.representative = None
                                same_imported.other_versions.save()

                same_imported.update(event)
                same_imported.set_in_importation_process()
                same_imported.prepare_save()
                to_update.append(same_imported)
            else:
                # otherwise, the new event is possibly a duplication of the
                # remaining others; it goes to the trash when all of them are
                trash = len([e for e in same_events if e.status != Event.STATUS.TRASH]) == 0
                if trash:
                    event.status = Event.STATUS.TRASH
                event.set_other_versions(same_events, force_non_fixed=not trash)
                # it will be imported
                to_import.append(event)
        else:
            # if uuid is unique (or not available), check for similar events
            similar_events = event.find_similar_events()

            # if it exists similar events, the event is possibly a
            # duplication of the others
            if len(similar_events) != 0:
                event.set_other_versions(similar_events, force_non_fixed=True)
            # in both cases the new event is imported
            to_import.append(event)

    # then import all the new events
    imported = Event.objects.bulk_create(to_import)
    nb_updated = Event.objects.bulk_update(
        to_update,
        fields=Event.data_fields()
        + ["imported_date", "modified_date", "uuids", "status"],
    )

    nb_draft = 0
    if remove_missing_from_source is not None and max_date is not None:
        # events that are missing from the import but in database are moved
        # to the trash, only if they lie in the imported interval
        in_interval = Event.objects.filter(
            (
                (
                    Q(end_day__isnull=True)
                    & Q(start_day__gte=min_date)
                    & Q(start_day__lte=max_date)
                )
                | (
                    Q(end_day__isnull=False)
                    & ~(Q(start_day__gt=max_date) | Q(end_day__lt=min_date))
                )
            )
            & Q(import_sources__contains=[remove_missing_from_source])
            & Q(status=Event.STATUS.PUBLISHED)
            & Q(uuids__len__gt=0)
        )

        to_trash = []
        for e in in_interval:
            if len(uuids.intersection(e.uuids)) == 0:
                e.status = Event.STATUS.TRASH
                # save them without updating modified date
                e.set_no_modification_date_changed()
                e.prepare_save()
                to_trash.append(e)

        nb_draft = Event.objects.bulk_update(to_trash, fields=["status"])

    return imported, nb_updated, nb_draft
def set_current_date(self, date):
    """Record on the instance the day it is currently considered for.

    The value is stored as the plain attribute ``current_date``;
    ``get_concurrent_events`` reads it back when it is present.
    """
    self.current_date = date
def get_start_end_datetimes(self, day):
    """Return the ``(start, end)`` datetimes this event spans on ``day``.

    On the event's first day the start uses ``start_time`` (midnight when it
    is unset); on any other day it is midnight of ``day``. Symmetrically, on
    the consolidated end day the end uses ``end_time`` (last instant of the
    day when it is unset); on any other day it is the last instant of ``day``.
    """
    if self.start_day != day:
        opening = datetime.combine(day, time.min)
    else:
        opening = datetime.combine(
            self.start_day,
            time.min if self.start_time is None else self.start_time,
        )

    last_day = self.get_consolidated_end_day()
    if last_day != day:
        closing = datetime.combine(day, time.max)
    else:
        closing = datetime.combine(
            last_day,
            time.max if self.end_time is None else self.end_time,
        )

    return opening, closing
def get_concurrent_events(self, remove_same_dup=True):
    """Return the published events that overlap this one on the reference day.

    The reference day is ``current_date`` when it has been set on the
    instance (see ``set_current_date``), otherwise ``start_day``.

    An event is kept when it is not this event, overlaps it in time on the
    reference day, is published, and does not share this event's
    ``other_versions`` group (i.e. is not a known duplicate of it).

    ``remove_same_dup`` is accepted for interface compatibility; the body
    does not read it and duplicates are always filtered out.
    """
    day = getattr(self, "current_date", self.start_day)
    # Candidates must come from the very day the overlap is evaluated on.
    # Loading them from ``start_day`` while comparing on ``day`` mixed two
    # different days for multi-day events displayed after their first day.
    # NOTE(review): assumes current_date is a date like start_day - confirm.
    day_events = CalendarDay(day).get_events()
    return [
        other
        for other in day_events
        if other != self
        and self.is_concurrent_event(other, day)
        and other.status == Event.STATUS.PUBLISHED
        and (
            other.other_versions is None
            or other.other_versions != self.other_versions
        )
    ]
def is_concurrent_event(self, e, day):
    """Tell whether this event and ``e`` overlap in time on ``day``.

    Both events are reduced to their ``(start, end)`` span on ``day``; they
    are concurrent when either span's start falls inside the other span
    (bounds included).
    """
    own_start, own_end = self.get_start_end_datetimes(day)
    other_start, other_end = e.get_start_end_datetimes(day)
    # The other event begins while this one is running...
    if own_start <= other_start <= own_end:
        return True
    # ...or this one begins while the other is running.
    return other_start <= own_start <= other_end
@staticmethod
def export_to_ics(events):
    """Build an iCalendar object describing ``events``.

    Each event becomes one VEVENT carrying its start, optional end, title
    (as both ``summary`` and ``name``), description and location. When the
    event has reference URLs, the first one is appended to the description
    on its own line.

    Returns the populated calendar object (``icalCal``).
    """

    def ics_moment(day, moment):
        # Without a time the value is a plain date (all-day); otherwise a
        # naive datetime truncated to the minute.
        if moment is None:
            return date(day.year, day.month, day.day)
        return datetime(day.year, day.month, day.day, moment.hour, moment.minute)

    cal = icalCal()
    # Some properties are required to be compliant
    cal.add("prodid", "-//My calendar product//example.com//")
    cal.add("version", "2.0")

    for event in events:
        eventIcal = icalEvent()

        # mapping
        eventIcal.add("dtstart", ics_moment(event.start_day, event.start_time))
        # NOTE(review): RFC 5545 defines DTEND as exclusive; an all-day end
        # date may need to be the day after end_day - confirm with consumers.
        if event.end_day is not None:
            eventIcal.add("dtend", ics_moment(event.end_day, event.end_time))

        eventIcal.add("summary", event.title)
        eventIcal.add("name", event.title)

        url = ("\n" + event.reference_urls[0]) if event.reference_urls else ""
        # A missing description must not break the whole export.
        eventIcal.add("description", (event.description or "") + url)

        eventIcal.add("location", event.exact_location or event.location)

        cal.add_component(eventIcal)

    return cal
class ContactMessage(models.Model):
    """A message sent to the site team, with its moderation bookkeeping.

    Besides the sender-provided fields (subject, name, email, message), the
    model stores a spam flag, a closed flag and comments from the moderation
    team.
    """

    class Meta:
        verbose_name = _("Contact message")
        verbose_name_plural = _("Contact messages")

    # Sender-provided content; name and email are optional.
    subject = models.CharField(
        verbose_name=_("Subject"),
        help_text=_("The subject of your message"),
        max_length=512,
    )

    name = models.CharField(
        verbose_name=_("Name"),
        help_text=_("Your name"),
        max_length=512,
        blank=True,
        null=True,
    )
    email = models.EmailField(
        verbose_name=_("Email address"),
        help_text=_("Your email address"),
        max_length=254,
        blank=True,
        null=True,
    )
    message = CKEditor5Field(verbose_name=_("Message"), help_text=_("Your message"), blank=True)

    # Set once, when the row is created.
    date = models.DateTimeField(auto_now_add=True)

    # Moderation state.
    spam = models.BooleanField(
        verbose_name=_("Spam"),
        help_text=_("This message is a spam."),
        default=False,
    )

    closed = models.BooleanField(
        verbose_name=_("Closed"),
        help_text=_(
            "this message has been processed and no longer needs to be handled"
        ),
        default=False,
    )

    comments = CKEditor5Field(
        verbose_name=_("Comments"),
        help_text=_("Comments on the message from the moderation team"),
        default="",
        blank=True,
        null=True,
    )

    @staticmethod
    def nb_open_contactmessages():
        """Return the number of messages that are not closed yet."""
        # Declared static: it takes no instance, and can now be reached from
        # the class as well as from an instance.
        return ContactMessage.objects.filter(closed=False).count()
class RecurrentImport(models.Model):
    """Configuration of an import that is run on a recurring basis.

    It stores where the data comes from (``source``), how it is downloaded
    and parsed (``downloader``, ``processor``), how often it recurs
    (``recurrence``) and the default values applied to each imported event.
    """

    class Meta:
        verbose_name = _("Recurrent import")
        verbose_name_plural = _("Recurrent imports")
        permissions = [("run_recurrentimport", "Can run a recurrent import")]

    # Identifier of the parser used on the downloaded document.
    class PROCESSOR(models.TextChoices):
        ICAL = "ical", _("ical")
        ICALNOBUSY = "icalnobusy", _("ical no busy")
        ICALNOVC = "icalnovc", _("ical no VC")
        LACOOPE = "lacoope", _("lacoope.org")
        LACOMEDIE = "lacomedie", _("la comédie")
        LEFOTOMAT = "lefotomat", _("le fotomat")
        LAPUCEALOREILLE = "lapucealoreille", _("la puce à l'oreille")
        MECWORDPRESS = "Plugin wordpress MEC", _("Plugin wordpress MEC")
        FBEVENTS = "Facebook events", _("Événements d'une page FB")
        C3C = "cour3coquins", _("la cour des 3 coquins")
        ARACHNEE = "arachnee", _("Arachnée concert")

    # Identifier of the strategy used to fetch the source document.
    class DOWNLOADER(models.TextChoices):
        SIMPLE = "simple", _("simple")
        CHROMIUMHEADLESS = "chromium headless", _("Headless Chromium")
        CHROMIUMHEADLESSPAUSE = "chromium (pause)", _("Headless Chromium (pause)")

    class RECURRENCE(models.TextChoices):
        DAILY = (
            "daily",
            _("daily"),
        )
        WEEKLY = "weekly", _("weekly")

    # NOTE(review): the two adjacent literals below concatenate to
    # "...on the sites About page." (no apostrophe). The text is a gettext
    # msgid, so it is kept unchanged here; fix it together with the catalogs.
    name = models.CharField(
        verbose_name=_("Name"),
        help_text=_(
            "Recurrent import name. Be careful to choose a name that is easy to understand, as it will be public and displayed on the site"
            "s About page."
        ),
        max_length=512,
        default="",
    )

    processor = models.CharField(
        _("Processor"), max_length=20, choices=PROCESSOR.choices, default=PROCESSOR.ICAL
    )
    downloader = models.CharField(
        _("Downloader"),
        max_length=20,
        choices=DOWNLOADER.choices,
        default=DOWNLOADER.SIMPLE,
    )

    recurrence = models.CharField(
        _("Import recurrence"),
        max_length=10,
        choices=RECURRENCE.choices,
        default=RECURRENCE.DAILY,
    )

    source = models.URLField(
        verbose_name=_("Source"),
        help_text=_("URL of the source document"),
        max_length=1024,
    )
    browsable_url = models.URLField(
        verbose_name=_("Browsable url"),
        help_text=_(
            "URL of the corresponding document that will be shown to visitors."
        ),
        max_length=1024,
        blank=True,
        null=True,
    )

    # Defaults applied to each imported event.
    defaultPublished = models.BooleanField(
        verbose_name=_("Published"),
        help_text=_("Status of each imported event (published or draft)"),
        default=True,
    )
    defaultLocation = models.CharField(
        verbose_name=_("Location"),
        help_text=_("Address for each imported event"),
        max_length=512,
        null=True,
        blank=True,
    )
    defaultCategory = models.ForeignKey(
        Category,
        verbose_name=_("Category"),
        help_text=_("Category of each imported event"),
        default=None,
        null=True,
        blank=True,
        on_delete=models.SET_DEFAULT,
    )
    defaultTags = ArrayField(
        models.CharField(max_length=64),
        verbose_name=_("Tags for each imported event"),
        help_text=_("A list of tags that describe each imported event."),
        blank=True,
        null=True,
    )

    def __str__(self):
        return self.name

    def nb_imports(self):
        """Return the number of batch importations attached to this import."""
        return BatchImportation.objects.filter(recurrentImport=self).count()

    def nb_events(self):
        """Return the number of events whose import sources contain ``source``."""
        return Event.objects.filter(import_sources__contains=[self.source]).count()

    def get_absolute_url(self):
        return reverse("view_rimport", kwargs={"pk": self.pk})

    def last_import(self):
        """Return the most recently created batch importation, or ``None``."""
        # ``first()`` fetches a single row instead of loading every batch
        # importation just to keep the newest one.
        return (
            BatchImportation.objects.filter(recurrentImport=self)
            .order_by("-created_date")
            .first()
        )
class BatchImportation(models.Model):
    """Record of one import run: its origin, its status and its counters."""

    # Life cycle of an import run; a new record starts as RUNNING.
    class STATUS(models.TextChoices):
        RUNNING = "running", _("Running")
        CANCELED = "canceled", _("Canceled")
        SUCCESS = "success", _("Success")
        FAILED = "failed", _("Failed")

    class Meta:
        verbose_name = _("Batch importation")
        verbose_name_plural = _("Batch importations")
        permissions = [("run_batchimportation", "Can run a batch importation")]

    # Set once, when the row is created.
    created_date = models.DateTimeField(auto_now_add=True)

    # Optional link to the recurrent import behind this run; the link is
    # nulled (the record is kept) when the recurrent import is deleted.
    recurrentImport = models.ForeignKey(
        RecurrentImport,
        verbose_name=_("Recurrent import"),
        help_text=_("Reference to the recurrent import processing"),
        blank=True,
        null=True,
        on_delete=models.SET_NULL,
        editable=False,
    )

    # Origin of the data when no recurrent import is associated.
    url_source = models.URLField(
        verbose_name=_("URL (if not recurrent import)"),
        help_text=_(
            "Source URL if no RecurrentImport is associated."
        ),
        max_length=1024,
        blank=True,
        null=True,
        editable=False,
    )

    status = models.CharField(
        _("Status"), max_length=20, choices=STATUS.choices, default=STATUS.RUNNING
    )

    error_message = models.CharField(
        verbose_name=_("Error message"), max_length=512, blank=True, null=True
    )

    # Counters describing the outcome of the run; all default to 0.
    nb_initial = models.PositiveIntegerField(
        verbose_name=_("Number of collected events"), default=0
    )
    nb_imported = models.PositiveIntegerField(
        verbose_name=_("Number of imported events"), default=0
    )
    nb_updated = models.PositiveIntegerField(
        verbose_name=_("Number of updated events"), default=0
    )
    nb_removed = models.PositiveIntegerField(
        verbose_name=_("Number of removed events"), default=0
    )

    # NOTE(review): presumably the id of the Celery task running the import
    # - confirm against the code that writes it.
    celery_id = models.CharField(max_length=128, default="")
class CategorisationRule(models.Model):
    """Rule that votes for a category when an event matches its criteria.

    A rule can constrain the event title, description, location text and
    place; every criterion that is set must be satisfied for the rule to
    match. The weights of all matching rules are summed per category and the
    category with the highest total is selected.
    """

    weight = models.IntegerField(
        verbose_name=_("Weight"),
        help_text=_("The lower is the weight, the earlier the filter is applied"),
        default=1,
    )

    category = models.ForeignKey(
        Category,
        verbose_name=_("Category"),
        help_text=_("Category applied to the event"),
        on_delete=models.CASCADE,
    )

    title_contains = models.CharField(
        verbose_name=_("Contained in the title"),
        help_text=_("Text contained in the event title"),
        max_length=512,
        blank=True,
        null=True,
    )
    title_exact = models.BooleanField(
        verbose_name=_("Exact title extract"),
        help_text=_(
            "If checked, the extract will be searched for in the title using the exact form (capitals, accents)."
        ),
        default=False,
    )

    description_contains = models.CharField(
        verbose_name=_("Contained in the description"),
        help_text=_("Text contained in the description"),
        max_length=512,
        blank=True,
        null=True,
    )
    desc_exact = models.BooleanField(
        verbose_name=_("Exact description extract"),
        help_text=_(
            "If checked, the extract will be searched for in the description using the exact form (capitals, accents)."
        ),
        default=False,
    )

    location_contains = models.CharField(
        verbose_name=_("Contained in the location"),
        help_text=_("Text contained in the event location"),
        max_length=512,
        blank=True,
        null=True,
    )
    loc_exact = models.BooleanField(
        verbose_name=_("Exact location extract"),
        help_text=_(
            "If checked, the extract will be searched for in the location using the exact form (capitals, accents)."
        ),
        default=False,
    )

    place = models.ForeignKey(
        Place,
        verbose_name=_("Place"),
        help_text=_("Location from place"),
        null=True,
        on_delete=models.SET_NULL,
        blank=True,
    )

    # Class-level cache of the rules queryset, filled on first use by
    # get_category_from_rules. Nothing in this class resets it.
    rules = None

    class Meta:
        verbose_name = _("Categorisation rule")
        verbose_name_plural = _("Categorisation rules")
        permissions = [("apply_categorisationrules", "Apply a categorisation rule")]

    # all rules are applied, starting from the first to the last
    @staticmethod
    def apply_rules(event):
        """Assign the category elected by the rules to ``event``.

        Returns 1 when a category was set on the event, 0 when no rule
        matched (the event is then left untouched). The event is not saved.
        """
        c = CategorisationRule.get_category_from_rules(event)

        if c is None:
            return 0
        event.category = c
        return 1

    @staticmethod
    def get_category_from_rules(event):
        """Return the category with the highest summed weight, or ``None``.

        ``None`` is returned when no rule matches ``event``.
        """
        cats = defaultdict(int)
        if CategorisationRule.rules is None:
            CategorisationRule.rules = CategorisationRule.objects.all().prefetch_related("category").prefetch_related("place")

        for rule in CategorisationRule.rules:
            if rule.match(event):
                cats[rule.category] += rule.weight

        if not cats:
            return None
        return max(cats, key=cats.get)

    @staticmethod
    def _text_matches(needle, haystack, exact):
        """Tell whether ``needle`` occurs in ``haystack``.

        With ``exact`` the comparison is literal; otherwise accents and case
        are ignored. A missing (``None``) haystack is treated as empty text
        so that it simply does not match instead of raising.
        """
        haystack = haystack or ""
        if exact:
            return needle in haystack
        return remove_accents(needle).lower() in remove_accents(haystack).lower()

    def match(self, event):
        """Tell whether ``event`` satisfies every criterion set on this rule.

        Criteria left empty are ignored, so a rule without any criterion
        matches every event.
        """
        if self.description_contains and not self._text_matches(
            self.description_contains, event.description, self.desc_exact
        ):
            return False

        if self.title_contains and not self._text_matches(
            self.title_contains, event.title, self.title_exact
        ):
            return False

        if self.location_contains and not self._text_matches(
            self.location_contains, event.location, self.loc_exact
        ):
            return False

        if self.place:
            if not event.exact_location == self.place:
                return False

        return True
class ModerationQuestion(models.Model):
    """A question, unique by its text, that is shown to moderators."""

    question = models.CharField(
        verbose_name=_("Question"),
        help_text=_("Text that will be shown to moderators"),
        max_length=512,
        unique=True,
    )

    class Meta:
        verbose_name = _("Moderation question")
        verbose_name_plural = _("Moderation questions")
        permissions = [
            ("use_moderation_question", "Can use a moderation question to tag an event")
        ]

    def __str__(self):
        # Questions longer than the limit are cut and suffixed with "...".
        char_limit = 30
        if len(self.question) > char_limit:
            return self.question[:char_limit] + "..."
        return self.question

    def get_absolute_url(self):
        url_kwargs = {"pk": self.pk}
        return reverse("view_mquestion", kwargs=url_kwargs)

    def complete_id(self):
        """Return the string identifier ``question_<pk>``."""
        return f"question_{self.pk!s}"
class ModerationAnswer(models.Model):
    """A possible answer to a moderation question.

    Choosing the answer adds the tags listed in ``adds_tags`` to an event
    and removes those listed in ``removes_tags``.
    """

    question = models.ForeignKey(
        ModerationQuestion,
        related_name="answers",
        verbose_name=_("Question"),
        help_text=_("Associated question from moderation"),
        on_delete=models.CASCADE,
    )

    answer = models.CharField(
        verbose_name=_("Answer"),
        help_text=_("Text that will be shown to moderators"),
        max_length=512,
    )

    adds_tags = ArrayField(
        models.CharField(max_length=64),
        verbose_name=_("Adds tags"),
        help_text=_("A list of tags that will be added if you choose this answer."),
        blank=True,
        null=True,
    )
    removes_tags = ArrayField(
        models.CharField(max_length=64),
        verbose_name=_("Removes tags"),
        help_text=_("A list of tags that will be removed if you choose this answer."),
        blank=True,
        null=True,
    )

    def complete_id(self):
        """Return the string identifier ``answer_<question pk>_<pk>``."""
        return "answer_" + str(self.question.pk) + "_" + str(self.pk)

    def html_description(self):
        """Return the answer followed by its added/removed tags, as safe HTML.

        NOTE(review): the answer text and the tags are inserted without
        escaping and the result is marked safe - confirm that this content
        can only be authored by trusted users.
        """
        result = self.answer + '<br /><span class="helptext">'
        if self.adds_tags:
            result += " ".join(
                [
                    '<span role="button" class="small-cat">' + a + "</span>"
                    for a in self.adds_tags
                ]
            )
        if self.removes_tags:
            result += " ".join(
                [
                    '<span role="button" class="small-cat strike">' + a + "</span>"
                    for a in self.removes_tags
                ]
            )
        result += "</span>"
        return mark_safe(result)

    def valid_event(self, event):
        """Tell whether ``event``'s tags already reflect this answer.

        That is: every tag of ``adds_tags`` is present on the event and no
        tag of ``removes_tags`` is. An event without tags is valid only when
        the answer adds no tag.
        """
        if not event.tags:
            return not self.adds_tags

        if self.adds_tags and any(t not in event.tags for t in self.adds_tags):
            return False
        if self.removes_tags and any(t in event.tags for t in self.removes_tags):
            return False
        return True

    def apply_answer(self, event):
        """Update ``event.tags`` according to this answer.

        Tags of ``adds_tags`` are added and tags of ``removes_tags`` removed.
        When the event has no tags yet, its tags become a copy of
        ``adds_tags``. The event is not saved.
        """
        # Work on local copies: the answer's own fields must not be rewritten
        # (None -> []), and the event must not share a list object with it.
        added = list(self.adds_tags or [])
        removed = set(self.removes_tags or [])

        if event.tags:
            event.tags = list((set(event.tags) | set(added)) - removed)
        else:
            event.tags = added