2025-04-14 17:50:59 +02:00

1133 lines
35 KiB
Python

import hashlib
import logging
from datetime import date, timedelta
import emoji
from django.contrib import messages
from django.contrib.auth.decorators import login_required, permission_required
from django.contrib.auth.mixins import (
LoginRequiredMixin,
PermissionRequiredMixin,
UserPassesTestMixin,
)
from django.contrib.messages.views import SuccessMessageMixin
from django.core.cache import cache
from django.core.paginator import EmptyPage, PageNotAnInteger, Paginator
from django.db.models import Aggregate, FloatField
from django.db.models import F, OuterRef, Q, Subquery
from django.http import (
HttpResponse,
HttpResponseForbidden,
HttpResponseRedirect,
)
from django.shortcuts import get_object_or_404, render
from django.urls import reverse, reverse_lazy
from django.utils.decorators import method_decorator
from django.utils.html import escape
from django.utils.safestring import mark_safe
from django.utils.timezone import datetime
from django.utils.translation import gettext_lazy as _
from django.views.generic import DetailView
from django.views.generic.edit import (
CreateView,
DeleteView,
ModelFormMixin,
UpdateView,
)
from honeypot.decorators import check_honeypot
from .utils import get_event_qs
from ..calendar import CalendarList, CalendarMonth, CalendarWeek
from ..celery import (
import_events_from_url,
import_events_from_urls,
)
from ..filters import (
EventFilter,
EventFilterAdmin,
)
from ..forms import (
EventForm,
EventFormWithContact,
MessageEventForm,
MessageForm,
SimpleContactForm,
URLSubmissionFormSet,
URLSubmissionFormWithContact,
UserProfileForm,
)
from ..import_tasks.extractor import Extractor
from ..models import (
BatchImportation,
Category,
DuplicatedEvents,
Event,
Message,
Organisation,
Place,
RecurrentImport,
StaticContent,
UserProfile,
)
logger = logging.getLogger(__name__)
class Median(Aggregate):
    """SQL aggregate computing a median through ``PERCENTILE_CONT(0.5)``."""

    name = "median"
    function = "PERCENTILE_CONT"
    # The 0.5 percentile over the ordered expressions is the median.
    template = "%(function)s(0.5) WITHIN GROUP (ORDER BY %(expressions)s)"
    output_field = FloatField()
class PaginatorFilter(Paginator):
    """Paginator over a filter's queryset that also exposes page URLs.

    Instances carry ``url_first_page`` and ``url_last_page``; every page
    returned by :meth:`page` carries ``url_previous_page`` and
    ``url_next_page``. All of them are derived from the full path of the
    current request so that the other query parameters are kept.
    """

    def __init__(self, filter, nb, request):
        """Paginate ``filter.qs`` with ``nb`` items per page for ``request``."""
        self.request = request
        self.filter = filter
        super().__init__(filter.qs, nb)

        current_url = self.request.get_full_path()
        self.url_first_page = PaginatorFilter.update_param(current_url, "page", 1)
        self.url_last_page = PaginatorFilter.update_param(
            current_url, "page", self.num_pages
        )

    @staticmethod
    def update_param(params, key, value):
        """Return the URL ``params`` with query parameter ``key`` set to ``value``.

        Any existing occurrence of ``key`` is removed and ``key=value`` is
        appended at the end of the query string. Only the exact parameter name
        is replaced: a parameter such as ``page_size`` is left alone when
        ``key`` is ``page``.
        """
        root, _sep, query = params.partition("?")
        assignment_prefix = key + "="
        kept = [
            item
            for item in query.split("&")
            # Drop empty fragments and previous values of the very same key.
            if item and item != key and not item.startswith(assignment_prefix)
        ]
        kept.append(assignment_prefix + str(value))
        return root + "?" + "&".join(kept)

    def page(self, *args, **kwargs):
        """Return the requested page, decorated with previous/next URLs.

        When there is no previous (resp. next) page, the corresponding URL
        falls back to the current request path.
        """
        page = super().page(*args, **kwargs)
        current_url = self.request.get_full_path()

        try:
            page.url_previous_page = PaginatorFilter.update_param(
                current_url, "page", page.previous_page_number()
            )
        except EmptyPage:
            page.url_previous_page = current_url

        try:
            page.url_next_page = PaginatorFilter.update_param(
                current_url, "page", page.next_page_number()
            )
        except EmptyPage:
            page.url_next_page = current_url

        return page
# Useful for translation: these strings are only wrapped in ``_`` here
# (presumably so that message extraction picks them up -- confirm).
to_be_translated = [
    _("Recurrent import name"),
    _("Add another"),
    _("Browse..."),
    _("No file selected."),
]
def month_view(request, year=None, month=None, cat=None):
    """Render the monthly calendar page.

    ``year`` and ``month`` default to the current ones. When both are
    omitted, today's day number is passed to the calendar and the template
    is told it displays the current month. ``cat`` is an optional category
    slug used to restrict the events.
    """
    today = date.today()
    current_day = today.day if (year is None and month is None) else None
    year = today.year if year is None else year
    month = today.month if month is None else month

    request = EventFilter.set_default_values(request)
    # Only load the columns needed by the month grid.
    needed_fields = (
        "title",
        "start_day",
        "start_time",
        "category",
        "other_versions",
        "recurrences",
        "end_day",
        "end_time",
        "uuids",
        "status",
        "tags",
    )
    events = get_event_qs(request).only(*needed_fields)

    category = None
    if cat is not None:
        category = Category.objects.filter(slug=cat).first()
        events = events.filter(category=category)

    event_filter = EventFilter(request.GET, events, request=request)
    if event_filter.has_category_parameters():
        return HttpResponseRedirect(event_filter.get_new_url())

    calendar = CalendarMonth(year, month, event_filter, day=current_day)
    init_date = today if calendar.today_in_calendar() else calendar.firstdate
    return render(
        request,
        "agenda_culturel/page-month.html",
        {
            "calendar": calendar,
            "this_month": current_day is not None,
            "filter": event_filter,
            "category": category,
            "init_date": init_date,
        },
    )
def week_view(request, year=None, week=None, home=False, cat=None):
    """Render the weekly calendar page.

    ``year`` and ``week`` default to the current ISO year and ISO week.
    ``cat`` is an optional category slug; when ``home`` is true the context
    additionally contains ``home = 1``.
    """
    today = date.today()
    iso_calendar = today.isocalendar()
    if year is None:
        year = iso_calendar[0]
    if week is None:
        week = iso_calendar[1]

    request = EventFilter.set_default_values(request)
    # Only load the columns needed by the week page.
    needed_fields = (
        "title",
        "start_day",
        "start_time",
        "category",
        "other_versions",
        "recurrences",
        "end_day",
        "end_time",
        "uuids",
        "status",
        "tags",
        "local_image",
        "image",
        "image_alt",
        "exact_location",
        "description",
    )
    events = (
        get_event_qs(request).select_related("exact_location").only(*needed_fields)
    )

    category = None
    if cat is not None:
        category = Category.objects.filter(slug=cat).first()
        events = events.filter(category=category)

    event_filter = EventFilter(request.GET, events, request=request)
    if event_filter.has_category_parameters():
        return HttpResponseRedirect(event_filter.get_new_url())

    calendar = CalendarWeek(year, week, event_filter)
    init_date = today if calendar.today_in_calendar() else calendar.firstdate
    context = {
        "year": year,
        "week": week,
        "calendar": calendar,
        "filter": event_filter,
        "category": category,
        "init_date": init_date,
    }
    if home:
        context["home"] = 1
    return render(request, "agenda_culturel/page-week.html", context)
def day_view(request, year=None, month=None, day=None, cat=None):
    """Show the events of a single day.

    When the date is not fully given in the URL and a ``when`` value
    (``YYYY-MM-DD``) was POSTed, the user is redirected to the canonical
    ``day_view`` URL for that date, keeping the current filter parameters.
    Otherwise the listing is delegated to :func:`upcoming_events` with a
    neighbourhood of zero days.

    A malformed ``when`` value is ignored instead of raising ``ValueError``:
    the view then behaves as if no date had been posted.
    """
    date_is_partial = year is None or month is None or day is None
    if date_is_partial and "when" in request.POST:
        try:
            when = datetime.strptime(request.POST["when"], "%Y-%m-%d")
        except ValueError:
            # Untrusted form input: fall back to the default listing below.
            when = None

        if when is not None:
            request = EventFilter.set_default_values(request)
            qs = get_event_qs(request).select_related("exact_location")
            if cat is not None:
                category = Category.objects.filter(slug=cat).first()
                qs = qs.filter(category=category)
            filter = EventFilter(request.GET, qs, request=request)
            return HttpResponseRedirect(
                reverse_lazy("day_view", args=[when.year, when.month, when.day])
                + "?"
                + filter.get_url()
            )

    return upcoming_events(request, year, month, day, 0, cat)
def upcoming_events(request, year=None, month=None, day=None, neighsize=1, cat=None):
    """Render the list of events around a given date.

    Missing date components default to today's. The displayed day is the
    requested date shifted forward by ``neighsize`` days, and the calendar
    covers ``neighsize`` days on each side of that displayed day. ``cat`` is
    an optional category slug used to restrict the events.
    """
    today = date.today()
    year = today.year if year is None else year
    month = today.month if month is None else month
    day = today.day if day is None else day

    span = timedelta(days=neighsize)
    one_day = timedelta(days=1)
    # Centre of the displayed window: the requested date moved ``neighsize`` ahead.
    center = date(year, month, day) + span

    request = EventFilter.set_default_values(request)
    events = get_event_qs(request).select_related("exact_location")

    category = None
    if cat is not None:
        category = Category.objects.filter(slug=cat).first()
        events = events.filter(category=category)

    event_filter = EventFilter(request.GET, events, request=request)
    if event_filter.has_category_parameters():
        return HttpResponseRedirect(event_filter.get_new_url())

    calendar = CalendarList(center - span, center + span, event_filter, True)
    init_date = today if calendar.today_in_calendar() else center
    return render(
        request,
        "agenda_culturel/page-upcoming.html",
        {
            "calendar": calendar,
            "now": today,
            "day": center,
            "init_date": init_date,
            "filter": event_filter,
            "date_pred": center - span - one_day,
            "date_next": center + span + one_day,
            "category": category,
        },
    )
class StaticContentCreateView(
    PermissionRequiredMixin,
    LoginRequiredMixin,
    CreateView,
):
    """Create a static content block; only its ``text`` is edited in the form.

    ``PermissionRequiredMixin`` is part of the bases so that the declared
    ``permission_required`` is actually enforced (the attribute alone has no
    effect without the mixin).
    """

    model = StaticContent
    fields = ["text"]
    permission_required = "agenda_culturel.add_staticcontent"

    def form_valid(self, form):
        """Fill ``name`` and ``url_path`` from the query string, then save."""
        form.instance.name = self.request.GET["name"]
        form.instance.url_path = self.request.GET["url_path"]
        return super().form_valid(form)
class StaticContentUpdateView(
    SuccessMessageMixin,
    PermissionRequiredMixin,
    LoginRequiredMixin,
    UpdateView,
):
    """Edit the ``text`` of an existing static content block."""

    permission_required = "agenda_culturel.change_staticcontent"
    success_message = _("The static content has been successfully updated.")
    model = StaticContent
    fields = ["text"]
def update_from_source(request, pk):
    """Queue a re-import of event ``pk`` from its source, then show the event.

    A warning message is displayed instead when the event has no updateable
    source URL.
    """
    event = get_object_or_404(Event, pk=pk)
    source_url = event.get_updateable_uuid()

    if source_url is not None:
        user_id = request.user.pk if request.user else None
        import_events_from_url.delay(source_url, None, None, True, user_id=user_id)
        messages.success(
            request,
            _("The event update has been queued and will be completed shortly."),
        )
    else:
        messages.warning(
            request,
            _(
                "The event cannot be updated because the import process is not available for the referenced sources."
            ),
        )

    return HttpResponseRedirect(event.get_absolute_url())
class EventUpdateView(
    SuccessMessageMixin,
    PermissionRequiredMixin,
    LoginRequiredMixin,
    UpdateView,
):
    """Edit an event; also serves the clone, simple-clone and edit-force URLs.

    The mode is derived from the request path: a ``clone`` segment enables
    cloning, ``simple-clone`` enables simple cloning and ``edit-force``
    releases the modification lock before editing.
    """

    model = Event
    permission_required = "agenda_culturel.change_event"
    form_class = EventForm

    def get_form_kwargs(self):
        """Pass the user and mode flags to the form.

        ``super().get_form_kwargs()`` calls :meth:`get_initial`, which is what
        sets ``is_cloning`` and ``is_simple_cloning`` before they are read here.
        """
        kwargs = super().get_form_kwargs()
        kwargs["is_authenticated"] = self.request.user.is_authenticated
        kwargs["is_moderation_expert"] = (
            self.request.user.userprofile.is_moderation_expert
        )
        kwargs["is_cloning"] = self.is_cloning
        kwargs["is_edit_from_moderation"] = self.is_edit_force()
        kwargs["is_simple_cloning"] = self.is_simple_cloning
        return kwargs

    def get_success_message(self, cleaned_data):
        """Return the success message, mentioning the notification if one was sent."""
        # ``with_msg`` is set by form_valid(); it was previously stored under a
        # different name, so the extra sentence was never displayed.
        txt = (
            _(" A message has been sent to the person who proposed the event.")
            if getattr(self, "with_msg", False)
            else ""
        )
        return mark_safe(_("The event has been successfully modified.") + txt)

    def is_edit_force(self):
        """Tell whether the request path contains an ``edit-force`` segment."""
        return "edit-force" in self.request.path.split("/")

    def get_object(self, queryset=None):
        """Return the event, presented as published when it is a draft.

        In edit-force mode the modification lock is released for the current
        user first.
        """
        event = super().get_object(queryset)
        if event.status == Event.STATUS.DRAFT:
            event.status = Event.STATUS.PUBLISHED
        if self.is_edit_force():
            event.free_modification_lock(self.request.user)
        return event

    def form_valid(self, form):
        """Record the processing user and the notification result, then save."""
        form.instance.set_processing_user(self.request.user)
        self.with_msg = form.instance.notify_if_required(self.request)
        return super().form_valid(form)

    def get_initial(self):
        """Compute the mode flags and the initial form data for cloning."""
        path_segments = self.request.path.split("/")
        self.is_cloning = "clone" in path_segments
        if self.is_cloning:
            messages.info(
                self.request,
                _(
                    "Changes will be visible on a local copy of the event. The version identical to the imported source will be hidden."
                ),
            )
        self.is_simple_cloning = "simple-clone" in path_segments
        result = super().get_initial()

        if self.is_cloning and "other_versions" not in result:
            obj = self.get_object()
            # if no DuplicatedEvents is associated, create one
            obj.other_versions = DuplicatedEvents.objects.create()
            obj.other_versions.save()
            # save them without updating modified date
            obj.set_no_modification_date_changed()
            obj.save()
            result["other_versions"] = obj.other_versions
            result["status"] = Event.STATUS.PUBLISHED
            result["cloning"] = True

        if self.is_simple_cloning:
            result["other_versions"] = None
            result["simple_cloning"] = True

        if self.is_cloning or self.is_simple_cloning:
            obj = self.get_object()
            if obj.local_image:
                result["old_local_image"] = obj.local_image.name

        return result

    def get_success_url(self):
        """Go to the next event to moderate, or back to the edited event."""
        if "save_and_next" in self.request.POST:
            return reverse_lazy("moderate_event_next", args=[self.object.pk])
        return self.object.get_absolute_url()
class EventDeleteView(
    SuccessMessageMixin,
    PermissionRequiredMixin,
    LoginRequiredMixin,
    DeleteView,
):
    """Delete an event, then redirect to the ``recent`` page."""

    permission_required = "agenda_culturel.delete_event"
    success_message = _("The event has been successfully deleted.")
    success_url = reverse_lazy("recent")
    model = Event
class EventDetailView(UserPassesTestMixin, DetailView, ModelFormMixin):
    """Event page, with a comment form usable by authenticated users.

    Anonymous visitors may only see published events.
    """

    model = Event
    form_class = MessageEventForm
    template_name = "agenda_culturel/page-event.html"
    queryset = Event.objects.select_related(
        "exact_location",
        "category",
        "other_versions",
        "other_versions__representative",
    ).prefetch_related("message_set")

    def test_func(self):
        """Allow authenticated users, and anyone when the event is published."""
        if self.request.user.is_authenticated:
            return True
        return self.get_object().status == Event.STATUS.PUBLISHED

    def get_object(self):
        """Return the event, or its recurrence at the date given in the URL."""
        event = super().get_object()
        event.download_missing_image()
        if "year" not in self.kwargs:
            return event

        year = self.kwargs["year"]
        month = self.kwargs["month"]
        day = self.kwargs["day"]
        occurrence = event.get_recurrence_at_date(year, month, day)
        occurrence.set_current_date(date(year, month, day))
        return occurrence

    def get_success_url(self):
        """Return to the chronology section of the event page."""
        return self.get_object().get_absolute_url() + "#chronology"

    def post(self, request, *args, **kwargs):
        """Handle a submitted comment; anonymous users are refused."""
        if not request.user.is_authenticated:
            return HttpResponseForbidden()
        form = self.get_form()
        handler = self.form_valid if form.is_valid() else self.form_invalid
        return handler(form)

    def form_valid(self, form):
        """Save the comment as a closed, non-spam message attached to the event."""
        comment = form.save(commit=False)
        comment.user = self.request.user
        comment.related_event = self.get_object()
        comment.subject = _("Comment")
        comment.spam = False
        comment.closed = True
        comment.save()
        return super().form_valid(form)
@login_required(login_url="/accounts/login/")
@permission_required("agenda_culturel.change_event")
def change_status_event(request, pk, status):
    """Change the status of event ``pk`` after a confirmation step.

    Any non-POST request renders the confirmation page; a POST applies the
    new status, saves the moderation fields and redirects to the event.
    """
    event = get_object_or_404(Event, pk=pk)

    if request.method != "POST":
        # Let "cancel" return to the referring page, or home when unknown.
        cancel_url = request.META.get("HTTP_REFERER", "") or reverse_lazy("home")
        return render(
            request,
            "agenda_culturel/event_confirm_change_status.html",
            {"status": status, "event": event, "cancel_url": cancel_url},
        )

    event.status = Event.STATUS(status)
    saved_fields = ["status", "moderated_date", "moderated_by_user"]
    event.set_in_moderation_process()
    event.update_modification_dates()
    event.save(update_fields=saved_fields)

    if event.notify_if_required(request):
        feedback = _(
            "The status has been successfully modified and a message has been sent to the person who proposed the event."
        )
    else:
        feedback = _("The status has been successfully modified.")
    messages.success(request, feedback)
    return HttpResponseRedirect(event.get_absolute_url())
def import_event_proxy(request):
    """Render the page offering the different ways to import an event."""
    template_name = "agenda_culturel/event_import.html"
    return render(request, template_name)
class EventCreateView(SuccessMessageMixin, CreateView):
    """Create an event, either by an authenticated user or by a visitor.

    Visitors are sent back to the home page with a "pending moderation"
    message; authenticated users get a link to the created event.
    """

    model = Event
    form_class = EventFormWithContact

    def get_form_kwargs(self):
        """Tell the form whether the current user is authenticated."""
        kwargs = super().get_form_kwargs()
        kwargs["is_authenticated"] = self.request.user.is_authenticated
        return kwargs

    def get_success_url(self):
        """Return the next page: moderation queue, event page, or home."""
        if not self.request.user.is_authenticated:
            return reverse_lazy("home")
        if "save_and_next" in self.request.POST:
            return reverse_lazy("moderate_event_next", args=[self.object.pk])
        return self.object.get_absolute_url()

    def get_success_message(self, cleaned_data):
        """Return the confirmation message shown after the creation."""
        if self.request.user.is_authenticated:
            # The title is free text: escape it before marking the whole
            # message as safe HTML.
            return mark_safe(
                _('The event was created: <a href="{}">{}</a>.').format(
                    self.object.get_absolute_url(), escape(self.object.title)
                )
            )
        return _(
            "The event has been submitted and will be published as soon as it has been validated by the moderation team."
        )

    def form_valid(self, form):
        """Save the event, attaching the contributor message when provided.

        When the event is a clone, the person who proposed the initial event
        is notified if required.
        """
        if form.cleaned_data["simple_cloning"]:
            form.instance.set_skip_duplicate_check()

        if form.cleaned_data["cloning"]:
            form.instance.set_in_moderation_process()

        if form.cleaned_data.get("email") or form.cleaned_data.get("comments"):
            has_comments = form.cleaned_data.get("comments") not in ["", None]
            form.instance.add_message(
                Message(
                    subject=_("during the creation process"),
                    message=form.cleaned_data.get("comments"),
                    email=form.cleaned_data.get("email"),
                    closed=False,
                    message_type=(
                        Message.TYPE.FROM_CONTRIBUTOR
                        if has_comments
                        else Message.TYPE.FROM_CONTRIBUTOR_NO_MSG
                    ),
                )
            )

        form.instance.import_sources = None
        form.instance.set_processing_user(self.request.user)

        result = super().form_valid(form)

        if form.cleaned_data["cloning"]:
            with_msg = form.instance.notify_if_required(self.request)
            if with_msg:
                messages.success(
                    self.request,
                    _(
                        "A message has been sent to the person who proposed the initial event."
                    ),
                )

        return result
class URLEventEvaluation:
    """Evaluate a submitted URL against the existing events.

    The outcome depends on the events already known for that URL and on the
    authentication level of the user:

    * ``existing`` is ``None`` when no event references the URL;
    * ``event`` is the event that may be shown to the user: a published one
      when there is any, otherwise (authenticated users only) the first known
      event. It stays ``None`` when nothing can be shown.
    """

    def __init__(self, form, is_authenticated):
        """Read ``url``, ``category`` and ``tags`` from the validated ``form``."""
        self.form = form
        self.is_authenticated = is_authenticated

        self.cat = None
        self.tags = []
        self.existing = None
        self.url = form.cleaned_data.get("url")
        self.event = None
        if self.url is None:
            return

        self.url = Extractor.clean_url(self.url)
        # we check if the url is known
        self.existing = Event.objects.filter(uuids__contains=[self.url])

        if not self.existing:
            # unknown url: keep the requested category and tags for the import
            self.existing = None
            self.cat = form.cleaned_data.get("category")
            if self.cat is not None:
                self.cat = self.cat.name
            self.tags = form.cleaned_data.get("tags")
            return

        published = [e for e in self.existing if e.status == Event.STATUS.PUBLISHED]
        if published:
            # A single published event is enough (the previous ``> 1`` test
            # ignored the case of exactly one published event).
            self.event = published[0]
        elif self.is_authenticated:
            self.event = self.existing[0]

    def exists(self):
        """Tell whether a URL was submitted at all."""
        return self.url is not None

    def is_new(self):
        """Tell whether the submitted URL is not referenced by any event."""
        return self.exists() and self.existing is None

    def is_event_visible(self):
        """Tell whether an existing event can be shown to the user."""
        return self.event is not None

    def get_event(self):
        """Return the event that can be shown to the user, or ``None``."""
        return self.event

    def get_link(self):
        """Return an HTML link to the visible event, or an empty string."""
        e = self.get_event()
        if e is None:
            return ""
        return '<a href="' + e.get_absolute_url() + '">' + escape(e.title) + "</a>"

    def to_list(self):
        """Return ``(url, category name, tags)`` for a new URL, else ``None``."""
        if self.is_new():
            return (self.url, self.cat, self.tags)
        return None
def import_from_urls(request):
    """Submit several URLs at once to the import process.

    On a valid POST, every URL that is already known produces an
    informational message; the remaining ones are queued for import and the
    user is redirected to the thank-you page (or to the home page when
    nothing was queued). Anonymous users must also fill in the contact form.
    Any other request renders the submission page.
    """
    is_authenticated = request.user.is_authenticated

    if request.method == "POST":
        formset = URLSubmissionFormSet(request.POST, request.FILES)
        if not is_authenticated:
            contactform = SimpleContactForm(request.POST)

        if formset.is_valid() and (is_authenticated or contactform.is_valid()):
            # evaluate all the forms
            ucat = [URLEventEvaluation(form, is_authenticated) for form in formset.forms]

            # for each not new, add a message
            for uc in ucat:
                if uc.exists() and not uc.is_new():
                    if uc.is_event_visible():
                        # The URL comes from user input: escape it before the
                        # whole message is marked as safe HTML.
                        messages.info(
                            request,
                            mark_safe(
                                _(
                                    "{} has not been submitted since it"
                                    "s already known: {}."
                                ).format(escape(uc.url), uc.get_link())
                            ),
                        )
                    else:
                        messages.info(
                            request,
                            _(
                                "{} has not been submitted since it"
                                "s already known and currently into moderation process."
                            ).format(uc.url),
                        )

            # keep only new ones
            ucat = [uc.to_list() for uc in ucat if uc.is_new()]

            # finally process them or go back to home page
            if len(ucat) > 0:
                messages.info(
                    request,
                    _("Integrating {} url(s) into our import process.").format(
                        len(ucat)
                    ),
                )
                email = None
                comments = None
                if not is_authenticated:
                    email = contactform.cleaned_data["email"]
                    comments = contactform.cleaned_data["comments"]
                import_events_from_urls.delay(
                    ucat,
                    user_id=request.user.pk if request.user else None,
                    email=email,
                    comments=comments,
                )
                return HttpResponseRedirect(reverse("thank_you"))
            else:
                return HttpResponseRedirect(reverse("home"))
    else:
        formset = URLSubmissionFormSet()
        if not is_authenticated:
            contactform = SimpleContactForm()

    context = {"formset": formset}
    if not is_authenticated:
        context["contactform"] = contactform
    return render(request, "agenda_culturel/import_set.html", context=context)
def import_from_url(request):
    """Submit a single URL to the import process.

    On a valid POST: when the URL is already known, the user is redirected to
    the visible event (or to the home page when none can be shown);
    otherwise the import is queued and the user is redirected to the
    thank-you page. In every other case the import page is rendered with the
    URL form and a pre-filled event form.
    """
    is_authenticated = request.user.is_authenticated
    form = URLSubmissionFormWithContact(is_authenticated=is_authenticated)

    initial = {
        "start_day": date.today() + timedelta(days=1),
        "start_time": "20:00",
        "end_time": "22:00",
    }
    form_event = EventForm(initial=initial)

    # if the form has been sent
    if request.method == "POST":
        form = URLSubmissionFormWithContact(
            request.POST, is_authenticated=is_authenticated
        )

        # if the form is valid
        if form.is_valid():
            uc = URLEventEvaluation(form, is_authenticated)

            if uc.exists() and not uc.is_new():
                if uc.is_event_visible():
                    # The URL comes from user input: escape it before the
                    # whole message is marked as safe HTML.
                    messages.info(
                        request,
                        mark_safe(
                            _(
                                "{} has not been submitted since its already known: {}."
                            ).format(escape(uc.url), uc.get_link())
                        ),
                    )
                    return HttpResponseRedirect(uc.get_event().get_absolute_url())
                else:
                    messages.info(
                        request,
                        _(
                            "{} has not been submitted since it"
                            "s already known and currently into moderation process."
                        ).format(uc.url),
                    )
                    return HttpResponseRedirect(reverse("home"))
            else:
                messages.info(
                    request,
                    _("Integrating {} into our import process.").format(uc.url),
                )
                import_events_from_url.delay(
                    uc.url,
                    uc.cat,
                    uc.tags,
                    user_id=request.user.pk if request.user else None,
                    email=form.cleaned_data.get("email"),
                    comments=form.cleaned_data.get("comments"),
                )
                return HttpResponseRedirect(reverse("thank_you"))

    return render(
        request,
        "agenda_culturel/import.html",
        context={"form": form, "form_event": form_event},
    )
def export_event_ical(request, year, month, day, pk):
    """Return the occurrence of event ``pk`` at the given date as an ICS file."""
    occurrence = get_object_or_404(Event, pk=pk).get_recurrence_at_date(
        year, month, day
    )
    calendar = Event.export_to_ics([occurrence], request)

    response = HttpResponse(content_type="text/calendar")
    response.content = calendar.to_ical().decode("utf-8").replace("\r\n", "\n")

    # File name: the title without line breaks, cut to 32 characters.
    short_title = occurrence.title.replace("\n", " ").replace("\r", "")[0:32]
    response["Content-Disposition"] = "attachment; filename={0}{1}".format(
        short_title, ".ics"
    )
    return response
def export_ical(request, cat=None, tag=None, organisation_pk=None, place_pk=None):
    """Export the events from 7 days ago to 60 days ahead as an ICS file.

    The events can be restricted by category slug (``cat``), ``tag``,
    organisation and place. The generated calendar is cached for one hour
    under a key derived from the filter URL and these parameters.

    The restrictions are also appended to the downloaded file name; an
    unknown ``place_pk`` or ``organisation_pk`` is simply not mentioned there
    (it previously raised ``TypeError``).
    """
    now = date.today()

    qs = get_event_qs(request)
    if cat is not None:
        category = Category.objects.filter(slug=cat).first()
        qs = qs.filter(category=category)
    else:
        category = None

    if place_pk is not None:
        qs = qs.filter(exact_location=place_pk)
    if organisation_pk is not None:
        qs = qs.filter(organisers__in=[organisation_pk])
    if tag is not None:
        qs = qs.filter(tags__in=[tag])

    request = EventFilter.set_default_values(request)
    filter = EventFilter(request.GET, queryset=qs, request=request)

    if filter.has_category_parameters():
        return HttpResponseRedirect(filter.get_new_url())

    id_cache = hashlib.md5(
        (
            filter.get_url()
            + "-"
            + str(tag)
            + "-"
            + str(cat)
            + "-"
            + str(organisation_pk)
            + "-"
            + str(place_pk)
        ).encode("utf8")
    ).hexdigest()
    ical = cache.get(id_cache)
    if not ical:
        calendar = CalendarList(
            now + timedelta(days=-7), now + timedelta(days=+60), filter
        )
        ical = calendar.export_to_ics(request)
        cache.set(id_cache, ical, 3600)  # 1 hour

    response = HttpResponse(content_type="text/calendar")
    response.content = ical.to_ical().decode("utf-8").replace("\r\n", "\n")

    extra = filter.to_str(" ")
    if extra is None:
        extra = ""
    if category is not None:
        extra += " " + str(category)
    if place_pk is not None:
        place_name = (
            Place.objects.filter(pk=place_pk).values_list("name", flat=True).first()
        )
        # first() is None when the place does not exist.
        if place_name is not None:
            extra += " @ " + place_name
    if organisation_pk is not None:
        organisation_name = (
            Organisation.objects.filter(pk=organisation_pk)
            .values_list("name", flat=True)
            .first()
        )
        # first() is None when the organisation does not exist.
        if organisation_name is not None:
            extra += " - " + organisation_name
    if tag is not None:
        extra += " - " + emoji.replace_emoji(tag, replace="")

    response["Content-Disposition"] = "attachment; filename={0}{1}{2}".format(
        "Pommes de lune", extra, ".ics"
    )
    return response
@method_decorator(check_honeypot, name="post")
class MessageCreateView(SuccessMessageMixin, CreateView):
    """Contact form, also used to report an event when ``pk`` is in the URL."""

    model = Message
    form_class = MessageForm
    template_name = "agenda_culturel/message_create_form.html"
    success_message = _("Your message has been sent successfully.")
    success_url = reverse_lazy("home")

    def __init__(self, *args, **kwargs):
        # No related event until get_initial() finds one in the URL.
        self.event = None
        super().__init__(*args, **kwargs)

    def get_form(self, form_class=None):
        """Instantiate the form with the keyword arguments of this view."""
        chosen_class = self.get_form_class() if form_class is None else form_class
        return chosen_class(**self.get_form_kwargs())

    def get_form_kwargs(self):
        """Add the related event, and flag forms opened by authenticated users."""
        form_kwargs = super().get_form_kwargs()
        form_kwargs["event"] = self.event
        if self.request.user.is_authenticated:
            form_kwargs["internal"] = True
        return form_kwargs

    def form_valid(self, form):
        """Set the author (if authenticated) and the message type, then save."""
        if self.request.user.is_authenticated:
            form.instance.user = self.request.user
        if "pk" in self.kwargs:
            form.instance.message_type = Message.TYPE.EVENT_REPORT
        else:
            form.instance.message_type = Message.TYPE.CONTACT_FORM
        return super().form_valid(form)

    def get_initial(self):
        """Pre-fill the related event and the subject when reporting an event."""
        initial = super().get_initial()
        if "pk" not in self.kwargs:
            initial["related_event"] = None
            return initial

        self.event = get_object_or_404(Event, pk=self.kwargs["pk"])
        initial["related_event"] = self.event
        initial["subject"] = _("Reporting the event {} on {}").format(
            self.event.title, self.event.start_day
        )
        return initial
class MessageDeleteView(
    SuccessMessageMixin,
    PermissionRequiredMixin,
    LoginRequiredMixin,
    DeleteView,
):
    """Delete a contact message, then go back to the list of messages.

    Deletion requires the ``delete_message`` permission, in line with the
    other moderation views of this module.
    """

    model = Message
    permission_required = "agenda_culturel.delete_message"
    success_message = _("The contact message has been successfully deleted.")
    success_url = reverse_lazy("messages")
class MessageUpdateView(
    SuccessMessageMixin,
    PermissionRequiredMixin,
    LoginRequiredMixin,
    UpdateView,
):
    """Moderate a contact message: spam flag, closed flag and comments."""

    model = Message
    fields = ("spam", "closed", "comments")
    template_name = "agenda_culturel/message_moderation_form.html"
    permission_required = "agenda_culturel.change_message"
    success_url = reverse_lazy("messages")
    success_message = _(
        "The contact message properties has been successfully modified."
    )

    def get_form_kwargs(self):
        """Return the keyword arguments for instantiating the form."""
        form_kwargs = super().get_form_kwargs()
        if hasattr(self, "object"):
            form_kwargs["instance"] = self.object
        return form_kwargs
@login_required(login_url="/accounts/login/")
@permission_required("agenda_culturel.view_event")
def administration(request):
    """Render the administration dashboard.

    The page gathers the modification counts of the last three days, the five
    latest events and batch imports, the status counts of the recurrent
    imports, counts about imported future events and the number of events
    still to moderate.
    """
    nb_mod_days = 21
    nb_classes = 4
    today = date.today()

    # get information about recent modifications (today and the two days before)
    days = [today - timedelta(days=offset) for offset in range(3)]
    daily_modifications = Event.get_count_modifications(
        [(one_day, 1) for one_day in days]
    )

    # get last created events
    latest_events = (
        Event.objects.all()
        .order_by("-created_date")
        .select_related("exact_location", "category")[:5]
    )

    # get last batch imports, each annotated with one related event id
    related_event_pk = Event.objects.filter(
        import_sources__contains=[OuterRef("url_source")]
    ).values("pk")[:1]
    latest_batch_imports = (
        BatchImportation.objects.all()
        .select_related("recurrentImport")
        .annotate(event_id=Subquery(related_event_pk))
        .order_by("-created_date")[:5]
    )

    # get info about batch information: status of the newest run of each import
    newest_runs = (
        BatchImportation.objects.filter(recurrentImport=OuterRef("pk"))
        .order_by("-created_date")
        .select_related("recurrentImport")
    )
    recurrent_imports = RecurrentImport.objects.annotate(
        last_run_status=Subquery(newest_runs.values("status")[:1])
    )

    def count_last_runs(status):
        # Number of recurrent imports whose newest run has the given status.
        return recurrent_imports.filter(last_run_status=status).count()

    nb_failed = count_last_runs(BatchImportation.STATUS.FAILED)
    nb_canceled = count_last_runs(BatchImportation.STATUS.CANCELED)
    nb_running = count_last_runs(BatchImportation.STATUS.RUNNING)
    nb_all = recurrent_imports.count()

    # get some info about imported (or not) events
    srcs = RecurrentImport.objects.all().values_list("source")
    in_future = Event.objects.filter(Q(start_day__gte=today))
    from_known_source = Q(import_sources__overlap=srcs)
    nb_in_rimport = in_future.filter(from_known_source).count()

    imported = Q(import_sources__isnull=False)
    not_modified_since_import = Q(modified_date__isnull=True) | Q(
        modified_date__lte=F("imported_date")
    )
    nb_in_orphan_import = in_future.filter(
        (imported & not_modified_since_import) & ~from_known_source
    ).count()

    # get all non moderated events
    nb_not_moderated = Event.get_nb_not_moderated(today, nb_mod_days, nb_classes)

    context = {
        "daily_modifications": daily_modifications,
        "events": latest_events,
        "batch_imports": latest_batch_imports,
        "nb_failed": nb_failed,
        "nb_canceled": nb_canceled,
        "nb_running": nb_running,
        "nb_all": nb_all,
        "nb_not_moderated": nb_not_moderated,
        "nb_in_rimport": nb_in_rimport,
        "nb_in_orphan_import": nb_in_orphan_import,
    }
    return render(request, "agenda_culturel/administration.html", context)
@login_required(login_url="/accounts/login/")
@permission_required("agenda_culturel.view_event")
def recent(request):
    """List the events from newest to oldest creation date, ten per page.

    A non-integer page number shows the first page; an out-of-range one shows
    the last page.
    """
    newest_first = Event.objects.all().order_by("-created_date")
    event_filter = EventFilterAdmin(request.GET, queryset=newest_first)
    paginator = PaginatorFilter(event_filter, 10, request)

    requested_page = request.GET.get("page")
    try:
        current_page = paginator.page(requested_page)
    except PageNotAnInteger:
        current_page = paginator.page(1)
    except EmptyPage:
        current_page = paginator.page(paginator.num_pages)

    context = {"filter": event_filter, "paginator_filter": current_page}
    return render(request, "agenda_culturel/recent.html", context)
@login_required(login_url="/accounts/login/")
def clear_cache(request):
    """Clear the whole cache after a confirmation step.

    A POST clears the cache and redirects to the administration page; any
    other request renders the confirmation page. The view is restricted to
    authenticated users: flushing the cache is an administrative action and
    the page it redirects to already requires a login.
    """
    if request.method != "POST":
        return render(request, "agenda_culturel/clear_cache.html")

    cache.clear()
    messages.success(request, _("Cache successfully cleared."))
    return HttpResponseRedirect(reverse_lazy("administration"))
class UserProfileUpdateView(
    SuccessMessageMixin,
    LoginRequiredMixin,
    UpdateView,
):
    """Let the logged-in user edit their own profile."""

    model = UserProfile
    form_class = UserProfileForm
    success_url = reverse_lazy("administration")
    success_message = _("Your user profile has been successfully modified.")

    def get_object(self):
        """Return the profile of the current user (no lookup from the URL)."""
        current_user = self.request.user
        return current_user.userprofile