diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 79c5ddb..99fe2e0 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1,5 +1,5 @@
default_language_version:
- python: python3.13
+ python: python3.12
repos:
# Using this mirror lets us use mypyc-compiled black, which is about 2x faster
- repo: https://github.com/psf/black-pre-commit-mirror
diff --git a/src/agenda_culturel/views.py b/src/agenda_culturel/views.py
deleted file mode 100644
index a44a7c1..0000000
--- a/src/agenda_culturel/views.py
+++ /dev/null
@@ -1,3076 +0,0 @@
-import calendar as _calendar
-import hashlib
-import logging
-from datetime import date, timedelta
-
-import emoji
-from django.contrib import messages
-from django.contrib.auth.decorators import login_required, permission_required
-from django.contrib.auth.mixins import (
- LoginRequiredMixin,
- PermissionRequiredMixin,
- UserPassesTestMixin,
-)
-from django.contrib.messages.views import SuccessMessageMixin
-from django.core.cache import cache
-from django.core.mail import mail_admins
-from django.core.paginator import EmptyPage, PageNotAnInteger, Paginator
-from django.db.models import Count, F, Func, OuterRef, Q, Subquery
-from django.db.models.functions import TruncMonth
-from django.http import (
- Http404,
- HttpResponse,
- HttpResponseForbidden,
- HttpResponseRedirect,
-)
-from django.shortcuts import get_object_or_404, render
-from django.urls import reverse, reverse_lazy
-from django.utils.decorators import method_decorator
-from django.utils.html import escape
-from django.utils.safestring import mark_safe
-from django.utils.timezone import datetime
-from django.utils.translation import gettext_lazy as _
-from django.views.generic import DetailView, ListView
-from django.views.generic.edit import (
- CreateView,
- DeleteView,
- ModelFormMixin,
- UpdateView,
-)
-from honeypot.decorators import check_honeypot
-from django.db.models.aggregates import StdDev
-from django.db.models import Avg, Max, Min
-from django.db.models import Aggregate, FloatField
-from django.db.models.functions import ExtractDay
-
-from .calendar import CalendarDay, CalendarList, CalendarMonth, CalendarWeek
-from .celery import app as celery_app
-from .celery import (
- import_events_from_json,
- import_events_from_url,
- import_events_from_urls,
- run_all_recurrent_imports,
- run_all_recurrent_imports_canceled,
- run_all_recurrent_imports_failed,
- run_recurrent_import,
- update_orphan_pure_import_events,
-)
-from .filters import (
- DuplicatedEventsFilter,
- EventFilter,
- EventFilterAdmin,
- MessagesFilterAdmin,
- RecurrentImportFilter,
- SearchEventFilter,
- SimpleSearchEventFilter,
-)
-from .forms import (
- BatchImportationForm,
- CategorisationForm,
- CategorisationRuleImportForm,
- EventAddPlaceForm,
- EventForm,
- EventFormWithContact,
- EventModerateForm,
- FixDuplicates,
- MergeDuplicates,
- MessageEventForm,
- MessageForm,
- PlaceForm,
- RecurrentImportForm,
- SelectEventInList,
- SimpleContactForm,
- TagForm,
- TagRenameForm,
- URLSubmissionFormSet,
- URLSubmissionFormWithContact,
- UserProfileForm,
-)
-from .import_tasks.extractor import Extractor
-from .models import (
- BatchImportation,
- CategorisationRule,
- Category,
- DuplicatedEvents,
- Event,
- Message,
- Organisation,
- Place,
- RecurrentImport,
- StaticContent,
- Tag,
- remove_accents,
- UserProfile,
-)
-from .utils import PlaceGuesser
-
-logger = logging.getLogger(__name__)
-
-
-class Median(Aggregate):
- function = "PERCENTILE_CONT"
- name = "median"
- output_field = FloatField()
- template = "%(function)s(0.5) WITHIN GROUP (ORDER BY %(expressions)s)"
-
-
-class PaginatorFilter(Paginator):
- def __init__(self, filter, nb, request):
- self.request = request
- self.filter = filter
-
- super().__init__(filter.qs, nb)
-
- self.url_first_page = PaginatorFilter.update_param(
- self.request.get_full_path(), "page", 1
- )
- self.url_last_page = PaginatorFilter.update_param(
- self.request.get_full_path(), "page", self.num_pages
- )
-
- def update_param(params, key, value):
- p = params.split("?")
- root = p[0]
- if len(p) > 1:
- other = p[1]
- others = other.split("&")
- others = [o for o in others if not o.startswith(key)]
- others += [key + "=" + str(value)]
- return root + "?" + "&".join(others)
- else:
- return root + "?" + key + "=" + str(value)
-
- def page(self, *args, **kwargs):
- page = super().page(*args, **kwargs)
-
- try:
- page.url_previous_page = PaginatorFilter.update_param(
- self.request.get_full_path(),
- "page",
- page.previous_page_number(),
- )
- except EmptyPage:
- page.url_previous_page = self.request.get_full_path()
-
- try:
- page.url_next_page = PaginatorFilter.update_param(
- self.request.get_full_path(), "page", page.next_page_number()
- )
- except EmptyPage:
- page.url_next_page = self.request.get_full_path()
-
- return page
-
-
-#
-#
-# Useful for translation
-to_be_translated = [
- _("Recurrent import name"),
- _("Add another"),
- _("Browse..."),
- _("No file selected."),
-]
-
-
-def get_event_qs(request):
- if request.user.is_authenticated:
- return Event.objects.filter()
- else:
- return Event.objects.filter(status=Event.STATUS.PUBLISHED)
-
-
-def page_not_found(request, exception=None):
- return render(request, "page-erreur.html", status=404, context={"error": 404})
-
-
-def internal_server_error(request):
- try:
- mail_admins(
- request.site.name + _(": error 500"),
- _("An internal error has occurred on site {} at address {}.").format(
- request.site.name, request.build_absolute_uri()
- ),
- )
- except Exception:
- pass
- return render(request, "page-erreur.html", status=500, context={"error": 500})
-
-
-def thank_you(request):
- return render(request, "agenda_culturel/thank_you.html")
-
-
-def mentions_legales(request):
- context = {
- "title": "Mentions légales",
- "static_content": "mentions_legales",
- "url_path": reverse_lazy("mentions_legales"),
- }
- return render(request, "agenda_culturel/page-single.html", context)
-
-
-def about(request):
- rimports = (
- RecurrentImport.objects.filter(~Q(recurrence=RecurrentImport.RECURRENCE.NEVER))
- .order_by("name__unaccent")
- .all()
- )
- context = {
- "title": "À propos",
- "static_content": "about",
- "url_path": reverse_lazy("about"),
- "rimports": rimports,
- }
- return render(request, "agenda_culturel/page-rimports-list.html", context)
-
-
-def moderation_rules(request):
- context = {
- "title": _("Moderation rules"),
- "static_content": "moderation_rules",
- "url_path": reverse_lazy("moderation_rules"),
- }
- return render(request, "agenda_culturel/page-single.html", context)
-
-
-def import_requirements(request):
- context = {
- "title": _("Import requirements"),
- "static_content": "import_requirements",
- "url_path": reverse_lazy("import_requirements"),
- }
- return render(request, "agenda_culturel/page-single.html", context)
-
-
-def home(request, cat=None):
- return week_view(request, home=True, cat=cat)
-
-
-def month_view(request, year=None, month=None, cat=None):
- now = date.today()
- if year is None and month is None:
- day = now.day
- else:
- day = None
- if year is None:
- year = now.year
- if month is None:
- month = now.month
-
- request = EventFilter.set_default_values(request)
- qs = get_event_qs(request).only(
- "title",
- "start_day",
- "start_time",
- "category",
- "other_versions",
- "recurrences",
- "end_day",
- "end_time",
- "uuids",
- "status",
- "tags",
- )
- if cat is not None:
- category = Category.objects.filter(slug=cat).first()
- qs = qs.filter(category=category)
- else:
- category = None
-
- filter = EventFilter(request.GET, qs, request=request)
-
- if filter.has_category_parameters():
- return HttpResponseRedirect(filter.get_new_url())
-
- cmonth = CalendarMonth(year, month, filter, day=day)
-
- context = {
- "calendar": cmonth,
- "this_month": day is not None,
- "filter": filter,
- "category": category,
- "init_date": now if cmonth.today_in_calendar() else cmonth.firstdate,
- }
- return render(request, "agenda_culturel/page-month.html", context)
-
-
-def week_view(request, year=None, week=None, home=False, cat=None):
- now = date.today()
- if year is None:
- year = now.isocalendar()[0]
- if week is None:
- week = now.isocalendar()[1]
-
- request = EventFilter.set_default_values(request)
- qs = (
- get_event_qs(request)
- .select_related("exact_location")
- .only(
- "title",
- "start_day",
- "start_time",
- "category",
- "other_versions",
- "recurrences",
- "end_day",
- "end_time",
- "uuids",
- "status",
- "tags",
- "local_image",
- "image",
- "image_alt",
- "exact_location",
- "description",
- )
- )
- if cat is not None:
- category = Category.objects.filter(slug=cat).first()
- qs = qs.filter(category=category)
- else:
- category = None
- filter = EventFilter(request.GET, qs, request=request)
-
- if filter.has_category_parameters():
- return HttpResponseRedirect(filter.get_new_url())
-
- cweek = CalendarWeek(year, week, filter)
-
- context = {
- "year": year,
- "week": week,
- "calendar": cweek,
- "filter": filter,
- "category": category,
- "init_date": now if cweek.today_in_calendar() else cweek.firstdate,
- }
- if home:
- context["home"] = 1
- return render(request, "agenda_culturel/page-week.html", context)
-
-
-def day_view(request, year=None, month=None, day=None, cat=None):
- if year is None or month is None or day is None:
- if "when" in request.POST:
- when = datetime.strptime(request.POST["when"], "%Y-%m-%d")
- year = when.year
- month = when.month
- day = when.day
-
- request = EventFilter.set_default_values(request)
- qs = get_event_qs(request).select_related("exact_location")
- if cat is not None:
- category = Category.objects.filter(slug=cat).first()
- qs = qs.filter(category=category)
- else:
- category = None
- filter = EventFilter(request.GET, qs, request=request)
-
- return HttpResponseRedirect(
- reverse_lazy("day_view", args=[year, month, day])
- + "?"
- + filter.get_url()
- )
-
- return upcoming_events(request, year, month, day, 0, cat)
-
-
-def upcoming_events(request, year=None, month=None, day=None, neighsize=1, cat=None):
- now = date.today()
- if year is None:
- year = now.year
- if month is None:
- month = now.month
- if day is None:
- day = now.day
-
- day = date(year, month, day)
- day = day + timedelta(days=neighsize)
-
- request = EventFilter.set_default_values(request)
- qs = get_event_qs(request).select_related("exact_location")
- if cat is not None:
- category = Category.objects.filter(slug=cat).first()
- qs = qs.filter(category=category)
- else:
- category = None
-
- filter = EventFilter(request.GET, qs, request=request)
-
- if filter.has_category_parameters():
- return HttpResponseRedirect(filter.get_new_url())
-
- cal = CalendarList(
- day + timedelta(days=-neighsize),
- day + timedelta(days=neighsize),
- filter,
- True,
- )
-
- context = {
- "calendar": cal,
- "now": now,
- "day": day,
- "init_date": now if cal.today_in_calendar() else day,
- "filter": filter,
- "date_pred": day + timedelta(days=-neighsize - 1),
- "date_next": day + timedelta(days=neighsize + 1),
- "category": category,
- }
-
- return render(request, "agenda_culturel/page-upcoming.html", context)
-
-
-class StaticContentCreateView(LoginRequiredMixin, CreateView):
- model = StaticContent
- fields = ["text"]
- permission_required = "agenda_culturel.add_staticcontent"
-
- def form_valid(self, form):
- form.instance.name = self.request.GET["name"]
- form.instance.url_path = self.request.GET["url_path"]
- return super().form_valid(form)
-
-
-class StaticContentUpdateView(
- SuccessMessageMixin,
- PermissionRequiredMixin,
- LoginRequiredMixin,
- UpdateView,
-):
- model = StaticContent
- permission_required = "agenda_culturel.change_staticcontent"
- fields = ["text"]
- success_message = _("The static content has been successfully updated.")
-
-
-def update_from_source(request, pk):
- event = get_object_or_404(Event, pk=pk)
-
- url = event.get_updateable_uuid()
- if url is None:
- messages.warning(
- request,
- _(
- "The event cannot be updated because the import process is not available for the referenced sources."
- ),
- )
- else:
- import_events_from_url.delay(
- url,
- None,
- None,
- True,
- user_id=request.user.pk if request.user else None,
- )
- messages.success(
- request,
- _("The event update has been queued and will be completed shortly."),
- )
-
- return HttpResponseRedirect(event.get_absolute_url())
-
-
-class EventUpdateView(
- SuccessMessageMixin,
- PermissionRequiredMixin,
- LoginRequiredMixin,
- UpdateView,
-):
- model = Event
- permission_required = "agenda_culturel.change_event"
- form_class = EventForm
-
- def get_form_kwargs(self):
- kwargs = super().get_form_kwargs()
- kwargs["is_authenticated"] = self.request.user.is_authenticated
- kwargs["is_moderation_expert"] = (
- self.request.user.userprofile.is_moderation_expert
- )
- kwargs["is_cloning"] = self.is_cloning
- kwargs["is_edit_from_moderation"] = self.is_edit_force()
- kwargs["is_simple_cloning"] = self.is_simple_cloning
- return kwargs
-
- def get_success_message(self, cleaned_data):
- txt = (
- _(" A message has been sent to the person who proposed the event.")
- if hasattr(self, "with_msg") and self.with_msg
- else ""
- )
- return mark_safe(_("The event has been successfully modified.") + txt)
-
- def is_edit_force(self):
- return "edit-force" in self.request.path.split("/")
-
- def get_object(self, queryset=None):
- event = super().get_object(queryset)
- if event.status == Event.STATUS.DRAFT:
- event.status = Event.STATUS.PUBLISHED
- if self.is_edit_force():
- event.free_modification_lock(self.request.user)
- return event
-
- def form_valid(self, form):
- form.instance.set_processing_user(self.request.user)
- self.with_message = form.instance.notify_if_required(self.request)
- return super().form_valid(form)
-
- def get_initial(self):
- self.is_cloning = "clone" in self.request.path.split("/")
- if self.is_cloning:
- messages.info(
- self.request,
- _(
- "Changes will be visible on a local copy of the event. The version identical to the imported source will be hidden."
- ),
- )
- self.is_simple_cloning = "simple-clone" in self.request.path.split("/")
- result = super().get_initial()
-
- if self.is_cloning and "other_versions" not in result:
- obj = self.get_object()
- # if no DuplicatedEvents is associated, create one
- obj.other_versions = DuplicatedEvents.objects.create()
- obj.other_versions.save()
- # save them without updating modified date
- obj.set_no_modification_date_changed()
- obj.save()
- result["other_versions"] = obj.other_versions
- result["status"] = Event.STATUS.PUBLISHED
- result["cloning"] = True
-
- if self.is_simple_cloning:
- result["other_versions"] = None
- result["simple_cloning"] = True
-
- if self.is_cloning or self.is_simple_cloning:
- obj = self.get_object()
- if obj.local_image:
- result["old_local_image"] = obj.local_image.name
-
- return result
-
- def get_success_url(self):
- if "save_and_next" in self.request.POST:
- return reverse_lazy("moderate_event_next", args=[self.object.pk])
- else:
- return self.object.get_absolute_url()
-
-
-class EventModerateView(
- SuccessMessageMixin,
- PermissionRequiredMixin,
- LoginRequiredMixin,
- UpdateView,
-):
- model = Event
- permission_required = "agenda_culturel.change_event"
- template_name = "agenda_culturel/event_form_moderate.html"
- form_class = EventModerateForm
-
- def get_form_kwargs(self):
- kwargs = super().get_form_kwargs()
- kwargs["is_moderation_expert"] = (
- self.request.user.userprofile.is_moderation_expert
- )
- return kwargs
-
- def get_success_message(self, cleaned_data):
- txt = (
- _(" A message has been sent to the person who proposed the event.")
- if hasattr(self, "with_msg") and self.with_msg
- else ""
- )
- return mark_safe(
- _('The event {} has been moderated with success.').format(
- self.object.get_absolute_url(), self.object.title
- )
- + txt
- )
-
- def is_moderate_next(self):
- return "after" in self.request.path.split("/")
-
- def is_moderate_back(self):
- return "back" in self.request.path.split("/")
-
- def is_moderate_force(self):
- return "moderate-force" in self.request.path.split("/")
-
- def is_starting_moderation(self):
- return "pk" not in self.kwargs
-
- def is_moderation_from_date(self):
- return "m" in self.kwargs and "y" in self.kwargs and "d" in self.kwargs
-
- def get_next_event(start_day, start_time, opk):
- # select non moderated events
- qs = Event.objects.filter(moderated_date__isnull=True)
-
- # select events after the current one
- if start_time:
- qs = qs.filter(
- Q(start_day__gt=start_day)
- | (
- Q(start_day=start_day)
- & (Q(start_time__isnull=True) | Q(start_time__gt=start_time))
- )
- )
- else:
- qs = qs.filter(Q(start_day__gte=start_day) & ~Q(pk=opk))
-
- # get only possibly representative events
- qs = qs.filter(
- Q(other_versions__isnull=True)
- | Q(other_versions__representative=F("pk"))
- | Q(other_versions__representative__isnull=True)
- )
-
- # remove trash events
- qs = qs.filter(~Q(status=Event.STATUS.TRASH))
-
- # sort by datetime
- qs = qs.order_by("start_day", "start_time")
-
- return qs.first()
-
- def get_context_data(self, **kwargs):
- context = super().get_context_data(**kwargs)
- if self.is_moderate_next():
- context["pred"] = self.kwargs["pred"]
- return context
-
- def get_object(self, queryset=None):
- if self.is_starting_moderation():
- now = datetime.now()
- event = EventModerateView.get_next_event(now.date(), now.time(), None)
- else:
- event = super().get_object(queryset)
- if event.status == Event.STATUS.DRAFT:
- event.status = Event.STATUS.PUBLISHED
- if self.is_moderate_back():
- pred = Event.objects.filter(pk=self.kwargs["pred"]).first()
- pred.free_modification_lock(self.request.user)
- if self.is_moderate_force():
- event.free_modification_lock(self.request.user, False)
- return event
-
- def post(self, request, *args, **kwargs):
- try:
- return super().post(request, args, kwargs)
- except Http404:
- return HttpResponseRedirect(
- reverse_lazy("error_next_event", args=[self.object.pk])
- )
-
- def form_valid(self, form):
- form.instance.set_no_modification_date_changed()
- form.instance.set_in_moderation_process()
- form.instance.set_processing_user(self.request.user)
- self.with_msg = form.instance.notify_if_required(self.request)
- return super().form_valid(form)
-
- def get_success_url(self):
- if "save_and_next" in self.request.POST:
- return reverse_lazy("moderate_event_next", args=[self.object.pk])
- elif "save_and_edit_local" in self.request.POST:
- return reverse_lazy("edit_event", args=[self.object.get_local_version().pk])
- else:
- return self.object.get_absolute_url()
-
-
-@login_required(login_url="/accounts/login/")
-@permission_required("agenda_culturel.change_event")
-def error_next_event(request, pk):
- obj = Event.objects.filter(pk=pk).first()
-
- return render(
- request,
- "agenda_culturel/event_next_error_message.html",
- {"pk": pk, "object": obj},
- )
-
-
-@login_required(login_url="/accounts/login/")
-@permission_required("agenda_culturel.change_event")
-def moderate_event_next(request, pk):
- # current event
- obj = Event.objects.filter(pk=pk).first()
- # free lock
- obj.free_modification_lock(request.user)
- start_day = obj.start_day
- start_time = obj.start_time
-
- next_obj = EventModerateView.get_next_event(start_day, start_time, pk)
- if next_obj is None:
- return render(
- request,
- "agenda_culturel/event_next_error_message.html",
- {"pk": pk, "object": obj},
- )
- else:
- return HttpResponseRedirect(
- reverse_lazy("moderate_event_step", args=[next_obj.pk, obj.pk])
- )
-
-
-@login_required(login_url="/accounts/login/")
-@permission_required("agenda_culturel.change_event")
-def moderate_from_date(request, y, m, d):
- d = date(y, m, d)
- obj = EventModerateView.get_next_event(d, None, None)
- return HttpResponseRedirect(reverse_lazy("moderate_event", args=[obj.pk]))
-
-
-class EventDeleteView(
- SuccessMessageMixin,
- PermissionRequiredMixin,
- LoginRequiredMixin,
- DeleteView,
-):
- model = Event
- permission_required = "agenda_culturel.delete_event"
- success_url = reverse_lazy("recent")
- success_message = _("The event has been successfully deleted.")
-
-
-class EventDetailView(UserPassesTestMixin, DetailView, ModelFormMixin):
- model = Event
- form_class = MessageEventForm
- template_name = "agenda_culturel/page-event.html"
- queryset = (
- Event.objects.select_related("exact_location")
- .select_related("category")
- .select_related("other_versions")
- .select_related("other_versions__representative")
- .prefetch_related("message_set")
- )
-
- def test_func(self):
- return (
- self.request.user.is_authenticated
- or self.get_object().status == Event.STATUS.PUBLISHED
- )
-
- def get_object(self):
- o = super().get_object()
- o.download_missing_image()
- if "year" in self.kwargs:
- y = self.kwargs["year"]
- m = self.kwargs["month"]
- d = self.kwargs["day"]
- obj = o.get_recurrence_at_date(y, m, d)
- obj.set_current_date(date(y, m, d))
- return obj
- else:
- return o
-
- def get_success_url(self):
- return self.get_object().get_absolute_url() + "#chronology"
-
- def post(self, request, *args, **kwargs):
- if not request.user.is_authenticated:
- return HttpResponseForbidden()
- form = self.get_form()
- if form.is_valid():
- return self.form_valid(form)
- else:
- return self.form_invalid(form)
-
- def form_valid(self, form):
- message = form.save(commit=False)
- message.user = self.request.user
- message.related_event = self.get_object()
- message.subject = _("Comment")
- message.spam = False
- message.closed = True
- message.save()
-
- return super().form_valid(form)
-
-
-@login_required(login_url="/accounts/login/")
-@permission_required("agenda_culturel.change_event")
-def change_status_event(request, pk, status):
- event = get_object_or_404(Event, pk=pk)
-
- if request.method == "POST":
- event.status = Event.STATUS(status)
- fields = ["status", "moderated_date", "moderated_by_user"]
- event.set_in_moderation_process()
- event.update_modification_dates()
- event.save(update_fields=fields)
- with_msg = event.notify_if_required(request)
- if with_msg:
- messages.success(
- request,
- _(
- "The status has been successfully modified and a message has been sent to the person who proposed the event."
- ),
- )
- else:
- messages.success(request, _("The status has been successfully modified."))
-
- return HttpResponseRedirect(event.get_absolute_url())
-
- else:
- cancel_url = request.META.get("HTTP_REFERER", "")
- if cancel_url == "":
- cancel_url = reverse_lazy("home")
- return render(
- request,
- "agenda_culturel/event_confirm_change_status.html",
- {"status": status, "event": event, "cancel_url": cancel_url},
- )
-
-
-def import_event_proxy(request):
- return render(request, "agenda_culturel/event_import.html")
-
-
-class EventCreateView(SuccessMessageMixin, CreateView):
- model = Event
- form_class = EventFormWithContact
-
- def get_form_kwargs(self):
- kwargs = super().get_form_kwargs()
- kwargs["is_authenticated"] = self.request.user.is_authenticated
- return kwargs
-
- def get_success_url(self):
- if self.request.user.is_authenticated:
- if "save_and_next" in self.request.POST:
- return reverse_lazy("moderate_event_next", args=[self.object.pk])
- else:
- return self.object.get_absolute_url()
- else:
- return reverse_lazy("home")
-
- def get_success_message(self, cleaned_data):
- if self.request.user.is_authenticated:
- return mark_safe(
- _('The event was created: {}.').format(
- self.object.get_absolute_url(), self.object.title
- )
- )
- else:
- return _(
- "The event has been submitted and will be published as soon as it has been validated by the moderation team."
- )
-
- def form_valid(self, form):
- if form.cleaned_data["simple_cloning"]:
- form.instance.set_skip_duplicate_check()
-
- if form.cleaned_data["cloning"]:
- form.instance.set_in_moderation_process()
-
- if form.cleaned_data.get("email") or form.cleaned_data.get("comments"):
- has_comments = form.cleaned_data.get("comments") not in ["", None]
- form.instance.add_message(
- Message(
- subject=_("during the creation process"),
- message=form.cleaned_data.get("comments"),
- email=form.cleaned_data.get("email"),
- closed=False,
- message_type=(
- Message.TYPE.FROM_CONTRIBUTOR
- if has_comments
- else Message.TYPE.FROM_CONTRIBUTOR_NO_MSG
- ),
- )
- )
-
- form.instance.import_sources = None
- form.instance.set_processing_user(self.request.user)
-
- result = super().form_valid(form)
-
- if form.cleaned_data["cloning"]:
- with_msg = form.instance.notify_if_required(self.request)
- if with_msg:
- messages.success(
- self.request,
- _(
- "A message has been sent to the person who proposed the initial event."
- ),
- )
-
- return result
-
-
-# A class to evaluate the URL according to the existing events and the authentification
-# level of the user
-class URLEventEvaluation:
- def __init__(self, form, is_authenticated):
- self.form = form
- self.is_authenticated = is_authenticated
-
- self.cat = None
- self.tags = []
- self.existing = None
- self.url = form.cleaned_data.get("url")
- self.event = None
- if self.url is not None:
- self.url = Extractor.clean_url(self.url)
- # we check if the url is known
- self.existing = Event.objects.filter(uuids__contains=[self.url])
- # if it's unknown
- if len(self.existing) == 0:
- self.existing = None
- self.cat = form.cleaned_data.get("category")
- if self.cat is not None:
- self.cat = self.cat.name
- self.tags = form.cleaned_data.get("tags")
-
- else:
- published = [
- e for e in self.existing if e.status == Event.STATUS.PUBLISHED
- ]
-
- if self.is_authenticated or len(published) > 1:
- self.event = (
- published[0] if len(published) > 1 else self.existing[0]
- )
- else:
- self.event = None
-
- def exists(self):
- return self.url is not None
-
- def is_new(self):
- return self.exists() and self.existing is None
-
- def is_event_visible(self):
- return self.event is not None
-
- def get_event(self):
- if self.event is None:
- return None
- else:
- return self.event
-
- def get_link(self):
- e = self.get_event()
- if e is None:
- return ""
- else:
- return '' + escape(e.title) + ""
-
- def to_list(self):
- if self.is_new():
- return (self.url, self.cat, self.tags)
-
-
-def import_from_urls(request):
- if request.method == "POST":
- formset = URLSubmissionFormSet(request.POST, request.FILES)
-
- if not request.user.is_authenticated:
- contactform = SimpleContactForm(request.POST)
- if formset.is_valid() and (
- request.user.is_authenticated or contactform.is_valid()
- ):
- # evaluate all the forms
- ucat = [
- URLEventEvaluation(form, request.user.is_authenticated)
- for form in formset.forms
- ]
-
- # for each not new, add a message
- for uc in ucat:
- if uc.exists() and not uc.is_new():
- if uc.is_event_visible():
- messages.info(
- request,
- mark_safe(
- _(
- "{} has not been submitted since it"
- "s already known: {}."
- ).format(uc.url, uc.get_link())
- ),
- )
- else:
- messages.info(
- request,
- _(
- "{} has not been submitted since it"
- "s already known and currently into moderation process."
- ).format(uc.url),
- )
-
- # keep only new ones
- ucat = [uc.to_list() for uc in ucat if uc.is_new()]
-
- # finally process them or go back to home page
- if len(ucat) > 0:
- messages.info(
- request,
- _("Integrating {} url(s) into our import process.").format(
- len(ucat)
- ),
- )
- email = None
- comments = None
- if not request.user.is_authenticated:
- email = contactform.cleaned_data["email"]
- comments = contactform.cleaned_data["comments"]
- import_events_from_urls.delay(
- ucat,
- user_id=request.user.pk if request.user else None,
- email=email,
- comments=comments,
- )
- return HttpResponseRedirect(reverse("thank_you"))
- else:
- return HttpResponseRedirect(reverse("home"))
-
- else:
- formset = URLSubmissionFormSet()
- if not request.user.is_authenticated:
- contactform = SimpleContactForm()
-
- context = {"formset": formset}
- if not request.user.is_authenticated:
- context["contactform"] = contactform
- return render(request, "agenda_culturel/import_set.html", context=context)
-
-
-def import_from_url(request):
- form = URLSubmissionFormWithContact(is_authenticated=request.user.is_authenticated)
-
- initial = {
- "start_day": date.today() + timedelta(days=1),
- "start_time": "20:00",
- "end_time": "22:00",
- }
-
- form_event = EventForm(initial=initial)
-
- # if the form has been sent
- if request.method == "POST":
- form = URLSubmissionFormWithContact(
- request.POST, is_authenticated=request.user.is_authenticated
- )
-
- # if the form is valid
- if form.is_valid():
- uc = URLEventEvaluation(form, request.user.is_authenticated)
-
- if uc.exists() and not uc.is_new():
- if uc.is_event_visible():
- messages.info(
- request,
- mark_safe(
- _(
- "{} has not been submitted since its already known: {}."
- ).format(uc.url, uc.get_link())
- ),
- )
- return HttpResponseRedirect(uc.get_event().get_absolute_url())
- else:
- messages.info(
- request,
- _(
- "{} has not been submitted since it"
- "s already known and currently into moderation process."
- ).format(uc.url),
- )
- return HttpResponseRedirect(reverse("home"))
-
- else:
- messages.info(
- request,
- _("Integrating {} into our import process.").format(uc.url),
- )
- import_events_from_url.delay(
- uc.url,
- uc.cat,
- uc.tags,
- user_id=request.user.pk if request.user else None,
- email=form.cleaned_data.get("email"),
- comments=form.cleaned_data.get("comments"),
- )
- return HttpResponseRedirect(reverse("thank_you"))
-
- return render(
- request,
- "agenda_culturel/import.html",
- context={"form": form, "form_event": form_event},
- )
-
-
-def export_event_ical(request, year, month, day, pk):
- event = get_object_or_404(Event, pk=pk)
- event = event.get_recurrence_at_date(year, month, day)
-
- events = list()
- events.append(event)
-
- cal = Event.export_to_ics(events, request)
-
- response = HttpResponse(content_type="text/calendar")
- response.content = cal.to_ical().decode("utf-8").replace("\r\n", "\n")
- response["Content-Disposition"] = "attachment; filename={0}{1}".format(
- event.title.replace("\n", " ").replace("\r", "")[0:32], ".ics"
- )
-
- return response
-
-
-def export_ical(request, cat=None, tag=None, organisation_pk=None, place_pk=None):
- now = date.today()
-
- qs = get_event_qs(request)
- if cat is not None:
- category = Category.objects.filter(slug=cat).first()
- qs = qs.filter(category=category)
- else:
- category = None
-
- if place_pk is not None:
- qs = qs.filter(exact_location=place_pk)
- if organisation_pk is not None:
- qs = qs.filter(organisers__in=[organisation_pk])
- if tag is not None:
- qs = qs.filter(tags__in=[tag])
-
- request = EventFilter.set_default_values(request)
- filter = EventFilter(request.GET, queryset=qs, request=request)
-
- if filter.has_category_parameters():
- return HttpResponseRedirect(filter.get_new_url())
-
- id_cache = hashlib.md5(
- (
- filter.get_url()
- + "-"
- + str(tag)
- + "-"
- + str(cat)
- + "-"
- + str(organisation_pk)
- + "-"
- + str(place_pk)
- ).encode("utf8")
- ).hexdigest()
- ical = cache.get(id_cache)
- if not ical:
- calendar = CalendarList(
- now + timedelta(days=-7), now + timedelta(days=+60), filter
- )
- ical = calendar.export_to_ics(request)
- cache.set(id_cache, ical, 3600) # 1 heure
-
- response = HttpResponse(content_type="text/calendar")
- response.content = ical.to_ical().decode("utf-8").replace("\r\n", "\n")
- extra = filter.to_str(" ")
- if extra is None:
- extra = ""
- if category is not None:
- extra += " " + str(category)
- if place_pk is not None:
- extra += (
- " @ " + Place.objects.filter(pk=place_pk).values("name").first()["name"]
- )
- if organisation_pk is not None:
- extra += (
- " - "
- + Organisation.objects.filter(pk=organisation_pk)
- .values("name")
- .first()["name"]
- )
- if tag is not None:
- extra += " - " + emoji.replace_emoji(tag, replace="")
-
- response["Content-Disposition"] = "attachment; filename={0}{1}{2}".format(
- "Pommes de lune", extra, ".ics"
- )
-
- return response
-
-
-@method_decorator(check_honeypot, name="post")
-class MessageCreateView(SuccessMessageMixin, CreateView):
- model = Message
- template_name = "agenda_culturel/message_create_form.html"
- form_class = MessageForm
-
- success_url = reverse_lazy("home")
- success_message = _("Your message has been sent successfully.")
-
- def __init__(self, *args, **kwargs):
- self.event = None
- super().__init__(*args, **kwargs)
-
- def get_form(self, form_class=None):
- if form_class is None:
- form_class = self.get_form_class()
- return form_class(**self.get_form_kwargs())
-
- def get_form_kwargs(self):
- kwargs = super().get_form_kwargs()
- kwargs["event"] = self.event
- if self.request.user.is_authenticated:
- kwargs["internal"] = True
- return kwargs
-
- def form_valid(self, form):
- if self.request.user.is_authenticated:
- form.instance.user = self.request.user
- form.instance.message_type = (
- Message.TYPE.EVENT_REPORT
- if "pk" in self.kwargs
- else Message.TYPE.CONTACT_FORM
- )
- return super().form_valid(form)
-
- def get_initial(self):
- result = super().get_initial()
- if "pk" in self.kwargs:
- self.event = get_object_or_404(Event, pk=self.kwargs["pk"])
- result["related_event"] = self.event
- result["subject"] = _("Reporting the event {} on {}").format(
- self.event.title, self.event.start_day
- )
- else:
- result["related_event"] = None
- return result
-
-
-class MessageDeleteView(SuccessMessageMixin, DeleteView):
- model = Message
- success_message = _("The contact message has been successfully deleted.")
- success_url = reverse_lazy("messages")
-
-
-class MessageUpdateView(
- SuccessMessageMixin,
- PermissionRequiredMixin,
- LoginRequiredMixin,
- UpdateView,
-):
- model = Message
- permission_required = "agenda_culturel.change_message"
- template_name = "agenda_culturel/message_moderation_form.html"
- fields = ("spam", "closed", "comments")
-
- success_message = _(
- "The contact message properties has been successfully modified."
- )
-
- success_url = reverse_lazy("messages")
-
- def get_form_kwargs(self):
- """Return the keyword arguments for instantiating the form."""
- kwargs = super().get_form_kwargs()
- if hasattr(self, "object"):
- kwargs.update({"instance": self.object})
- return kwargs
-
-
-@login_required(login_url="/accounts/login/")
-@permission_required("agenda_culturel.view_event")
-def activite(request):
- now = date.today()
-
- days = [now]
- while len(days) < 7 or days[-1].weekday() != 0:
- days.append(days[-1] + timedelta(days=-1))
-
- weeks = [days[-1]]
- for w in range(0, 8):
- weeks.append(weeks[-1] + timedelta(days=-7))
-
- daily_modifications = Event.get_count_modifications([(d, 1) for d in days])
- weekly_modifications = Event.get_count_modifications([(w, 7) for w in weeks])
-
- return render(
- request,
- "agenda_culturel/page-activity.html",
- {
- "daily_modifications": daily_modifications,
- "weekly_modifications": weekly_modifications,
- },
- )
-
-
-@login_required(login_url="/accounts/login/")
-@permission_required("agenda_culturel.view_event")
-def administration(request):
- nb_mod_days = 21
- nb_classes = 4
- today = date.today()
-
- # get information about recent modifications
- days = [today]
- for i in range(0, 2):
- days.append(days[-1] + timedelta(days=-1))
- daily_modifications = Event.get_count_modifications([(d, 1) for d in days])
-
- # get last created events
- events = (
- Event.objects.all()
- .order_by("-created_date")
- .select_related("exact_location", "category")[:5]
- )
-
- # get last batch imports
- rel_event = Event.objects.filter(
- import_sources__contains=[OuterRef("url_source")]
- ).values("pk")[:1]
- batch_imports = (
- BatchImportation.objects.all()
- .select_related("recurrentImport")
- .annotate(event_id=Subquery(rel_event))
- .order_by("-created_date")[:5]
- )
-
- # get info about batch information
- newest = (
- BatchImportation.objects.filter(recurrentImport=OuterRef("pk"))
- .order_by("-created_date")
- .select_related("recurrentImport")
- )
- imported_events = RecurrentImport.objects.annotate(
- last_run_status=Subquery(newest.values("status")[:1])
- )
-
- nb_failed = imported_events.filter(
- last_run_status=BatchImportation.STATUS.FAILED
- ).count()
- nb_canceled = imported_events.filter(
- last_run_status=BatchImportation.STATUS.CANCELED
- ).count()
- nb_running = imported_events.filter(
- last_run_status=BatchImportation.STATUS.RUNNING
- ).count()
- nb_all = imported_events.count()
-
- # get some info about imported (or not) events
- srcs = RecurrentImport.objects.all().values_list("source")
- in_future = Event.objects.filter(Q(start_day__gte=today))
- nb_in_rimport = in_future.filter(Q(import_sources__overlap=srcs)).count()
- nb_in_orphan_import = in_future.filter(
- (
- Q(import_sources__isnull=False)
- & (Q(modified_date__isnull=True) | Q(modified_date__lte=F("imported_date")))
- )
- & ~Q(import_sources__overlap=srcs)
- ).count()
-
- # get all non moderated events
- nb_not_moderated = Event.get_nb_not_moderated(today, nb_mod_days, nb_classes)
-
- return render(
- request,
- "agenda_culturel/administration.html",
- {
- "daily_modifications": daily_modifications,
- "events": events,
- "batch_imports": batch_imports,
- "nb_failed": nb_failed,
- "nb_canceled": nb_canceled,
- "nb_running": nb_running,
- "nb_all": nb_all,
- "nb_not_moderated": nb_not_moderated,
- "nb_in_rimport": nb_in_rimport,
- "nb_in_orphan_import": nb_in_orphan_import,
- },
- )
-
-
-@login_required(login_url="/accounts/login/")
-@permission_required("agenda_culturel.view_event")
-def recent(request):
- filter = EventFilterAdmin(
- request.GET, queryset=Event.objects.all().order_by("-created_date")
- )
- paginator = PaginatorFilter(filter, 10, request)
- page = request.GET.get("page")
-
- try:
- response = paginator.page(page)
- except PageNotAnInteger:
- response = paginator.page(1)
- except EmptyPage:
- response = paginator.page(paginator.num_pages)
-
- return render(
- request,
- "agenda_culturel/recent.html",
- {"filter": filter, "paginator_filter": response},
- )
-
-
-@login_required(login_url="/accounts/login/")
-@permission_required("agenda_culturel.view_message")
-def view_messages(request):
- filter = MessagesFilterAdmin(
- request.GET, queryset=Message.objects.all().order_by("-date")
- )
- paginator = PaginatorFilter(filter, 10, request)
- page = request.GET.get("page")
-
- nb_spams = Message.objects.filter(spam=True).count()
-
- try:
- response = paginator.page(page)
- except PageNotAnInteger:
- response = paginator.page(1)
- except EmptyPage:
- response = paginator.page(paginator.num_pages)
-
- return render(
- request,
- "agenda_culturel/messages.html",
- {"filter": filter, "nb_spams": nb_spams, "paginator_filter": response},
- )
-
-
-@login_required(login_url="/accounts/login/")
-@permission_required("agenda_culturel.view_message")
-def delete_cm_spam(request):
- if request.method == "POST":
- Message.objects.filter(spam=True).delete()
-
- messages.success(request, _("Spam has been successfully deleted."))
- return HttpResponseRedirect(reverse_lazy("messages"))
- else:
- nb_msgs = Message.objects.values("spam").annotate(total=Count("spam"))
- nb_total = sum([nb["total"] for nb in nb_msgs])
- nb_spams = sum([nb["total"] for nb in nb_msgs if nb["spam"]])
- cancel_url = reverse_lazy("messages")
- return render(
- request,
- "agenda_culturel/delete_spams_confirm.html",
- {
- "nb_total": nb_total,
- "nb_spams": nb_spams,
- "cancel_url": cancel_url,
- },
- )
-
-
-def event_search(request, full=False):
- categories = None
- tags = None
- places = None
- organisations = None
- rimports = None
-
- qs = get_event_qs(request).order_by("-start_day")
- if not request.user.is_authenticated:
- qs = qs.filter(
- (
- Q(other_versions__isnull=True)
- | Q(other_versions__representative=F("pk"))
- | Q(other_versions__representative__isnull=True)
- )
- )
- if full:
- filter = SearchEventFilter(
- request.GET,
- queryset=qs,
- request=request,
- )
- else:
- filter = SimpleSearchEventFilter(
- request.GET,
- queryset=qs,
- request=request,
- )
- if "q" in request.GET:
- categories = Category.objects.filter(name__icontains=request.GET["q"])
- s_q = remove_accents(request.GET["q"].lower())
- tags = (
- Event.objects.extra(
- where=["%s ILIKE ANY (tags)"], params=[request.GET["q"]]
- )
- .annotate(arr_tags=Func(F("tags"), function="unnest"))
- .values_list("arr_tags", flat=True)
- .distinct()
- )
- tags = [
- (
- t,
- emoji.demojize(remove_accents(t).lower(), delimiters=("000", "")),
- )
- for t in tags
- ]
- tags = [t for t in tags if s_q == t[1]]
- tags.sort(key=lambda x: x[1])
- tags = [t[0] for t in tags]
- places = Place.objects.filter(
- Q(name__icontains=request.GET["q"])
- | Q(description__icontains=request.GET["q"])
- | Q(city__icontains=request.GET["q"])
- )
- organisations = Organisation.objects.filter(
- Q(name__icontains=request.GET["q"])
- | Q(description__icontains=request.GET["q"])
- )
- if request.user.is_authenticated:
- rimports = RecurrentImport.objects.filter(
- name__icontains=request.GET["q"]
- )
-
- paginator = PaginatorFilter(filter, 10, request)
- page = request.GET.get("page")
-
- try:
- response = paginator.page(page)
- except PageNotAnInteger:
- response = paginator.page(1)
- except EmptyPage:
- response = paginator.page(paginator.num_pages)
-
- return render(
- request,
- "agenda_culturel/search.html",
- {
- "filter": filter,
- "categories": categories,
- "tags": tags,
- "places": places,
- "organisations": organisations,
- "rimports": rimports,
- "has_results": len(request.GET) != 0
- or (len(request.GET) > 1 and "page" in request.GET),
- "paginator_filter": response,
- "full": full,
- },
- )
-
-
-def event_search_full(request):
- return event_search(request, True)
-
-
-#########################
-## batch importations
-#########################
-
-
-@login_required(login_url="/accounts/login/")
-@permission_required("agenda_culturel.view_batchimportation")
-def imports(request):
- rel_event = Event.objects.filter(
- import_sources__contains=[OuterRef("url_source")]
- ).values("pk")[:1]
- paginator = Paginator(
- BatchImportation.objects.all()
- .order_by("-created_date")
- .annotate(event_id=Subquery(rel_event)),
- 30,
- )
- page = request.GET.get("page")
-
- today = date.today()
-
- srcs = RecurrentImport.objects.all().values_list("source")
- in_future = Event.objects.filter(Q(start_day__gte=today))
- nb_in_orphan_import = in_future.filter(
- (
- Q(import_sources__isnull=False)
- & (Q(modified_date__isnull=True) | Q(modified_date__lte=F("imported_date")))
- )
- & ~Q(import_sources__overlap=srcs)
- ).count()
-
- try:
- response = paginator.page(page)
- except PageNotAnInteger:
- response = paginator.page(1)
- except EmptyPage:
- response = paginator.page(paginator.num_pages)
-
- return render(
- request,
- "agenda_culturel/imports.html",
- {
- "paginator_filter": response,
- "nb_in_orphan_import": nb_in_orphan_import,
- },
- )
-
-
-@login_required(login_url="/accounts/login/")
-@permission_required(
- [
- "agenda_culturel.add_batchimportation",
- "agenda_culturel.run_batchimportation",
- ]
-)
-def add_import(request):
- form = BatchImportationForm()
-
- if request.method == "POST":
- form = BatchImportationForm(request.POST)
-
- if form.is_valid():
- import_events_from_json.delay(form.data["json"])
-
- messages.success(request, _("The import has been run successfully."))
- return HttpResponseRedirect(reverse_lazy("imports"))
-
- return render(request, "agenda_culturel/batchimportation_form.html", {"form": form})
-
-
-@login_required(login_url="/accounts/login/")
-@permission_required(
- [
- "agenda_culturel.view_batchimportation",
- "agenda_culturel.run_batchimportation",
- ]
-)
-def cancel_import(request, pk):
- import_process = get_object_or_404(BatchImportation, pk=pk)
-
- if request.method == "POST":
- celery_app.control.revoke(import_process.celery_id)
-
- import_process.status = BatchImportation.STATUS.CANCELED
- import_process.save(update_fields=["status"])
-
- messages.success(request, _("The import has been canceled."))
- return HttpResponseRedirect(reverse_lazy("imports"))
- else:
- cancel_url = reverse_lazy("imports")
- return render(
- request,
- "agenda_culturel/cancel_import_confirm.html",
- {"object": import_process, "cancel_url": cancel_url},
- )
-
-
-@login_required(login_url="/accounts/login/")
-@permission_required(
- [
- "agenda_culturel.view_batchimportation",
- "agenda_culturel.run_batchimportation",
- ]
-)
-def update_orphan_events(request):
- if request.method == "POST":
- # run recurrent import
- update_orphan_pure_import_events.delay()
-
- messages.success(request, _("The orphan event update has been launched."))
- return HttpResponseRedirect(reverse_lazy("imports"))
- else:
- today = date.today()
-
- srcs = RecurrentImport.objects.all().values_list("source")
- in_future = Event.objects.filter(Q(start_day__gte=today))
- nb_in_orphan_import = in_future.filter(
- (
- Q(import_sources__isnull=False)
- & (
- Q(modified_date__isnull=True)
- | Q(modified_date__lte=F("imported_date"))
- )
- )
- & ~Q(import_sources__overlap=srcs)
- ).count()
- return render(
- request,
- "agenda_culturel/run_orphan_imports_confirm.html",
- {"nb_in_orphan_import": nb_in_orphan_import},
- )
-
-
-#########################
-## recurrent importations
-#########################
-
-
-@login_required(login_url="/accounts/login/")
-@permission_required("agenda_culturel.view_recurrentimport")
-def recurrent_imports(request, status=None):
- newest = BatchImportation.objects.filter(recurrentImport=OuterRef("pk")).order_by(
- "-created_date"
- )
-
- qs = (
- RecurrentImport.objects.all()
- .annotate(last_run_status=Subquery(newest.values("status")[:1]))
- .order_by("-pk")
- )
-
- nb_failed = qs.filter(last_run_status=BatchImportation.STATUS.FAILED).count()
- nb_canceled = qs.filter(last_run_status=BatchImportation.STATUS.CANCELED).count()
- nb_running = qs.filter(last_run_status=BatchImportation.STATUS.RUNNING).count()
- nb_all = qs.count()
-
- if status is not None:
- qs = qs.filter(last_run_status=status)
-
- filter = RecurrentImportFilter(request.GET, queryset=qs)
-
- paginator = PaginatorFilter(filter, 20, request)
-
- page = request.GET.get("page")
-
- try:
- response = paginator.page(page)
- except PageNotAnInteger:
- response = paginator.page(1)
- except EmptyPage:
- response = paginator.page(paginator.num_pages)
-
- return render(
- request,
- "agenda_culturel/rimports.html",
- {
- "paginator_filter": response,
- "filter": filter,
- "nb_all": nb_all,
- "nb_failed": nb_failed,
- "nb_canceled": nb_canceled,
- "nb_running": nb_running,
- "status": status,
- },
- )
-
-
-class RecurrentImportCreateView(
- LoginRequiredMixin, PermissionRequiredMixin, CreateView
-):
- model = RecurrentImport
- permission_required = "agenda_culturel.add_recurrentimport"
- success_url = reverse_lazy("recurrent_imports")
- form_class = RecurrentImportForm
-
-
-class RecurrentImportUpdateView(
- SuccessMessageMixin,
- PermissionRequiredMixin,
- LoginRequiredMixin,
- UpdateView,
-):
- model = RecurrentImport
- permission_required = "agenda_culturel.change_recurrentimport"
- form_class = RecurrentImportForm
- success_message = _("The recurrent import has been successfully modified.")
-
-
-class RecurrentImportDeleteView(
- SuccessMessageMixin,
- PermissionRequiredMixin,
- LoginRequiredMixin,
- DeleteView,
-):
- model = RecurrentImport
- permission_required = "agenda_culturel.delete_recurrentimport"
- success_url = reverse_lazy("recurrent_imports")
- success_message = _("The recurrent import has been successfully deleted.")
-
-
-@login_required(login_url="/accounts/login/")
-@permission_required(
- [
- "agenda_culturel.view_recurrentimport",
- "agenda_culturel.view_batchimportation",
- ]
-)
-def view_rimport(request, pk):
- obj = get_object_or_404(RecurrentImport, pk=pk)
- paginator = Paginator(
- BatchImportation.objects.filter(recurrentImport=pk).order_by("-created_date"),
- 10,
- )
- page = request.GET.get("page")
-
- try:
- response = paginator.page(page)
- except PageNotAnInteger:
- response = paginator.page(1)
- except EmptyPage:
- response = paginator.page(paginator.num_pages)
-
- return render(
- request,
- "agenda_culturel/page-rimport.html",
- {"paginator_filter": response, "object": obj},
- )
-
-
-@login_required(login_url="/accounts/login/")
-@permission_required(
- [
- "agenda_culturel.view_recurrentimport",
- "agenda_culturel.run_recurrentimport",
- ]
-)
-def run_rimport(request, pk):
- rimport = get_object_or_404(RecurrentImport, pk=pk)
-
- if request.method == "POST":
- # run recurrent import
- run_recurrent_import.delay(pk)
-
- messages.success(request, _("The import has been launched."))
- return HttpResponseRedirect(reverse_lazy("view_rimport", args=[pk]))
- else:
- return render(
- request,
- "agenda_culturel/run_rimport_confirm.html",
- {"object": rimport},
- )
-
-
-@login_required(login_url="/accounts/login/")
-@permission_required(
- [
- "agenda_culturel.view_recurrentimport",
- "agenda_culturel.run_recurrentimport",
- ]
-)
-def run_all_rimports(request, status=None):
- if request.method == "POST":
- # run recurrent import
- if status == BatchImportation.STATUS.FAILED:
- run_all_recurrent_imports_failed.delay()
- elif status == BatchImportation.STATUS.CANCELED:
- run_all_recurrent_imports_canceled.delay()
- else:
- run_all_recurrent_imports.delay()
-
- messages.success(request, _("Imports has been launched."))
- return HttpResponseRedirect(reverse_lazy("recurrent_imports"))
- else:
- if status == BatchImportation.STATUS.FAILED:
- return render(request, "agenda_culturel/run_failed_rimports_confirm.html")
- elif status == BatchImportation.STATUS.CANCELED:
- return render(request, "agenda_culturel/run_canceled_rimports_confirm.html")
- else:
- return render(request, "agenda_culturel/run_all_rimports_confirm.html")
-
-
-@login_required(login_url="/accounts/login/")
-@permission_required(
- [
- "agenda_culturel.view_recurrentimport",
- "agenda_culturel.run_recurrentimport",
- ]
-)
-def run_all_fb_rimports(request, status=None):
- if request.method == "POST":
- run_all_recurrent_imports.delay(True)
-
- messages.success(request, _("Facebook imports has been launched."))
- return HttpResponseRedirect(reverse_lazy("recurrent_imports"))
- else:
- return render(request, "agenda_culturel/run_all_fb_rimports_confirm.html")
-
-
-#########################
-## duplicated events
-#########################
-
-
-class DuplicatedEventsDetailView(LoginRequiredMixin, DetailView):
- model = DuplicatedEvents
- template_name = "agenda_culturel/duplicate.html"
-
-
-@login_required(login_url="/accounts/login/")
-@permission_required(
- ["agenda_culturel.change_event", "agenda_culturel.change_duplicatedevents"]
-)
-def update_duplicate_event(request, pk, epk):
- edup = get_object_or_404(DuplicatedEvents, pk=pk)
- event = get_object_or_404(Event, pk=epk)
-
- form = MergeDuplicates(duplicates=edup, event=event)
-
- if request.method == "POST":
- form = MergeDuplicates(request.POST, duplicates=edup)
- if form.is_valid():
- for f in edup.get_items_comparison():
- if not f["similar"]:
- selected = form.get_selected_events(f["key"])
- if selected is not None:
- if isinstance(selected, list):
- values = [
- x
- for x in [getattr(s, f["key"]) for s in selected]
- if x is not None
- ]
- if len(values) != 0:
- if isinstance(values[0], str):
- setattr(event, f["key"], "\n".join(values))
- else:
- setattr(event, f["key"], sum(values, []))
- else:
- if f["key"] == "organisers":
- event.organisers.set(selected.organisers.all())
- else:
- setattr(
- event,
- f["key"],
- getattr(selected, f["key"]),
- )
- if f["key"] == "image":
- setattr(
- event,
- "local_image",
- getattr(selected, "local_image"),
- )
-
- event.other_versions.fix(event)
- event.save()
-
- messages.info(request, _("Update successfully completed."))
- return HttpResponseRedirect(event.get_absolute_url())
-
- return render(
- request,
- "agenda_culturel/update_duplicate.html",
- context={
- "form": form,
- "object": edup,
- "event_id": edup.get_event_index(event),
- },
- )
-
-
-@login_required(login_url="/accounts/login/")
-@permission_required(
- ["agenda_culturel.change_event", "agenda_culturel.change_duplicatedevents"]
-)
-def merge_duplicate(request, pk):
- edup = get_object_or_404(DuplicatedEvents, pk=pk)
- form = MergeDuplicates(duplicates=edup)
-
- if request.method == "POST":
- form = MergeDuplicates(request.POST, duplicates=edup)
- if form.is_valid():
- events = edup.get_duplicated()
-
- # build fields for the new event
- new_event_data = {}
- for f in edup.get_items_comparison():
- if f["similar"]:
- new_event_data[f["key"]] = getattr(events[0], f["key"])
- else:
- selected = form.get_selected_events(f["key"])
- if selected is None:
- new_event_data[f["key"]] = None
- elif isinstance(selected, list):
- values = [
- x
- for x in [getattr(s, f["key"]) for s in selected]
- if x is not None
- ]
- if len(values) == 0:
- new_event_data[f["key"]] = None
- else:
- if isinstance(values[0], str):
- new_event_data[f["key"]] = "\n".join(values)
- else:
- new_event_data[f["key"]] = sum(values, [])
- else:
- new_event_data[f["key"]] = getattr(selected, f["key"])
- if f["key"] == "image" and "local_image" not in new_event_data:
- new_event_data["local_image"] = getattr(
- selected, "local_image"
- )
-
- organisers = new_event_data.pop("organisers", None)
- # create a new event that merge the selected events
- new_event = Event(**new_event_data)
- new_event.status = Event.STATUS.PUBLISHED
- new_event.other_versions = edup
- new_event.save()
- if organisers is not None:
- new_event.organisers.set(organisers.all())
- edup.fix(new_event)
-
- messages.info(
- request,
- _("Creation of a merged event has been successfully completed."),
- )
- return HttpResponseRedirect(new_event.get_absolute_url())
-
- return render(
- request,
- "agenda_culturel/merge_duplicate.html",
- context={"form": form, "object": edup},
- )
-
-
-@login_required(login_url="/accounts/login/")
-@permission_required(
- ["agenda_culturel.change_event", "agenda_culturel.change_duplicatedevents"]
-)
-def fix_duplicate(request, pk):
- edup = get_object_or_404(DuplicatedEvents.objects.select_related(), pk=pk)
-
- if request.method == "POST":
- form = FixDuplicates(request.POST, edup=edup)
-
- if form.is_valid():
- if form.is_action_no_duplicates():
- # all events are different
- events = edup.get_duplicated()
-
- # get redirection date
- if len(events) == 0:
- date = None
- else:
- s_events = [e for e in events if not e.has_recurrences()]
- if len(s_events) != 0:
- s_event = s_events[0]
- else:
- s_event = events[0]
- date = s_event.start_day
-
- messages.success(request, _("Events have been marked as unduplicated."))
- # delete the duplicated event (other_versions will be set to None on all events)
- edup.delete()
- if date is None:
- return HttpResponseRedirect(reverse_lazy("home"))
- else:
- return HttpResponseRedirect(
- reverse_lazy("day_view", args=[date.year, date.month, date.day])
- )
-
- elif form.is_action_select():
- # one element has been selected to be the representative
- selected = form.get_selected_event(edup)
- if selected is None:
- messages.error(
- request,
- _(
- "The selected item is no longer included in the list of duplicates. Someone else has probably modified the list in the meantime."
- ),
- )
- else:
- edup.fix(selected)
- messages.success(
- request,
- _("The selected event has been set as representative"),
- )
- return HttpResponseRedirect(edup.get_absolute_url())
- elif form.is_action_remove():
- # one element is removed from the set
- event = form.get_selected_event(edup)
- if event is None:
- messages.error(
- request,
- _(
- "The selected item is no longer included in the list of duplicates. Someone else has probably modified the list in the meantime."
- ),
- )
- return HttpResponseRedirect(edup.get_absolute_url())
- else:
- event.other_versions = None
- if edup.representative == event:
- edup.representative = None
- event.set_no_modification_date_changed()
- event.save()
- edup.save()
- edup.events = [e for e in edup.events if e.pk != event.pk]
- messages.success(
- request,
- _(
- "The event has been withdrawn from the group and made independent."
- ),
- )
- if edup.nb_duplicated() == 1:
- return HttpResponseRedirect(edup.get_absolute_url())
- else:
- form = FixDuplicates(edup=edup)
- elif form.is_action_update():
- # otherwise, an event will be updated using other elements
- event = form.get_selected_event(edup)
- if event is None:
- messages.error(
- request,
- _(
- "The selected item is no longer included in the list of duplicates. Someone else has probably modified the list in the meantime."
- ),
- )
- return HttpResponseRedirect(edup.get_absolute_url())
- else:
- return HttpResponseRedirect(
- reverse_lazy("update_event", args=[edup.pk, event.pk])
- )
- else:
- # otherwise, a new event will be created using a merging process
- return HttpResponseRedirect(
- reverse_lazy("merge_duplicate", args=[edup.pk])
- )
- else:
- form = FixDuplicates(edup=edup)
-
- return render(
- request,
- "agenda_culturel/fix_duplicate.html",
- context={"form": form, "object": edup},
- )
-
-
-class DuplicatedEventsUpdateView(LoginRequiredMixin, UpdateView):
- model = DuplicatedEvents
- fields = ()
- template_name = "agenda_culturel/fix_duplicate.html"
-
-
-@login_required(login_url="/accounts/login/")
-@permission_required("agenda_culturel.view_duplicatedevents")
-def duplicates(request):
- nb_removed = DuplicatedEvents.remove_singletons()
- if nb_removed > 0:
- messages.success(
- request,
- _("Cleaning up duplicates: {} item(s) fixed.").format(nb_removed),
- )
-
- filter = DuplicatedEventsFilter(
- request.GET, queryset=DuplicatedEvents.objects.all().order_by("-pk")
- )
- paginator = PaginatorFilter(filter, 10, request)
- page = request.GET.get("page")
-
- try:
- response = paginator.page(page)
- except PageNotAnInteger:
- response = paginator.page(1)
- except EmptyPage:
- response = paginator.page(paginator.num_pages)
-
- return render(
- request,
- "agenda_culturel/duplicates.html",
- {
- "filter": filter,
- "paginator_filter": response,
- "paginator": paginator,
- },
- )
-
-
-def set_duplicate(request, year, month, day, pk):
- event = get_object_or_404(Event, pk=pk)
- cday = CalendarDay(date(year, month, day))
- others = [
- e
- for e in cday.get_events()
- if e != event
- and (event.other_versions is None or event.other_versions != e.other_versions)
- and e.status != Event.STATUS.TRASH
- ]
-
- form = SelectEventInList(events=others)
-
- if request.method == "POST":
- form = SelectEventInList(request.POST, events=others)
- if form.is_valid():
- selected = [o for o in others if o.pk == int(form.cleaned_data["event"])]
- event.set_other_versions(selected)
- # save them without updating modified date
- event.set_no_modification_date_changed()
- event.save()
- if request.user.is_authenticated:
- messages.success(request, _("The event was successfully duplicated."))
- return HttpResponseRedirect(
- reverse_lazy("view_duplicate", args=[event.other_versions.pk])
- )
- else:
- messages.info(
- request,
- _(
- "The event has been successfully flagged as a duplicate. The moderation team will deal with your suggestion shortly."
- ),
- )
- return HttpResponseRedirect(event.get_absolute_url())
-
- return render(
- request,
- "agenda_culturel/set_duplicate.html",
- context={"form": form, "event": event},
- )
-
-
-#########################
-## categorisation rules
-#########################
-
-
-@login_required(login_url="/accounts/login/")
-@permission_required("agenda_culturel.view_categorisationrule")
-def categorisation_rules(request):
- paginator = Paginator(
- CategorisationRule.objects.all()
- .order_by("pk")
- .select_related("category")
- .select_related("place"),
- 100,
- )
- page = request.GET.get("page")
-
- try:
- response = paginator.page(page)
- except PageNotAnInteger:
- response = paginator.page(1)
- except EmptyPage:
- response = paginator.page(paginator.num_pages)
-
- return render(
- request,
- "agenda_culturel/categorisation_rules.html",
- {"paginator_filter": response},
- )
-
-
-class CategorisationRuleCreateView(
- LoginRequiredMixin, PermissionRequiredMixin, CreateView
-):
- model = CategorisationRule
- permission_required = "agenda_culturel.add_categorisationrule"
- success_url = reverse_lazy("categorisation_rules")
- form_class = CategorisationRuleImportForm
-
-
-class CategorisationRuleUpdateView(
- SuccessMessageMixin,
- PermissionRequiredMixin,
- LoginRequiredMixin,
- UpdateView,
-):
- model = CategorisationRule
- permission_required = "agenda_culturel.change_categorisationrule"
- form_class = CategorisationRuleImportForm
- success_url = reverse_lazy("categorisation_rules")
- success_message = _("The categorisation rule has been successfully modified.")
-
-
-class CategorisationRuleDeleteView(
- SuccessMessageMixin,
- PermissionRequiredMixin,
- LoginRequiredMixin,
- DeleteView,
-):
- model = CategorisationRule
- permission_required = "agenda_culturel.delete_categorisationrule"
- success_url = reverse_lazy("categorisation_rules")
- success_message = _("The categorisation rule has been successfully deleted.")
-
-
-@login_required(login_url="/accounts/login/")
-@permission_required("agenda_culturel.apply_categorisationrules")
-def apply_categorisation_rules(request):
- if request.method == "POST":
- form = CategorisationForm(request.POST)
- if form.is_valid():
- nb = 0
- for epk, c in form.get_validated():
- e = Event.objects.get(pk=epk)
- cat = Category.objects.filter(name=c).first()
- e.category = cat
- e.save()
- nb += 1
-
- if nb != 0:
- if nb == 1:
- messages.success(
- request,
- _(
- "The rules were successfully applied and 1 event was categorised."
- ),
- )
- else:
- messages.success(
- request,
- _(
- "The rules were successfully applied and {} events were categorised."
- ).format(nb),
- )
- else:
- messages.info(
- request,
- _(
- "The rules were successfully applied and no events were categorised."
- ),
- )
-
- return HttpResponseRedirect(reverse_lazy("categorisation_rules"))
- else:
- return render(
- request,
- "agenda_culturel/categorise_events_form.html",
- context={"form": form},
- )
- else:
- # first we check if events are not correctly categorised
- to_categorise = []
- events = (
- Event.objects.filter(start_day__gte=datetime.now())
- .exclude(category=Category.get_default_category_id())
- .exclude(category=None)
- .select_related("exact_location")
- .select_related("category")
- )
- for e in events:
- c = CategorisationRule.get_category_from_rules(e)
- if c and c != e.category:
- to_categorise.append((e, c))
-
- # then we apply rules on events without category
- nb = 0
- to_save = []
- events = (
- Event.objects.filter(start_day__gte=datetime.now())
- .filter(Q(category=Category.get_default_category_id()) | Q(category=None))
- .select_related("exact_location")
- .select_related("category")
- )
- for e in events:
- success = CategorisationRule.apply_rules(e)
- if success:
- nb += 1
- to_save.append(e)
-
- if nb != 0:
- Event.objects.bulk_update(to_save, fields=["category"])
-
- # set messages
- if nb != 0:
- if nb == 1:
- messages.success(
- request,
- _(
- "The rules were successfully applied and 1 event with default category was categorised."
- ),
- )
- else:
- messages.success(
- request,
- _(
- "The rules were successfully applied and {} events with default category were categorised."
- ).format(nb),
- )
- else:
- messages.info(
- request,
- _(
- "The rules were successfully applied and no events were categorised."
- ),
- )
-
- if len(to_categorise) != 0:
- form = CategorisationForm(events=to_categorise)
- return render(
- request,
- "agenda_culturel/categorise_events_form.html",
- context={
- "form": form,
- "events": dict((e.pk, e) for e, c in to_categorise),
- "categories": dict((e.pk, c) for e, c in to_categorise),
- },
- )
- else:
- return HttpResponseRedirect(reverse_lazy("categorisation_rules"))
-
-
-#########################
-## Places
-#########################
-
-
-class PlaceListView(ListView):
- model = Place
- ordering = ["name__unaccent"]
-
-
-class PlaceListAdminView(PermissionRequiredMixin, ListView):
- model = Place
- paginate_by = 10
- permission_required = "agenda_culturel.add_place"
- ordering = ["name__unaccent"]
- template_name = "agenda_culturel/place_list_admin.html"
-
-
-class PlaceDetailView(ListView):
- model = Place
- template_name = "agenda_culturel/place_detail.html"
- paginate_by = 10
-
- def get_queryset(self):
- self.place = get_object_or_404(Place, pk=self.kwargs["pk"])
- return (
- get_event_qs(self.request)
- .filter(exact_location=self.place)
- .filter(
- Q(other_versions__isnull=True)
- | Q(other_versions__representative=F("pk"))
- | Q(other_versions__representative__isnull=True)
- )
- .filter(start_day__gte=datetime.now())
- .order_by("start_day")
- )
-
- def get_context_data(self, **kwargs):
- context = super().get_context_data(**kwargs)
- context["object"] = self.place
- return context
-
-
-class PlaceDetailViewPast(PlaceDetailView):
- def get_queryset(self):
- self.place = get_object_or_404(Place, pk=self.kwargs["pk"])
- self.past = True
- return (
- get_event_qs(self.request)
- .filter(exact_location=self.place)
- .filter(
- Q(other_versions__isnull=True)
- | Q(other_versions__representative=F("pk"))
- | Q(other_versions__representative__isnull=True)
- )
- .filter(start_day__lte=datetime.now())
- .order_by("-start_day")
- )
-
- def get_context_data(self, **kwargs):
- context = super().get_context_data(**kwargs)
- context["past"] = self.past
- return context
-
-
-class UpdatePlaces:
- def form_valid(self, form):
- result = super().form_valid(form)
- p = form.instance
-
- if not hasattr(self, "nb_applied"):
- self.nb_applied = 0
-
- # if required, find all matching events
- if form.apply():
- self.nb_applied += p.associate_matching_events()
-
- if self.nb_applied > 1:
- messages.success(
- self.request,
- _("{} events have been updated.").format(self.nb_applied),
- )
- elif self.nb_applied == 1:
- messages.success(self.request, _("1 event has been updated."))
- else:
- messages.info(self.request, _("No events have been modified."))
- return result
-
-
-class PlaceUpdateView(
- UpdatePlaces, PermissionRequiredMixin, SuccessMessageMixin, UpdateView
-):
- model = Place
- permission_required = "agenda_culturel.change_place"
- success_message = _("The place has been successfully updated.")
- form_class = PlaceForm
-
-
-class PlaceCreateView(
- UpdatePlaces, PermissionRequiredMixin, SuccessMessageMixin, CreateView
-):
- model = Place
- permission_required = "agenda_culturel.add_place"
- success_message = _("The place has been successfully created.")
- form_class = PlaceForm
-
-
-class PlaceDeleteView(PermissionRequiredMixin, DeleteView):
- model = Place
- permission_required = "agenda_culturel.delete_place"
- success_url = reverse_lazy("view_places_admin")
-
-
-class UnknownPlacesListView(PermissionRequiredMixin, ListView):
- model = Event
- permission_required = "agenda_culturel.add_place"
- paginate_by = 10
- ordering = ["-pk"]
- template_name = "agenda_culturel/place_unknown_list.html"
- queryset = Event.get_qs_events_with_unkwnon_place()
-
-
-def fix_unknown_places(request):
- # get all places
- places = Place.objects.all()
- # get all events without exact location
- u_events = Event.get_qs_events_with_unkwnon_place()
-
- to_be_updated = []
- # try to find matches
- for ue in u_events:
- for p in places:
- if p.match(ue):
- ue.exact_location = p
- to_be_updated.append(ue)
- continue
- # update events with a location
- Event.objects.bulk_update(to_be_updated, fields=["exact_location"])
-
- # create a success message
- nb = len(to_be_updated)
- if nb > 1:
- messages.success(request, _("{} events have been updated.").format(nb))
- elif nb == 1:
- messages.success(request, _("1 event has been updated."))
- else:
- messages.info(request, _("No events have been modified."))
-
- # come back to the list of places
- return HttpResponseRedirect(reverse_lazy("view_unknown_places"))
-
-
-class UnknownPlaceAddView(PermissionRequiredMixin, SuccessMessageMixin, UpdateView):
- model = Event
- permission_required = (
- "agenda_culturel.change_place",
- "agenda_culturel.change_event",
- )
- form_class = EventAddPlaceForm
- template_name = "agenda_culturel/place_unknown_form.html"
-
- def form_valid(self, form):
- self.modified_event = form.cleaned_data.get("place")
- self.add_alias = form.cleaned_data.get("add_alias")
- result = super().form_valid(form)
-
- if form.cleaned_data.get("place"):
- messages.success(
- self.request,
- _("The selected place has been assigned to the event."),
- )
- if form.cleaned_data.get("add_alias"):
- messages.success(
- self.request,
- _("A new alias has been added to the selected place."),
- )
-
- nb_applied = form.cleaned_data.get("place").associate_matching_events()
-
- if nb_applied > 1:
- messages.success(
- self.request,
- _("{} events have been updated.").format(nb_applied),
- )
- elif nb_applied == 1:
- messages.success(self.request, _("1 event has been updated."))
- else:
- messages.info(self.request, _("No events have been modified."))
-
- return result
-
- def get_success_url(self):
- if self.modified_event:
- return reverse_lazy("view_unknown_places")
- else:
- param = "?add=1" if self.add_alias else ""
- return reverse_lazy("add_place_from_event", args=[self.object.pk]) + param
-
-
-class PlaceFromEventCreateView(PlaceCreateView):
- def get_context_data(self, **kwargs):
- context = super().get_context_data(**kwargs)
- context["event"] = self.event
- return context
-
- def get_initial(self, *args, **kwargs):
- initial = super().get_initial(**kwargs)
- self.event = get_object_or_404(Event, pk=self.kwargs["pk"])
- if self.event.location and "add" in self.request.GET:
- initial["aliases"] = [self.event.location]
- guesser = PlaceGuesser()
- name, address, postcode, city = guesser.guess_address_elements(
- self.event.location
- )
- initial["name"] = name
- initial["address"] = address
- initial["postcode"] = postcode
- initial["city"] = city
- initial["location"] = ""
-
- return initial
-
- def form_valid(self, form):
- result = super().form_valid(form)
- self.event.exact_location = form.instance
- self.event.save(update_fields=["exact_location"])
- return result
-
- def get_success_url(self):
- return self.event.get_absolute_url()
-
-
-#########################
-## Organisations
-#########################
-
-
-class OrganisationListView(ListView):
- model = Organisation
- paginate_by = 10
- ordering = ["name__unaccent"]
-
-
-class OrganisationDetailView(ListView):
- model = Organisation
- template_name = "agenda_culturel/organisation_detail.html"
- paginate_by = 10
-
- def get_queryset(self):
- self.organisation = (
- Organisation.objects.filter(pk=self.kwargs["pk"])
- .prefetch_related("organised_events")
- .first()
- )
- return (
- get_event_qs(self.request)
- .filter(organisers__in=[self.kwargs["pk"]])
- .filter(
- Q(other_versions__isnull=True)
- | Q(other_versions__representative=F("pk"))
- | Q(other_versions__representative__isnull=True)
- )
- .filter(start_day__gte=datetime.now())
- .order_by("start_day")
- )
-
- def get_context_data(self, **kwargs):
- context = super().get_context_data(**kwargs)
- context["object"] = self.organisation
- return context
-
-
-class OrganisationDetailViewPast(OrganisationDetailView):
- def get_queryset(self):
- self.organisation = (
- Organisation.objects.filter(pk=self.kwargs["pk"])
- .prefetch_related("organised_events")
- .first()
- )
- self.past = True
- return (
- get_event_qs(self.request)
- .filter(organisers__in=[self.kwargs["pk"]])
- .filter(
- Q(other_versions__isnull=True)
- | Q(other_versions__representative=F("pk"))
- | Q(other_versions__representative__isnull=True)
- )
- .filter(start_day__lte=datetime.now())
- .order_by("-start_day")
- )
-
- def get_context_data(self, **kwargs):
- context = super().get_context_data(**kwargs)
- context["past"] = self.past
- return context
-
-
-class OrganisationUpdateView(PermissionRequiredMixin, SuccessMessageMixin, UpdateView):
- model = Organisation
- permission_required = "agenda_culturel.change_organisation"
- success_message = _("The organisation has been successfully updated.")
- fields = "__all__"
-
-
-class OrganisationCreateView(PermissionRequiredMixin, SuccessMessageMixin, CreateView):
- model = Organisation
- permission_required = "agenda_culturel.add_organisation"
- success_message = _("The organisation has been successfully created.")
- fields = "__all__"
-
-
-class OrganisationDeleteView(PermissionRequiredMixin, DeleteView):
- model = Organisation
- permission_required = "agenda_culturel.delete_organisation"
- success_url = reverse_lazy("view_organisations")
-
-
-#########################
-## Tags
-#########################
-
-
-class TagUpdateView(PermissionRequiredMixin, SuccessMessageMixin, UpdateView):
- model = Tag
- permission_required = "agenda_culturel.change_tag"
- form_class = TagForm
- success_message = _("The tag has been successfully updated.")
-
-
-class TagCreateView(PermissionRequiredMixin, SuccessMessageMixin, CreateView):
- model = Tag
- permission_required = "agenda_culturel.add_tag"
- form_class = TagForm
- success_message = _("The tag has been successfully created.")
-
- def get_initial(self, *args, **kwargs):
- initial = super().get_initial(**kwargs)
- if "name" in self.request.GET:
- initial["name"] = self.request.GET.get("name")
- return initial
-
- def form_valid(self, form):
- Tag.clear_cache()
- return super().form_valid(form)
-
-
-class TagDeleteView(PermissionRequiredMixin, DeleteView):
- model = Tag
- permission_required = "agenda_culturel.delete_tag"
- success_url = reverse_lazy("view_all_tags")
-
-
-def view_tag_past(request, t):
- return view_tag(request, t, True)
-
-
-def view_tag(request, t, past=False):
- now = date.today()
-
- qs = get_event_qs(request).filter(tags__contains=[t])
-
- if past:
- qs = qs.filter(start_day__lt=now).order_by("-start_day", "-start_time")
- else:
- qs = qs.filter(start_day__gte=now).order_by("start_day", "start_time")
-
- qs = qs.filter(
- Q(other_versions__isnull=True)
- | Q(other_versions__representative=F("pk"))
- | Q(other_versions__representative__isnull=True)
- )
-
- paginator = Paginator(qs, 10)
- page = request.GET.get("page")
-
- try:
- response = paginator.page(page)
- except PageNotAnInteger:
- response = paginator.page(1)
- except EmptyPage:
- response = paginator.page(paginator.num_pages)
-
- rimports = RecurrentImport.objects.filter(defaultTags__contains=[t])
-
- tag = Tag.objects.filter(name=t).first()
- context = {
- "tag": t,
- "paginator_filter": response,
- "object": tag,
- "rimports": rimports,
- "past": past,
- }
- return render(request, "agenda_culturel/tag.html", context)
-
-
-def statistics(request, pk=None):
- if pk is not None:
- rimport = RecurrentImport.objects.filter(pk=pk)
- source = rimport.values("source").first()["source"]
- qs = Event.objects.filter(import_sources__contains=[source])
- else:
- rimport = None
- qs = Event.objects
-
- stats = {}
- stats_months = {}
- first = {}
- last = {}
-
- ev_published = qs.filter(
- Q(status=Event.STATUS.PUBLISHED)
- & (
- Q(other_versions__isnull=True)
- | Q(other_versions__representative=F("pk"))
- | Q(other_versions__representative__isnull=True)
- )
- )
-
- for v in ["start_day", "created_date__date"]:
- after = 24
- last[v] = (
- date.today()
- if v == "created_date__date"
- else date.today() + timedelta(weeks=after)
- )
- last[v] = last[v].replace(
- day=_calendar.monthrange(last[v].year, last[v].month)[1]
- )
-
- r = 8 * 30
- if v == "start_day":
- r += after * 7
- first[v] = (last[v] - timedelta(days=r)).replace(day=1)
-
- ev_days = ev_published.annotate(day=F(v)).filter(
- Q(day__lte=last[v]) & Q(day__gte=first[v])
- )
-
- stats[v] = ev_days.values("day").annotate(total=Count("day")).order_by("day")
-
- stats_months[v] = (
- ev_days.annotate(month=TruncMonth("day"))
- .values("month")
- .annotate(total=Count("month"))
- .order_by("month")
- )
-
- nb_by_city = (
- ev_published.annotate(city=F("exact_location__city"))
- .filter(city__isnull=False)
- .values("city")
- .annotate(total=Count("city"))
- .order_by("-total")
- )
-
- limit = datetime.now() + timedelta(days=-30)
-
- stat_qs = qs.filter(start_day__gte=F("created_date")).annotate(
- foresight=ExtractDay(F("start_day") - F("created_date"))
- )
-
- statsa = stat_qs.filter().aggregate(
- minimum=Min("foresight"),
- maximum=Max("foresight"),
- mean=Avg("foresight"),
- median=Median("foresight"),
- stdev=StdDev("foresight"),
- )
-
- statsm = stat_qs.filter(created_date__gte=limit).aggregate(
- minimum=Min("foresight"),
- maximum=Max("foresight"),
- mean=Avg("foresight"),
- median=Median("foresight"),
- stdev=StdDev("foresight"),
- )
-
- stats_foresight = [
- [
- _(x),
- round(statsa[x], 2) if statsa[x] is not None else "-",
- round(statsm[x], 2) if statsm[x] is not None else "-",
- ]
- for x in statsa
- ]
-
- context = {
- "stats_by_startday": stats["start_day"],
- "stats_by_creation": stats["created_date__date"],
- "stats_months_by_startday": stats_months["start_day"],
- "stats_months_by_creation": stats_months["created_date__date"],
- "first_by_startday": first["start_day"],
- "last_by_startday": last["start_day"],
- "first_by_creation": first["created_date__date"],
- "last_by_creation": last["created_date__date"],
- "nb_by_city": nb_by_city,
- "stats_foresight": stats_foresight,
- "object": rimport.first() if rimport else None,
- }
-
- if pk is None:
- return render(request, "agenda_culturel/statistics.html", context)
- else:
- return render(request, "agenda_culturel/rimport-statistics.html", context)
-
-
-def tag_list(request):
- tags = Event.get_all_tags()
- r_tags = [t["tag"] for t in tags]
- objects = Tag.objects.order_by("name").all()
- d_objects = dict()
- for o in objects:
- d_objects[o.name] = o
-
- tags = [
- t | {"obj": d_objects[t["tag"]]} if t["tag"] in d_objects else t for t in tags
- ]
- tags += [
- {"obj": o, "tag": o.name, "count": 0} for o in objects if o.name not in r_tags
- ]
-
- context = {
- "tags": sorted(
- tags,
- key=lambda x: emoji.demojize(
- remove_accents(x["tag"]).lower(), delimiters=("000", "")
- ),
- )
- }
- return render(request, "agenda_culturel/tags.html", context)
-
-
-@login_required(login_url="/accounts/login/")
-@permission_required("agenda_culturel.change_tag")
-def rename_tag(request, t):
- form = TagRenameForm(name=t)
-
- if request.method == "POST":
- form = TagRenameForm(request.POST, name=t)
- if form.is_valid():
- save = True
- if form.cleaned_data["name"] == t:
- messages.warning(
- request,
- _("You have not modified the tag name."),
- )
- save = False
- elif not form.is_force():
- if (
- Event.objects.filter(
- tags__contains=[form.cleaned_data["name"]]
- ).count()
- > 0
- ):
- if Tag.objects.filter(name=form.cleaned_data["name"]):
- messages.warning(
- request,
- (
- _(
- "This tag {} is already in use, and is described by different information from the current tag. You can force renaming by checking the corresponding option. The information associated with tag {} will be deleted, and all events associated with tag {} will be associated with tag {}."
- )
- ).format(
- form.cleaned_data["name"],
- t,
- t,
- form.cleaned_data["name"],
- ),
- )
- else:
- messages.warning(
- request,
- (
- _(
- "This tag {} is already in use. You can force renaming by checking the corresponding option."
- )
- ).format(form.cleaned_data["name"]),
- )
- save = False
- form = TagRenameForm(request.POST, name=t, force=True)
-
- if save:
- # find all matching events and update them
- events = Event.objects.filter(tags__contains=[t])
- new_name = form.cleaned_data["name"]
- for e in events:
- e.tags = [te for te in e.tags if te != t]
- if new_name not in e.tags:
- e.tags += [new_name]
- Event.objects.bulk_update(events, fields=["tags"])
-
- # find all recurrent imports and fix them
- rimports = RecurrentImport.objects.filter(defaultTags__contains=[t])
- for ri in rimports:
- ri.tags = [te for te in ri.defaultTags if te != t]
- if new_name not in ri.tags:
- ri.tags += [new_name]
- RecurrentImport.objects.bulk_update(rimports, fields=["defaultTags"])
-
- # find tag object
- tag_object = Tag.objects.filter(name=t).first()
- if tag_object:
- tag_object.name = new_name
- tag_object.save()
-
- messages.success(
- request,
- (_("The tag {} has been successfully renamed to {}.")).format(
- t, form.cleaned_data["name"]
- ),
- )
- return HttpResponseRedirect(
- reverse_lazy("view_tag", kwargs={"t": form.cleaned_data["name"]})
- )
-
- nb = Event.objects.filter(tags__contains=[t]).count()
-
- return render(
- request,
- "agenda_culturel/tag_rename_form.html",
- context={"form": form, "tag": t, "nb": nb},
- )
-
-
-@login_required(login_url="/accounts/login/")
-@permission_required("agenda_culturel.delete_tag")
-def delete_tag(request, t):
- respage = reverse_lazy("view_all_tags")
-
- if request.method == "POST":
- # remove tag from events
- events = Event.objects.filter(tags__contains=[t])
- for e in events:
- e.tags = [te for te in e.tags if te != t]
- Event.objects.bulk_update(events, fields=["tags"])
-
- # remove tag from recurrent imports
- rimports = RecurrentImport.objects.filter(defaultTags__contains=[t])
- for ri in rimports:
- ri.tags = [te for te in ri.defaultTags if te != t]
- RecurrentImport.objects.bulk_update(rimports, fields=["defaultTags"])
-
- # find tag object
- tag_object = Tag.objects.filter(name=t).first()
- if tag_object:
- tag_object.delete()
-
- messages.success(
- request,
- (_("The tag {} has been successfully deleted.")).format(t),
- )
- return HttpResponseRedirect(respage)
- else:
- nb = Event.objects.filter(tags__contains=[t]).count()
- obj = Tag.objects.filter(name=t).first()
- nbi = RecurrentImport.objects.filter(defaultTags__contains=[t]).count()
- cancel_url = request.META.get("HTTP_REFERER", "")
- if cancel_url == "":
- cancel_url = respage
- return render(
- request,
- "agenda_culturel/tag_confirm_delete_by_name.html",
- {
- "tag": t,
- "nb": nb,
- "nbi": nbi,
- "cancel_url": cancel_url,
- "obj": obj,
- },
- )
-
-
-def clear_cache(request):
- if request.method == "POST":
- cache.clear()
- messages.success(request, _("Cache successfully cleared."))
- return HttpResponseRedirect(reverse_lazy("administration"))
- else:
- return render(
- request,
- "agenda_culturel/clear_cache.html",
- )
-
-
-class UserProfileUpdateView(
- SuccessMessageMixin,
- LoginRequiredMixin,
- UpdateView,
-):
- model = UserProfile
- success_message = _("Your user profile has been successfully modified.")
- success_url = reverse_lazy("administration")
- form_class = UserProfileForm
-
- def get_object(self):
- return self.request.user.userprofile