commit
b3de965a66
@ -1,8 +1,9 @@
|
||||
from typing import Any, Dict
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
from ..utils.auth import async_has_perm
|
||||
from ..utils.constants import get_constants
|
||||
from ..utils.projector import get_projector_data
|
||||
from ..utils.stats import WebsocketLatencyLogger
|
||||
from ..utils.websocket import (
|
||||
BaseWebsocketClientMessage,
|
||||
ProtocollAsyncJsonWebsocketConsumer,
|
||||
@ -206,6 +207,11 @@ class PingPong(BaseWebsocketClientMessage):
|
||||
}
|
||||
|
||||
async def receive_content(
|
||||
self, consumer: "ProtocollAsyncJsonWebsocketConsumer", content: Any, id: str
|
||||
self,
|
||||
consumer: "ProtocollAsyncJsonWebsocketConsumer",
|
||||
latency: Optional[int],
|
||||
id: str,
|
||||
) -> None:
|
||||
await consumer.send_json(type="pong", content=content, in_response=id)
|
||||
await consumer.send_json(type="pong", content=latency, in_response=id)
|
||||
if latency is not None:
|
||||
await WebsocketLatencyLogger.add_latency(latency)
|
||||
|
@ -1,3 +1,5 @@
|
||||
import logging
|
||||
import time
|
||||
from collections import defaultdict
|
||||
from typing import Any, Dict, List
|
||||
from urllib.parse import parse_qs
|
||||
@ -5,9 +7,13 @@ from urllib.parse import parse_qs
|
||||
from .auth import async_anonymous_is_enabled
|
||||
from .autoupdate import AutoupdateFormat
|
||||
from .cache import element_cache, split_element_id
|
||||
from .utils import get_worker_id
|
||||
from .websocket import ProtocollAsyncJsonWebsocketConsumer, get_element_data
|
||||
|
||||
|
||||
logger = logging.getLogger("openslides.websocket")
|
||||
|
||||
|
||||
class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer):
|
||||
"""
|
||||
Websocket Consumer for the site.
|
||||
@ -15,8 +21,15 @@ class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer):
|
||||
|
||||
groups = ["site"]
|
||||
|
||||
ID_COUNTER = 0
|
||||
"""
|
||||
ID counter for assigning each instance of this class an unique id.
|
||||
"""
|
||||
|
||||
def __init__(self, *args: Any, **kwargs: Any) -> None:
|
||||
self.projector_hash: Dict[int, int] = {}
|
||||
SiteConsumer.ID_COUNTER += 1
|
||||
self._id = get_worker_id() + "-" + str(SiteConsumer.ID_COUNTER)
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
async def connect(self) -> None:
|
||||
@ -27,12 +40,14 @@ class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer):
|
||||
|
||||
Sends the startup data to the user.
|
||||
"""
|
||||
self.connect_time = time.time()
|
||||
# self.scope['user'] is the full_data dict of the user. For an
|
||||
# anonymous user is it the dict {'id': 0}
|
||||
change_id = None
|
||||
if not await async_anonymous_is_enabled() and not self.scope["user"]["id"]:
|
||||
await self.accept() # workaround for #4009
|
||||
await self.close()
|
||||
logger.debug(f"connect: denied ({self._id})")
|
||||
return
|
||||
|
||||
query_string = parse_qs(self.scope["query_string"])
|
||||
@ -42,6 +57,7 @@ class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer):
|
||||
except ValueError:
|
||||
await self.accept() # workaround for #4009
|
||||
await self.close() # TODO: Find a way to send an error code
|
||||
logger.debug(f"connect: wrong change id ({self._id})")
|
||||
return
|
||||
|
||||
if b"autoupdate" in query_string and query_string[b"autoupdate"][
|
||||
@ -53,6 +69,7 @@ class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer):
|
||||
await self.accept()
|
||||
|
||||
if change_id is not None:
|
||||
logger.debug(f"connect: change id {change_id} ({self._id})")
|
||||
try:
|
||||
data = await get_element_data(self.scope["user"]["id"], change_id)
|
||||
except ValueError:
|
||||
@ -60,12 +77,18 @@ class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer):
|
||||
pass
|
||||
else:
|
||||
await self.send_json(type="autoupdate", content=data)
|
||||
else:
|
||||
logger.debug(f"connect: no change id ({self._id})")
|
||||
|
||||
async def disconnect(self, close_code: int) -> None:
|
||||
"""
|
||||
A user disconnects. Remove it from autoupdate.
|
||||
"""
|
||||
await self.channel_layer.group_discard("autoupdate", self.channel_name)
|
||||
active_seconds = int(time.time() - self.connect_time)
|
||||
logger.debug(
|
||||
f"disconnect code={close_code} active_secs={active_seconds} ({self._id})"
|
||||
)
|
||||
|
||||
async def send_notify(self, event: Dict[str, Any]) -> None:
|
||||
"""
|
||||
|
@ -144,6 +144,38 @@ MEDIA_ROOT = os.path.join(OPENSLIDES_USER_DATA_DIR, 'media', '')
|
||||
# AUTH_PASSWORD_VALIDATORS = []
|
||||
|
||||
|
||||
# Logging
|
||||
# see https://docs.djangoproject.com/en/2.2/topics/logging/
|
||||
|
||||
LOGGING = {
|
||||
'version': 1,
|
||||
'disable_existing_loggers': False,
|
||||
'formatters': {
|
||||
'gunicorn': {
|
||||
'format': '{asctime} [{process:d}] [{levelname}] {name} {message}',
|
||||
'style': '{',
|
||||
'datefmt': '[%%Y-%%m-%%d %%H:%%M:%%S %%z]',
|
||||
},
|
||||
},
|
||||
'handlers': {
|
||||
'console': {
|
||||
'class': 'logging.StreamHandler',
|
||||
'formatter': 'gunicorn',
|
||||
},
|
||||
},
|
||||
'loggers': {
|
||||
'django': {
|
||||
'handlers': ['console'],
|
||||
'level': os.getenv('DJANGO_LOG_LEVEL', 'INFO'),
|
||||
},
|
||||
'openslides': {
|
||||
'handlers': ['console'],
|
||||
'level': os.getenv('OPENSLIDES_LOG_LEVEL', 'INFO'),
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
# Customization of OpenSlides apps
|
||||
|
||||
MOTION_IDENTIFIER_MIN_DIGITS = 1
|
||||
|
62
openslides/utils/stats.py
Normal file
62
openslides/utils/stats.py
Normal file
@ -0,0 +1,62 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
from typing import List
|
||||
|
||||
|
||||
class WebsocketLatencyLogger:
|
||||
"""
|
||||
Loggs latencies reported by clients during the ping-pong messages. Collects
|
||||
them and calculate the mean and standard derivation in 60 second intervalls
|
||||
and print these stats to the logger.
|
||||
|
||||
Usage: WebsocketLatencyLogger.add_latency(<latency>)
|
||||
"""
|
||||
|
||||
lock = asyncio.Lock()
|
||||
""" Locks the access to the shared latency list. """
|
||||
|
||||
instance = None
|
||||
""" The only latencylogger instance. """
|
||||
|
||||
logger = logging.getLogger("openslides.websocket.latency")
|
||||
""" The logger to log to. """
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.reset()
|
||||
|
||||
@classmethod
|
||||
async def add_latency(cls, latency: int) -> None:
|
||||
""" Add the latency to the logger. """
|
||||
# pass the latency value to the single instance
|
||||
if cls.instance is None:
|
||||
cls.instance = cls()
|
||||
await cls.instance._add_latency(latency)
|
||||
|
||||
async def _add_latency(self, latency: int) -> None:
|
||||
async with self.lock:
|
||||
self.latencies.append(latency)
|
||||
|
||||
# If we waited longer then 60 seconds, flush the data.
|
||||
current_time = time.time()
|
||||
if current_time > (self.time + 60):
|
||||
self.flush()
|
||||
return
|
||||
|
||||
# If we have collected too many entries, flush the data.
|
||||
if len(self.latencies) > 1000:
|
||||
self.flush()
|
||||
|
||||
def flush(self) -> None:
|
||||
""" Calc Stats and print to logger. """
|
||||
N = len(self.latencies)
|
||||
mean = sum(self.latencies) / N
|
||||
std = sum((l - mean) ** 2 for l in self.latencies)
|
||||
self.logger.debug(f"N={N}, mean={mean:.2f}, std={std:.2f}")
|
||||
|
||||
self.reset()
|
||||
|
||||
def reset(self) -> None:
|
||||
""" Resets the stats. """
|
||||
self.latencies: List[int] = []
|
||||
self.time = time.time()
|
@ -1,5 +1,7 @@
|
||||
import random
|
||||
import re
|
||||
from typing import Dict, Generator, Tuple, Type, Union
|
||||
import string
|
||||
from typing import Dict, Generator, Optional, Tuple, Type, Union
|
||||
|
||||
import roman
|
||||
from django.apps import apps
|
||||
@ -95,3 +97,20 @@ def get_model_from_collection_string(collection_string: str) -> Type[Model]:
|
||||
f"Invalid message. A valid collection_string is missing. Got {collection_string}"
|
||||
)
|
||||
return model
|
||||
|
||||
|
||||
_worker_id: Optional[str] = None
|
||||
"""
|
||||
The worker id. Accessable via `get_worker_id()`.
|
||||
"""
|
||||
|
||||
|
||||
def get_worker_id() -> str:
|
||||
"""
|
||||
Returns a random string of length 4 that identifies this
|
||||
instance of this worker
|
||||
"""
|
||||
global _worker_id
|
||||
if _worker_id is None:
|
||||
_worker_id = "".join(random.sample(string.ascii_letters, 4))
|
||||
return _worker_id
|
||||
|
Loading…
Reference in New Issue
Block a user