2025-03-24 10:26:59 +01:00

3047 lines
97 KiB
Python

import calendar as _calendar
import hashlib
import logging
from datetime import date, timedelta
import emoji
from django.contrib import messages
from django.contrib.auth.decorators import login_required, permission_required
from django.contrib.auth.mixins import (
LoginRequiredMixin,
PermissionRequiredMixin,
UserPassesTestMixin,
)
from django.contrib.messages.views import SuccessMessageMixin
from django.core.cache import cache
from django.core.paginator import EmptyPage, PageNotAnInteger, Paginator
from django.db.models import Count, F, Func, OuterRef, Q, Subquery
from django.db.models.functions import TruncMonth
from django.http import (
Http404,
HttpResponse,
HttpResponseForbidden,
HttpResponseRedirect,
)
from django.shortcuts import get_object_or_404, render
from django.urls import reverse, reverse_lazy
from django.utils.decorators import method_decorator
from django.utils.html import escape
from django.utils.safestring import mark_safe
from django.utils.timezone import datetime
from django.utils.translation import gettext_lazy as _
from django.views.generic import DetailView, ListView
from django.views.generic.edit import (
CreateView,
DeleteView,
ModelFormMixin,
UpdateView,
)
from honeypot.decorators import check_honeypot
from django.db.models.aggregates import StdDev
from django.db.models import Avg, Max, Min
from django.db.models import Aggregate, FloatField
from django.db.models.functions import ExtractDay
from ..calendar import CalendarDay, CalendarList, CalendarMonth, CalendarWeek
from ..celery import app as celery_app
from ..celery import (
import_events_from_json,
import_events_from_url,
import_events_from_urls,
run_all_recurrent_imports,
run_all_recurrent_imports_canceled,
run_all_recurrent_imports_failed,
run_recurrent_import,
update_orphan_pure_import_events,
)
from ..filters import (
DuplicatedEventsFilter,
EventFilter,
EventFilterAdmin,
MessagesFilterAdmin,
RecurrentImportFilter,
SearchEventFilter,
SimpleSearchEventFilter,
)
from ..forms import (
BatchImportationForm,
CategorisationForm,
CategorisationRuleImportForm,
EventAddPlaceForm,
EventForm,
EventFormWithContact,
EventModerateForm,
FixDuplicates,
MergeDuplicates,
MessageEventForm,
MessageForm,
PlaceForm,
RecurrentImportForm,
SelectEventInList,
SimpleContactForm,
TagForm,
TagRenameForm,
URLSubmissionFormSet,
URLSubmissionFormWithContact,
UserProfileForm,
)
from ..import_tasks.extractor import Extractor
from ..models import (
BatchImportation,
CategorisationRule,
Category,
DuplicatedEvents,
Event,
Message,
Organisation,
Place,
RecurrentImport,
StaticContent,
Tag,
remove_accents,
UserProfile,
)
from ..utils import PlaceGuesser
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class Median(Aggregate):
    """Aggregate rendered as ``PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY ...)``.

    The result is exposed as a float.

    NOTE(review): ``PERCENTILE_CONT`` is an ordered-set aggregate; presumably
    this targets PostgreSQL -- confirm against the configured database.
    """

    name = "median"
    function = "PERCENTILE_CONT"
    template = "%(function)s(0.5) WITHIN GROUP (ORDER BY %(expressions)s)"
    output_field = FloatField()
class PaginatorFilter(Paginator):
    """Paginator over ``filter.qs`` that also exposes navigation URLs.

    The first/last page URLs are computed once at construction time; the
    previous/next URLs are attached to each page returned by :meth:`page`.
    All URLs are derived from the current request's full path by replacing
    its ``page`` query parameter.
    """

    def __init__(self, filter, nb, request):
        """Paginate ``filter.qs`` with ``nb`` items per page for ``request``."""
        self.request = request
        self.filter = filter
        super().__init__(filter.qs, nb)

        self.url_first_page = PaginatorFilter.update_param(
            self.request.get_full_path(), "page", 1
        )
        self.url_last_page = PaginatorFilter.update_param(
            self.request.get_full_path(), "page", self.num_pages
        )

    @staticmethod
    def update_param(params, key, value):
        """Return URL ``params`` with query parameter ``key`` set to ``value``.

        Every existing occurrence of ``key`` is removed and ``key=value`` is
        appended at the end of the query string. Only parameters whose name
        is exactly ``key`` are replaced, so e.g. ``page_size`` survives an
        update of ``page``. Empty query segments are dropped.
        """
        root, _sep, query = params.partition("?")
        kept = [
            item
            for item in query.split("&")
            # compare the parameter *name*, not a mere prefix of the segment
            if item and item.partition("=")[0] != key
        ]
        kept.append(key + "=" + str(value))
        return root + "?" + "&".join(kept)

    def page(self, *args, **kwargs):
        """Return the requested page, decorated with previous/next URLs.

        When there is no previous (or next) page, the corresponding URL falls
        back to the current request path.
        """
        page = super().page(*args, **kwargs)
        current_path = self.request.get_full_path()

        try:
            page.url_previous_page = PaginatorFilter.update_param(
                current_path, "page", page.previous_page_number()
            )
        except EmptyPage:
            page.url_previous_page = current_path

        try:
            page.url_next_page = PaginatorFilter.update_param(
                current_path, "page", page.next_page_number()
            )
        except EmptyPage:
            page.url_next_page = current_path

        return page
#
#
# Strings wrapped in gettext_lazy that are not used elsewhere in this module.
# NOTE(review): presumably listed here only so that message extraction picks
# them up for translation -- confirm where they are actually displayed.
to_be_translated = [
_("Recurrent import name"),
_("Add another"),
_("Browse..."),
_("No file selected."),
]
def get_event_qs(request):
    """Return the base event queryset visible to the requesting user.

    Authenticated users get an unfiltered queryset; everybody else only gets
    events whose status is ``Event.STATUS.PUBLISHED``.
    """
    if not request.user.is_authenticated:
        return Event.objects.filter(status=Event.STATUS.PUBLISHED)
    return Event.objects.filter()
def thank_you(request):
    """Render the "thank you" page."""
    template_name = "agenda_culturel/thank_you.html"
    return render(request, template_name)
def mentions_legales(request):
    """Render the legal notice page with the single-page template."""
    return render(
        request,
        "agenda_culturel/page-single.html",
        {
            "title": "Mentions légales",
            "static_content": "mentions_legales",
            "url_path": reverse_lazy("mentions_legales"),
        },
    )
def about(request):
    """Render the "about" page together with the list of recurrent imports.

    Only imports whose recurrence is not ``NEVER`` are listed, ordered by
    their unaccented name.
    """
    recurring_imports = RecurrentImport.objects.exclude(
        recurrence=RecurrentImport.RECURRENCE.NEVER
    ).order_by("name__unaccent")

    return render(
        request,
        "agenda_culturel/page-rimports-list.html",
        {
            "title": "À propos",
            "static_content": "about",
            "url_path": reverse_lazy("about"),
            "rimports": recurring_imports,
        },
    )
def moderation_rules(request):
    """Render the moderation rules page with the single-page template."""
    return render(
        request,
        "agenda_culturel/page-single.html",
        {
            "title": _("Moderation rules"),
            "static_content": "moderation_rules",
            "url_path": reverse_lazy("moderation_rules"),
        },
    )
def import_requirements(request):
    """Render the import requirements page with the single-page template."""
    return render(
        request,
        "agenda_culturel/page-single.html",
        {
            "title": _("Import requirements"),
            "static_content": "import_requirements",
            "url_path": reverse_lazy("import_requirements"),
        },
    )
def home(request, cat=None):
    """Render the home page: the current week view, flagged as home."""
    return week_view(request, cat=cat, home=True)
def month_view(request, year=None, month=None, cat=None):
    """Render the calendar page for one month.

    ``year`` and ``month`` default to today's values. Today's day number is
    handed to the calendar only when neither of them was supplied. When
    ``cat`` is given, events are restricted to the category with that slug.
    A request carrying category parameters in its filter is redirected to the
    URL computed by the filter.
    """
    today = date.today()
    no_explicit_period = year is None and month is None
    day = today.day if no_explicit_period else None
    year = today.year if year is None else year
    month = today.month if month is None else month

    request = EventFilter.set_default_values(request)

    # only load the columns needed by the month grid
    loaded_fields = (
        "title",
        "start_day",
        "start_time",
        "category",
        "other_versions",
        "recurrences",
        "end_day",
        "end_time",
        "uuids",
        "status",
        "tags",
    )
    events = get_event_qs(request).only(*loaded_fields)

    category = None
    if cat is not None:
        category = Category.objects.filter(slug=cat).first()
        events = events.filter(category=category)

    event_filter = EventFilter(request.GET, events, request=request)
    if event_filter.has_category_parameters():
        return HttpResponseRedirect(event_filter.get_new_url())

    calendar_month = CalendarMonth(year, month, event_filter, day=day)
    if calendar_month.today_in_calendar():
        init_date = today
    else:
        init_date = calendar_month.firstdate

    return render(
        request,
        "agenda_culturel/page-month.html",
        {
            "calendar": calendar_month,
            "this_month": day is not None,
            "filter": event_filter,
            "category": category,
            "init_date": init_date,
        },
    )
def week_view(request, year=None, week=None, home=False, cat=None):
    """Render the calendar page for one ISO week.

    ``year`` and ``week`` default to today's ISO year and ISO week number.
    When ``cat`` is given, events are restricted to the category with that
    slug. ``home`` adds a ``home`` marker to the template context. A request
    carrying category parameters in its filter is redirected to the URL
    computed by the filter.
    """
    today = date.today()
    iso_year, iso_week = today.isocalendar()[0], today.isocalendar()[1]
    year = iso_year if year is None else year
    week = iso_week if week is None else week

    request = EventFilter.set_default_values(request)

    # only load the columns needed by the week page
    loaded_fields = (
        "title",
        "start_day",
        "start_time",
        "category",
        "other_versions",
        "recurrences",
        "end_day",
        "end_time",
        "uuids",
        "status",
        "tags",
        "local_image",
        "image",
        "image_alt",
        "exact_location",
        "description",
    )
    events = (
        get_event_qs(request).select_related("exact_location").only(*loaded_fields)
    )

    category = None
    if cat is not None:
        category = Category.objects.filter(slug=cat).first()
        events = events.filter(category=category)

    event_filter = EventFilter(request.GET, events, request=request)
    if event_filter.has_category_parameters():
        return HttpResponseRedirect(event_filter.get_new_url())

    calendar_week = CalendarWeek(year, week, event_filter)
    if calendar_week.today_in_calendar():
        init_date = today
    else:
        init_date = calendar_week.firstdate

    context = {
        "year": year,
        "week": week,
        "calendar": calendar_week,
        "filter": event_filter,
        "category": category,
        "init_date": init_date,
    }
    if home:
        context["home"] = 1
    return render(request, "agenda_culturel/page-week.html", context)
def day_view(request, year=None, month=None, day=None, cat=None):
    """Render the events of a single day.

    When the date is not fully given in the URL and the request POSTs a
    ``when`` value (``YYYY-MM-DD``), the view redirects to the canonical
    ``day_view`` URL for that date, keeping the current filter parameters.
    Otherwise it delegates to :func:`upcoming_events` with a neighbourhood
    size of 0.

    Raises:
        Http404: if the POSTed ``when`` value is not a valid date.
    """
    date_incomplete = year is None or month is None or day is None
    if date_incomplete and "when" in request.POST:
        try:
            when = datetime.strptime(request.POST["when"], "%Y-%m-%d")
        except ValueError as err:
            # user-supplied value: answer with a 404 rather than a server error
            raise Http404("Invalid date") from err
        year = when.year
        month = when.month
        day = when.day

        request = EventFilter.set_default_values(request)
        qs = get_event_qs(request).select_related("exact_location")
        if cat is not None:
            category = Category.objects.filter(slug=cat).first()
            qs = qs.filter(category=category)
        # the filter is only built to carry its parameters over to the redirect
        filter = EventFilter(request.GET, qs, request=request)

        return HttpResponseRedirect(
            reverse_lazy("day_view", args=[year, month, day])
            + "?"
            + filter.get_url()
        )

    return upcoming_events(request, year, month, day, 0, cat)
def upcoming_events(request, year=None, month=None, day=None, neighsize=1, cat=None):
    """Render the list of events around a given day.

    Missing ``year``/``month``/``day`` default to today's values. The displayed
    window is centred on the requested date shifted forward by ``neighsize``
    days and spans ``neighsize`` days on each side of that centre. When
    ``cat`` is given, events are restricted to the category with that slug.

    Raises:
        Http404: if ``year``/``month``/``day`` do not form a valid date.
    """
    now = date.today()
    year = now.year if year is None else year
    month = now.month if month is None else month
    day = now.day if day is None else day

    try:
        day = date(year, month, day)
    except ValueError as err:
        # e.g. a hand-typed URL such as .../2025/02/30/
        raise Http404("Invalid date") from err
    day = day + timedelta(days=neighsize)

    request = EventFilter.set_default_values(request)
    qs = get_event_qs(request).select_related("exact_location")
    category = None
    if cat is not None:
        category = Category.objects.filter(slug=cat).first()
        qs = qs.filter(category=category)

    filter = EventFilter(request.GET, qs, request=request)
    if filter.has_category_parameters():
        return HttpResponseRedirect(filter.get_new_url())

    cal = CalendarList(
        day + timedelta(days=-neighsize),
        day + timedelta(days=neighsize),
        filter,
        True,
    )

    context = {
        "calendar": cal,
        "now": now,
        "day": day,
        "init_date": now if cal.today_in_calendar() else day,
        "filter": filter,
        "date_pred": day + timedelta(days=-neighsize - 1),
        "date_next": day + timedelta(days=neighsize + 1),
        "category": category,
    }
    return render(request, "agenda_culturel/page-upcoming.html", context)
class StaticContentCreateView(PermissionRequiredMixin, LoginRequiredMixin, CreateView):
    """Create a static content block; requires ``add_staticcontent``.

    ``PermissionRequiredMixin`` is needed for ``permission_required`` to be
    enforced: without it the attribute is ignored and any logged-in user
    could create static content.
    """

    model = StaticContent
    fields = ["text"]
    permission_required = "agenda_culturel.add_staticcontent"

    def form_valid(self, form):
        """Fill ``name`` and ``url_path`` from the query string, then save."""
        form.instance.name = self.request.GET["name"]
        form.instance.url_path = self.request.GET["url_path"]
        return super().form_valid(form)
class StaticContentUpdateView(
    SuccessMessageMixin,
    PermissionRequiredMixin,
    LoginRequiredMixin,
    UpdateView,
):
    """Edit the text of a static content block; requires ``change_staticcontent``."""

    permission_required = "agenda_culturel.change_staticcontent"
    model = StaticContent
    fields = ["text"]
    success_message = _("The static content has been successfully updated.")
def update_from_source(request, pk):
    """Queue a re-import of event ``pk`` from its source, then show the event.

    If the event has no updateable source URL a warning is flashed instead.

    NOTE(review): unlike the moderation views in this module, no login or
    permission decorator is visible on this view -- confirm access is
    restricted elsewhere (e.g. in the URL configuration).
    """
    event = get_object_or_404(Event, pk=pk)
    source_url = event.get_updateable_uuid()

    if source_url is not None:
        import_events_from_url.delay(
            source_url,
            None,
            None,
            True,
            user_id=request.user.pk if request.user else None,
        )
        messages.success(
            request,
            _("The event update has been queued and will be completed shortly."),
        )
    else:
        messages.warning(
            request,
            _(
                "The event cannot be updated because the import process is not available for the referenced sources."
            ),
        )

    return HttpResponseRedirect(event.get_absolute_url())
class EventUpdateView(
    SuccessMessageMixin,
    PermissionRequiredMixin,
    LoginRequiredMixin,
    UpdateView,
):
    """Edit, clone or simple-clone an event; requires ``change_event``.

    The mode is derived from the request path: a ``clone`` segment enables
    cloning, ``simple-clone`` enables simple cloning and ``edit-force``
    releases the modification lock before editing.
    """

    model = Event
    permission_required = "agenda_culturel.change_event"
    form_class = EventForm

    def get_form_kwargs(self):
        """Add user and mode flags to the form keyword arguments."""
        # super() calls get_initial(), which sets is_cloning/is_simple_cloning
        kwargs = super().get_form_kwargs()
        kwargs["is_authenticated"] = self.request.user.is_authenticated
        kwargs["is_moderation_expert"] = (
            self.request.user.userprofile.is_moderation_expert
        )
        kwargs["is_cloning"] = self.is_cloning
        kwargs["is_edit_from_moderation"] = self.is_edit_force()
        kwargs["is_simple_cloning"] = self.is_simple_cloning
        return kwargs

    def get_success_message(self, cleaned_data):
        """Return the success message, mentioning the notification if one was sent."""
        txt = (
            _(" A message has been sent to the person who proposed the event.")
            if getattr(self, "with_msg", False)
            else ""
        )
        return mark_safe(_("The event has been successfully modified.") + txt)

    def is_edit_force(self):
        """Tell whether the request path contains an ``edit-force`` segment."""
        return "edit-force" in self.request.path.split("/")

    def get_object(self, queryset=None):
        """Return the event, turning a draft into a published event in memory.

        In ``edit-force`` mode the modification lock is released for the
        current user.
        """
        event = super().get_object(queryset)
        if event.status == Event.STATUS.DRAFT:
            event.status = Event.STATUS.PUBLISHED
        if self.is_edit_force():
            event.free_modification_lock(self.request.user)
        return event

    def form_valid(self, form):
        """Record the processing user, notify the submitter if required, save."""
        form.instance.set_processing_user(self.request.user)
        # stored under the name get_success_message() reads; the previous
        # attribute name (with_message) was never read back
        self.with_msg = form.instance.notify_if_required(self.request)
        return super().form_valid(form)

    def get_initial(self):
        """Compute the cloning flags and the matching initial form values."""
        path_segments = self.request.path.split("/")
        self.is_cloning = "clone" in path_segments
        if self.is_cloning:
            messages.info(
                self.request,
                _(
                    "Changes will be visible on a local copy of the event. The version identical to the imported source will be hidden."
                ),
            )
        self.is_simple_cloning = "simple-clone" in path_segments
        result = super().get_initial()

        if self.is_cloning and "other_versions" not in result:
            obj = self.get_object()
            # NOTE(review): a new DuplicatedEvents group is created whenever
            # the initial data has no "other_versions" key, without looking at
            # obj.other_versions itself -- confirm this is intended.
            obj.other_versions = DuplicatedEvents.objects.create()
            obj.other_versions.save()
            # save them without updating modified date
            obj.set_no_modification_date_changed()
            obj.save()
            result["other_versions"] = obj.other_versions
            result["status"] = Event.STATUS.PUBLISHED
            result["cloning"] = True

        if self.is_simple_cloning:
            result["other_versions"] = None
            result["simple_cloning"] = True

        if self.is_cloning or self.is_simple_cloning:
            obj = self.get_object()
            if obj.local_image:
                result["old_local_image"] = obj.local_image.name

        return result

    def get_success_url(self):
        """Go to the next event to moderate if asked, else to the event page."""
        if "save_and_next" in self.request.POST:
            return reverse_lazy("moderate_event_next", args=[self.object.pk])
        return self.object.get_absolute_url()
class EventModerateView(
    SuccessMessageMixin,
    PermissionRequiredMixin,
    LoginRequiredMixin,
    UpdateView,
):
    """Moderate one event; requires ``change_event``.

    The URL decides the mode: without a ``pk`` the next event to moderate
    (starting from now) is picked; ``after``/``back`` path segments come with
    a ``pred`` URL argument identifying the previously moderated event;
    ``moderate-force`` releases the lock held on the event.
    """

    model = Event
    permission_required = "agenda_culturel.change_event"
    template_name = "agenda_culturel/event_form_moderate.html"
    form_class = EventModerateForm

    def get_form_kwargs(self):
        """Tell the form whether the current user is a moderation expert."""
        kwargs = super().get_form_kwargs()
        kwargs["is_moderation_expert"] = (
            self.request.user.userprofile.is_moderation_expert
        )
        return kwargs

    def get_success_message(self, cleaned_data):
        """Return an HTML success message linking to the moderated event."""
        txt = (
            _(" A message has been sent to the person who proposed the event.")
            if getattr(self, "with_msg", False)
            else ""
        )
        return mark_safe(
            _('The event <a href="{}">{}</a> has been moderated with success.').format(
                self.object.get_absolute_url(),
                # the title is user-provided text: escape it before mark_safe
                escape(self.object.title),
            )
            + txt
        )

    def is_moderate_next(self):
        """Tell whether the request path contains an ``after`` segment."""
        return "after" in self.request.path.split("/")

    def is_moderate_back(self):
        """Tell whether the request path contains a ``back`` segment."""
        return "back" in self.request.path.split("/")

    def is_moderate_force(self):
        """Tell whether the request path contains a ``moderate-force`` segment."""
        return "moderate-force" in self.request.path.split("/")

    def is_starting_moderation(self):
        """Tell whether no event ``pk`` was given in the URL."""
        return "pk" not in self.kwargs

    def is_moderation_from_date(self):
        """Tell whether the URL carries the ``y``, ``m`` and ``d`` arguments."""
        return "m" in self.kwargs and "y" in self.kwargs and "d" in self.kwargs

    @staticmethod
    def get_next_event(start_day, start_time, opk):
        """Return the first non-moderated event after the given position.

        Args:
            start_day: day of the current position.
            start_time: time of the current position, or a falsy value; in
                that case every event from ``start_day`` on is considered,
                except the one whose pk is ``opk``.
            opk: pk of the current event (only used when ``start_time`` is
                falsy).

        Returns:
            The matching event with the smallest ``(start_day, start_time)``,
            or ``None`` if there is none.
        """
        # select non moderated events
        qs = Event.objects.filter(moderated_date__isnull=True)

        # select events after the current one
        if start_time:
            qs = qs.filter(
                Q(start_day__gt=start_day)
                | (
                    Q(start_day=start_day)
                    & (Q(start_time__isnull=True) | Q(start_time__gt=start_time))
                )
            )
        else:
            qs = qs.filter(Q(start_day__gte=start_day) & ~Q(pk=opk))

        # get only possibly representative events
        qs = qs.filter(
            Q(other_versions__isnull=True)
            | Q(other_versions__representative=F("pk"))
            | Q(other_versions__representative__isnull=True)
        )

        # remove trash events
        qs = qs.filter(~Q(status=Event.STATUS.TRASH))

        # sort by datetime
        qs = qs.order_by("start_day", "start_time")

        return qs.first()

    def get_context_data(self, **kwargs):
        """Expose the previous event pk as ``pred`` when moderating in sequence."""
        context = super().get_context_data(**kwargs)
        if self.is_moderate_next():
            context["pred"] = self.kwargs["pred"]
        return context

    def get_object(self, queryset=None):
        """Return the event to moderate.

        A draft is turned into a published event in memory. In ``back`` mode
        the lock on the previous event is released; in ``moderate-force`` mode
        the lock on this event is released.

        Raises:
            Http404: if moderation is starting and nothing is left to moderate.
        """
        if self.is_starting_moderation():
            now = datetime.now()
            event = EventModerateView.get_next_event(now.date(), now.time(), None)
            if event is None:
                raise Http404("No event left to moderate")
        else:
            event = super().get_object(queryset)

        if event.status == Event.STATUS.DRAFT:
            event.status = Event.STATUS.PUBLISHED

        if self.is_moderate_back():
            pred = Event.objects.filter(pk=self.kwargs["pred"]).first()
            # the previous event may have been deleted in the meantime
            if pred is not None:
                pred.free_modification_lock(self.request.user)

        if self.is_moderate_force():
            event.free_modification_lock(self.request.user, False)

        return event

    def post(self, request, *args, **kwargs):
        """Handle the form; on ``Http404`` redirect to the "no next event" page."""
        try:
            return super().post(request, *args, **kwargs)
        except Http404:
            current = getattr(self, "object", None)
            if current is None:
                # the event itself could not be loaded: keep the 404
                raise
            return HttpResponseRedirect(
                reverse_lazy("error_next_event", args=[current.pk])
            )

    def form_valid(self, form):
        """Flag the instance as moderated by the current user, notify, save."""
        form.instance.set_no_modification_date_changed()
        form.instance.set_in_moderation_process()
        form.instance.set_processing_user(self.request.user)
        self.with_msg = form.instance.notify_if_required(self.request)
        return super().form_valid(form)

    def get_success_url(self):
        """Pick the next page according to the submit button that was used."""
        if "save_and_next" in self.request.POST:
            return reverse_lazy("moderate_event_next", args=[self.object.pk])
        if "save_and_edit_local" in self.request.POST:
            return reverse_lazy("edit_event", args=[self.object.get_local_version().pk])
        return self.object.get_absolute_url()
@login_required(login_url="/accounts/login/")
@permission_required("agenda_culturel.change_event")
def moderate_event_next(request, pk):
    """Release the lock on event ``pk`` and move on to the next event to moderate.

    Redirects to the moderation step of the next non-moderated event, or
    renders an explanatory page when there is none.

    Raises:
        Http404: if no event has the given ``pk``.
    """
    # current event (404 instead of a crash on None when it does not exist)
    obj = get_object_or_404(Event, pk=pk)

    # free lock
    obj.free_modification_lock(request.user)

    next_obj = EventModerateView.get_next_event(obj.start_day, obj.start_time, pk)
    if next_obj is None:
        return render(
            request,
            "agenda_culturel/event_next_error_message.html",
            {"pk": pk, "object": obj},
        )

    return HttpResponseRedirect(
        reverse_lazy("moderate_event_step", args=[next_obj.pk, obj.pk])
    )
@login_required(login_url="/accounts/login/")
@permission_required("agenda_culturel.change_event")
def moderate_from_date(request, y, m, d):
    """Redirect to the moderation page of the first event to moderate from a date.

    Raises:
        Http404: if ``y``/``m``/``d`` is not a valid date, or if no event is
            left to moderate from that date on.
    """
    try:
        start = date(y, m, d)
    except ValueError as err:
        raise Http404("Invalid date") from err

    obj = EventModerateView.get_next_event(start, None, None)
    if obj is None:
        raise Http404("No event left to moderate from this date")

    return HttpResponseRedirect(reverse_lazy("moderate_event", args=[obj.pk]))
class EventDeleteView(
    SuccessMessageMixin,
    PermissionRequiredMixin,
    LoginRequiredMixin,
    DeleteView,
):
    """Delete an event, then go to the ``recent`` page; requires ``delete_event``."""

    permission_required = "agenda_culturel.delete_event"
    model = Event
    success_message = _("The event has been successfully deleted.")
    success_url = reverse_lazy("recent")
class EventDetailView(UserPassesTestMixin, DetailView, ModelFormMixin):
    """Event page, with a comment form for authenticated users.

    Anonymous visitors may only see published events. When the URL carries
    ``year``/``month``/``day``, the occurrence of the event at that date is
    displayed instead of the event itself.
    """

    model = Event
    form_class = MessageEventForm
    template_name = "agenda_culturel/page-event.html"
    queryset = (
        Event.objects.select_related("exact_location")
        .select_related("category")
        .select_related("other_versions")
        .select_related("other_versions__representative")
        .prefetch_related("message_set")
    )

    def test_func(self):
        """Allow authenticated users, and anybody when the event is published."""
        return (
            self.request.user.is_authenticated
            or self.get_object().status == Event.STATUS.PUBLISHED
        )

    def get_object(self, queryset=None):
        """Return the event, or its occurrence at the date given in the URL."""
        o = super().get_object(queryset)
        o.download_missing_image()
        if "year" not in self.kwargs:
            return o

        y = self.kwargs["year"]
        m = self.kwargs["month"]
        d = self.kwargs["day"]
        obj = o.get_recurrence_at_date(y, m, d)
        obj.set_current_date(date(y, m, d))
        return obj

    def get_success_url(self):
        """Return to the chronology section of the event page."""
        return self.get_object().get_absolute_url() + "#chronology"

    def post(self, request, *args, **kwargs):
        """Handle a comment submission; anonymous users get a 403."""
        if not request.user.is_authenticated:
            return HttpResponseForbidden()
        form = self.get_form()
        if form.is_valid():
            return self.form_valid(form)
        # Re-rendering the page needs self.object (SingleObjectMixin reads it
        # in get_context_data). It is set only now, after get_form(), so that
        # the comment form is not bound to the event instance.
        self.object = self.get_object()
        return self.form_invalid(form)

    def form_valid(self, form):
        """Save the comment as a closed, non-spam message on this event."""
        message = form.save(commit=False)
        message.user = self.request.user
        message.related_event = self.get_object()
        message.subject = _("Comment")
        message.spam = False
        message.closed = True
        message.save()

        return super().form_valid(form)
@login_required(login_url="/accounts/login/")
@permission_required("agenda_culturel.change_event")
def change_status_event(request, pk, status):
    """Change the status of event ``pk`` after confirmation.

    A non-POST request renders the confirmation page; a POST applies the new
    status, flags the event as moderated, notifies the submitter if required
    and redirects to the event page.

    Raises:
        Http404: if the event does not exist or ``status`` is not a valid
            ``Event.STATUS`` value.
    """
    event = get_object_or_404(Event, pk=pk)
    try:
        new_status = Event.STATUS(status)
    except ValueError as err:
        # status comes from the URL: unknown values must not crash the view
        raise Http404("Unknown event status") from err

    if request.method == "POST":
        event.status = new_status
        fields = ["status", "moderated_date", "moderated_by_user"]
        event.set_in_moderation_process()
        event.update_modification_dates()
        event.save(update_fields=fields)
        with_msg = event.notify_if_required(request)
        if with_msg:
            messages.success(
                request,
                _(
                    "The status has been successfully modified and a message has been sent to the person who proposed the event."
                ),
            )
        else:
            messages.success(request, _("The status has been successfully modified."))

        return HttpResponseRedirect(event.get_absolute_url())

    # confirmation page: "cancel" goes back to the referring page, else home
    cancel_url = request.META.get("HTTP_REFERER", "")
    if cancel_url == "":
        cancel_url = reverse_lazy("home")
    return render(
        request,
        "agenda_culturel/event_confirm_change_status.html",
        {"status": status, "event": event, "cancel_url": cancel_url},
    )
def import_event_proxy(request):
    """Render the event import landing page."""
    template_name = "agenda_culturel/event_import.html"
    return render(request, template_name)
class EventCreateView(SuccessMessageMixin, CreateView):
    """Create an event, either as a logged-in user or as an anonymous contributor."""

    model = Event
    form_class = EventFormWithContact

    def get_form_kwargs(self):
        """Tell the form whether the user is authenticated."""
        kwargs = super().get_form_kwargs()
        kwargs["is_authenticated"] = self.request.user.is_authenticated
        return kwargs

    def get_success_url(self):
        """Anonymous users go home; others go to the event or to the next moderation."""
        if not self.request.user.is_authenticated:
            return reverse_lazy("home")
        if "save_and_next" in self.request.POST:
            return reverse_lazy("moderate_event_next", args=[self.object.pk])
        return self.object.get_absolute_url()

    def get_success_message(self, cleaned_data):
        """Return the confirmation message (an HTML link for authenticated users)."""
        if self.request.user.is_authenticated:
            return mark_safe(
                _('The event was created: <a href="{}">{}</a>.').format(
                    self.object.get_absolute_url(),
                    # the title is user-provided text: escape it before mark_safe
                    escape(self.object.title),
                )
            )
        return _(
            "The event has been submitted and will be published as soon as it has been validated by the moderation team."
        )

    def form_valid(self, form):
        """Prepare the new event, save it and send notifications when cloning.

        Contact details and comments entered with the form are attached to
        the event as a message before it is saved.
        """
        if form.cleaned_data["simple_cloning"]:
            form.instance.set_skip_duplicate_check()

        if form.cleaned_data["cloning"]:
            form.instance.set_in_moderation_process()

        if form.cleaned_data.get("email") or form.cleaned_data.get("comments"):
            has_comments = form.cleaned_data.get("comments") not in ["", None]
            form.instance.add_message(
                Message(
                    subject=_("during the creation process"),
                    message=form.cleaned_data.get("comments"),
                    email=form.cleaned_data.get("email"),
                    closed=False,
                    message_type=(
                        Message.TYPE.FROM_CONTRIBUTOR
                        if has_comments
                        else Message.TYPE.FROM_CONTRIBUTOR_NO_MSG
                    ),
                )
            )

        form.instance.import_sources = None
        form.instance.set_processing_user(self.request.user)

        result = super().form_valid(form)

        if form.cleaned_data["cloning"]:
            with_msg = form.instance.notify_if_required(self.request)
            if with_msg:
                messages.success(
                    self.request,
                    _(
                        "A message has been sent to the person who proposed the initial event."
                    ),
                )

        return result
# A class to evaluate the URL according to the existing events and the authentication
# level of the user
class URLEventEvaluation:
    """Evaluate a submitted URL against the events already known.

    Attributes:
        url: cleaned URL taken from the form, or ``None`` if the form had none.
        existing: events whose ``uuids`` contain the URL, or ``None`` when the
            URL is unknown.
        event: the known event that may be shown to the user, if any.
        cat, tags: category name and tags from the form; only filled in for
            an unknown URL.
    """

    def __init__(self, form, is_authenticated):
        """Look up ``form.cleaned_data["url"]`` among the existing events."""
        self.form = form
        self.is_authenticated = is_authenticated

        self.cat = None
        self.tags = []
        self.existing = None
        self.url = form.cleaned_data.get("url")
        self.event = None
        if self.url is None:
            return

        self.url = Extractor.clean_url(self.url)
        # we check if the url is known
        self.existing = Event.objects.filter(uuids__contains=[self.url])
        # if it's unknown
        if len(self.existing) == 0:
            self.existing = None
            self.cat = form.cleaned_data.get("category")
            if self.cat is not None:
                self.cat = self.cat.name
            self.tags = form.cleaned_data.get("tags")
        else:
            published = [
                e for e in self.existing if e.status == Event.STATUS.PUBLISHED
            ]
            self.event = self._select_event(
                self.existing, published, self.is_authenticated
            )

    @staticmethod
    def _select_event(existing, published, is_authenticated):
        """Pick the known event that can be shown to the user.

        A published event is always preferred (a single one is enough);
        otherwise authenticated users get the first known event and anonymous
        users get ``None``.
        """
        if published:
            return published[0]
        if is_authenticated:
            return existing[0]
        return None

    def exists(self):
        """Tell whether the form carried a URL."""
        return self.url is not None

    def is_new(self):
        """Tell whether the URL is set and matches no existing event."""
        return self.exists() and self.existing is None

    def is_event_visible(self):
        """Tell whether a known event can be shown to the user."""
        return self.event is not None

    def get_event(self):
        """Return the event that can be shown to the user, or ``None``."""
        return self.event

    def get_link(self):
        """Return an HTML link to the visible event, or ``""`` if there is none."""
        e = self.get_event()
        if e is None:
            return ""
        return '<a href="' + e.get_absolute_url() + '">' + escape(e.title) + "</a>"

    def to_list(self):
        """Return ``(url, cat, tags)`` for a new URL, ``None`` otherwise."""
        if self.is_new():
            return (self.url, self.cat, self.tags)
        return None
def import_from_urls(request):
    """Submit several URLs at once to the import process.

    On a valid POST, every URL is checked against the known events: already
    known URLs only produce an informational message, the others are queued
    for import. Anonymous users must also fill in a valid contact form.
    Otherwise the (possibly bound) forms are rendered.
    """
    is_authenticated = request.user.is_authenticated

    if request.method == "POST":
        formset = URLSubmissionFormSet(request.POST, request.FILES)

        if not is_authenticated:
            contactform = SimpleContactForm(request.POST)
        if formset.is_valid() and (is_authenticated or contactform.is_valid()):
            # evaluate all the forms
            ucat = [URLEventEvaluation(form, is_authenticated) for form in formset.forms]

            # for each not new, add a message
            for uc in ucat:
                if uc.exists() and not uc.is_new():
                    if uc.is_event_visible():
                        messages.info(
                            request,
                            mark_safe(
                                _(
                                    "{} has not been submitted since it"
                                    "s already known: {}."
                                    # the URL is user input: escape it, the
                                    # link is already built as safe HTML
                                ).format(escape(uc.url), uc.get_link())
                            ),
                        )
                    else:
                        messages.info(
                            request,
                            _(
                                "{} has not been submitted since it"
                                "s already known and currently into moderation process."
                            ).format(uc.url),
                        )

            # keep only new ones
            ucat = [uc.to_list() for uc in ucat if uc.is_new()]

            # finally process them or go back to home page
            if not ucat:
                return HttpResponseRedirect(reverse("home"))

            messages.info(
                request,
                _("Integrating {} url(s) into our import process.").format(len(ucat)),
            )
            email = None
            comments = None
            if not is_authenticated:
                email = contactform.cleaned_data["email"]
                comments = contactform.cleaned_data["comments"]
            import_events_from_urls.delay(
                ucat,
                user_id=request.user.pk if request.user else None,
                email=email,
                comments=comments,
            )
            return HttpResponseRedirect(reverse("thank_you"))

    else:
        formset = URLSubmissionFormSet()
        if not is_authenticated:
            contactform = SimpleContactForm()

    context = {"formset": formset}
    if not is_authenticated:
        context["contactform"] = contactform
    return render(request, "agenda_culturel/import_set.html", context=context)
def import_from_url(request):
    """Submit a single URL to the import process.

    On a valid POST: an already known URL redirects to the event (when it may
    be shown) or to the home page, with an informational message; an unknown
    URL is queued for import. Otherwise the URL form is rendered together
    with an event form pre-filled for tomorrow evening.
    """
    is_authenticated = request.user.is_authenticated
    form = URLSubmissionFormWithContact(is_authenticated=is_authenticated)

    initial = {
        "start_day": date.today() + timedelta(days=1),
        "start_time": "20:00",
        "end_time": "22:00",
    }
    form_event = EventForm(initial=initial)

    # if the form has been sent
    if request.method == "POST":
        form = URLSubmissionFormWithContact(
            request.POST, is_authenticated=is_authenticated
        )

        # if the form is valid
        if form.is_valid():
            uc = URLEventEvaluation(form, is_authenticated)

            if uc.exists() and not uc.is_new():
                if uc.is_event_visible():
                    messages.info(
                        request,
                        mark_safe(
                            _(
                                "{} has not been submitted since its already known: {}."
                                # the URL is user input: escape it, the link
                                # is already built as safe HTML
                            ).format(escape(uc.url), uc.get_link())
                        ),
                    )
                    return HttpResponseRedirect(uc.get_event().get_absolute_url())

                messages.info(
                    request,
                    _(
                        "{} has not been submitted since it"
                        "s already known and currently into moderation process."
                    ).format(uc.url),
                )
                return HttpResponseRedirect(reverse("home"))

            messages.info(
                request,
                _("Integrating {} into our import process.").format(uc.url),
            )
            import_events_from_url.delay(
                uc.url,
                uc.cat,
                uc.tags,
                user_id=request.user.pk if request.user else None,
                email=form.cleaned_data.get("email"),
                comments=form.cleaned_data.get("comments"),
            )
            return HttpResponseRedirect(reverse("thank_you"))

    return render(
        request,
        "agenda_culturel/import.html",
        context={"form": form, "form_event": form_event},
    )
def export_event_ical(request, year, month, day, pk):
    """Return the occurrence of event ``pk`` at the given date as an .ics download.

    The file name is made of the first 32 characters of the event title
    (line breaks removed) followed by ``.ics``.
    """
    occurrence = get_object_or_404(Event, pk=pk).get_recurrence_at_date(
        year, month, day
    )
    cal = Event.export_to_ics([occurrence], request)

    response = HttpResponse(content_type="text/calendar")
    response.content = cal.to_ical().decode("utf-8").replace("\r\n", "\n")

    short_title = occurrence.title.replace("\n", " ").replace("\r", "")[0:32]
    response["Content-Disposition"] = "attachment; filename=" + short_title + ".ics"
    return response
def ical_cache_key(filter_url, tag, cat, organisation_pk, place_pk, is_authenticated):
    """Return the cache key (hex MD5 digest) of one iCal export variant.

    The authentication state is part of the key because authenticated users
    export events of every status while anonymous visitors only get published
    ones: the two must never share a cached calendar.
    """
    raw_key = "-".join(
        [
            filter_url,
            str(tag),
            str(cat),
            str(organisation_pk),
            str(place_pk),
            "auth" if is_authenticated else "anon",
        ]
    )
    return hashlib.md5(raw_key.encode("utf8")).hexdigest()


def export_ical(request, cat=None, tag=None, organisation_pk=None, place_pk=None):
    """Export events from 7 days ago to 60 days ahead as an .ics download.

    Events can be restricted by category slug, tag, organisation and place,
    on top of the filter parameters of the request. The generated calendar is
    cached for one hour. A request carrying category parameters in its filter
    is redirected to the URL computed by the filter.
    """
    now = date.today()

    qs = get_event_qs(request)
    category = None
    if cat is not None:
        category = Category.objects.filter(slug=cat).first()
        qs = qs.filter(category=category)
    if place_pk is not None:
        qs = qs.filter(exact_location=place_pk)
    if organisation_pk is not None:
        qs = qs.filter(organisers__in=[organisation_pk])
    if tag is not None:
        qs = qs.filter(tags__in=[tag])

    request = EventFilter.set_default_values(request)
    filter = EventFilter(request.GET, queryset=qs, request=request)

    if filter.has_category_parameters():
        return HttpResponseRedirect(filter.get_new_url())

    id_cache = ical_cache_key(
        filter.get_url(),
        tag,
        cat,
        organisation_pk,
        place_pk,
        request.user.is_authenticated,
    )
    ical = cache.get(id_cache)
    if not ical:
        calendar = CalendarList(
            now + timedelta(days=-7), now + timedelta(days=+60), filter
        )
        ical = calendar.export_to_ics(request)
        cache.set(id_cache, ical, 3600)  # 1 hour

    response = HttpResponse(content_type="text/calendar")
    response.content = ical.to_ical().decode("utf-8").replace("\r\n", "\n")

    # build a descriptive file name from the active restrictions
    extra = filter.to_str(" ")
    if extra is None:
        extra = ""
    if category is not None:
        extra += " " + str(category)
    if place_pk is not None:
        place = Place.objects.filter(pk=place_pk).values("name").first()
        # an unknown pk simply adds nothing to the file name
        if place is not None:
            extra += " @ " + place["name"]
    if organisation_pk is not None:
        organisation = (
            Organisation.objects.filter(pk=organisation_pk).values("name").first()
        )
        if organisation is not None:
            extra += " - " + organisation["name"]
    if tag is not None:
        extra += " - " + emoji.replace_emoji(tag, replace="")

    response["Content-Disposition"] = "attachment; filename={0}{1}{2}".format(
        "Pommes de lune", extra, ".ics"
    )
    return response
@method_decorator(check_honeypot, name="post")
class MessageCreateView(SuccessMessageMixin, CreateView):
    """Contact form, optionally reporting a specific event.

    When the URL carries a ``pk``, the message is linked to that event and
    typed as an event report; otherwise it is a plain contact message.
    """

    model = Message
    form_class = MessageForm
    template_name = "agenda_culturel/message_create_form.html"
    success_message = _("Your message has been sent successfully.")
    success_url = reverse_lazy("home")

    def __init__(self, *args, **kwargs):
        # related event, filled in by get_initial() when a pk is given
        self.event = None
        super().__init__(*args, **kwargs)

    def get_form(self, form_class=None):
        """Instantiate the form with the keyword arguments of this view."""
        chosen_class = self.get_form_class() if form_class is None else form_class
        return chosen_class(**self.get_form_kwargs())

    def get_form_kwargs(self):
        """Pass the related event, and flag the form as internal for logged-in users."""
        form_kwargs = super().get_form_kwargs()
        form_kwargs["event"] = self.event
        if self.request.user.is_authenticated:
            form_kwargs["internal"] = True
        return form_kwargs

    def form_valid(self, form):
        """Set the author (if logged in) and the message type, then save."""
        if self.request.user.is_authenticated:
            form.instance.user = self.request.user

        if "pk" in self.kwargs:
            form.instance.message_type = Message.TYPE.EVENT_REPORT
        else:
            form.instance.message_type = Message.TYPE.CONTACT_FORM
        return super().form_valid(form)

    def get_initial(self):
        """Pre-fill the related event and the subject when reporting an event."""
        initial = super().get_initial()
        if "pk" not in self.kwargs:
            initial["related_event"] = None
            return initial

        self.event = get_object_or_404(Event, pk=self.kwargs["pk"])
        initial["related_event"] = self.event
        initial["subject"] = _("Reporting the event {} on {}").format(
            self.event.title, self.event.start_day
        )
        return initial
class MessageDeleteView(
    SuccessMessageMixin,
    PermissionRequiredMixin,
    LoginRequiredMixin,
    DeleteView,
):
    """Delete a contact message; requires ``delete_message``.

    Access control mirrors the other delete/update views of this module, so
    that messages cannot be deleted by anonymous visitors.

    NOTE(review): the permission codename follows Django's default
    ``delete_<model>`` naming -- confirm it matches the project's groups.
    """

    model = Message
    permission_required = "agenda_culturel.delete_message"
    success_message = _("The contact message has been successfully deleted.")
    success_url = reverse_lazy("messages")
class MessageUpdateView(
    SuccessMessageMixin,
    PermissionRequiredMixin,
    LoginRequiredMixin,
    UpdateView,
):
    """Edit the moderation properties of a message; requires ``change_message``."""

    permission_required = "agenda_culturel.change_message"
    model = Message
    fields = ("spam", "closed", "comments")
    template_name = "agenda_culturel/message_moderation_form.html"
    success_url = reverse_lazy("messages")
    success_message = _(
        "The contact message properties has been successfully modified."
    )

    def get_form_kwargs(self):
        """Return the form keyword arguments, bound to the edited message if set."""
        form_kwargs = super().get_form_kwargs()
        if not hasattr(self, "object"):
            return form_kwargs
        form_kwargs["instance"] = self.object
        return form_kwargs
@login_required(login_url="/accounts/login/")
@permission_required("agenda_culturel.view_event")
def activite(request):
    """Render the activity page with daily and weekly modification counts.

    Days go back from today until at least 7 days are listed and the oldest
    one is a Monday; weeks are that Monday and the 8 Mondays before it.
    """
    one_day = timedelta(days=1)

    days = [date.today()]
    while not (len(days) >= 7 and days[-1].weekday() == 0):
        days.append(days[-1] - one_day)

    oldest_monday = days[-1]
    weeks = [oldest_monday - timedelta(days=7 * k) for k in range(9)]

    context = {
        "daily_modifications": Event.get_count_modifications(
            [(d, 1) for d in days]
        ),
        "weekly_modifications": Event.get_count_modifications(
            [(w, 7) for w in weeks]
        ),
    }
    return render(request, "agenda_culturel/page-activity.html", context)
@login_required(login_url="/accounts/login/")
@permission_required("agenda_culturel.view_event")
def administration(request):
    """Render the administration dashboard.

    The page gathers: modification counts for today and the two previous
    days, the five most recently created events, the five most recent batch
    imports, counters on the last run status of recurrent imports, counters
    on future imported events, and the non-moderated events summary.
    """
    today = date.today()
    moderation_days = 21
    moderation_classes = 4

    # recent modifications: today and the two days before
    recent_days = [today - timedelta(days=back) for back in range(3)]
    daily_modifications = Event.get_count_modifications(
        [(day, 1) for day in recent_days]
    )

    # last created events
    events = (
        Event.objects.all()
        .order_by("-created_date")
        .select_related("exact_location", "category")[:5]
    )

    # last batch imports, each annotated with the pk of one event whose
    # import sources contain the batch url (if any)
    first_matching_event = Event.objects.filter(
        import_sources__contains=[OuterRef("url_source")]
    ).values("pk")[:1]
    batch_imports = (
        BatchImportation.objects.all()
        .select_related("recurrentImport")
        .annotate(event_id=Subquery(first_matching_event))
        .order_by("-created_date")[:5]
    )

    # status of the most recent batch of every recurrent import
    newest_batch = (
        BatchImportation.objects.filter(recurrentImport=OuterRef("pk"))
        .order_by("-created_date")
        .select_related("recurrentImport")
    )
    rimports = RecurrentImport.objects.annotate(
        last_run_status=Subquery(newest_batch.values("status")[:1])
    )
    nb_failed = rimports.filter(
        last_run_status=BatchImportation.STATUS.FAILED
    ).count()
    nb_canceled = rimports.filter(
        last_run_status=BatchImportation.STATUS.CANCELED
    ).count()
    nb_running = rimports.filter(
        last_run_status=BatchImportation.STATUS.RUNNING
    ).count()
    nb_all = rimports.count()

    # future events: those coming from a known recurrent import, and the
    # "orphans" (imported, untouched since import, source no longer known)
    srcs = RecurrentImport.objects.all().values_list("source")
    in_future = Event.objects.filter(start_day__gte=today)
    from_known_source = Q(import_sources__overlap=srcs)
    has_sources = Q(import_sources__isnull=False)
    untouched_since_import = Q(modified_date__isnull=True) | Q(
        modified_date__lte=F("imported_date")
    )
    nb_in_rimport = in_future.filter(from_known_source).count()
    nb_in_orphan_import = in_future.filter(
        has_sources & untouched_since_import & ~from_known_source
    ).count()

    # non moderated events
    nb_not_moderated = Event.get_nb_not_moderated(
        today, moderation_days, moderation_classes
    )

    context = {
        "daily_modifications": daily_modifications,
        "events": events,
        "batch_imports": batch_imports,
        "nb_failed": nb_failed,
        "nb_canceled": nb_canceled,
        "nb_running": nb_running,
        "nb_all": nb_all,
        "nb_not_moderated": nb_not_moderated,
        "nb_in_rimport": nb_in_rimport,
        "nb_in_orphan_import": nb_in_orphan_import,
    }
    return render(request, "agenda_culturel/administration.html", context)
@login_required(login_url="/accounts/login/")
@permission_required("agenda_culturel.view_event")
def recent(request):
    """List events by creation date (newest first), filtered, 10 per page.

    A non-integer page falls back to the first page; an out-of-range page
    falls back to the last one.
    """
    newest_first = Event.objects.all().order_by("-created_date")
    event_filter = EventFilterAdmin(request.GET, queryset=newest_first)
    paginator = PaginatorFilter(event_filter, 10, request)

    try:
        current_page = paginator.page(request.GET.get("page"))
    except PageNotAnInteger:
        current_page = paginator.page(1)
    except EmptyPage:
        current_page = paginator.page(paginator.num_pages)

    context = {"filter": event_filter, "paginator_filter": current_page}
    return render(request, "agenda_culturel/recent.html", context)
@login_required(login_url="/accounts/login/")
@permission_required("agenda_culturel.view_message")
def view_messages(request):
    """List contact messages (newest first), filtered, 10 per page.

    The context also carries the total number of messages flagged as spam.
    A non-integer page falls back to the first page; an out-of-range page
    falls back to the last one.
    """
    newest_first = Message.objects.all().order_by("-date")
    message_filter = MessagesFilterAdmin(request.GET, queryset=newest_first)
    paginator = PaginatorFilter(message_filter, 10, request)
    nb_spams = Message.objects.filter(spam=True).count()

    try:
        current_page = paginator.page(request.GET.get("page"))
    except PageNotAnInteger:
        current_page = paginator.page(1)
    except EmptyPage:
        current_page = paginator.page(paginator.num_pages)

    context = {
        "filter": message_filter,
        "nb_spams": nb_spams,
        "paginator_filter": current_page,
    }
    return render(request, "agenda_culturel/messages.html", context)
@login_required(login_url="/accounts/login/")
@permission_required("agenda_culturel.view_message")
def delete_cm_spam(request):
    """Delete every message flagged as spam, after a confirmation page.

    GET renders the confirmation page with the total number of messages and
    the number of spam messages; POST deletes the spam and redirects to the
    message list.
    """
    messages_url = reverse_lazy("messages")

    if request.method != "POST":
        # one row per value of the spam flag, with its number of messages
        per_flag = list(Message.objects.values("spam").annotate(total=Count("spam")))
        context = {
            "nb_total": sum(row["total"] for row in per_flag),
            "nb_spams": sum(row["total"] for row in per_flag if row["spam"]),
            "cancel_url": messages_url,
        }
        return render(
            request, "agenda_culturel/delete_spams_confirm.html", context
        )

    Message.objects.filter(spam=True).delete()
    messages.success(request, _("Spam has been successfully deleted."))
    return HttpResponseRedirect(messages_url)
def event_search(request, full=False):
    """Search events and, when a ``q`` term is given, related objects.

    Events are filtered with ``SearchEventFilter`` when ``full`` is true and
    with ``SimpleSearchEventFilter`` otherwise, newest start day first, 10
    per page. When the query string contains ``q``, matching categories,
    tags, places and organisations are looked up as well, plus recurrent
    imports for authenticated users.
    """
    qs = get_event_qs(request).order_by("-start_day")
    if not request.user.is_authenticated:
        # Anonymous visitors only get events that have no duplicate group,
        # that are the representative of their group, or whose group has no
        # representative.
        qs = qs.filter(
            Q(other_versions__isnull=True)
            | Q(other_versions__representative=F("pk"))
            | Q(other_versions__representative__isnull=True)
        )

    filter_class = SearchEventFilter if full else SimpleSearchEventFilter
    event_filter = filter_class(request.GET, queryset=qs, request=request)

    categories = tags = places = organisations = rimports = None
    if "q" in request.GET:
        term = request.GET["q"]
        normalised_term = remove_accents(term.lower())

        categories = Category.objects.filter(name__icontains=term)

        # Every distinct tag carried by events having at least one tag that
        # matches the term (case-insensitive)...
        candidate_tags = (
            Event.objects.extra(where=["%s ILIKE ANY (tags)"], params=[term])
            .annotate(arr_tags=Func(F("tags"), function="unnest"))
            .values_list("arr_tags", flat=True)
            .distinct()
        )
        # ...then keep those equal to the term once accents are removed,
        # case is lowered and emojis are replaced by their textual name.
        normalised_tags = [
            (
                tag,
                emoji.demojize(remove_accents(tag).lower(), delimiters=("000", "")),
            )
            for tag in candidate_tags
        ]
        matching_tags = sorted(
            (pair for pair in normalised_tags if normalised_term == pair[1]),
            key=lambda pair: pair[1],
        )
        tags = [pair[0] for pair in matching_tags]

        places = Place.objects.filter(
            Q(name__icontains=term)
            | Q(description__icontains=term)
            | Q(city__icontains=term)
        )
        organisations = Organisation.objects.filter(
            Q(name__icontains=term) | Q(description__icontains=term)
        )
        if request.user.is_authenticated:
            rimports = RecurrentImport.objects.filter(name__icontains=term)

    paginator = PaginatorFilter(event_filter, 10, request)
    try:
        current_page = paginator.page(request.GET.get("page"))
    except PageNotAnInteger:
        current_page = paginator.page(1)
    except EmptyPage:
        current_page = paginator.page(paginator.num_pages)

    # NOTE(review): any non-empty query string counts as "has results",
    # including a lone ``page`` parameter — confirm this is intended.
    has_results = len(request.GET) != 0

    return render(
        request,
        "agenda_culturel/search.html",
        {
            "filter": event_filter,
            "categories": categories,
            "tags": tags,
            "places": places,
            "organisations": organisations,
            "rimports": rimports,
            "has_results": has_results,
            "paginator_filter": current_page,
            "full": full,
        },
    )
def event_search_full(request):
    """Run ``event_search`` with its ``full`` flag enabled."""
    return event_search(request, full=True)
#########################
## batch importations
#########################
@login_required(login_url="/accounts/login/")
@permission_required("agenda_culturel.view_batchimportation")
def imports(request):
    """List batch imports (newest first, 30 per page).

    Each batch is annotated with the pk of one event whose import sources
    contain the batch url. The context also carries the number of future
    "orphan" imported events.
    """
    first_matching_event = Event.objects.filter(
        import_sources__contains=[OuterRef("url_source")]
    ).values("pk")[:1]
    batches = (
        BatchImportation.objects.all()
        .order_by("-created_date")
        .annotate(event_id=Subquery(first_matching_event))
    )
    paginator = Paginator(batches, 30)

    # Future events that were imported, not modified after their import, and
    # whose sources match no recurrent import.
    srcs = RecurrentImport.objects.all().values_list("source")
    untouched_since_import = Q(modified_date__isnull=True) | Q(
        modified_date__lte=F("imported_date")
    )
    nb_in_orphan_import = (
        Event.objects.filter(start_day__gte=date.today())
        .filter(
            Q(import_sources__isnull=False)
            & untouched_since_import
            & ~Q(import_sources__overlap=srcs)
        )
        .count()
    )

    # get_page falls back to page 1 on a non-integer page number and to the
    # last page when the number is out of range.
    current_page = paginator.get_page(request.GET.get("page"))

    context = {
        "paginator_filter": current_page,
        "nb_in_orphan_import": nb_in_orphan_import,
    }
    return render(request, "agenda_culturel/imports.html", context)
@login_required(login_url="/accounts/login/")
@permission_required(
    [
        "agenda_culturel.add_batchimportation",
        "agenda_culturel.run_batchimportation",
    ]
)
def add_import(request):
    """Show the batch import form and, on a valid POST, queue a JSON import.

    The raw ``json`` field of the submitted form is handed to the
    ``import_events_from_json`` task, then the user is redirected to the
    list of imports.
    """
    form = BatchImportationForm()
    is_post = request.method == "POST"
    if is_post:
        form = BatchImportationForm(request.POST)

    if is_post and form.is_valid():
        import_events_from_json.delay(form.data["json"])
        messages.success(request, _("The import has been run successfully."))
        return HttpResponseRedirect(reverse_lazy("imports"))

    context = {"form": form}
    return render(request, "agenda_culturel/batchimportation_form.html", context)
@login_required(login_url="/accounts/login/")
@permission_required(
    [
        "agenda_culturel.view_batchimportation",
        "agenda_culturel.run_batchimportation",
    ]
)
def cancel_import(request, pk):
    """Cancel a batch import after a confirmation page.

    GET renders the confirmation page. POST revokes the celery task attached
    to the import, stores the CANCELED status and redirects to the list of
    imports. An unknown ``pk`` yields a 404.
    """
    import_process = get_object_or_404(BatchImportation, pk=pk)
    imports_url = reverse_lazy("imports")

    if request.method != "POST":
        return render(
            request,
            "agenda_culturel/cancel_import_confirm.html",
            {"object": import_process, "cancel_url": imports_url},
        )

    celery_app.control.revoke(import_process.celery_id)
    import_process.status = BatchImportation.STATUS.CANCELED
    import_process.save(update_fields=["status"])
    messages.success(request, _("The import has been canceled."))
    return HttpResponseRedirect(imports_url)
@login_required(login_url="/accounts/login/")
@permission_required(
    [
        "agenda_culturel.view_batchimportation",
        "agenda_culturel.run_batchimportation",
    ]
)
def update_orphan_events(request):
    """Launch the orphan event update task after a confirmation page.

    GET renders the confirmation page with the number of future "orphan"
    imported events; POST queues ``update_orphan_pure_import_events`` and
    redirects to the list of imports.
    """
    if request.method == "POST":
        update_orphan_pure_import_events.delay()
        messages.success(request, _("The orphan event update has been launched."))
        return HttpResponseRedirect(reverse_lazy("imports"))

    # Future events that were imported, not modified after their import, and
    # whose sources match no recurrent import.
    srcs = RecurrentImport.objects.all().values_list("source")
    untouched_since_import = Q(modified_date__isnull=True) | Q(
        modified_date__lte=F("imported_date")
    )
    orphans = Event.objects.filter(start_day__gte=date.today()).filter(
        Q(import_sources__isnull=False)
        & untouched_since_import
        & ~Q(import_sources__overlap=srcs)
    )
    return render(
        request,
        "agenda_culturel/run_orphan_imports_confirm.html",
        {"nb_in_orphan_import": orphans.count()},
    )
#########################
## recurrent importations
#########################
@login_required(login_url="/accounts/login/")
@permission_required("agenda_culturel.view_recurrentimport")
def recurrent_imports(request, status=None):
    """List recurrent imports (20 per page), optionally by last run status.

    Every recurrent import is annotated with the status of its most recent
    batch. The failed / canceled / running / total counters are computed
    before the optional ``status`` restriction is applied.
    """
    newest_batch = BatchImportation.objects.filter(
        recurrentImport=OuterRef("pk")
    ).order_by("-created_date")
    all_rimports = (
        RecurrentImport.objects.all()
        .annotate(last_run_status=Subquery(newest_batch.values("status")[:1]))
        .order_by("-pk")
    )

    def count_with_status(value):
        # number of recurrent imports whose last batch has this status
        return all_rimports.filter(last_run_status=value).count()

    nb_failed = count_with_status(BatchImportation.STATUS.FAILED)
    nb_canceled = count_with_status(BatchImportation.STATUS.CANCELED)
    nb_running = count_with_status(BatchImportation.STATUS.RUNNING)
    nb_all = all_rimports.count()

    listed = all_rimports
    if status is not None:
        listed = listed.filter(last_run_status=status)

    rimport_filter = RecurrentImportFilter(request.GET, queryset=listed)
    paginator = PaginatorFilter(rimport_filter, 20, request)
    try:
        current_page = paginator.page(request.GET.get("page"))
    except PageNotAnInteger:
        current_page = paginator.page(1)
    except EmptyPage:
        current_page = paginator.page(paginator.num_pages)

    context = {
        "paginator_filter": current_page,
        "filter": rimport_filter,
        "nb_all": nb_all,
        "nb_failed": nb_failed,
        "nb_canceled": nb_canceled,
        "nb_running": nb_running,
        "status": status,
    }
    return render(request, "agenda_culturel/rimports.html", context)
class RecurrentImportCreateView(
    LoginRequiredMixin, PermissionRequiredMixin, CreateView
):
    """Create a recurrent import.

    Requires the ``add_recurrentimport`` permission and redirects to the
    list of recurrent imports on success.
    """

    model = RecurrentImport
    form_class = RecurrentImportForm
    permission_required = "agenda_culturel.add_recurrentimport"
    success_url = reverse_lazy("recurrent_imports")
class RecurrentImportUpdateView(
    SuccessMessageMixin,
    PermissionRequiredMixin,
    LoginRequiredMixin,
    UpdateView,
):
    """Edit a recurrent import; requires the ``change_recurrentimport`` permission."""

    model = RecurrentImport
    form_class = RecurrentImportForm
    permission_required = "agenda_culturel.change_recurrentimport"
    success_message = _("The recurrent import has been successfully modified.")
class RecurrentImportDeleteView(
    SuccessMessageMixin,
    PermissionRequiredMixin,
    LoginRequiredMixin,
    DeleteView,
):
    """Delete a recurrent import.

    Requires the ``delete_recurrentimport`` permission and redirects to the
    list of recurrent imports on success.
    """

    model = RecurrentImport
    permission_required = "agenda_culturel.delete_recurrentimport"
    success_message = _("The recurrent import has been successfully deleted.")
    success_url = reverse_lazy("recurrent_imports")
@login_required(login_url="/accounts/login/")
@permission_required(
    [
        "agenda_culturel.view_recurrentimport",
        "agenda_culturel.view_batchimportation",
    ]
)
def view_rimport(request, pk):
    """Show a recurrent import with its batch imports (newest first, 10 per page).

    An unknown ``pk`` yields a 404.
    """
    rimport = get_object_or_404(RecurrentImport, pk=pk)
    batches = BatchImportation.objects.filter(recurrentImport=pk).order_by(
        "-created_date"
    )

    # get_page falls back to page 1 on a non-integer page number and to the
    # last page when the number is out of range.
    current_page = Paginator(batches, 10).get_page(request.GET.get("page"))

    context = {"paginator_filter": current_page, "object": rimport}
    return render(request, "agenda_culturel/page-rimport.html", context)
@login_required(login_url="/accounts/login/")
@permission_required(
    [
        "agenda_culturel.view_recurrentimport",
        "agenda_culturel.run_recurrentimport",
    ]
)
def run_rimport(request, pk):
    """Launch one recurrent import after a confirmation page.

    GET renders the confirmation page; POST queues ``run_recurrent_import``
    for ``pk`` and redirects to the page of that recurrent import. An
    unknown ``pk`` yields a 404.
    """
    rimport = get_object_or_404(RecurrentImport, pk=pk)

    if request.method != "POST":
        return render(
            request,
            "agenda_culturel/run_rimport_confirm.html",
            {"object": rimport},
        )

    run_recurrent_import.delay(pk)
    messages.success(request, _("The import has been launched."))
    return HttpResponseRedirect(reverse_lazy("view_rimport", args=[pk]))
@login_required(login_url="/accounts/login/")
@permission_required(
    [
        "agenda_culturel.view_recurrentimport",
        "agenda_culturel.run_recurrentimport",
    ]
)
def run_all_rimports(request, status=None):
    """Launch recurrent imports in bulk after a confirmation page.

    ``status`` selects both the task queued on POST and the confirmation
    template rendered otherwise: FAILED and CANCELED have a dedicated task
    and template, any other value uses the "all imports" ones.
    """
    if status == BatchImportation.STATUS.FAILED:
        task = run_all_recurrent_imports_failed
        confirm_template = "agenda_culturel/run_failed_rimports_confirm.html"
    elif status == BatchImportation.STATUS.CANCELED:
        task = run_all_recurrent_imports_canceled
        confirm_template = "agenda_culturel/run_canceled_rimports_confirm.html"
    else:
        task = run_all_recurrent_imports
        confirm_template = "agenda_culturel/run_all_rimports_confirm.html"

    if request.method != "POST":
        return render(request, confirm_template)

    task.delay()
    messages.success(request, _("Imports has been launched."))
    return HttpResponseRedirect(reverse_lazy("recurrent_imports"))
@login_required(login_url="/accounts/login/")
@permission_required(
    [
        "agenda_culturel.view_recurrentimport",
        "agenda_culturel.run_recurrentimport",
    ]
)
def run_all_fb_rimports(request, status=None):
    """Launch the Facebook recurrent imports after a confirmation page.

    ``status`` is accepted but not used by this view.
    """
    if request.method != "POST":
        return render(request, "agenda_culturel/run_all_fb_rimports_confirm.html")

    # presumably the positional True restricts the task to Facebook
    # imports — verify against the task definition
    run_all_recurrent_imports.delay(True)
    messages.success(request, _("Facebook imports has been launched."))
    return HttpResponseRedirect(reverse_lazy("recurrent_imports"))
#########################
## duplicated events
#########################
class DuplicatedEventsDetailView(LoginRequiredMixin, DetailView):
    """Display one group of duplicated events to a logged-in user."""

    template_name = "agenda_culturel/duplicate.html"
    model = DuplicatedEvents
@login_required(login_url="/accounts/login/")
@permission_required(
    ["agenda_culturel.change_event", "agenda_culturel.change_duplicatedevents"]
)
def update_duplicate_event(request, pk, epk):
    """Update one event of a duplicate group from values of the other events.

    ``pk`` identifies the duplicate group and ``epk`` the event to update
    (404 when either is unknown). On a valid POST, every field that differs
    between the duplicates is overwritten with the user's selection, the
    event is made the fixed one of its group, saved, and the user is
    redirected to the event page.
    """
    edup = get_object_or_404(DuplicatedEvents, pk=pk)
    event = get_object_or_404(Event, pk=epk)

    form = MergeDuplicates(duplicates=edup, event=event)
    if request.method == "POST":
        # NOTE(review): unlike the unbound form above, the bound form is
        # built without ``event`` — confirm this is intended.
        form = MergeDuplicates(request.POST, duplicates=edup)
        if form.is_valid():
            for item in edup.get_items_comparison():
                if item["similar"]:
                    # identical across duplicates: nothing to choose
                    continue
                key = item["key"]
                selected = form.get_selected_events(key)
                if selected is None:
                    continue

                if isinstance(selected, list):
                    # several events selected: merge their non-null values,
                    # joining strings with newlines and concatenating lists
                    values = [
                        value
                        for value in (getattr(source, key) for source in selected)
                        if value is not None
                    ]
                    if values:
                        if isinstance(values[0], str):
                            setattr(event, key, "\n".join(values))
                        else:
                            setattr(event, key, sum(values, []))
                elif key == "organisers":
                    event.organisers.set(selected.organisers.all())
                else:
                    setattr(event, key, getattr(selected, key))
                    if key == "image":
                        # the local image goes along with the image
                        event.local_image = selected.local_image

            event.other_versions.fix(event)
            event.save()
            messages.info(request, _("Update successfully completed."))
            return HttpResponseRedirect(event.get_absolute_url())

    return render(
        request,
        "agenda_culturel/update_duplicate.html",
        context={
            "form": form,
            "object": edup,
            "event_id": edup.get_event_index(event),
        },
    )
@login_required(login_url="/accounts/login/")
@permission_required(
    ["agenda_culturel.change_event", "agenda_culturel.change_duplicatedevents"]
)
def merge_duplicate(request, pk):
    """Create a new published event by merging the events of a duplicate group.

    On a valid POST, the new event takes the shared value of every field
    that is similar across duplicates and the user's selection for the
    others. It is attached to the group, saved, given the selected
    organisers and made the fixed event of the group; the user is then
    redirected to the new event. An unknown ``pk`` yields a 404.
    """
    edup = get_object_or_404(DuplicatedEvents, pk=pk)

    form = MergeDuplicates(duplicates=edup)
    if request.method == "POST":
        form = MergeDuplicates(request.POST, duplicates=edup)
        if form.is_valid():
            events = edup.get_duplicated()

            # build fields for the new event
            new_event_data = {}
            for item in edup.get_items_comparison():
                if item["similar"]:
                    key = item["key"]
                    new_event_data[key] = getattr(events[0], key)
                    continue

                key = item["key"]
                selected = form.get_selected_events(key)
                if selected is None:
                    new_event_data[key] = None
                elif isinstance(selected, list):
                    # several events selected: merge their non-null values,
                    # joining strings with newlines and concatenating lists
                    values = [
                        value
                        for value in (getattr(source, key) for source in selected)
                        if value is not None
                    ]
                    if not values:
                        new_event_data[key] = None
                    elif isinstance(values[0], str):
                        new_event_data[key] = "\n".join(values)
                    else:
                        new_event_data[key] = sum(values, [])
                else:
                    new_event_data[key] = getattr(selected, key)
                    if key == "image" and "local_image" not in new_event_data:
                        # the local image goes along with the image
                        new_event_data["local_image"] = selected.local_image

            # organisers cannot be passed to the constructor; they are set
            # once the new event has been saved
            organisers = new_event_data.pop("organisers", None)

            # create a new event that merges the selected events
            new_event = Event(**new_event_data)
            new_event.status = Event.STATUS.PUBLISHED
            new_event.other_versions = edup
            new_event.save()
            if organisers is not None:
                new_event.organisers.set(organisers.all())
            edup.fix(new_event)

            messages.info(
                request,
                _("Creation of a merged event has been successfully completed."),
            )
            return HttpResponseRedirect(new_event.get_absolute_url())

    return render(
        request,
        "agenda_culturel/merge_duplicate.html",
        context={"form": form, "object": edup},
    )
@login_required(login_url="/accounts/login/")
@permission_required(
    ["agenda_culturel.change_event", "agenda_culturel.change_duplicatedevents"]
)
def fix_duplicate(request, pk):
    """Resolve a group of duplicated events according to the chosen action.

    Supported actions, as reported by the ``FixDuplicates`` form:

    * no duplicates: the group is deleted and the user is sent to the day of
      one of its events (a non-recurrent one when available), or home when
      the group was empty;
    * select: the chosen event becomes the representative;
    * remove: the chosen event is detached from the group;
    * update: redirect to the view updating the chosen event;
    * anything else: redirect to the merge view.

    GET requests and invalid forms render the form page. An unknown ``pk``
    yields a 404.
    """
    edup = get_object_or_404(DuplicatedEvents.objects.select_related(), pk=pk)

    if request.method == "POST":
        form = FixDuplicates(request.POST, edup=edup)
        if form.is_valid():
            if form.is_action_no_duplicates():
                # all events are different
                events = edup.get_duplicated()

                # pick the day to redirect to
                redirect_day = None
                if len(events) != 0:
                    non_recurrent = [e for e in events if not e.has_recurrences()]
                    if len(non_recurrent) != 0:
                        reference = non_recurrent[0]
                    else:
                        reference = events[0]
                    redirect_day = reference.start_day

                messages.success(request, _("Events have been marked as unduplicated."))
                # delete the duplicated event (other_versions will be set to None on all events)
                edup.delete()

                if redirect_day is None:
                    return HttpResponseRedirect(reverse_lazy("home"))
                return HttpResponseRedirect(
                    reverse_lazy(
                        "day_view",
                        args=[redirect_day.year, redirect_day.month, redirect_day.day],
                    )
                )

            if form.is_action_select():
                # one element has been selected to be the representative
                selected = form.get_selected_event(edup)
                if selected is not None:
                    edup.fix(selected)
                    messages.success(
                        request,
                        _("The selected event has been set as representative"),
                    )
                else:
                    messages.error(
                        request,
                        _(
                            "The selected item is no longer included in the list of duplicates. Someone else has probably modified the list in the meantime."
                        ),
                    )
                return HttpResponseRedirect(edup.get_absolute_url())

            if form.is_action_remove():
                # one element is removed from the set
                event = form.get_selected_event(edup)
                if event is None:
                    messages.error(
                        request,
                        _(
                            "The selected item is no longer included in the list of duplicates. Someone else has probably modified the list in the meantime."
                        ),
                    )
                    return HttpResponseRedirect(edup.get_absolute_url())

                event.other_versions = None
                if edup.representative == event:
                    edup.representative = None
                # save without touching the modification date
                event.set_no_modification_date_changed()
                event.save()
                edup.save()
                edup.events = [e for e in edup.events if e.pk != event.pk]
                messages.success(
                    request,
                    _(
                        "The event has been withdrawn from the group and made independent."
                    ),
                )
                if edup.nb_duplicated() == 1:
                    return HttpResponseRedirect(edup.get_absolute_url())
                # several events remain: show a fresh form for the group
                form = FixDuplicates(edup=edup)

            elif form.is_action_update():
                # otherwise, an event will be updated using other elements
                event = form.get_selected_event(edup)
                if event is None:
                    messages.error(
                        request,
                        _(
                            "The selected item is no longer included in the list of duplicates. Someone else has probably modified the list in the meantime."
                        ),
                    )
                    return HttpResponseRedirect(edup.get_absolute_url())
                return HttpResponseRedirect(
                    reverse_lazy("update_event", args=[edup.pk, event.pk])
                )

            else:
                # otherwise, a new event will be created using a merging process
                return HttpResponseRedirect(
                    reverse_lazy("merge_duplicate", args=[edup.pk])
                )
    else:
        form = FixDuplicates(edup=edup)

    return render(
        request,
        "agenda_culturel/fix_duplicate.html",
        context={"form": form, "object": edup},
    )
class DuplicatedEventsUpdateView(LoginRequiredMixin, UpdateView):
    """Update view on a duplicate group exposing no editable field."""

    template_name = "agenda_culturel/fix_duplicate.html"
    model = DuplicatedEvents
    fields = ()
@login_required(login_url="/accounts/login/")
@permission_required("agenda_culturel.view_duplicatedevents")
def duplicates(request):
    """List duplicate groups (highest pk first), filtered, 10 per page.

    ``DuplicatedEvents.remove_singletons()`` is called first; when it
    reports a positive count, a success message mentions it.
    """
    nb_removed = DuplicatedEvents.remove_singletons()
    if nb_removed > 0:
        messages.success(
            request,
            _("Cleaning up duplicates: {} item(s) fixed.").format(nb_removed),
        )

    groups = DuplicatedEvents.objects.all().order_by("-pk")
    duplicates_filter = DuplicatedEventsFilter(request.GET, queryset=groups)
    paginator = PaginatorFilter(duplicates_filter, 10, request)

    try:
        current_page = paginator.page(request.GET.get("page"))
    except PageNotAnInteger:
        current_page = paginator.page(1)
    except EmptyPage:
        current_page = paginator.page(paginator.num_pages)

    context = {
        "filter": duplicates_filter,
        "paginator_filter": current_page,
        "paginator": paginator,
    }
    return render(request, "agenda_culturel/duplicates.html", context)
def set_duplicate(request, year, month, day, pk):
    """Flag an event as a duplicate of another event of the given day.

    The candidates are the events of the day other than the event itself,
    not in the trash and not already in the same duplicate group. On a valid
    POST the selected candidate is grouped with the event; authenticated
    users are redirected to the duplicate group, anonymous users back to the
    event with an information message.

    Raises:
        Http404: if the event does not exist or if ``year``/``month``/``day``
            do not form a valid calendar date.
    """
    event = get_object_or_404(Event, pk=pk)

    # An impossible date (e.g. month 13 or 31 February) is a bad URL, not a
    # server error.
    try:
        requested_day = date(year, month, day)
    except ValueError as exc:
        raise Http404("Invalid date") from exc

    cday = CalendarDay(requested_day)
    others = [
        e
        for e in cday.get_events()
        if e != event
        and (event.other_versions is None or event.other_versions != e.other_versions)
        and e.status != Event.STATUS.TRASH
    ]

    form = SelectEventInList(events=others)
    if request.method == "POST":
        form = SelectEventInList(request.POST, events=others)
        if form.is_valid():
            selected_pk = int(form.cleaned_data["event"])
            selected = [o for o in others if o.pk == selected_pk]
            event.set_other_versions(selected)
            # save them without updating modified date
            event.set_no_modification_date_changed()
            event.save()

            if request.user.is_authenticated:
                messages.success(request, _("The event was successfully duplicated."))
                return HttpResponseRedirect(
                    reverse_lazy("view_duplicate", args=[event.other_versions.pk])
                )

            messages.info(
                request,
                _(
                    "The event has been successfully flagged as a duplicate. The moderation team will deal with your suggestion shortly."
                ),
            )
            return HttpResponseRedirect(event.get_absolute_url())

    return render(
        request,
        "agenda_culturel/set_duplicate.html",
        context={"form": form, "event": event},
    )
#########################
## categorisation rules
#########################
@login_required(login_url="/accounts/login/")
@permission_required("agenda_culturel.view_categorisationrule")
def categorisation_rules(request):
    """List categorisation rules ordered by pk, 100 per page."""
    rules = (
        CategorisationRule.objects.all()
        .order_by("pk")
        .select_related("category")
        .select_related("place")
    )

    # get_page falls back to page 1 on a non-integer page number and to the
    # last page when the number is out of range.
    current_page = Paginator(rules, 100).get_page(request.GET.get("page"))

    return render(
        request,
        "agenda_culturel/categorisation_rules.html",
        {"paginator_filter": current_page},
    )
class CategorisationRuleCreateView(
    LoginRequiredMixin, PermissionRequiredMixin, CreateView
):
    """Create a categorisation rule.

    Requires the ``add_categorisationrule`` permission and redirects to the
    list of rules on success.
    """

    model = CategorisationRule
    form_class = CategorisationRuleImportForm
    permission_required = "agenda_culturel.add_categorisationrule"
    success_url = reverse_lazy("categorisation_rules")
class CategorisationRuleUpdateView(
    SuccessMessageMixin,
    PermissionRequiredMixin,
    LoginRequiredMixin,
    UpdateView,
):
    """Edit a categorisation rule.

    Requires the ``change_categorisationrule`` permission and redirects to
    the list of rules on success.
    """

    model = CategorisationRule
    form_class = CategorisationRuleImportForm
    permission_required = "agenda_culturel.change_categorisationrule"
    success_message = _("The categorisation rule has been successfully modified.")
    success_url = reverse_lazy("categorisation_rules")
class CategorisationRuleDeleteView(
    SuccessMessageMixin,
    PermissionRequiredMixin,
    LoginRequiredMixin,
    DeleteView,
):
    """Delete a categorisation rule.

    Requires the ``delete_categorisationrule`` permission and redirects to
    the list of rules on success.
    """

    model = CategorisationRule
    permission_required = "agenda_culturel.delete_categorisationrule"
    success_message = _("The categorisation rule has been successfully deleted.")
    success_url = reverse_lazy("categorisation_rules")
@login_required(login_url="/accounts/login/")
@permission_required("agenda_culturel.apply_categorisationrules")
def apply_categorisation_rules(request):
    """Apply the categorisation rules to upcoming events.

    On GET (or any non-POST method):

    * upcoming events that already have a non-default category are checked
      against the rules; those for which the rules give another category
      are collected for manual validation;
    * upcoming events with the default category or no category get the
      rules applied directly and are saved in bulk.

    When some events need validation, a form listing them is rendered;
    otherwise the user is redirected to the list of rules.

    On POST, the validated (event pk, category name) pairs of the form are
    applied one by one, then the user is redirected to the list of rules. An
    invalid form is rendered again.
    """
    form_template = "agenda_culturel/categorise_events_form.html"
    nothing_done = _(
        "The rules were successfully applied and no events were categorised."
    )

    if request.method == "POST":
        form = CategorisationForm(request.POST)
        if not form.is_valid():
            return render(request, form_template, context={"form": form})

        nb = 0
        for event_pk, category_name in form.get_validated():
            event = Event.objects.get(pk=event_pk)
            event.category = Category.objects.filter(name=category_name).first()
            event.save()
            nb += 1

        if nb == 0:
            messages.info(request, nothing_done)
        elif nb == 1:
            messages.success(
                request,
                _(
                    "The rules were successfully applied and 1 event was categorised."
                ),
            )
        else:
            messages.success(
                request,
                _(
                    "The rules were successfully applied and {} events were categorised."
                ).format(nb),
            )
        return HttpResponseRedirect(reverse_lazy("categorisation_rules"))

    # first we check if events are not correctly categorised
    to_categorise = []
    already_categorised = (
        Event.objects.filter(start_day__gte=datetime.now())
        .exclude(category=Category.get_default_category_id())
        .exclude(category=None)
        .select_related("exact_location")
        .select_related("category")
    )
    for event in already_categorised:
        suggested = CategorisationRule.get_category_from_rules(event)
        if suggested and suggested != event.category:
            to_categorise.append((event, suggested))

    # then we apply rules on events without category
    without_category = (
        Event.objects.filter(start_day__gte=datetime.now())
        .filter(Q(category=Category.get_default_category_id()) | Q(category=None))
        .select_related("exact_location")
        .select_related("category")
    )
    to_save = [
        event for event in without_category if CategorisationRule.apply_rules(event)
    ]
    nb = len(to_save)
    if nb != 0:
        Event.objects.bulk_update(to_save, fields=["category"])

    # set messages
    if nb == 0:
        messages.info(request, nothing_done)
    elif nb == 1:
        messages.success(
            request,
            _(
                "The rules were successfully applied and 1 event with default category was categorised."
            ),
        )
    else:
        messages.success(
            request,
            _(
                "The rules were successfully applied and {} events with default category were categorised."
            ).format(nb),
        )

    if len(to_categorise) == 0:
        return HttpResponseRedirect(reverse_lazy("categorisation_rules"))

    form = CategorisationForm(events=to_categorise)
    return render(
        request,
        form_template,
        context={
            "form": form,
            "events": {e.pk: e for e, c in to_categorise},
            "categories": {e.pk: c for e, c in to_categorise},
        },
    )
#########################
## Places
#########################
class PlaceListView(ListView):
    """List every place, ordered by unaccented name."""

    ordering = ["name__unaccent"]
    model = Place
class PlaceListAdminView(PermissionRequiredMixin, ListView):
    """Administration list of places, ordered by unaccented name, 10 per page.

    Requires the ``add_place`` permission.
    """

    model = Place
    template_name = "agenda_culturel/place_list_admin.html"
    permission_required = "agenda_culturel.add_place"
    ordering = ["name__unaccent"]
    paginate_by = 10
class PlaceDetailView(ListView):
    """Page of a place, listing its upcoming events (10 per page)."""

    model = Place
    template_name = "agenda_culturel/place_detail.html"
    paginate_by = 10

    def get_queryset(self):
        """Return the events at this place from now on, soonest first.

        The place is loaded from the ``pk`` URL argument (404 if unknown)
        and kept on ``self.place``. Only events that have no duplicate
        group, that are the representative of their group, or whose group
        has no representative are kept.
        """
        self.place = get_object_or_404(Place, pk=self.kwargs["pk"])

        one_per_group = (
            Q(other_versions__isnull=True)
            | Q(other_versions__representative=F("pk"))
            | Q(other_versions__representative__isnull=True)
        )
        events_here = get_event_qs(self.request).filter(exact_location=self.place)
        upcoming = events_here.filter(one_per_group).filter(
            start_day__gte=datetime.now()
        )
        return upcoming.order_by("start_day")

    def get_context_data(self, **kwargs):
        """Expose the place as ``object`` in the template context."""
        context = super().get_context_data(**kwargs)
        context["object"] = self.place
        return context
class PlaceDetailViewPast(PlaceDetailView):
    """Page of a place, listing its past events instead of the upcoming ones."""

    def get_queryset(self):
        """Return the events at this place up to now, most recent first.

        The place is loaded from the ``pk`` URL argument (404 if unknown)
        and kept on ``self.place``; ``self.past`` is set to ``True``. Only
        events that have no duplicate group, that are the representative of
        their group, or whose group has no representative are kept.
        """
        self.place = get_object_or_404(Place, pk=self.kwargs["pk"])
        self.past = True

        one_per_group = (
            Q(other_versions__isnull=True)
            | Q(other_versions__representative=F("pk"))
            | Q(other_versions__representative__isnull=True)
        )
        events_here = get_event_qs(self.request).filter(exact_location=self.place)
        elapsed = events_here.filter(one_per_group).filter(
            start_day__lte=datetime.now()
        )
        return elapsed.order_by("-start_day")

    def get_context_data(self, **kwargs):
        """Add the ``past`` flag to the parent context."""
        context = super().get_context_data(**kwargs)
        context["past"] = self.past
        return context
class UpdatePlaces:
    """Mixin for place form views: optionally attach matching events on save.

    After the regular ``form_valid`` processing, when ``form.apply()`` is
    truthy the saved place is associated with its matching events. A message
    then reports how many events were updated (accumulated in
    ``self.nb_applied``).
    """

    def form_valid(self, form):
        """Save the form, associate matching events if requested, and report."""
        response = super().form_valid(form)
        place = form.instance

        if not hasattr(self, "nb_applied"):
            self.nb_applied = 0

        # if required, find all matching events
        if form.apply():
            self.nb_applied += place.associate_matching_events()

        updated = self.nb_applied
        if updated > 1:
            messages.success(
                self.request,
                _("{} events have been updated.").format(updated),
            )
        elif updated == 1:
            messages.success(self.request, _("1 event has been updated."))
        else:
            messages.info(self.request, _("No events have been modified."))

        return response
class PlaceUpdateView(
    UpdatePlaces, PermissionRequiredMixin, SuccessMessageMixin, UpdateView
):
    """Edit a place; requires the ``change_place`` permission."""

    model = Place
    form_class = PlaceForm
    permission_required = "agenda_culturel.change_place"
    success_message = _("The place has been successfully updated.")
class PlaceCreateView(
    UpdatePlaces, PermissionRequiredMixin, SuccessMessageMixin, CreateView
):
    """Create a place; requires the ``add_place`` permission."""

    model = Place
    form_class = PlaceForm
    permission_required = "agenda_culturel.add_place"
    success_message = _("The place has been successfully created.")
class PlaceDeleteView(PermissionRequiredMixin, DeleteView):
    """Delete a place, then go back to the administration list of places.

    Requires the ``delete_place`` permission.
    """

    model = Place
    success_url = reverse_lazy("view_places_admin")
    permission_required = "agenda_culturel.delete_place"
class UnknownPlacesListView(PermissionRequiredMixin, ListView):
    """List events with an unknown place, highest pk first, 10 per page.

    Requires the ``add_place`` permission.
    """

    model = Event
    # built once, when the class body is executed
    queryset = Event.get_qs_events_with_unkwnon_place()
    template_name = "agenda_culturel/place_unknown_list.html"
    permission_required = "agenda_culturel.add_place"
    ordering = ["-pk"]
    paginate_by = 10
def fix_unknown_places(request):
    """Assign a place to every event with an unknown place that matches one.

    Each event returned by ``Event.get_qs_events_with_unkwnon_place()`` is
    compared with the known places; the first place whose ``match`` accepts
    the event becomes its ``exact_location``. Matched events are saved in
    bulk, a message reports how many were updated, and the user is
    redirected to the list of events with an unknown place.
    """
    # NOTE(review): this view carries no login/permission decorator although
    # it modifies events — confirm that access is restricted elsewhere.

    # get all places
    places = Place.objects.all()
    # get all events without exact location
    u_events = Event.get_qs_events_with_unkwnon_place()

    to_be_updated = []
    # try to find matches
    for ue in u_events:
        for p in places:
            if p.match(ue):
                ue.exact_location = p
                to_be_updated.append(ue)
                # Stop at the first match: going on would append the same
                # event once per matching place and inflate the count.
                break

    # update events with a location
    Event.objects.bulk_update(to_be_updated, fields=["exact_location"])

    # create a success message
    nb = len(to_be_updated)
    if nb > 1:
        messages.success(request, _("{} events have been updated.").format(nb))
    elif nb == 1:
        messages.success(request, _("1 event has been updated."))
    else:
        messages.info(request, _("No events have been modified."))

    # come back to the list of places
    return HttpResponseRedirect(reverse_lazy("view_unknown_places"))
class UnknownPlaceAddView(PermissionRequiredMixin, SuccessMessageMixin, UpdateView):
    """Assign an existing place to an event whose place is unknown.

    Requires both the ``change_place`` and ``change_event`` permissions.
    """

    model = Event
    form_class = EventAddPlaceForm
    template_name = "agenda_culturel/place_unknown_form.html"
    permission_required = (
        "agenda_culturel.change_place",
        "agenda_culturel.change_event",
    )

    def form_valid(self, form):
        """Save the form and report what was done through messages.

        The submitted ``place`` and ``add_alias`` values are remembered on
        the view (``self.modified_event`` holds the place) for
        ``get_success_url``. When a place was selected and an alias was
        requested, the place is also associated with its matching events.
        """
        place = form.cleaned_data.get("place")
        add_alias = form.cleaned_data.get("add_alias")
        self.modified_event = place
        self.add_alias = add_alias

        response = super().form_valid(form)

        if not place:
            return response
        messages.success(
            self.request,
            _("The selected place has been assigned to the event."),
        )

        if not add_alias:
            return response
        messages.success(
            self.request,
            _("A new alias has been added to the selected place."),
        )

        nb_applied = place.associate_matching_events()
        if nb_applied > 1:
            messages.success(
                self.request,
                _("{} events have been updated.").format(nb_applied),
            )
        elif nb_applied == 1:
            messages.success(self.request, _("1 event has been updated."))
        else:
            messages.info(self.request, _("No events have been modified."))

        return response

    def get_success_url(self):
        """Return the unknown places list, or the place creation page.

        When no place was selected, the user is sent to the page creating a
        place from this event, with ``?add=1`` appended if an alias was
        requested.
        """
        if self.modified_event:
            return reverse_lazy("view_unknown_places")

        suffix = "?add=1" if self.add_alias else ""
        return reverse_lazy("add_place_from_event", args=[self.object.pk]) + suffix
class PlaceFromEventCreateView(PlaceCreateView):
    """Create a place from an event and attach it to that event."""

    def get_context_data(self, **kwargs):
        """Add the source event to the parent context."""
        context = super().get_context_data(**kwargs)
        context["event"] = self.event
        return context

    def get_initial(self, *args, **kwargs):
        """Pre-fill the place form from the location text of the event.

        The event is loaded from the ``pk`` URL argument (404 if unknown)
        and kept on ``self.event``. Its location is proposed as an alias
        when it is set and the query string contains ``add``. The name,
        address, postcode and city come from ``PlaceGuesser``.
        """
        initial = super().get_initial(**kwargs)
        self.event = get_object_or_404(Event, pk=self.kwargs["pk"])
        location = self.event.location

        if location and "add" in self.request.GET:
            initial["aliases"] = [location]

        name, address, postcode, city = PlaceGuesser().guess_address_elements(
            location
        )
        initial.update(
            name=name,
            address=address,
            postcode=postcode,
            city=city,
            location="",
        )
        return initial

    def form_valid(self, form):
        """Save the new place, then set it as the exact location of the event."""
        response = super().form_valid(form)
        self.event.exact_location = form.instance
        self.event.save(update_fields=["exact_location"])
        return response

    def get_success_url(self):
        """Return to the page of the source event."""
        return self.event.get_absolute_url()
#########################
## Organisations
#########################
class OrganisationListView(ListView):
    """Paginated list of organisations, ten per page."""

    model = Organisation
    # Ordering goes through the ``unaccent`` lookup on the name.
    ordering = ["name__unaccent"]
    paginate_by = 10
class OrganisationDetailView(ListView):
    """Detail page of an organisation with its upcoming events, paginated.

    Implemented as a ``ListView`` over events so that the event list can be
    paginated; the organisation itself is exposed as ``object`` in the
    template context.
    """

    model = Organisation
    template_name = "agenda_culturel/organisation_detail.html"
    paginate_by = 10

    def get_queryset(self):
        """Return the organisation's events starting today or later.

        Raises:
            Http404: if no organisation has the primary key given in the URL
                (previously ``self.organisation`` was silently ``None``).
        """
        organisation_pk = self.kwargs["pk"]
        self.organisation = get_object_or_404(
            Organisation.objects.prefetch_related("organised_events"),
            pk=organisation_pk,
        )
        return (
            get_event_qs(self.request)
            .filter(organisers__in=[organisation_pk])
            # Keep events with no other versions, events that are the
            # representative of their group, and events whose group has no
            # representative.
            .filter(
                Q(other_versions__isnull=True)
                | Q(other_versions__representative=F("pk"))
                | Q(other_versions__representative__isnull=True)
            )
            .filter(start_day__gte=datetime.now())
            .order_by("start_day")
        )

    def get_context_data(self, **kwargs):
        """Expose the organisation loaded in get_queryset() as ``object``."""
        context = super().get_context_data(**kwargs)
        context["object"] = self.organisation
        return context
class OrganisationDetailViewPast(OrganisationDetailView):
    """Variant of the organisation page listing past events, newest first."""

    def get_queryset(self):
        """Return the organisation's events that started today or earlier.

        Also sets ``self.past`` so that get_context_data() can flag the page.

        Raises:
            Http404: if no organisation has the primary key given in the URL
                (previously ``self.organisation`` was silently ``None``).
        """
        organisation_pk = self.kwargs["pk"]
        self.organisation = get_object_or_404(
            Organisation.objects.prefetch_related("organised_events"),
            pk=organisation_pk,
        )
        self.past = True
        return (
            get_event_qs(self.request)
            .filter(organisers__in=[organisation_pk])
            # Keep events with no other versions, events that are the
            # representative of their group, and events whose group has no
            # representative.
            .filter(
                Q(other_versions__isnull=True)
                | Q(other_versions__representative=F("pk"))
                | Q(other_versions__representative__isnull=True)
            )
            .filter(start_day__lte=datetime.now())
            .order_by("-start_day")
        )

    def get_context_data(self, **kwargs):
        """Add the ``past`` flag on top of the parent context."""
        context = super().get_context_data(**kwargs)
        context["past"] = self.past
        return context
class OrganisationUpdateView(PermissionRequiredMixin, SuccessMessageMixin, UpdateView):
    """Edit every field of an organisation (requires ``change_organisation``)."""

    permission_required = "agenda_culturel.change_organisation"
    model = Organisation
    fields = "__all__"
    success_message = _("The organisation has been successfully updated.")
class OrganisationCreateView(PermissionRequiredMixin, SuccessMessageMixin, CreateView):
    """Create an organisation from all its fields (requires ``add_organisation``)."""

    permission_required = "agenda_culturel.add_organisation"
    model = Organisation
    fields = "__all__"
    success_message = _("The organisation has been successfully created.")
class OrganisationDeleteView(PermissionRequiredMixin, DeleteView):
    """Delete an organisation, then go back to the organisation list."""

    permission_required = "agenda_culturel.delete_organisation"
    model = Organisation
    success_url = reverse_lazy("view_organisations")
#########################
## Tags
#########################
class TagUpdateView(PermissionRequiredMixin, SuccessMessageMixin, UpdateView):
    """Edit a tag through ``TagForm`` (requires ``change_tag``)."""

    permission_required = "agenda_culturel.change_tag"
    model = Tag
    form_class = TagForm
    success_message = _("The tag has been successfully updated.")
class TagCreateView(PermissionRequiredMixin, SuccessMessageMixin, CreateView):
    """Create a tag through ``TagForm`` (requires ``add_tag``)."""

    permission_required = "agenda_culturel.add_tag"
    model = Tag
    form_class = TagForm
    success_message = _("The tag has been successfully created.")

    def get_initial(self, *args, **kwargs):
        """Pre-fill the name field from the ``name`` query parameter, if any."""
        prefilled = super().get_initial(**kwargs)
        query = self.request.GET
        if "name" in query:
            prefilled["name"] = query.get("name")
        return prefilled

    def form_valid(self, form):
        """Clear the tag cache before the new tag is saved."""
        Tag.clear_cache()
        response = super().form_valid(form)
        return response
class TagDeleteView(PermissionRequiredMixin, DeleteView):
    """Delete a tag object, then go back to the list of all tags."""

    permission_required = "agenda_culturel.delete_tag"
    model = Tag
    success_url = reverse_lazy("view_all_tags")
def view_tag_past(request, t):
    """Render the page of tag ``t`` restricted to past events."""
    show_past = True
    return view_tag(request, t, show_past)
def view_tag(request, t, past=False):
    """Render the page of tag ``t`` with a paginated list of its events.

    Args:
        request: the current HTTP request; ``page`` is read from its query
            string.
        t: the tag name.
        past: when true, list events that started before today (most recent
            first); otherwise list events starting today or later.
    """
    today = date.today()
    events = get_event_qs(request).filter(tags__contains=[t])

    if past:
        events = events.filter(start_day__lt=today)
        events = events.order_by("-start_day", "-start_time")
    else:
        events = events.filter(start_day__gte=today)
        events = events.order_by("start_day", "start_time")

    # Keep events with no other versions, events that are the representative
    # of their group, and events whose group has no representative.
    events = events.filter(
        Q(other_versions__isnull=True)
        | Q(other_versions__representative=F("pk"))
        | Q(other_versions__representative__isnull=True)
    )

    paginator = Paginator(events, 10)
    requested_page = request.GET.get("page")
    try:
        current_page = paginator.page(requested_page)
    except PageNotAnInteger:
        # Missing or non-numeric page number: show the first page.
        current_page = paginator.page(1)
    except EmptyPage:
        # Out-of-range page number: show the last page.
        current_page = paginator.page(paginator.num_pages)

    return render(
        request,
        "agenda_culturel/tag.html",
        {
            "tag": t,
            "paginator_filter": current_page,
            "object": Tag.objects.filter(name=t).first(),
            "rimports": RecurrentImport.objects.filter(defaultTags__contains=[t]),
            "past": past,
        },
    )
def statistics(request, pk=None):
    """Render event statistics, globally or for a single recurrent import.

    Args:
        request: the current HTTP request.
        pk: optional primary key of a ``RecurrentImport``.  When given, only
            events whose ``import_sources`` contain that import's source are
            considered and the import-specific template is used.

    Raises:
        Http404: if ``pk`` is given but matches no recurrent import
            (previously this crashed while subscripting ``None``).
    """
    if pk is None:
        rimport = None
        qs = Event.objects
    else:
        rimport = get_object_or_404(RecurrentImport, pk=pk)
        qs = Event.objects.filter(import_sources__contains=[rimport.source])

    stats = {}
    stats_months = {}
    first = {}
    last = {}

    # Published events, restricted to: no other versions, representative of
    # their group, or group without representative.
    ev_published = qs.filter(
        Q(status=Event.STATUS.PUBLISHED)
        & (
            Q(other_versions__isnull=True)
            | Q(other_versions__representative=F("pk"))
            | Q(other_versions__representative__isnull=True)
        )
    )

    # Number of weeks after today included in the start-day window.
    after = 24
    today = date.today()
    for v in ["start_day", "created_date__date"]:
        # The creation window ends this month, the start-day window ends
        # ``after`` weeks from now; both are extended to the end of the month.
        last[v] = today if v == "created_date__date" else today + timedelta(weeks=after)
        last[v] = last[v].replace(
            day=_calendar.monthrange(last[v].year, last[v].month)[1]
        )

        # Window length in days, then snapped back to the first of the month.
        r = 8 * 30
        if v == "start_day":
            r += after * 7
        first[v] = (last[v] - timedelta(days=r)).replace(day=1)

        ev_days = ev_published.annotate(day=F(v)).filter(
            Q(day__lte=last[v]) & Q(day__gte=first[v])
        )

        stats[v] = ev_days.values("day").annotate(total=Count("day")).order_by("day")
        stats_months[v] = (
            ev_days.annotate(month=TruncMonth("day"))
            .values("month")
            .annotate(total=Count("month"))
            .order_by("month")
        )

    nb_by_city = (
        ev_published.annotate(city=F("exact_location__city"))
        .filter(city__isnull=False)
        .values("city")
        .annotate(total=Count("city"))
        .order_by("-total")
    )

    # "Foresight": days between the creation of an event and its start day,
    # computed only for events created no later than they start.
    limit = datetime.now() + timedelta(days=-30)
    stat_qs = qs.filter(start_day__gte=F("created_date")).annotate(
        foresight=ExtractDay(F("start_day") - F("created_date"))
    )

    def foresight_aggregates():
        # Fresh expressions for each aggregate() call; the key order drives
        # the row order of the table built below.
        return {
            "minimum": Min("foresight"),
            "maximum": Max("foresight"),
            "mean": Avg("foresight"),
            "median": Median("foresight"),
            "stdev": StdDev("foresight"),
        }

    # All matching events, then only those created in the last 30 days.
    statsa = stat_qs.aggregate(**foresight_aggregates())
    statsm = stat_qs.filter(created_date__gte=limit).aggregate(
        **foresight_aggregates()
    )

    stats_foresight = [
        [
            _(x),
            round(statsa[x], 2) if statsa[x] is not None else "-",
            round(statsm[x], 2) if statsm[x] is not None else "-",
        ]
        for x in statsa
    ]

    context = {
        "stats_by_startday": stats["start_day"],
        "stats_by_creation": stats["created_date__date"],
        "stats_months_by_startday": stats_months["start_day"],
        "stats_months_by_creation": stats_months["created_date__date"],
        "first_by_startday": first["start_day"],
        "last_by_startday": last["start_day"],
        "first_by_creation": first["created_date__date"],
        "last_by_creation": last["created_date__date"],
        "nb_by_city": nb_by_city,
        "stats_foresight": stats_foresight,
        "object": rimport,
    }

    if pk is None:
        return render(request, "agenda_culturel/statistics.html", context)
    return render(request, "agenda_culturel/rimport-statistics.html", context)
def tag_list(request):
    """Render the list of all tags, whether used by events or only described.

    Tags returned by ``Event.get_all_tags()`` are enriched with their ``Tag``
    object when one exists; ``Tag`` objects whose name is used by no event are
    appended with a count of 0.  The result is sorted by accent-stripped,
    lower-cased name, with emojis replaced by their textual name.
    """
    tags = Event.get_all_tags()
    # A set keeps the "is this tag used?" test constant-time.
    used_names = {t["tag"] for t in tags}

    objects = Tag.objects.order_by("name").all()
    described = {o.name: o for o in objects}

    tags = [
        t | {"obj": described[t["tag"]]} if t["tag"] in described else t for t in tags
    ]
    tags += [
        {"obj": o, "tag": o.name, "count": 0}
        for o in objects
        if o.name not in used_names
    ]

    context = {
        "tags": sorted(
            tags,
            key=lambda x: emoji.demojize(
                remove_accents(x["tag"]).lower(), delimiters=("000", "")
            ),
        )
    }
    return render(request, "agenda_culturel/tags.html", context)
@login_required(login_url="/accounts/login/")
@permission_required("agenda_culturel.change_tag")
def rename_tag(request, t):
    """Rename tag ``t`` on events, recurrent imports and its ``Tag`` object.

    On GET, or when the submitted form is invalid or refused, the rename form
    is rendered.  A rename to a name already used by events is refused unless
    the form's force option is set; the form is then shown again with the
    force option available.  On success the user is redirected to the page of
    the new tag.

    Args:
        request: the current HTTP request.
        t: the current name of the tag.
    """
    form = TagRenameForm(name=t)

    if request.method == "POST":
        form = TagRenameForm(request.POST, name=t)
        if form.is_valid():
            new_name = form.cleaned_data["name"]
            save = True

            if new_name == t:
                messages.warning(
                    request,
                    _("You have not modified the tag name."),
                )
                save = False
            elif not form.is_force():
                if Event.objects.filter(tags__contains=[new_name]).count() > 0:
                    if Tag.objects.filter(name=new_name):
                        messages.warning(
                            request,
                            (
                                _(
                                    "This tag {} is already in use, and is described by different information from the current tag. You can force renaming by checking the corresponding option. The information associated with tag {} will be deleted, and all events associated with tag {} will be associated with tag {}."
                                )
                            ).format(
                                new_name,
                                t,
                                t,
                                new_name,
                            ),
                        )
                    else:
                        messages.warning(
                            request,
                            (
                                _(
                                    "This tag {} is already in use. You can force renaming by checking the corresponding option."
                                )
                            ).format(new_name),
                        )
                    save = False
                    form = TagRenameForm(request.POST, name=t, force=True)

            if save:
                # find all matching events and update them
                events = Event.objects.filter(tags__contains=[t])
                for e in events:
                    e.tags = [te for te in e.tags if te != t]
                    if new_name not in e.tags:
                        e.tags += [new_name]
                Event.objects.bulk_update(events, fields=["tags"])

                # find all recurrent imports and fix them; the change must be
                # written to ``defaultTags`` since that is the field saved by
                # bulk_update (it was previously stored on an unsaved
                # ``tags`` attribute and therefore lost)
                rimports = RecurrentImport.objects.filter(defaultTags__contains=[t])
                for ri in rimports:
                    ri.defaultTags = [te for te in ri.defaultTags if te != t]
                    if new_name not in ri.defaultTags:
                        ri.defaultTags += [new_name]
                RecurrentImport.objects.bulk_update(rimports, fields=["defaultTags"])

                # find tag object
                # NOTE(review): if a Tag named ``new_name`` already exists,
                # this rename may collide with it - confirm how the Tag
                # model handles duplicate names.
                tag_object = Tag.objects.filter(name=t).first()
                if tag_object:
                    tag_object.name = new_name
                    tag_object.save()

                messages.success(
                    request,
                    (_("The tag {} has been successfully renamed to {}.")).format(
                        t, new_name
                    ),
                )
                return HttpResponseRedirect(
                    reverse_lazy("view_tag", kwargs={"t": new_name})
                )

    nb = Event.objects.filter(tags__contains=[t]).count()
    return render(
        request,
        "agenda_culturel/tag_rename_form.html",
        context={"form": form, "tag": t, "nb": nb},
    )
@login_required(login_url="/accounts/login/")
@permission_required("agenda_culturel.delete_tag")
def delete_tag(request, t):
    """Remove tag ``t`` from events and recurrent imports, and delete its object.

    On POST the tag is removed everywhere and the user is redirected to the
    list of all tags.  Otherwise a confirmation page is rendered showing how
    many events and recurrent imports use the tag.

    Args:
        request: the current HTTP request.
        t: the name of the tag to delete.
    """
    respage = reverse_lazy("view_all_tags")

    if request.method == "POST":
        # remove tag from events
        events = Event.objects.filter(tags__contains=[t])
        for e in events:
            e.tags = [te for te in e.tags if te != t]
        Event.objects.bulk_update(events, fields=["tags"])

        # remove tag from recurrent imports; the change must be written to
        # ``defaultTags`` since that is the field saved by bulk_update (it
        # was previously stored on an unsaved ``tags`` attribute and lost)
        rimports = RecurrentImport.objects.filter(defaultTags__contains=[t])
        for ri in rimports:
            ri.defaultTags = [te for te in ri.defaultTags if te != t]
        RecurrentImport.objects.bulk_update(rimports, fields=["defaultTags"])

        # find tag object
        tag_object = Tag.objects.filter(name=t).first()
        if tag_object:
            tag_object.delete()

        messages.success(
            request,
            (_("The tag {} has been successfully deleted.")).format(t),
        )
        return HttpResponseRedirect(respage)

    nb = Event.objects.filter(tags__contains=[t]).count()
    obj = Tag.objects.filter(name=t).first()
    nbi = RecurrentImport.objects.filter(defaultTags__contains=[t]).count()
    # Cancelling goes back to the referring page, or to the tag list when
    # the request carries no referer.
    cancel_url = request.META.get("HTTP_REFERER", "")
    if cancel_url == "":
        cancel_url = respage
    return render(
        request,
        "agenda_culturel/tag_confirm_delete_by_name.html",
        {
            "tag": t,
            "nb": nb,
            "nbi": nbi,
            "cancel_url": cancel_url,
            "obj": obj,
        },
    )
# Flushing the cache is a maintenance action: without this guard any
# visitor could empty it with a plain POST.
# NOTE(review): a dedicated permission may be more appropriate than a simple
# login check - confirm which one applies.
@login_required(login_url="/accounts/login/")
def clear_cache(request):
    """Clear the whole cache on POST, otherwise show the confirmation page.

    After clearing, a success message is queued and the user is redirected to
    the administration page.
    """
    if request.method != "POST":
        return render(
            request,
            "agenda_culturel/clear_cache.html",
        )

    cache.clear()
    messages.success(request, _("Cache successfully cleared."))
    return HttpResponseRedirect(reverse_lazy("administration"))
class UserProfileUpdateView(SuccessMessageMixin, LoginRequiredMixin, UpdateView):
    """Let the logged-in user edit their own profile."""

    model = UserProfile
    form_class = UserProfileForm
    success_url = reverse_lazy("administration")
    success_message = _("Your user profile has been successfully modified.")

    def get_object(self):
        """Return the profile of the requesting user (no pk is read from the URL)."""
        current_user = self.request.user
        return current_user.userprofile