Delete unused code

@oscar: Only the user restricter is still present in the code since it is needed for whoami
This commit is contained in:
Finn Stutzenstein 2021-03-04 16:15:57 +01:00
parent 265145f001
commit 3ba4f99876
No known key found for this signature in database
GPG Key ID: 9042F605C6324654
49 changed files with 133 additions and 3233 deletions

View File

@ -40,7 +40,7 @@ jobs:
run: mypy openslides/ tests/
- name: test using pytest
run: pytest --cov --cov-fail-under=74
run: pytest --cov --cov-fail-under=73
install-client-dependencies:
runs-on: ubuntu-latest

View File

@ -1,72 +0,0 @@
from typing import Any, Dict, List
from ..utils.access_permissions import BaseAccessPermissions
from ..utils.auth import async_has_perm
class ItemAccessPermissions(BaseAccessPermissions):
"""
Access permissions container for Item and ItemViewSet.
"""
base_permission = "agenda.can_see"
# TODO: In the following method we use full_data['is_hidden'] and
# full_data['is_internal'] but this can be out of date.
async def get_restricted_data(
self, full_data: List[Dict[str, Any]], user_id: int
) -> List[Dict[str, Any]]:
"""
Returns the restricted serialized data for the instance prepared
for the user. If the user does not have agenda.can_see, no data will
be retuned.
Hidden items can only be seen by managers with can_manage permission. If a user
does not have this permission, he is not allowed to see comments.
Internal items can only be seen by users with can_see_internal_items. If a user
does not have this permission, he is not allowed to see the duration.
"""
def filtered_data(full_data, blocked_keys):
"""
Returns a new dict like full_data but with all blocked_keys removed.
"""
whitelist = full_data.keys() - blocked_keys
return {key: full_data[key] for key in whitelist}
# Parse data.
if full_data and await async_has_perm(user_id, "agenda.can_see"):
# Assume the user has all permissions. Restrict this below.
data = full_data
blocked_keys: List[str] = []
# Restrict data for non managers
if not await async_has_perm(user_id, "agenda.can_manage"):
data = [
full for full in data if not full["is_hidden"]
] # filter hidden items
blocked_keys.append("comment")
# Restrict data for users without can_see_internal_items
if not await async_has_perm(user_id, "agenda.can_see_internal_items"):
data = [full for full in data if not full["is_internal"]]
blocked_keys.append("duration")
if len(blocked_keys) > 0:
data = [filtered_data(full, blocked_keys) for full in data]
else:
data = []
return data
class ListOfSpeakersAccessPermissions(BaseAccessPermissions):
"""
Access permissions container for ListOfSpeakers and ListOfSpeakersViewSet.
No data will be restricted, because everyone can see the list of speakers
at any time.
"""
base_permission = "agenda.can_see_list_of_speakers"

View File

@ -1,5 +1,3 @@
from typing import Any, Dict, Set
from django.apps import AppConfig
@ -12,7 +10,6 @@ class AgendaAppConfig(AppConfig):
from django.db.models.signals import post_save, pre_delete
from ..core.signals import permission_change
from ..utils.access_permissions import required_user
from ..utils.rest_api import router
from . import serializers # noqa
from .signals import (
@ -42,11 +39,6 @@ class AgendaAppConfig(AppConfig):
ListOfSpeakersViewSet,
)
# register required_users
required_user.add_collection_string(
self.get_model("ListOfSpeakers").get_collection_string(), required_users
)
def get_config_variables(self):
from .config_variables import get_config_variables
@ -59,10 +51,3 @@ class AgendaAppConfig(AppConfig):
"""
yield self.get_model("Item")
yield self.get_model("ListOfSpeakers")
async def required_users(element: Dict[str, Any]) -> Set[int]:
"""
Returns all user ids that are displayed as speaker in the given element.
"""
return set(speaker["user_id"] for speaker in element["speakers"])

View File

@ -22,8 +22,6 @@ from openslides.utils.models import (
from openslides.utils.postgres import restart_id_sequence
from openslides.utils.utils import to_roman
from .access_permissions import ItemAccessPermissions, ListOfSpeakersAccessPermissions
class ItemManager(BaseManager):
"""
@ -204,7 +202,6 @@ class Item(RESTModelMixin, models.Model):
An Agenda Item
"""
access_permissions = ItemAccessPermissions()
objects = ItemManager()
can_see_permission = "agenda.can_see"
@ -362,7 +359,6 @@ class ListOfSpeakersManager(BaseManager):
class ListOfSpeakers(RESTModelMixin, models.Model):
access_permissions = ListOfSpeakersAccessPermissions()
objects = ListOfSpeakersManager()
can_see_permission = "agenda.can_see_list_of_speakers"
@ -458,7 +454,6 @@ class SpeakerManager(models.Manager):
speaker.save(
force_insert=True,
skip_autoupdate=skip_autoupdate,
no_delete_on_restriction=True,
)
return speaker

View File

@ -8,10 +8,8 @@ from openslides.utils.autoupdate import inform_changed_data
from openslides.utils.exceptions import OpenSlidesError
from openslides.utils.rest_api import (
GenericViewSet,
ListModelMixin,
ModelViewSet,
Response,
RetrieveModelMixin,
UpdateModelMixin,
ValidationError,
detail_route,
@ -22,7 +20,6 @@ from openslides.utils.views import TreeSortMixin
from ..utils.auth import has_perm
from ..utils.utils import get_model_from_collection_string
from .access_permissions import ItemAccessPermissions
from .models import Item, ListOfSpeakers, Speaker
@ -36,16 +33,13 @@ class ItemViewSet(ModelViewSet, TreeSortMixin):
There are some views, see check_view_permissions.
"""
access_permissions = ItemAccessPermissions()
queryset = Item.objects.all()
def check_view_permissions(self):
"""
Returns True if the user has required permissions.
"""
if self.action in ("list", "retrieve", "metadata"):
result = self.get_access_permissions().check_permissions(self.request.user)
elif self.action in (
if self.action in (
"partial_update",
"update",
"destroy",
@ -268,25 +262,20 @@ class ItemViewSet(ModelViewSet, TreeSortMixin):
)
class ListOfSpeakersViewSet(
ListModelMixin, RetrieveModelMixin, UpdateModelMixin, TreeSortMixin, GenericViewSet
):
class ListOfSpeakersViewSet(UpdateModelMixin, TreeSortMixin, GenericViewSet):
"""
API endpoint for agenda items.
There are some views, see check_view_permissions.
"""
access_permissions = ItemAccessPermissions()
queryset = ListOfSpeakers.objects.all()
def check_view_permissions(self):
"""
Returns True if the user has required permissions.
"""
if self.action in ("list", "retrieve", "metadata"):
result = self.get_access_permissions().check_permissions(self.request.user)
elif self.action in ("manage_speaker",):
if self.action == "manage_speaker":
result = has_perm(self.request.user, "agenda.can_see_list_of_speakers")
# For manage_speaker requests the rest of the check is
# done in the specific method. See below.

View File

@ -1,34 +0,0 @@
from ..poll.access_permissions import (
BaseOptionAccessPermissions,
BasePollAccessPermissions,
BaseVoteAccessPermissions,
)
from ..utils.access_permissions import BaseAccessPermissions
class AssignmentAccessPermissions(BaseAccessPermissions):
"""
Access permissions container for Assignment and AssignmentViewSet.
"""
base_permission = "assignments.can_see"
class AssignmentPollAccessPermissions(BasePollAccessPermissions):
base_permission = "assignments.can_see"
manage_permission = "assignments.can_manage"
additional_fields = [
"amount_global_yes",
"amount_global_no",
"amount_global_abstain",
]
class AssignmentOptionAccessPermissions(BaseOptionAccessPermissions):
base_permission = "assignments.can_see"
manage_permission = "assignments.can_manage"
class AssignmentVoteAccessPermissions(BaseVoteAccessPermissions):
base_permission = "assignments.can_see"
manage_permission = "assignments.can_manage"

View File

@ -1,5 +1,3 @@
from typing import Any, Dict, Set
from django.apps import AppConfig
@ -10,7 +8,6 @@ class AssignmentsAppConfig(AppConfig):
def ready(self):
# Import all required stuff.
from ..core.signals import permission_change
from ..utils.access_permissions import required_user
from ..utils.rest_api import router
from . import serializers # noqa
from .signals import get_permission_change_data
@ -44,20 +41,6 @@ class AssignmentsAppConfig(AppConfig):
AssignmentVoteViewSet,
)
# Register required_users
required_user.add_collection_string(
self.get_model("Assignment").get_collection_string(),
required_users_assignments,
)
required_user.add_collection_string(
self.get_model("AssignmentPoll").get_collection_string(),
required_users_assignment_polls,
)
required_user.add_collection_string(
self.get_model("AssignmentOption").get_collection_string(),
required_users_assignment_options,
)
def get_config_variables(self):
from .config_variables import get_config_variables
@ -75,30 +58,3 @@ class AssignmentsAppConfig(AppConfig):
"AssignmentOption",
):
yield self.get_model(model_name)
async def required_users_assignments(element: Dict[str, Any]) -> Set[int]:
"""
Returns all user ids that are displayed as candidates (including poll
options) in the assignment element.
"""
return set(
related_user["user_id"] for related_user in element["assignment_related_users"]
)
async def required_users_assignment_polls(element: Dict[str, Any]) -> Set[int]:
"""
Returns all user ids that have voted on an option and are therefore required for the single votes table.
"""
from openslides.poll.models import BasePoll
if element["state"] == BasePoll.STATE_PUBLISHED:
return element["voted_id"]
else:
return set()
async def required_users_assignment_options(element: Dict[str, Any]) -> Set[int]:
return set([element["user_id"]])

View File

@ -17,12 +17,6 @@ from openslides.utils.models import RESTModelMixin
from openslides.utils.rest_api import ValidationError
from ..utils.models import CASCADE_AND_AUTOUPDATE, SET_NULL_AND_AUTOUPDATE
from .access_permissions import (
AssignmentAccessPermissions,
AssignmentOptionAccessPermissions,
AssignmentPollAccessPermissions,
AssignmentVoteAccessPermissions,
)
class AssignmentRelatedUser(RESTModelMixin, models.Model):
@ -93,7 +87,6 @@ class Assignment(RESTModelMixin, AgendaItemWithListOfSpeakersMixin, models.Model
Model for assignments.
"""
access_permissions = AssignmentAccessPermissions()
can_see_permission = "assignments.can_see"
objects = AssignmentManager()
@ -237,7 +230,6 @@ class AssignmentVoteManager(BaseManager):
class AssignmentVote(RESTModelMixin, BaseVote):
access_permissions = AssignmentVoteAccessPermissions()
objects = AssignmentVoteManager()
option = models.ForeignKey(
@ -268,7 +260,6 @@ class AssignmentOptionManager(BaseManager):
class AssignmentOption(RESTModelMixin, BaseOption):
access_permissions = AssignmentOptionAccessPermissions()
can_see_permission = "assignments.can_see"
objects = AssignmentOptionManager()
vote_class = AssignmentVote
@ -306,7 +297,6 @@ class AssignmentPollManager(BaseManager):
class AssignmentPoll(RESTModelMixin, BasePoll):
access_permissions = AssignmentPollAccessPermissions()
can_see_permission = "assignments.can_see"
objects = AssignmentPollManager()

View File

@ -15,7 +15,6 @@ from openslides.utils.rest_api import (
)
from openslides.utils.utils import is_int
from .access_permissions import AssignmentAccessPermissions
from .models import (
Assignment,
AssignmentOption,
@ -31,24 +30,15 @@ from .models import (
class AssignmentViewSet(ModelViewSet):
"""
API endpoint for assignments.
There are the following views: metadata, list, retrieve, create,
partial_update, update, destroy, candidature_self, candidature_other and create_poll.
"""
access_permissions = AssignmentAccessPermissions()
queryset = Assignment.objects.all()
def check_view_permissions(self):
"""
Returns True if the user has required permissions.
"""
if self.action in ("list", "retrieve"):
result = self.get_access_permissions().check_permissions(self.request.user)
elif self.action == "metadata":
# Everybody is allowed to see the metadata.
result = True
elif self.action in (
if self.action in (
"create",
"partial_update",
"update",
@ -551,7 +541,7 @@ class AssignmentPollViewSet(BasePollViewSet):
weight=weight,
value=value,
)
inform_changed_data(vote, no_delete_on_restriction=True)
inform_changed_data(vote)
else: # global_no or global_abstain
option = options[0]
weight = vote_weight if config["users_activate_vote_weight"] else Decimal(1)
@ -562,7 +552,7 @@ class AssignmentPollViewSet(BasePollViewSet):
weight=weight,
value=data,
)
inform_changed_data(vote, no_delete_on_restriction=True)
inform_changed_data(vote)
inform_changed_data(option)
inform_changed_data(poll)
@ -586,8 +576,8 @@ class AssignmentPollViewSet(BasePollViewSet):
value=result,
weight=weight,
)
inform_changed_data(vote, no_delete_on_restriction=True)
inform_changed_data(option, no_delete_on_restriction=True)
inform_changed_data(vote)
inform_changed_data(option)
def add_user_to_voted_array(self, user, poll):
VotedModel = AssignmentPoll.voted.through

View File

@ -1,40 +0,0 @@
from typing import Any, Dict, List
from openslides.utils.access_permissions import BaseAccessPermissions
from openslides.utils.auth import async_has_perm, async_in_some_groups
class ChatGroupAccessPermissions(BaseAccessPermissions):
"""
Access permissions container for ChatGroup and ChatGroupViewSet.
No base perm: The access permissions are done with the read/write groups.
"""
async def get_restricted_data(
self, full_data: List[Dict[str, Any]], user_id: int
) -> List[Dict[str, Any]]:
"""
Manage users can see all groups. Else, for each group either it has no access groups
or the user must be in an access group.
"""
data: List[Dict[str, Any]] = []
if await async_has_perm(user_id, "chat.can_manage"):
data = full_data
else:
for full in full_data:
read_groups = full.get("read_groups_id", [])
write_groups = full.get("write_groups_id", [])
if await async_in_some_groups(
user_id, read_groups
) or await async_in_some_groups(user_id, write_groups):
data.append(full)
return data
class ChatMessageAccessPermissions(ChatGroupAccessPermissions):
"""
Access permissions container for ChatMessage and ChatMessageViewSet.
It does exaclty the same as ChatGroupAccessPermissions
"""
pass

View File

@ -5,7 +5,6 @@ from openslides.utils.manager import BaseManager
from ..utils.auth import has_perm, in_some_groups
from ..utils.models import CASCADE_AND_AUTOUPDATE, RESTModelMixin
from .access_permissions import ChatGroupAccessPermissions, ChatMessageAccessPermissions
class ChatGroupManager(BaseManager):
@ -14,7 +13,6 @@ class ChatGroupManager(BaseManager):
"""
def get_prefetched_queryset(self, *args, **kwargs):
""""""
return (
super()
.get_prefetched_queryset(*args, **kwargs)
@ -23,10 +21,6 @@ class ChatGroupManager(BaseManager):
class ChatGroup(RESTModelMixin, models.Model):
""""""
access_permissions = ChatGroupAccessPermissions()
objects = ChatGroupManager()
name = models.CharField(max_length=256)
@ -57,7 +51,6 @@ class ChatMessageManager(BaseManager):
"""
def get_prefetched_queryset(self, *args, **kwargs):
""""""
return (
super()
.get_prefetched_queryset(*args, **kwargs)
@ -68,10 +61,6 @@ class ChatMessageManager(BaseManager):
class ChatMessage(RESTModelMixin, models.Model):
""""""
access_permissions = ChatMessageAccessPermissions()
objects = ChatMessageManager()
text = models.CharField(max_length=512)

View File

@ -12,15 +12,12 @@ from openslides.utils.rest_api import (
CreateModelMixin,
DestroyModelMixin,
GenericViewSet,
ListModelMixin,
ModelViewSet,
Response,
RetrieveModelMixin,
detail_route,
status,
)
from .access_permissions import ChatGroupAccessPermissions, ChatMessageAccessPermissions
from .models import ChatGroup, ChatMessage
@ -35,7 +32,6 @@ class ChatGroupViewSet(ModelViewSet):
partial_update, update, destroy and clear.
"""
access_permissions = ChatGroupAccessPermissions()
queryset = ChatGroup.objects.all()
def check_view_permissions(self):
@ -70,8 +66,6 @@ class ChatGroupViewSet(ModelViewSet):
class ChatMessageViewSet(
ListModelMixin,
RetrieveModelMixin,
CreateModelMixin,
DestroyModelMixin,
GenericViewSet,
@ -82,7 +76,6 @@ class ChatMessageViewSet(
There are the following views: metadata, list, retrieve, create
"""
access_permissions = ChatMessageAccessPermissions()
queryset = ChatMessage.objects.all()
def check_view_permissions(self):

View File

@ -1,46 +0,0 @@
from ..utils.access_permissions import BaseAccessPermissions
class ProjectorAccessPermissions(BaseAccessPermissions):
"""
Access permissions container for Projector and ProjectorViewSet.
"""
base_permission = "core.can_see_projector"
class ProjectionDefaultAccessPermissions(BaseAccessPermissions):
"""
Access permissions container for Projector and ProjectorViewSet.
"""
base_permission = "core.can_see_projector"
class TagAccessPermissions(BaseAccessPermissions):
"""
Access permissions container for Tag and TagViewSet.
"""
class ProjectorMessageAccessPermissions(BaseAccessPermissions):
"""
Access permissions for ProjectorMessage.
"""
base_permission = "core.can_see_projector"
class CountdownAccessPermissions(BaseAccessPermissions):
"""
Access permissions for Countdown.
"""
base_permission = "core.can_see_projector"
class ConfigAccessPermissions(BaseAccessPermissions):
"""
Access permissions container for the config (ConfigStore and
ConfigViewSet).
"""

View File

@ -13,15 +13,6 @@ from openslides.utils.manager import BaseManager
from openslides.utils.models import SET_NULL_AND_AUTOUPDATE, RESTModelMixin
from openslides.utils.postgres import is_postgres
from .access_permissions import (
ConfigAccessPermissions,
CountdownAccessPermissions,
ProjectionDefaultAccessPermissions,
ProjectorAccessPermissions,
ProjectorMessageAccessPermissions,
TagAccessPermissions,
)
class ProjectorManager(BaseManager):
"""
@ -73,8 +64,6 @@ class Projector(RESTModelMixin, models.Model):
on e. g. the URL /rest/core/projector/1/activate_elements/.
"""
access_permissions = ProjectorAccessPermissions()
objects = ProjectorManager()
elements = JSONField(default=list)
@ -134,8 +123,6 @@ class ProjectionDefault(RESTModelMixin, models.Model):
name on the front end for the user.
"""
access_permissions = ProjectionDefaultAccessPermissions()
name = models.CharField(max_length=256)
display_name = models.CharField(max_length=256)
@ -157,8 +144,6 @@ class Tag(RESTModelMixin, models.Model):
motions or assignments.
"""
access_permissions = TagAccessPermissions()
name = models.CharField(max_length=255, unique=True)
class Meta:
@ -175,8 +160,6 @@ class ConfigStore(RESTModelMixin, models.Model):
A model class to store all config variables in the database.
"""
access_permissions = ConfigAccessPermissions()
key = models.CharField(max_length=255, unique=True, db_index=True)
"""A string, the key of the config variable."""
@ -200,8 +183,6 @@ class ProjectorMessage(RESTModelMixin, models.Model):
Model for ProjectorMessages.
"""
access_permissions = ProjectorMessageAccessPermissions()
message = models.TextField(blank=True)
class Meta:
@ -213,8 +194,6 @@ class Countdown(RESTModelMixin, models.Model):
Model for countdowns.
"""
access_permissions = CountdownAccessPermissions()
title = models.CharField(max_length=256, unique=True)
description = models.CharField(max_length=256, blank=True)

View File

@ -32,22 +32,12 @@ from ..utils.plugins import (
)
from ..utils.rest_api import (
GenericViewSet,
ListModelMixin,
ModelViewSet,
Response,
RetrieveModelMixin,
ValidationError,
detail_route,
list_route,
)
from .access_permissions import (
ConfigAccessPermissions,
CountdownAccessPermissions,
ProjectionDefaultAccessPermissions,
ProjectorAccessPermissions,
ProjectorMessageAccessPermissions,
TagAccessPermissions,
)
from .config import config
from .exceptions import ConfigError, ConfigNotFound
from .models import (
@ -115,18 +105,13 @@ class ProjectorViewSet(ModelViewSet):
There are the following views: See strings in check_view_permissions().
"""
access_permissions = ProjectorAccessPermissions()
queryset = Projector.objects.all()
def check_view_permissions(self):
"""
Returns True if the user has required permissions.
"""
if self.action in ("list", "retrieve"):
result = self.get_access_permissions().check_permissions(self.request.user)
elif self.action == "metadata":
result = has_perm(self.request.user, "core.can_see_projector")
elif self.action in (
if self.action in (
"create",
"update",
"partial_update",
@ -343,7 +328,7 @@ class ProjectorViewSet(ModelViewSet):
return Response()
class ProjectionDefaultViewSet(ListModelMixin, RetrieveModelMixin, GenericViewSet):
class ProjectionDefaultViewSet(GenericViewSet):
"""
API endpoint for projection defaults.
@ -351,18 +336,13 @@ class ProjectionDefaultViewSet(ListModelMixin, RetrieveModelMixin, GenericViewSe
to projectors can be done by updating the projector.
"""
access_permissions = ProjectionDefaultAccessPermissions()
queryset = ProjectionDefault.objects.all()
def check_view_permissions(self):
"""
Returns True if the user has required permissions.
"""
if self.action in ("list", "retrieve"):
result = self.get_access_permissions().check_permissions(self.request.user)
else:
result = False
return result
return False
class TagViewSet(ModelViewSet):
@ -373,20 +353,13 @@ class TagViewSet(ModelViewSet):
partial_update, update and destroy.
"""
access_permissions = TagAccessPermissions()
queryset = Tag.objects.all()
def check_view_permissions(self):
"""
Returns True if the user has required permissions.
"""
if self.action in ("list", "retrieve"):
result = self.get_access_permissions().check_permissions(self.request.user)
elif self.action == "metadata":
# Every authenticated user can see the metadata.
# Anonymous users can do so if they are enabled.
result = self.request.user.is_authenticated or anonymous_is_enabled()
elif self.action in ("create", "partial_update", "update", "destroy"):
if self.action in ("create", "partial_update", "update", "destroy"):
result = has_perm(self.request.user, "core.can_manage_tags")
else:
result = False
@ -401,7 +374,6 @@ class ConfigViewSet(ModelViewSet):
partial_update.
"""
access_permissions = ConfigAccessPermissions()
queryset = ConfigStore.objects.all()
can_manage_config = None
@ -411,9 +383,7 @@ class ConfigViewSet(ModelViewSet):
"""
Returns True if the user has required permissions.
"""
if self.action in ("list", "retrieve"):
result = self.get_access_permissions().check_permissions(self.request.user)
elif self.action in ("partial_update", "update"):
if self.action in ("partial_update", "update"):
result = self.check_config_permission(self.kwargs["pk"])
elif self.action == "reset_groups":
result = has_perm(self.request.user, "core.can_manage_config")
@ -527,16 +497,13 @@ class ProjectorMessageViewSet(ModelViewSet):
partial_update and destroy.
"""
access_permissions = ProjectorMessageAccessPermissions()
queryset = ProjectorMessage.objects.all()
def check_view_permissions(self):
"""
Returns True if the user has required permissions.
"""
if self.action in ("list", "retrieve"):
result = self.get_access_permissions().check_permissions(self.request.user)
elif self.action in ("create", "partial_update", "update", "destroy"):
if self.action in ("create", "partial_update", "update", "destroy"):
result = has_perm(self.request.user, "core.can_manage_projector")
else:
result = False
@ -551,16 +518,13 @@ class CountdownViewSet(ModelViewSet):
partial_update and destroy.
"""
access_permissions = CountdownAccessPermissions()
queryset = Countdown.objects.all()
def check_view_permissions(self):
"""
Returns True if the user has required permissions.
"""
if self.action in ("list", "retrieve"):
result = self.get_access_permissions().check_permissions(self.request.user)
elif self.action in ("create", "partial_update", "update", "destroy"):
if self.action in ("create", "partial_update", "update", "destroy"):
result = has_perm(self.request.user, "core.can_manage_projector")
else:
result = False

View File

@ -1,37 +0,0 @@
from typing import Any, Dict, List
from ..utils.access_permissions import BaseAccessPermissions
from ..utils.auth import async_has_perm, async_in_some_groups, async_is_superadmin
class MediafileAccessPermissions(BaseAccessPermissions):
"""
Access permissions container for Mediafile and MediafileViewSet.
"""
base_permission = "mediafiles.can_see"
async def get_restricted_data(
self, full_data: List[Dict[str, Any]], user_id: int
) -> List[Dict[str, Any]]:
"""
Returns the restricted serialized data for the instance prepared
for the user. Removes hidden mediafiles for some users.
"""
if not await async_has_perm(user_id, "mediafiles.can_see"):
return []
# This allows to see everything, which is important for inherited_access_groups=False.
if await async_is_superadmin(user_id):
return full_data
data = []
for full in full_data:
access_groups = full["inherited_access_groups_id"]
if (isinstance(access_groups, bool) and access_groups) or (
isinstance(access_groups, list)
and await async_in_some_groups(user_id, access_groups)
):
data.append(full)
return data

View File

@ -13,7 +13,6 @@ from ..agenda.mixins import ListOfSpeakersMixin
from ..core.config import config
from ..utils.models import RESTModelMixin
from ..utils.rest_api import ValidationError
from .access_permissions import MediafileAccessPermissions
from .utils import bytes_to_human
@ -62,7 +61,6 @@ class Mediafile(RESTModelMixin, ListOfSpeakersMixin, models.Model):
"""
objects = MediafileManager()
access_permissions = MediafileAccessPermissions()
can_see_permission = "mediafiles.can_see"
mediafile = models.FileField(upload_to=get_file_path, null=True)

View File

@ -19,7 +19,6 @@ from openslides.utils.rest_api import (
status,
)
from .access_permissions import MediafileAccessPermissions
from .config import watch_and_update_configs
from .models import Mediafile
from .utils import bytes_to_human, get_pdf_information
@ -47,16 +46,13 @@ class MediafileViewSet(ModelViewSet):
partial_update, update and destroy.
"""
access_permissions = MediafileAccessPermissions()
queryset = Mediafile.objects.all()
def check_view_permissions(self):
"""
Returns True if the user has required permissions.
"""
if self.action in ("list", "retrieve", "metadata"):
result = self.get_access_permissions().check_permissions(self.request.user)
elif self.action in (
if self.action in (
"create",
"partial_update",
"update",

View File

@ -1,209 +0,0 @@
import json
from typing import Any, Dict, List
from ..poll.access_permissions import (
BaseOptionAccessPermissions,
BasePollAccessPermissions,
BaseVoteAccessPermissions,
)
from ..utils.access_permissions import BaseAccessPermissions
from ..utils.auth import async_has_perm, async_in_some_groups
class MotionAccessPermissions(BaseAccessPermissions):
"""
Access permissions container for Motion and MotionViewSet.
"""
base_permission = "motions.can_see"
async def get_restricted_data(
self, full_data: List[Dict[str, Any]], user_id: int
) -> List[Dict[str, Any]]:
"""
Returns the restricted serialized data for the instance prepared for
the user. Removes motion if the user has not the permission to see
the motion in this state. Removes comments sections for
some unauthorized users. Ensures that a user can only see his own
personal notes.
"""
# Parse data.
if await async_has_perm(user_id, "motions.can_see"):
# TODO: Refactor this after personal_notes system is refactored.
data = []
for full in full_data:
# Check if user is submitter of this motion.
if user_id:
is_submitter = user_id in [
submitter["user_id"] for submitter in full.get("submitters", [])
]
else:
# Anonymous users can not be submitters.
is_submitter = False
# Check see permission for this motion.
restriction = full["state_restriction"]
# Managers can see all motions.
permission = await async_has_perm(user_id, "motions.can_manage")
# If restriction field is an empty list, everybody can see the motion.
permission = permission or not restriction
if not permission:
# Parse values of restriction field.
# If at least one restriction is ok, permissions are granted.
for value in restriction:
if (
value
in (
"motions.can_see_internal",
"motions.can_manage_metadata",
"motions.can_manage",
)
and await async_has_perm(user_id, value)
):
permission = True
break
elif value == "is_submitter" and is_submitter:
permission = True
break
# Parse single motion.
if permission:
full_copy = json.loads(json.dumps(full))
full_copy["comments"] = []
for comment in full["comments"]:
if await async_in_some_groups(
user_id, comment["read_groups_id"]
):
full_copy["comments"].append(comment)
data.append(full_copy)
else:
data = []
return data
class MotionChangeRecommendationAccessPermissions(BaseAccessPermissions):
"""
Access permissions container for MotionChangeRecommendation and MotionChangeRecommendationViewSet.
"""
base_permission = "motions.can_see"
async def get_restricted_data(
self, full_data: List[Dict[str, Any]], user_id: int
) -> List[Dict[str, Any]]:
"""
Removes change recommendations if they are internal and the user has
not the can_manage permission. To see change recommendation the user needs
the can_see permission.
"""
# Parse data.
if await async_has_perm(user_id, self.base_permission):
has_manage_perms = await async_has_perm(user_id, "motions.can_manage")
data = []
for full in full_data:
if not full["internal"] or has_manage_perms:
data.append(full)
else:
data = []
return data
class MotionCommentSectionAccessPermissions(BaseAccessPermissions):
"""
Access permissions container for MotionCommentSection and MotionCommentSectionViewSet.
"""
base_permission = "motions.can_see"
async def get_restricted_data(
self, full_data: List[Dict[str, Any]], user_id: int
) -> List[Dict[str, Any]]:
"""
If the user has manage rights, he can see all sections. If not all sections
will be removed, when the user is not in at least one of the read_groups.
"""
data: List[Dict[str, Any]] = []
if await async_has_perm(user_id, "motions.can_manage"):
data = full_data
elif await async_has_perm(user_id, self.base_permission):
for full in full_data:
read_groups = full.get("read_groups_id", [])
if await async_in_some_groups(user_id, read_groups):
data.append(full)
else:
data = []
return data
class StatuteParagraphAccessPermissions(BaseAccessPermissions):
"""
Access permissions container for StatuteParagraph and StatuteParagraphViewSet.
"""
base_permission = "motions.can_see"
class CategoryAccessPermissions(BaseAccessPermissions):
"""
Access permissions container for Category and CategoryViewSet.
"""
base_permission = "motions.can_see"
class MotionBlockAccessPermissions(BaseAccessPermissions):
"""
Access permissions container for Category and CategoryViewSet.
"""
base_permission = "motions.can_see"
async def get_restricted_data(
self, full_data: List[Dict[str, Any]], user_id: int
) -> List[Dict[str, Any]]:
"""
Users without `motions.can_manage` cannot see internal blocks.
"""
data: List[Dict[str, Any]] = []
if await async_has_perm(user_id, "motions.can_manage"):
data = full_data
elif await async_has_perm(user_id, self.base_permission):
data = [full for full in full_data if not full["internal"]]
else:
data = []
return data
class WorkflowAccessPermissions(BaseAccessPermissions):
"""
Access permissions container for Workflow and WorkflowViewSet.
"""
base_permission = "motions.can_see"
class StateAccessPermissions(BaseAccessPermissions):
"""
Access permissions container for State and StateViewSet.
"""
base_permission = "motions.can_see"
class MotionPollAccessPermissions(BasePollAccessPermissions):
base_permission = "motions.can_see"
manage_permission = "motions.can_manage_polls"
class MotionOptionAccessPermissions(BaseOptionAccessPermissions):
base_permission = "motions.can_see"
manage_permission = "motions.can_manage_polls"
class MotionVoteAccessPermissions(BaseVoteAccessPermissions):
base_permission = "motions.can_see"
manage_permission = "motions.can_manage_polls"

View File

@ -1,5 +1,3 @@
from typing import Any, Dict, Set
from django.apps import AppConfig
from django.db.models.signals import post_migrate
@ -12,7 +10,6 @@ class MotionsAppConfig(AppConfig):
# Import all required stuff.
from openslides.core.signals import permission_change
from openslides.utils.rest_api import router
from ..utils.access_permissions import required_user
from . import serializers # noqa
from .signals import create_builtin_workflows, get_permission_change_data
from .views import (
@ -72,16 +69,6 @@ class MotionsAppConfig(AppConfig):
)
router.register(self.get_model("State").get_collection_string(), StateViewSet)
# Register required_users
required_user.add_collection_string(
self.get_model("Motion").get_collection_string(), required_users_motions
)
required_user.add_collection_string(
self.get_model("MotionPoll").get_collection_string(),
required_users_motion_polls,
)
def get_config_variables(self):
from .config_variables import get_config_variables
@ -106,28 +93,3 @@ class MotionsAppConfig(AppConfig):
"MotionVote",
):
yield self.get_model(model_name)
async def required_users_motions(element: Dict[str, Any]) -> Set[int]:
"""
Returns all user ids that are displayed as as submitter or supporter in
any motion if request_user can see motions. This function may return an
empty set.
"""
submitters_supporters = set(
[submitter["user_id"] for submitter in element["submitters"]]
)
submitters_supporters.update(element["supporters_id"])
return submitters_supporters
async def required_users_motion_polls(element: Dict[str, Any]) -> Set[int]:
"""
Returns all user ids that have voted on an option and are therefore required for the single votes table.
"""
from openslides.poll.models import BasePoll
if element["state"] == BasePoll.STATE_PUBLISHED:
return element["voted_id"]
else:
return set()

View File

@ -16,19 +16,6 @@ from openslides.utils.models import RESTModelMixin
from openslides.utils.rest_api import ValidationError
from ..utils.models import CASCADE_AND_AUTOUPDATE, SET_NULL_AND_AUTOUPDATE
from .access_permissions import (
CategoryAccessPermissions,
MotionAccessPermissions,
MotionBlockAccessPermissions,
MotionChangeRecommendationAccessPermissions,
MotionCommentSectionAccessPermissions,
MotionOptionAccessPermissions,
MotionPollAccessPermissions,
MotionVoteAccessPermissions,
StateAccessPermissions,
StatuteParagraphAccessPermissions,
WorkflowAccessPermissions,
)
from .exceptions import WorkflowError
@ -37,8 +24,6 @@ class StatuteParagraph(RESTModelMixin, models.Model):
Model for parts of the statute
"""
access_permissions = StatuteParagraphAccessPermissions()
title = models.CharField(max_length=255)
"""Title of the statute paragraph."""
@ -96,9 +81,6 @@ class Motion(RESTModelMixin, AgendaItemWithListOfSpeakersMixin, models.Model):
This class is the main entry point to all other classes related to a motion.
"""
access_permissions = MotionAccessPermissions()
can_see_permission = "motions.can_see"
objects = MotionManager()
title = models.CharField(max_length=255)
@ -540,8 +522,6 @@ class MotionCommentSection(RESTModelMixin, models.Model):
each motions has the ability to have comments from the same section.
"""
access_permissions = MotionCommentSectionAccessPermissions()
name = models.CharField(max_length=255)
"""
The name of the section.
@ -672,8 +652,6 @@ class MotionChangeRecommendation(RESTModelMixin, models.Model):
A MotionChangeRecommendation object saves change recommendations for a specific Motion
"""
access_permissions = MotionChangeRecommendationAccessPermissions()
motion = models.ForeignKey(
Motion, on_delete=CASCADE_AND_AUTOUPDATE, related_name="change_recommendations"
)
@ -763,8 +741,6 @@ class Category(RESTModelMixin, models.Model):
Model for categories of motions.
"""
access_permissions = CategoryAccessPermissions()
name = models.CharField(max_length=255)
"""Name of the category."""
@ -831,8 +807,6 @@ class MotionBlock(RESTModelMixin, AgendaItemWithListOfSpeakersMixin, models.Mode
Model for blocks of motions.
"""
access_permissions = MotionBlockAccessPermissions()
objects = MotionBlockManager()
title = models.CharField(max_length=255)
@ -872,7 +846,6 @@ class MotionVoteManager(BaseManager):
class MotionVote(RESTModelMixin, BaseVote):
access_permissions = MotionVoteAccessPermissions()
option = models.ForeignKey(
"MotionOption", on_delete=CASCADE_AND_AUTOUPDATE, related_name="votes"
)
@ -903,8 +876,6 @@ class MotionOptionManager(BaseManager):
class MotionOption(RESTModelMixin, BaseOption):
access_permissions = MotionOptionAccessPermissions()
can_see_permission = "motions.can_see"
objects = MotionOptionManager()
vote_class = MotionVote
@ -935,8 +906,6 @@ class MotionPollManager(BaseManager):
class MotionPoll(RESTModelMixin, BasePoll):
access_permissions = MotionPollAccessPermissions()
can_see_permission = "motions.can_see"
option_class = MotionOption
objects = MotionPollManager()
@ -973,8 +942,6 @@ class State(RESTModelMixin, models.Model):
state.
"""
access_permissions = StateAccessPermissions()
name = models.CharField(max_length=255)
"""A string representing the state."""
@ -1118,8 +1085,6 @@ class Workflow(RESTModelMixin, models.Model):
Defines a workflow for a motion.
"""
access_permissions = WorkflowAccessPermissions()
objects = WorkflowManager()
name = models.CharField(max_length=255)

View File

@ -25,16 +25,6 @@ from ..utils.rest_api import (
list_route,
)
from ..utils.views import TreeSortMixin
from .access_permissions import (
CategoryAccessPermissions,
MotionAccessPermissions,
MotionBlockAccessPermissions,
MotionChangeRecommendationAccessPermissions,
MotionCommentSectionAccessPermissions,
StateAccessPermissions,
StatuteParagraphAccessPermissions,
WorkflowAccessPermissions,
)
from .models import (
Category,
Motion,
@ -63,16 +53,13 @@ class MotionViewSet(TreeSortMixin, ModelViewSet):
There are a lot of views. See check_view_permissions().
"""
access_permissions = MotionAccessPermissions()
queryset = Motion.objects.all()
def check_view_permissions(self):
"""
Returns True if the user has required permissions.
"""
if self.action in ("list", "retrieve"):
result = self.get_access_permissions().check_permissions(self.request.user)
elif self.action in ("metadata", "partial_update", "update", "destroy"):
if self.action in ("metadata", "partial_update", "update", "destroy"):
result = has_perm(self.request.user, "motions.can_see")
# For partial_update, update and destroy requests the rest of the check is
# done in the update method. See below.
@ -1280,7 +1267,7 @@ class MotionPollViewSet(BasePollViewSet):
value=data,
weight=weight,
)
vote.save(no_delete_on_restriction=True)
vote.save()
inform_changed_data(option)
@ -1306,18 +1293,13 @@ class MotionChangeRecommendationViewSet(ModelViewSet):
partial_update, update and destroy.
"""
access_permissions = MotionChangeRecommendationAccessPermissions()
queryset = MotionChangeRecommendation.objects.all()
def check_view_permissions(self):
"""
Returns True if the user has required permissions.
"""
if self.action in ("list", "retrieve"):
result = self.get_access_permissions().check_permissions(self.request.user)
elif self.action == "metadata":
result = has_perm(self.request.user, "motions.can_see")
elif self.action in ("create", "destroy", "partial_update", "update"):
if self.action in ("create", "destroy", "partial_update", "update"):
result = has_perm(self.request.user, "motions.can_see") and has_perm(
self.request.user, "motions.can_manage"
)
@ -1372,16 +1354,13 @@ class MotionCommentSectionViewSet(ModelViewSet):
API endpoint for motion comment fields.
"""
access_permissions = MotionCommentSectionAccessPermissions()
queryset = MotionCommentSection.objects.all()
def check_view_permissions(self):
"""
Returns True if the user has required permissions.
"""
if self.action in ("list", "retrieve"):
result = self.get_access_permissions().check_permissions(self.request.user)
elif self.action in ("create", "destroy", "update", "partial_update", "sort"):
if self.action in ("create", "destroy", "update", "partial_update", "sort"):
result = has_perm(self.request.user, "motions.can_see") and has_perm(
self.request.user, "motions.can_manage"
)
@ -1463,16 +1442,13 @@ class StatuteParagraphViewSet(ModelViewSet):
partial_update, update and destroy.
"""
access_permissions = StatuteParagraphAccessPermissions()
queryset = StatuteParagraph.objects.all()
def check_view_permissions(self):
"""
Returns True if the user has required permissions.
"""
if self.action in ("list", "retrieve"):
result = self.get_access_permissions().check_permissions(self.request.user)
elif self.action in ("create", "partial_update", "update", "destroy"):
if self.action in ("create", "partial_update", "update", "destroy"):
result = has_perm(self.request.user, "motions.can_see") and has_perm(
self.request.user, "motions.can_manage"
)
@ -1489,16 +1465,13 @@ class CategoryViewSet(TreeSortMixin, ModelViewSet):
partial_update, update, destroy and numbering.
"""
access_permissions = CategoryAccessPermissions()
queryset = Category.objects.all()
def check_view_permissions(self):
"""
Returns True if the user has required permissions.
"""
if self.action in ("list", "retrieve", "metadata"):
result = self.get_access_permissions().check_permissions(self.request.user)
elif self.action in (
if self.action in (
"create",
"partial_update",
"update",
@ -1606,18 +1579,13 @@ class MotionBlockViewSet(ModelViewSet):
partial_update, update and destroy.
"""
access_permissions = MotionBlockAccessPermissions()
queryset = MotionBlock.objects.all()
def check_view_permissions(self):
"""
Returns True if the user has required permissions.
"""
if self.action in ("list", "retrieve"):
result = self.get_access_permissions().check_permissions(self.request.user)
elif self.action == "metadata":
result = has_perm(self.request.user, "motions.can_see")
elif self.action in (
if self.action in (
"create",
"partial_update",
"update",
@ -1684,16 +1652,13 @@ class WorkflowViewSet(ModelViewSet, ProtectedErrorMessageMixin):
partial_update, update and destroy.
"""
access_permissions = WorkflowAccessPermissions()
queryset = Workflow.objects.all()
def check_view_permissions(self):
"""
Returns True if the user has required permissions.
"""
if self.action in ("list", "retrieve", "metadata"):
result = self.get_access_permissions().check_permissions(self.request.user)
elif self.action in ("create", "partial_update", "update", "destroy"):
if self.action in ("create", "partial_update", "update", "destroy"):
result = has_perm(self.request.user, "motions.can_see") and has_perm(
self.request.user, "motions.can_manage"
)
@ -1734,15 +1699,12 @@ class StateViewSet(ModelViewSet, ProtectedErrorMessageMixin):
"""
queryset = State.objects.all()
access_permissions = StateAccessPermissions()
def check_view_permissions(self):
"""
Returns True if the user has required permissions.
"""
if self.action in ("list", "retrieve", "metadata"):
result = self.get_access_permissions().check_permissions(self.request.user)
elif self.action in ("create", "partial_update", "update", "destroy"):
if self.action in ("create", "partial_update", "update", "destroy"):
result = has_perm(self.request.user, "motions.can_see") and has_perm(
self.request.user, "motions.can_manage"
)

View File

@ -1,122 +0,0 @@
import json
from typing import Any, Dict, List
from ..poll.views import BasePoll
from ..utils import logging
from ..utils.access_permissions import BaseAccessPermissions
from ..utils.auth import async_has_perm, user_collection_string
from ..utils.cache import element_cache
logger = logging.getLogger(__name__)
class BaseVoteAccessPermissions(BaseAccessPermissions):
manage_permission = "" # set by subclass
async def get_restricted_data(
self, full_data: List[Dict[str, Any]], user_id: int
) -> List[Dict[str, Any]]:
"""
Poll-managers have full access, even during an active poll.
Every user can see it's own votes.
If the pollstate is published, everyone can see the votes.
"""
if await async_has_perm(user_id, self.manage_permission):
data = full_data
elif await async_has_perm(user_id, self.base_permission):
data = [
vote
for vote in full_data
if vote["pollstate"] == BasePoll.STATE_PUBLISHED
or vote["user_id"] == user_id
or vote["delegated_user_id"] == user_id
]
else:
data = []
return data
class BaseOptionAccessPermissions(BaseAccessPermissions):
manage_permission = "" # set by subclass
async def get_restricted_data(
self, full_data: List[Dict[str, Any]], user_id: int
) -> List[Dict[str, Any]]:
if await async_has_perm(user_id, self.manage_permission):
data = full_data
elif await async_has_perm(user_id, self.base_permission):
data = []
for option in full_data:
if option["pollstate"] != BasePoll.STATE_PUBLISHED:
option = json.loads(
json.dumps(option)
) # copy, so we can remove some fields.
del option["yes"]
del option["no"]
del option["abstain"]
data.append(option)
else:
data = []
return data
class BasePollAccessPermissions(BaseAccessPermissions):
manage_permission = "" # set by subclass
additional_fields: List[str] = []
""" Add fields to be removed from each unpublished poll """
async def get_restricted_data(
self, full_data: List[Dict[str, Any]], user_id: int
) -> List[Dict[str, Any]]:
"""
Poll-managers have full access, even during an active poll.
Non-published polls will be restricted:
- Remove votes* values from the poll
- Remove yes/no/abstain fields from options
- Remove fields given in self.additional_fields from the poll
"""
# add has_voted for all users to check whether op has voted
# also fill user_has_voted_for_delegations with all users for which he has
# already voted
user_data = await element_cache.get_element_data(
user_collection_string, user_id
)
if user_data is None:
logger.error(f"Could not find userdata for {user_id}")
vote_delegated_from_ids = set()
else:
vote_delegated_from_ids = set(user_data["vote_delegated_from_users_id"])
for poll in full_data:
poll["user_has_voted"] = user_id in poll["voted_id"]
voted_ids = set(poll["voted_id"])
voted_for_delegations = list(
vote_delegated_from_ids.intersection(voted_ids)
)
poll["user_has_voted_for_delegations"] = voted_for_delegations
data_copy = json.loads(json.dumps(full_data))
for poll in data_copy:
if poll["state"] not in (BasePoll.STATE_FINISHED, BasePoll.STATE_PUBLISHED):
del poll["voted_id"]
if await async_has_perm(user_id, self.manage_permission):
pass
elif await async_has_perm(user_id, self.base_permission):
for poll in data_copy:
if poll["state"] != BasePoll.STATE_PUBLISHED:
del poll["votesvalid"]
del poll["votesinvalid"]
del poll["votescast"]
if "voted_id" in poll: # could be removed earlier
del poll["voted_id"]
for field in self.additional_fields:
del poll[field]
else:
data_copy = []
return data_copy

View File

@ -11,10 +11,8 @@ from openslides.utils.autoupdate import disable_history, inform_changed_data
from openslides.utils.rest_api import (
DecimalField,
GenericViewSet,
ListModelMixin,
ModelViewSet,
Response,
RetrieveModelMixin,
ValidationError,
detail_route,
)
@ -373,9 +371,9 @@ class BasePollViewSet(ModelViewSet):
raise NotImplementedError()
class BaseVoteViewSet(ListModelMixin, RetrieveModelMixin, GenericViewSet):
class BaseVoteViewSet(GenericViewSet):
pass
class BaseOptionViewSet(ListModelMixin, RetrieveModelMixin, GenericViewSet):
class BaseOptionViewSet(GenericViewSet):
pass

View File

@ -1,9 +0,0 @@
from ..utils.access_permissions import BaseAccessPermissions
class TopicAccessPermissions(BaseAccessPermissions):
"""
Access permissions container for Topic and TopicViewSet.
"""
base_permission = "agenda.can_see"

View File

@ -5,7 +5,6 @@ from openslides.utils.manager import BaseManager
from ..agenda.mixins import AgendaItemWithListOfSpeakersMixin
from ..mediafiles.models import Mediafile
from ..utils.models import RESTModelMixin
from .access_permissions import TopicAccessPermissions
class TopicManager(BaseManager):
@ -31,8 +30,6 @@ class Topic(RESTModelMixin, AgendaItemWithListOfSpeakersMixin, models.Model):
Model for slides with custom content. Used to be called custom slide.
"""
access_permissions = TopicAccessPermissions()
objects = TopicManager()
title = models.CharField(max_length=256)

View File

@ -1,7 +1,6 @@
from openslides.utils.rest_api import ModelViewSet
from ..utils.auth import has_perm
from .access_permissions import TopicAccessPermissions
from .models import Topic
@ -13,15 +12,14 @@ class TopicViewSet(ModelViewSet):
partial_update, update and destroy.
"""
access_permissions = TopicAccessPermissions()
queryset = Topic.objects.all()
def check_view_permissions(self):
"""
Returns True if the user has required permissions.
"""
if self.action in ("list", "retrieve"):
result = self.get_access_permissions().check_permissions(self.request.user)
else:
if self.action in ("create", "update", "partial_update", "destroy"):
result = has_perm(self.request.user, "agenda.can_manage")
else:
result = False
return result

View File

@ -1,159 +0,0 @@
from typing import Any, Dict, List, Set
from ..utils.access_permissions import BaseAccessPermissions, required_user
from ..utils.auth import async_has_perm
from ..utils.utils import get_model_from_collection_string
class UserAccessPermissions(BaseAccessPermissions):
"""
Access permissions container for User and UserViewSet.
"""
async def get_restricted_data(
self, full_data: List[Dict[str, Any]], user_id: int
) -> List[Dict[str, Any]]:
"""
Returns the restricted serialized data for the instance prepared
for the user. Removes several fields for non admins so that they do
not get the fields they should not get.
"""
from .serializers import (
USERCANSEEEXTRASERIALIZER_FIELDS,
USERCANSEESERIALIZER_FIELDS,
)
def filtered_data(full_data, whitelist, whitelist_operator=None):
"""
Returns a new dict like full_data but only with whitelisted keys.
If the whitelist_operator is given and the full_data-user is the
oeperator (the user with user_id), the whitelist_operator will
be used instead of the whitelist.
"""
if whitelist_operator is not None and full_data["id"] == user_id:
return {key: full_data[key] for key in whitelist_operator}
else:
return {key: full_data[key] for key in whitelist}
# We have some sets of data to be sent:
# * full data i. e. all fields (including session_auth_hash),
# * all data i. e. all fields but not session_auth_hash,
# * many data i. e. all fields but not the default password and session_auth_hash,
# * little data i. e. all fields but not the default password, session_auth_hash,
# comments, gender, email, last_email_send, active status and auth_type
# * own data i. e. all little data fields plus email and gender. This is applied
# to the own user, if he just can see little or no data.
# * no data.
# Prepare field set for users with "all" data, "many" data and with "little" data.
all_data_fields = set(USERCANSEEEXTRASERIALIZER_FIELDS)
all_data_fields.add("groups_id")
all_data_fields.discard("groups")
all_data_fields.add("default_password")
many_data_fields = all_data_fields.copy()
many_data_fields.discard("default_password")
little_data_fields = set(USERCANSEESERIALIZER_FIELDS)
little_data_fields.add("groups_id")
little_data_fields.discard("groups")
own_data_fields = set(little_data_fields)
own_data_fields.add("email")
own_data_fields.add("gender")
own_data_fields.add("vote_delegated_to_id")
own_data_fields.add("vote_delegated_from_users_id")
# Check user permissions.
if await async_has_perm(user_id, "users.can_see_name"):
whitelist_operator = None
if await async_has_perm(user_id, "users.can_see_extra_data"):
if await async_has_perm(user_id, "users.can_manage"):
whitelist = all_data_fields
else:
whitelist = many_data_fields
else:
whitelist = little_data_fields
whitelist_operator = own_data_fields
# for managing {motion, assignment} polls the users needs to know
# the vote delegation structure.
if await async_has_perm(
user_id, "motion.can_manage_polls"
) or await async_has_perm(user_id, "assignments.can_manage"):
whitelist.add("vote_delegated_to_id")
whitelist.add("vote_delegated_from_users_id")
data = [
filtered_data(full, whitelist, whitelist_operator) for full in full_data
]
else:
# Build a list of users, that can be seen without any permissions (with little fields).
# Everybody can see himself. Also everybody can see every user
# that is required e. g. as speaker, motion submitter or
# assignment candidate.
can_see_collection_strings: Set[str] = set()
for collection_string in required_user.get_collection_strings():
if await async_has_perm(
user_id,
get_model_from_collection_string(
collection_string
).can_see_permission,
):
can_see_collection_strings.add(collection_string)
required_user_ids = await required_user.get_required_users(
can_see_collection_strings
)
# Add oneself.
if user_id:
required_user_ids.add(user_id)
# add vote delegations
# Find our model in full_data and get vote_delegated_from_users_id from it.
for user in full_data:
if user["id"] == user_id:
required_user_ids.update(user["vote_delegated_from_users_id"])
break
# Parse data.
data = [
filtered_data(full, little_data_fields, own_data_fields)
for full in full_data
if full["id"] in required_user_ids
]
return data
class GroupAccessPermissions(BaseAccessPermissions):
"""
Access permissions container for Groups. Everyone can see them
"""
class PersonalNoteAccessPermissions(BaseAccessPermissions):
"""
Access permissions container for personal notes. Every authenticated user
can handle personal notes.
"""
async def get_restricted_data(
self, full_data: List[Dict[str, Any]], user_id: int
) -> List[Dict[str, Any]]:
"""
Returns the restricted serialized data for the instance prepared
for the user. Everybody gets only his own personal notes.
"""
# Parse data.
if not user_id:
data: List[Dict[str, Any]] = []
else:
for full in full_data:
if full["user_id"] == user_id:
data = [full]
break
else:
data = []
return data

View File

@ -28,11 +28,6 @@ from ..utils.models import (
SET_NULL_AND_AUTOUPDATE,
RESTModelMixin,
)
from .access_permissions import (
GroupAccessPermissions,
PersonalNoteAccessPermissions,
UserAccessPermissions,
)
class UserManager(BaseUserManager):
@ -130,8 +125,6 @@ class User(RESTModelMixin, PermissionsMixin, AbstractBaseUser):
candidates.
"""
access_permissions = UserAccessPermissions()
USERNAME_FIELD = "username"
username = models.CharField(max_length=255, unique=True, blank=True)
@ -356,7 +349,6 @@ class Group(RESTModelMixin, DjangoGroup):
Extend the django group with support of our REST and caching system.
"""
access_permissions = GroupAccessPermissions()
objects = GroupManager()
class Meta:
@ -382,14 +374,6 @@ class PersonalNote(RESTModelMixin, models.Model):
openslides objects like motions.
"""
access_permissions = PersonalNoteAccessPermissions()
personalized_model = True
"""
Each model belongs to one user. This relation is set during creation and
will not be changed.
"""
objects = PersonalNoteManager()
user = models.OneToOneField(User, on_delete=CASCADE_AND_AUTOUPDATE)

View File

@ -0,0 +1,74 @@
from typing import Any, Dict
from ..utils.auth import async_has_perm
async def restrict_user(full_user: Dict[str, Any]) -> Dict[str, Any]:
"""
Returns the restricted serialized data for the instance prepared
for the user. Removes several fields for non admins so that they do
not get the fields they should not get.
"""
from .serializers import (
USERCANSEEEXTRASERIALIZER_FIELDS,
USERCANSEESERIALIZER_FIELDS,
)
user_id = full_user["id"]
def filtered_data(full_user, whitelist):
"""
Returns a new dict like full_user but only with whitelisted keys.
"""
return {key: full_user[key] for key in whitelist}
# We have some sets of data to be sent:
# * full data i. e. all fields (including session_auth_hash),
# * all data i. e. all fields but not session_auth_hash,
# * many data i. e. all fields but not the default password and session_auth_hash,
# * little data i. e. all fields but not the default password, session_auth_hash,
# comments, gender, email, last_email_send, active status and auth_type
# * own data i. e. all little data fields plus email and gender. This is applied
# to the own user, if he just can see little or no data.
# * no data.
# Prepare field set for users with "all" data, "many" data and with "little" data.
all_data_fields = set(USERCANSEEEXTRASERIALIZER_FIELDS)
all_data_fields.add("groups_id")
all_data_fields.discard("groups")
all_data_fields.add("default_password")
many_data_fields = all_data_fields.copy()
many_data_fields.discard("default_password")
little_data_fields = set(USERCANSEESERIALIZER_FIELDS)
little_data_fields.add("groups_id")
little_data_fields.discard("groups")
own_data_fields = set(little_data_fields)
own_data_fields.add("email")
own_data_fields.add("gender")
own_data_fields.add("vote_delegated_to_id")
own_data_fields.add("vote_delegated_from_users_id")
# Check user permissions.
if await async_has_perm(user_id, "users.can_see_name"):
if await async_has_perm(user_id, "users.can_see_extra_data"):
if await async_has_perm(user_id, "users.can_manage"):
whitelist = all_data_fields
else:
whitelist = many_data_fields
else:
whitelist = own_data_fields
# for managing {motion, assignment} polls the users needs to know
# the vote delegation structure.
if await async_has_perm(
user_id, "motion.can_manage_polls"
) or await async_has_perm(user_id, "assignments.can_manage"):
whitelist.add("vote_delegated_to_id")
whitelist.add("vote_delegated_from_users_id")
data = filtered_data(full_user, whitelist)
else:
# Parse data.
data = filtered_data(full_user, own_data_fields)
return data

View File

@ -48,12 +48,8 @@ from ..utils.rest_api import (
)
from ..utils.validate import validate_json
from ..utils.views import APIView
from .access_permissions import (
GroupAccessPermissions,
PersonalNoteAccessPermissions,
UserAccessPermissions,
)
from .models import Group, PersonalNote, User
from .restrict import restrict_user
from .serializers import GroupSerializer, PermissionRelatedField
from .user_backend import user_backend_manager
@ -85,18 +81,13 @@ class UserViewSet(ModelViewSet):
partial_update, update, destroy and reset_password.
"""
access_permissions = UserAccessPermissions()
queryset = User.objects.all()
def check_view_permissions(self):
"""
Returns True if the user has required permissions.
"""
if self.action in ("list", "retrieve"):
result = self.get_access_permissions().check_permissions(self.request.user)
elif self.action == "metadata":
result = has_perm(self.request.user, "users.can_see_name")
elif self.action in ("update", "partial_update"):
if self.action in ("update", "partial_update"):
result = self.request.user.is_authenticated
elif self.action in (
"create",
@ -597,19 +588,12 @@ class GroupViewSet(ModelViewSet):
metadata_class = GroupViewSetMetadata
queryset = Group.objects.all()
serializer_class = GroupSerializer
access_permissions = GroupAccessPermissions()
def check_view_permissions(self):
"""
Returns True if the user has required permissions.
"""
if self.action in ("list", "retrieve"):
result = self.get_access_permissions().check_permissions(self.request.user)
elif self.action == "metadata":
# Every authenticated user can see the metadata.
# Anonymous users can do so if they are enabled.
result = self.request.user.is_authenticated or anonymous_is_enabled()
elif self.action in (
if self.action in (
"create",
"partial_update",
"update",
@ -762,16 +746,13 @@ class PersonalNoteViewSet(ModelViewSet):
partial_update, update, and destroy.
"""
access_permissions = PersonalNoteAccessPermissions()
queryset = PersonalNote.objects.all()
def check_view_permissions(self):
"""
Returns True if the user has required permissions.
"""
if self.action in ("list", "retrieve"):
result = self.get_access_permissions().check_permissions(self.request.user)
elif self.action in ("create_or_update", "destroy"):
if self.action in ("create_or_update", "destroy"):
# Every authenticated user can see metadata and create personal
# notes for himself and can manipulate only his own personal notes.
# See self.perform_create(), self.update() and self.destroy().
@ -869,9 +850,7 @@ class WhoAmIDataView(APIView):
raise APIException(f"Could not find user {user_id}", 500)
auth_type = user_full_data["auth_type"]
user_data = async_to_sync(element_cache.restrict_element_data)(
user_full_data, self.request.user.get_collection_string(), user_id
)
user_data = async_to_sync(restrict_user)(user_full_data)
group_ids = user_data["groups_id"] or [GROUP_DEFAULT_PK]
else:
user_data = None

View File

@ -1,104 +0,0 @@
from typing import Any, Callable, Coroutine, Dict, List, Set
from asgiref.sync import async_to_sync
from .auth import async_anonymous_is_enabled, async_has_perm, user_to_user_id
from .cache import element_cache
class BaseAccessPermissions:
"""
Base access permissions container.
Every app which has autoupdate models has to create classes subclassing
from this base class for every autoupdate root model.
"""
base_permission = ""
"""
Set to a permission the user needs to see the element.
If this string is empty, all users can see it.
"""
def check_permissions(self, user_id: int) -> bool:
"""
Returns True if the user has read access to model instances.
"""
# Convert user to right type
# TODO: Remove this and make sure, that user has always the right type
user_id = user_to_user_id(user_id)
return async_to_sync(self.async_check_permissions)(user_id)
async def async_check_permissions(self, user_id: int) -> bool:
"""
Returns True if the user has read access to model instances.
"""
if self.base_permission:
return await async_has_perm(user_id, self.base_permission)
else:
return bool(user_id) or await async_anonymous_is_enabled()
async def get_restricted_data(
self, full_data: List[Dict[str, Any]], user_id: int
) -> List[Dict[str, Any]]:
"""
Returns the restricted serialized data for the instance prepared
for the user.
The argument full_data has to be a list of full_data dicts. The type of
the return is the same. Returns an empty list if the user has no read
access. Returns reduced data if the user has limited access. Default:
Returns full data if the user has read access to model instances.
"""
return full_data if await self.async_check_permissions(user_id) else []
class RequiredUsers:
"""
Helper class to find all users that are required by another element.
"""
callables: Dict[str, Callable[[Dict[str, Any]], Coroutine[Any, Any, Set[int]]]] = {}
def get_collection_strings(self) -> Set[str]:
"""
Returns all collection strings for elements that could have required users.
"""
return set(self.callables.keys())
def add_collection_string(
self,
collection_string: str,
callable: Callable[[Dict[str, Any]], Coroutine[Any, Any, Set[int]]],
) -> None:
"""
Add a callable for a collection_string to get the required users of the
elements.
"""
self.callables[collection_string] = callable
async def get_required_users(self, collection_strings: Set[str]) -> Set[int]:
"""
Returns the user ids that are required by other elements.
Returns only user ids required by elements with a collection_string
in the argument collection_strings.
"""
user_ids: Set[int] = set()
for collection_string in collection_strings:
collection_data = await element_cache.get_collection_data(collection_string)
# Get the callable for the collection_string
get_user_ids = self.callables.get(collection_string)
if not (get_user_ids and collection_data):
# if the collection_string is unknown or it has no data, do nothing
continue
for element in collection_data.values():
user_ids.update(await get_user_ids(element))
return user_ids
required_user = RequiredUsers()

View File

@ -1,13 +1,10 @@
from collections import defaultdict
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
from django.db.models import Model
from mypy_extensions import TypedDict
from .auth import UserDoesNotExist
from .autoupdate_bundle import AutoupdateElement, autoupdate_bundle
from .cache import ChangeIdTooLowError, element_cache
from .utils import is_iterable, split_element_id
from .utils import is_iterable
AutoupdateFormat = TypedDict(
@ -33,7 +30,6 @@ def inform_changed_data(
information: List[str] = None,
user_id: Optional[int] = None,
disable_history: bool = False,
no_delete_on_restriction: bool = False,
) -> None:
"""
Informs the autoupdate system and the caching system about the creation or
@ -58,7 +54,6 @@ def inform_changed_data(
disable_history=disable_history,
information=information,
user_id=user_id,
no_delete_on_restriction=no_delete_on_restriction,
)
elements.append(element)
inform_elements(elements)
@ -101,57 +96,3 @@ def inform_elements(elements: Iterable[AutoupdateElement]) -> None:
"""
with autoupdate_bundle() as bundle:
bundle.add(elements)
async def get_autoupdate_data(
from_change_id: int, user_id: int
) -> Tuple[int, Optional[AutoupdateFormat]]:
try:
return await _get_autoupdate_data(from_change_id, user_id)
except UserDoesNotExist:
return 0, None
async def _get_autoupdate_data(
from_change_id: int, user_id: int
) -> Tuple[int, Optional[AutoupdateFormat]]:
"""
Returns the max_change_id and the autoupdate from from_change_id to max_change_id
"""
try:
(
max_change_id,
changed_elements,
deleted_element_ids,
) = await element_cache.get_data_since(user_id, from_change_id)
except ChangeIdTooLowError:
# The change_id is lower the the lowerst change_id in redis. Return all data
(
max_change_id,
changed_elements,
) = await element_cache.get_all_data_list_with_max_change_id(user_id)
deleted_elements: Dict[str, List[int]] = {}
all_data = True
else:
all_data = False
deleted_elements = defaultdict(list)
for element_id in deleted_element_ids:
collection_string, id = split_element_id(element_id)
deleted_elements[collection_string].append(id)
# Check, if the autoupdate has any data.
if not changed_elements and not deleted_element_ids:
# Skip empty updates
return max_change_id, None
else:
# Normal autoupdate with data
return (
max_change_id,
AutoupdateFormat(
changed=changed_elements,
deleted=deleted_elements,
from_change_id=from_change_id,
to_change_id=max_change_id,
all_data=all_data,
),
)

View File

@ -35,17 +35,11 @@ class AutoupdateElement(AutoupdateElementBase, total=False):
disable_history: If this is True, the element (and the containing full_data) won't
be saved into the history. Information and user_id is then irrelevant.
no_delete_on_restriction is a flag, which is saved into the models in the cache
as the _no_delete_on_restriction key. If this is true, there should neither be an
entry for one specific model in the changed *nor the deleted* part of the
autoupdate, if the model was restricted.
"""
information: List[str]
user_id: Optional[int]
disable_history: bool
no_delete_on_restriction: bool
full_data: Optional[Dict[str, Any]]
@ -122,12 +116,7 @@ class AutoupdateBundle:
cache_elements: Dict[str, Optional[Dict[str, Any]]] = {}
for element in self.element_iterator:
element_id = get_element_id(element["collection_string"], element["id"])
full_data = element.get("full_data")
if full_data:
full_data["_no_delete_on_restriction"] = element.get(
"no_delete_on_restriction", False
)
cache_elements[element_id] = full_data
cache_elements[element_id] = element.get("full_data")
return cache_elements
async def dispatch_autoupdate(self) -> int:

View File

@ -2,7 +2,7 @@ import json
from collections import defaultdict
from datetime import datetime
from time import sleep
from typing import Any, Callable, Dict, List, Optional, Tuple, Type
from typing import Any, Callable, Dict, List, Optional, Type
from asgiref.sync import async_to_sync, sync_to_async
from django.apps import apps
@ -23,10 +23,6 @@ from .utils import get_element_id, split_element_id
logger = logging.getLogger(__name__)
class ChangeIdTooLowError(Exception):
pass
def get_all_cachables() -> List[Cachable]:
"""
Returns all element of OpenSlides.
@ -231,9 +227,7 @@ class ElementCache:
changed_elements, deleted_elements
)
async def get_all_data_list(
self, user_id: Optional[int] = None
) -> Dict[str, List[Dict[str, Any]]]:
async def get_all_data_list(self) -> Dict[str, List[Dict[str, Any]]]:
"""
Returns all data with a list per collection:
{
@ -242,33 +236,17 @@ class ElementCache:
If the user id is given the data will be restricted for this user.
"""
all_data = await self.cache_provider.get_all_data()
return await self.format_all_data(all_data, user_id)
async def get_all_data_list_with_max_change_id(
self, user_id: Optional[int] = None
) -> Tuple[int, Dict[str, List[Dict[str, Any]]]]:
(
max_change_id,
all_data,
) = await self.cache_provider.get_all_data_with_max_change_id()
return max_change_id, await self.format_all_data(all_data, user_id)
return await self.format_all_data(all_data)
async def format_all_data(
self, all_data_bytes: Dict[bytes, bytes], user_id: Optional[int]
self, all_data_bytes: Dict[bytes, bytes]
) -> Dict[str, List[Dict[str, Any]]]:
all_data: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
for element_id, data in all_data_bytes.items():
collection, _ = split_element_id(element_id)
element = json.loads(data.decode())
element.pop(
"_no_delete_on_restriction", False
) # remove special field for get_data_since
all_data[collection].append(element)
if user_id is not None:
for collection in all_data.keys():
restricter = self.cachables[collection].restrict_elements
all_data[collection] = await restricter(user_id, all_data[collection])
return dict(all_data)
async def get_collection_data(self, collection: str) -> Dict[int, Dict[str, Any]]:
@ -281,13 +259,10 @@ class ElementCache:
collection_data = {}
for id in encoded_collection_data.keys():
collection_data[id] = json.loads(encoded_collection_data[id].decode())
collection_data[id].pop(
"_no_delete_on_restriction", False
) # remove special field for get_data_since
return collection_data
async def get_element_data(
self, collection: str, id: int, user_id: Optional[int] = None
self, collection: str, id: int
) -> Optional[Dict[str, Any]]:
"""
Returns one element or None, if the element does not exist.
@ -299,108 +274,7 @@ class ElementCache:
if encoded_element is None:
return None
element = json.loads(encoded_element.decode()) # type: ignore
element.pop(
"_no_delete_on_restriction", False
) # remove special field for get_data_since
if user_id is not None:
element = await self.restrict_element_data(element, collection, user_id)
return element
async def restrict_element_data(
self, element: Dict[str, Any], collection: str, user_id: int
) -> Optional[Dict[str, Any]]:
restricter = self.cachables[collection].restrict_elements
restricted_elements = await restricter(user_id, [element])
return restricted_elements[0] if restricted_elements else None
async def get_data_since(
self, user_id: Optional[int] = None, change_id: int = 0
) -> Tuple[int, Dict[str, List[Dict[str, Any]]], List[str]]:
"""
Returns all data since change_id until the max change id.cIf the user id is given the
data will be restricted for this user.
Returns three values inside a tuple. The first value is the max change id. The second
value is a dict where the key is the collection and the value is a list of data.
The third is a list of element_ids with deleted elements.
Only returns elements with the change_id or newer. When change_id is 0,
all elements are returned.
Raises a ChangeIdTooLowError when the lowest change_id in redis is higher then
the requested change_id. In this case the method has to be rerun with
change_id=0. This is importend because there could be deleted elements
that the cache does not know about.
"""
if change_id == 0:
(
max_change_id,
changed_elements,
) = await self.get_all_data_list_with_max_change_id(user_id)
return (max_change_id, changed_elements, [])
# This raises a Runtime Exception, if there is no change_id
lowest_change_id = await self.get_lowest_change_id()
if change_id < lowest_change_id:
# When change_id is lower then the lowest change_id in redis, we can
# not inform the user about deleted elements.
raise ChangeIdTooLowError(
f"change_id {change_id} is lower then the lowest change_id in redis {lowest_change_id}."
)
(
max_change_id,
raw_changed_elements,
deleted_elements,
) = await self.cache_provider.get_data_since(change_id)
changed_elements = {
collection: [json.loads(value.decode()) for value in value_list]
for collection, value_list in raw_changed_elements.items()
}
if user_id is None:
for elements in changed_elements.values():
for element in elements:
element.pop("_no_delete_on_restriction", False)
else:
# the list(...) is important, because `changed_elements` will be
# altered during iteration and restricting data
for collection, elements in list(changed_elements.items()):
# Remove the _no_delete_on_restriction from each element. Collect all ids, where
# this field is absent or False.
unrestricted_ids = set()
for element in elements:
no_delete_on_restriction = element.pop(
"_no_delete_on_restriction", False
)
if not no_delete_on_restriction:
unrestricted_ids.add(element["id"])
cacheable = self.cachables[collection]
restricted_elements = await cacheable.restrict_elements(
user_id, elements
)
# If the model is personalized, it must not be deleted for other users
if not cacheable.personalized_model:
# Add removed objects (through restricter) to deleted elements.
restricted_element_ids = set(
[element["id"] for element in restricted_elements]
)
# Delete all ids, that are allowed to be deleted (see unrestricted_ids) and are
# not present after restricting the data.
for id in unrestricted_ids - restricted_element_ids:
deleted_elements.append(get_element_id(collection, id))
if not restricted_elements:
del changed_elements[collection]
else:
changed_elements[collection] = restricted_elements
return (max_change_id, changed_elements, deleted_elements)
return json.loads(encoded_element.decode()) # type: ignore
async def get_current_change_id(self) -> int:
"""

View File

@ -1,6 +1,5 @@
import functools
import hashlib
from collections import defaultdict
from textwrap import dedent
from typing import Any, Callable, Coroutine, Dict, List, Optional, Set, Tuple
@ -70,11 +69,6 @@ class ElementCacheProvider(Protocol):
) -> int:
...
async def get_data_since(
self, change_id: int
) -> Tuple[int, Dict[str, List[bytes]], List[str]]:
...
async def get_current_change_id(self) -> int:
...
@ -252,34 +246,6 @@ class RedisCacheProvider:
""",
True,
),
"get_data_since": (
"""
-- get max change id
local tmp = redis.call('zrevrangebyscore', KEYS[2], '+inf', '-inf', 'WITHSCORES', 'LIMIT', 0, 1)
local max_change_id
if next(tmp) == nil then
-- The key does not exist
return redis.error_reply("cache_reset")
else
max_change_id = tmp[2]
end
-- Get change ids of changed elements
local element_ids = redis.call('zrangebyscore', KEYS[2], ARGV[1], max_change_id)
-- Save elements in array. First is the max_change_id with the key "max_change_id"
-- Than rotate element_id and element_json. This is ocnverted into a dict in python code.
local elements = {}
table.insert(elements, 'max_change_id')
table.insert(elements, max_change_id)
for _, element_id in pairs(element_ids) do
table.insert(elements, element_id)
table.insert(elements, redis.call('hget', KEYS[1], element_id))
end
return elements
""",
True,
),
}
def __init__(self, ensure_cache: Callable[[], Coroutine[Any, Any, None]]) -> None:
@ -430,47 +396,6 @@ class RedisCacheProvider:
)
)
@ensure_cache_wrapper()
async def get_data_since(
self, change_id: int
) -> Tuple[int, Dict[str, List[bytes]], List[str]]:
"""
Returns all elements since a change_id (included) and until the max_change_id (included).
The returend value is a two element tuple. The first value is a dict the elements where
the key is the collection and the value a list of (json-) encoded elements. The
second element is a list of element_ids, that have been deleted since the change_id.
"""
changed_elements: Dict[str, List[bytes]] = defaultdict(list)
deleted_elements: List[str] = []
# lua script that returns gets all element_ids from change_id_cache_key
# and then uses each element_id on full_data or restricted_data.
# It returns a list where the odd values are the change_id and the
# even values the element as json. The function wait_make_dict creates
# a python dict from the returned list.
elements: Dict[bytes, Optional[bytes]] = await aioredis.util.wait_make_dict(
self.eval(
"get_data_since",
keys=[self.full_data_cache_key, self.change_id_cache_key],
args=[change_id],
read_only=True,
)
)
max_change_id = int(elements[b"max_change_id"].decode()) # type: ignore
for element_id, element_json in elements.items():
if element_id.startswith(b"_config") or element_id == b"max_change_id":
# Ignore config values from the change_id cache key
continue
if element_json is None:
# The element is not in the cache. It has to be deleted.
deleted_elements.append(element_id.decode())
else:
collection, id = split_element_id(element_id)
changed_elements[collection].append(element_json)
return max_change_id, changed_elements, deleted_elements
@ensure_cache_wrapper()
async def get_current_change_id(self) -> int:
"""
@ -662,27 +587,6 @@ class MemoryCacheProvider:
return change_id
async def get_data_since(
self, change_id: int
) -> Tuple[int, Dict[str, List[bytes]], List[str]]:
changed_elements: Dict[str, List[bytes]] = defaultdict(list)
deleted_elements: List[str] = []
all_element_ids: Set[str] = set()
for data_change_id, element_ids in self.change_id_data.items():
if data_change_id >= change_id:
all_element_ids.update(element_ids)
for element_id in all_element_ids:
element_json = self.full_data.get(element_id, None)
if element_json is None:
deleted_elements.append(element_id)
else:
collection, id = split_element_id(element_id)
changed_elements[collection].append(element_json.encode())
max_change_id = await self.get_current_change_id()
return (max_change_id, changed_elements, deleted_elements)
async def get_current_change_id(self) -> int:
if self.change_id_data:
return max(self.change_id_data.keys())
@ -706,8 +610,6 @@ class Cachable(Protocol):
It needs at least the methods defined here.
"""
personalized_model: bool
def get_collection_string(self) -> str:
"""
Returns the string representing the name of the cachable.
@ -717,13 +619,3 @@ class Cachable(Protocol):
"""
Returns all elements of the cachable.
"""
async def restrict_elements(
self, user_id: int, elements: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
"""
Converts full_data to restricted_data.
elements can be an empty list, a list with some elements of the cachable or with all
elements of the cachable.
"""

View File

@ -1,12 +1,9 @@
import time
from typing import Any, Dict, List, Optional
from django.core.exceptions import ImproperlyConfigured
from django.db import models
from . import logging
from .access_permissions import BaseAccessPermissions
from .auth import UserDoesNotExist
from .autoupdate import AutoupdateElement, inform_changed_data, inform_elements
from .rest_api import model_serializer_classes
from .utils import convert_camel_case_to_pseudo_snake_case, get_element_id
@ -37,17 +34,6 @@ class RESTModelMixin:
Mixin for Django models which are used in our REST API.
"""
access_permissions: Optional[BaseAccessPermissions] = None
personalized_model = False
"""
Flag, if the model is personalized on a per-user basis.
Requires the model to have a `user_id` which should be
a OneToOne relation to User. The relation must never change,
because it won't be deleted from it's former user when the relation
changes.
"""
def get_root_rest_element(self) -> models.Model:
"""
Returns the root rest instance.
@ -56,18 +42,6 @@ class RESTModelMixin:
"""
return self
@classmethod
def get_access_permissions(cls) -> BaseAccessPermissions:
"""
Returns a container to handle access permissions for this model and
its corresponding viewset.
"""
if cls.access_permissions is None:
raise ImproperlyConfigured(
"A RESTModel needs to have an access_permission."
)
return cls.access_permissions
@classmethod
def get_collection_string(cls) -> str:
"""
@ -97,7 +71,6 @@ class RESTModelMixin:
def save(
self,
skip_autoupdate: bool = False,
no_delete_on_restriction: bool = False,
disable_history: bool = False,
*args: Any,
**kwargs: Any,
@ -117,7 +90,6 @@ class RESTModelMixin:
if not skip_autoupdate:
inform_changed_data(
self.get_root_rest_element(),
no_delete_on_restriction=no_delete_on_restriction,
disable_history=disable_history,
)
return return_value
@ -183,20 +155,6 @@ class RESTModelMixin:
return full_data
@classmethod
async def restrict_elements(
cls, user_id: int, elements: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
"""
Converts a list of elements from full_data to restricted_data.
"""
try:
return await cls.get_access_permissions().get_restricted_data(
elements, user_id
)
except UserDoesNotExist:
return []
def get_full_data(self) -> Dict[str, Any]:
"""
Returns the full_data of the instance.

View File

@ -1,18 +1,14 @@
from collections import OrderedDict
from typing import Any, Dict, Iterable, Optional, Type
from typing import Any, Dict, Iterable, Type
from asgiref.sync import async_to_sync
from django.db.models import Model
from django.http import Http404
from rest_framework import status
from rest_framework.decorators import detail_route, list_route
from rest_framework.exceptions import APIException
from rest_framework.metadata import SimpleMetadata
from rest_framework.mixins import (
CreateModelMixin as _CreateModelMixin,
DestroyModelMixin,
ListModelMixin as _ListModelMixin,
RetrieveModelMixin as _RetrieveModelMixin,
DestroyModelMixin as _DestroyModelMixin,
UpdateModelMixin as _UpdateModelMixin,
)
from rest_framework.relations import MANY_RELATION_KWARGS
@ -40,14 +36,9 @@ from rest_framework.serializers import (
ValidationError,
)
from rest_framework.utils.serializer_helpers import ReturnDict
from rest_framework.viewsets import (
GenericViewSet as _GenericViewSet,
ModelViewSet as _ModelViewSet,
)
from rest_framework.viewsets import GenericViewSet as _GenericViewSet
from . import logging
from .access_permissions import BaseAccessPermissions
from .cache import element_cache
__all__ = [
@ -156,7 +147,7 @@ class ErrorLoggingMixin:
class PermissionMixin:
"""
Mixin for subclasses of APIView like GenericViewSet and ModelViewSet.
Mixin for subclasses of APIView like GenericViewSet.
The method check_view_permissions is evaluated. If it returns False
self.permission_denied() is called. Django REST Framework's permission
@ -166,8 +157,6 @@ class PermissionMixin:
viewset.
"""
access_permissions: Optional[BaseAccessPermissions] = None
def get_permissions(self) -> Iterable[str]:
"""
Overridden method to check view permissions. Returns an empty
@ -189,13 +178,6 @@ class PermissionMixin:
"""
return False
def get_access_permissions(self) -> BaseAccessPermissions:
"""
Returns a container to handle access permissions for this viewset and
its corresponding model.
"""
return self.access_permissions # type: ignore
def get_serializer_class(self) -> Type[Serializer]:
"""
Overridden method to return the serializer class for the model.
@ -265,54 +247,6 @@ class ModelSerializer(_ModelSerializer, metaclass=ModelSerializerRegisterer):
return fields
class ListModelMixin(_ListModelMixin):
"""
Mixin to add the caching system to list requests.
"""
def list(self, request: Any, *args: Any, **kwargs: Any) -> Response:
model = self.get_queryset().model
try:
collection_string = model.get_collection_string()
except AttributeError:
# The corresponding queryset does not support caching.
response = super().list(request, *args, **kwargs)
else:
# TODO
# This loads all data from the cache, not only the requested data.
# If we would use the rest api, we should add a method
# element_cache.get_collection_restricted_data
all_restricted_data = async_to_sync(element_cache.get_all_data_list)(
request.user.pk or 0
)
response = Response(all_restricted_data.get(collection_string, []))
return response
class RetrieveModelMixin(_RetrieveModelMixin):
"""
Mixin to add the caching system to retrieve requests.
"""
def retrieve(self, request: Any, *args: Any, **kwargs: Any) -> Response:
model = self.get_queryset().model
try:
collection_string = model.get_collection_string()
except AttributeError:
# The corresponding queryset does not support caching.
response = super().retrieve(request, *args, **kwargs)
else:
lookup_url_kwarg = self.lookup_url_kwarg or self.lookup_field
user_id = request.user.pk or 0
content = async_to_sync(element_cache.get_element_data)(
collection_string, self.kwargs[lookup_url_kwarg], user_id
)
if content is None:
raise Http404
response = Response(content)
return response
class CreateModelMixin(_CreateModelMixin):
"""
Mixin to override create requests.
@ -349,6 +283,10 @@ class UpdateModelMixin(_UpdateModelMixin):
return response
class DestroyModelMixin(_DestroyModelMixin):
pass
class GenericViewSet(ErrorLoggingMixin, PermissionMixin, _GenericViewSet):
pass
@ -356,10 +294,9 @@ class GenericViewSet(ErrorLoggingMixin, PermissionMixin, _GenericViewSet):
class ModelViewSet(
ErrorLoggingMixin,
PermissionMixin,
ListModelMixin,
RetrieveModelMixin,
CreateModelMixin,
UpdateModelMixin,
_ModelViewSet,
DestroyModelMixin,
_GenericViewSet,
):
pass

View File

@ -1,6 +1,5 @@
import pytest
from django.contrib.auth import get_user_model
from django.contrib.auth.models import Permission
from django.core.files.uploadedfile import SimpleUploadedFile
from django.urls import reverse
from django.utils import timezone
@ -14,13 +13,9 @@ from openslides.core.models import Countdown
from openslides.mediafiles.models import Mediafile
from openslides.motions.models import Motion, MotionBlock
from openslides.topics.models import Topic
from openslides.users.models import Group
from openslides.utils.autoupdate import inform_changed_data
from tests.count_queries import count_queries
from tests.test_case import TestCase
from ...common_groups import GROUP_DEFAULT_PK
@pytest.mark.django_db(transaction=False)
def test_agenda_item_db_queries():
@ -106,24 +101,14 @@ class ContentObjects(TestCase):
assert topic.agenda_item is not None
assert topic.list_of_speakers is not None
response = self.client.get(reverse("item-detail", args=[topic.agenda_item.pk]))
self.assertEqual(response.status_code, status.HTTP_200_OK)
response = self.client.get(
reverse("listofspeakers-detail", args=[topic.list_of_speakers.pk])
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
def test_delete_topic(self):
topic = Topic.objects.create(title="test_title_lwOCK32jZGFb37DpmoP(")
item_id = topic.agenda_item_id
list_of_speakers_id = topic.list_of_speakers_id
topic.delete()
response = self.client.get(reverse("item-detail", args=[item_id]))
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
response = self.client.get(
reverse("listofspeakers-detail", args=[list_of_speakers_id])
)
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
self.assertFalse(Item.objects.filter(pk=item_id).exists())
self.assertFalse(ListOfSpeakers.objects.filter(pk=list_of_speakers_id).exists())
def test_prevent_removing_topic_from_agenda(self):
topic = Topic.objects.create(title="test_title_lwOCK32jZGFb37DpmoP(")
@ -185,105 +170,6 @@ class ContentObjects(TestCase):
self.assertEqual(motion.agenda_item_id, motion.agenda_item.id)
class RetrieveItem(TestCase):
"""
Tests retrieving items.
"""
def setUp(self):
self.client = APIClient()
config["general_system_enable_anonymous"] = True
self.item = Topic.objects.create(
title="test_title_Idais2pheepeiz5uph1c"
).agenda_item
def test_normal_by_anonymous_without_perm_to_see_internal_items(self):
group = get_user_model().groups.field.related_model.objects.get(
pk=GROUP_DEFAULT_PK
)
permission_string = "agenda.can_see_internal_items"
app_label, codename = permission_string.split(".")
permission = group.permissions.get(
content_type__app_label=app_label, codename=codename
)
group.permissions.remove(permission)
self.item.type = Item.AGENDA_ITEM
self.item.save()
response = self.client.get(reverse("item-detail", args=[self.item.pk]))
self.assertEqual(response.status_code, status.HTTP_200_OK)
def test_hidden_by_anonymous_without_manage_perms(self):
response = self.client.get(reverse("item-detail", args=[self.item.pk]))
self.assertEqual(response.status_code, 404)
def test_hidden_by_anonymous_with_manage_perms(self):
group = Group.objects.get(pk=GROUP_DEFAULT_PK)
permission_string = "agenda.can_manage"
app_label, codename = permission_string.split(".")
permission = Permission.objects.get(
content_type__app_label=app_label, codename=codename
)
group.permissions.add(permission)
inform_changed_data(group)
response = self.client.get(reverse("item-detail", args=[self.item.pk]))
self.assertEqual(response.status_code, status.HTTP_200_OK)
def test_internal_by_anonymous_without_perm_to_see_internal_items(self):
group = Group.objects.get(pk=GROUP_DEFAULT_PK)
permission_string = "agenda.can_see_internal_items"
app_label, codename = permission_string.split(".")
permission = group.permissions.get(
content_type__app_label=app_label, codename=codename
)
group.permissions.remove(permission)
inform_changed_data(group)
self.item.type = Item.INTERNAL_ITEM
self.item.save()
response = self.client.get(reverse("item-detail", args=[self.item.pk]))
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
def test_normal_by_anonymous_cant_see_agenda_comments(self):
self.item.type = Item.AGENDA_ITEM
self.item.comment = "comment_gbiejd67gkbmsogh8374jf$kd"
self.item.save()
response = self.client.get(reverse("item-detail", args=[self.item.pk]))
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertTrue(response.data.get("comment") is None)
class RetrieveListOfSpeakers(TestCase):
"""
Tests retrieving list of speakers.
"""
def setUp(self):
self.client = APIClient()
config["general_system_enable_anonymous"] = True
self.list_of_speakers = Topic.objects.create(
title="test_title_qsjem(ZUNfp7egnzp37n"
).list_of_speakers
def test_simple(self):
response = self.client.get(
reverse("listofspeakers-detail", args=[self.list_of_speakers.pk])
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
def test_without_permission(self):
group = Group.objects.get(pk=GROUP_DEFAULT_PK)
permission_string = "agenda.can_see_list_of_speakers"
app_label, codename = permission_string.split(".")
permission = Permission.objects.get(
content_type__app_label=app_label, codename=codename
)
group.permissions.remove(permission)
inform_changed_data(group)
response = self.client.get(
reverse("listofspeakers-detail", args=[self.list_of_speakers.pk])
)
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
class ManageSpeaker(TestCase):
"""
Tests managing speakers.

View File

@ -1,13 +1,11 @@
import random
from decimal import Decimal
from typing import Any
import pytest
from django.conf import settings
from django.contrib.auth import get_user_model
from django.urls import reverse
from rest_framework import status
from rest_framework.test import APIClient
from openslides.assignments.models import (
Assignment,
@ -19,7 +17,7 @@ from openslides.core.config import config
from openslides.poll.models import BasePoll
from openslides.utils.auth import get_group_model
from openslides.utils.autoupdate import inform_changed_data
from tests.common_groups import GROUP_ADMIN_PK, GROUP_DELEGATE_PK
from tests.common_groups import GROUP_ADMIN_PK
from tests.count_queries import count_queries
from tests.test_case import TestCase
@ -2649,424 +2647,6 @@ class VoteAssignmentPollPseudoanonymousN(VoteAssignmentPollBaseTestClass):
self.assertFalse(AssignmentVote.objects.exists())
# test autoupdates
class VoteAssignmentPollAutoupdatesBaseClass(TestCase):
poll_type = "" # set by subclass, defines which poll type we use
"""
3 important users:
self.admin: manager, has can_see, can_manage, can_manage_polls (in admin group)
self.user: votes, has can_see perms and in in delegate group
self.other_user: Just has can_see perms and is NOT in the delegate group.
"""
def advancedSetUp(self):
self.delegate_group = get_group_model().objects.get(pk=GROUP_DELEGATE_PK)
self.other_user, _ = self.create_user()
inform_changed_data(self.other_user)
self.user, user_password = self.create_user()
self.user.groups.add(self.delegate_group)
self.user.is_present = True
self.user.save()
self.user_client = APIClient()
self.user_client.login(username=self.user.username, password=user_password)
self.assignment = Assignment.objects.create(
title="test_assignment_" + self._get_random_string(), open_posts=1
)
self.assignment.add_candidate(self.admin)
self.description = "test_description_paiquei5ahpie1wu8ohW"
self.poll = AssignmentPoll.objects.create(
assignment=self.assignment,
title="test_title_" + self._get_random_string(),
pollmethod=AssignmentPoll.POLLMETHOD_YNA,
type=self.poll_type,
state=AssignmentPoll.STATE_STARTED,
onehundred_percent_base=AssignmentPoll.PERCENT_BASE_CAST,
majority_method=AssignmentPoll.MAJORITY_TWO_THIRDS,
description=self.description,
)
self.poll.create_options()
self.poll.groups.add(self.delegate_group)
class VoteAssignmentPollNamedAutoupdates(VoteAssignmentPollAutoupdatesBaseClass):
poll_type = AssignmentPoll.TYPE_NAMED
def test_vote(self):
response = self.user_client.post(
reverse("assignmentpoll-vote", args=[self.poll.pk]), {"data": {"1": "A"}}
)
self.assertHttpStatusVerbose(response, status.HTTP_200_OK)
poll = AssignmentPoll.objects.get()
vote = AssignmentVote.objects.get()
# Expect the admin to see the full data in the autoupdate
autoupdate = self.get_last_autoupdate(user=self.admin)
self.assertEqual(
autoupdate[0],
{
"assignments/assignment-poll:1": {
"allow_multiple_votes_per_candidate": False,
"assignment_id": 1,
"global_yes": True,
"global_no": True,
"global_abstain": True,
"amount_global_yes": None,
"amount_global_no": None,
"amount_global_abstain": None,
"groups_id": [GROUP_DELEGATE_PK],
"id": 1,
"options_id": [1],
"pollmethod": AssignmentPoll.POLLMETHOD_YNA,
"state": AssignmentPoll.STATE_STARTED,
"title": self.poll.title,
"description": self.description,
"type": AssignmentPoll.TYPE_NAMED,
"onehundred_percent_base": AssignmentPoll.PERCENT_BASE_CAST,
"majority_method": AssignmentPoll.MAJORITY_TWO_THIRDS,
"min_votes_amount": 1,
"max_votes_amount": 1,
"votescast": "1.000000",
"votesinvalid": "0.000000",
"votesvalid": "1.000000",
"user_has_voted": False,
"user_has_voted_for_delegations": [],
},
"assignments/assignment-option:1": {
"abstain": "1.000000",
"id": 1,
"no": "0.000000",
"poll_id": 1,
"pollstate": AssignmentPoll.STATE_STARTED,
"yes": "0.000000",
"user_id": 1,
"weight": 1,
},
"assignments/assignment-vote:1": {
"id": 1,
"option_id": 1,
"pollstate": AssignmentPoll.STATE_STARTED,
"user_id": self.user.id,
"delegated_user_id": self.user.id,
"value": "A",
"weight": "1.000000",
},
},
)
self.assertEqual(autoupdate[1], [])
# Expect user to receive his vote
autoupdate = self.get_last_autoupdate(user=self.user)
self.assertEqual(
autoupdate[0]["assignments/assignment-vote:1"],
{
"id": 1,
"option_id": 1,
"pollstate": AssignmentPoll.STATE_STARTED,
"user_id": self.user.id,
"delegated_user_id": self.user.id,
"value": "A",
"weight": "1.000000",
},
)
self.assertEqual(autoupdate[1], [])
# Expect non-admins to get a restricted poll update
for user in (self.user, self.other_user):
self.assertAutoupdate(poll, user=user)
autoupdate = self.get_last_autoupdate(user=user)
self.assertEqual(
autoupdate[0]["assignments/assignment-poll:1"],
{
"allow_multiple_votes_per_candidate": False,
"assignment_id": 1,
"global_yes": True,
"global_no": True,
"global_abstain": True,
"pollmethod": AssignmentPoll.POLLMETHOD_YNA,
"state": AssignmentPoll.STATE_STARTED,
"type": AssignmentPoll.TYPE_NAMED,
"onehundred_percent_base": AssignmentPoll.PERCENT_BASE_CAST,
"majority_method": AssignmentPoll.MAJORITY_TWO_THIRDS,
"title": self.poll.title,
"description": self.description,
"groups_id": [GROUP_DELEGATE_PK],
"options_id": [1],
"id": 1,
"min_votes_amount": 1,
"max_votes_amount": 1,
"user_has_voted": user == self.user,
"user_has_voted_for_delegations": [],
},
)
# Other users should not get a vote autoupdate
self.assertNoAutoupdate(vote, user=self.other_user)
self.assertNoDeletedAutoupdate(vote, user=self.other_user)
def test_publish(self):
option = self.poll.options.get()
vote = AssignmentVote.objects.create(user=self.user, option=option)
vote.value = "A"
vote.weight = Decimal("1")
vote.save(no_delete_on_restriction=True, skip_autoupdate=True)
self.poll.voted.add(self.user.id)
self.poll.state = AssignmentPoll.STATE_FINISHED
self.poll.save(skip_autoupdate=True)
response = self.client.post(
reverse("assignmentpoll-publish", args=[self.poll.pk])
)
self.assertHttpStatusVerbose(response, status.HTTP_200_OK)
poll = AssignmentPoll.objects.get()
vote = AssignmentVote.objects.get()
# Everyone should get the whole data
for user in (
self.admin,
self.user,
self.other_user,
):
self.assertAutoupdate(poll, user=user)
autoupdate = self.get_last_autoupdate(user=user)
self.assertEqual(
autoupdate[0]["assignments/assignment-poll:1"],
{
"allow_multiple_votes_per_candidate": False,
"amount_global_yes": None,
"amount_global_no": None,
"amount_global_abstain": None,
"assignment_id": 1,
"description": "test_description_paiquei5ahpie1wu8ohW",
"global_yes": True,
"global_no": True,
"global_abstain": True,
"groups_id": [GROUP_DELEGATE_PK],
"id": 1,
"majority_method": "two_thirds",
"onehundred_percent_base": "cast",
"options_id": [1],
"pollmethod": "YNA",
"state": 4,
"title": self.poll.title,
"type": "named",
"min_votes_amount": 1,
"max_votes_amount": 1,
"votescast": "1.000000",
"votesinvalid": "0.000000",
"votesvalid": "1.000000",
"user_has_voted": user == self.user,
"user_has_voted_for_delegations": [],
"voted_id": [self.user.id],
},
)
self.assertEqual(
autoupdate[0]["assignments/assignment-vote:1"],
{
"pollstate": AssignmentPoll.STATE_PUBLISHED,
"id": 1,
"weight": "1.000000",
"value": "A",
"user_id": 3,
"delegated_user_id": None,
"option_id": 1,
},
)
self.assertEqual(
autoupdate[0]["assignments/assignment-option:1"],
{
"abstain": "1.000000",
"id": 1,
"no": "0.000000",
"poll_id": 1,
"pollstate": AssignmentPoll.STATE_PUBLISHED,
"yes": "0.000000",
"user_id": 1,
"weight": 1,
},
)
self.assertIn("users/user:3", autoupdate[0])
class VoteAssignmentPollPseudoanonymousAutoupdates(
VoteAssignmentPollAutoupdatesBaseClass
):
poll_type = AssignmentPoll.TYPE_PSEUDOANONYMOUS
def test_votex(self):
response = self.user_client.post(
reverse("assignmentpoll-vote", args=[self.poll.pk]), {"data": {"1": "A"}}
)
self.assertHttpStatusVerbose(response, status.HTTP_200_OK)
poll = AssignmentPoll.objects.get()
vote = AssignmentVote.objects.get()
# Expect the admin to see the full data in the autoupdate
autoupdate = self.get_last_autoupdate(user=self.admin)
# TODO: mypy complains without the Any type; check why and fix it
should_be: Any = {
"assignments/assignment-poll:1": {
"allow_multiple_votes_per_candidate": False,
"assignment_id": 1,
"global_yes": True,
"global_no": True,
"global_abstain": True,
"amount_global_yes": None,
"amount_global_no": None,
"amount_global_abstain": None,
"groups_id": [GROUP_DELEGATE_PK],
"id": 1,
"options_id": [1],
"pollmethod": AssignmentPoll.POLLMETHOD_YNA,
"state": AssignmentPoll.STATE_STARTED,
"title": self.poll.title,
"description": self.description,
"type": AssignmentPoll.TYPE_PSEUDOANONYMOUS,
"user_has_voted": False,
"user_has_voted_for_delegations": [],
"onehundred_percent_base": AssignmentPoll.PERCENT_BASE_CAST,
"majority_method": AssignmentPoll.MAJORITY_TWO_THIRDS,
"min_votes_amount": 1,
"max_votes_amount": 1,
"votescast": "1.000000",
"votesinvalid": "0.000000",
"votesvalid": "1.000000",
},
"assignments/assignment-option:1": {
"abstain": "1.000000",
"id": 1,
"no": "0.000000",
"poll_id": 1,
"pollstate": AssignmentPoll.STATE_STARTED,
"yes": "0.000000",
"user_id": 1,
"weight": 1,
},
"assignments/assignment-vote:1": {
"id": 1,
"option_id": 1,
"pollstate": AssignmentPoll.STATE_STARTED,
"user_id": None,
"delegated_user_id": None,
"value": "A",
"weight": "1.000000",
},
}
self.assertEqual(autoupdate[0], should_be)
self.assertEqual(autoupdate[1], [])
# Expect non-admins to get a restricted poll update and no autoupdate
# for a changed vote nor a deleted one
for user in (self.user, self.other_user):
self.assertAutoupdate(poll, user=user)
autoupdate = self.get_last_autoupdate(user=user)
self.assertEqual(
autoupdate[0]["assignments/assignment-poll:1"],
{
"allow_multiple_votes_per_candidate": False,
"assignment_id": 1,
"global_yes": True,
"global_no": True,
"global_abstain": True,
"pollmethod": AssignmentPoll.POLLMETHOD_YNA,
"state": AssignmentPoll.STATE_STARTED,
"type": AssignmentPoll.TYPE_PSEUDOANONYMOUS,
"onehundred_percent_base": AssignmentPoll.PERCENT_BASE_CAST,
"majority_method": AssignmentPoll.MAJORITY_TWO_THIRDS,
"title": self.poll.title,
"description": self.description,
"groups_id": [GROUP_DELEGATE_PK],
"options_id": [1],
"id": 1,
"min_votes_amount": 1,
"max_votes_amount": 1,
"user_has_voted": user == self.user,
"user_has_voted_for_delegations": [],
},
)
self.assertNoAutoupdate(vote, user=user)
self.assertNoDeletedAutoupdate(vote, user=user)
def test_publish(self):
option = self.poll.options.get()
vote = AssignmentVote.objects.create(option=option)
vote.value = "A"
vote.weight = Decimal("1")
vote.save(no_delete_on_restriction=True, skip_autoupdate=True)
self.poll.voted.add(self.user.id)
self.poll.state = AssignmentPoll.STATE_FINISHED
self.poll.save(skip_autoupdate=True)
response = self.client.post(
reverse("assignmentpoll-publish", args=[self.poll.pk])
)
self.assertHttpStatusVerbose(response, status.HTTP_200_OK)
poll = AssignmentPoll.objects.get()
vote = AssignmentVote.objects.get()
# Everyone should get the whole data
for user in (
self.admin,
self.user,
self.other_user,
):
self.assertAutoupdate(poll, user=user)
autoupdate = self.get_last_autoupdate(user=user)
self.assertEqual(
autoupdate[0],
{
"assignments/assignment-poll:1": {
"allow_multiple_votes_per_candidate": False,
"amount_global_yes": None,
"amount_global_no": None,
"amount_global_abstain": None,
"assignment_id": 1,
"description": "test_description_paiquei5ahpie1wu8ohW",
"global_yes": True,
"global_no": True,
"global_abstain": True,
"groups_id": [GROUP_DELEGATE_PK],
"id": 1,
"majority_method": "two_thirds",
"onehundred_percent_base": "cast",
"options_id": [1],
"pollmethod": "YNA",
"state": 4,
"title": self.poll.title,
"type": AssignmentPoll.TYPE_PSEUDOANONYMOUS,
"min_votes_amount": 1,
"max_votes_amount": 1,
"votescast": "1.000000",
"votesinvalid": "0.000000",
"votesvalid": "1.000000",
"user_has_voted": user == self.user,
"user_has_voted_for_delegations": [],
"voted_id": [self.user.id],
},
"assignments/assignment-vote:1": {
"pollstate": AssignmentPoll.STATE_PUBLISHED,
"id": 1,
"weight": "1.000000",
"value": "A",
"user_id": None,
"delegated_user_id": None,
"option_id": 1,
},
"assignments/assignment-option:1": {
"abstain": "1.000000",
"id": 1,
"no": "0.000000",
"poll_id": 1,
"pollstate": AssignmentPoll.STATE_PUBLISHED,
"yes": "0.000000",
"user_id": 1,
"weight": 1,
},
},
)
class PseudoanonymizeAssignmentPoll(TestCase):
def advancedSetUp(self):
self.assignment = Assignment.objects.create(

View File

@ -12,20 +12,6 @@ from openslides.utils.rest_api import ValidationError
from tests.test_case import TestCase
@pytest.mark.django_db(transaction=False)
def test_slide_on_default_projector(client):
client.login(username="admin", password="admin")
client.put(
reverse("projector-detail", args=["1"]),
{"elements": [{"name": "topics/topic", "id": 1}]},
content_type="application/json",
)
response = client.get(reverse("projector-detail", args=["1"]))
assert response.status_code == 200
@pytest.mark.django_db(transaction=False)
def test_invalid_element_non_existing_slide(client):
client.login(username="admin", password="admin")

View File

@ -213,9 +213,6 @@ class TestCreation(TestCase):
self.assertFalse(Mediafile.objects.exists())
# TODO: List and retrieve
class TestUpdate(TestCase):
"""
Tree:

View File

@ -20,9 +20,8 @@ from openslides.motions.models import (
Workflow,
)
from openslides.poll.models import BasePoll
from openslides.utils.auth import get_group_model
from openslides.utils.autoupdate import inform_changed_data
from tests.common_groups import GROUP_ADMIN_PK, GROUP_DEFAULT_PK, GROUP_DELEGATE_PK
from tests.common_groups import GROUP_ADMIN_PK, GROUP_DELEGATE_PK
from tests.count_queries import count_queries
from tests.test_case import TestCase
@ -120,78 +119,6 @@ class CreateMotion(TestCase):
)
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
motion = Motion.objects.get()
changed_autoupdate, deleted_autoupdate = self.get_last_autoupdate()
del changed_autoupdate["motions/motion:1"]["created"]
del changed_autoupdate["motions/motion:1"]["last_modified"]
self.assertEqual(
changed_autoupdate,
{
"agenda/list-of-speakers:1": {
"id": 1,
"title_information": {
"title": "test_title_OoCoo3MeiT9li5Iengu9",
"identifier": "1",
},
"speakers": [],
"closed": False,
"content_object": {"collection": "motions/motion", "id": 1},
},
"motions/motion:1": {
"id": 1,
"identifier": "1",
"title": "test_title_OoCoo3MeiT9li5Iengu9",
"text": "test_text_thuoz0iecheiheereiCi",
"amendment_paragraphs": None,
"amendments_id": [],
"modified_final_version": "",
"reason": "",
"parent_id": None,
"category_id": None,
"category_weight": 10000,
"comments": [],
"motion_block_id": None,
"origin": "",
"submitters": [
{"id": 1, "user_id": 1, "motion_id": 1, "weight": 1}
],
"supporters_id": [],
"state_id": 1,
"state_extension": None,
"state_restriction": [],
"statute_paragraph_id": None,
"workflow_id": 1,
"recommendation_id": None,
"recommendation_extension": None,
"tags_id": [],
"attachments_id": [],
"agenda_item_id": 1,
"list_of_speakers_id": 1,
"sort_parent_id": None,
"weight": 10000,
"change_recommendations_id": [],
},
"agenda/item:1": {
"id": 1,
"item_number": "",
"title_information": {
"title": "test_title_OoCoo3MeiT9li5Iengu9",
"identifier": "1",
},
"comment": None,
"closed": False,
"type": 3,
"is_internal": False,
"is_hidden": True,
"duration": None,
"content_object": {"collection": "motions/motion", "id": 1},
"weight": 10000,
"parent_id": None,
"level": 0,
"tags_id": [],
},
},
)
self.assertEqual(deleted_autoupdate, [])
self.assertEqual(motion.title, "test_title_OoCoo3MeiT9li5Iengu9")
self.assertEqual(motion.identifier, "1")
self.assertTrue(motion.submitters.exists())
@ -412,92 +339,6 @@ class CreateMotion(TestCase):
return Motion.objects.get(pk=int(response.data["id"]))
class RetrieveMotion(TestCase):
"""
Tests retrieving a motion
"""
def setUp(self):
self.client = APIClient()
self.client.login(username="admin", password="admin")
self.motion = Motion(
title="test_title_uj5eeSiedohSh3ohyaaj",
text="test_text_ithohchaeThohmae5aug",
)
self.motion.save()
for index in range(10):
get_user_model().objects.create_user(
username=f"user_{index}", password="password"
)
def test_guest_state_with_restriction(self):
config["general_system_enable_anonymous"] = True
guest_client = APIClient()
state = self.motion.state
state.restriction = ["motions.can_manage"]
state.save()
# The cache has to be cleared, see:
# https://github.com/OpenSlides/OpenSlides/issues/3396
inform_changed_data(self.motion)
response = guest_client.get(reverse("motion-detail", args=[self.motion.pk]))
self.assertEqual(response.status_code, 404)
def test_admin_state_with_restriction(self):
state = self.motion.state
state.restriction = ["motions.can_manage"]
state.save()
response = self.client.get(reverse("motion-detail", args=[self.motion.pk]))
self.assertEqual(response.status_code, status.HTTP_200_OK)
def test_submitter_state_with_restriction(self):
state = self.motion.state
state.restriction = ["is_submitter"]
state.save()
user = get_user_model().objects.create_user(
username="username_ohS2opheikaSa5theijo",
password="password_kau4eequaisheeBateef",
)
Submitter.objects.add(user, self.motion)
submitter_client = APIClient()
submitter_client.force_login(user)
response = submitter_client.get(reverse("motion-detail", args=[self.motion.pk]))
self.assertEqual(response.status_code, status.HTTP_200_OK)
def test_user_without_can_see_user_permission_to_see_motion_and_submitter_data(
self,
):
admin = get_user_model().objects.get(username="admin")
Submitter.objects.add(admin, self.motion)
group = get_group_model().objects.get(
pk=GROUP_DEFAULT_PK
) # Group with pk 1 is for anonymous and default users.
permission_string = "users.can_see_name"
app_label, codename = permission_string.split(".")
permission = group.permissions.get(
content_type__app_label=app_label, codename=codename
)
group.permissions.remove(permission)
config["general_system_enable_anonymous"] = True
guest_client = APIClient()
inform_changed_data(group)
inform_changed_data(self.motion)
response_1 = guest_client.get(reverse("motion-detail", args=[self.motion.pk]))
self.assertEqual(response_1.status_code, status.HTTP_200_OK)
submitter_id = response_1.data["submitters"][0]["user_id"]
response_2 = guest_client.get(reverse("user-detail", args=[submitter_id]))
self.assertEqual(response_2.status_code, status.HTTP_200_OK)
extra_user = get_user_model().objects.create_user(
username="username_wequePhieFoom0hai3wa",
password="password_ooth7taechai5Oocieya",
)
response_3 = guest_client.get(reverse("user-detail", args=[extra_user.pk]))
self.assertEqual(response_3.status_code, 404)
class UpdateMotion(TestCase):
"""
Tests updating motions.
@ -519,9 +360,6 @@ class UpdateMotion(TestCase):
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
motion = Motion.objects.get()
self.assertAutoupdate(motion)
self.assertAutoupdate(motion.agenda_item)
self.assertAutoupdate(motion.list_of_speakers)
self.assertEqual(motion.title, "test_title_aeng7ahChie3waiR8xoh")
self.assertEqual(motion.identifier, "test_identifier_jieseghohj7OoSah1Ko9")

View File

@ -134,43 +134,6 @@ class CreateMotionPoll(TestCase):
poll = MotionPoll.objects.get()
self.assertEqual(poll.pollmethod, "YNA")
def test_autoupdate(self):
response = self.client.post(
reverse("motionpoll-list"),
{
"title": "test_title_9Ce8OsdB8YWTVm5YOzqH",
"pollmethod": "YNA",
"type": "named",
"motion_id": self.motion.id,
"onehundred_percent_base": "YN",
"majority_method": "simple",
},
)
self.assertHttpStatusVerbose(response, status.HTTP_201_CREATED)
autoupdate = self.get_last_autoupdate(user=self.admin)
self.assertEqual(
autoupdate[0]["motions/motion-poll:1"],
{
"motion_id": 1,
"pollmethod": MotionPoll.POLLMETHOD_YNA,
"state": MotionPoll.STATE_CREATED,
"type": MotionPoll.TYPE_NAMED,
"title": "test_title_9Ce8OsdB8YWTVm5YOzqH",
"onehundred_percent_base": MotionPoll.PERCENT_BASE_YN,
"majority_method": MotionPoll.MAJORITY_SIMPLE,
"groups_id": [],
"votesvalid": "0.000000",
"votesinvalid": "0.000000",
"votescast": "0.000000",
"options_id": [1],
"id": 1,
"user_has_voted": False,
"user_has_voted_for_delegations": [],
},
)
self.assertEqual(autoupdate[1], [])
def test_missing_keys(self):
complete_request_data = {
"title": "test_title_OoCh9aitaeyaeth8nom1",
@ -631,7 +594,6 @@ class VoteMotionPollAnalog(TestCase):
self.assertEqual(option.yes, Decimal("1"))
self.assertEqual(option.no, Decimal("2.35"))
self.assertEqual(option.abstain, Decimal("-1"))
self.assertAutoupdate(poll)
def test_vote_no_permissions(self):
self.start_poll()
@ -924,13 +886,6 @@ class VoteMotionPollNamed(TestCase):
self.assertEqual(vote.user, self.user)
self.assertEqual(vote.delegated_user, self.admin)
autoupdate = self.get_last_autoupdate(user=self.admin)
self.assertIn("motions/motion-poll:1", autoupdate[0])
self.assertEqual(
autoupdate[0]["motions/motion-poll:1"]["user_has_voted_for_delegations"],
[self.user.pk],
)
def test_vote_delegation_and_self_vote(self):
self.test_vote_delegation()
response = self.client.post(
@ -1015,262 +970,6 @@ class VoteMotionPollNamed(TestCase):
self.assertFalse(MotionPoll.objects.get().get_votes().exists())
class VoteMotionPollNamedAutoupdates(TestCase):
"""3 important users:
self.admin: manager, has can_see, can_manage, can_manage_polls (in admin group)
self.user1: votes, has can_see perms and in in delegate group
self.other_user: Just has can_see perms and is NOT in the delegate group.
"""
def advancedSetUp(self):
self.motion = Motion(
title="test_title_OoK9IeChe2Jeib9Deeji",
text="test_text_eichui1oobiSeit9aifo",
)
self.motion.save()
self.delegate_group = get_group_model().objects.get(pk=GROUP_DELEGATE_PK)
self.other_user, _ = self.create_user()
inform_changed_data(self.other_user)
self.user, user_password = self.create_user()
self.user.groups.add(self.delegate_group)
self.user.is_present = True
self.user.save()
self.user_client = APIClient()
self.user_client.login(username=self.user.username, password=user_password)
self.poll = MotionPoll.objects.create(
motion=self.motion,
title="test_title_tho8PhiePh8upaex6phi",
pollmethod="YNA",
type=BasePoll.TYPE_NAMED,
state=MotionPoll.STATE_STARTED,
onehundred_percent_base="YN",
majority_method="simple",
)
self.poll.create_options()
self.poll.groups.add(self.delegate_group)
def test_vote(self):
response = self.user_client.post(
reverse("motionpoll-vote", args=[self.poll.pk]), {"data": "A"}
)
self.assertHttpStatusVerbose(response, status.HTTP_200_OK)
poll = MotionPoll.objects.get()
vote = MotionVote.objects.get()
# Expect the admin to see the full data in the autoupdate
autoupdate = self.get_last_autoupdate(user=self.admin)
self.assertEqual(
autoupdate[0],
{
"motions/motion-poll:1": {
"motion_id": 1,
"pollmethod": "YNA",
"state": 2,
"type": "named",
"title": "test_title_tho8PhiePh8upaex6phi",
"onehundred_percent_base": "YN",
"majority_method": "simple",
"groups_id": [GROUP_DELEGATE_PK],
"votesvalid": "1.000000",
"votesinvalid": "0.000000",
"votescast": "1.000000",
"options_id": [1],
"id": 1,
"user_has_voted": False,
"user_has_voted_for_delegations": [],
},
"motions/motion-vote:1": {
"pollstate": 2,
"id": 1,
"weight": "1.000000",
"value": "A",
"user_id": self.user.id,
"delegated_user_id": self.user.id,
"option_id": 1,
},
"motions/motion-option:1": {
"abstain": "1.000000",
"id": 1,
"no": "0.000000",
"poll_id": 1,
"pollstate": 2,
"yes": "0.000000",
},
},
)
self.assertEqual(autoupdate[1], [])
# Expect user1 to receive his vote
autoupdate = self.get_last_autoupdate(user=self.user)
self.assertEqual(
autoupdate[0]["motions/motion-vote:1"],
{
"pollstate": 2,
"option_id": 1,
"id": 1,
"weight": "1.000000",
"value": "A",
"user_id": self.user.id,
"delegated_user_id": self.user.id,
},
)
self.assertEqual(
autoupdate[0]["motions/motion-option:1"],
{"id": 1, "poll_id": 1, "pollstate": 2},
)
self.assertEqual(autoupdate[1], [])
# Expect non-admins to get a restricted poll update
for user in (self.user, self.other_user):
self.assertAutoupdate(poll, user=user)
autoupdate = self.get_last_autoupdate(user=user)
self.assertEqual(
autoupdate[0]["motions/motion-poll:1"],
{
"motion_id": 1,
"pollmethod": "YNA",
"state": 2,
"type": "named",
"title": "test_title_tho8PhiePh8upaex6phi",
"onehundred_percent_base": "YN",
"majority_method": "simple",
"groups_id": [GROUP_DELEGATE_PK],
"options_id": [1],
"id": 1,
"user_has_voted": user == self.user,
"user_has_voted_for_delegations": [],
},
)
self.assertEqual(
autoupdate[0]["motions/motion-option:1"],
{
"id": 1,
"poll_id": 1,
"pollstate": 2,
}, # noqa black and flake are no friends :(
)
# Other users should not get a vote autoupdate
self.assertNoAutoupdate(vote, user=self.other_user)
self.assertNoDeletedAutoupdate(vote, user=self.other_user)
class VoteMotionPollPseudoanonymousAutoupdates(TestCase):
"""3 important users:
self.admin: manager, has can_see, can_manage, can_manage_polls (in admin group)
self.user: votes, has can_see perms and in in delegate group
self.other_user: Just has can_see perms and is NOT in the delegate group.
"""
def advancedSetUp(self):
self.motion = Motion(
title="test_title_OoK9IeChe2Jeib9Deeji",
text="test_text_eichui1oobiSeit9aifo",
)
self.motion.save()
self.delegate_group = get_group_model().objects.get(pk=GROUP_DELEGATE_PK)
self.other_user, _ = self.create_user()
inform_changed_data(self.other_user)
self.user, user_password = self.create_user()
self.user.groups.add(self.delegate_group)
self.user.is_present = True
self.user.save()
self.user_client = APIClient()
self.user_client.login(username=self.user.username, password=user_password)
self.poll = MotionPoll.objects.create(
motion=self.motion,
title="test_title_cahP1umooteehah2jeey",
pollmethod="YNA",
type=BasePoll.TYPE_PSEUDOANONYMOUS,
state=MotionPoll.STATE_STARTED,
onehundred_percent_base="YN",
majority_method="simple",
)
self.poll.create_options()
self.poll.groups.add(self.delegate_group)
def test_vote(self):
response = self.user_client.post(
reverse("motionpoll-vote", args=[self.poll.pk]), {"data": "A"}
)
self.assertHttpStatusVerbose(response, status.HTTP_200_OK)
poll = MotionPoll.objects.get()
vote = MotionVote.objects.get()
# Expect the admin to see the full data in the autoupdate
autoupdate = self.get_last_autoupdate(user=self.admin)
self.assertEqual(
autoupdate[0],
{
"motions/motion-poll:1": {
"motion_id": 1,
"pollmethod": "YNA",
"state": 2,
"type": "pseudoanonymous",
"title": "test_title_cahP1umooteehah2jeey",
"onehundred_percent_base": "YN",
"majority_method": "simple",
"groups_id": [GROUP_DELEGATE_PK],
"votesvalid": "1.000000",
"votesinvalid": "0.000000",
"votescast": "1.000000",
"options_id": [1],
"id": 1,
"user_has_voted": False,
"user_has_voted_for_delegations": [],
},
"motions/motion-vote:1": {
"pollstate": 2,
"option_id": 1,
"id": 1,
"weight": "1.000000",
"value": "A",
"user_id": None,
"delegated_user_id": None,
},
"motions/motion-option:1": {
"abstain": "1.000000",
"id": 1,
"no": "0.000000",
"poll_id": 1,
"pollstate": 2,
"yes": "0.000000",
},
},
)
self.assertEqual(autoupdate[1], [])
# Expect non-admins to get a restricted poll update and no autoupdate
# for a changed vote nor a deleted one
for user in (self.user, self.other_user):
self.assertAutoupdate(poll, user=user)
autoupdate = self.get_last_autoupdate(user=user)
self.assertEqual(
autoupdate[0]["motions/motion-poll:1"],
{
"motion_id": 1,
"pollmethod": "YNA",
"state": 2,
"type": "pseudoanonymous",
"title": "test_title_cahP1umooteehah2jeey",
"onehundred_percent_base": "YN",
"majority_method": "simple",
"groups_id": [GROUP_DELEGATE_PK],
"options_id": [1],
"id": 1,
"user_has_voted": user == self.user,
"user_has_voted_for_delegations": [],
},
)
self.assertNoAutoupdate(vote, user=user)
self.assertNoDeletedAutoupdate(vote, user=user)
class VoteMotionPollPseudoanonymous(TestCase):
def setUp(self):
self.client = APIClient()
@ -1473,51 +1172,6 @@ class PublishMotionPoll(TestCase):
self.assertHttpStatusVerbose(response, status.HTTP_200_OK)
self.assertEqual(MotionPoll.objects.get().state, MotionPoll.STATE_PUBLISHED)
# Test autoupdates: Every user should get the full data
for user in (self.admin, self.user):
autoupdate = self.get_last_autoupdate(user=user)
self.assertEqual(
autoupdate[0],
{
"motions/motion-poll:1": {
"motion_id": 1,
"pollmethod": "YNA",
"state": 4,
"type": "pseudoanonymous",
"title": "test_title_Nufae0iew7Iorox2thoo",
"onehundred_percent_base": "YN",
"majority_method": "simple",
"groups_id": [],
"votesvalid": "0.000000",
"votesinvalid": "0.000000",
"votescast": "0.000000",
"options_id": [1],
"id": 1,
"user_has_voted": False,
"user_has_voted_for_delegations": [],
"voted_id": [],
},
"motions/motion-vote:1": {
"pollstate": 4,
"option_id": 1,
"id": 1,
"weight": "2.000000",
"value": "N",
"user_id": None,
"delegated_user_id": None,
},
"motions/motion-option:1": {
"abstain": "0.000000",
"id": 1,
"no": "2.000000",
"poll_id": 1,
"pollstate": 4,
"yes": "0.000000",
},
},
)
self.assertEqual(autoupdate[1], [])
def test_publish_wrong_state(self):
response = self.client.post(reverse("motionpoll-publish", args=[self.poll.pk]))
self.assertHttpStatusVerbose(response, status.HTTP_400_BAD_REQUEST)
@ -1638,16 +1292,6 @@ class ResetMotionPoll(TestCase):
self.assertEqual(option.abstain, Decimal("0"))
self.assertFalse(option.votes.exists())
def test_deleted_autoupdate(self):
response = self.client.post(reverse("motionpoll-reset", args=[self.poll.pk]))
self.assertHttpStatusVerbose(response, status.HTTP_200_OK)
poll = MotionPoll.objects.get()
option = poll.options.get()
self.assertAutoupdate(option, self.admin)
for user in (self.admin, self.user1, self.user2):
self.assertDeletedAutoupdate(self.vote1, user=user)
self.assertDeletedAutoupdate(self.vote2, user=user)
class TestMotionPollWithVoteDelegationAutoupdate(TestCase):
def advancedSetUp(self):
@ -1685,7 +1329,3 @@ class TestMotionPollWithVoteDelegationAutoupdate(TestCase):
def test_start_poll(self):
response = self.client.post(reverse("motionpoll-start", args=[self.poll.pk]))
self.assertHttpStatusVerbose(response, status.HTTP_200_OK)
# other_user has to receive an autoupdate because he was delegated
autoupdate = self.get_last_autoupdate(user=self.other_user)
assert "motions/motion-poll:1" in autoupdate[0]

View File

@ -1,22 +0,0 @@
from django.test.client import Client
from openslides.core.config import config
from openslides.motions.models import Motion
from tests.test_case import TestCase
class AnonymousRequests(TestCase):
"""
Tests requests from the anonymous user.
"""
def setUp(self):
self.client = Client()
config["general_system_enable_anonymous"] = True
def test_motion_detail(self):
Motion.objects.create(title="test_motion")
response = self.client.get("/motions/1/")
self.assertEqual(response.status_code, 200)

View File

@ -9,7 +9,6 @@ from openslides.motions.models import (
Category,
Motion,
MotionBlock,
MotionChangeRecommendation,
MotionComment,
MotionCommentSection,
State,
@ -130,16 +129,6 @@ class TestStatuteParagraphs(TestCase):
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
def test_retrieve_simple(self):
self.create_statute_paragraph()
response = self.client.get(
reverse("statuteparagraph-detail", args=[self.cp.pk])
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(
sorted(response.data.keys()), sorted(("id", "title", "text", "weight"))
)
def test_update_simple(self):
self.create_statute_paragraph()
response = self.client.patch(
@ -236,37 +225,6 @@ class ManageComments(TestCase):
self.section_read_write.read_groups.add(self.group_in)
self.section_read_write.write_groups.add(self.group_in)
def test_retrieve_comment(self):
comment = MotionComment(
motion=self.motion,
section=self.section_read_write,
comment="test_comment_gwic37Csc&3lf3eo2",
)
comment.save()
response = self.client.get(reverse("motion-detail", args=[self.motion.pk]))
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertTrue("comments" in response.data)
comments = response.data["comments"]
self.assertTrue(isinstance(comments, list))
self.assertEqual(len(comments), 1)
self.assertEqual(comments[0]["comment"], "test_comment_gwic37Csc&3lf3eo2")
def test_retrieve_comment_no_read_permission(self):
comment = MotionComment(
motion=self.motion,
section=self.section_no_groups,
comment="test_comment_fgkj3C7veo3ijWE(j2DJ",
)
comment.save()
response = self.client.get(reverse("motion-detail", args=[self.motion.pk]))
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertTrue("comments" in response.data)
comments = response.data["comments"]
self.assertTrue(isinstance(comments, list))
self.assertEqual(len(comments), 0)
def test_wrong_data_type(self):
response = self.client.post(
reverse("motion-manage-comments", args=[self.motion.pk]), None
@ -428,58 +386,6 @@ class TestMotionCommentSection(TestCase):
pk=GROUP_DELEGATE_PK
) # The admin should not be in this group
def test_retrieve(self):
"""
Checks, if the sections can be seen by a manager.
"""
section = MotionCommentSection(name="test_name_f3jOF3m8fp.<qiqmf32=")
section.save()
response = self.client.get(reverse("motioncommentsection-list"))
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertTrue(isinstance(response.data, list))
self.assertEqual(len(response.data), 1)
self.assertEqual(response.data[0]["name"], "test_name_f3jOF3m8fp.<qiqmf32=")
def test_retrieve_non_manager_with_read_permission(self):
"""
Checks, if the sections can be seen by a non manager, but he is in
one of the read_groups.
"""
self.admin.groups.remove(
self.group_in
) # group_in has motions.can_manage permission
self.admin.groups.add(self.group_out) # group_out does not.
inform_changed_data(self.admin)
section = MotionCommentSection(name="test_name_f3mMD28LMcm29Coelwcm")
section.save()
section.read_groups.add(self.group_out, self.group_in)
inform_changed_data(section)
response = self.client.get(reverse("motioncommentsection-list"))
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(len(response.data), 1)
self.assertEqual(response.data[0]["name"], "test_name_f3mMD28LMcm29Coelwcm")
def test_retrieve_non_manager_no_read_permission(self):
"""
Checks, if sections are removed, if the user is a non manager and is in
any of the read_groups.
"""
self.admin.groups.remove(self.group_in)
inform_changed_data(self.admin)
section = MotionCommentSection(name="test_name_f3jOF3m8fp.<qiqmf32=")
section.save()
section.read_groups.add(self.group_out)
response = self.client.get(reverse("motioncommentsection-list"))
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertTrue(isinstance(response.data, list))
self.assertEqual(len(response.data), 0)
def test_create(self):
"""
Create a section just with a name.
@ -759,55 +665,6 @@ class TestMotionCommentSectionSorting(TestCase):
self.assertEqual(section.weight, 10000)
class RetrieveMotionChangeRecommendation(TestCase):
"""
Tests retrieving motion change recommendations.
"""
def setUp(self):
self.client = APIClient()
self.client.login(username="admin", password="admin")
motion = Motion(
title="test_title_3kd)K23,c9239mdj2wcG",
text="test_text_f8FLP,gvprC;wovVEwlQ",
)
motion.save()
self.public_cr = MotionChangeRecommendation(
motion=motion, internal=False, line_from=1, line_to=1
)
self.public_cr.save()
self.internal_cr = MotionChangeRecommendation(
motion=motion, internal=True, line_from=2, line_to=2
)
self.internal_cr.save()
def test_simple(self):
"""
Test retrieving all change recommendations.
"""
response = self.client.get(reverse("motionchangerecommendation-list"))
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(len(response.data), 2)
def test_non_admin(self):
"""
Test retrieving of all change recommendations that are public, if the user
has no manage perms.
"""
self.admin = get_user_model().objects.get(username="admin")
self.admin.groups.add(GROUP_DELEGATE_PK)
self.admin.groups.remove(GROUP_ADMIN_PK)
inform_changed_data(self.admin)
response = self.client.get(reverse("motionchangerecommendation-list"))
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(len(response.data), 1)
self.assertEqual(response.data[0]["id"], self.public_cr.id)
class CreateMotionChangeRecommendation(TestCase):
"""
Tests motion change recommendation creation.
@ -1108,38 +965,6 @@ class TestMotionBlock(TestCase):
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
self.assertFalse(MotionBlock.objects.exists())
def test_retrieve_simple(self):
motion_block = MotionBlock(title="test_title")
motion_block.save()
response = self.client.get(
reverse("motionblock-detail", args=[motion_block.pk])
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(
sorted(response.data.keys()),
sorted(
(
"agenda_item_id",
"id",
"internal",
"list_of_speakers_id",
"title",
"motions_id",
)
),
)
def test_retrieve_internal_non_admin(self):
self.make_admin_delegate()
motion_block = MotionBlock(title="test_title", internal=True)
motion_block.save()
response = self.client.get(
reverse("motionblock-detail", args=[motion_block.pk])
)
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
class FollowRecommendationsForMotionBlock(TestCase):
"""

View File

@ -46,42 +46,6 @@ def test_group_db_queries():
assert count_queries(Group.get_elements)() == 2
class UserGetTest(TestCase):
"""
Tests to receive a users via REST API.
"""
def test_get_with_user_who_is_in_group_with_pk_1(self):
"""
It is invalid, that a user is in the group with the pk 1. But if the
database is invalid, the user should nevertheless be received.
"""
admin = User.objects.get(username="admin")
group1 = Group.objects.get(pk=1)
admin.groups.add(group1)
self.client.login(username="admin", password="admin")
response = self.client.get("/rest/users/user/1/")
self.assertEqual(response.status_code, 200)
def test_get_with_user_without_permissions(self):
group = Group.objects.get(pk=1)
permission_string = "users.can_see_name"
app_label, codename = permission_string.split(".")
permission = group.permissions.get(
content_type__app_label=app_label, codename=codename
)
group.permissions.remove(permission)
inform_changed_data(group)
config["general_system_enable_anonymous"] = True
guest_client = APIClient()
response = guest_client.get("/rest/users/user/1/")
self.assertEqual(response.status_code, 404)
class UserCreate(TestCase):
"""
Tests creation of users via REST API.
@ -379,26 +343,6 @@ class UserUpdate(TestCase):
admin = User.objects.get(pk=self.admin.pk)
self.assertIsNone(admin.vote_delegated_to_id)
def test_update_vote_delegation_autoupdate(self):
self.setup_vote_delegation()
response = self.client.patch(
reverse("user-detail", args=[self.user.pk]),
{"vote_delegated_to_id": self.admin.pk},
)
self.assertHttpStatusVerbose(response, status.HTTP_200_OK)
autoupdate = self.get_last_autoupdate(user=self.admin)
user_au = autoupdate[0].get(f"users/user:{self.user.pk}")
self.assertIsNotNone(user_au)
self.assertEqual(user_au["vote_delegated_to_id"], self.admin.pk)
user2_au = autoupdate[0].get(f"users/user:{self.user2.pk}")
self.assertIsNotNone(user2_au)
self.assertEqual(user2_au["vote_delegated_from_users_id"], [])
admin_au = autoupdate[0].get(f"users/user:{self.admin.pk}")
self.assertIsNotNone(admin_au)
self.assertEqual(admin_au["vote_delegated_from_users_id"], [self.user.pk])
self.assertEqual(autoupdate[1], [])
def test_update_vote_delegated_from(self):
self.setup_vote_delegation()
response = self.client.patch(
@ -864,60 +808,6 @@ class UserSendIntivationEmail(TestCase):
self.assertEqual(mail.outbox[0].to[0], self.email)
class GroupMetadata(TestCase):
def test_options_request_as_anonymous_user_activated(self):
config["general_system_enable_anonymous"] = True
response = self.client.options("/rest/users/group/")
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data["name"], "Group List")
perm_list = response.data["actions"]["POST"]["permissions"]["choices"]
self.assertEqual(type(perm_list), list)
for item in perm_list:
self.assertEqual(type(item), dict)
self.assertTrue(item.get("display_name") is not None)
self.assertTrue(item.get("value") is not None)
class GroupReceive(TestCase):
def setUp(self):
pass
def test_get_groups_as_anonymous_deactivated(self):
"""
Test to get the groups with an anonymous user, when they are deactivated.
"""
response = self.client.get("/rest/users/group/")
self.assertEqual(response.status_code, 403)
def test_get_groups_as_anonymous_user_activated(self):
"""
Test to get the groups with an anonymous user, when they are activated.
"""
config["general_system_enable_anonymous"] = True
response = self.client.get("/rest/users/group/")
self.assertEqual(response.status_code, 200)
def test_logged_in_user_with_no_permission(self):
"""
Test to get the groups with an logged in user with no permissions.
"""
user = User(username="test")
user.set_password("test")
user.save()
default_group = Group.objects.get(pk=GROUP_DEFAULT_PK)
default_group.permissions.all().delete()
self.client.login(username="test", password="test")
response = self.client.get("/rest/users/group/")
self.assertEqual(response.status_code, 200)
class GroupCreate(TestCase):
"""
Tests creation of groups via REST API.
@ -1158,17 +1048,6 @@ class PersonalNoteTest(TestCase):
def setUp(self):
self.admin = User.objects.get(username="admin")
def test_anonymous_without_personal_notes(self):
personal_note = PersonalNote.objects.create(
user=self.admin, notes='["admin_personal_note_OoGh8choro0oosh0roob"]'
)
config["general_system_enable_anonymous"] = True
guest_client = APIClient()
response = guest_client.get(
reverse("personalnote-detail", args=[personal_note.pk])
)
self.assertEqual(response.status_code, 404)
def test_create(self):
admin_client = APIClient()
admin_client.login(username="admin", password="admin")

View File

@ -1,15 +1,12 @@
import random
import string
from asgiref.sync import async_to_sync
from django.contrib.auth import get_user_model
from django.test import TestCase as _TestCase
from rest_framework.test import APIClient
from openslides.core.config import config
from openslides.utils.autoupdate import inform_changed_data
from openslides.utils.cache import element_cache
from openslides.utils.utils import get_element_id
from tests.common_groups import GROUP_ADMIN_PK, GROUP_DELEGATE_PK
from tests.count_queries import AssertNumQueriesContext
@ -34,45 +31,6 @@ class TestCase(_TestCase):
Adds testing for autoupdates after requests.
"""
def get_last_autoupdate(self, user=None):
"""
Get the last autoupdate as (changed_data, deleted_element_ids) for the given user.
changed_elements is a dict with element_ids as keys and the actual element as value
user_id=None if for full data, 0 for the anonymous and regular ids for users.
"""
user_id = None if user is None else user.id
current_change_id = async_to_sync(element_cache.get_current_change_id)()
_, _changed_elements, deleted_element_ids = async_to_sync(
element_cache.get_data_since
)(user_id=user_id, change_id=current_change_id)
changed_elements = {}
for collection, elements in _changed_elements.items():
for element in elements:
changed_elements[get_element_id(collection, element["id"])] = element
return (changed_elements, deleted_element_ids)
def assertAutoupdate(self, model, user=None):
self.assertTrue(
model.get_element_id() in self.get_last_autoupdate(user=user)[0]
)
def assertDeletedAutoupdate(self, model, user=None):
self.assertTrue(
model.get_element_id() in self.get_last_autoupdate(user=user)[1]
)
def assertNoAutoupdate(self, model, user=None):
self.assertFalse(
model.get_element_id() in self.get_last_autoupdate(user=user)[0]
)
def assertNoDeletedAutoupdate(self, model, user=None):
self.assertFalse(
model.get_element_id() in self.get_last_autoupdate(user=user)[1]
)
def assertNumQueries(self, num, func=None, *args, verbose=False, **kwargs):
context = AssertNumQueriesContext(self, num, verbose)
if func is None:

View File

@ -3,7 +3,7 @@ from typing import Any, Dict, List
import pytest
from openslides.utils.cache import ChangeIdTooLowError, ElementCache
from openslides.utils.cache import ElementCache
from .cache_provider import TTestCacheProvider, example_data, get_cachable_provider
@ -139,84 +139,6 @@ async def test_get_all_data_from_redis(element_cache):
assert sort_dict(result) == sort_dict(example_data())
@pytest.mark.asyncio
async def test_get_data_since_change_id_0(element_cache):
element_cache.cache_provider.full_data = {
"app/collection1:1": '{"id": 1, "value": "value1"}',
"app/collection1:2": '{"id": 2, "value": "value2"}',
"app/collection2:1": '{"id": 1, "key": "value1"}',
"app/collection2:2": '{"id": 2, "key": "value2"}',
"app/personalized-collection:1": '{"id": 1, "key": "value1", "user_id": 1}',
"app/personalized-collection:2": '{"id": 2, "key": "value2", "user_id": 2}',
}
(
max_change_id,
changed_elements,
deleted_element_ids,
) = await element_cache.get_data_since(None, 0)
assert sort_dict(changed_elements) == sort_dict(example_data())
assert max_change_id == 0
@pytest.mark.asyncio
async def test_get_data_since_change_id_lower_than_in_redis(element_cache):
element_cache.cache_provider.full_data = {
"app/collection1:1": '{"id": 1, "value": "value1"}',
"app/collection1:2": '{"id": 2, "value": "value2"}',
"app/collection2:1": '{"id": 1, "key": "value1"}',
"app/collection2:2": '{"id": 2, "key": "value2"}',
}
element_cache.cache_provider.default_change_id = 2
element_cache.cache_provider.change_id_data = {2: {"app/collection1:1"}}
with pytest.raises(ChangeIdTooLowError):
await element_cache.get_data_since(None, 1)
@pytest.mark.asyncio
async def test_get_data_since_change_id_data_in_redis(element_cache):
element_cache.cache_provider.full_data = {
"app/collection1:1": '{"id": 1, "value": "value1"}',
"app/collection1:2": '{"id": 2, "value": "value2"}',
"app/collection2:1": '{"id": 1, "key": "value1"}',
"app/collection2:2": '{"id": 2, "key": "value2"}',
}
element_cache.cache_provider.change_id_data = {
1: {"app/collection1:1", "app/collection1:3"}
}
result = await element_cache.get_data_since(None, 1)
assert result == (
1,
{"app/collection1": [{"id": 1, "value": "value1"}]},
["app/collection1:3"],
)
@pytest.mark.asyncio
async def test_get_data_since_change_id_data_in_db(element_cache):
element_cache.cache_provider.change_id_data = {
1: {"app/collection1:1", "app/collection1:3"}
}
result = await element_cache.get_data_since(None, 1)
assert result == (
1,
{"app/collection1": [{"id": 1, "value": "value1"}]},
["app/collection1:3"],
)
@pytest.mark.asyncio
async def test_get_gata_since_change_id_data_in_db_empty_change_id(element_cache):
result = await element_cache.get_data_since(None, 1)
assert result == (0, {}, [])
@pytest.mark.asyncio
async def test_get_element_data_empty_redis(element_cache):
result = await element_cache.get_element_data("app/collection1", 1)
@ -245,97 +167,6 @@ async def test_get_element_data_full_redis(element_cache):
assert result == {"id": 1, "value": "value1"}
@pytest.mark.asyncio
async def test_get_all_restricted_data(element_cache):
result = await element_cache.get_all_data_list(1)
# The output from redis has to be the same then the db_data
assert sort_dict(result) == sort_dict(
{
"app/collection1": [
{"id": 1, "value": "restricted_value1"},
{"id": 2, "value": "restricted_value2"},
],
"app/collection2": [
{"id": 1, "key": "restricted_value1"},
{"id": 2, "key": "restricted_value2"},
],
"app/personalized-collection": [{"id": 1, "key": "value1", "user_id": 1}],
}
)
@pytest.mark.asyncio
async def test_get_restricted_data_change_id_0(element_cache):
(
max_change_id,
changed_elements,
deleted_element_ids,
) = await element_cache.get_data_since(2, 0)
assert max_change_id == 0
assert sort_dict(changed_elements) == sort_dict(
{
"app/collection1": [
{"id": 1, "value": "restricted_value1"},
{"id": 2, "value": "restricted_value2"},
],
"app/collection2": [
{"id": 1, "key": "restricted_value1"},
{"id": 2, "key": "restricted_value2"},
],
"app/personalized-collection": [{"id": 2, "key": "value2", "user_id": 2}],
}
)
assert deleted_element_ids == []
@pytest.mark.asyncio
async def test_get_restricted_data_2(element_cache):
element_cache.cache_provider.change_id_data = {
1: {"app/collection1:1", "app/collection1:3"}
}
result = await element_cache.get_data_since(0, 1)
assert result == (
1,
{"app/collection1": [{"id": 1, "value": "restricted_value1"}]},
["app/collection1:3"],
)
@pytest.mark.asyncio
async def test_get_restricted_data_from_personalized_cacheable(element_cache):
element_cache.cache_provider.change_id_data = {1: {"app/personalized-collection:2"}}
result = await element_cache.get_data_since(0, 1)
assert result == (1, {}, [])
@pytest.mark.asyncio
async def test_get_restricted_data_change_id_lower_than_in_redis(element_cache):
element_cache.cache_provider.default_change_id = 2
with pytest.raises(ChangeIdTooLowError):
await element_cache.get_data_since(0, 1)
@pytest.mark.asyncio
async def test_get_restricted_data_with_change_id(element_cache):
element_cache.cache_provider.change_id_data = {2: {"app/collection1:1"}}
result = await element_cache.get_data_since(0, 2)
assert result == (
2,
{"app/collection1": [{"id": 1, "value": "restricted_value1"}]},
[],
)
@pytest.mark.asyncio
async def test_lowest_change_id_after_updating_lowest_element(element_cache):
await element_cache.change_elements(