From d9c88c02b3ab58d65c112eef024676d6ef3d94aa Mon Sep 17 00:00:00 2001 From: FinnStutzenstein Date: Wed, 12 Jun 2019 08:17:25 +0200 Subject: [PATCH 1/2] Compression: Recieving compressed messages --- client/package.json | 1 + .../core/core-services/websocket.service.ts | 21 ++++- openslides/utils/stats.py | 78 ++++++++++++++++++- openslides/utils/websocket.py | 46 ++++++++++- requirements/production.txt | 1 + 5 files changed, 140 insertions(+), 7 deletions(-) diff --git a/client/package.json b/client/package.json index 66edbc7b8..ea7dd0196 100644 --- a/client/package.json +++ b/client/package.json @@ -47,6 +47,7 @@ "exceljs": "1.9.1", "file-saver": "^2.0.1", "hammerjs": "^2.0.8", + "lz4js": "^0.2.0", "material-icon-font": "git+https://github.com/petergng/materialIconFont.git", "ng-pick-datetime": "^7.0.0", "ng2-pdf-viewer": "^5.2.3", diff --git a/client/src/app/core/core-services/websocket.service.ts b/client/src/app/core/core-services/websocket.service.ts index 3b0f229c8..56e61b6dd 100644 --- a/client/src/app/core/core-services/websocket.service.ts +++ b/client/src/app/core/core-services/websocket.service.ts @@ -5,6 +5,7 @@ import { Router } from '@angular/router'; import { TranslateService } from '@ngx-translate/core'; import { Observable, Subject } from 'rxjs'; import { take } from 'rxjs/operators'; +import { decompress } from 'lz4js'; import { formatQueryParams, QueryParams } from '../query-params'; import { OpenSlidesStatusService } from './openslides-status.service'; @@ -136,7 +137,7 @@ export class WebsocketService { return this._connectionOpen; } - private sendQueueWhileNotConnected: string[] = []; + private sendQueueWhileNotConnected: (string | ArrayBuffer)[] = []; /** * The websocket. @@ -219,6 +220,7 @@ export class WebsocketService { socketPath += formatQueryParams(queryParams); this.websocket = new WebSocket(socketPath); + this.websocket.binaryType = 'arraybuffer'; // connection established. If this connect attept was a retry, // The error notice will be removed and the reconnectSubject is published. @@ -270,7 +272,21 @@ export class WebsocketService { * * @param data The message */ - private handleMessage(data: string): void { + private handleMessage(data: string | ArrayBuffer): void { + if (data instanceof ArrayBuffer) { + const compressedSize = data.byteLength; + const decompressedBuffer: Uint8Array = decompress(new Uint8Array(data)); + console.log( + `Recieved ${compressedSize / 1024} KB (${decompressedBuffer.byteLength / + 1024} KB uncompressed), ratio ${decompressedBuffer.byteLength / compressedSize}` + ); + const textDecoder = new TextDecoder(); + data = textDecoder.decode(decompressedBuffer); + } else { + const recvSize = new TextEncoder().encode(data).byteLength; + console.log(`Received ${recvSize} KB uncompressed`); + } + const message: IncommingWebsocketMessage = JSON.parse(data); const type = message.type; const inResponse = message.in_response; @@ -447,6 +463,7 @@ export class WebsocketService { // Either send directly or add to queue, if not connected. const jsonMessage = JSON.stringify(message); + if (this.isConnected) { this.websocket.send(jsonMessage); } else { diff --git a/openslides/utils/stats.py b/openslides/utils/stats.py index 8c14c969d..1f0e7eab8 100644 --- a/openslides/utils/stats.py +++ b/openslides/utils/stats.py @@ -1,7 +1,7 @@ import asyncio import logging import time -from typing import List +from typing import List, Optional class WebsocketLatencyLogger: @@ -60,3 +60,79 @@ class WebsocketLatencyLogger: """ Resets the stats. """ self.latencies: List[int] = [] self.time = time.time() + + +class WebsocketThroughputLogger: + """ + Usage (give values in bytes): + - WebsocketThroughputLogger.send(, ) + - WebsocketThroughputLogger.recieve(, ) + The compressed value is optional. If the data is not compressed, just + give the uncompressed value. + Note: Only the send values are logged in KB (received values in bytes). + """ + + lock = asyncio.Lock() + """ To access the stats variables. """ + + instance = None + """ The only throughputlogger instance. """ + + logger = logging.getLogger("openslides.websocket.throughput") + """ The logger to log to. """ + + def __init__(self) -> None: + self.reset() + + @classmethod + async def send(cls, uncompressed: int, compressed: Optional[int] = None) -> None: + # pass the latency value to the single instance + async with cls.lock: + if cls.instance is None: + cls.instance = cls() + if compressed is None: + compressed = uncompressed + cls.instance.send_uncompressed += int(uncompressed / 1024) + cls.instance.send_compressed += int(compressed / 1024) + await cls.instance.check_and_flush() + + @classmethod + async def receive(cls, uncompressed: int, compressed: Optional[int] = None) -> None: + # pass the latency value to the single instance + async with cls.lock: + if cls.instance is None: + cls.instance = cls() + if compressed is None: + compressed = uncompressed + cls.instance.receive_uncompressed += uncompressed + cls.instance.receive_compressed += compressed + await cls.instance.check_and_flush() + + async def check_and_flush(self) -> None: + # If we waited longer then 60 seconds, flush the data. + current_time = time.time() + if current_time > (self.time + 20): + + send_ratio = receive_ratio = 1.0 + if self.send_compressed > 0: + send_ratio = self.send_uncompressed / self.send_compressed + if self.receive_compressed > 0: + receive_ratio = self.receive_uncompressed / self.receive_compressed + + self.logger.debug( + f"tx_uncompressed={self.send_uncompressed} KB, " + f"tx_compressed={self.send_compressed} KB, " + f"tx_ratio={send_ratio:.2f}, " + f"rx_uncompressed={self.receive_uncompressed} B, " + f"rx_compressed={self.receive_compressed} B, " + f"rx_ratio={receive_ratio:.2f}" + ) + self.reset() + + def reset(self) -> None: + """ Resets the stats. """ + self.send_compressed = 0 + self.send_uncompressed = 0 + self.receive_compressed = 0 + self.receive_uncompressed = 0 + self.time = time.time() diff --git a/openslides/utils/websocket.py b/openslides/utils/websocket.py index af090fb68..b39638f4b 100644 --- a/openslides/utils/websocket.py +++ b/openslides/utils/websocket.py @@ -1,12 +1,16 @@ +import json from collections import defaultdict from typing import Any, Dict, List, Optional import jsonschema -from channels.generic.websocket import AsyncJsonWebsocketConsumer +import lz4.frame +from channels.generic.websocket import AsyncWebsocketConsumer +from django.conf import settings from websockets.exceptions import ConnectionClosed from .autoupdate import AutoupdateFormat from .cache import element_cache +from .stats import WebsocketThroughputLogger from .utils import split_element_id @@ -25,12 +29,46 @@ WEBSOCKET_WRONG_FORMAT = 10 # If the recieved data has not the expected format. -class ProtocollAsyncJsonWebsocketConsumer(AsyncJsonWebsocketConsumer): +class AsyncCompressedJsonWebsocketConsumer(AsyncWebsocketConsumer): + async def receive( + self, + text_data: Optional[str] = None, + bytes_data: Optional[bytes] = None, + **kwargs: Dict[str, Any], + ) -> None: + if bytes_data: + uncompressed_data = lz4.frame.decompress(bytes_data) + text_data = uncompressed_data.decode("utf-8") + + recv_len = len(bytes_data) + uncompressed_len = len(uncompressed_data) + await WebsocketThroughputLogger.receive(uncompressed_len, recv_len) + elif text_data: + uncompressed_len = len(text_data.encode("utf-8")) + await WebsocketThroughputLogger.receive(uncompressed_len) + + if text_data: + await self.receive_json(json.loads(text_data), **kwargs) + + async def send_json(self, content: Any, close: bool = False) -> None: + text_data = json.dumps(content) + + b_text_data = text_data.encode("utf-8") + uncompressed_len = len(b_text_data) + await WebsocketThroughputLogger.send(uncompressed_len) + + await self.send(text_data=text_data, close=close) + + async def receive_json(self, content: str, **kwargs: Dict[str, Any]) -> None: + pass + + +class ProtocollAsyncJsonWebsocketConsumer(AsyncCompressedJsonWebsocketConsumer): """ Mixin for JSONWebsocketConsumers, that speaks the a special protocol. """ - async def send_json( + async def send_json( # type: ignore self, type: str, content: Any, @@ -77,7 +115,7 @@ class ProtocollAsyncJsonWebsocketConsumer(AsyncJsonWebsocketConsumer): silence_errors=silence_errors, ) - async def receive_json(self, content: Any) -> None: + async def receive_json(self, content: Any) -> None: # type: ignore """ Receives the json data, parses it and calls receive_content. """ diff --git a/requirements/production.txt b/requirements/production.txt index 084d70fe2..d0948c463 100644 --- a/requirements/production.txt +++ b/requirements/production.txt @@ -9,6 +9,7 @@ Django>=2.1,<2.3 djangorestframework>=3.4,<3.10 jsonfield2>=3.0,<3.1 jsonschema>=3.0,<3.1 +lz4>=2.1.6 mypy_extensions>=0.4,<0.5 PyPDF2>=1.26,<1.27 roman>=2.0,<3.2 From 6d027f0f75c6da22df4992326c87e2dce623c5be Mon Sep 17 00:00:00 2001 From: FinnStutzenstein Date: Wed, 12 Jun 2019 08:18:37 +0200 Subject: [PATCH 2/2] Compression: Sending compressed messages --- .../core/core-services/websocket.service.ts | 17 ++++++++----- openslides/utils/websocket.py | 15 +++++++++-- tests/integration/utils/test_consumers.py | 5 ++-- tests/integration/websocket.py | 25 +++++++++++++++++++ 4 files changed, 52 insertions(+), 10 deletions(-) create mode 100644 tests/integration/websocket.py diff --git a/client/src/app/core/core-services/websocket.service.ts b/client/src/app/core/core-services/websocket.service.ts index 56e61b6dd..79adce478 100644 --- a/client/src/app/core/core-services/websocket.service.ts +++ b/client/src/app/core/core-services/websocket.service.ts @@ -5,7 +5,7 @@ import { Router } from '@angular/router'; import { TranslateService } from '@ngx-translate/core'; import { Observable, Subject } from 'rxjs'; import { take } from 'rxjs/operators'; -import { decompress } from 'lz4js'; +import { compress, decompress } from 'lz4js'; import { formatQueryParams, QueryParams } from '../query-params'; import { OpenSlidesStatusService } from './openslides-status.service'; @@ -282,12 +282,10 @@ export class WebsocketService { ); const textDecoder = new TextDecoder(); data = textDecoder.decode(decompressedBuffer); - } else { - const recvSize = new TextEncoder().encode(data).byteLength; - console.log(`Received ${recvSize} KB uncompressed`); } const message: IncommingWebsocketMessage = JSON.parse(data); + console.log('Received', message); const type = message.type; const inResponse = message.in_response; const callbacks = this.responseCallbacks[inResponse]; @@ -464,10 +462,17 @@ export class WebsocketService { // Either send directly or add to queue, if not connected. const jsonMessage = JSON.stringify(message); + const textEncoder = new TextEncoder(); + const bytesMessage = textEncoder.encode(jsonMessage); + const compressedMessage: ArrayBuffer = compress(bytesMessage); + const ratio = bytesMessage.byteLength / compressedMessage.byteLength; + + const toSend = ratio > 1 ? compressedMessage : jsonMessage; + if (this.isConnected) { - this.websocket.send(jsonMessage); + this.websocket.send(toSend); } else { - this.sendQueueWhileNotConnected.push(jsonMessage); + this.sendQueueWhileNotConnected.push(toSend); } return message.id; diff --git a/openslides/utils/websocket.py b/openslides/utils/websocket.py index b39638f4b..b8d73cace 100644 --- a/openslides/utils/websocket.py +++ b/openslides/utils/websocket.py @@ -52,12 +52,23 @@ class AsyncCompressedJsonWebsocketConsumer(AsyncWebsocketConsumer): async def send_json(self, content: Any, close: bool = False) -> None: text_data = json.dumps(content) + bytes_data = None # type: ignore b_text_data = text_data.encode("utf-8") uncompressed_len = len(b_text_data) - await WebsocketThroughputLogger.send(uncompressed_len) - await self.send(text_data=text_data, close=close) + if getattr(settings, "COMPRESSION", True): + compressed_data = lz4.frame.compress(b_text_data) + ratio = len(b_text_data) / len(compressed_data) + if ratio > 1: + bytes_data = compressed_data + text_data = None # type: ignore + await WebsocketThroughputLogger.send(uncompressed_len, len(bytes_data)) + + if not bytes_data: + await WebsocketThroughputLogger.send(uncompressed_len) + + await self.send(text_data=text_data, bytes_data=bytes_data, close=close) async def receive_json(self, content: str, **kwargs: Dict[str, Any]) -> None: pass diff --git a/tests/integration/utils/test_consumers.py b/tests/integration/utils/test_consumers.py index 05f05e12e..f7d665124 100644 --- a/tests/integration/utils/test_consumers.py +++ b/tests/integration/utils/test_consumers.py @@ -1,10 +1,10 @@ import asyncio from importlib import import_module +from typing import Optional from unittest.mock import patch import pytest from asgiref.sync import sync_to_async -from channels.testing import WebsocketCommunicator from django.conf import settings from django.contrib.auth import BACKEND_SESSION_KEY, HASH_SESSION_KEY, SESSION_KEY @@ -20,6 +20,7 @@ from openslides.utils.websocket import WEBSOCKET_CHANGE_ID_TOO_HIGH from ...unit.utils.cache_provider import Collection1, Collection2, get_cachable_provider from ..helpers import TConfig, TProjector, TUser +from ..websocket import WebsocketCommunicator @pytest.fixture(autouse=True) @@ -45,7 +46,7 @@ async def prepare_element_cache(settings): @pytest.fixture async def get_communicator(): - communicator: WebsocketCommunicator = None + communicator: Optional[WebsocketCommunicator] = None def get_communicator(query_string=""): nonlocal communicator # use the outer communicator variable diff --git a/tests/integration/websocket.py b/tests/integration/websocket.py new file mode 100644 index 000000000..7ff15a133 --- /dev/null +++ b/tests/integration/websocket.py @@ -0,0 +1,25 @@ +import json + +import lz4.frame +from channels.testing import WebsocketCommunicator as ChannelsWebsocketCommunicator + + +class WebsocketCommunicator(ChannelsWebsocketCommunicator): + """ + Implements decompression when receiving JSON data. + """ + + async def receive_json_from(self, timeout=1): + """ + Receives a JSON text frame or a compressed JSON bytes object, decompresses and decodes it + """ + payload = await self.receive_from(timeout) + if isinstance(payload, bytes): + # try to decompress the message + uncompressed_data = lz4.frame.decompress(payload) + text_data = uncompressed_data.decode("utf-8") + else: + text_data = payload + + assert isinstance(text_data, str), "JSON data is not a text frame" + return json.loads(text_data)