diff --git a/src/agenda_culturel/views/__init__.py b/src/agenda_culturel/views/__init__.py
new file mode 100644
index 0000000..00e8edd
--- /dev/null
+++ b/src/agenda_culturel/views/__init__.py
@@ -0,0 +1 @@
+from .oldviews import *
diff --git a/src/agenda_culturel/views/oldviews.py b/src/agenda_culturel/views/oldviews.py
new file mode 100644
index 0000000..72d5f15
--- /dev/null
+++ b/src/agenda_culturel/views/oldviews.py
@@ -0,0 +1,3076 @@
+import calendar as _calendar
+import hashlib
+import logging
+from datetime import date, timedelta
+
+import emoji
+from django.contrib import messages
+from django.contrib.auth.decorators import login_required, permission_required
+from django.contrib.auth.mixins import (
+ LoginRequiredMixin,
+ PermissionRequiredMixin,
+ UserPassesTestMixin,
+)
+from django.contrib.messages.views import SuccessMessageMixin
+from django.core.cache import cache
+from django.core.mail import mail_admins
+from django.core.paginator import EmptyPage, PageNotAnInteger, Paginator
+from django.db.models import Count, F, Func, OuterRef, Q, Subquery
+from django.db.models.functions import TruncMonth
+from django.http import (
+ Http404,
+ HttpResponse,
+ HttpResponseForbidden,
+ HttpResponseRedirect,
+)
+from django.shortcuts import get_object_or_404, render
+from django.urls import reverse, reverse_lazy
+from django.utils.decorators import method_decorator
+from django.utils.html import escape
+from django.utils.safestring import mark_safe
+from django.utils.timezone import datetime
+from django.utils.translation import gettext_lazy as _
+from django.views.generic import DetailView, ListView
+from django.views.generic.edit import (
+ CreateView,
+ DeleteView,
+ ModelFormMixin,
+ UpdateView,
+)
+from honeypot.decorators import check_honeypot
+from django.db.models.aggregates import StdDev
+from django.db.models import Avg, Max, Min
+from django.db.models import Aggregate, FloatField
+from django.db.models.functions import ExtractDay
+
+from ..calendar import CalendarDay, CalendarList, CalendarMonth, CalendarWeek
+from ..celery import app as celery_app
+from ..celery import (
+ import_events_from_json,
+ import_events_from_url,
+ import_events_from_urls,
+ run_all_recurrent_imports,
+ run_all_recurrent_imports_canceled,
+ run_all_recurrent_imports_failed,
+ run_recurrent_import,
+ update_orphan_pure_import_events,
+)
+from ..filters import (
+ DuplicatedEventsFilter,
+ EventFilter,
+ EventFilterAdmin,
+ MessagesFilterAdmin,
+ RecurrentImportFilter,
+ SearchEventFilter,
+ SimpleSearchEventFilter,
+)
+from ..forms import (
+ BatchImportationForm,
+ CategorisationForm,
+ CategorisationRuleImportForm,
+ EventAddPlaceForm,
+ EventForm,
+ EventFormWithContact,
+ EventModerateForm,
+ FixDuplicates,
+ MergeDuplicates,
+ MessageEventForm,
+ MessageForm,
+ PlaceForm,
+ RecurrentImportForm,
+ SelectEventInList,
+ SimpleContactForm,
+ TagForm,
+ TagRenameForm,
+ URLSubmissionFormSet,
+ URLSubmissionFormWithContact,
+ UserProfileForm,
+)
+from ..import_tasks.extractor import Extractor
+from ..models import (
+ BatchImportation,
+ CategorisationRule,
+ Category,
+ DuplicatedEvents,
+ Event,
+ Message,
+ Organisation,
+ Place,
+ RecurrentImport,
+ StaticContent,
+ Tag,
+ remove_accents,
+ UserProfile,
+)
+from ..utils import PlaceGuesser
+
+logger = logging.getLogger(__name__)
+
+
+class Median(Aggregate):
+ function = "PERCENTILE_CONT"
+ name = "median"
+ output_field = FloatField()
+ template = "%(function)s(0.5) WITHIN GROUP (ORDER BY %(expressions)s)"
+
+
+class PaginatorFilter(Paginator):
+ def __init__(self, filter, nb, request):
+ self.request = request
+ self.filter = filter
+
+ super().__init__(filter.qs, nb)
+
+ self.url_first_page = PaginatorFilter.update_param(
+ self.request.get_full_path(), "page", 1
+ )
+ self.url_last_page = PaginatorFilter.update_param(
+ self.request.get_full_path(), "page", self.num_pages
+ )
+
+ def update_param(params, key, value):
+ p = params.split("?")
+ root = p[0]
+ if len(p) > 1:
+ other = p[1]
+ others = other.split("&")
+ others = [o for o in others if not o.startswith(key)]
+ others += [key + "=" + str(value)]
+ return root + "?" + "&".join(others)
+ else:
+ return root + "?" + key + "=" + str(value)
+
+ def page(self, *args, **kwargs):
+ page = super().page(*args, **kwargs)
+
+ try:
+ page.url_previous_page = PaginatorFilter.update_param(
+ self.request.get_full_path(),
+ "page",
+ page.previous_page_number(),
+ )
+ except EmptyPage:
+ page.url_previous_page = self.request.get_full_path()
+
+ try:
+ page.url_next_page = PaginatorFilter.update_param(
+ self.request.get_full_path(), "page", page.next_page_number()
+ )
+ except EmptyPage:
+ page.url_next_page = self.request.get_full_path()
+
+ return page
+
+
+#
+#
+# Useful for translation
+to_be_translated = [
+ _("Recurrent import name"),
+ _("Add another"),
+ _("Browse..."),
+ _("No file selected."),
+]
+
+
+def get_event_qs(request):
+ if request.user.is_authenticated:
+ return Event.objects.filter()
+ else:
+ return Event.objects.filter(status=Event.STATUS.PUBLISHED)
+
+
+def page_not_found(request, exception=None):
+ return render(request, "page-erreur.html", status=404, context={"error": 404})
+
+
+def internal_server_error(request):
+ try:
+ mail_admins(
+ request.site.name + _(": error 500"),
+ _("An internal error has occurred on site {} at address {}.").format(
+ request.site.name, request.build_absolute_uri()
+ ),
+ )
+ except Exception:
+ pass
+ return render(request, "page-erreur.html", status=500, context={"error": 500})
+
+
+def thank_you(request):
+ return render(request, "agenda_culturel/thank_you.html")
+
+
+def mentions_legales(request):
+ context = {
+ "title": "Mentions légales",
+ "static_content": "mentions_legales",
+ "url_path": reverse_lazy("mentions_legales"),
+ }
+ return render(request, "agenda_culturel/page-single.html", context)
+
+
+def about(request):
+ rimports = (
+ RecurrentImport.objects.filter(~Q(recurrence=RecurrentImport.RECURRENCE.NEVER))
+ .order_by("name__unaccent")
+ .all()
+ )
+ context = {
+ "title": "À propos",
+ "static_content": "about",
+ "url_path": reverse_lazy("about"),
+ "rimports": rimports,
+ }
+ return render(request, "agenda_culturel/page-rimports-list.html", context)
+
+
+def moderation_rules(request):
+ context = {
+ "title": _("Moderation rules"),
+ "static_content": "moderation_rules",
+ "url_path": reverse_lazy("moderation_rules"),
+ }
+ return render(request, "agenda_culturel/page-single.html", context)
+
+
+def import_requirements(request):
+ context = {
+ "title": _("Import requirements"),
+ "static_content": "import_requirements",
+ "url_path": reverse_lazy("import_requirements"),
+ }
+ return render(request, "agenda_culturel/page-single.html", context)
+
+
+def home(request, cat=None):
+ return week_view(request, home=True, cat=cat)
+
+
+def month_view(request, year=None, month=None, cat=None):
+ now = date.today()
+ if year is None and month is None:
+ day = now.day
+ else:
+ day = None
+ if year is None:
+ year = now.year
+ if month is None:
+ month = now.month
+
+ request = EventFilter.set_default_values(request)
+ qs = get_event_qs(request).only(
+ "title",
+ "start_day",
+ "start_time",
+ "category",
+ "other_versions",
+ "recurrences",
+ "end_day",
+ "end_time",
+ "uuids",
+ "status",
+ "tags",
+ )
+ if cat is not None:
+ category = Category.objects.filter(slug=cat).first()
+ qs = qs.filter(category=category)
+ else:
+ category = None
+
+ filter = EventFilter(request.GET, qs, request=request)
+
+ if filter.has_category_parameters():
+ return HttpResponseRedirect(filter.get_new_url())
+
+ cmonth = CalendarMonth(year, month, filter, day=day)
+
+ context = {
+ "calendar": cmonth,
+ "this_month": day is not None,
+ "filter": filter,
+ "category": category,
+ "init_date": now if cmonth.today_in_calendar() else cmonth.firstdate,
+ }
+ return render(request, "agenda_culturel/page-month.html", context)
+
+
+def week_view(request, year=None, week=None, home=False, cat=None):
+ now = date.today()
+ if year is None:
+ year = now.isocalendar()[0]
+ if week is None:
+ week = now.isocalendar()[1]
+
+ request = EventFilter.set_default_values(request)
+ qs = (
+ get_event_qs(request)
+ .select_related("exact_location")
+ .only(
+ "title",
+ "start_day",
+ "start_time",
+ "category",
+ "other_versions",
+ "recurrences",
+ "end_day",
+ "end_time",
+ "uuids",
+ "status",
+ "tags",
+ "local_image",
+ "image",
+ "image_alt",
+ "exact_location",
+ "description",
+ )
+ )
+ if cat is not None:
+ category = Category.objects.filter(slug=cat).first()
+ qs = qs.filter(category=category)
+ else:
+ category = None
+ filter = EventFilter(request.GET, qs, request=request)
+
+ if filter.has_category_parameters():
+ return HttpResponseRedirect(filter.get_new_url())
+
+ cweek = CalendarWeek(year, week, filter)
+
+ context = {
+ "year": year,
+ "week": week,
+ "calendar": cweek,
+ "filter": filter,
+ "category": category,
+ "init_date": now if cweek.today_in_calendar() else cweek.firstdate,
+ }
+ if home:
+ context["home"] = 1
+ return render(request, "agenda_culturel/page-week.html", context)
+
+
+def day_view(request, year=None, month=None, day=None, cat=None):
+ if year is None or month is None or day is None:
+ if "when" in request.POST:
+ when = datetime.strptime(request.POST["when"], "%Y-%m-%d")
+ year = when.year
+ month = when.month
+ day = when.day
+
+ request = EventFilter.set_default_values(request)
+ qs = get_event_qs(request).select_related("exact_location")
+ if cat is not None:
+ category = Category.objects.filter(slug=cat).first()
+ qs = qs.filter(category=category)
+ else:
+ category = None
+ filter = EventFilter(request.GET, qs, request=request)
+
+ return HttpResponseRedirect(
+ reverse_lazy("day_view", args=[year, month, day])
+ + "?"
+ + filter.get_url()
+ )
+
+ return upcoming_events(request, year, month, day, 0, cat)
+
+
+def upcoming_events(request, year=None, month=None, day=None, neighsize=1, cat=None):
+ now = date.today()
+ if year is None:
+ year = now.year
+ if month is None:
+ month = now.month
+ if day is None:
+ day = now.day
+
+ day = date(year, month, day)
+ day = day + timedelta(days=neighsize)
+
+ request = EventFilter.set_default_values(request)
+ qs = get_event_qs(request).select_related("exact_location")
+ if cat is not None:
+ category = Category.objects.filter(slug=cat).first()
+ qs = qs.filter(category=category)
+ else:
+ category = None
+
+ filter = EventFilter(request.GET, qs, request=request)
+
+ if filter.has_category_parameters():
+ return HttpResponseRedirect(filter.get_new_url())
+
+ cal = CalendarList(
+ day + timedelta(days=-neighsize),
+ day + timedelta(days=neighsize),
+ filter,
+ True,
+ )
+
+ context = {
+ "calendar": cal,
+ "now": now,
+ "day": day,
+ "init_date": now if cal.today_in_calendar() else day,
+ "filter": filter,
+ "date_pred": day + timedelta(days=-neighsize - 1),
+ "date_next": day + timedelta(days=neighsize + 1),
+ "category": category,
+ }
+
+ return render(request, "agenda_culturel/page-upcoming.html", context)
+
+
+class StaticContentCreateView(LoginRequiredMixin, CreateView):
+ model = StaticContent
+ fields = ["text"]
+ permission_required = "agenda_culturel.add_staticcontent"
+
+ def form_valid(self, form):
+ form.instance.name = self.request.GET["name"]
+ form.instance.url_path = self.request.GET["url_path"]
+ return super().form_valid(form)
+
+
+class StaticContentUpdateView(
+ SuccessMessageMixin,
+ PermissionRequiredMixin,
+ LoginRequiredMixin,
+ UpdateView,
+):
+ model = StaticContent
+ permission_required = "agenda_culturel.change_staticcontent"
+ fields = ["text"]
+ success_message = _("The static content has been successfully updated.")
+
+
+def update_from_source(request, pk):
+ event = get_object_or_404(Event, pk=pk)
+
+ url = event.get_updateable_uuid()
+ if url is None:
+ messages.warning(
+ request,
+ _(
+ "The event cannot be updated because the import process is not available for the referenced sources."
+ ),
+ )
+ else:
+ import_events_from_url.delay(
+ url,
+ None,
+ None,
+ True,
+ user_id=request.user.pk if request.user else None,
+ )
+ messages.success(
+ request,
+ _("The event update has been queued and will be completed shortly."),
+ )
+
+ return HttpResponseRedirect(event.get_absolute_url())
+
+
+class EventUpdateView(
+ SuccessMessageMixin,
+ PermissionRequiredMixin,
+ LoginRequiredMixin,
+ UpdateView,
+):
+ model = Event
+ permission_required = "agenda_culturel.change_event"
+ form_class = EventForm
+
+ def get_form_kwargs(self):
+ kwargs = super().get_form_kwargs()
+ kwargs["is_authenticated"] = self.request.user.is_authenticated
+ kwargs["is_moderation_expert"] = (
+ self.request.user.userprofile.is_moderation_expert
+ )
+ kwargs["is_cloning"] = self.is_cloning
+ kwargs["is_edit_from_moderation"] = self.is_edit_force()
+ kwargs["is_simple_cloning"] = self.is_simple_cloning
+ return kwargs
+
+ def get_success_message(self, cleaned_data):
+ txt = (
+ _(" A message has been sent to the person who proposed the event.")
+ if hasattr(self, "with_msg") and self.with_msg
+ else ""
+ )
+ return mark_safe(_("The event has been successfully modified.") + txt)
+
+ def is_edit_force(self):
+ return "edit-force" in self.request.path.split("/")
+
+ def get_object(self, queryset=None):
+ event = super().get_object(queryset)
+ if event.status == Event.STATUS.DRAFT:
+ event.status = Event.STATUS.PUBLISHED
+ if self.is_edit_force():
+ event.free_modification_lock(self.request.user)
+ return event
+
+ def form_valid(self, form):
+ form.instance.set_processing_user(self.request.user)
+ self.with_message = form.instance.notify_if_required(self.request)
+ return super().form_valid(form)
+
+ def get_initial(self):
+ self.is_cloning = "clone" in self.request.path.split("/")
+ if self.is_cloning:
+ messages.info(
+ self.request,
+ _(
+ "Changes will be visible on a local copy of the event. The version identical to the imported source will be hidden."
+ ),
+ )
+ self.is_simple_cloning = "simple-clone" in self.request.path.split("/")
+ result = super().get_initial()
+
+ if self.is_cloning and "other_versions" not in result:
+ obj = self.get_object()
+ # if no DuplicatedEvents is associated, create one
+ obj.other_versions = DuplicatedEvents.objects.create()
+ obj.other_versions.save()
+ # save them without updating modified date
+ obj.set_no_modification_date_changed()
+ obj.save()
+ result["other_versions"] = obj.other_versions
+ result["status"] = Event.STATUS.PUBLISHED
+ result["cloning"] = True
+
+ if self.is_simple_cloning:
+ result["other_versions"] = None
+ result["simple_cloning"] = True
+
+ if self.is_cloning or self.is_simple_cloning:
+ obj = self.get_object()
+ if obj.local_image:
+ result["old_local_image"] = obj.local_image.name
+
+ return result
+
+ def get_success_url(self):
+ if "save_and_next" in self.request.POST:
+ return reverse_lazy("moderate_event_next", args=[self.object.pk])
+ else:
+ return self.object.get_absolute_url()
+
+
+class EventModerateView(
+ SuccessMessageMixin,
+ PermissionRequiredMixin,
+ LoginRequiredMixin,
+ UpdateView,
+):
+ model = Event
+ permission_required = "agenda_culturel.change_event"
+ template_name = "agenda_culturel/event_form_moderate.html"
+ form_class = EventModerateForm
+
+ def get_form_kwargs(self):
+ kwargs = super().get_form_kwargs()
+ kwargs["is_moderation_expert"] = (
+ self.request.user.userprofile.is_moderation_expert
+ )
+ return kwargs
+
+ def get_success_message(self, cleaned_data):
+ txt = (
+ _(" A message has been sent to the person who proposed the event.")
+ if hasattr(self, "with_msg") and self.with_msg
+ else ""
+ )
+ return mark_safe(
+ _('The event {} has been moderated with success.').format(
+ self.object.get_absolute_url(), self.object.title
+ )
+ + txt
+ )
+
+ def is_moderate_next(self):
+ return "after" in self.request.path.split("/")
+
+ def is_moderate_back(self):
+ return "back" in self.request.path.split("/")
+
+ def is_moderate_force(self):
+ return "moderate-force" in self.request.path.split("/")
+
+ def is_starting_moderation(self):
+ return "pk" not in self.kwargs
+
+ def is_moderation_from_date(self):
+ return "m" in self.kwargs and "y" in self.kwargs and "d" in self.kwargs
+
+ def get_next_event(start_day, start_time, opk):
+ # select non moderated events
+ qs = Event.objects.filter(moderated_date__isnull=True)
+
+ # select events after the current one
+ if start_time:
+ qs = qs.filter(
+ Q(start_day__gt=start_day)
+ | (
+ Q(start_day=start_day)
+ & (Q(start_time__isnull=True) | Q(start_time__gt=start_time))
+ )
+ )
+ else:
+ qs = qs.filter(Q(start_day__gte=start_day) & ~Q(pk=opk))
+
+ # get only possibly representative events
+ qs = qs.filter(
+ Q(other_versions__isnull=True)
+ | Q(other_versions__representative=F("pk"))
+ | Q(other_versions__representative__isnull=True)
+ )
+
+ # remove trash events
+ qs = qs.filter(~Q(status=Event.STATUS.TRASH))
+
+ # sort by datetime
+ qs = qs.order_by("start_day", "start_time")
+
+ return qs.first()
+
+ def get_context_data(self, **kwargs):
+ context = super().get_context_data(**kwargs)
+ if self.is_moderate_next():
+ context["pred"] = self.kwargs["pred"]
+ return context
+
+ def get_object(self, queryset=None):
+ if self.is_starting_moderation():
+ now = datetime.now()
+ event = EventModerateView.get_next_event(now.date(), now.time(), None)
+ else:
+ event = super().get_object(queryset)
+ if event.status == Event.STATUS.DRAFT:
+ event.status = Event.STATUS.PUBLISHED
+ if self.is_moderate_back():
+ pred = Event.objects.filter(pk=self.kwargs["pred"]).first()
+ pred.free_modification_lock(self.request.user)
+ if self.is_moderate_force():
+ event.free_modification_lock(self.request.user, False)
+ return event
+
+ def post(self, request, *args, **kwargs):
+ try:
+ return super().post(request, args, kwargs)
+ except Http404:
+ return HttpResponseRedirect(
+ reverse_lazy("error_next_event", args=[self.object.pk])
+ )
+
+ def form_valid(self, form):
+ form.instance.set_no_modification_date_changed()
+ form.instance.set_in_moderation_process()
+ form.instance.set_processing_user(self.request.user)
+ self.with_msg = form.instance.notify_if_required(self.request)
+ return super().form_valid(form)
+
+ def get_success_url(self):
+ if "save_and_next" in self.request.POST:
+ return reverse_lazy("moderate_event_next", args=[self.object.pk])
+ elif "save_and_edit_local" in self.request.POST:
+ return reverse_lazy("edit_event", args=[self.object.get_local_version().pk])
+ else:
+ return self.object.get_absolute_url()
+
+
+@login_required(login_url="/accounts/login/")
+@permission_required("agenda_culturel.change_event")
+def error_next_event(request, pk):
+ obj = Event.objects.filter(pk=pk).first()
+
+ return render(
+ request,
+ "agenda_culturel/event_next_error_message.html",
+ {"pk": pk, "object": obj},
+ )
+
+
+@login_required(login_url="/accounts/login/")
+@permission_required("agenda_culturel.change_event")
+def moderate_event_next(request, pk):
+ # current event
+ obj = Event.objects.filter(pk=pk).first()
+ # free lock
+ obj.free_modification_lock(request.user)
+ start_day = obj.start_day
+ start_time = obj.start_time
+
+ next_obj = EventModerateView.get_next_event(start_day, start_time, pk)
+ if next_obj is None:
+ return render(
+ request,
+ "agenda_culturel/event_next_error_message.html",
+ {"pk": pk, "object": obj},
+ )
+ else:
+ return HttpResponseRedirect(
+ reverse_lazy("moderate_event_step", args=[next_obj.pk, obj.pk])
+ )
+
+
+@login_required(login_url="/accounts/login/")
+@permission_required("agenda_culturel.change_event")
+def moderate_from_date(request, y, m, d):
+ d = date(y, m, d)
+ obj = EventModerateView.get_next_event(d, None, None)
+ return HttpResponseRedirect(reverse_lazy("moderate_event", args=[obj.pk]))
+
+
+class EventDeleteView(
+ SuccessMessageMixin,
+ PermissionRequiredMixin,
+ LoginRequiredMixin,
+ DeleteView,
+):
+ model = Event
+ permission_required = "agenda_culturel.delete_event"
+ success_url = reverse_lazy("recent")
+ success_message = _("The event has been successfully deleted.")
+
+
+class EventDetailView(UserPassesTestMixin, DetailView, ModelFormMixin):
+ model = Event
+ form_class = MessageEventForm
+ template_name = "agenda_culturel/page-event.html"
+ queryset = (
+ Event.objects.select_related("exact_location")
+ .select_related("category")
+ .select_related("other_versions")
+ .select_related("other_versions__representative")
+ .prefetch_related("message_set")
+ )
+
+ def test_func(self):
+ return (
+ self.request.user.is_authenticated
+ or self.get_object().status == Event.STATUS.PUBLISHED
+ )
+
+ def get_object(self):
+ o = super().get_object()
+ o.download_missing_image()
+ if "year" in self.kwargs:
+ y = self.kwargs["year"]
+ m = self.kwargs["month"]
+ d = self.kwargs["day"]
+ obj = o.get_recurrence_at_date(y, m, d)
+ obj.set_current_date(date(y, m, d))
+ return obj
+ else:
+ return o
+
+ def get_success_url(self):
+ return self.get_object().get_absolute_url() + "#chronology"
+
+ def post(self, request, *args, **kwargs):
+ if not request.user.is_authenticated:
+ return HttpResponseForbidden()
+ form = self.get_form()
+ if form.is_valid():
+ return self.form_valid(form)
+ else:
+ return self.form_invalid(form)
+
+ def form_valid(self, form):
+ message = form.save(commit=False)
+ message.user = self.request.user
+ message.related_event = self.get_object()
+ message.subject = _("Comment")
+ message.spam = False
+ message.closed = True
+ message.save()
+
+ return super().form_valid(form)
+
+
+@login_required(login_url="/accounts/login/")
+@permission_required("agenda_culturel.change_event")
+def change_status_event(request, pk, status):
+ event = get_object_or_404(Event, pk=pk)
+
+ if request.method == "POST":
+ event.status = Event.STATUS(status)
+ fields = ["status", "moderated_date", "moderated_by_user"]
+ event.set_in_moderation_process()
+ event.update_modification_dates()
+ event.save(update_fields=fields)
+ with_msg = event.notify_if_required(request)
+ if with_msg:
+ messages.success(
+ request,
+ _(
+ "The status has been successfully modified and a message has been sent to the person who proposed the event."
+ ),
+ )
+ else:
+ messages.success(request, _("The status has been successfully modified."))
+
+ return HttpResponseRedirect(event.get_absolute_url())
+
+ else:
+ cancel_url = request.META.get("HTTP_REFERER", "")
+ if cancel_url == "":
+ cancel_url = reverse_lazy("home")
+ return render(
+ request,
+ "agenda_culturel/event_confirm_change_status.html",
+ {"status": status, "event": event, "cancel_url": cancel_url},
+ )
+
+
+def import_event_proxy(request):
+ return render(request, "agenda_culturel/event_import.html")
+
+
+class EventCreateView(SuccessMessageMixin, CreateView):
+ model = Event
+ form_class = EventFormWithContact
+
+ def get_form_kwargs(self):
+ kwargs = super().get_form_kwargs()
+ kwargs["is_authenticated"] = self.request.user.is_authenticated
+ return kwargs
+
+ def get_success_url(self):
+ if self.request.user.is_authenticated:
+ if "save_and_next" in self.request.POST:
+ return reverse_lazy("moderate_event_next", args=[self.object.pk])
+ else:
+ return self.object.get_absolute_url()
+ else:
+ return reverse_lazy("home")
+
+ def get_success_message(self, cleaned_data):
+ if self.request.user.is_authenticated:
+ return mark_safe(
+ _('The event was created: {}.').format(
+ self.object.get_absolute_url(), self.object.title
+ )
+ )
+ else:
+ return _(
+ "The event has been submitted and will be published as soon as it has been validated by the moderation team."
+ )
+
+ def form_valid(self, form):
+ if form.cleaned_data["simple_cloning"]:
+ form.instance.set_skip_duplicate_check()
+
+ if form.cleaned_data["cloning"]:
+ form.instance.set_in_moderation_process()
+
+ if form.cleaned_data.get("email") or form.cleaned_data.get("comments"):
+ has_comments = form.cleaned_data.get("comments") not in ["", None]
+ form.instance.add_message(
+ Message(
+ subject=_("during the creation process"),
+ message=form.cleaned_data.get("comments"),
+ email=form.cleaned_data.get("email"),
+ closed=False,
+ message_type=(
+ Message.TYPE.FROM_CONTRIBUTOR
+ if has_comments
+ else Message.TYPE.FROM_CONTRIBUTOR_NO_MSG
+ ),
+ )
+ )
+
+ form.instance.import_sources = None
+ form.instance.set_processing_user(self.request.user)
+
+ result = super().form_valid(form)
+
+ if form.cleaned_data["cloning"]:
+ with_msg = form.instance.notify_if_required(self.request)
+ if with_msg:
+ messages.success(
+ self.request,
+ _(
+ "A message has been sent to the person who proposed the initial event."
+ ),
+ )
+
+ return result
+
+
+# A class to evaluate the URL according to the existing events and the authentification
+# level of the user
+class URLEventEvaluation:
+ def __init__(self, form, is_authenticated):
+ self.form = form
+ self.is_authenticated = is_authenticated
+
+ self.cat = None
+ self.tags = []
+ self.existing = None
+ self.url = form.cleaned_data.get("url")
+ self.event = None
+ if self.url is not None:
+ self.url = Extractor.clean_url(self.url)
+ # we check if the url is known
+ self.existing = Event.objects.filter(uuids__contains=[self.url])
+ # if it's unknown
+ if len(self.existing) == 0:
+ self.existing = None
+ self.cat = form.cleaned_data.get("category")
+ if self.cat is not None:
+ self.cat = self.cat.name
+ self.tags = form.cleaned_data.get("tags")
+
+ else:
+ published = [
+ e for e in self.existing if e.status == Event.STATUS.PUBLISHED
+ ]
+
+ if self.is_authenticated or len(published) > 1:
+ self.event = (
+ published[0] if len(published) > 1 else self.existing[0]
+ )
+ else:
+ self.event = None
+
+ def exists(self):
+ return self.url is not None
+
+ def is_new(self):
+ return self.exists() and self.existing is None
+
+ def is_event_visible(self):
+ return self.event is not None
+
+ def get_event(self):
+ if self.event is None:
+ return None
+ else:
+ return self.event
+
+ def get_link(self):
+ e = self.get_event()
+ if e is None:
+ return ""
+ else:
+ return '' + escape(e.title) + ""
+
+ def to_list(self):
+ if self.is_new():
+ return (self.url, self.cat, self.tags)
+
+
+def import_from_urls(request):
+ if request.method == "POST":
+ formset = URLSubmissionFormSet(request.POST, request.FILES)
+
+ if not request.user.is_authenticated:
+ contactform = SimpleContactForm(request.POST)
+ if formset.is_valid() and (
+ request.user.is_authenticated or contactform.is_valid()
+ ):
+ # evaluate all the forms
+ ucat = [
+ URLEventEvaluation(form, request.user.is_authenticated)
+ for form in formset.forms
+ ]
+
+ # for each not new, add a message
+ for uc in ucat:
+ if uc.exists() and not uc.is_new():
+ if uc.is_event_visible():
+ messages.info(
+ request,
+ mark_safe(
+ _(
+ "{} has not been submitted since it"
+ "s already known: {}."
+ ).format(uc.url, uc.get_link())
+ ),
+ )
+ else:
+ messages.info(
+ request,
+ _(
+ "{} has not been submitted since it"
+ "s already known and currently into moderation process."
+ ).format(uc.url),
+ )
+
+ # keep only new ones
+ ucat = [uc.to_list() for uc in ucat if uc.is_new()]
+
+ # finally process them or go back to home page
+ if len(ucat) > 0:
+ messages.info(
+ request,
+ _("Integrating {} url(s) into our import process.").format(
+ len(ucat)
+ ),
+ )
+ email = None
+ comments = None
+ if not request.user.is_authenticated:
+ email = contactform.cleaned_data["email"]
+ comments = contactform.cleaned_data["comments"]
+ import_events_from_urls.delay(
+ ucat,
+ user_id=request.user.pk if request.user else None,
+ email=email,
+ comments=comments,
+ )
+ return HttpResponseRedirect(reverse("thank_you"))
+ else:
+ return HttpResponseRedirect(reverse("home"))
+
+ else:
+ formset = URLSubmissionFormSet()
+ if not request.user.is_authenticated:
+ contactform = SimpleContactForm()
+
+ context = {"formset": formset}
+ if not request.user.is_authenticated:
+ context["contactform"] = contactform
+ return render(request, "agenda_culturel/import_set.html", context=context)
+
+
+def import_from_url(request):
+ form = URLSubmissionFormWithContact(is_authenticated=request.user.is_authenticated)
+
+ initial = {
+ "start_day": date.today() + timedelta(days=1),
+ "start_time": "20:00",
+ "end_time": "22:00",
+ }
+
+ form_event = EventForm(initial=initial)
+
+ # if the form has been sent
+ if request.method == "POST":
+ form = URLSubmissionFormWithContact(
+ request.POST, is_authenticated=request.user.is_authenticated
+ )
+
+ # if the form is valid
+ if form.is_valid():
+ uc = URLEventEvaluation(form, request.user.is_authenticated)
+
+ if uc.exists() and not uc.is_new():
+ if uc.is_event_visible():
+ messages.info(
+ request,
+ mark_safe(
+ _(
+ "{} has not been submitted since its already known: {}."
+ ).format(uc.url, uc.get_link())
+ ),
+ )
+ return HttpResponseRedirect(uc.get_event().get_absolute_url())
+ else:
+ messages.info(
+ request,
+ _(
+ "{} has not been submitted since it"
+ "s already known and currently into moderation process."
+ ).format(uc.url),
+ )
+ return HttpResponseRedirect(reverse("home"))
+
+ else:
+ messages.info(
+ request,
+ _("Integrating {} into our import process.").format(uc.url),
+ )
+ import_events_from_url.delay(
+ uc.url,
+ uc.cat,
+ uc.tags,
+ user_id=request.user.pk if request.user else None,
+ email=form.cleaned_data.get("email"),
+ comments=form.cleaned_data.get("comments"),
+ )
+ return HttpResponseRedirect(reverse("thank_you"))
+
+ return render(
+ request,
+ "agenda_culturel/import.html",
+ context={"form": form, "form_event": form_event},
+ )
+
+
+def export_event_ical(request, year, month, day, pk):
+ event = get_object_or_404(Event, pk=pk)
+ event = event.get_recurrence_at_date(year, month, day)
+
+ events = list()
+ events.append(event)
+
+ cal = Event.export_to_ics(events, request)
+
+ response = HttpResponse(content_type="text/calendar")
+ response.content = cal.to_ical().decode("utf-8").replace("\r\n", "\n")
+ response["Content-Disposition"] = "attachment; filename={0}{1}".format(
+ event.title.replace("\n", " ").replace("\r", "")[0:32], ".ics"
+ )
+
+ return response
+
+
+def export_ical(request, cat=None, tag=None, organisation_pk=None, place_pk=None):
+ now = date.today()
+
+ qs = get_event_qs(request)
+ if cat is not None:
+ category = Category.objects.filter(slug=cat).first()
+ qs = qs.filter(category=category)
+ else:
+ category = None
+
+ if place_pk is not None:
+ qs = qs.filter(exact_location=place_pk)
+ if organisation_pk is not None:
+ qs = qs.filter(organisers__in=[organisation_pk])
+ if tag is not None:
+ qs = qs.filter(tags__in=[tag])
+
+ request = EventFilter.set_default_values(request)
+ filter = EventFilter(request.GET, queryset=qs, request=request)
+
+ if filter.has_category_parameters():
+ return HttpResponseRedirect(filter.get_new_url())
+
+ id_cache = hashlib.md5(
+ (
+ filter.get_url()
+ + "-"
+ + str(tag)
+ + "-"
+ + str(cat)
+ + "-"
+ + str(organisation_pk)
+ + "-"
+ + str(place_pk)
+ ).encode("utf8")
+ ).hexdigest()
+ ical = cache.get(id_cache)
+ if not ical:
+ calendar = CalendarList(
+ now + timedelta(days=-7), now + timedelta(days=+60), filter
+ )
+ ical = calendar.export_to_ics(request)
+ cache.set(id_cache, ical, 3600) # 1 heure
+
+ response = HttpResponse(content_type="text/calendar")
+ response.content = ical.to_ical().decode("utf-8").replace("\r\n", "\n")
+ extra = filter.to_str(" ")
+ if extra is None:
+ extra = ""
+ if category is not None:
+ extra += " " + str(category)
+ if place_pk is not None:
+ extra += (
+ " @ " + Place.objects.filter(pk=place_pk).values("name").first()["name"]
+ )
+ if organisation_pk is not None:
+ extra += (
+ " - "
+ + Organisation.objects.filter(pk=organisation_pk)
+ .values("name")
+ .first()["name"]
+ )
+ if tag is not None:
+ extra += " - " + emoji.replace_emoji(tag, replace="")
+
+ response["Content-Disposition"] = "attachment; filename={0}{1}{2}".format(
+ "Pommes de lune", extra, ".ics"
+ )
+
+ return response
+
+
+@method_decorator(check_honeypot, name="post")
+class MessageCreateView(SuccessMessageMixin, CreateView):
+ model = Message
+ template_name = "agenda_culturel/message_create_form.html"
+ form_class = MessageForm
+
+ success_url = reverse_lazy("home")
+ success_message = _("Your message has been sent successfully.")
+
+ def __init__(self, *args, **kwargs):
+ self.event = None
+ super().__init__(*args, **kwargs)
+
+ def get_form(self, form_class=None):
+ if form_class is None:
+ form_class = self.get_form_class()
+ return form_class(**self.get_form_kwargs())
+
+ def get_form_kwargs(self):
+ kwargs = super().get_form_kwargs()
+ kwargs["event"] = self.event
+ if self.request.user.is_authenticated:
+ kwargs["internal"] = True
+ return kwargs
+
+ def form_valid(self, form):
+ if self.request.user.is_authenticated:
+ form.instance.user = self.request.user
+ form.instance.message_type = (
+ Message.TYPE.EVENT_REPORT
+ if "pk" in self.kwargs
+ else Message.TYPE.CONTACT_FORM
+ )
+ return super().form_valid(form)
+
+ def get_initial(self):
+ result = super().get_initial()
+ if "pk" in self.kwargs:
+ self.event = get_object_or_404(Event, pk=self.kwargs["pk"])
+ result["related_event"] = self.event
+ result["subject"] = _("Reporting the event {} on {}").format(
+ self.event.title, self.event.start_day
+ )
+ else:
+ result["related_event"] = None
+ return result
+
+
+class MessageDeleteView(SuccessMessageMixin, DeleteView):
+ model = Message
+ success_message = _("The contact message has been successfully deleted.")
+ success_url = reverse_lazy("messages")
+
+
+class MessageUpdateView(
+ SuccessMessageMixin,
+ PermissionRequiredMixin,
+ LoginRequiredMixin,
+ UpdateView,
+):
+ model = Message
+ permission_required = "agenda_culturel.change_message"
+ template_name = "agenda_culturel/message_moderation_form.html"
+ fields = ("spam", "closed", "comments")
+
+ success_message = _(
+ "The contact message properties has been successfully modified."
+ )
+
+ success_url = reverse_lazy("messages")
+
+ def get_form_kwargs(self):
+ """Return the keyword arguments for instantiating the form."""
+ kwargs = super().get_form_kwargs()
+ if hasattr(self, "object"):
+ kwargs.update({"instance": self.object})
+ return kwargs
+
+
+@login_required(login_url="/accounts/login/")
+@permission_required("agenda_culturel.view_event")
+def activite(request):
+ now = date.today()
+
+ days = [now]
+ while len(days) < 7 or days[-1].weekday() != 0:
+ days.append(days[-1] + timedelta(days=-1))
+
+ weeks = [days[-1]]
+ for w in range(0, 8):
+ weeks.append(weeks[-1] + timedelta(days=-7))
+
+ daily_modifications = Event.get_count_modifications([(d, 1) for d in days])
+ weekly_modifications = Event.get_count_modifications([(w, 7) for w in weeks])
+
+ return render(
+ request,
+ "agenda_culturel/page-activity.html",
+ {
+ "daily_modifications": daily_modifications,
+ "weekly_modifications": weekly_modifications,
+ },
+ )
+
+
+@login_required(login_url="/accounts/login/")
+@permission_required("agenda_culturel.view_event")
+def administration(request):
+ nb_mod_days = 21
+ nb_classes = 4
+ today = date.today()
+
+ # get information about recent modifications
+ days = [today]
+ for i in range(0, 2):
+ days.append(days[-1] + timedelta(days=-1))
+ daily_modifications = Event.get_count_modifications([(d, 1) for d in days])
+
+ # get last created events
+ events = (
+ Event.objects.all()
+ .order_by("-created_date")
+ .select_related("exact_location", "category")[:5]
+ )
+
+ # get last batch imports
+ rel_event = Event.objects.filter(
+ import_sources__contains=[OuterRef("url_source")]
+ ).values("pk")[:1]
+ batch_imports = (
+ BatchImportation.objects.all()
+ .select_related("recurrentImport")
+ .annotate(event_id=Subquery(rel_event))
+ .order_by("-created_date")[:5]
+ )
+
+ # get info about batch information
+ newest = (
+ BatchImportation.objects.filter(recurrentImport=OuterRef("pk"))
+ .order_by("-created_date")
+ .select_related("recurrentImport")
+ )
+ imported_events = RecurrentImport.objects.annotate(
+ last_run_status=Subquery(newest.values("status")[:1])
+ )
+
+ nb_failed = imported_events.filter(
+ last_run_status=BatchImportation.STATUS.FAILED
+ ).count()
+ nb_canceled = imported_events.filter(
+ last_run_status=BatchImportation.STATUS.CANCELED
+ ).count()
+ nb_running = imported_events.filter(
+ last_run_status=BatchImportation.STATUS.RUNNING
+ ).count()
+ nb_all = imported_events.count()
+
+ # get some info about imported (or not) events
+ srcs = RecurrentImport.objects.all().values_list("source")
+ in_future = Event.objects.filter(Q(start_day__gte=today))
+ nb_in_rimport = in_future.filter(Q(import_sources__overlap=srcs)).count()
+ nb_in_orphan_import = in_future.filter(
+ (
+ Q(import_sources__isnull=False)
+ & (Q(modified_date__isnull=True) | Q(modified_date__lte=F("imported_date")))
+ )
+ & ~Q(import_sources__overlap=srcs)
+ ).count()
+
+ # get all non moderated events
+ nb_not_moderated = Event.get_nb_not_moderated(today, nb_mod_days, nb_classes)
+
+ return render(
+ request,
+ "agenda_culturel/administration.html",
+ {
+ "daily_modifications": daily_modifications,
+ "events": events,
+ "batch_imports": batch_imports,
+ "nb_failed": nb_failed,
+ "nb_canceled": nb_canceled,
+ "nb_running": nb_running,
+ "nb_all": nb_all,
+ "nb_not_moderated": nb_not_moderated,
+ "nb_in_rimport": nb_in_rimport,
+ "nb_in_orphan_import": nb_in_orphan_import,
+ },
+ )
+
+
+@login_required(login_url="/accounts/login/")
+@permission_required("agenda_culturel.view_event")
+def recent(request):
+ filter = EventFilterAdmin(
+ request.GET, queryset=Event.objects.all().order_by("-created_date")
+ )
+ paginator = PaginatorFilter(filter, 10, request)
+ page = request.GET.get("page")
+
+ try:
+ response = paginator.page(page)
+ except PageNotAnInteger:
+ response = paginator.page(1)
+ except EmptyPage:
+ response = paginator.page(paginator.num_pages)
+
+ return render(
+ request,
+ "agenda_culturel/recent.html",
+ {"filter": filter, "paginator_filter": response},
+ )
+
+
+@login_required(login_url="/accounts/login/")
+@permission_required("agenda_culturel.view_message")
+def view_messages(request):
+ filter = MessagesFilterAdmin(
+ request.GET, queryset=Message.objects.all().order_by("-date")
+ )
+ paginator = PaginatorFilter(filter, 10, request)
+ page = request.GET.get("page")
+
+ nb_spams = Message.objects.filter(spam=True).count()
+
+ try:
+ response = paginator.page(page)
+ except PageNotAnInteger:
+ response = paginator.page(1)
+ except EmptyPage:
+ response = paginator.page(paginator.num_pages)
+
+ return render(
+ request,
+ "agenda_culturel/messages.html",
+ {"filter": filter, "nb_spams": nb_spams, "paginator_filter": response},
+ )
+
+
+@login_required(login_url="/accounts/login/")
+@permission_required("agenda_culturel.view_message")
+def delete_cm_spam(request):
+ if request.method == "POST":
+ Message.objects.filter(spam=True).delete()
+
+ messages.success(request, _("Spam has been successfully deleted."))
+ return HttpResponseRedirect(reverse_lazy("messages"))
+ else:
+ nb_msgs = Message.objects.values("spam").annotate(total=Count("spam"))
+ nb_total = sum([nb["total"] for nb in nb_msgs])
+ nb_spams = sum([nb["total"] for nb in nb_msgs if nb["spam"]])
+ cancel_url = reverse_lazy("messages")
+ return render(
+ request,
+ "agenda_culturel/delete_spams_confirm.html",
+ {
+ "nb_total": nb_total,
+ "nb_spams": nb_spams,
+ "cancel_url": cancel_url,
+ },
+ )
+
+
+def event_search(request, full=False):
+ categories = None
+ tags = None
+ places = None
+ organisations = None
+ rimports = None
+
+ qs = get_event_qs(request).order_by("-start_day")
+ if not request.user.is_authenticated:
+ qs = qs.filter(
+ (
+ Q(other_versions__isnull=True)
+ | Q(other_versions__representative=F("pk"))
+ | Q(other_versions__representative__isnull=True)
+ )
+ )
+ if full:
+ filter = SearchEventFilter(
+ request.GET,
+ queryset=qs,
+ request=request,
+ )
+ else:
+ filter = SimpleSearchEventFilter(
+ request.GET,
+ queryset=qs,
+ request=request,
+ )
+ if "q" in request.GET:
+ categories = Category.objects.filter(name__icontains=request.GET["q"])
+ s_q = remove_accents(request.GET["q"].lower())
+ tags = (
+ Event.objects.extra(
+ where=["%s ILIKE ANY (tags)"], params=[request.GET["q"]]
+ )
+ .annotate(arr_tags=Func(F("tags"), function="unnest"))
+ .values_list("arr_tags", flat=True)
+ .distinct()
+ )
+ tags = [
+ (
+ t,
+ emoji.demojize(remove_accents(t).lower(), delimiters=("000", "")),
+ )
+ for t in tags
+ ]
+ tags = [t for t in tags if s_q == t[1]]
+ tags.sort(key=lambda x: x[1])
+ tags = [t[0] for t in tags]
+ places = Place.objects.filter(
+ Q(name__icontains=request.GET["q"])
+ | Q(description__icontains=request.GET["q"])
+ | Q(city__icontains=request.GET["q"])
+ )
+ organisations = Organisation.objects.filter(
+ Q(name__icontains=request.GET["q"])
+ | Q(description__icontains=request.GET["q"])
+ )
+ if request.user.is_authenticated:
+ rimports = RecurrentImport.objects.filter(
+ name__icontains=request.GET["q"]
+ )
+
+ paginator = PaginatorFilter(filter, 10, request)
+ page = request.GET.get("page")
+
+ try:
+ response = paginator.page(page)
+ except PageNotAnInteger:
+ response = paginator.page(1)
+ except EmptyPage:
+ response = paginator.page(paginator.num_pages)
+
+ return render(
+ request,
+ "agenda_culturel/search.html",
+ {
+ "filter": filter,
+ "categories": categories,
+ "tags": tags,
+ "places": places,
+ "organisations": organisations,
+ "rimports": rimports,
+ "has_results": len(request.GET) != 0
+ or (len(request.GET) > 1 and "page" in request.GET),
+ "paginator_filter": response,
+ "full": full,
+ },
+ )
+
+
+def event_search_full(request):
+ return event_search(request, True)
+
+
+#########################
+## batch importations
+#########################
+
+
+@login_required(login_url="/accounts/login/")
+@permission_required("agenda_culturel.view_batchimportation")
+def imports(request):
+ rel_event = Event.objects.filter(
+ import_sources__contains=[OuterRef("url_source")]
+ ).values("pk")[:1]
+ paginator = Paginator(
+ BatchImportation.objects.all()
+ .order_by("-created_date")
+ .annotate(event_id=Subquery(rel_event)),
+ 30,
+ )
+ page = request.GET.get("page")
+
+ today = date.today()
+
+ srcs = RecurrentImport.objects.all().values_list("source")
+ in_future = Event.objects.filter(Q(start_day__gte=today))
+ nb_in_orphan_import = in_future.filter(
+ (
+ Q(import_sources__isnull=False)
+ & (Q(modified_date__isnull=True) | Q(modified_date__lte=F("imported_date")))
+ )
+ & ~Q(import_sources__overlap=srcs)
+ ).count()
+
+ try:
+ response = paginator.page(page)
+ except PageNotAnInteger:
+ response = paginator.page(1)
+ except EmptyPage:
+ response = paginator.page(paginator.num_pages)
+
+ return render(
+ request,
+ "agenda_culturel/imports.html",
+ {
+ "paginator_filter": response,
+ "nb_in_orphan_import": nb_in_orphan_import,
+ },
+ )
+
+
+@login_required(login_url="/accounts/login/")
+@permission_required(
+ [
+ "agenda_culturel.add_batchimportation",
+ "agenda_culturel.run_batchimportation",
+ ]
+)
+def add_import(request):
+ form = BatchImportationForm()
+
+ if request.method == "POST":
+ form = BatchImportationForm(request.POST)
+
+ if form.is_valid():
+ import_events_from_json.delay(form.data["json"])
+
+ messages.success(request, _("The import has been run successfully."))
+ return HttpResponseRedirect(reverse_lazy("imports"))
+
+ return render(request, "agenda_culturel/batchimportation_form.html", {"form": form})
+
+
+@login_required(login_url="/accounts/login/")
+@permission_required(
+ [
+ "agenda_culturel.view_batchimportation",
+ "agenda_culturel.run_batchimportation",
+ ]
+)
+def cancel_import(request, pk):
+ import_process = get_object_or_404(BatchImportation, pk=pk)
+
+ if request.method == "POST":
+ celery_app.control.revoke(import_process.celery_id)
+
+ import_process.status = BatchImportation.STATUS.CANCELED
+ import_process.save(update_fields=["status"])
+
+ messages.success(request, _("The import has been canceled."))
+ return HttpResponseRedirect(reverse_lazy("imports"))
+ else:
+ cancel_url = reverse_lazy("imports")
+ return render(
+ request,
+ "agenda_culturel/cancel_import_confirm.html",
+ {"object": import_process, "cancel_url": cancel_url},
+ )
+
+
+@login_required(login_url="/accounts/login/")
+@permission_required(
+ [
+ "agenda_culturel.view_batchimportation",
+ "agenda_culturel.run_batchimportation",
+ ]
+)
+def update_orphan_events(request):
+ if request.method == "POST":
+ # run recurrent import
+ update_orphan_pure_import_events.delay()
+
+ messages.success(request, _("The orphan event update has been launched."))
+ return HttpResponseRedirect(reverse_lazy("imports"))
+ else:
+ today = date.today()
+
+ srcs = RecurrentImport.objects.all().values_list("source")
+ in_future = Event.objects.filter(Q(start_day__gte=today))
+ nb_in_orphan_import = in_future.filter(
+ (
+ Q(import_sources__isnull=False)
+ & (
+ Q(modified_date__isnull=True)
+ | Q(modified_date__lte=F("imported_date"))
+ )
+ )
+ & ~Q(import_sources__overlap=srcs)
+ ).count()
+ return render(
+ request,
+ "agenda_culturel/run_orphan_imports_confirm.html",
+ {"nb_in_orphan_import": nb_in_orphan_import},
+ )
+
+
+#########################
+## recurrent importations
+#########################
+
+
+@login_required(login_url="/accounts/login/")
+@permission_required("agenda_culturel.view_recurrentimport")
+def recurrent_imports(request, status=None):
+ newest = BatchImportation.objects.filter(recurrentImport=OuterRef("pk")).order_by(
+ "-created_date"
+ )
+
+ qs = (
+ RecurrentImport.objects.all()
+ .annotate(last_run_status=Subquery(newest.values("status")[:1]))
+ .order_by("-pk")
+ )
+
+ nb_failed = qs.filter(last_run_status=BatchImportation.STATUS.FAILED).count()
+ nb_canceled = qs.filter(last_run_status=BatchImportation.STATUS.CANCELED).count()
+ nb_running = qs.filter(last_run_status=BatchImportation.STATUS.RUNNING).count()
+ nb_all = qs.count()
+
+ if status is not None:
+ qs = qs.filter(last_run_status=status)
+
+ filter = RecurrentImportFilter(request.GET, queryset=qs)
+
+ paginator = PaginatorFilter(filter, 20, request)
+
+ page = request.GET.get("page")
+
+ try:
+ response = paginator.page(page)
+ except PageNotAnInteger:
+ response = paginator.page(1)
+ except EmptyPage:
+ response = paginator.page(paginator.num_pages)
+
+ return render(
+ request,
+ "agenda_culturel/rimports.html",
+ {
+ "paginator_filter": response,
+ "filter": filter,
+ "nb_all": nb_all,
+ "nb_failed": nb_failed,
+ "nb_canceled": nb_canceled,
+ "nb_running": nb_running,
+ "status": status,
+ },
+ )
+
+
+class RecurrentImportCreateView(
+ LoginRequiredMixin, PermissionRequiredMixin, CreateView
+):
+ model = RecurrentImport
+ permission_required = "agenda_culturel.add_recurrentimport"
+ success_url = reverse_lazy("recurrent_imports")
+ form_class = RecurrentImportForm
+
+
+class RecurrentImportUpdateView(
+ SuccessMessageMixin,
+ PermissionRequiredMixin,
+ LoginRequiredMixin,
+ UpdateView,
+):
+ model = RecurrentImport
+ permission_required = "agenda_culturel.change_recurrentimport"
+ form_class = RecurrentImportForm
+ success_message = _("The recurrent import has been successfully modified.")
+
+
+class RecurrentImportDeleteView(
+ SuccessMessageMixin,
+ PermissionRequiredMixin,
+ LoginRequiredMixin,
+ DeleteView,
+):
+ model = RecurrentImport
+ permission_required = "agenda_culturel.delete_recurrentimport"
+ success_url = reverse_lazy("recurrent_imports")
+ success_message = _("The recurrent import has been successfully deleted.")
+
+
+@login_required(login_url="/accounts/login/")
+@permission_required(
+ [
+ "agenda_culturel.view_recurrentimport",
+ "agenda_culturel.view_batchimportation",
+ ]
+)
+def view_rimport(request, pk):
+ obj = get_object_or_404(RecurrentImport, pk=pk)
+ paginator = Paginator(
+ BatchImportation.objects.filter(recurrentImport=pk).order_by("-created_date"),
+ 10,
+ )
+ page = request.GET.get("page")
+
+ try:
+ response = paginator.page(page)
+ except PageNotAnInteger:
+ response = paginator.page(1)
+ except EmptyPage:
+ response = paginator.page(paginator.num_pages)
+
+ return render(
+ request,
+ "agenda_culturel/page-rimport.html",
+ {"paginator_filter": response, "object": obj},
+ )
+
+
+@login_required(login_url="/accounts/login/")
+@permission_required(
+ [
+ "agenda_culturel.view_recurrentimport",
+ "agenda_culturel.run_recurrentimport",
+ ]
+)
+def run_rimport(request, pk):
+ rimport = get_object_or_404(RecurrentImport, pk=pk)
+
+ if request.method == "POST":
+ # run recurrent import
+ run_recurrent_import.delay(pk)
+
+ messages.success(request, _("The import has been launched."))
+ return HttpResponseRedirect(reverse_lazy("view_rimport", args=[pk]))
+ else:
+ return render(
+ request,
+ "agenda_culturel/run_rimport_confirm.html",
+ {"object": rimport},
+ )
+
+
+@login_required(login_url="/accounts/login/")
+@permission_required(
+ [
+ "agenda_culturel.view_recurrentimport",
+ "agenda_culturel.run_recurrentimport",
+ ]
+)
+def run_all_rimports(request, status=None):
+ if request.method == "POST":
+ # run recurrent import
+ if status == BatchImportation.STATUS.FAILED:
+ run_all_recurrent_imports_failed.delay()
+ elif status == BatchImportation.STATUS.CANCELED:
+ run_all_recurrent_imports_canceled.delay()
+ else:
+ run_all_recurrent_imports.delay()
+
+ messages.success(request, _("Imports has been launched."))
+ return HttpResponseRedirect(reverse_lazy("recurrent_imports"))
+ else:
+ if status == BatchImportation.STATUS.FAILED:
+ return render(request, "agenda_culturel/run_failed_rimports_confirm.html")
+ elif status == BatchImportation.STATUS.CANCELED:
+ return render(request, "agenda_culturel/run_canceled_rimports_confirm.html")
+ else:
+ return render(request, "agenda_culturel/run_all_rimports_confirm.html")
+
+
+@login_required(login_url="/accounts/login/")
+@permission_required(
+ [
+ "agenda_culturel.view_recurrentimport",
+ "agenda_culturel.run_recurrentimport",
+ ]
+)
+def run_all_fb_rimports(request, status=None):
+ if request.method == "POST":
+ run_all_recurrent_imports.delay(True)
+
+ messages.success(request, _("Facebook imports has been launched."))
+ return HttpResponseRedirect(reverse_lazy("recurrent_imports"))
+ else:
+ return render(request, "agenda_culturel/run_all_fb_rimports_confirm.html")
+
+
+#########################
+## duplicated events
+#########################
+
+
+class DuplicatedEventsDetailView(LoginRequiredMixin, DetailView):
+ model = DuplicatedEvents
+ template_name = "agenda_culturel/duplicate.html"
+
+
+@login_required(login_url="/accounts/login/")
+@permission_required(
+ ["agenda_culturel.change_event", "agenda_culturel.change_duplicatedevents"]
+)
+def update_duplicate_event(request, pk, epk):
+ edup = get_object_or_404(DuplicatedEvents, pk=pk)
+ event = get_object_or_404(Event, pk=epk)
+
+ form = MergeDuplicates(duplicates=edup, event=event)
+
+ if request.method == "POST":
+ form = MergeDuplicates(request.POST, duplicates=edup)
+ if form.is_valid():
+ for f in edup.get_items_comparison():
+ if not f["similar"]:
+ selected = form.get_selected_events(f["key"])
+ if selected is not None:
+ if isinstance(selected, list):
+ values = [
+ x
+ for x in [getattr(s, f["key"]) for s in selected]
+ if x is not None
+ ]
+ if len(values) != 0:
+ if isinstance(values[0], str):
+ setattr(event, f["key"], "\n".join(values))
+ else:
+ setattr(event, f["key"], sum(values, []))
+ else:
+ if f["key"] == "organisers":
+ event.organisers.set(selected.organisers.all())
+ else:
+ setattr(
+ event,
+ f["key"],
+ getattr(selected, f["key"]),
+ )
+ if f["key"] == "image":
+ setattr(
+ event,
+ "local_image",
+ getattr(selected, "local_image"),
+ )
+
+ event.other_versions.fix(event)
+ event.save()
+
+ messages.info(request, _("Update successfully completed."))
+ return HttpResponseRedirect(event.get_absolute_url())
+
+ return render(
+ request,
+ "agenda_culturel/update_duplicate.html",
+ context={
+ "form": form,
+ "object": edup,
+ "event_id": edup.get_event_index(event),
+ },
+ )
+
+
+@login_required(login_url="/accounts/login/")
+@permission_required(
+ ["agenda_culturel.change_event", "agenda_culturel.change_duplicatedevents"]
+)
+def merge_duplicate(request, pk):
+ edup = get_object_or_404(DuplicatedEvents, pk=pk)
+ form = MergeDuplicates(duplicates=edup)
+
+ if request.method == "POST":
+ form = MergeDuplicates(request.POST, duplicates=edup)
+ if form.is_valid():
+ events = edup.get_duplicated()
+
+ # build fields for the new event
+ new_event_data = {}
+ for f in edup.get_items_comparison():
+ if f["similar"]:
+ new_event_data[f["key"]] = getattr(events[0], f["key"])
+ else:
+ selected = form.get_selected_events(f["key"])
+ if selected is None:
+ new_event_data[f["key"]] = None
+ elif isinstance(selected, list):
+ values = [
+ x
+ for x in [getattr(s, f["key"]) for s in selected]
+ if x is not None
+ ]
+ if len(values) == 0:
+ new_event_data[f["key"]] = None
+ else:
+ if isinstance(values[0], str):
+ new_event_data[f["key"]] = "\n".join(values)
+ else:
+ new_event_data[f["key"]] = sum(values, [])
+ else:
+ new_event_data[f["key"]] = getattr(selected, f["key"])
+ if f["key"] == "image" and "local_image" not in new_event_data:
+ new_event_data["local_image"] = getattr(
+ selected, "local_image"
+ )
+
+ organisers = new_event_data.pop("organisers", None)
+ # create a new event that merge the selected events
+ new_event = Event(**new_event_data)
+ new_event.status = Event.STATUS.PUBLISHED
+ new_event.other_versions = edup
+ new_event.save()
+ if organisers is not None:
+ new_event.organisers.set(organisers.all())
+ edup.fix(new_event)
+
+ messages.info(
+ request,
+ _("Creation of a merged event has been successfully completed."),
+ )
+ return HttpResponseRedirect(new_event.get_absolute_url())
+
+ return render(
+ request,
+ "agenda_culturel/merge_duplicate.html",
+ context={"form": form, "object": edup},
+ )
+
+
+@login_required(login_url="/accounts/login/")
+@permission_required(
+ ["agenda_culturel.change_event", "agenda_culturel.change_duplicatedevents"]
+)
+def fix_duplicate(request, pk):
+ edup = get_object_or_404(DuplicatedEvents.objects.select_related(), pk=pk)
+
+ if request.method == "POST":
+ form = FixDuplicates(request.POST, edup=edup)
+
+ if form.is_valid():
+ if form.is_action_no_duplicates():
+ # all events are different
+ events = edup.get_duplicated()
+
+ # get redirection date
+ if len(events) == 0:
+ date = None
+ else:
+ s_events = [e for e in events if not e.has_recurrences()]
+ if len(s_events) != 0:
+ s_event = s_events[0]
+ else:
+ s_event = events[0]
+ date = s_event.start_day
+
+ messages.success(request, _("Events have been marked as unduplicated."))
+ # delete the duplicated event (other_versions will be set to None on all events)
+ edup.delete()
+ if date is None:
+ return HttpResponseRedirect(reverse_lazy("home"))
+ else:
+ return HttpResponseRedirect(
+ reverse_lazy("day_view", args=[date.year, date.month, date.day])
+ )
+
+ elif form.is_action_select():
+ # one element has been selected to be the representative
+ selected = form.get_selected_event(edup)
+ if selected is None:
+ messages.error(
+ request,
+ _(
+ "The selected item is no longer included in the list of duplicates. Someone else has probably modified the list in the meantime."
+ ),
+ )
+ else:
+ edup.fix(selected)
+ messages.success(
+ request,
+ _("The selected event has been set as representative"),
+ )
+ return HttpResponseRedirect(edup.get_absolute_url())
+ elif form.is_action_remove():
+ # one element is removed from the set
+ event = form.get_selected_event(edup)
+ if event is None:
+ messages.error(
+ request,
+ _(
+ "The selected item is no longer included in the list of duplicates. Someone else has probably modified the list in the meantime."
+ ),
+ )
+ return HttpResponseRedirect(edup.get_absolute_url())
+ else:
+ event.other_versions = None
+ if edup.representative == event:
+ edup.representative = None
+ event.set_no_modification_date_changed()
+ event.save()
+ edup.save()
+ edup.events = [e for e in edup.events if e.pk != event.pk]
+ messages.success(
+ request,
+ _(
+ "The event has been withdrawn from the group and made independent."
+ ),
+ )
+ if edup.nb_duplicated() == 1:
+ return HttpResponseRedirect(edup.get_absolute_url())
+ else:
+ form = FixDuplicates(edup=edup)
+ elif form.is_action_update():
+ # otherwise, an event will be updated using other elements
+ event = form.get_selected_event(edup)
+ if event is None:
+ messages.error(
+ request,
+ _(
+ "The selected item is no longer included in the list of duplicates. Someone else has probably modified the list in the meantime."
+ ),
+ )
+ return HttpResponseRedirect(edup.get_absolute_url())
+ else:
+ return HttpResponseRedirect(
+ reverse_lazy("update_event", args=[edup.pk, event.pk])
+ )
+ else:
+ # otherwise, a new event will be created using a merging process
+ return HttpResponseRedirect(
+ reverse_lazy("merge_duplicate", args=[edup.pk])
+ )
+ else:
+ form = FixDuplicates(edup=edup)
+
+ return render(
+ request,
+ "agenda_culturel/fix_duplicate.html",
+ context={"form": form, "object": edup},
+ )
+
+
+class DuplicatedEventsUpdateView(LoginRequiredMixin, UpdateView):
+ model = DuplicatedEvents
+ fields = ()
+ template_name = "agenda_culturel/fix_duplicate.html"
+
+
+@login_required(login_url="/accounts/login/")
+@permission_required("agenda_culturel.view_duplicatedevents")
+def duplicates(request):
+ nb_removed = DuplicatedEvents.remove_singletons()
+ if nb_removed > 0:
+ messages.success(
+ request,
+ _("Cleaning up duplicates: {} item(s) fixed.").format(nb_removed),
+ )
+
+ filter = DuplicatedEventsFilter(
+ request.GET, queryset=DuplicatedEvents.objects.all().order_by("-pk")
+ )
+ paginator = PaginatorFilter(filter, 10, request)
+ page = request.GET.get("page")
+
+ try:
+ response = paginator.page(page)
+ except PageNotAnInteger:
+ response = paginator.page(1)
+ except EmptyPage:
+ response = paginator.page(paginator.num_pages)
+
+ return render(
+ request,
+ "agenda_culturel/duplicates.html",
+ {
+ "filter": filter,
+ "paginator_filter": response,
+ "paginator": paginator,
+ },
+ )
+
+
+def set_duplicate(request, year, month, day, pk):
+ event = get_object_or_404(Event, pk=pk)
+ cday = CalendarDay(date(year, month, day))
+ others = [
+ e
+ for e in cday.get_events()
+ if e != event
+ and (event.other_versions is None or event.other_versions != e.other_versions)
+ and e.status != Event.STATUS.TRASH
+ ]
+
+ form = SelectEventInList(events=others)
+
+ if request.method == "POST":
+ form = SelectEventInList(request.POST, events=others)
+ if form.is_valid():
+ selected = [o for o in others if o.pk == int(form.cleaned_data["event"])]
+ event.set_other_versions(selected)
+ # save them without updating modified date
+ event.set_no_modification_date_changed()
+ event.save()
+ if request.user.is_authenticated:
+ messages.success(request, _("The event was successfully duplicated."))
+ return HttpResponseRedirect(
+ reverse_lazy("view_duplicate", args=[event.other_versions.pk])
+ )
+ else:
+ messages.info(
+ request,
+ _(
+ "The event has been successfully flagged as a duplicate. The moderation team will deal with your suggestion shortly."
+ ),
+ )
+ return HttpResponseRedirect(event.get_absolute_url())
+
+ return render(
+ request,
+ "agenda_culturel/set_duplicate.html",
+ context={"form": form, "event": event},
+ )
+
+
+#########################
+## categorisation rules
+#########################
+
+
+@login_required(login_url="/accounts/login/")
+@permission_required("agenda_culturel.view_categorisationrule")
+def categorisation_rules(request):
+ paginator = Paginator(
+ CategorisationRule.objects.all()
+ .order_by("pk")
+ .select_related("category")
+ .select_related("place"),
+ 100,
+ )
+ page = request.GET.get("page")
+
+ try:
+ response = paginator.page(page)
+ except PageNotAnInteger:
+ response = paginator.page(1)
+ except EmptyPage:
+ response = paginator.page(paginator.num_pages)
+
+ return render(
+ request,
+ "agenda_culturel/categorisation_rules.html",
+ {"paginator_filter": response},
+ )
+
+
+class CategorisationRuleCreateView(
+ LoginRequiredMixin, PermissionRequiredMixin, CreateView
+):
+ model = CategorisationRule
+ permission_required = "agenda_culturel.add_categorisationrule"
+ success_url = reverse_lazy("categorisation_rules")
+ form_class = CategorisationRuleImportForm
+
+
+class CategorisationRuleUpdateView(
+ SuccessMessageMixin,
+ PermissionRequiredMixin,
+ LoginRequiredMixin,
+ UpdateView,
+):
+ model = CategorisationRule
+ permission_required = "agenda_culturel.change_categorisationrule"
+ form_class = CategorisationRuleImportForm
+ success_url = reverse_lazy("categorisation_rules")
+ success_message = _("The categorisation rule has been successfully modified.")
+
+
+class CategorisationRuleDeleteView(
+ SuccessMessageMixin,
+ PermissionRequiredMixin,
+ LoginRequiredMixin,
+ DeleteView,
+):
+ model = CategorisationRule
+ permission_required = "agenda_culturel.delete_categorisationrule"
+ success_url = reverse_lazy("categorisation_rules")
+ success_message = _("The categorisation rule has been successfully deleted.")
+
+
+@login_required(login_url="/accounts/login/")
+@permission_required("agenda_culturel.apply_categorisationrules")
+def apply_categorisation_rules(request):
+ if request.method == "POST":
+ form = CategorisationForm(request.POST)
+ if form.is_valid():
+ nb = 0
+ for epk, c in form.get_validated():
+ e = Event.objects.get(pk=epk)
+ cat = Category.objects.filter(name=c).first()
+ e.category = cat
+ e.save()
+ nb += 1
+
+ if nb != 0:
+ if nb == 1:
+ messages.success(
+ request,
+ _(
+ "The rules were successfully applied and 1 event was categorised."
+ ),
+ )
+ else:
+ messages.success(
+ request,
+ _(
+ "The rules were successfully applied and {} events were categorised."
+ ).format(nb),
+ )
+ else:
+ messages.info(
+ request,
+ _(
+ "The rules were successfully applied and no events were categorised."
+ ),
+ )
+
+ return HttpResponseRedirect(reverse_lazy("categorisation_rules"))
+ else:
+ return render(
+ request,
+ "agenda_culturel/categorise_events_form.html",
+ context={"form": form},
+ )
+ else:
+ # first we check if events are not correctly categorised
+ to_categorise = []
+ events = (
+ Event.objects.filter(start_day__gte=datetime.now())
+ .exclude(category=Category.get_default_category_id())
+ .exclude(category=None)
+ .select_related("exact_location")
+ .select_related("category")
+ )
+ for e in events:
+ c = CategorisationRule.get_category_from_rules(e)
+ if c and c != e.category:
+ to_categorise.append((e, c))
+
+ # then we apply rules on events without category
+ nb = 0
+ to_save = []
+ events = (
+ Event.objects.filter(start_day__gte=datetime.now())
+ .filter(Q(category=Category.get_default_category_id()) | Q(category=None))
+ .select_related("exact_location")
+ .select_related("category")
+ )
+ for e in events:
+ success = CategorisationRule.apply_rules(e)
+ if success:
+ nb += 1
+ to_save.append(e)
+
+ if nb != 0:
+ Event.objects.bulk_update(to_save, fields=["category"])
+
+ # set messages
+ if nb != 0:
+ if nb == 1:
+ messages.success(
+ request,
+ _(
+ "The rules were successfully applied and 1 event with default category was categorised."
+ ),
+ )
+ else:
+ messages.success(
+ request,
+ _(
+ "The rules were successfully applied and {} events with default category were categorised."
+ ).format(nb),
+ )
+ else:
+ messages.info(
+ request,
+ _(
+ "The rules were successfully applied and no events were categorised."
+ ),
+ )
+
+ if len(to_categorise) != 0:
+ form = CategorisationForm(events=to_categorise)
+ return render(
+ request,
+ "agenda_culturel/categorise_events_form.html",
+ context={
+ "form": form,
+ "events": dict((e.pk, e) for e, c in to_categorise),
+ "categories": dict((e.pk, c) for e, c in to_categorise),
+ },
+ )
+ else:
+ return HttpResponseRedirect(reverse_lazy("categorisation_rules"))
+
+
+#########################
+## Places
+#########################
+
+
+class PlaceListView(ListView):
+ model = Place
+ ordering = ["name__unaccent"]
+
+
+class PlaceListAdminView(PermissionRequiredMixin, ListView):
+ model = Place
+ paginate_by = 10
+ permission_required = "agenda_culturel.add_place"
+ ordering = ["name__unaccent"]
+ template_name = "agenda_culturel/place_list_admin.html"
+
+
+class PlaceDetailView(ListView):
+ model = Place
+ template_name = "agenda_culturel/place_detail.html"
+ paginate_by = 10
+
+ def get_queryset(self):
+ self.place = get_object_or_404(Place, pk=self.kwargs["pk"])
+ return (
+ get_event_qs(self.request)
+ .filter(exact_location=self.place)
+ .filter(
+ Q(other_versions__isnull=True)
+ | Q(other_versions__representative=F("pk"))
+ | Q(other_versions__representative__isnull=True)
+ )
+ .filter(start_day__gte=datetime.now())
+ .order_by("start_day")
+ )
+
+ def get_context_data(self, **kwargs):
+ context = super().get_context_data(**kwargs)
+ context["object"] = self.place
+ return context
+
+
+class PlaceDetailViewPast(PlaceDetailView):
+ def get_queryset(self):
+ self.place = get_object_or_404(Place, pk=self.kwargs["pk"])
+ self.past = True
+ return (
+ get_event_qs(self.request)
+ .filter(exact_location=self.place)
+ .filter(
+ Q(other_versions__isnull=True)
+ | Q(other_versions__representative=F("pk"))
+ | Q(other_versions__representative__isnull=True)
+ )
+ .filter(start_day__lte=datetime.now())
+ .order_by("-start_day")
+ )
+
+ def get_context_data(self, **kwargs):
+ context = super().get_context_data(**kwargs)
+ context["past"] = self.past
+ return context
+
+
+class UpdatePlaces:
+ def form_valid(self, form):
+ result = super().form_valid(form)
+ p = form.instance
+
+ if not hasattr(self, "nb_applied"):
+ self.nb_applied = 0
+
+ # if required, find all matching events
+ if form.apply():
+ self.nb_applied += p.associate_matching_events()
+
+ if self.nb_applied > 1:
+ messages.success(
+ self.request,
+ _("{} events have been updated.").format(self.nb_applied),
+ )
+ elif self.nb_applied == 1:
+ messages.success(self.request, _("1 event has been updated."))
+ else:
+ messages.info(self.request, _("No events have been modified."))
+ return result
+
+
+class PlaceUpdateView(
+ UpdatePlaces, PermissionRequiredMixin, SuccessMessageMixin, UpdateView
+):
+ model = Place
+ permission_required = "agenda_culturel.change_place"
+ success_message = _("The place has been successfully updated.")
+ form_class = PlaceForm
+
+
+class PlaceCreateView(
+ UpdatePlaces, PermissionRequiredMixin, SuccessMessageMixin, CreateView
+):
+ model = Place
+ permission_required = "agenda_culturel.add_place"
+ success_message = _("The place has been successfully created.")
+ form_class = PlaceForm
+
+
+class PlaceDeleteView(PermissionRequiredMixin, DeleteView):
+ model = Place
+ permission_required = "agenda_culturel.delete_place"
+ success_url = reverse_lazy("view_places_admin")
+
+
+class UnknownPlacesListView(PermissionRequiredMixin, ListView):
+ model = Event
+ permission_required = "agenda_culturel.add_place"
+ paginate_by = 10
+ ordering = ["-pk"]
+ template_name = "agenda_culturel/place_unknown_list.html"
+ queryset = Event.get_qs_events_with_unkwnon_place()
+
+
+def fix_unknown_places(request):
+ # get all places
+ places = Place.objects.all()
+ # get all events without exact location
+ u_events = Event.get_qs_events_with_unkwnon_place()
+
+ to_be_updated = []
+ # try to find matches
+ for ue in u_events:
+ for p in places:
+ if p.match(ue):
+ ue.exact_location = p
+ to_be_updated.append(ue)
+ continue
+ # update events with a location
+ Event.objects.bulk_update(to_be_updated, fields=["exact_location"])
+
+ # create a success message
+ nb = len(to_be_updated)
+ if nb > 1:
+ messages.success(request, _("{} events have been updated.").format(nb))
+ elif nb == 1:
+ messages.success(request, _("1 event has been updated."))
+ else:
+ messages.info(request, _("No events have been modified."))
+
+ # come back to the list of places
+ return HttpResponseRedirect(reverse_lazy("view_unknown_places"))
+
+
+class UnknownPlaceAddView(PermissionRequiredMixin, SuccessMessageMixin, UpdateView):
+ model = Event
+ permission_required = (
+ "agenda_culturel.change_place",
+ "agenda_culturel.change_event",
+ )
+ form_class = EventAddPlaceForm
+ template_name = "agenda_culturel/place_unknown_form.html"
+
+ def form_valid(self, form):
+ self.modified_event = form.cleaned_data.get("place")
+ self.add_alias = form.cleaned_data.get("add_alias")
+ result = super().form_valid(form)
+
+ if form.cleaned_data.get("place"):
+ messages.success(
+ self.request,
+ _("The selected place has been assigned to the event."),
+ )
+ if form.cleaned_data.get("add_alias"):
+ messages.success(
+ self.request,
+ _("A new alias has been added to the selected place."),
+ )
+
+ nb_applied = form.cleaned_data.get("place").associate_matching_events()
+
+ if nb_applied > 1:
+ messages.success(
+ self.request,
+ _("{} events have been updated.").format(nb_applied),
+ )
+ elif nb_applied == 1:
+ messages.success(self.request, _("1 event has been updated."))
+ else:
+ messages.info(self.request, _("No events have been modified."))
+
+ return result
+
+ def get_success_url(self):
+ if self.modified_event:
+ return reverse_lazy("view_unknown_places")
+ else:
+ param = "?add=1" if self.add_alias else ""
+ return reverse_lazy("add_place_from_event", args=[self.object.pk]) + param
+
+
+class PlaceFromEventCreateView(PlaceCreateView):
+ def get_context_data(self, **kwargs):
+ context = super().get_context_data(**kwargs)
+ context["event"] = self.event
+ return context
+
+ def get_initial(self, *args, **kwargs):
+ initial = super().get_initial(**kwargs)
+ self.event = get_object_or_404(Event, pk=self.kwargs["pk"])
+ if self.event.location and "add" in self.request.GET:
+ initial["aliases"] = [self.event.location]
+ guesser = PlaceGuesser()
+ name, address, postcode, city = guesser.guess_address_elements(
+ self.event.location
+ )
+ initial["name"] = name
+ initial["address"] = address
+ initial["postcode"] = postcode
+ initial["city"] = city
+ initial["location"] = ""
+
+ return initial
+
+ def form_valid(self, form):
+ result = super().form_valid(form)
+ self.event.exact_location = form.instance
+ self.event.save(update_fields=["exact_location"])
+ return result
+
+ def get_success_url(self):
+ return self.event.get_absolute_url()
+
+
+#########################
+## Organisations
+#########################
+
+
+class OrganisationListView(ListView):
+ model = Organisation
+ paginate_by = 10
+ ordering = ["name__unaccent"]
+
+
+class OrganisationDetailView(ListView):
+ model = Organisation
+ template_name = "agenda_culturel/organisation_detail.html"
+ paginate_by = 10
+
+ def get_queryset(self):
+ self.organisation = (
+ Organisation.objects.filter(pk=self.kwargs["pk"])
+ .prefetch_related("organised_events")
+ .first()
+ )
+ return (
+ get_event_qs(self.request)
+ .filter(organisers__in=[self.kwargs["pk"]])
+ .filter(
+ Q(other_versions__isnull=True)
+ | Q(other_versions__representative=F("pk"))
+ | Q(other_versions__representative__isnull=True)
+ )
+ .filter(start_day__gte=datetime.now())
+ .order_by("start_day")
+ )
+
+ def get_context_data(self, **kwargs):
+ context = super().get_context_data(**kwargs)
+ context["object"] = self.organisation
+ return context
+
+
+class OrganisationDetailViewPast(OrganisationDetailView):
+ def get_queryset(self):
+ self.organisation = (
+ Organisation.objects.filter(pk=self.kwargs["pk"])
+ .prefetch_related("organised_events")
+ .first()
+ )
+ self.past = True
+ return (
+ get_event_qs(self.request)
+ .filter(organisers__in=[self.kwargs["pk"]])
+ .filter(
+ Q(other_versions__isnull=True)
+ | Q(other_versions__representative=F("pk"))
+ | Q(other_versions__representative__isnull=True)
+ )
+ .filter(start_day__lte=datetime.now())
+ .order_by("-start_day")
+ )
+
+ def get_context_data(self, **kwargs):
+ context = super().get_context_data(**kwargs)
+ context["past"] = self.past
+ return context
+
+
+class OrganisationUpdateView(PermissionRequiredMixin, SuccessMessageMixin, UpdateView):
+ model = Organisation
+ permission_required = "agenda_culturel.change_organisation"
+ success_message = _("The organisation has been successfully updated.")
+ fields = "__all__"
+
+
+class OrganisationCreateView(PermissionRequiredMixin, SuccessMessageMixin, CreateView):
+ model = Organisation
+ permission_required = "agenda_culturel.add_organisation"
+ success_message = _("The organisation has been successfully created.")
+ fields = "__all__"
+
+
+class OrganisationDeleteView(PermissionRequiredMixin, DeleteView):
+ model = Organisation
+ permission_required = "agenda_culturel.delete_organisation"
+ success_url = reverse_lazy("view_organisations")
+
+
+#########################
+## Tags
+#########################
+
+
+class TagUpdateView(PermissionRequiredMixin, SuccessMessageMixin, UpdateView):
+ model = Tag
+ permission_required = "agenda_culturel.change_tag"
+ form_class = TagForm
+ success_message = _("The tag has been successfully updated.")
+
+
+class TagCreateView(PermissionRequiredMixin, SuccessMessageMixin, CreateView):
+ model = Tag
+ permission_required = "agenda_culturel.add_tag"
+ form_class = TagForm
+ success_message = _("The tag has been successfully created.")
+
+ def get_initial(self, *args, **kwargs):
+ initial = super().get_initial(**kwargs)
+ if "name" in self.request.GET:
+ initial["name"] = self.request.GET.get("name")
+ return initial
+
+ def form_valid(self, form):
+ Tag.clear_cache()
+ return super().form_valid(form)
+
+
+class TagDeleteView(PermissionRequiredMixin, DeleteView):
+ model = Tag
+ permission_required = "agenda_culturel.delete_tag"
+ success_url = reverse_lazy("view_all_tags")
+
+
+def view_tag_past(request, t):
+ return view_tag(request, t, True)
+
+
+def view_tag(request, t, past=False):
+ now = date.today()
+
+ qs = get_event_qs(request).filter(tags__contains=[t])
+
+ if past:
+ qs = qs.filter(start_day__lt=now).order_by("-start_day", "-start_time")
+ else:
+ qs = qs.filter(start_day__gte=now).order_by("start_day", "start_time")
+
+ qs = qs.filter(
+ Q(other_versions__isnull=True)
+ | Q(other_versions__representative=F("pk"))
+ | Q(other_versions__representative__isnull=True)
+ )
+
+ paginator = Paginator(qs, 10)
+ page = request.GET.get("page")
+
+ try:
+ response = paginator.page(page)
+ except PageNotAnInteger:
+ response = paginator.page(1)
+ except EmptyPage:
+ response = paginator.page(paginator.num_pages)
+
+ rimports = RecurrentImport.objects.filter(defaultTags__contains=[t])
+
+ tag = Tag.objects.filter(name=t).first()
+ context = {
+ "tag": t,
+ "paginator_filter": response,
+ "object": tag,
+ "rimports": rimports,
+ "past": past,
+ }
+ return render(request, "agenda_culturel/tag.html", context)
+
+
+def statistics(request, pk=None):
+ if pk is not None:
+ rimport = RecurrentImport.objects.filter(pk=pk)
+ source = rimport.values("source").first()["source"]
+ qs = Event.objects.filter(import_sources__contains=[source])
+ else:
+ rimport = None
+ qs = Event.objects
+
+ stats = {}
+ stats_months = {}
+ first = {}
+ last = {}
+
+ ev_published = qs.filter(
+ Q(status=Event.STATUS.PUBLISHED)
+ & (
+ Q(other_versions__isnull=True)
+ | Q(other_versions__representative=F("pk"))
+ | Q(other_versions__representative__isnull=True)
+ )
+ )
+
+ for v in ["start_day", "created_date__date"]:
+ after = 24
+ last[v] = (
+ date.today()
+ if v == "created_date__date"
+ else date.today() + timedelta(weeks=after)
+ )
+ last[v] = last[v].replace(
+ day=_calendar.monthrange(last[v].year, last[v].month)[1]
+ )
+
+ r = 8 * 30
+ if v == "start_day":
+ r += after * 7
+ first[v] = (last[v] - timedelta(days=r)).replace(day=1)
+
+ ev_days = ev_published.annotate(day=F(v)).filter(
+ Q(day__lte=last[v]) & Q(day__gte=first[v])
+ )
+
+ stats[v] = ev_days.values("day").annotate(total=Count("day")).order_by("day")
+
+ stats_months[v] = (
+ ev_days.annotate(month=TruncMonth("day"))
+ .values("month")
+ .annotate(total=Count("month"))
+ .order_by("month")
+ )
+
+ nb_by_city = (
+ ev_published.annotate(city=F("exact_location__city"))
+ .filter(city__isnull=False)
+ .values("city")
+ .annotate(total=Count("city"))
+ .order_by("-total")
+ )
+
+ limit = datetime.now() + timedelta(days=-30)
+
+ stat_qs = qs.filter(start_day__gte=F("created_date")).annotate(
+ foresight=ExtractDay(F("start_day") - F("created_date"))
+ )
+
+ statsa = stat_qs.filter().aggregate(
+ minimum=Min("foresight"),
+ maximum=Max("foresight"),
+ mean=Avg("foresight"),
+ median=Median("foresight"),
+ stdev=StdDev("foresight"),
+ )
+
+ statsm = stat_qs.filter(created_date__gte=limit).aggregate(
+ minimum=Min("foresight"),
+ maximum=Max("foresight"),
+ mean=Avg("foresight"),
+ median=Median("foresight"),
+ stdev=StdDev("foresight"),
+ )
+
+ stats_foresight = [
+ [
+ _(x),
+ round(statsa[x], 2) if statsa[x] is not None else "-",
+ round(statsm[x], 2) if statsm[x] is not None else "-",
+ ]
+ for x in statsa
+ ]
+
+ context = {
+ "stats_by_startday": stats["start_day"],
+ "stats_by_creation": stats["created_date__date"],
+ "stats_months_by_startday": stats_months["start_day"],
+ "stats_months_by_creation": stats_months["created_date__date"],
+ "first_by_startday": first["start_day"],
+ "last_by_startday": last["start_day"],
+ "first_by_creation": first["created_date__date"],
+ "last_by_creation": last["created_date__date"],
+ "nb_by_city": nb_by_city,
+ "stats_foresight": stats_foresight,
+ "object": rimport.first() if rimport else None,
+ }
+
+ if pk is None:
+ return render(request, "agenda_culturel/statistics.html", context)
+ else:
+ return render(request, "agenda_culturel/rimport-statistics.html", context)
+
+
+def tag_list(request):
+ tags = Event.get_all_tags()
+ r_tags = [t["tag"] for t in tags]
+ objects = Tag.objects.order_by("name").all()
+ d_objects = dict()
+ for o in objects:
+ d_objects[o.name] = o
+
+ tags = [
+ t | {"obj": d_objects[t["tag"]]} if t["tag"] in d_objects else t for t in tags
+ ]
+ tags += [
+ {"obj": o, "tag": o.name, "count": 0} for o in objects if o.name not in r_tags
+ ]
+
+ context = {
+ "tags": sorted(
+ tags,
+ key=lambda x: emoji.demojize(
+ remove_accents(x["tag"]).lower(), delimiters=("000", "")
+ ),
+ )
+ }
+ return render(request, "agenda_culturel/tags.html", context)
+
+
+@login_required(login_url="/accounts/login/")
+@permission_required("agenda_culturel.change_tag")
+def rename_tag(request, t):
+ form = TagRenameForm(name=t)
+
+ if request.method == "POST":
+ form = TagRenameForm(request.POST, name=t)
+ if form.is_valid():
+ save = True
+ if form.cleaned_data["name"] == t:
+ messages.warning(
+ request,
+ _("You have not modified the tag name."),
+ )
+ save = False
+ elif not form.is_force():
+ if (
+ Event.objects.filter(
+ tags__contains=[form.cleaned_data["name"]]
+ ).count()
+ > 0
+ ):
+ if Tag.objects.filter(name=form.cleaned_data["name"]):
+ messages.warning(
+ request,
+ (
+ _(
+ "This tag {} is already in use, and is described by different information from the current tag. You can force renaming by checking the corresponding option. The information associated with tag {} will be deleted, and all events associated with tag {} will be associated with tag {}."
+ )
+ ).format(
+ form.cleaned_data["name"],
+ t,
+ t,
+ form.cleaned_data["name"],
+ ),
+ )
+ else:
+ messages.warning(
+ request,
+ (
+ _(
+ "This tag {} is already in use. You can force renaming by checking the corresponding option."
+ )
+ ).format(form.cleaned_data["name"]),
+ )
+ save = False
+ form = TagRenameForm(request.POST, name=t, force=True)
+
+ if save:
+ # find all matching events and update them
+ events = Event.objects.filter(tags__contains=[t])
+ new_name = form.cleaned_data["name"]
+ for e in events:
+ e.tags = [te for te in e.tags if te != t]
+ if new_name not in e.tags:
+ e.tags += [new_name]
+ Event.objects.bulk_update(events, fields=["tags"])
+
+ # find all recurrent imports and fix them
+ rimports = RecurrentImport.objects.filter(defaultTags__contains=[t])
+ for ri in rimports:
+ ri.tags = [te for te in ri.defaultTags if te != t]
+ if new_name not in ri.tags:
+ ri.tags += [new_name]
+ RecurrentImport.objects.bulk_update(rimports, fields=["defaultTags"])
+
+ # find tag object
+ tag_object = Tag.objects.filter(name=t).first()
+ if tag_object:
+ tag_object.name = new_name
+ tag_object.save()
+
+ messages.success(
+ request,
+ (_("The tag {} has been successfully renamed to {}.")).format(
+ t, form.cleaned_data["name"]
+ ),
+ )
+ return HttpResponseRedirect(
+ reverse_lazy("view_tag", kwargs={"t": form.cleaned_data["name"]})
+ )
+
+ nb = Event.objects.filter(tags__contains=[t]).count()
+
+ return render(
+ request,
+ "agenda_culturel/tag_rename_form.html",
+ context={"form": form, "tag": t, "nb": nb},
+ )
+
+
+@login_required(login_url="/accounts/login/")
+@permission_required("agenda_culturel.delete_tag")
+def delete_tag(request, t):
+ respage = reverse_lazy("view_all_tags")
+
+ if request.method == "POST":
+ # remove tag from events
+ events = Event.objects.filter(tags__contains=[t])
+ for e in events:
+ e.tags = [te for te in e.tags if te != t]
+ Event.objects.bulk_update(events, fields=["tags"])
+
+ # remove tag from recurrent imports
+ rimports = RecurrentImport.objects.filter(defaultTags__contains=[t])
+ for ri in rimports:
+ ri.tags = [te for te in ri.defaultTags if te != t]
+ RecurrentImport.objects.bulk_update(rimports, fields=["defaultTags"])
+
+ # find tag object
+ tag_object = Tag.objects.filter(name=t).first()
+ if tag_object:
+ tag_object.delete()
+
+ messages.success(
+ request,
+ (_("The tag {} has been successfully deleted.")).format(t),
+ )
+ return HttpResponseRedirect(respage)
+ else:
+ nb = Event.objects.filter(tags__contains=[t]).count()
+ obj = Tag.objects.filter(name=t).first()
+ nbi = RecurrentImport.objects.filter(defaultTags__contains=[t]).count()
+ cancel_url = request.META.get("HTTP_REFERER", "")
+ if cancel_url == "":
+ cancel_url = respage
+ return render(
+ request,
+ "agenda_culturel/tag_confirm_delete_by_name.html",
+ {
+ "tag": t,
+ "nb": nb,
+ "nbi": nbi,
+ "cancel_url": cancel_url,
+ "obj": obj,
+ },
+ )
+
+
+def clear_cache(request):
+ if request.method == "POST":
+ cache.clear()
+ messages.success(request, _("Cache successfully cleared."))
+ return HttpResponseRedirect(reverse_lazy("administration"))
+ else:
+ return render(
+ request,
+ "agenda_culturel/clear_cache.html",
+ )
+
+
+class UserProfileUpdateView(
+ SuccessMessageMixin,
+ LoginRequiredMixin,
+ UpdateView,
+):
+ model = UserProfile
+ success_message = _("Your user profile has been successfully modified.")
+ success_url = reverse_lazy("administration")
+ form_class = UserProfileForm
+
+ def get_object(self):
+ return self.request.user.userprofile