384 lines
15 KiB
Python
384 lines
15 KiB
Python
from collections import defaultdict
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
import jsonschema
|
|
from asgiref.sync import sync_to_async
|
|
from channels.db import database_sync_to_async
|
|
from channels.generic.websocket import AsyncJsonWebsocketConsumer
|
|
|
|
from ..core.config import config
|
|
from ..core.models import Projector
|
|
from .auth import async_anonymous_is_enabled, has_perm
|
|
from .cache import element_cache, split_element_id
|
|
from .collection import (
|
|
AutoupdateFormat,
|
|
Collection,
|
|
CollectionElement,
|
|
format_for_autoupdate_old,
|
|
from_channel_message,
|
|
)
|
|
from .constants import get_constants
|
|
|
|
|
|
class ProtocollAsyncJsonWebsocketConsumer(AsyncJsonWebsocketConsumer):
|
|
"""
|
|
Mixin for JSONWebsocketConsumers, that speaks the a special protocol.
|
|
"""
|
|
schema = {
|
|
"$schema": "http://json-schema.org/draft-04/schema#",
|
|
"title": "OpenSlidesWebsocketProtocol",
|
|
"description": "The base packages that OpenSlides sends between the server and the client.",
|
|
"type": "object",
|
|
"properties": {
|
|
"type": {
|
|
"description": "Defines what kind of packages is packed.",
|
|
"type": "string",
|
|
"pattern": "notify|constants", # The server can sent other types
|
|
},
|
|
"content": {
|
|
"description": "The content of the package.",
|
|
},
|
|
"id": {
|
|
"description": "An identifier of the package.",
|
|
"type": "string",
|
|
},
|
|
"in_response": {
|
|
"description": "The id of another package that the other part sent before.",
|
|
"type": "string",
|
|
}
|
|
},
|
|
"required": ["type", "content", "id"],
|
|
}
|
|
|
|
async def send_json(self, type: str, content: Any, id: Optional[str] = None, in_response: Optional[str] = None) -> None:
|
|
"""
|
|
Sends the data with the type.
|
|
"""
|
|
out = {'type': type, 'content': content}
|
|
if id:
|
|
out['id'] = id
|
|
if in_response:
|
|
out['in_response'] = in_response
|
|
await super().send_json(out)
|
|
|
|
async def receive_json(self, content: Any) -> None:
|
|
"""
|
|
Receives the json data, parses it and calls receive_content.
|
|
"""
|
|
try:
|
|
jsonschema.validate(content, self.schema)
|
|
except jsonschema.ValidationError as err:
|
|
try:
|
|
in_response = content['id']
|
|
except (TypeError, KeyError):
|
|
# content is not a dict (TypeError) or has not the key id (KeyError)
|
|
in_response = None
|
|
|
|
await self.send_json(
|
|
type='error',
|
|
content=str(err),
|
|
in_response=in_response)
|
|
return
|
|
|
|
await self.receive_content(type=content['type'], content=content['content'], id=content['id'])
|
|
|
|
async def receive_content(self, type: str, content: object, id: str) -> None:
|
|
raise NotImplementedError("ProtocollAsyncJsonWebsocketConsumer needs the method receive_content()")
|
|
|
|
|
|
class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer):
|
|
"""
|
|
Websocket Consumer for the site.
|
|
"""
|
|
groups = ['site']
|
|
|
|
async def connect(self) -> None:
|
|
"""
|
|
A user connects to the site.
|
|
|
|
If it is an anonymous user and anonymous is disabled, the connection is closed.
|
|
|
|
Sends the startup data to the user.
|
|
"""
|
|
# TODO: add a way to ask for the data since a change_id and send only data that is newer
|
|
if not await async_anonymous_is_enabled() and self.scope['user'].id is None:
|
|
await self.close()
|
|
else:
|
|
await self.accept()
|
|
data = await startup_data(self.scope['user'])
|
|
await self.send_json(type='autoupdate', content=data)
|
|
|
|
async def receive_content(self, type: str, content: Any, id: str) -> None:
|
|
"""
|
|
If we recieve something from the client we currently just interpret this
|
|
as a notify message.
|
|
|
|
The server adds the sender's user id (0 for anonymous) and reply
|
|
channel name so that a receiver client may reply to the sender or to all
|
|
sender's instances.
|
|
"""
|
|
if type == 'notify':
|
|
if notify_message_is_valid(content):
|
|
await self.channel_layer.group_send(
|
|
"projector",
|
|
{
|
|
"type": "send_notify",
|
|
"incomming": content,
|
|
"senderReplyChannelName": self.channel_name,
|
|
"senderUserId": self.scope['user'].id or 0,
|
|
},
|
|
)
|
|
await self.channel_layer.group_send(
|
|
"site",
|
|
{
|
|
"type": "send_notify",
|
|
"incomming": content,
|
|
"senderReplyChannelName": self.channel_name,
|
|
"senderUserId": self.scope['user'].id or 0,
|
|
},
|
|
)
|
|
else:
|
|
await self.send_json(type='error', content='Invalid notify message', in_response=id)
|
|
|
|
elif type == 'constants':
|
|
# Return all constants to the client.
|
|
await self.send_json(type='constants', content=get_constants(), in_response=id)
|
|
|
|
async def send_notify(self, event: Dict[str, Any]) -> None:
|
|
"""
|
|
Send a notify message to the user.
|
|
"""
|
|
user_id = self.scope['user'].id or 0
|
|
|
|
out = []
|
|
for item in event['incomming']:
|
|
users = item.get('users')
|
|
reply_channels = item.get('replyChannels')
|
|
projectors = item.get('projectors')
|
|
if ((isinstance(users, list) and user_id in users)
|
|
or (isinstance(reply_channels, list) and self.channel_name in reply_channels)
|
|
or (users is None and reply_channels is None and projectors is None)):
|
|
item['senderReplyChannelName'] = event.get('senderReplyChannelName')
|
|
item['senderUserId'] = event.get('senderUserId')
|
|
item['senderProjectorId'] = event.get('senderProjectorId')
|
|
out.append(item)
|
|
|
|
if out:
|
|
await self.send_json(type='notify', content=out)
|
|
|
|
async def send_data(self, event: Dict[str, Any]) -> None:
|
|
"""
|
|
Send changed or deleted elements to the user.
|
|
"""
|
|
change_id = event['change_id']
|
|
changed_elements, deleted_elements_ids = await element_cache.get_restricted_data(self.scope['user'], change_id, max_change_id=change_id)
|
|
|
|
deleted_elements: Dict[str, List[int]] = defaultdict(list)
|
|
for element_id in deleted_elements_ids:
|
|
collection_string, id = split_element_id(element_id)
|
|
deleted_elements[collection_string].append(id)
|
|
await self.send_json(type='autoupdate', content=AutoupdateFormat(changed=changed_elements, deleted=deleted_elements, change_id=change_id))
|
|
|
|
|
|
class ProjectorConsumer(ProtocollAsyncJsonWebsocketConsumer):
|
|
"""
|
|
Websocket Consumer for the projector.
|
|
"""
|
|
|
|
groups = ['projector']
|
|
|
|
async def connect(self) -> None:
|
|
"""
|
|
Adds the websocket connection to a group specific to the projector with the given id.
|
|
Also sends all data that are shown on the projector.
|
|
"""
|
|
user = self.scope['user']
|
|
projector_id = self.scope["url_route"]["kwargs"]["projector_id"]
|
|
await self.accept()
|
|
|
|
if not await database_sync_to_async(has_perm)(user, 'core.can_see_projector'):
|
|
await self.send_json(type='error', content='No permissions to see this projector.')
|
|
# TODO: Shouldend we just close the websocket connection with an error message?
|
|
# self.close(code=4403)
|
|
else:
|
|
out = await sync_to_async(projector_startup_data)(projector_id)
|
|
await self.send_json(type='autoupdate', content=out)
|
|
|
|
async def receive_content(self, type: str, content: Any, id: str) -> None:
|
|
"""
|
|
If we recieve something from the client we currently just interpret this
|
|
as a notify message.
|
|
|
|
The server adds the sender's user id (0 for anonymous) and reply
|
|
channel name so that a receiver client may reply to the sender or to all
|
|
sender's instances.
|
|
"""
|
|
projector_id = self.scope["url_route"]["kwargs"]["projector_id"]
|
|
await self.channel_layer.group_send(
|
|
"projector",
|
|
{
|
|
"type": "send_notify",
|
|
"incomming": content,
|
|
"senderReplyChannelName": self.channel_name,
|
|
"senderProjectorId": projector_id,
|
|
},
|
|
)
|
|
await self.channel_layer.group_send(
|
|
"site",
|
|
{
|
|
"type": "send_notify",
|
|
"incomming": content,
|
|
"senderReplyChannelName": self.channel_name,
|
|
"senderProjectorId": projector_id,
|
|
},
|
|
)
|
|
|
|
async def send_notify(self, event: Dict[str, Any]) -> None:
|
|
"""
|
|
Send a notify message to the projector.
|
|
"""
|
|
projector_id = self.scope["url_route"]["kwargs"]["projector_id"]
|
|
|
|
out = []
|
|
for item in event['incomming']:
|
|
users = item.get('users')
|
|
reply_channels = item.get('replyChannels')
|
|
projectors = item.get('projectors')
|
|
if ((isinstance(projectors, list) and projector_id in projectors)
|
|
or (isinstance(reply_channels, list) and self.channel_name in reply_channels)
|
|
or (users is None and reply_channels is None and projectors is None)):
|
|
item['senderReplyChannelName'] = event.get('senderReplyChannelName')
|
|
item['senderUserId'] = event.get('senderUserId')
|
|
item['senderProjectorId'] = event.get('senderProjectorId')
|
|
out.append(item)
|
|
|
|
if out:
|
|
await self.send_json(type='notify', content=out)
|
|
|
|
async def send_data(self, event: Dict[str, Any]) -> None:
|
|
"""
|
|
Informs all projector clients about changed data.
|
|
"""
|
|
projector_id = self.scope["url_route"]["kwargs"]["projector_id"]
|
|
collection_elements = from_channel_message(event['message'])
|
|
|
|
output = await projector_sync_send_data(projector_id, collection_elements)
|
|
if output:
|
|
await self.send_json(type='autoupdate', content=output)
|
|
|
|
|
|
async def startup_data(user: Optional[CollectionElement], change_id: int = 0) -> AutoupdateFormat:
|
|
"""
|
|
Returns all data for startup.
|
|
"""
|
|
# TODO: use the change_id argument
|
|
# TODO: This two calls have to be atomic
|
|
changed_elements, deleted_element_ids = await element_cache.get_restricted_data(user)
|
|
current_change_id = await element_cache.get_current_change_id()
|
|
|
|
deleted_elements: Dict[str, List[int]] = defaultdict(list)
|
|
for element_id in deleted_element_ids:
|
|
collection_string, id = split_element_id(element_id)
|
|
deleted_elements[collection_string].append(id)
|
|
|
|
return AutoupdateFormat(changed=changed_elements, deleted=deleted_elements, change_id=current_change_id)
|
|
|
|
|
|
def projector_startup_data(projector_id: int) -> Any:
|
|
"""
|
|
Generate the startup data for a projector.
|
|
"""
|
|
try:
|
|
projector = Projector.objects.get(pk=projector_id)
|
|
except Projector.DoesNotExist:
|
|
return {'text': 'The projector {} does not exist.'.format(projector_id)}
|
|
else:
|
|
# Now check whether broadcast is active at the moment. If yes,
|
|
# change the local projector variable.
|
|
if config['projector_broadcast'] > 0:
|
|
projector = Projector.objects.get(pk=config['projector_broadcast'])
|
|
|
|
# Collect all elements that are on the projector.
|
|
output = []
|
|
for requirement in projector.get_all_requirements():
|
|
required_collection_element = CollectionElement.from_instance(requirement)
|
|
output.append(required_collection_element.as_autoupdate_for_projector())
|
|
|
|
# Collect all config elements.
|
|
config_collection = Collection(config.get_collection_string())
|
|
projector_data = (config_collection.get_access_permissions()
|
|
.get_projector_data(config_collection.get_full_data()))
|
|
for data in projector_data:
|
|
output.append(format_for_autoupdate_old(
|
|
config_collection.collection_string,
|
|
data['id'],
|
|
'changed',
|
|
data))
|
|
|
|
# Collect the projector instance.
|
|
collection_element = CollectionElement.from_instance(projector)
|
|
output.append(collection_element.as_autoupdate_for_projector())
|
|
|
|
# Send all the data that were only collected before.
|
|
return output
|
|
|
|
|
|
@sync_to_async
|
|
def projector_sync_send_data(projector_id: int, collection_elements: List[CollectionElement]) -> List[Any]:
|
|
"""
|
|
sync function that generates the elements for an projector.
|
|
"""
|
|
# Load the projector object. If broadcast is on, use the broadcast projector
|
|
# instead.
|
|
if config['projector_broadcast'] > 0:
|
|
projector_id = config['projector_broadcast']
|
|
|
|
projector = Projector.objects.get(pk=projector_id)
|
|
|
|
# TODO: This runs once for every open projector tab. Either use
|
|
# caching or something else, so this is only called once
|
|
output = []
|
|
for collection_element in collection_elements:
|
|
if collection_element.is_deleted():
|
|
output.append(collection_element.as_autoupdate_for_projector())
|
|
else:
|
|
for element in projector.get_collection_elements_required_for_this(collection_element):
|
|
output.append(element.as_autoupdate_for_projector())
|
|
return output
|
|
|
|
|
|
def notify_message_is_valid(message: object) -> bool:
|
|
"""
|
|
Returns True, when the message is a valid notify_message.
|
|
"""
|
|
schema = {
|
|
"$schema": "http://json-schema.org/draft-04/schema#",
|
|
"title": "Notify elements.",
|
|
"description": "Elements that one client can send to one or many other clients.",
|
|
"type": "array",
|
|
"items": {
|
|
"type": "object",
|
|
"properties": {
|
|
"projectors": {
|
|
"type": "array",
|
|
"items": {"type": "integer"},
|
|
},
|
|
"reply_channels": {
|
|
"type": "array",
|
|
"items": {"type": "string"},
|
|
},
|
|
"users": {
|
|
"type": "array",
|
|
"items": {"type": "integer"},
|
|
}
|
|
}
|
|
},
|
|
"minItems": 1,
|
|
}
|
|
try:
|
|
jsonschema.validate(message, schema)
|
|
except jsonschema.ValidationError:
|
|
return False
|
|
else:
|
|
return True
|