From d9c88c02b3ab58d65c112eef024676d6ef3d94aa Mon Sep 17 00:00:00 2001 From: FinnStutzenstein Date: Wed, 12 Jun 2019 08:17:25 +0200 Subject: [PATCH] Compression: Recieving compressed messages --- client/package.json | 1 + .../core/core-services/websocket.service.ts | 21 ++++- openslides/utils/stats.py | 78 ++++++++++++++++++- openslides/utils/websocket.py | 46 ++++++++++- requirements/production.txt | 1 + 5 files changed, 140 insertions(+), 7 deletions(-) diff --git a/client/package.json b/client/package.json index 66edbc7b8..ea7dd0196 100644 --- a/client/package.json +++ b/client/package.json @@ -47,6 +47,7 @@ "exceljs": "1.9.1", "file-saver": "^2.0.1", "hammerjs": "^2.0.8", + "lz4js": "^0.2.0", "material-icon-font": "git+https://github.com/petergng/materialIconFont.git", "ng-pick-datetime": "^7.0.0", "ng2-pdf-viewer": "^5.2.3", diff --git a/client/src/app/core/core-services/websocket.service.ts b/client/src/app/core/core-services/websocket.service.ts index 3b0f229c8..56e61b6dd 100644 --- a/client/src/app/core/core-services/websocket.service.ts +++ b/client/src/app/core/core-services/websocket.service.ts @@ -5,6 +5,7 @@ import { Router } from '@angular/router'; import { TranslateService } from '@ngx-translate/core'; import { Observable, Subject } from 'rxjs'; import { take } from 'rxjs/operators'; +import { decompress } from 'lz4js'; import { formatQueryParams, QueryParams } from '../query-params'; import { OpenSlidesStatusService } from './openslides-status.service'; @@ -136,7 +137,7 @@ export class WebsocketService { return this._connectionOpen; } - private sendQueueWhileNotConnected: string[] = []; + private sendQueueWhileNotConnected: (string | ArrayBuffer)[] = []; /** * The websocket. @@ -219,6 +220,7 @@ export class WebsocketService { socketPath += formatQueryParams(queryParams); this.websocket = new WebSocket(socketPath); + this.websocket.binaryType = 'arraybuffer'; // connection established. If this connect attept was a retry, // The error notice will be removed and the reconnectSubject is published. @@ -270,7 +272,21 @@ export class WebsocketService { * * @param data The message */ - private handleMessage(data: string): void { + private handleMessage(data: string | ArrayBuffer): void { + if (data instanceof ArrayBuffer) { + const compressedSize = data.byteLength; + const decompressedBuffer: Uint8Array = decompress(new Uint8Array(data)); + console.log( + `Recieved ${compressedSize / 1024} KB (${decompressedBuffer.byteLength / + 1024} KB uncompressed), ratio ${decompressedBuffer.byteLength / compressedSize}` + ); + const textDecoder = new TextDecoder(); + data = textDecoder.decode(decompressedBuffer); + } else { + const recvSize = new TextEncoder().encode(data).byteLength; + console.log(`Received ${recvSize} KB uncompressed`); + } + const message: IncommingWebsocketMessage = JSON.parse(data); const type = message.type; const inResponse = message.in_response; @@ -447,6 +463,7 @@ export class WebsocketService { // Either send directly or add to queue, if not connected. const jsonMessage = JSON.stringify(message); + if (this.isConnected) { this.websocket.send(jsonMessage); } else { diff --git a/openslides/utils/stats.py b/openslides/utils/stats.py index 8c14c969d..1f0e7eab8 100644 --- a/openslides/utils/stats.py +++ b/openslides/utils/stats.py @@ -1,7 +1,7 @@ import asyncio import logging import time -from typing import List +from typing import List, Optional class WebsocketLatencyLogger: @@ -60,3 +60,79 @@ class WebsocketLatencyLogger: """ Resets the stats. """ self.latencies: List[int] = [] self.time = time.time() + + +class WebsocketThroughputLogger: + """ + Usage (give values in bytes): + - WebsocketThroughputLogger.send(, ) + - WebsocketThroughputLogger.recieve(, ) + The compressed value is optional. If the data is not compressed, just + give the uncompressed value. + Note: Only the send values are logged in KB (received values in bytes). + """ + + lock = asyncio.Lock() + """ To access the stats variables. """ + + instance = None + """ The only throughputlogger instance. """ + + logger = logging.getLogger("openslides.websocket.throughput") + """ The logger to log to. """ + + def __init__(self) -> None: + self.reset() + + @classmethod + async def send(cls, uncompressed: int, compressed: Optional[int] = None) -> None: + # pass the latency value to the single instance + async with cls.lock: + if cls.instance is None: + cls.instance = cls() + if compressed is None: + compressed = uncompressed + cls.instance.send_uncompressed += int(uncompressed / 1024) + cls.instance.send_compressed += int(compressed / 1024) + await cls.instance.check_and_flush() + + @classmethod + async def receive(cls, uncompressed: int, compressed: Optional[int] = None) -> None: + # pass the latency value to the single instance + async with cls.lock: + if cls.instance is None: + cls.instance = cls() + if compressed is None: + compressed = uncompressed + cls.instance.receive_uncompressed += uncompressed + cls.instance.receive_compressed += compressed + await cls.instance.check_and_flush() + + async def check_and_flush(self) -> None: + # If we waited longer then 60 seconds, flush the data. + current_time = time.time() + if current_time > (self.time + 20): + + send_ratio = receive_ratio = 1.0 + if self.send_compressed > 0: + send_ratio = self.send_uncompressed / self.send_compressed + if self.receive_compressed > 0: + receive_ratio = self.receive_uncompressed / self.receive_compressed + + self.logger.debug( + f"tx_uncompressed={self.send_uncompressed} KB, " + f"tx_compressed={self.send_compressed} KB, " + f"tx_ratio={send_ratio:.2f}, " + f"rx_uncompressed={self.receive_uncompressed} B, " + f"rx_compressed={self.receive_compressed} B, " + f"rx_ratio={receive_ratio:.2f}" + ) + self.reset() + + def reset(self) -> None: + """ Resets the stats. """ + self.send_compressed = 0 + self.send_uncompressed = 0 + self.receive_compressed = 0 + self.receive_uncompressed = 0 + self.time = time.time() diff --git a/openslides/utils/websocket.py b/openslides/utils/websocket.py index af090fb68..b39638f4b 100644 --- a/openslides/utils/websocket.py +++ b/openslides/utils/websocket.py @@ -1,12 +1,16 @@ +import json from collections import defaultdict from typing import Any, Dict, List, Optional import jsonschema -from channels.generic.websocket import AsyncJsonWebsocketConsumer +import lz4.frame +from channels.generic.websocket import AsyncWebsocketConsumer +from django.conf import settings from websockets.exceptions import ConnectionClosed from .autoupdate import AutoupdateFormat from .cache import element_cache +from .stats import WebsocketThroughputLogger from .utils import split_element_id @@ -25,12 +29,46 @@ WEBSOCKET_WRONG_FORMAT = 10 # If the recieved data has not the expected format. -class ProtocollAsyncJsonWebsocketConsumer(AsyncJsonWebsocketConsumer): +class AsyncCompressedJsonWebsocketConsumer(AsyncWebsocketConsumer): + async def receive( + self, + text_data: Optional[str] = None, + bytes_data: Optional[bytes] = None, + **kwargs: Dict[str, Any], + ) -> None: + if bytes_data: + uncompressed_data = lz4.frame.decompress(bytes_data) + text_data = uncompressed_data.decode("utf-8") + + recv_len = len(bytes_data) + uncompressed_len = len(uncompressed_data) + await WebsocketThroughputLogger.receive(uncompressed_len, recv_len) + elif text_data: + uncompressed_len = len(text_data.encode("utf-8")) + await WebsocketThroughputLogger.receive(uncompressed_len) + + if text_data: + await self.receive_json(json.loads(text_data), **kwargs) + + async def send_json(self, content: Any, close: bool = False) -> None: + text_data = json.dumps(content) + + b_text_data = text_data.encode("utf-8") + uncompressed_len = len(b_text_data) + await WebsocketThroughputLogger.send(uncompressed_len) + + await self.send(text_data=text_data, close=close) + + async def receive_json(self, content: str, **kwargs: Dict[str, Any]) -> None: + pass + + +class ProtocollAsyncJsonWebsocketConsumer(AsyncCompressedJsonWebsocketConsumer): """ Mixin for JSONWebsocketConsumers, that speaks the a special protocol. """ - async def send_json( + async def send_json( # type: ignore self, type: str, content: Any, @@ -77,7 +115,7 @@ class ProtocollAsyncJsonWebsocketConsumer(AsyncJsonWebsocketConsumer): silence_errors=silence_errors, ) - async def receive_json(self, content: Any) -> None: + async def receive_json(self, content: Any) -> None: # type: ignore """ Receives the json data, parses it and calls receive_content. """ diff --git a/requirements/production.txt b/requirements/production.txt index 084d70fe2..d0948c463 100644 --- a/requirements/production.txt +++ b/requirements/production.txt @@ -9,6 +9,7 @@ Django>=2.1,<2.3 djangorestframework>=3.4,<3.10 jsonfield2>=3.0,<3.1 jsonschema>=3.0,<3.1 +lz4>=2.1.6 mypy_extensions>=0.4,<0.5 PyPDF2>=1.26,<1.27 roman>=2.0,<3.2