OpenSlides/openslides/utils/consumers.py

253 lines
9.0 KiB
Python

import time
from collections import defaultdict
from typing import Any, Dict, List, Optional, cast
from urllib.parse import parse_qs
from channels.generic.websocket import AsyncWebsocketConsumer
from mypy_extensions import TypedDict
from ..utils.websocket import WEBSOCKET_CHANGE_ID_TOO_HIGH
from . import logging
from .auth import UserDoesNotExist, async_anonymous_is_enabled
from .cache import ChangeIdTooLowError, element_cache, split_element_id
from .utils import get_worker_id
from .websocket import ProtocollAsyncJsonWebsocketConsumer
logger = logging.getLogger("openslides.websocket")
AutoupdateFormat = TypedDict(
"AutoupdateFormat",
{
"changed": Dict[str, List[Dict[str, Any]]],
"deleted": Dict[str, List[int]],
"from_change_id": int,
"to_change_id": int,
"all_data": bool,
},
)
class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer):
"""
Websocket Consumer for the site.
"""
groups = ["site"]
ID_COUNTER = 0
"""
ID counter for assigning each instance of this class an unique id.
"""
skipped_autoupdate_from_change_id: Optional[int] = None
def __init__(self, *args: Any, **kwargs: Any) -> None:
self.projector_hash: Dict[int, int] = {}
SiteConsumer.ID_COUNTER += 1
self._id = get_worker_id() + "-" + str(SiteConsumer.ID_COUNTER)
super().__init__(*args, **kwargs)
async def connect(self) -> None:
"""
A user connects to the site.
If it is an anonymous user and anonymous is disabled, the connection is closed.
Sends the startup data to the user.
"""
self.connect_time = time.time()
# self.scope['user'] is the full_data dict of the user. For an
# anonymous user is it the dict {'id': 0}
change_id = None
if not await async_anonymous_is_enabled() and not self.scope["user"]["id"]:
await self.accept() # workaround for #4009
await self.close()
logger.debug(f"connect: denied ({self._id})")
return
query_string = cast(
Dict[bytes, List[bytes]], parse_qs(self.scope["query_string"])
)
if b"change_id" in query_string:
try:
change_id = int(query_string[b"change_id"][0])
except ValueError:
await self.accept() # workaround for #4009
await self.close() # TODO: Find a way to send an error code
logger.debug(f"connect: wrong change id ({self._id})")
return
if b"autoupdate" in query_string and query_string[b"autoupdate"][
0
].lower() not in [b"0", b"off", b"false"]:
# a positive value in autoupdate. Start autoupdate
await self.channel_layer.group_add("autoupdate", self.channel_name)
await self.accept()
if change_id is not None:
logger.debug(f"connect: change id {change_id} ({self._id})")
await self.send_autoupdate(change_id)
else:
logger.debug(f"connect: no change id ({self._id})")
async def disconnect(self, close_code: int) -> None:
"""
A user disconnects. Remove it from autoupdate.
"""
await self.channel_layer.group_discard("autoupdate", self.channel_name)
active_seconds = int(time.time() - self.connect_time)
logger.debug(
f"disconnect code={close_code} active_secs={active_seconds} ({self._id})"
)
async def send_notify(self, event: Dict[str, Any]) -> None:
"""
Send a notify message to the user.
"""
user_id = self.scope["user"]["id"]
item = event["incomming"]
users = item.get("users")
reply_channels = item.get("replyChannels")
if (
(isinstance(users, bool) and users)
or (isinstance(users, list) and user_id in users)
or (
isinstance(reply_channels, list) and self.channel_name in reply_channels
)
or (users is None and reply_channels is None)
):
item["senderChannelName"] = event["senderChannelName"]
item["senderUserId"] = event["senderUserId"]
await self.send_json(type="notify", content=item)
async def send_autoupdate(
self,
change_id: int,
max_change_id: Optional[int] = None,
in_response: Optional[str] = None,
) -> None:
"""
Sends an autoupdate to the client from change_id to max_change_id.
If max_change_id is None, the current change id will be used.
"""
user_id = self.scope["user"]["id"]
if max_change_id is None:
max_change_id = await element_cache.get_current_change_id()
if change_id == max_change_id + 1:
# The client is up-to-date, so nothing will be done
return
if change_id > max_change_id:
message = f"Requested change_id {change_id} is higher this highest change_id {max_change_id}."
await self.send_error(
code=WEBSOCKET_CHANGE_ID_TOO_HIGH,
message=message,
in_response=in_response,
)
return
try:
changed_elements, deleted_element_ids = await element_cache.get_data_since(
user_id, change_id, max_change_id
)
except ChangeIdTooLowError:
# The change_id is lower the the lowerst change_id in redis. Return all data
changed_elements = await element_cache.get_all_data_list(user_id)
all_data = True
deleted_elements: Dict[str, List[int]] = {}
except UserDoesNotExist:
# Maybe the user was deleted, but a websocket connection is still open to the user.
# So we can close this connection and return.
await self.close()
return
else:
all_data = False
deleted_elements = defaultdict(list)
for element_id in deleted_element_ids:
collection_string, id = split_element_id(element_id)
deleted_elements[collection_string].append(id)
# Check, if the autoupdate has any data.
if not changed_elements and not deleted_element_ids:
# Set the current from_change_id, if it is the first skipped autoupdate
if not self.skipped_autoupdate_from_change_id:
self.skipped_autoupdate_from_change_id = change_id
else:
# Normal autoupdate with data
from_change_id = change_id
# If there is at least one skipped autoupdate, take the saved from_change_id
if self.skipped_autoupdate_from_change_id:
from_change_id = self.skipped_autoupdate_from_change_id
self.skipped_autoupdate_from_change_id = None
await self.send_json(
type="autoupdate",
content=AutoupdateFormat(
changed=changed_elements,
deleted=deleted_elements,
from_change_id=from_change_id,
to_change_id=max_change_id,
all_data=all_data,
),
in_response=in_response,
)
async def send_data(self, event: Dict[str, Any]) -> None:
"""
Send changed or deleted elements to the user.
"""
change_id = event["change_id"]
await self.send_autoupdate(change_id, max_change_id=change_id)
async def projector_changed(self, event: Dict[str, Any]) -> None:
"""
The projector has changed.
"""
all_projector_data = event["data"]
change_id = event["change_id"]
projector_data: Dict[int, Dict[str, Any]] = {}
for projector_id in self.listen_projector_ids:
data = all_projector_data.get(projector_id, [])
new_hash = hash(str(data))
if new_hash != self.projector_hash.get(projector_id):
projector_data[projector_id] = data
self.projector_hash[projector_id] = new_hash
if projector_data:
await self.send_projector_data(projector_data, change_id=change_id)
async def send_projector_data(
self,
data: Dict[int, Dict[str, Any]],
change_id: Optional[int] = None,
in_response: Optional[str] = None,
) -> None:
"""
Sends projector data to the consumer.
"""
if change_id is None:
change_id = await element_cache.get_current_change_id()
content = {"change_id": change_id, "data": data}
await self.send_json(type="projector", content=content, in_response=in_response)
class CloseConsumer(AsyncWebsocketConsumer):
""" Auto-closes the connection """
groups: List[str] = []
def __init__(self, args: Dict[str, Any], **kwargs: Any) -> None:
logger.info(f'Closing connection to unknown websocket url {args["path"]}')
async def connect(self) -> None:
await self.accept()
await self.close()