2bcab5d098
- moved all server related things into the folder `server`, so this configuration is parallel to the client. - All main "services" are now folders in the root directory - Added Dockerfiles to each service (currently server and client) - Added a docker compose configuration to start everything together. Currently there are heavy dependencies into https://github.com/OpenSlides/openslides-docker-compose - Resturctured the .gitignore. If someone needs something excluded, please add it to the right section. - Added initial build setup with Docker and docker-compose. - removed setup.py. We won't deliver OpenSlides via pip anymore.
225 lines
7.1 KiB
Python
225 lines
7.1 KiB
Python
from typing import List, Union
|
|
|
|
from asgiref.sync import async_to_sync
|
|
from django.apps import apps
|
|
from django.conf import settings
|
|
from django.contrib.auth import get_user_model
|
|
from django.contrib.auth.models import AnonymousUser
|
|
from django.core.exceptions import ImproperlyConfigured
|
|
from django.db.models import Model
|
|
|
|
from .cache import element_cache
|
|
|
|
|
|
GROUP_DEFAULT_PK = 1 # This is the hard coded pk for the default group.
|
|
GROUP_ADMIN_PK = 2 # This is the hard coded pk for the admin group.
|
|
|
|
# Hard coded collection string for users and groups
|
|
group_collection_string = "users/group"
|
|
user_collection_string = "users/user"
|
|
|
|
|
|
class UserDoesNotExist(Exception):
|
|
"""
|
|
This is raised, if some permissions checks are done on not existing users.
|
|
"""
|
|
|
|
pass
|
|
|
|
|
|
def get_group_model() -> Model:
|
|
"""
|
|
Return the Group model that is active in this project.
|
|
"""
|
|
try:
|
|
return apps.get_model(settings.AUTH_GROUP_MODEL, require_ready=False)
|
|
except ValueError:
|
|
raise ImproperlyConfigured(
|
|
"AUTH_GROUP_MODEL must be of the form 'app_label.model_name'"
|
|
)
|
|
except LookupError:
|
|
raise ImproperlyConfigured(
|
|
f"AUTH_GROUP_MODEL refers to model '{settings.AUTH_GROUP_MODEL}' that has not been installed"
|
|
)
|
|
|
|
|
|
async def async_is_superadmin(user_id: int) -> bool:
|
|
"""
|
|
Checks, if the user is a superadmin (in the admin group).
|
|
|
|
This is done by querying a non existing permission, becuase has_perm
|
|
should always return true, if the user is in the admin group.
|
|
"""
|
|
return await async_has_perm(user_id, "superadmin")
|
|
|
|
|
|
def has_perm(user_id: int, perm: str) -> bool:
|
|
"""
|
|
Checks that user has a specific permission.
|
|
|
|
user_id 0 means anonymous user.
|
|
"""
|
|
# Convert user to right type
|
|
# TODO: Remove this and make use, that user has always the right type
|
|
user_id = user_to_user_id(user_id)
|
|
return async_to_sync(async_has_perm)(user_id, perm)
|
|
|
|
|
|
async def async_has_perm(user_id: int, perm: str) -> bool:
|
|
"""
|
|
Checks that user has a specific permission.
|
|
|
|
user_id 0 means anonymous user.
|
|
"""
|
|
if not user_id and not await async_anonymous_is_enabled():
|
|
has_perm = False
|
|
elif not user_id:
|
|
# Use the permissions from the default group.
|
|
default_group = await element_cache.get_element_data(
|
|
group_collection_string, GROUP_DEFAULT_PK
|
|
)
|
|
if default_group is None:
|
|
raise RuntimeError("Default Group does not exist.")
|
|
has_perm = perm in default_group["permissions"]
|
|
else:
|
|
user_data = await element_cache.get_element_data(
|
|
user_collection_string, user_id
|
|
)
|
|
if user_data is None:
|
|
raise UserDoesNotExist()
|
|
if GROUP_ADMIN_PK in user_data["groups_id"]:
|
|
# User in admin group (pk 2) grants all permissions.
|
|
has_perm = True
|
|
else:
|
|
# Get all groups of the user and then see, if one group has the required
|
|
# permission. If the user has no groups, then use the default group.
|
|
group_ids = user_data["groups_id"] or [GROUP_DEFAULT_PK]
|
|
for group_id in group_ids:
|
|
group = await element_cache.get_element_data(
|
|
group_collection_string, group_id
|
|
)
|
|
if group is None:
|
|
raise RuntimeError(
|
|
f"User {user_id} is in non existing group {group_id}."
|
|
)
|
|
|
|
if perm in group["permissions"]:
|
|
has_perm = True
|
|
break
|
|
else:
|
|
has_perm = False
|
|
return has_perm
|
|
|
|
|
|
# async code doesn't work well with QuerySets, so we have to give a list of ints for groups
|
|
def in_some_groups(user_id: int, groups: List[int], exact: bool = False) -> bool:
|
|
"""
|
|
Checks that user is in at least one given group. Groups can be given as a list
|
|
of ids or a QuerySet.
|
|
|
|
If exact is false (default) and the user is in the admin group (pk = 2),
|
|
the result is always true, even if no groups are given.
|
|
|
|
If exact is true, the user must be in one of the groups, ignoring the possible
|
|
superadmin-status of the user.
|
|
|
|
user_id 0 means anonymous user.
|
|
"""
|
|
# Convert user to right type
|
|
# TODO: Remove this and make use, that user has always the right type
|
|
user_id = user_to_user_id(user_id)
|
|
return async_to_sync(async_in_some_groups)(user_id, groups, exact)
|
|
|
|
|
|
async def async_in_some_groups(
|
|
user_id: int, groups: List[int], exact: bool = False
|
|
) -> bool:
|
|
"""
|
|
Checks that user is in at least one given group. Groups can be given as a list
|
|
of ids or a QuerySet. If the user is in the admin group (pk = 2) the result
|
|
is always true, even if no groups are given.
|
|
|
|
user_id 0 means anonymous user.
|
|
"""
|
|
if not user_id and not await async_anonymous_is_enabled():
|
|
in_some_groups = False
|
|
elif not user_id:
|
|
# Use the permissions from the default group.
|
|
in_some_groups = GROUP_DEFAULT_PK in groups
|
|
else:
|
|
user_data = await element_cache.get_element_data(
|
|
user_collection_string, user_id
|
|
)
|
|
if user_data is None:
|
|
raise UserDoesNotExist()
|
|
if not exact and GROUP_ADMIN_PK in user_data["groups_id"]:
|
|
# User in admin group (pk 2) grants all permissions.
|
|
in_some_groups = True
|
|
else:
|
|
# Get all groups of the user and then see, if one group has the required
|
|
# permission. If the user has no groups, then use the default group.
|
|
group_ids = user_data["groups_id"] or [GROUP_DEFAULT_PK]
|
|
for group_id in group_ids:
|
|
if group_id in groups:
|
|
in_some_groups = True
|
|
break
|
|
else:
|
|
in_some_groups = False
|
|
return in_some_groups
|
|
|
|
|
|
def anonymous_is_enabled() -> bool:
|
|
"""
|
|
Returns True if the anonymous user is enabled in the settings.
|
|
"""
|
|
from ..core.config import config
|
|
|
|
return config["general_system_enable_anonymous"]
|
|
|
|
|
|
async def async_anonymous_is_enabled() -> bool:
|
|
"""
|
|
Like anonymous_is_enabled but async.
|
|
"""
|
|
from ..core.config import config
|
|
|
|
element = await element_cache.get_element_data(
|
|
config.get_collection_string(),
|
|
(await config.async_get_key_to_id())["general_system_enable_anonymous"],
|
|
)
|
|
return False if element is None else element["value"]
|
|
|
|
|
|
AnyUser = Union[Model, int, AnonymousUser, None]
|
|
|
|
|
|
def user_to_user_id(user: AnyUser) -> int:
|
|
"""
|
|
Takes an object, that represents a user returns its user_id.
|
|
|
|
user_id 0 means anonymous user.
|
|
|
|
User can be
|
|
* an user object,
|
|
* an user id or
|
|
* an anonymous user.
|
|
|
|
Raises an TypeError, if the given user object can not be converted.
|
|
"""
|
|
User = get_user_model()
|
|
|
|
if user is None:
|
|
user_id = 0
|
|
elif isinstance(user, int):
|
|
# Nothing to do
|
|
user_id = user
|
|
elif isinstance(user, AnonymousUser):
|
|
user_id = 0
|
|
elif isinstance(user, User):
|
|
user_id = user.pk
|
|
else:
|
|
raise TypeError(
|
|
f"Unsupported type for user. User {user} has type {type(user)}."
|
|
)
|
|
return user_id
|