Merge pull request '340_réorganisation_views' (#403) from 340_réorganisation_views into main

Reviewed-on: https://forge.chapril.org/jmtrivial/agenda_culturel/pulls/403
This commit is contained in:
jmtrivial
2025-04-20 15:19:15 +02:00
25 changed files with 3823 additions and 3621 deletions

View File

@@ -1,5 +1,5 @@
default_language_version: default_language_version:
python: python3.13 python: python3
repos: repos:
# Using this mirror lets us use mypyc-compiled black, which is about 2x faster # Using this mirror lets us use mypyc-compiled black, which is about 2x faster
- repo: https://github.com/psf/black-pre-commit-mirror - repo: https://github.com/psf/black-pre-commit-mirror

View File

@@ -199,9 +199,11 @@ class SimpleContactForm(GroupFormMixin, Form):
del self.fields["email"] del self.fields["email"]
del self.fields["comments"] del self.fields["comments"]
class URLSubmissionSimpleForm(Form): class URLSubmissionSimpleForm(Form):
url = URLField(max_length=512) url = URLField(max_length=512)
class URLSubmissionForm(GroupFormMixin, Form): class URLSubmissionForm(GroupFormMixin, Form):
required_css_class = "required" required_css_class = "required"

View File

@@ -11,7 +11,7 @@ logger = logging.getLogger(__name__)
class CExtractor(Extractor): class CExtractor(Extractor):
patternEvent = r'^https://associations\.clermont-ferrand\.fr/evenement/([\w\-]+)' patternEvent = r"^https://associations\.clermont-ferrand\.fr/evenement/([\w\-]+)"
def __init__(self): def __init__(self):
super().__init__() super().__init__()

View File

@@ -124,7 +124,7 @@ class ChromiumHeadlessDownloader(Downloader):
try: try:
self.driver.quit() self.driver.quit()
except Exception as e: except Exception as e:
print('Error: ' + str(e)) print("Error: " + str(e))
def screenshot(self, url, path_image): def screenshot(self, url, path_image):
print("Screenshot {}".format(url)) print("Screenshot {}".format(url))

View File

@@ -1,9 +1,7 @@
import logging import logging
from datetime import datetime, timedelta
from django.conf import settings from django.conf import settings
import dateutil.parser import dateutil.parser
import requests import requests
from bs4 import BeautifulSoup
import re import re
import json import json
@@ -14,8 +12,10 @@ logger = logging.getLogger(__name__)
# A class dedicated to get events from helloasso # A class dedicated to get events from helloasso
class CExtractor(Extractor): class CExtractor(Extractor):
patternEvent = r'^https://www\.helloasso\.com/associations/([\w\-]+)/evenements/([\w\-]+)' patternEvent = (
patternOrg = r'^https://www\.helloasso\.com/associations/([\w\-]+)' r"^https://www\.helloasso\.com/associations/([\w\-]+)/evenements/([\w\-]+)"
)
patternOrg = r"^https://www\.helloasso\.com/associations/([\w\-]+)"
urlAPI = "https://api.helloasso.com" urlAPI = "https://api.helloasso.com"
@@ -27,7 +27,10 @@ class CExtractor(Extractor):
return settings.HELLOASSO_ID != "" and settings.HELLOASSO_SECRET != "" return settings.HELLOASSO_ID != "" and settings.HELLOASSO_SECRET != ""
def is_known_url(url): def is_known_url(url):
return re.match(CExtractor.patternEvent, url) is not None or re.match(CExtractor.patternOrg, url) is not None return (
re.match(CExtractor.patternEvent, url) is not None
or re.match(CExtractor.patternOrg, url) is not None
)
def get_header_with_token(): def get_header_with_token():
url = CExtractor.urlAPI + "/oauth2/token" url = CExtractor.urlAPI + "/oauth2/token"
@@ -35,11 +38,11 @@ class CExtractor(Extractor):
payload = { payload = {
"grant_type": "client_credentials", "grant_type": "client_credentials",
"client_id": settings.HELLOASSO_ID, "client_id": settings.HELLOASSO_ID,
"client_secret": settings.HELLOASSO_SECRET "client_secret": settings.HELLOASSO_SECRET,
} }
headers = { headers = {
"accept": "application/json", "accept": "application/json",
"Content-Type": "application/x-www-form-urlencoded" "Content-Type": "application/x-www-form-urlencoded",
} }
response = requests.post(url, data=payload, headers=headers) response = requests.post(url, data=payload, headers=headers)
@@ -48,18 +51,25 @@ class CExtractor(Extractor):
return { return {
"accept": "application/json", "accept": "application/json",
"authorization": "Bearer " + data["access_token"] "authorization": "Bearer " + data["access_token"],
} }
def _get_single_event(org, event, headers=None): def _get_single_event(org, event, headers=None):
if headers is None: if headers is None:
headers = CExtractor.get_header_with_token() headers = CExtractor.get_header_with_token()
url = CExtractor.urlAPI + "/v5/organizations/{}/forms/Event/{}/public".format(org, event) url = CExtractor.urlAPI + "/v5/organizations/{}/forms/Event/{}/public".format(
org, event
)
response = requests.get(url, headers=headers) response = requests.get(url, headers=headers)
return json.loads(response.text) return json.loads(response.text)
def _get_all_events(org): def _get_all_events(org):
url = CExtractor.urlAPI + "/v5/organizations/{}/forms?states=Public&formTypes=Event&pageSize=20".format(org) url = (
CExtractor.urlAPI
+ "/v5/organizations/{}/forms?states=Public&formTypes=Event&pageSize=20".format(
org
)
)
headers = CExtractor.get_header_with_token() headers = CExtractor.get_header_with_token()
result = [] result = []
for i in range(1, 200): for i in range(1, 200):
@@ -69,8 +79,9 @@ class CExtractor(Extractor):
return result return result
else: else:
for e in events: for e in events:
result.append(CExtractor._get_single_event(org, e["formSlug"], headers)) result.append(
CExtractor._get_single_event(org, e["formSlug"], headers)
)
def _get_events(self, url): def _get_events(self, url):
match = re.match(CExtractor.patternEvent, url) match = re.match(CExtractor.patternEvent, url)
@@ -116,7 +127,6 @@ class CExtractor(Extractor):
if "description" in event["banner"]: if "description" in event["banner"]:
image_alt = event["banner"]["description"] image_alt = event["banner"]["description"]
location = [] location = []
if "place" in event and event["place"] is not None: if "place" in event and event["place"] is not None:
for elplace in ["name", "address", "zipCode", "city"]: for elplace in ["name", "address", "zipCode", "city"]:

View File

@@ -17,95 +17,111 @@ from .sitemaps import (
) )
from .models import Event, Place, Organisation, Category from .models import Event, Place, Organisation, Category
from .views import ( from .views import (
home, # Calendar
week_view,
month_view,
day_view, day_view,
month_view,
upcoming_events, upcoming_events,
export_ical, week_view,
view_tag, # Categorisation rules
view_tag_past, CategorisationRuleCreateView,
TagUpdateView, CategorisationRuleDeleteView,
CategorisationRuleUpdateView,
apply_categorisation_rules,
categorisation_rules,
# Errors
internal_server_error,
page_not_found,
# Events
recent, recent,
EventDetailView,
EventUpdateView,
EventCreateView,
update_from_source,
change_status_event,
EventDeleteView,
import_event_proxy,
import_from_url,
import_from_urls,
# Event duplicates
DuplicatedEventsDetailView,
duplicates,
fix_duplicate,
merge_duplicate,
set_duplicate,
update_duplicate_event,
# Export
export_event_ical,
export_ical,
# General pages
about,
administration, administration,
activite, activite,
OrganisationDeleteView, clear_cache,
home,
import_requirements,
mentions_legales,
moderation_rules,
StaticContentCreateView,
StaticContentUpdateView,
statistics,
UserProfileUpdateView,
thank_you,
# Import batch
imports,
add_import,
cancel_import,
update_orphan_events,
# Import récurrent
RecurrentImportCreateView,
RecurrentImportDeleteView,
RecurrentImportUpdateView,
recurrent_imports,
run_all_fb_rimports,
run_all_rimports,
run_rimport,
view_rimport,
# Messages
delete_cm_spam,
MessageCreateView,
MessageDeleteView,
MessageUpdateView,
view_messages,
# Moderation
EventModerateView,
moderate_event_next,
moderate_from_date,
# Organisations
OrganisationCreateView, OrganisationCreateView,
OrganisationDeleteView,
OrganisationDetailView, OrganisationDetailView,
OrganisationDetailViewPast, OrganisationDetailViewPast,
OrganisationListView, OrganisationListView,
OrganisationUpdateView, OrganisationUpdateView,
# Places
PlaceCreateView,
PlaceDeleteView, PlaceDeleteView,
PlaceDetailView, PlaceDetailView,
PlaceDetailViewPast, PlaceDetailViewPast,
PlaceUpdateView, PlaceFromEventCreateView,
PlaceListView,
PlaceListAdminView, PlaceListAdminView,
PlaceListView,
PlaceUpdateView,
UnknownPlaceAddView, UnknownPlaceAddView,
UnknownPlacesListView, UnknownPlacesListView,
fix_duplicate,
fix_unknown_places, fix_unknown_places,
clear_cache, # Search
export_event_ical, event_search,
MessageDeleteView, event_search_full,
imports, # Tags
add_import, view_tag,
update_orphan_events, view_tag_past,
cancel_import, TagUpdateView,
run_all_fb_rimports,
run_all_rimports,
tag_list, tag_list,
TagDeleteView, TagDeleteView,
rename_tag, rename_tag,
delete_tag, delete_tag,
TagCreateView, TagCreateView,
EventDetailView, # Special periods
EventUpdateView,
EventModerateView,
moderate_event_next,
RecurrentImportCreateView,
RecurrentImportDeleteView,
RecurrentImportUpdateView,
run_rimport,
categorisation_rules,
duplicates,
DuplicatedEventsDetailView,
StaticContentCreateView,
StaticContentUpdateView,
about,
thank_you,
MessageCreateView,
merge_duplicate,
EventCreateView,
event_search,
event_search_full,
recurrent_imports,
delete_cm_spam,
page_not_found,
internal_server_error,
PlaceCreateView,
PlaceFromEventCreateView,
moderate_from_date,
update_from_source,
change_status_event,
EventDeleteView,
set_duplicate,
import_event_proxy,
import_from_url,
import_from_urls,
mentions_legales,
view_messages,
MessageUpdateView,
statistics,
view_rimport,
update_duplicate_event,
CategorisationRuleCreateView,
CategorisationRuleDeleteView,
CategorisationRuleUpdateView,
apply_categorisation_rules,
moderation_rules,
import_requirements,
UserProfileUpdateView,
SpecialPeriodCreateView, SpecialPeriodCreateView,
SpecialPeriodDeleteView, SpecialPeriodDeleteView,
SpecialPeriodListView, SpecialPeriodListView,
@@ -141,64 +157,81 @@ sitemaps = {
} }
urlpatterns = [ urlpatterns = [
path("", home, name="home"), # Calendar
path("cat:<cat>/", home, name="home_category"),
path(
"cat:<cat>/semaine/<int:year>/<int:week>/",
week_view,
name="week_view_category",
),
path("cat:<cat>/cette-semaine/", week_view, name="cette_semaine_category"),
path(
"cat:<cat>/mois/<int:year>/<int:month>/",
month_view,
name="month_view_category",
),
path( path(
"cat:<cat>/jour/<int:year>/<int:month>/<int:day>/", "cat:<cat>/jour/<int:year>/<int:month>/<int:day>/",
day_view, day_view,
name="day_view_category", name="day_view_category",
), ),
path("cat:<cat>/jour/", day_view, name="day_view_category_when"), path("cat:<cat>/jour/", day_view, name="day_view_category_when"),
path("cat:<cat>/a-venir/", upcoming_events, name="a_venir_category"),
path("cat:<cat>/aujourdhui/", day_view, name="aujourdhui_category"), path("cat:<cat>/aujourdhui/", day_view, name="aujourdhui_category"),
path("jour/<int:year>/<int:month>/<int:day>/", day_view, name="day_view"),
path("jour/", day_view, name="day_view_when"),
path("aujourdhui/", day_view, name="aujourdhui"),
path(
"cat:<cat>/mois/<int:year>/<int:month>/",
month_view,
name="month_view_category",
),
path("cat:<cat>/ce-mois-ci", month_view, name="ce_mois_ci_category"),
path("mois/<int:year>/<int:month>/", month_view, name="month_view"),
path("ce-mois-ci", month_view, name="ce_mois_ci"),
path("cat:<cat>/a-venir/", upcoming_events, name="a_venir_category"),
path( path(
"cat:<cat>/a-venir/<int:year>/<int:month>/<int:day>/", "cat:<cat>/a-venir/<int:year>/<int:month>/<int:day>/",
upcoming_events, upcoming_events,
name="a_venir_jour_category", name="a_venir_jour_category",
), ),
path("cat:<cat>/cette-semaine/", week_view, name="cette_semaine_category"),
path("cat:<cat>/ical", export_ical, name="export_ical_category"),
path("cat:<cat>/ce-mois-ci", month_view, name="ce_mois_ci_category"),
path("semaine/<int:year>/<int:week>/", week_view, name="week_view"),
path("mois/<int:year>/<int:month>/", month_view, name="month_view"),
path("jour/<int:year>/<int:month>/<int:day>/", day_view, name="day_view"),
path("jour/", day_view, name="day_view_when"),
path("aujourdhui/", day_view, name="aujourdhui"),
path("a-venir/", upcoming_events, name="a_venir"), path("a-venir/", upcoming_events, name="a_venir"),
path( path(
"a-venir/<int:year>/<int:month>/<int:day>/", "a-venir/<int:year>/<int:month>/<int:day>/",
upcoming_events, upcoming_events,
name="a_venir_jour", name="a_venir_jour",
), ),
path("cette-semaine/", week_view, name="cette_semaine"),
path("ce-mois-ci", month_view, name="ce_mois_ci"),
path("tag/<t>/", view_tag, name="view_tag"),
path("tag/<tag>/ical", export_ical, name="export_ical_tag"),
path("tag/<t>/past", view_tag_past, name="view_tag_past"),
path("tags/", tag_list, name="view_all_tags"),
path("tag/<int:pk>/edit", TagUpdateView.as_view(), name="edit_tag"),
path( path(
"tag/<int:pk>/delete", "cat:<cat>/semaine/<int:year>/<int:week>/",
TagDeleteView.as_view(), week_view,
name="delete_object_tag", name="week_view_category",
), ),
path("tag/<t>/rename", rename_tag, name="rename_tag"), path("cat:<cat>/cette-semaine/", week_view, name="cette_semaine_category"),
path("tag/<t>/delete", delete_tag, name="delete_tag"), path("cat:<cat>/cette-semaine/", week_view, name="cette_semaine_category"),
path("tags/add", TagCreateView.as_view(), name="add_tag"), path("semaine/<int:year>/<int:week>/", week_view, name="week_view"),
path("cette-semaine/", week_view, name="cette_semaine"),
# Categorisation rules
path("cat:<cat>/", home, name="home_category"),
path(
"catrules/add",
CategorisationRuleCreateView.as_view(),
name="add_catrule",
),
path(
"catrules/<int:pk>/delete",
CategorisationRuleDeleteView.as_view(),
name="delete_catrule",
),
path(
"catrules/<int:pk>/edit",
CategorisationRuleUpdateView.as_view(),
name="edit_catrule",
),
path("catrules/apply", apply_categorisation_rules, name="apply_catrules"),
path("catrules/", categorisation_rules, name="categorisation_rules"),
# Errors
path("500/", internal_server_error, name="internal_server_error"),
path("404/", page_not_found, name="page_not_found"),
path("duplicates/<int:pk>/merge", merge_duplicate, name="merge_duplicate"),
path(
"event/<int:year>/<int:month>/<int:day>/<int:pk>/set_duplicate",
set_duplicate,
name="set_duplicate",
),
path(
"duplicates/<int:pk>/update/<int:epk>",
update_duplicate_event,
name="update_event",
),
# Events
path("recent/", recent, name="recent"), path("recent/", recent, name="recent"),
path("administration/", administration, name="administration"),
path("activite/", activite, name="activite"),
path( path(
"event/<int:year>/<int:month>/<int:day>/<int:pk>-<extra>", "event/<int:year>/<int:month>/<int:day>/<int:pk>-<extra>",
EventDetailView.as_view(), EventDetailView.as_view(),
@@ -209,6 +242,133 @@ urlpatterns = [
path( path(
"event/<int:pk>/edit-force", EventUpdateView.as_view(), name="edit_event_force" "event/<int:pk>/edit-force", EventUpdateView.as_view(), name="edit_event_force"
), ),
path(
"event/<int:pk>/simple-clone/edit",
EventUpdateView.as_view(),
name="simple_clone_edit",
),
path(
"event/<int:pk>/clone/edit",
EventUpdateView.as_view(),
name="clone_edit",
),
path(
"event/<int:pk>/update-from-source",
update_from_source,
name="update_from_source",
),
path(
"event/<int:pk>/change-status/<status>",
change_status_event,
name="change_status_event",
),
path("event/<int:pk>/delete", EventDeleteView.as_view(), name="delete_event"),
path("ajouter", import_event_proxy, name="add_event"),
path("ajouter/url", import_from_url, name="add_event_url"),
path("ajouter/urls", import_from_urls, name="add_event_urls"),
path("ajouter/details", EventCreateView.as_view(), name="add_event_details"),
# Event duplicates
path(
"duplicates/<int:pk>",
DuplicatedEventsDetailView.as_view(),
name="view_duplicate",
),
# Export
path(
"event/<int:year>/<int:month>/<int:day>/<int:pk>/ical",
export_event_ical,
name="export_event_ical",
),
path("cat:<cat>/ical", export_ical, name="export_ical_category"),
path("ical", export_ical, name="export_ical"),
path(
"organisme/<int:organisation_pk>/ical",
export_ical,
name="export_ical_organisation",
),
path("place/<int:place_pk>/ical", export_ical, name="export_ical_place"),
path("tag/<tag>/ical", export_ical, name="export_ical_tag"),
path("duplicates/", duplicates, name="duplicates"),
path("duplicates/<int:pk>/fix", fix_duplicate, name="fix_duplicate"),
# General pages
path("a-propos", about, name="about"),
path("administration/", administration, name="administration"),
path("activite/", activite, name="activite"),
path("cache/clear", clear_cache, name="clear_cache"),
path("", home, name="home"),
path("besoin-pour-import", import_requirements, name="import_requirements"),
path("mentions-legales", mentions_legales, name="mentions_legales"),
path("regles-de-moderation", moderation_rules, name="moderation_rules"),
path(
"static-content/create",
StaticContentCreateView.as_view(),
name="create_static_content",
),
path(
"static-content/<int:pk>/edit",
StaticContentUpdateView.as_view(),
name="edit_static_content",
),
path("profile/edit", UserProfileUpdateView.as_view(), name="edit_profile"),
path("statistiques", statistics, name="statistics"),
path("merci", thank_you, name="thank_you"),
# Import batch
path("imports/", imports, name="imports"),
path("imports/add", add_import, name="add_import"),
path("imports/<int:pk>/cancel", cancel_import, name="cancel_import"),
path(
"imports/orphans/run",
update_orphan_events,
name="update_orphan_events",
),
# Import récurrent
path("rimports/add", RecurrentImportCreateView.as_view(), name="add_rimport"),
path(
"rimports/<int:pk>/delete",
RecurrentImportDeleteView.as_view(),
name="delete_rimport",
),
path(
"rimports/<int:pk>/edit",
RecurrentImportUpdateView.as_view(),
name="edit_rimport",
),
path("rimports/", recurrent_imports, name="recurrent_imports"),
path(
"rimports/status/<status>",
recurrent_imports,
name="recurrent_imports_status",
),
path("rimports/fb/run", run_all_fb_rimports, name="run_all_fb_rimports"),
path("rimports/run", run_all_rimports, name="run_all_rimports"),
path(
"rimports/status/<status>/run",
run_all_rimports,
name="run_all_rimports_status",
),
path("rimports/<int:pk>/run", run_rimport, name="run_rimport"),
path("rimports/<int:pk>/view", view_rimport, name="view_rimport"),
# Messages
path("messages/spams/delete", delete_cm_spam, name="delete_cm_spam"),
path(
"event/<int:pk>/message",
MessageCreateView.as_view(),
name="message_for_event",
),
path("contact", MessageCreateView.as_view(), name="contact"),
path(
"message/<int:pk>/delete",
MessageDeleteView.as_view(),
name="delete_message",
),
path(
"message/<int:pk>",
MessageUpdateView.as_view(),
name="message",
),
path("messages", view_messages, name="messages"),
# Moderation
path("moderate", EventModerateView.as_view(), name="moderate"),
path( path(
"event/<int:pk>/moderate", "event/<int:pk>/moderate",
EventModerateView.as_view(), EventModerateView.as_view(),
@@ -234,225 +394,89 @@ urlpatterns = [
moderate_event_next, moderate_event_next,
name="moderate_event_next", name="moderate_event_next",
), ),
path("moderate", EventModerateView.as_view(), name="moderate"),
path( path(
"moderate/<int:y>/<int:m>/<int:d>", "moderate/<int:y>/<int:m>/<int:d>",
moderate_from_date, moderate_from_date,
name="moderate_from_date", name="moderate_from_date",
), ),
# Organisations
path( path(
"event/<int:pk>/simple-clone/edit", "organisme/add",
EventUpdateView.as_view(), OrganisationCreateView.as_view(),
name="simple_clone_edit", name="add_organisation",
),
path(
"event/<int:pk>/clone/edit",
EventUpdateView.as_view(),
name="clone_edit",
),
path(
"event/<int:pk>/message",
MessageCreateView.as_view(),
name="message_for_event",
),
path(
"event/<int:pk>/update-from-source",
update_from_source,
name="update_from_source",
),
path(
"event/<int:pk>/change-status/<status>",
change_status_event,
name="change_status_event",
),
path("event/<int:pk>/delete", EventDeleteView.as_view(), name="delete_event"),
path(
"event/<int:year>/<int:month>/<int:day>/<int:pk>/set_duplicate",
set_duplicate,
name="set_duplicate",
),
path("ajouter", import_event_proxy, name="add_event"),
path("ajouter/url", import_from_url, name="add_event_url"),
path("ajouter/urls", import_from_urls, name="add_event_urls"),
path("ajouter/details", EventCreateView.as_view(), name="add_event_details"),
path("admin/", admin.site.urls),
path("accounts/", include("django.contrib.auth.urls")),
path("test_app/", include("test_app.urls")),
path(
"static-content/create",
StaticContentCreateView.as_view(),
name="create_static_content",
),
path(
"static-content/<int:pk>/edit",
StaticContentUpdateView.as_view(),
name="edit_static_content",
),
path("rechercher", event_search, name="event_search"),
path("rechercher/complet/", event_search_full, name="event_search_full"),
path("mentions-legales", mentions_legales, name="mentions_legales"),
path("a-propos", about, name="about"),
path("regles-de-moderation", moderation_rules, name="moderation_rules"),
path("besoin-pour-import", import_requirements, name="import_requirements"),
path("merci", thank_you, name="thank_you"),
path("contact", MessageCreateView.as_view(), name="contact"),
path("messages", view_messages, name="messages"),
path("statistiques", statistics, name="statistics"),
path("messages/spams/delete", delete_cm_spam, name="delete_cm_spam"),
path(
"message/<int:pk>",
MessageUpdateView.as_view(),
name="message",
),
path(
"message/<int:pk>/delete",
MessageDeleteView.as_view(),
name="delete_message",
),
path("imports/", imports, name="imports"),
path("imports/add", add_import, name="add_import"),
path(
"imports/orphans/run",
update_orphan_events,
name="update_orphan_events",
),
path("imports/<int:pk>/cancel", cancel_import, name="cancel_import"),
path("rimports/", recurrent_imports, name="recurrent_imports"),
path("rimports/run", run_all_rimports, name="run_all_rimports"),
path("rimports/fb/run", run_all_fb_rimports, name="run_all_fb_rimports"),
path(
"rimports/status/<status>",
recurrent_imports,
name="recurrent_imports_status",
),
path(
"rimports/status/<status>/run",
run_all_rimports,
name="run_all_rimports_status",
),
path("rimports/add", RecurrentImportCreateView.as_view(), name="add_rimport"),
path("rimports/<int:pk>/view", view_rimport, name="view_rimport"),
path("rimports/<int:pk>/stats", statistics, name="stats_rimport"),
path(
"rimports/<int:pk>/edit",
RecurrentImportUpdateView.as_view(),
name="edit_rimport",
),
path(
"rimports/<int:pk>/delete",
RecurrentImportDeleteView.as_view(),
name="delete_rimport",
),
path("rimports/<int:pk>/run", run_rimport, name="run_rimport"),
path("catrules/", categorisation_rules, name="categorisation_rules"),
path(
"catrules/add",
CategorisationRuleCreateView.as_view(),
name="add_catrule",
),
path(
"catrules/<int:pk>/edit",
CategorisationRuleUpdateView.as_view(),
name="edit_catrule",
),
path(
"catrules/<int:pk>/delete",
CategorisationRuleDeleteView.as_view(),
name="delete_catrule",
),
path("catrules/apply", apply_categorisation_rules, name="apply_catrules"),
path("duplicates/", duplicates, name="duplicates"),
path(
"duplicates/<int:pk>",
DuplicatedEventsDetailView.as_view(),
name="view_duplicate",
),
path("duplicates/<int:pk>/fix", fix_duplicate, name="fix_duplicate"),
path("duplicates/<int:pk>/merge", merge_duplicate, name="merge_duplicate"),
path(
"duplicates/<int:pk>/update/<int:epk>",
update_duplicate_event,
name="update_event",
),
path("404/", page_not_found, name="page_not_found"),
path("500/", internal_server_error, name="internal_server_error"),
path(
"organisme/<int:pk>/past",
OrganisationDetailViewPast.as_view(),
name="view_organisation_past",
),
path(
"organisme/<int:pk>",
OrganisationDetailView.as_view(),
name="view_organisation_shortname",
),
path(
"organisme/<int:organisation_pk>/ical",
export_ical,
name="export_ical_organisation",
),
path(
"organisme/<int:pk>-<extra>",
OrganisationDetailView.as_view(),
name="view_organisation",
),
path(
"organisme/<int:pk>-<extra>/past",
OrganisationDetailViewPast.as_view(),
name="view_organisation_past_fullname",
),
path(
"organisme/<int:pk>-<extra>",
OrganisationDetailView.as_view(),
name="view_organisation_fullname",
),
path(
"organisme/<int:pk>/edit",
OrganisationUpdateView.as_view(),
name="edit_organisation",
), ),
path( path(
"organisme/<int:pk>/delete", "organisme/<int:pk>/delete",
OrganisationDeleteView.as_view(), OrganisationDeleteView.as_view(),
name="delete_organisation", name="delete_organisation",
), ),
path(
"organisme/<int:pk>-<extra>",
OrganisationDetailView.as_view(),
name="view_organisation",
),
path(
"organisme/<int:pk>-<extra>",
OrganisationDetailView.as_view(),
name="view_organisation_fullname",
),
path(
"organisme/<int:pk>",
OrganisationDetailView.as_view(),
name="view_organisation_shortname",
),
path(
"organisme/<int:pk>/past",
OrganisationDetailViewPast.as_view(),
name="view_organisation_past",
),
path(
"organisme/<int:pk>-<extra>/past",
OrganisationDetailViewPast.as_view(),
name="view_organisation_past_fullname",
),
path( path(
"organismes/", "organismes/",
OrganisationListView.as_view(), OrganisationListView.as_view(),
name="view_organisations", name="view_organisations",
), ),
path( path(
"organisme/add", "organisme/<int:pk>/edit",
OrganisationCreateView.as_view(), OrganisationUpdateView.as_view(),
name="add_organisation", name="edit_organisation",
),
# Places
path("places/add", PlaceCreateView.as_view(), name="add_place"),
path("place/<int:pk>/delete", PlaceDeleteView.as_view(), name="delete_place"),
path("place/<int:pk>", PlaceDetailView.as_view(), name="view_place"),
path(
"place/<int:pk>-<extra>",
PlaceDetailView.as_view(),
name="view_place_fullname",
), ),
path( path(
"place/<int:pk>/past", "place/<int:pk>/past",
PlaceDetailViewPast.as_view(), PlaceDetailViewPast.as_view(),
name="view_place_past", name="view_place_past",
), ),
path("place/<int:pk>", PlaceDetailView.as_view(), name="view_place"),
path("place/<int:place_pk>/ical", export_ical, name="export_ical_place"),
path( path(
"place/<int:pk>-<extra>/past", "place/<int:pk>-<extra>/past",
PlaceDetailViewPast.as_view(), PlaceDetailViewPast.as_view(),
name="view_place_past_fullname", name="view_place_past_fullname",
), ),
path(
"place/<int:pk>-<extra>",
PlaceDetailView.as_view(),
name="view_place_fullname",
),
path("place/<int:pk>/edit", PlaceUpdateView.as_view(), name="edit_place"),
path("place/<int:pk>/delete", PlaceDeleteView.as_view(), name="delete_place"),
path("places/", PlaceListView.as_view(), name="view_places"),
path("places/list", PlaceListAdminView.as_view(), name="view_places_admin"),
path("places/add", PlaceCreateView.as_view(), name="add_place"),
path( path(
"places/add/<int:pk>", "places/add/<int:pk>",
PlaceFromEventCreateView.as_view(), PlaceFromEventCreateView.as_view(),
name="add_place_from_event", name="add_place_from_event",
), ),
path("places/list", PlaceListAdminView.as_view(), name="view_places_admin"),
path("places/", PlaceListView.as_view(), name="view_places"),
path("place/<int:pk>/edit", PlaceUpdateView.as_view(), name="edit_place"),
path(
"event/<int:pk>/addplace",
UnknownPlaceAddView.as_view(),
name="add_place_to_event",
),
path( path(
"events/unknown-places", "events/unknown-places",
UnknownPlacesListView.as_view(), UnknownPlacesListView.as_view(),
@@ -463,17 +487,27 @@ urlpatterns = [
fix_unknown_places, fix_unknown_places,
name="fix_unknown_places", name="fix_unknown_places",
), ),
# Search
path("rechercher", event_search, name="event_search"),
path("rechercher/complet/", event_search_full, name="event_search_full"),
# Tags
path("tag/<t>/", view_tag, name="view_tag"),
path("tag/<t>/past", view_tag_past, name="view_tag_past"),
path("tag/<int:pk>/edit", TagUpdateView.as_view(), name="edit_tag"),
path("tags/", tag_list, name="view_all_tags"),
path( path(
"event/<int:pk>/addplace", "tag/<int:pk>/delete",
UnknownPlaceAddView.as_view(), TagDeleteView.as_view(),
name="add_place_to_event", name="delete_object_tag",
), ),
path( path("tag/<t>/rename", rename_tag, name="rename_tag"),
"event/<int:year>/<int:month>/<int:day>/<int:pk>/ical", path("tag/<t>/delete", delete_tag, name="delete_tag"),
export_event_ical, path("tags/add", TagCreateView.as_view(), name="add_tag"),
name="export_event_ical", # Django
), path("admin/", admin.site.urls),
path("ical", export_ical, name="export_ical"), path("accounts/", include("django.contrib.auth.urls")),
path("test_app/", include("test_app.urls")),
path("rimports/<int:pk>/stats", statistics, name="stats_rimport"),
re_path(r"^robots\.txt", include("robots.urls")), re_path(r"^robots\.txt", include("robots.urls")),
path("__debug__/", include("debug_toolbar.urls")), path("__debug__/", include("debug_toolbar.urls")),
path("ckeditor5/", include("django_ckeditor_5.urls")), path("ckeditor5/", include("django_ckeditor_5.urls")),
@@ -483,8 +517,7 @@ urlpatterns = [
{"sitemaps": sitemaps}, {"sitemaps": sitemaps},
name="cached-sitemap", name="cached-sitemap",
), ),
path("cache/clear", clear_cache, name="clear_cache"), # special periods
path("profile/edit", UserProfileUpdateView.as_view(), name="edit_profile"),
path( path(
"specialperiods/", SpecialPeriodListView.as_view(), name="list_specialperiods" "specialperiods/", SpecialPeriodListView.as_view(), name="list_specialperiods"
), ),

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,16 @@
from .calendar_views import *
from .categorisation_rules_view import *
from .event_views import *
from .event_duplicate_views import *
from .errors import *
from .export_views import *
from .general_pages_views import *
from .import_batch_views import *
from .import_recurrent_views import *
from .message_views import *
from .moderation_views import *
from .organisations_views import *
from .places_views import *
from .tag_views import *
from .search_views import *
from .special_period_views import *

View File

@@ -0,0 +1,188 @@
from datetime import date, timedelta
from django.http import (
HttpResponseRedirect,
)
from django.shortcuts import render
from django.urls import reverse_lazy
from django.utils.timezone import datetime
from .utils import get_event_qs
from ..calendar import CalendarMonth, CalendarWeek, CalendarList
from ..filters import EventFilter
from ..models import Category
def day_view(request, year=None, month=None, day=None, cat=None):
if year is None or month is None or day is None:
if "when" in request.POST:
when = datetime.strptime(request.POST["when"], "%Y-%m-%d")
year = when.year
month = when.month
day = when.day
request = EventFilter.set_default_values(request)
qs = get_event_qs(request).select_related("exact_location")
if cat is not None:
category = Category.objects.filter(slug=cat).first()
qs = qs.filter(category=category)
else:
category = None
filter = EventFilter(request.GET, qs, request=request)
return HttpResponseRedirect(
reverse_lazy("day_view", args=[year, month, day])
+ "?"
+ filter.get_url()
)
return upcoming_events(request, year, month, day, 0, cat)
def month_view(request, year=None, month=None, cat=None):
now = date.today()
if year is None and month is None:
day = now.day
else:
day = None
if year is None:
year = now.year
if month is None:
month = now.month
request = EventFilter.set_default_values(request)
qs = get_event_qs(request).only(
"title",
"start_day",
"start_time",
"category",
"other_versions",
"recurrences",
"end_day",
"end_time",
"uuids",
"status",
"tags",
)
if cat is not None:
category = Category.objects.filter(slug=cat).first()
qs = qs.filter(category=category)
else:
category = None
filter = EventFilter(request.GET, qs, request=request)
if filter.has_category_parameters():
return HttpResponseRedirect(filter.get_new_url())
cmonth = CalendarMonth(year, month, filter, day=day)
context = {
"calendar": cmonth,
"this_month": day is not None,
"filter": filter,
"category": category,
"init_date": now if cmonth.today_in_calendar() else cmonth.firstdate,
}
return render(request, "agenda_culturel/page-month.html", context)
def week_view(request, year=None, week=None, home=False, cat=None):
now = date.today()
if year is None:
year = now.isocalendar()[0]
if week is None:
week = now.isocalendar()[1]
request = EventFilter.set_default_values(request)
qs = (
get_event_qs(request)
.select_related("exact_location")
.only(
"title",
"start_day",
"start_time",
"category",
"other_versions",
"recurrences",
"end_day",
"end_time",
"uuids",
"status",
"tags",
"local_image",
"image",
"image_alt",
"exact_location",
"description",
)
)
if cat is not None:
category = Category.objects.filter(slug=cat).first()
qs = qs.filter(category=category)
else:
category = None
filter = EventFilter(request.GET, qs, request=request)
if filter.has_category_parameters():
return HttpResponseRedirect(filter.get_new_url())
cweek = CalendarWeek(year, week, filter)
context = {
"year": year,
"week": week,
"calendar": cweek,
"filter": filter,
"category": category,
"init_date": now if cweek.today_in_calendar() else cweek.firstdate,
}
if home:
context["home"] = 1
return render(request, "agenda_culturel/page-week.html", context)
def upcoming_events(request, year=None, month=None, day=None, neighsize=1, cat=None):
now = date.today()
if year is None:
year = now.year
if month is None:
month = now.month
if day is None:
day = now.day
day = date(year, month, day)
day = day + timedelta(days=neighsize)
request = EventFilter.set_default_values(request)
qs = get_event_qs(request).select_related("exact_location")
if cat is not None:
category = Category.objects.filter(slug=cat).first()
qs = qs.filter(category=category)
else:
category = None
filter = EventFilter(request.GET, qs, request=request)
if filter.has_category_parameters():
return HttpResponseRedirect(filter.get_new_url())
cal = CalendarList(
day + timedelta(days=-neighsize),
day + timedelta(days=neighsize),
filter,
True,
)
context = {
"calendar": cal,
"now": now,
"day": day,
"init_date": now if cal.today_in_calendar() else day,
"filter": filter,
"date_pred": day + timedelta(days=-neighsize - 1),
"date_next": day + timedelta(days=neighsize + 1),
"category": category,
}
return render(request, "agenda_culturel/page-upcoming.html", context)

View File

@@ -0,0 +1,192 @@
from datetime import datetime
from django.contrib import messages
from django.contrib.auth.decorators import login_required, permission_required
from django.contrib.auth.mixins import PermissionRequiredMixin, LoginRequiredMixin
from django.contrib.messages.views import SuccessMessageMixin
from django.core.paginator import Paginator, PageNotAnInteger, EmptyPage
from django.db.models import Q
from django.http import HttpResponseRedirect
from django.shortcuts import render
from django.urls import reverse_lazy
from django.utils.translation import gettext_lazy as _
from django.views.generic import UpdateView, CreateView, DeleteView
from ..forms import CategorisationRuleImportForm, CategorisationForm
from ..models import Event, CategorisationRule, Category
@login_required(login_url="/accounts/login/")
@permission_required("agenda_culturel.view_categorisationrule")
def categorisation_rules(request):
paginator = Paginator(
CategorisationRule.objects.all()
.order_by("pk")
.select_related("category")
.select_related("place"),
100,
)
page = request.GET.get("page")
try:
response = paginator.page(page)
except PageNotAnInteger:
response = paginator.page(1)
except EmptyPage:
response = paginator.page(paginator.num_pages)
return render(
request,
"agenda_culturel/categorisation_rules.html",
{"paginator_filter": response},
)
class CategorisationRuleCreateView(
LoginRequiredMixin, PermissionRequiredMixin, CreateView
):
model = CategorisationRule
permission_required = "agenda_culturel.add_categorisationrule"
success_url = reverse_lazy("categorisation_rules")
form_class = CategorisationRuleImportForm
class CategorisationRuleUpdateView(
SuccessMessageMixin,
PermissionRequiredMixin,
LoginRequiredMixin,
UpdateView,
):
model = CategorisationRule
permission_required = "agenda_culturel.change_categorisationrule"
form_class = CategorisationRuleImportForm
success_url = reverse_lazy("categorisation_rules")
success_message = _("The categorisation rule has been successfully modified.")
class CategorisationRuleDeleteView(
SuccessMessageMixin,
PermissionRequiredMixin,
LoginRequiredMixin,
DeleteView,
):
model = CategorisationRule
permission_required = "agenda_culturel.delete_categorisationrule"
success_url = reverse_lazy("categorisation_rules")
success_message = _("The categorisation rule has been successfully deleted.")
@login_required(login_url="/accounts/login/")
@permission_required("agenda_culturel.apply_categorisationrules")
def apply_categorisation_rules(request):
if request.method == "POST":
form = CategorisationForm(request.POST)
if form.is_valid():
nb = 0
for epk, c in form.get_validated():
e = Event.objects.get(pk=epk)
cat = Category.objects.filter(name=c).first()
e.category = cat
e.save()
nb += 1
if nb != 0:
if nb == 1:
messages.success(
request,
_(
"The rules were successfully applied and 1 event was categorised."
),
)
else:
messages.success(
request,
_(
"The rules were successfully applied and {} events were categorised."
).format(nb),
)
else:
messages.info(
request,
_(
"The rules were successfully applied and no events were categorised."
),
)
return HttpResponseRedirect(reverse_lazy("categorisation_rules"))
else:
return render(
request,
"agenda_culturel/categorise_events_form.html",
context={"form": form},
)
else:
# first we check if events are not correctly categorised
to_categorise = []
events = (
Event.objects.filter(start_day__gte=datetime.now())
.exclude(category=Category.get_default_category_id())
.exclude(category=None)
.select_related("exact_location")
.select_related("category")
)
for e in events:
c = CategorisationRule.get_category_from_rules(e)
if c and c != e.category:
to_categorise.append((e, c))
# then we apply rules on events without category
nb = 0
to_save = []
events = (
Event.objects.filter(start_day__gte=datetime.now())
.filter(Q(category=Category.get_default_category_id()) | Q(category=None))
.select_related("exact_location")
.select_related("category")
)
for e in events:
success = CategorisationRule.apply_rules(e)
if success:
nb += 1
to_save.append(e)
if nb != 0:
Event.objects.bulk_update(to_save, fields=["category"])
# set messages
if nb != 0:
if nb == 1:
messages.success(
request,
_(
"The rules were successfully applied and 1 event with default category was categorised."
),
)
else:
messages.success(
request,
_(
"The rules were successfully applied and {} events with default category were categorised."
).format(nb),
)
else:
messages.info(
request,
_(
"The rules were successfully applied and no events were categorised."
),
)
if len(to_categorise) != 0:
form = CategorisationForm(events=to_categorise)
return render(
request,
"agenda_culturel/categorise_events_form.html",
context={
"form": form,
"events": dict((e.pk, e) for e, c in to_categorise),
"categories": dict((e.pk, c) for e, c in to_categorise),
},
)
else:
return HttpResponseRedirect(reverse_lazy("categorisation_rules"))

View File

@@ -0,0 +1,35 @@
from django.contrib.auth.decorators import login_required, permission_required
from django.core.mail import mail_admins
from django.shortcuts import render
from django.utils.translation import gettext_lazy as _
from ..models import Event
def page_not_found(request, exception=None):
return render(request, "page-erreur.html", status=404, context={"error": 404})
def internal_server_error(request):
try:
mail_admins(
request.site.name + _(": error 500"),
_("An internal error has occurred on site {} at address {}.").format(
request.site.name, request.build_absolute_uri()
),
)
except Exception:
pass
return render(request, "page-erreur.html", status=500, context={"error": 500})
@login_required(login_url="/accounts/login/")
@permission_required("agenda_culturel.change_event")
def error_next_event(request, pk):
obj = Event.objects.filter(pk=pk).first()
return render(
request,
"agenda_culturel/event_next_error_message.html",
{"pk": pk, "object": obj},
)

View File

@@ -0,0 +1,344 @@
from datetime import date
from django.contrib import messages
from django.contrib.auth.decorators import login_required, permission_required
from django.contrib.auth.mixins import LoginRequiredMixin
from django.core.paginator import PageNotAnInteger, EmptyPage
from django.http import HttpResponseRedirect
from django.shortcuts import get_object_or_404, render
from django.urls import reverse_lazy
from django.utils.translation import gettext_lazy as _
from django.views.generic import DetailView, UpdateView
from ..calendar import CalendarDay
from ..filters import DuplicatedEventsFilter
from ..forms import MergeDuplicates, FixDuplicates, SelectEventInList
from ..models import DuplicatedEvents, Event
from .utils import PaginatorFilter
class DuplicatedEventsDetailView(LoginRequiredMixin, DetailView):
model = DuplicatedEvents
template_name = "agenda_culturel/duplicate.html"
@login_required(login_url="/accounts/login/")
@permission_required(
["agenda_culturel.change_event", "agenda_culturel.change_duplicatedevents"]
)
def update_duplicate_event(request, pk, epk):
edup = get_object_or_404(DuplicatedEvents, pk=pk)
event = get_object_or_404(Event, pk=epk)
form = MergeDuplicates(duplicates=edup, event=event)
if request.method == "POST":
form = MergeDuplicates(request.POST, duplicates=edup)
if form.is_valid():
for f in edup.get_items_comparison():
if not f["similar"]:
selected = form.get_selected_events(f["key"])
if selected is not None:
if isinstance(selected, list):
values = [
x
for x in [getattr(s, f["key"]) for s in selected]
if x is not None
]
if len(values) != 0:
if isinstance(values[0], str):
setattr(event, f["key"], "\n".join(values))
else:
setattr(event, f["key"], sum(values, []))
else:
if f["key"] == "organisers":
event.organisers.set(selected.organisers.all())
else:
setattr(
event,
f["key"],
getattr(selected, f["key"]),
)
if f["key"] == "image":
setattr(
event,
"local_image",
getattr(selected, "local_image"),
)
event.other_versions.fix(event)
event.save()
messages.info(request, _("Update successfully completed."))
return HttpResponseRedirect(event.get_absolute_url())
return render(
request,
"agenda_culturel/update_duplicate.html",
context={
"form": form,
"object": edup,
"event_id": edup.get_event_index(event),
},
)
@login_required(login_url="/accounts/login/")
@permission_required(
["agenda_culturel.change_event", "agenda_culturel.change_duplicatedevents"]
)
def merge_duplicate(request, pk):
edup = get_object_or_404(DuplicatedEvents, pk=pk)
form = MergeDuplicates(duplicates=edup)
if request.method == "POST":
form = MergeDuplicates(request.POST, duplicates=edup)
if form.is_valid():
events = edup.get_duplicated()
# build fields for the new event
new_event_data = {}
for f in edup.get_items_comparison():
if f["similar"]:
new_event_data[f["key"]] = getattr(events[0], f["key"])
else:
selected = form.get_selected_events(f["key"])
if selected is None:
new_event_data[f["key"]] = None
elif isinstance(selected, list):
values = [
x
for x in [getattr(s, f["key"]) for s in selected]
if x is not None
]
if len(values) == 0:
new_event_data[f["key"]] = None
else:
if isinstance(values[0], str):
new_event_data[f["key"]] = "\n".join(values)
else:
new_event_data[f["key"]] = sum(values, [])
else:
new_event_data[f["key"]] = getattr(selected, f["key"])
if f["key"] == "image" and "local_image" not in new_event_data:
new_event_data["local_image"] = getattr(
selected, "local_image"
)
organisers = new_event_data.pop("organisers", None)
# create a new event that merge the selected events
new_event = Event(**new_event_data)
new_event.status = Event.STATUS.PUBLISHED
new_event.other_versions = edup
new_event.save()
if organisers is not None:
new_event.organisers.set(organisers.all())
edup.fix(new_event)
messages.info(
request,
_("Creation of a merged event has been successfully completed."),
)
return HttpResponseRedirect(new_event.get_absolute_url())
return render(
request,
"agenda_culturel/merge_duplicate.html",
context={"form": form, "object": edup},
)
@login_required(login_url="/accounts/login/")
@permission_required(
["agenda_culturel.change_event", "agenda_culturel.change_duplicatedevents"]
)
def fix_duplicate(request, pk):
edup = get_object_or_404(DuplicatedEvents.objects.select_related(), pk=pk)
if request.method == "POST":
form = FixDuplicates(request.POST, edup=edup)
if form.is_valid():
if form.is_action_no_duplicates():
# all events are different
events = edup.get_duplicated()
# get redirection date
if len(events) == 0:
date = None
else:
s_events = [e for e in events if not e.has_recurrences()]
if len(s_events) != 0:
s_event = s_events[0]
else:
s_event = events[0]
date = s_event.start_day
messages.success(request, _("Events have been marked as unduplicated."))
# delete the duplicated event (other_versions will be set to None on all events)
edup.delete()
if date is None:
return HttpResponseRedirect(reverse_lazy("home"))
else:
return HttpResponseRedirect(
reverse_lazy("day_view", args=[date.year, date.month, date.day])
)
elif form.is_action_select():
# one element has been selected to be the representative
selected = form.get_selected_event(edup)
if selected is None:
messages.error(
request,
_(
"The selected item is no longer included in the list of duplicates. Someone else has probably modified the list in the meantime."
),
)
else:
edup.fix(selected)
messages.success(
request,
_("The selected event has been set as representative"),
)
return HttpResponseRedirect(edup.get_absolute_url())
elif form.is_action_remove():
# one element is removed from the set
event = form.get_selected_event(edup)
if event is None:
messages.error(
request,
_(
"The selected item is no longer included in the list of duplicates. Someone else has probably modified the list in the meantime."
),
)
return HttpResponseRedirect(edup.get_absolute_url())
else:
event.other_versions = None
if edup.representative == event:
edup.representative = None
event.set_no_modification_date_changed()
event.save()
edup.save()
edup.events = [e for e in edup.events if e.pk != event.pk]
messages.success(
request,
_(
"The event has been withdrawn from the group and made independent."
),
)
if edup.nb_duplicated() == 1:
return HttpResponseRedirect(edup.get_absolute_url())
else:
form = FixDuplicates(edup=edup)
elif form.is_action_update():
# otherwise, an event will be updated using other elements
event = form.get_selected_event(edup)
if event is None:
messages.error(
request,
_(
"The selected item is no longer included in the list of duplicates. Someone else has probably modified the list in the meantime."
),
)
return HttpResponseRedirect(edup.get_absolute_url())
else:
return HttpResponseRedirect(
reverse_lazy("update_event", args=[edup.pk, event.pk])
)
else:
# otherwise, a new event will be created using a merging process
return HttpResponseRedirect(
reverse_lazy("merge_duplicate", args=[edup.pk])
)
else:
form = FixDuplicates(edup=edup)
return render(
request,
"agenda_culturel/fix_duplicate.html",
context={"form": form, "object": edup},
)
class DuplicatedEventsUpdateView(
LoginRequiredMixin, UpdateView
): # Todo à supprimer, pas dutilisation ?
model = DuplicatedEvents
fields = ()
template_name = "agenda_culturel/fix_duplicate.html"
@login_required(login_url="/accounts/login/")
@permission_required("agenda_culturel.view_duplicatedevents")
def duplicates(request):
nb_removed = DuplicatedEvents.remove_singletons()
if nb_removed > 0:
messages.success(
request,
_("Cleaning up duplicates: {} item(s) fixed.").format(nb_removed),
)
filter = DuplicatedEventsFilter(
request.GET, queryset=DuplicatedEvents.objects.all().order_by("-pk")
)
paginator = PaginatorFilter(filter, 10, request)
page = request.GET.get("page")
try:
response = paginator.page(page)
except PageNotAnInteger:
response = paginator.page(1)
except EmptyPage:
response = paginator.page(paginator.num_pages)
return render(
request,
"agenda_culturel/duplicates.html",
{
"filter": filter,
"paginator_filter": response,
"paginator": paginator,
},
)
def set_duplicate(request, year, month, day, pk):
event = get_object_or_404(Event, pk=pk)
cday = CalendarDay(date(year, month, day))
others = [
e
for e in cday.get_events()
if e != event
and (event.other_versions is None or event.other_versions != e.other_versions)
and e.status != Event.STATUS.TRASH
]
form = SelectEventInList(events=others)
if request.method == "POST":
form = SelectEventInList(request.POST, events=others)
if form.is_valid():
selected = [o for o in others if o.pk == int(form.cleaned_data["event"])]
event.set_other_versions(selected)
# save them without updating modified date
event.set_no_modification_date_changed()
event.save()
if request.user.is_authenticated:
messages.success(request, _("The event was successfully duplicated."))
return HttpResponseRedirect(
reverse_lazy("view_duplicate", args=[event.other_versions.pk])
)
else:
messages.info(
request,
_(
"The event has been successfully flagged as a duplicate. The moderation team will deal with your suggestion shortly."
),
)
return HttpResponseRedirect(event.get_absolute_url())
return render(
request,
"agenda_culturel/set_duplicate.html",
context={"form": form, "event": event},
)

View File

@@ -0,0 +1,662 @@
from datetime import date
from django.contrib import messages
from django.contrib.auth.decorators import login_required, permission_required
from django.contrib.auth.mixins import (
LoginRequiredMixin,
PermissionRequiredMixin,
UserPassesTestMixin,
)
from django.contrib.messages.views import SuccessMessageMixin
from django.core.paginator import EmptyPage, PageNotAnInteger
from django.http import (
HttpResponseForbidden,
HttpResponseRedirect,
)
from django.shortcuts import get_object_or_404, render
from django.urls import reverse, reverse_lazy
from django.utils.html import escape
from django.utils.safestring import mark_safe
from django.utils.translation import gettext_lazy as _
from django.views.generic import DetailView
from django.views.generic.edit import (
CreateView,
DeleteView,
ModelFormMixin,
UpdateView,
)
from django.http import Http404
from urllib.parse import quote
from .utils import PaginatorFilter
from ..celery import (
import_events_from_url,
import_events_from_urls,
)
from ..filters import (
EventFilterAdmin,
SimpleSearchEventFilter,
)
from ..forms import (
EventForm,
EventFormWithContact,
MessageEventForm,
SimpleContactForm,
URLSubmissionFormSet,
URLSubmissionFormWithContact,
URLSubmissionSimpleForm,
)
from ..import_tasks.extractor import Extractor
from ..models import (
DuplicatedEvents,
Event,
Message,
)
from .utils import get_event_qs
def update_from_source(request, pk):
event = get_object_or_404(Event, pk=pk)
url = event.get_updateable_uuid()
if url is None:
messages.warning(
request,
_(
"The event cannot be updated because the import process is not available for the referenced sources."
),
)
else:
import_events_from_url.delay(
url,
None,
None,
True,
user_id=request.user.pk if request.user else None,
)
messages.success(
request,
_("The event update has been queued and will be completed shortly."),
)
return HttpResponseRedirect(event.get_absolute_url())
class EventUpdateView(
SuccessMessageMixin,
PermissionRequiredMixin,
LoginRequiredMixin,
UpdateView,
):
model = Event
permission_required = "agenda_culturel.change_event"
form_class = EventForm
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs["is_authenticated"] = self.request.user.is_authenticated
kwargs["is_moderation_expert"] = (
self.request.user.userprofile.is_moderation_expert
)
kwargs["is_cloning"] = self.is_cloning
kwargs["is_edit_from_moderation"] = self.is_edit_force()
kwargs["is_simple_cloning"] = self.is_simple_cloning
return kwargs
def get_success_message(self, cleaned_data):
txt = (
_(" A message has been sent to the person who proposed the event.")
if hasattr(self, "with_msg") and self.with_msg
else ""
)
return mark_safe(_("The event has been successfully modified.") + txt)
def is_edit_force(self):
return "edit-force" in self.request.path.split("/")
def get_object(self, queryset=None):
event = super().get_object(queryset)
if event.status == Event.STATUS.DRAFT:
event.status = Event.STATUS.PUBLISHED
if self.is_edit_force():
event.free_modification_lock(self.request.user)
return event
def form_valid(self, form):
form.instance.set_processing_user(self.request.user)
self.with_message = form.instance.notify_if_required(self.request)
return super().form_valid(form)
def get_initial(self):
self.is_cloning = "clone" in self.request.path.split("/")
if self.is_cloning:
messages.info(
self.request,
_(
"Changes will be visible on a local copy of the event. The version identical to the imported source will be hidden."
),
)
self.is_simple_cloning = "simple-clone" in self.request.path.split("/")
result = super().get_initial()
if self.is_cloning and "other_versions" not in result:
obj = self.get_object()
# if no DuplicatedEvents is associated, create one
obj.other_versions = DuplicatedEvents.objects.create()
obj.other_versions.save()
# save them without updating modified date
obj.set_no_modification_date_changed()
obj.save()
result["other_versions"] = obj.other_versions
result["status"] = Event.STATUS.PUBLISHED
result["cloning"] = True
if self.is_simple_cloning:
result["other_versions"] = None
result["simple_cloning"] = True
if self.is_cloning or self.is_simple_cloning:
obj = self.get_object()
if obj.local_image:
result["old_local_image"] = obj.local_image.name
return result
def get_success_url(self):
if "save_and_next" in self.request.POST:
return reverse_lazy("moderate_event_next", args=[self.object.pk])
else:
return self.object.get_absolute_url()
class EventDeleteView(
SuccessMessageMixin,
PermissionRequiredMixin,
LoginRequiredMixin,
DeleteView,
):
model = Event
permission_required = "agenda_culturel.delete_event"
success_url = reverse_lazy("recent")
success_message = _("The event has been successfully deleted.")
class EventDetailView(UserPassesTestMixin, DetailView, ModelFormMixin):
model = Event
form_class = MessageEventForm
template_name = "agenda_culturel/page-event.html"
queryset = (
Event.objects.select_related("exact_location")
.select_related("category")
.select_related("other_versions")
.select_related("other_versions__representative")
.prefetch_related("message_set")
)
def test_func(self):
return (
self.request.user.is_authenticated
or self.get_object().status == Event.STATUS.PUBLISHED
)
def get_object(self):
o = super().get_object()
o.download_missing_image()
if "year" in self.kwargs:
y = self.kwargs["year"]
m = self.kwargs["month"]
d = self.kwargs["day"]
obj = o.get_recurrence_at_date(y, m, d)
obj.set_current_date(date(y, m, d))
return obj
else:
return o
def get(self, request, *args, **kwargs):
try:
self.object = self.get_object()
except Http404 as e:
if "year" in self.kwargs:
y = self.kwargs["year"]
m = self.kwargs["month"]
d = self.kwargs["day"]
day = date(y, m, d)
s = self.kwargs["extra"].replace("-", " ")
qs = get_event_qs(request)
request.GET._mutable = True
request.GET["q"] = s
filter = SimpleSearchEventFilter(
request.GET,
queryset=qs,
request=request,
)
context = {
"alternative_events": Event.alternative_events_if_not_found(day, s),
"filter": filter,
}
return render(
request, "page-event-not-found.html", status=404, context=context
)
else:
raise e
context = self.get_context_data(object=self.object)
return self.render_to_response(context)
def get_success_url(self):
return self.get_object().get_absolute_url() + "#chronology"
def post(self, request, *args, **kwargs):
if not request.user.is_authenticated:
return HttpResponseForbidden()
form = self.get_form()
if form.is_valid():
return self.form_valid(form)
else:
return self.form_invalid(form)
def form_valid(self, form):
message = form.save(commit=False)
message.user = self.request.user
message.related_event = self.get_object()
message.subject = _("Comment")
message.spam = False
message.closed = True
message.save()
return super().form_valid(form)
@login_required(login_url="/accounts/login/")
@permission_required("agenda_culturel.change_event")
def change_status_event(request, pk, status):
event = get_object_or_404(Event, pk=pk)
if request.method == "POST":
event.status = Event.STATUS(status)
fields = ["status", "moderated_date", "moderated_by_user"]
event.set_in_moderation_process()
event.update_modification_dates()
event.save(update_fields=fields)
with_msg = event.notify_if_required(request)
if with_msg:
messages.success(
request,
_(
"The status has been successfully modified and a message has been sent to the person who proposed the event."
),
)
else:
messages.success(request, _("The status has been successfully modified."))
return HttpResponseRedirect(event.get_absolute_url())
else:
cancel_url = request.META.get("HTTP_REFERER", "")
if cancel_url == "":
cancel_url = reverse_lazy("home")
return render(
request,
"agenda_culturel/event_confirm_change_status.html",
{"status": status, "event": event, "cancel_url": cancel_url},
)
def import_event_proxy(request):
if request.method == "POST":
form = URLSubmissionSimpleForm(request.POST, request.FILES)
if form.is_valid():
url = form.cleaned_data["url"]
if Extractor.is_known_url_default_extractors(url):
c_url = Extractor.clean_url_default_extractors(url)
ex = Event.is_know_url_get_visible_event(
c_url, request.user.is_authenticated
)
if ex is True:
messages.info(
request,
_(
"{} has not been submitted since it"
"s already known and currently into moderation process."
).format(c_url),
)
return HttpResponseRedirect(reverse_lazy("home"))
elif isinstance(ex, Event):
messages.info(
request,
mark_safe(
_(
"{} has not been submitted since it"
"s already known: {}."
).format(
c_url,
'<a href="'
+ ex.get_absolute_url()
+ '">'
+ escape(ex.title)
+ "</a>",
)
),
)
return HttpResponseRedirect(ex.get_absolute_url())
else:
messages.info(
request,
_(
"This type of address is known to the calendar, so an automatic import is proposed."
),
)
return HttpResponseRedirect(
reverse_lazy("add_event_url") + "?url=" + quote(c_url)
)
else:
manual = reverse_lazy("add_event_details") + "?url=" + quote(url)
if request.user.is_authenticated:
messages.info(
request,
mark_safe(
_(
'This type of address is unknown to the agenda. As a logged-in user, you can still try to import the event, which may work if the page contains a google calendar link, for example. When in doubt, opt for <a href="{}">manual entry</a>.'
).format(manual)
),
)
return HttpResponseRedirect(
reverse_lazy("add_event_url") + "?url=" + quote(url)
)
else:
messages.info(
request,
_(
"This type of address is unknown to the calendar, so we suggest that you enter the information for the event you wish to import, in addition to the link you've provided."
),
)
return HttpResponseRedirect(manual)
else:
form = URLSubmissionSimpleForm()
return render(request, "agenda_culturel/event_import.html", {"form": form})
class EventCreateView(SuccessMessageMixin, CreateView):
model = Event
form_class = EventFormWithContact
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs["is_authenticated"] = self.request.user.is_authenticated
return kwargs
def get_initial(self):
initial = super().get_initial()
initial["reference_urls"] = [self.request.GET.get("url", "")]
return initial
def get_success_url(self):
if self.request.user.is_authenticated:
if "save_and_next" in self.request.POST:
return reverse_lazy("moderate_event_next", args=[self.object.pk])
else:
return self.object.get_absolute_url()
else:
return reverse_lazy("home")
def get_success_message(self, cleaned_data):
if self.request.user.is_authenticated:
return mark_safe(
_('The event was created: <a href="{}">{}</a>.').format(
self.object.get_absolute_url(), self.object.title
)
)
else:
return _(
"The event has been submitted and will be published as soon as it has been validated by the moderation team."
)
def form_valid(self, form):
if form.cleaned_data["simple_cloning"]:
form.instance.set_skip_duplicate_check()
if form.cleaned_data["cloning"]:
form.instance.set_in_moderation_process()
if form.cleaned_data.get("email") or form.cleaned_data.get("comments"):
has_comments = form.cleaned_data.get("comments") not in ["", None]
form.instance.add_message(
Message(
subject=_("during the creation process"),
message=form.cleaned_data.get("comments"),
email=form.cleaned_data.get("email"),
closed=False,
message_type=(
Message.TYPE.FROM_CONTRIBUTOR
if has_comments
else Message.TYPE.FROM_CONTRIBUTOR_NO_MSG
),
)
)
form.instance.import_sources = None
form.instance.set_processing_user(self.request.user)
result = super().form_valid(form)
if form.cleaned_data["cloning"]:
with_msg = form.instance.notify_if_required(self.request)
if with_msg:
messages.success(
self.request,
_(
"A message has been sent to the person who proposed the initial event."
),
)
return result
# A class to evaluate the URL according to the existing events and the authentification
# level of the user
class URLEventEvaluation:
def __init__(self, form, is_authenticated):
self.form = form
self.is_authenticated = is_authenticated
self.cat = None
self.tags = []
self.existing = None
self.url = form.cleaned_data.get("url")
self.event = None
if self.url is not None:
self.url = Extractor.clean_url_default_extractors(self.url)
print(self.url)
# we check if the url is known
self.existing = Event.is_know_url_get_visible_event(
self.url, is_authenticated
)
if self.existing is False:
self.cat = form.cleaned_data.get("category")
if self.cat is not None:
self.cat = self.cat.name
self.tags = form.cleaned_data.get("tags")
def exists(self):
return self.url is not None
def is_new(self):
return self.exists() and self.existing is False
def is_event_visible(self):
return isinstance(self.existing, Event)
def get_event(self):
if isinstance(self.existing, Event):
return self.existing
else:
return None
def get_link(self):
e = self.get_event()
if e is None:
return ""
else:
return '<a href="' + e.get_absolute_url() + '">' + escape(e.title) + "</a>"
def to_list(self):
if self.is_new():
return (self.url, self.cat, self.tags)
def import_from_urls(request):
if request.method == "POST":
formset = URLSubmissionFormSet(request.POST, request.FILES)
if not request.user.is_authenticated:
contactform = SimpleContactForm(request.POST)
if formset.is_valid() and (
request.user.is_authenticated or contactform.is_valid()
):
# evaluate all the forms
ucat = [
URLEventEvaluation(form, request.user.is_authenticated)
for form in formset.forms
]
# for each not new, add a message
for uc in ucat:
if uc.exists() and not uc.is_new():
if uc.is_event_visible():
messages.info(
request,
mark_safe(
_(
"{} has not been submitted since it"
"s already known: {}."
).format(uc.url, uc.get_link())
),
)
else:
messages.info(
request,
_(
"{} has not been submitted since it"
"s already known and currently into moderation process."
).format(uc.url),
)
# keep only new ones
ucat = [uc.to_list() for uc in ucat if uc.is_new()]
# finally process them or go back to home page
if len(ucat) > 0:
messages.info(
request,
_("Integrating {} url(s) into our import process.").format(
len(ucat)
),
)
email = None
comments = None
if not request.user.is_authenticated:
email = contactform.cleaned_data["email"]
comments = contactform.cleaned_data["comments"]
import_events_from_urls.delay(
ucat,
user_id=request.user.pk if request.user else None,
email=email,
comments=comments,
)
return HttpResponseRedirect(reverse("thank_you"))
else:
return HttpResponseRedirect(reverse("home"))
else:
formset = URLSubmissionFormSet()
if not request.user.is_authenticated:
contactform = SimpleContactForm()
context = {"formset": formset}
if not request.user.is_authenticated:
context["contactform"] = contactform
return render(request, "agenda_culturel/import_set.html", context=context)
def import_from_url(request):
# if the form has been sent
if request.method == "POST":
form = URLSubmissionFormWithContact(
request.POST, is_authenticated=request.user.is_authenticated
)
# if the form is valid
if form.is_valid():
uc = URLEventEvaluation(form, request.user.is_authenticated)
print(uc.exists(), uc.is_new(), uc.event)
if uc.exists() and not uc.is_new():
if uc.is_event_visible():
messages.info(
request,
mark_safe(
_(
"{} has not been submitted since its already known: {}."
).format(uc.url, uc.get_link())
),
)
return HttpResponseRedirect(uc.get_event().get_absolute_url())
else:
messages.info(
request,
_(
"{} has not been submitted since it"
"s already known and currently into moderation process."
).format(uc.url),
)
return HttpResponseRedirect(reverse("home"))
else:
messages.info(
request,
_("Integrating {} into our import process.").format(uc.url),
)
import_events_from_url.delay(
uc.url,
uc.cat,
uc.tags,
user_id=request.user.pk if request.user else None,
email=form.cleaned_data.get("email"),
comments=form.cleaned_data.get("comments"),
)
return HttpResponseRedirect(reverse("thank_you"))
else:
url = request.GET.get("url", "") if request.method == "GET" else ""
initial = {"url": url}
form = URLSubmissionFormWithContact(
is_authenticated=request.user.is_authenticated, initial=initial
)
return render(
request,
"agenda_culturel/import.html",
context={"form": form},
)
@login_required(login_url="/accounts/login/")
@permission_required("agenda_culturel.view_event")
def recent(request):
filter = EventFilterAdmin(
request.GET, queryset=Event.objects.all().order_by("-created_date")
)
paginator = PaginatorFilter(filter, 10, request)
page = request.GET.get("page")
try:
response = paginator.page(page)
except PageNotAnInteger:
response = paginator.page(1)
except EmptyPage:
response = paginator.page(paginator.num_pages)
return render(
request,
"agenda_culturel/recent.html",
{"filter": filter, "paginator_filter": response},
)

View File

@@ -0,0 +1,103 @@
import hashlib
from datetime import timedelta, date
import emoji
from django.core.cache import cache
from django.http import HttpResponse, HttpResponseRedirect
from django.shortcuts import get_object_or_404
from . import get_event_qs
from ..calendar import CalendarList
from ..filters import EventFilter
from ..models import Event, Category, Place, Organisation, SiteConfiguration
def export_event_ical(request, year, month, day, pk):
event = get_object_or_404(Event, pk=pk)
event = event.get_recurrence_at_date(year, month, day)
events = list()
events.append(event)
cal = Event.export_to_ics(events, request)
response = HttpResponse(content_type="text/calendar")
response.content = cal.to_ical().decode("utf-8").replace("\r\n", "\n")
response["Content-Disposition"] = "attachment; filename={0}{1}".format(
event.title.replace("\n", " ").replace("\r", "")[0:32], ".ics"
)
return response
def export_ical(request, cat=None, tag=None, organisation_pk=None, place_pk=None):
now = date.today()
qs = get_event_qs(request)
if cat is not None:
category = Category.objects.filter(slug=cat).first()
qs = qs.filter(category=category)
else:
category = None
if place_pk is not None:
qs = qs.filter(exact_location=place_pk)
if organisation_pk is not None:
qs = qs.filter(organisers__in=[organisation_pk])
if tag is not None:
qs = qs.filter(tags__contains=[tag])
request = EventFilter.set_default_values(request)
filter = EventFilter(request.GET, queryset=qs, request=request)
if filter.has_category_parameters():
return HttpResponseRedirect(filter.get_new_url())
id_cache = hashlib.md5(
(
filter.get_url()
+ "-"
+ str(tag)
+ "-"
+ str(cat)
+ "-"
+ str(organisation_pk)
+ "-"
+ str(place_pk)
).encode("utf8")
).hexdigest()
ical = cache.get(id_cache)
if not ical:
calendar = CalendarList(
now + timedelta(days=-7), now + timedelta(days=+60), filter
)
ical = calendar.export_to_ics(request)
cache.set(id_cache, ical, 3600) # 1 heure
response = HttpResponse(content_type="text/calendar")
response.content = ical.to_ical().decode("utf-8").replace("\r\n", "\n")
extra = filter.to_str(" ")
if extra is None:
extra = ""
if category is not None:
extra += " " + str(category)
if place_pk is not None:
extra += (
" @ " + Place.objects.filter(pk=place_pk).values("name").first()["name"]
)
if organisation_pk is not None:
extra += (
" - "
+ Organisation.objects.filter(pk=organisation_pk)
.values("name")
.first()["name"]
)
if tag is not None:
extra += " - " + emoji.replace_emoji(tag, replace="")
response["Content-Disposition"] = "attachment; filename={0}{1}{2}".format(
SiteConfiguration.get_solo().site_name, extra, ".ics"
)
return response

View File

@@ -0,0 +1,348 @@
import calendar as _calendar
from datetime import date, timedelta, datetime
from django.contrib import messages
from django.contrib.auth.decorators import login_required, permission_required
from django.contrib.auth.mixins import LoginRequiredMixin, PermissionRequiredMixin
from django.contrib.messages.views import SuccessMessageMixin
from django.core.cache import cache
from django.db.models import Count, F, Subquery, OuterRef
from django.db.models import Q, Min, Max, Avg, StdDev
from django.db.models.functions import TruncMonth, ExtractDay
from django.http import HttpResponseRedirect
from django.shortcuts import render
from django.urls import reverse_lazy
from django.utils.translation import gettext_lazy as _
from django.views.generic import CreateView, UpdateView
from . import week_view
from .utils import Median
from ..forms import UserProfileForm
from ..models import (
RecurrentImport,
Event,
StaticContent,
BatchImportation,
UserProfile,
)
def home(request, cat=None):
return week_view(request, home=True, cat=cat)
def thank_you(request):
return render(request, "agenda_culturel/thank_you.html")
def mentions_legales(request):
context = {
"title": "Mentions légales",
"static_content": "mentions_legales",
"url_path": reverse_lazy("mentions_legales"),
}
return render(request, "agenda_culturel/page-single.html", context)
def about(request):
rimports = (
RecurrentImport.objects.filter(~Q(recurrence=RecurrentImport.RECURRENCE.NEVER))
.order_by("name__unaccent")
.all()
)
context = {
"title": "À propos",
"static_content": "about",
"url_path": reverse_lazy("about"),
"rimports": rimports,
}
return render(request, "agenda_culturel/page-rimports-list.html", context)
def moderation_rules(request):
context = {
"title": _("Moderation rules"),
"static_content": "moderation_rules",
"url_path": reverse_lazy("moderation_rules"),
}
return render(request, "agenda_culturel/page-single.html", context)
def import_requirements(request):
context = {
"title": _("Import requirements"),
"static_content": "import_requirements",
"url_path": reverse_lazy("import_requirements"),
}
return render(request, "agenda_culturel/page-single.html", context)
def statistics(request, pk=None):
if pk is not None:
rimport = RecurrentImport.objects.filter(pk=pk)
source = rimport.values("source").first()["source"]
qs = Event.objects.filter(import_sources__contains=[source])
else:
rimport = None
qs = Event.objects
stats = {}
stats_months = {}
first = {}
last = {}
ev_published = qs.filter(
Q(status=Event.STATUS.PUBLISHED)
& (
Q(other_versions__isnull=True)
| Q(other_versions__representative=F("pk"))
| Q(other_versions__representative__isnull=True)
)
)
for v in ["start_day", "created_date__date"]:
after = 24
last[v] = (
date.today()
if v == "created_date__date"
else date.today() + timedelta(weeks=after)
)
last[v] = last[v].replace(
day=_calendar.monthrange(last[v].year, last[v].month)[1]
)
r = 8 * 30
if v == "start_day":
r += after * 7
first[v] = (last[v] - timedelta(days=r)).replace(day=1)
ev_days = ev_published.annotate(day=F(v)).filter(
Q(day__lte=last[v]) & Q(day__gte=first[v])
)
stats[v] = ev_days.values("day").annotate(total=Count("day")).order_by("day")
stats_months[v] = (
ev_days.annotate(month=TruncMonth("day"))
.values("month")
.annotate(total=Count("month"))
.order_by("month")
)
nb_by_city = (
ev_published.annotate(city=F("exact_location__city"))
.filter(city__isnull=False)
.values("city")
.annotate(total=Count("city"))
.order_by("-total")
)
limit = datetime.now() + timedelta(days=-30)
stat_qs = qs.filter(start_day__gte=F("created_date")).annotate(
foresight=ExtractDay(F("start_day") - F("created_date"))
)
statsa = stat_qs.filter().aggregate(
minimum=Min("foresight"),
maximum=Max("foresight"),
mean=Avg("foresight"),
median=Median("foresight"),
stdev=StdDev("foresight"),
)
statsm = stat_qs.filter(created_date__gte=limit).aggregate(
minimum=Min("foresight"),
maximum=Max("foresight"),
mean=Avg("foresight"),
median=Median("foresight"),
stdev=StdDev("foresight"),
)
stats_foresight = [
[
_(x),
round(statsa[x], 2) if statsa[x] is not None else "-",
round(statsm[x], 2) if statsm[x] is not None else "-",
]
for x in statsa
]
context = {
"stats_by_startday": stats["start_day"],
"stats_by_creation": stats["created_date__date"],
"stats_months_by_startday": stats_months["start_day"],
"stats_months_by_creation": stats_months["created_date__date"],
"first_by_startday": first["start_day"],
"last_by_startday": last["start_day"],
"first_by_creation": first["created_date__date"],
"last_by_creation": last["created_date__date"],
"nb_by_city": nb_by_city,
"stats_foresight": stats_foresight,
"object": rimport.first() if rimport else None,
}
if pk is None:
return render(request, "agenda_culturel/statistics.html", context)
else:
return render(request, "agenda_culturel/rimport-statistics.html", context)
@login_required(login_url="/accounts/login/")
@permission_required("agenda_culturel.view_event")
def activite(request):
now = date.today()
days = [now]
while len(days) < 7 or days[-1].weekday() != 0:
days.append(days[-1] + timedelta(days=-1))
weeks = [days[-1]]
for w in range(0, 8):
weeks.append(weeks[-1] + timedelta(days=-7))
daily_modifications = Event.get_count_modifications([(d, 1) for d in days])
weekly_modifications = Event.get_count_modifications([(w, 7) for w in weeks])
return render(
request,
"agenda_culturel/page-activity.html",
{
"daily_modifications": daily_modifications,
"weekly_modifications": weekly_modifications,
},
)
class StaticContentCreateView(LoginRequiredMixin, CreateView):
model = StaticContent
fields = ["text"]
permission_required = "agenda_culturel.add_staticcontent"
def form_valid(self, form):
form.instance.name = self.request.GET["name"]
form.instance.url_path = self.request.GET["url_path"]
return super().form_valid(form)
class StaticContentUpdateView(
SuccessMessageMixin,
PermissionRequiredMixin,
LoginRequiredMixin,
UpdateView,
):
model = StaticContent
permission_required = "agenda_culturel.change_staticcontent"
fields = ["text"]
success_message = _("The static content has been successfully updated.")
def clear_cache(request):
if request.method == "POST":
cache.clear()
messages.success(request, _("Cache successfully cleared."))
return HttpResponseRedirect(reverse_lazy("administration"))
else:
return render(
request,
"agenda_culturel/clear_cache.html",
)
@login_required(login_url="/accounts/login/")
@permission_required("agenda_culturel.view_event")
def administration(request):
nb_mod_days = 21
nb_classes = 4
today = date.today()
# get information about recent modifications
days = [today]
for i in range(0, 2):
days.append(days[-1] + timedelta(days=-1))
daily_modifications = Event.get_count_modifications([(d, 1) for d in days])
# get last created events
events = (
Event.objects.all()
.order_by("-created_date")
.select_related("exact_location", "category")[:5]
)
# get last batch imports
rel_event = Event.objects.filter(
import_sources__contains=[OuterRef("url_source")]
).values("pk")[:1]
batch_imports = (
BatchImportation.objects.all()
.select_related("recurrentImport")
.annotate(event_id=Subquery(rel_event))
.order_by("-created_date")[:5]
)
# get info about batch information
newest = (
BatchImportation.objects.filter(recurrentImport=OuterRef("pk"))
.order_by("-created_date")
.select_related("recurrentImport")
)
imported_events = RecurrentImport.objects.annotate(
last_run_status=Subquery(newest.values("status")[:1])
)
nb_failed = imported_events.filter(
last_run_status=BatchImportation.STATUS.FAILED
).count()
nb_canceled = imported_events.filter(
last_run_status=BatchImportation.STATUS.CANCELED
).count()
nb_running = imported_events.filter(
last_run_status=BatchImportation.STATUS.RUNNING
).count()
nb_all = imported_events.count()
# get some info about imported (or not) events
srcs = RecurrentImport.objects.all().values_list("source")
in_future = Event.objects.filter(Q(start_day__gte=today))
nb_in_rimport = in_future.filter(Q(import_sources__overlap=srcs)).count()
nb_in_orphan_import = in_future.filter(
(
Q(import_sources__isnull=False)
& (Q(modified_date__isnull=True) | Q(modified_date__lte=F("imported_date")))
)
& ~Q(import_sources__overlap=srcs)
).count()
# get all non moderated events
nb_not_moderated = Event.get_nb_not_moderated(today, nb_mod_days, nb_classes)
return render(
request,
"agenda_culturel/administration.html",
{
"daily_modifications": daily_modifications,
"events": events,
"batch_imports": batch_imports,
"nb_failed": nb_failed,
"nb_canceled": nb_canceled,
"nb_running": nb_running,
"nb_all": nb_all,
"nb_not_moderated": nb_not_moderated,
"nb_in_rimport": nb_in_rimport,
"nb_in_orphan_import": nb_in_orphan_import,
},
)
class UserProfileUpdateView(
SuccessMessageMixin,
LoginRequiredMixin,
UpdateView,
):
model = UserProfile
success_message = _("Your user profile has been successfully modified.")
success_url = reverse_lazy("administration")
form_class = UserProfileForm
def get_object(self):
return self.request.user.userprofile

View File

@@ -0,0 +1,143 @@
from datetime import date
from django.contrib import messages
from django.contrib.auth.decorators import login_required, permission_required
from django.core.paginator import Paginator, PageNotAnInteger, EmptyPage
from django.db.models import F, Q, OuterRef, Subquery
from django.http import HttpResponseRedirect
from django.shortcuts import render, get_object_or_404
from django.urls import reverse_lazy
from django.utils.translation import gettext_lazy as _
from ..celery import app as celery_app, update_orphan_pure_import_events
from ..celery import import_events_from_json
from ..forms import BatchImportationForm
from ..models import Event, BatchImportation, RecurrentImport
@login_required(login_url="/accounts/login/")
@permission_required("agenda_culturel.view_batchimportation")
def imports(request):
rel_event = Event.objects.filter(
import_sources__contains=[OuterRef("url_source")]
).values("pk")[:1]
paginator = Paginator(
BatchImportation.objects.all()
.order_by("-created_date")
.annotate(event_id=Subquery(rel_event)),
30,
)
page = request.GET.get("page")
today = date.today()
srcs = RecurrentImport.objects.all().values_list("source")
in_future = Event.objects.filter(Q(start_day__gte=today))
nb_in_orphan_import = in_future.filter(
(
Q(import_sources__isnull=False)
& (Q(modified_date__isnull=True) | Q(modified_date__lte=F("imported_date")))
)
& ~Q(import_sources__overlap=srcs)
).count()
try:
response = paginator.page(page)
except PageNotAnInteger:
response = paginator.page(1)
except EmptyPage:
response = paginator.page(paginator.num_pages)
return render(
request,
"agenda_culturel/imports.html",
{
"paginator_filter": response,
"nb_in_orphan_import": nb_in_orphan_import,
},
)
@login_required(login_url="/accounts/login/")
@permission_required(
[
"agenda_culturel.add_batchimportation",
"agenda_culturel.run_batchimportation",
]
)
def add_import(request):
form = BatchImportationForm()
if request.method == "POST":
form = BatchImportationForm(request.POST)
if form.is_valid():
import_events_from_json.delay(form.data["json"])
messages.success(request, _("The import has been run successfully."))
return HttpResponseRedirect(reverse_lazy("imports"))
return render(request, "agenda_culturel/batchimportation_form.html", {"form": form})
@login_required(login_url="/accounts/login/")
@permission_required(
[
"agenda_culturel.view_batchimportation",
"agenda_culturel.run_batchimportation",
]
)
def cancel_import(request, pk):
import_process = get_object_or_404(BatchImportation, pk=pk)
if request.method == "POST":
celery_app.control.revoke(import_process.celery_id)
import_process.status = BatchImportation.STATUS.CANCELED
import_process.save(update_fields=["status"])
messages.success(request, _("The import has been canceled."))
return HttpResponseRedirect(reverse_lazy("imports"))
else:
cancel_url = reverse_lazy("imports")
return render(
request,
"agenda_culturel/cancel_import_confirm.html",
{"object": import_process, "cancel_url": cancel_url},
)
@login_required(login_url="/accounts/login/")
@permission_required(
[
"agenda_culturel.view_batchimportation",
"agenda_culturel.run_batchimportation",
]
)
def update_orphan_events(request):
if request.method == "POST":
# run recurrent import
update_orphan_pure_import_events.delay()
messages.success(request, _("The orphan event update has been launched."))
return HttpResponseRedirect(reverse_lazy("imports"))
else:
today = date.today()
srcs = RecurrentImport.objects.all().values_list("source")
in_future = Event.objects.filter(Q(start_day__gte=today))
nb_in_orphan_import = in_future.filter(
(
Q(import_sources__isnull=False)
& (
Q(modified_date__isnull=True)
| Q(modified_date__lte=F("imported_date"))
)
)
& ~Q(import_sources__overlap=srcs)
).count()
return render(
request,
"agenda_culturel/run_orphan_imports_confirm.html",
{"nb_in_orphan_import": nb_in_orphan_import},
)

View File

@@ -0,0 +1,202 @@
from django.contrib import messages
from django.contrib.auth.decorators import login_required, permission_required
from django.contrib.auth.mixins import LoginRequiredMixin, PermissionRequiredMixin
from django.contrib.messages.views import SuccessMessageMixin
from django.core.paginator import Paginator, PageNotAnInteger, EmptyPage
from django.db.models import OuterRef, Subquery
from django.http import HttpResponseRedirect
from django.shortcuts import render, get_object_or_404
from django.urls import reverse_lazy
from django.utils.translation import gettext_lazy as _
from django.views.generic import CreateView, UpdateView, DeleteView
from . import PaginatorFilter
from ..celery import (
run_all_recurrent_imports,
run_all_recurrent_imports_canceled,
run_all_recurrent_imports_failed,
run_recurrent_import,
)
from ..filters import RecurrentImportFilter
from ..forms import RecurrentImportForm
from ..models import BatchImportation, RecurrentImport
@login_required(login_url="/accounts/login/")
@permission_required("agenda_culturel.view_recurrentimport")
def recurrent_imports(request, status=None):
newest = BatchImportation.objects.filter(recurrentImport=OuterRef("pk")).order_by(
"-created_date"
)
qs = (
RecurrentImport.objects.all()
.annotate(last_run_status=Subquery(newest.values("status")[:1]))
.order_by("-pk")
)
nb_failed = qs.filter(last_run_status=BatchImportation.STATUS.FAILED).count()
nb_canceled = qs.filter(last_run_status=BatchImportation.STATUS.CANCELED).count()
nb_running = qs.filter(last_run_status=BatchImportation.STATUS.RUNNING).count()
nb_all = qs.count()
if status is not None:
qs = qs.filter(last_run_status=status)
filter = RecurrentImportFilter(request.GET, queryset=qs)
paginator = PaginatorFilter(filter, 20, request)
page = request.GET.get("page")
try:
response = paginator.page(page)
except PageNotAnInteger:
response = paginator.page(1)
except EmptyPage:
response = paginator.page(paginator.num_pages)
return render(
request,
"agenda_culturel/rimports.html",
{
"paginator_filter": response,
"filter": filter,
"nb_all": nb_all,
"nb_failed": nb_failed,
"nb_canceled": nb_canceled,
"nb_running": nb_running,
"status": status,
},
)
class RecurrentImportCreateView(
LoginRequiredMixin, PermissionRequiredMixin, CreateView
):
model = RecurrentImport
permission_required = "agenda_culturel.add_recurrentimport"
success_url = reverse_lazy("recurrent_imports")
form_class = RecurrentImportForm
class RecurrentImportUpdateView(
SuccessMessageMixin,
PermissionRequiredMixin,
LoginRequiredMixin,
UpdateView,
):
model = RecurrentImport
permission_required = "agenda_culturel.change_recurrentimport"
form_class = RecurrentImportForm
success_message = _("The recurrent import has been successfully modified.")
class RecurrentImportDeleteView(
SuccessMessageMixin,
PermissionRequiredMixin,
LoginRequiredMixin,
DeleteView,
):
model = RecurrentImport
permission_required = "agenda_culturel.delete_recurrentimport"
success_url = reverse_lazy("recurrent_imports")
success_message = _("The recurrent import has been successfully deleted.")
@login_required(login_url="/accounts/login/")
@permission_required(
[
"agenda_culturel.view_recurrentimport",
"agenda_culturel.view_batchimportation",
]
)
def view_rimport(request, pk):
obj = get_object_or_404(RecurrentImport, pk=pk)
paginator = Paginator(
BatchImportation.objects.filter(recurrentImport=pk).order_by("-created_date"),
10,
)
page = request.GET.get("page")
try:
response = paginator.page(page)
except PageNotAnInteger:
response = paginator.page(1)
except EmptyPage:
response = paginator.page(paginator.num_pages)
return render(
request,
"agenda_culturel/page-rimport.html",
{"paginator_filter": response, "object": obj},
)
@login_required(login_url="/accounts/login/")
@permission_required(
[
"agenda_culturel.view_recurrentimport",
"agenda_culturel.run_recurrentimport",
]
)
def run_rimport(request, pk):
rimport = get_object_or_404(RecurrentImport, pk=pk)
if request.method == "POST":
# run recurrent import
run_recurrent_import.delay(pk)
messages.success(request, _("The import has been launched."))
return HttpResponseRedirect(reverse_lazy("view_rimport", args=[pk]))
else:
return render(
request,
"agenda_culturel/run_rimport_confirm.html",
{"object": rimport},
)
@login_required(login_url="/accounts/login/")
@permission_required(
[
"agenda_culturel.view_recurrentimport",
"agenda_culturel.run_recurrentimport",
]
)
def run_all_rimports(request, status=None):
if request.method == "POST":
# run recurrent import
if status == BatchImportation.STATUS.FAILED:
run_all_recurrent_imports_failed.delay()
elif status == BatchImportation.STATUS.CANCELED:
run_all_recurrent_imports_canceled.delay()
else:
run_all_recurrent_imports.delay()
messages.success(request, _("Imports has been launched."))
return HttpResponseRedirect(reverse_lazy("recurrent_imports"))
else:
if status == BatchImportation.STATUS.FAILED:
return render(request, "agenda_culturel/run_failed_rimports_confirm.html")
elif status == BatchImportation.STATUS.CANCELED:
return render(request, "agenda_culturel/run_canceled_rimports_confirm.html")
else:
return render(request, "agenda_culturel/run_all_rimports_confirm.html")
@login_required(login_url="/accounts/login/")
@permission_required(
[
"agenda_culturel.view_recurrentimport",
"agenda_culturel.run_recurrentimport",
]
)
def run_all_fb_rimports(request, status=None):
if request.method == "POST":
run_all_recurrent_imports.delay(True)
messages.success(request, _("Facebook imports has been launched."))
return HttpResponseRedirect(reverse_lazy("recurrent_imports"))
else:
return render(request, "agenda_culturel/run_all_fb_rimports_confirm.html")

View File

@@ -0,0 +1,146 @@
from django.contrib.auth.decorators import login_required, permission_required
from django.contrib.auth.mixins import PermissionRequiredMixin, LoginRequiredMixin
from django.contrib.messages.views import SuccessMessageMixin
from django.core.checks import messages
from django.core.paginator import PageNotAnInteger, EmptyPage
from django.db.models import Count
from django.http import HttpResponseRedirect
from django.shortcuts import render, get_object_or_404
from django.urls import reverse_lazy
from django.utils.decorators import method_decorator
from django.utils.translation import gettext_lazy as _
from django.views.generic import DeleteView, UpdateView, CreateView
from honeypot.decorators import check_honeypot
from . import PaginatorFilter
from ..filters import MessagesFilterAdmin
from ..forms import MessageForm
from ..models import Event, Message
@method_decorator(check_honeypot, name="post")
class MessageCreateView(SuccessMessageMixin, CreateView):
model = Message
template_name = "agenda_culturel/message_create_form.html"
form_class = MessageForm
success_url = reverse_lazy("home")
success_message = _("Your message has been sent successfully.")
def __init__(self, *args, **kwargs):
self.event = None
super().__init__(*args, **kwargs)
def get_form(self, form_class=None):
if form_class is None:
form_class = self.get_form_class()
return form_class(**self.get_form_kwargs())
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs["event"] = self.event
if self.request.user.is_authenticated:
kwargs["internal"] = True
return kwargs
def form_valid(self, form):
if self.request.user.is_authenticated:
form.instance.user = self.request.user
form.instance.message_type = (
Message.TYPE.EVENT_REPORT
if "pk" in self.kwargs
else Message.TYPE.CONTACT_FORM
)
return super().form_valid(form)
def get_initial(self):
result = super().get_initial()
if "pk" in self.kwargs:
self.event = get_object_or_404(Event, pk=self.kwargs["pk"])
result["related_event"] = self.event
result["subject"] = _("Reporting the event {} on {}").format(
self.event.title, self.event.start_day
)
else:
result["related_event"] = None
return result
class MessageDeleteView(SuccessMessageMixin, DeleteView):
model = Message
success_message = _("The contact message has been successfully deleted.")
success_url = reverse_lazy("messages")
class MessageUpdateView(
SuccessMessageMixin,
PermissionRequiredMixin,
LoginRequiredMixin,
UpdateView,
):
model = Message
permission_required = "agenda_culturel.change_message"
template_name = "agenda_culturel/message_moderation_form.html"
fields = ("spam", "closed", "comments")
success_message = _(
"The contact message properties has been successfully modified."
)
success_url = reverse_lazy("messages")
def get_form_kwargs(self):
"""Return the keyword arguments for instantiating the form."""
kwargs = super().get_form_kwargs()
if hasattr(self, "object"):
kwargs.update({"instance": self.object})
return kwargs
@login_required(login_url="/accounts/login/")
@permission_required("agenda_culturel.view_message")
def view_messages(request):
filter = MessagesFilterAdmin(
request.GET, queryset=Message.objects.all().order_by("-date")
)
paginator = PaginatorFilter(filter, 10, request)
page = request.GET.get("page")
nb_spams = Message.objects.filter(spam=True).count()
try:
response = paginator.page(page)
except PageNotAnInteger:
response = paginator.page(1)
except EmptyPage:
response = paginator.page(paginator.num_pages)
return render(
request,
"agenda_culturel/messages.html",
{"filter": filter, "nb_spams": nb_spams, "paginator_filter": response},
)
@login_required(login_url="/accounts/login/")
@permission_required("agenda_culturel.view_message")
def delete_cm_spam(request):
if request.method == "POST":
Message.objects.filter(spam=True).delete()
messages.success(request, _("Spam has been successfully deleted."))
return HttpResponseRedirect(reverse_lazy("messages"))
else:
nb_msgs = Message.objects.values("spam").annotate(total=Count("spam"))
nb_total = sum([nb["total"] for nb in nb_msgs])
nb_spams = sum([nb["total"] for nb in nb_msgs if nb["spam"]])
cancel_url = reverse_lazy("messages")
return render(
request,
"agenda_culturel/delete_spams_confirm.html",
{
"nb_total": nb_total,
"nb_spams": nb_spams,
"cancel_url": cancel_url,
},
)

View File

@@ -0,0 +1,168 @@
from datetime import date
from django.contrib.auth.decorators import login_required, permission_required
from django.contrib.auth.mixins import PermissionRequiredMixin, LoginRequiredMixin
from django.contrib.messages.views import SuccessMessageMixin
from django.http import Http404, HttpResponseRedirect
from django.shortcuts import render
from django.urls import reverse_lazy
from django.utils.safestring import mark_safe
from django.utils.translation import gettext_lazy as _
from django.views.generic import UpdateView
from django.db.models import Q, F
from django.utils.timezone import datetime
from ..forms import EventModerateForm
from ..models import Event
class EventModerateView(
SuccessMessageMixin,
PermissionRequiredMixin,
LoginRequiredMixin,
UpdateView,
):
model = Event
permission_required = "agenda_culturel.change_event"
template_name = "agenda_culturel/event_form_moderate.html"
form_class = EventModerateForm
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs["is_moderation_expert"] = (
self.request.user.userprofile.is_moderation_expert
)
return kwargs
def get_success_message(self, cleaned_data):
txt = (
_(" A message has been sent to the person who proposed the event.")
if hasattr(self, "with_msg") and self.with_msg
else ""
)
return mark_safe(
_('The event <a href="{}">{}</a> has been moderated with success.').format(
self.object.get_absolute_url(), self.object.title
)
+ txt
)
def is_moderate_next(self):
return "after" in self.request.path.split("/")
def is_moderate_back(self):
return "back" in self.request.path.split("/")
def is_moderate_force(self):
return "moderate-force" in self.request.path.split("/")
def is_starting_moderation(self):
return "pk" not in self.kwargs
def is_moderation_from_date(self):
return "m" in self.kwargs and "y" in self.kwargs and "d" in self.kwargs
def get_next_event(start_day, start_time, opk):
# select non moderated events
qs = Event.objects.filter(moderated_date__isnull=True)
# select events after the current one
if start_time:
qs = qs.filter(
Q(start_day__gt=start_day)
| (
Q(start_day=start_day)
& (Q(start_time__isnull=True) | Q(start_time__gt=start_time))
)
)
else:
qs = qs.filter(Q(start_day__gte=start_day) & ~Q(pk=opk))
# get only possibly representative events
qs = qs.filter(
Q(other_versions__isnull=True)
| Q(other_versions__representative=F("pk"))
| Q(other_versions__representative__isnull=True)
)
# remove trash events
qs = qs.filter(~Q(status=Event.STATUS.TRASH))
# sort by datetime
qs = qs.order_by("start_day", "start_time")
return qs.first()
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
if self.is_moderate_next():
context["pred"] = self.kwargs["pred"]
return context
def get_object(self, queryset=None):
if self.is_starting_moderation():
now = datetime.now()
event = EventModerateView.get_next_event(now.date(), now.time(), None)
else:
event = super().get_object(queryset)
if event.status == Event.STATUS.DRAFT:
event.status = Event.STATUS.PUBLISHED
if self.is_moderate_back():
pred = Event.objects.filter(pk=self.kwargs["pred"]).first()
pred.free_modification_lock(self.request.user)
if self.is_moderate_force():
event.free_modification_lock(self.request.user, False)
return event
def post(self, request, *args, **kwargs):
try:
return super().post(request, args, kwargs)
except Http404:
return HttpResponseRedirect(
reverse_lazy("error_next_event", args=[self.object.pk])
)
def form_valid(self, form):
form.instance.set_no_modification_date_changed()
form.instance.set_in_moderation_process()
form.instance.set_processing_user(self.request.user)
self.with_msg = form.instance.notify_if_required(self.request)
return super().form_valid(form)
def get_success_url(self):
if "save_and_next" in self.request.POST:
return reverse_lazy("moderate_event_next", args=[self.object.pk])
elif "save_and_edit_local" in self.request.POST:
return reverse_lazy("edit_event", args=[self.object.get_local_version().pk])
else:
return self.object.get_absolute_url()
@login_required(login_url="/accounts/login/")
@permission_required("agenda_culturel.change_event")
def moderate_event_next(request, pk):
# current event
obj = Event.objects.filter(pk=pk).first()
# free lock
obj.free_modification_lock(request.user)
start_day = obj.start_day
start_time = obj.start_time
next_obj = EventModerateView.get_next_event(start_day, start_time, pk)
if next_obj is None:
return render(
request,
"agenda_culturel/event_next_error_message.html",
{"pk": pk, "object": obj},
)
else:
return HttpResponseRedirect(
reverse_lazy("moderate_event_step", args=[next_obj.pk, obj.pk])
)
@login_required(login_url="/accounts/login/")
@permission_required("agenda_culturel.change_event")
def moderate_from_date(request, y, m, d):
d = date(y, m, d)
obj = EventModerateView.get_next_event(d, None, None)
return HttpResponseRedirect(reverse_lazy("moderate_event", args=[obj.pk]))

View File

@@ -0,0 +1,92 @@
from datetime import datetime
from django.contrib.auth.mixins import PermissionRequiredMixin
from django.contrib.messages.views import SuccessMessageMixin
from django.urls import reverse_lazy
from django.db.models import F, Q
from django.views.generic import ListView, UpdateView, CreateView, DeleteView
from django.utils.translation import gettext_lazy as _
from . import get_event_qs
from ..models import Organisation
class OrganisationListView(ListView):
model = Organisation
paginate_by = 10
ordering = ["name__unaccent"]
class OrganisationDetailView(ListView):
model = Organisation
template_name = "agenda_culturel/organisation_detail.html"
paginate_by = 10
def get_queryset(self):
self.organisation = (
Organisation.objects.filter(pk=self.kwargs["pk"])
.prefetch_related("organised_events")
.first()
)
return (
get_event_qs(self.request)
.filter(organisers__in=[self.kwargs["pk"]])
.filter(
Q(other_versions__isnull=True)
| Q(other_versions__representative=F("pk"))
| Q(other_versions__representative__isnull=True)
)
.filter(start_day__gte=datetime.now())
.order_by("start_day")
)
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context["object"] = self.organisation
return context
class OrganisationDetailViewPast(OrganisationDetailView):
def get_queryset(self):
self.organisation = (
Organisation.objects.filter(pk=self.kwargs["pk"])
.prefetch_related("organised_events")
.first()
)
self.past = True
return (
get_event_qs(self.request)
.filter(organisers__in=[self.kwargs["pk"]])
.filter(
Q(other_versions__isnull=True)
| Q(other_versions__representative=F("pk"))
| Q(other_versions__representative__isnull=True)
)
.filter(start_day__lte=datetime.now())
.order_by("-start_day")
)
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context["past"] = self.past
return context
class OrganisationUpdateView(PermissionRequiredMixin, SuccessMessageMixin, UpdateView):
model = Organisation
permission_required = "agenda_culturel.change_organisation"
success_message = _("The organisation has been successfully updated.")
fields = "__all__"
class OrganisationCreateView(PermissionRequiredMixin, SuccessMessageMixin, CreateView):
model = Organisation
permission_required = "agenda_culturel.add_organisation"
success_message = _("The organisation has been successfully created.")
fields = "__all__"
class OrganisationDeleteView(PermissionRequiredMixin, DeleteView):
model = Organisation
permission_required = "agenda_culturel.delete_organisation"
success_url = reverse_lazy("view_organisations")

View File

@@ -0,0 +1,258 @@
from datetime import datetime
from django.contrib import messages
from django.contrib.auth.mixins import PermissionRequiredMixin
from django.contrib.messages.views import SuccessMessageMixin
from django.db.models import F, Q
from django.http import HttpResponseRedirect
from django.shortcuts import get_object_or_404
from django.urls import reverse_lazy
from django.utils.translation import gettext_lazy as _
from django.views.generic import ListView, UpdateView, CreateView, DeleteView
from django.contrib.gis.measure import D
from django.utils.safestring import mark_safe
from .utils import get_event_qs
from ..forms import PlaceForm, EventAddPlaceForm
from ..models import Place, Event
from ..utils import PlaceGuesser
class PlaceListView(ListView):
model = Place
ordering = ["name__unaccent"]
class PlaceListAdminView(PermissionRequiredMixin, ListView):
model = Place
paginate_by = 10
permission_required = "agenda_culturel.add_place"
ordering = ["name__unaccent"]
template_name = "agenda_culturel/place_list_admin.html"
class PlaceDetailView(ListView):
model = Place
template_name = "agenda_culturel/place_detail.html"
paginate_by = 10
def get_queryset(self):
self.place = get_object_or_404(Place, pk=self.kwargs["pk"])
return (
get_event_qs(self.request)
.filter(exact_location=self.place)
.filter(
Q(other_versions__isnull=True)
| Q(other_versions__representative=F("pk"))
| Q(other_versions__representative__isnull=True)
)
.filter(start_day__gte=datetime.now())
.order_by("start_day")
)
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context["object"] = self.place
context["place_list"] = Place.objects.filter(
~Q(pk=self.place.pk)
& Q(location__distance_lte=(self.place.location, D(m=4000)))
).only("location", "name", "pk")
return context
class PlaceDetailViewPast(PlaceDetailView):
def get_queryset(self):
self.place = get_object_or_404(Place, pk=self.kwargs["pk"])
self.past = True
return (
get_event_qs(self.request)
.filter(exact_location=self.place)
.filter(
Q(other_versions__isnull=True)
| Q(other_versions__representative=F("pk"))
| Q(other_versions__representative__isnull=True)
)
.filter(start_day__lte=datetime.now())
.order_by("-start_day")
)
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context["past"] = self.past
return context
class UpdatePlaces:
def form_valid(self, form):
result = super().form_valid(form)
p = form.instance
if not hasattr(self, "nb_applied"):
self.nb_applied = 0
# if required, find all matching events
if form.apply():
self.nb_applied += p.associate_matching_events()
if self.nb_applied > 1:
messages.success(
self.request,
_("{} events have been updated.").format(self.nb_applied),
)
elif self.nb_applied == 1:
messages.success(self.request, _("1 event has been updated."))
else:
messages.info(self.request, _("No events have been modified."))
return result
class PlaceUpdateView(
UpdatePlaces, PermissionRequiredMixin, SuccessMessageMixin, UpdateView
):
model = Place
permission_required = "agenda_culturel.change_place"
success_message = _("The place has been successfully updated.")
form_class = PlaceForm
class PlaceCreateView(
UpdatePlaces, PermissionRequiredMixin, SuccessMessageMixin, CreateView
):
model = Place
permission_required = "agenda_culturel.add_place"
success_message = _("The place has been successfully created.")
form_class = PlaceForm
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context["place_list"] = Place.objects.all().only("location", "name", "pk")
return context
class PlaceDeleteView(PermissionRequiredMixin, DeleteView):
model = Place
permission_required = "agenda_culturel.delete_place"
success_url = reverse_lazy("view_places_admin")
class UnknownPlacesListView(PermissionRequiredMixin, ListView):
model = Event
permission_required = "agenda_culturel.add_place"
paginate_by = 10
ordering = ["-pk"]
template_name = "agenda_culturel/place_unknown_list.html"
queryset = Event.get_qs_events_with_unkwnon_place()
def fix_unknown_places(request):
# get all places
places = Place.objects.all()
# get all events without exact location
u_events = Event.get_qs_events_with_unkwnon_place()
to_be_updated = []
# try to find matches
for ue in u_events:
for p in places:
if p.match(ue):
ue.exact_location = p
to_be_updated.append(ue)
continue
# update events with a location
Event.objects.bulk_update(to_be_updated, fields=["exact_location"])
# create a success message
nb = len(to_be_updated)
if nb > 1:
messages.success(request, _("{} events have been updated.").format(nb))
elif nb == 1:
messages.success(request, _("1 event has been updated."))
else:
messages.info(request, _("No events have been modified."))
# come back to the list of places
return HttpResponseRedirect(reverse_lazy("view_unknown_places"))
class UnknownPlaceAddView(PermissionRequiredMixin, SuccessMessageMixin, UpdateView):
model = Event
permission_required = (
"agenda_culturel.change_place",
"agenda_culturel.change_event",
)
form_class = EventAddPlaceForm
template_name = "agenda_culturel/place_unknown_form.html"
def form_valid(self, form):
self.modified_event = form.cleaned_data.get("place")
self.add_alias = form.cleaned_data.get("add_alias")
result = super().form_valid(form)
if form.cleaned_data.get("place"):
messages.success(
self.request,
mark_safe(
_(
'The selected place has been assigned to the event <a href="{}">{}</a>.'
).format(self.object.get_absolute_url(), str(self.object))
),
)
if form.cleaned_data.get("add_alias"):
messages.success(
self.request,
_("A new alias has been added to the selected place."),
)
nb_applied = form.cleaned_data.get("place").associate_matching_events()
if nb_applied > 1:
messages.success(
self.request,
_("{} events have been updated.").format(nb_applied),
)
elif nb_applied == 1:
messages.success(self.request, _("1 event has been updated."))
else:
messages.info(self.request, _("No events have been modified."))
return result
def get_success_url(self):
if self.modified_event:
return reverse_lazy("view_unknown_places")
else:
param = "?add=1" if self.add_alias else ""
return reverse_lazy("add_place_from_event", args=[self.object.pk]) + param
class PlaceFromEventCreateView(PlaceCreateView):
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context["event"] = self.event
return context
def get_initial(self, *args, **kwargs):
initial = super().get_initial(**kwargs)
self.event = get_object_or_404(Event, pk=self.kwargs["pk"])
if self.event.location and "add" in self.request.GET:
initial["aliases"] = [self.event.location]
guesser = PlaceGuesser()
name, address, postcode, city = guesser.guess_address_elements(
self.event.location
)
initial["name"] = name
initial["address"] = address
initial["postcode"] = postcode
initial["city"] = city
initial["location"] = ""
return initial
def form_valid(self, form):
result = super().form_valid(form)
self.event.exact_location = form.instance
self.event.save(update_fields=["exact_location"])
return result
def get_success_url(self):
return self.event.get_absolute_url()

View File

@@ -0,0 +1,111 @@
import emoji
from django.core.paginator import PageNotAnInteger, EmptyPage
from django.shortcuts import render
from . import PaginatorFilter
from ..filters import SearchEventFilter, SimpleSearchEventFilter
from ..models import (
Category,
remove_accents,
Event,
Place,
Organisation,
RecurrentImport,
)
from ..views import get_event_qs
from django.db.models import Q, F, Func
def event_search(request, full=False):
categories = None
tags = None
places = None
organisations = None
rimports = None
qs = get_event_qs(request).order_by("-start_day")
if not request.user.is_authenticated:
qs = qs.filter(
(
Q(other_versions__isnull=True)
| Q(other_versions__representative=F("pk"))
| Q(other_versions__representative__isnull=True)
)
)
if full:
filter = SearchEventFilter(
request.GET,
queryset=qs,
request=request,
)
else:
filter = SimpleSearchEventFilter(
request.GET,
queryset=qs,
request=request,
)
if "q" in request.GET:
categories = Category.objects.filter(name__icontains=request.GET["q"])
s_q = remove_accents(request.GET["q"].lower())
tags = (
Event.objects.extra(
where=["%s ILIKE ANY (tags)"], params=[request.GET["q"]]
)
.annotate(arr_tags=Func(F("tags"), function="unnest"))
.values_list("arr_tags", flat=True)
.distinct()
)
tags = [
(
t,
emoji.demojize(remove_accents(t).lower(), delimiters=("000", "")),
)
for t in tags
]
tags = [t for t in tags if s_q == t[1]]
tags.sort(key=lambda x: x[1])
tags = [t[0] for t in tags]
places = Place.objects.filter(
Q(name__icontains=request.GET["q"])
| Q(description__icontains=request.GET["q"])
| Q(city__icontains=request.GET["q"])
)
organisations = Organisation.objects.filter(
Q(name__icontains=request.GET["q"])
| Q(description__icontains=request.GET["q"])
)
if request.user.is_authenticated:
rimports = RecurrentImport.objects.filter(
name__icontains=request.GET["q"]
)
paginator = PaginatorFilter(filter, 10, request)
page = request.GET.get("page")
try:
response = paginator.page(page)
except PageNotAnInteger:
response = paginator.page(1)
except EmptyPage:
response = paginator.page(paginator.num_pages)
return render(
request,
"agenda_culturel/search.html",
{
"filter": filter,
"categories": categories,
"tags": tags,
"places": places,
"organisations": organisations,
"rimports": rimports,
"has_results": len(request.GET) != 0
or (len(request.GET) > 1 and "page" in request.GET),
"paginator_filter": response,
"full": full,
},
)
def event_search_full(request):
return event_search(request, True)

View File

@@ -0,0 +1,106 @@
from django.contrib import messages
from django.contrib.auth.mixins import PermissionRequiredMixin, LoginRequiredMixin
from django.contrib.messages.views import SuccessMessageMixin
from django.http import HttpResponseRedirect
from django.urls import reverse_lazy
from django.utils.translation import gettext_lazy as _, ngettext
from django.views.generic import ListView, UpdateView, CreateView, DeleteView
from django.shortcuts import render
from ..forms import SpecialPeriodFileForm, SpecialPeriodForm
from ..models import SpecialPeriod
class SpecialPeriodCreateView(
PermissionRequiredMixin, LoginRequiredMixin, SuccessMessageMixin, CreateView
):
model = SpecialPeriod
permission_required = "agenda_culturel.add_specialperiod"
success_message = _("The special period has been successfully created.")
success_url = reverse_lazy("list_specialperiods")
form_class = SpecialPeriodForm
class SpecialPeriodListView(PermissionRequiredMixin, LoginRequiredMixin, ListView):
model = SpecialPeriod
paginate_by = 10
permission_required = "agenda_culturel.add_specialperiod"
ordering = ["start_date", "name__unaccent"]
class SpecialPeriodDeleteView(
SuccessMessageMixin,
PermissionRequiredMixin,
LoginRequiredMixin,
DeleteView,
):
model = SpecialPeriod
permission_required = "agenda_culturel.delete_specialperiod"
success_url = reverse_lazy("list_specialperiods")
success_message = _("The special period has been successfully deleted.")
class SpecialPeriodUpdateView(
SuccessMessageMixin,
PermissionRequiredMixin,
LoginRequiredMixin,
UpdateView,
):
model = SpecialPeriod
permission_required = "agenda_culturel.change_specialperiod"
success_message = _("The special period has been successfully updated.")
success_url = reverse_lazy("list_specialperiods")
form_class = SpecialPeriodForm
def load_specialperiods_from_ical(request):
if request.method == "POST":
form = SpecialPeriodFileForm(request.POST, request.FILES)
if form.is_valid():
nb_created, nb_overlap, nb_error, error = SpecialPeriod.load_from_ical(
request.FILES["file"], request.POST["periodtype"]
)
if nb_created > 0:
messages.success(
request,
ngettext(
"%(nb_created)d interval inserted.",
"%(nb_created)d intervals inserted.",
nb_created,
)
% {"nb_created": nb_created},
)
if nb_overlap > 0:
messages.success(
request,
ngettext(
"%(nb_overlap)d insersion was not possible due to overlap.",
"%(nb_overlap)d insersion were not possible due to overlap.",
nb_overlap,
)
% {"nb_overlap": nb_overlap},
)
if nb_error > 0:
messages.success(
request,
ngettext(
"%(nb_error)d error while reading ical file.",
"%(nb_error)d error while reading ical file.",
nb_error,
)
% {"nb_error": nb_error},
)
if error is not None:
messages.success(
request, _("Error during file reading: {}").format(error)
)
return HttpResponseRedirect(reverse_lazy("list_specialperiods"))
else:
form = SpecialPeriodFileForm()
return render(
request, "agenda_culturel/load_specialperiods_from_ical.html", {"form": form}
)

View File

@@ -0,0 +1,270 @@
from django.contrib.auth.decorators import login_required, permission_required
from django.contrib.auth.mixins import (
PermissionRequiredMixin,
)
from django.contrib.messages.views import SuccessMessageMixin
from django.views.generic.edit import (
CreateView,
DeleteView,
UpdateView,
)
from ..forms import (
TagForm,
TagRenameForm,
)
from ..models import (
Tag,
RecurrentImport,
Event,
remove_accents,
)
from django.utils.translation import gettext_lazy as _
from django.urls import reverse_lazy
from datetime import date
from .utils import get_event_qs
from django.db.models import F, Q
from django.core.paginator import EmptyPage, PageNotAnInteger, Paginator
from django.shortcuts import render
from django.contrib import messages
from django.http import (
HttpResponseRedirect,
)
import emoji
class TagUpdateView(PermissionRequiredMixin, SuccessMessageMixin, UpdateView):
model = Tag
permission_required = "agenda_culturel.change_tag"
form_class = TagForm
success_message = _("The tag has been successfully updated.")
class TagCreateView(PermissionRequiredMixin, SuccessMessageMixin, CreateView):
model = Tag
permission_required = "agenda_culturel.add_tag"
form_class = TagForm
success_message = _("The tag has been successfully created.")
def get_initial(self, *args, **kwargs):
initial = super().get_initial(**kwargs)
if "name" in self.request.GET:
initial["name"] = self.request.GET.get("name")
return initial
def form_valid(self, form):
Tag.clear_cache()
return super().form_valid(form)
class TagDeleteView(PermissionRequiredMixin, DeleteView):
model = Tag
permission_required = "agenda_culturel.delete_tag"
success_url = reverse_lazy("view_all_tags")
def view_tag_past(request, t):
return view_tag(request, t, True)
def view_tag(request, t, past=False):
now = date.today()
qs = get_event_qs(request).filter(tags__contains=[t])
if past:
qs = qs.filter(start_day__lt=now).order_by("-start_day", "-start_time")
else:
qs = qs.filter(start_day__gte=now).order_by("start_day", "start_time")
qs = qs.filter(
Q(other_versions__isnull=True)
| Q(other_versions__representative=F("pk"))
| Q(other_versions__representative__isnull=True)
)
paginator = Paginator(qs, 10)
page = request.GET.get("page")
try:
response = paginator.page(page)
except PageNotAnInteger:
response = paginator.page(1)
except EmptyPage:
response = paginator.page(paginator.num_pages)
rimports = RecurrentImport.objects.filter(defaultTags__contains=[t])
tag = Tag.objects.filter(name=t).first()
context = {
"tag": t,
"paginator_filter": response,
"object": tag,
"rimports": rimports,
"past": past,
}
return render(request, "agenda_culturel/tag.html", context)
def tag_list(request):
tags = Event.get_all_tags()
r_tags = [t["tag"] for t in tags]
objects = Tag.objects.order_by("name").all()
d_objects = dict()
for o in objects:
d_objects[o.name] = o
tags = [
t | {"obj": d_objects[t["tag"]]} if t["tag"] in d_objects else t for t in tags
]
tags += [
{"obj": o, "tag": o.name, "count": 0} for o in objects if o.name not in r_tags
]
context = {
"tags": sorted(
tags,
key=lambda x: emoji.demojize(
remove_accents(x["tag"]).lower(), delimiters=("000", "")
),
)
}
return render(request, "agenda_culturel/tags.html", context)
@login_required(login_url="/accounts/login/")
@permission_required("agenda_culturel.change_tag")
def rename_tag(request, t):
form = TagRenameForm(name=t)
if request.method == "POST":
form = TagRenameForm(request.POST, name=t)
if form.is_valid():
save = True
if form.cleaned_data["name"] == t:
messages.warning(
request,
_("You have not modified the tag name."),
)
save = False
elif not form.is_force():
if (
Event.objects.filter(
tags__contains=[form.cleaned_data["name"]]
).count()
> 0
):
if Tag.objects.filter(name=form.cleaned_data["name"]):
messages.warning(
request,
(
_(
"This tag {} is already in use, and is described by different information from the current tag. You can force renaming by checking the corresponding option. The information associated with tag {} will be deleted, and all events associated with tag {} will be associated with tag {}."
)
).format(
form.cleaned_data["name"],
t,
t,
form.cleaned_data["name"],
),
)
else:
messages.warning(
request,
(
_(
"This tag {} is already in use. You can force renaming by checking the corresponding option."
)
).format(form.cleaned_data["name"]),
)
save = False
form = TagRenameForm(request.POST, name=t, force=True)
if save:
# find all matching events and update them
events = Event.objects.filter(tags__contains=[t])
new_name = form.cleaned_data["name"]
for e in events:
e.tags = [te for te in e.tags if te != t]
if new_name not in e.tags:
e.tags += [new_name]
Event.objects.bulk_update(events, fields=["tags"])
# find all recurrent imports and fix them
rimports = RecurrentImport.objects.filter(defaultTags__contains=[t])
for ri in rimports:
ri.defaultTags = [te for te in ri.defaultTags if te != t]
if new_name not in ri.tags:
ri.defaultTags += [new_name]
RecurrentImport.objects.bulk_update(rimports, fields=["defaultTags"])
# find tag object
tag_object = Tag.objects.filter(name=t).first()
if tag_object:
tag_object.name = new_name
tag_object.save()
messages.success(
request,
(_("The tag {} has been successfully renamed to {}.")).format(
t, form.cleaned_data["name"]
),
)
return HttpResponseRedirect(
reverse_lazy("view_tag", kwargs={"t": form.cleaned_data["name"]})
)
nb = Event.objects.filter(tags__contains=[t]).count()
return render(
request,
"agenda_culturel/tag_rename_form.html",
context={"form": form, "tag": t, "nb": nb},
)
@login_required(login_url="/accounts/login/")
@permission_required("agenda_culturel.delete_tag")
def delete_tag(request, t):
respage = reverse_lazy("view_all_tags")
if request.method == "POST":
# remove tag from events
events = Event.objects.filter(tags__contains=[t])
for e in events:
e.tags = [te for te in e.tags if te != t]
Event.objects.bulk_update(events, fields=["tags"])
# remove tag from recurrent imports
rimports = RecurrentImport.objects.filter(defaultTags__contains=[t])
for ri in rimports:
ri.tags = [te for te in ri.defaultTags if te != t]
RecurrentImport.objects.bulk_update(rimports, fields=["defaultTags"])
# find tag object
tag_object = Tag.objects.filter(name=t).first()
if tag_object:
tag_object.delete()
messages.success(
request,
(_("The tag {} has been successfully deleted.")).format(t),
)
return HttpResponseRedirect(respage)
else:
nb = Event.objects.filter(tags__contains=[t]).count()
obj = Tag.objects.filter(name=t).first()
nbi = RecurrentImport.objects.filter(defaultTags__contains=[t]).count()
cancel_url = request.META.get("HTTP_REFERER", "")
if cancel_url == "":
cancel_url = respage
return render(
request,
"agenda_culturel/tag_confirm_delete_by_name.html",
{
"tag": t,
"nb": nb,
"nbi": nbi,
"cancel_url": cancel_url,
"obj": obj,
},
)

View File

@@ -0,0 +1,77 @@
from django.core.paginator import EmptyPage, Paginator
from django.utils.translation import gettext_lazy as _
from django.db.models import Aggregate, FloatField
from ..models import Event
#
#
# Useful for translation
to_be_translated = [
_("Recurrent import name"),
_("Add another"),
_("Browse..."),
_("No file selected."),
]
def get_event_qs(request):
if request.user.is_authenticated:
return Event.objects.filter()
else:
return Event.objects.filter(status=Event.STATUS.PUBLISHED)
class Median(Aggregate):
function = "PERCENTILE_CONT"
name = "median"
output_field = FloatField()
template = "%(function)s(0.5) WITHIN GROUP (ORDER BY %(expressions)s)"
class PaginatorFilter(Paginator):
def __init__(self, filter, nb, request):
self.request = request
self.filter = filter
super().__init__(filter.qs, nb)
self.url_first_page = PaginatorFilter.update_param(
self.request.get_full_path(), "page", 1
)
self.url_last_page = PaginatorFilter.update_param(
self.request.get_full_path(), "page", self.num_pages
)
def update_param(params, key, value):
p = params.split("?")
root = p[0]
if len(p) > 1:
other = p[1]
others = other.split("&")
others = [o for o in others if not o.startswith(key)]
others += [key + "=" + str(value)]
return root + "?" + "&".join(others)
else:
return root + "?" + key + "=" + str(value)
def page(self, *args, **kwargs):
page = super().page(*args, **kwargs)
try:
page.url_previous_page = PaginatorFilter.update_param(
self.request.get_full_path(),
"page",
page.previous_page_number(),
)
except EmptyPage:
page.url_previous_page = self.request.get_full_path()
try:
page.url_next_page = PaginatorFilter.update_param(
self.request.get_full_path(), "page", page.next_page_number()
)
except EmptyPage:
page.url_next_page = self.request.get_full_path()
return page