diff --git a/client/src/app/core/core-services/websocket.service.ts b/client/src/app/core/core-services/websocket.service.ts index 56e61b6dd..79adce478 100644 --- a/client/src/app/core/core-services/websocket.service.ts +++ b/client/src/app/core/core-services/websocket.service.ts @@ -5,7 +5,7 @@ import { Router } from '@angular/router'; import { TranslateService } from '@ngx-translate/core'; import { Observable, Subject } from 'rxjs'; import { take } from 'rxjs/operators'; -import { decompress } from 'lz4js'; +import { compress, decompress } from 'lz4js'; import { formatQueryParams, QueryParams } from '../query-params'; import { OpenSlidesStatusService } from './openslides-status.service'; @@ -282,12 +282,10 @@ export class WebsocketService { ); const textDecoder = new TextDecoder(); data = textDecoder.decode(decompressedBuffer); - } else { - const recvSize = new TextEncoder().encode(data).byteLength; - console.log(`Received ${recvSize} KB uncompressed`); } const message: IncommingWebsocketMessage = JSON.parse(data); + console.log('Received', message); const type = message.type; const inResponse = message.in_response; const callbacks = this.responseCallbacks[inResponse]; @@ -464,10 +462,17 @@ export class WebsocketService { // Either send directly or add to queue, if not connected. const jsonMessage = JSON.stringify(message); + const textEncoder = new TextEncoder(); + const bytesMessage = textEncoder.encode(jsonMessage); + const compressedMessage: ArrayBuffer = compress(bytesMessage); + const ratio = bytesMessage.byteLength / compressedMessage.byteLength; + + const toSend = ratio > 1 ? compressedMessage : jsonMessage; + if (this.isConnected) { - this.websocket.send(jsonMessage); + this.websocket.send(toSend); } else { - this.sendQueueWhileNotConnected.push(jsonMessage); + this.sendQueueWhileNotConnected.push(toSend); } return message.id; diff --git a/openslides/utils/websocket.py b/openslides/utils/websocket.py index b39638f4b..b8d73cace 100644 --- a/openslides/utils/websocket.py +++ b/openslides/utils/websocket.py @@ -52,12 +52,23 @@ class AsyncCompressedJsonWebsocketConsumer(AsyncWebsocketConsumer): async def send_json(self, content: Any, close: bool = False) -> None: text_data = json.dumps(content) + bytes_data = None # type: ignore b_text_data = text_data.encode("utf-8") uncompressed_len = len(b_text_data) - await WebsocketThroughputLogger.send(uncompressed_len) - await self.send(text_data=text_data, close=close) + if getattr(settings, "COMPRESSION", True): + compressed_data = lz4.frame.compress(b_text_data) + ratio = len(b_text_data) / len(compressed_data) + if ratio > 1: + bytes_data = compressed_data + text_data = None # type: ignore + await WebsocketThroughputLogger.send(uncompressed_len, len(bytes_data)) + + if not bytes_data: + await WebsocketThroughputLogger.send(uncompressed_len) + + await self.send(text_data=text_data, bytes_data=bytes_data, close=close) async def receive_json(self, content: str, **kwargs: Dict[str, Any]) -> None: pass diff --git a/tests/integration/utils/test_consumers.py b/tests/integration/utils/test_consumers.py index 05f05e12e..f7d665124 100644 --- a/tests/integration/utils/test_consumers.py +++ b/tests/integration/utils/test_consumers.py @@ -1,10 +1,10 @@ import asyncio from importlib import import_module +from typing import Optional from unittest.mock import patch import pytest from asgiref.sync import sync_to_async -from channels.testing import WebsocketCommunicator from django.conf import settings from django.contrib.auth import BACKEND_SESSION_KEY, HASH_SESSION_KEY, SESSION_KEY @@ -20,6 +20,7 @@ from openslides.utils.websocket import WEBSOCKET_CHANGE_ID_TOO_HIGH from ...unit.utils.cache_provider import Collection1, Collection2, get_cachable_provider from ..helpers import TConfig, TProjector, TUser +from ..websocket import WebsocketCommunicator @pytest.fixture(autouse=True) @@ -45,7 +46,7 @@ async def prepare_element_cache(settings): @pytest.fixture async def get_communicator(): - communicator: WebsocketCommunicator = None + communicator: Optional[WebsocketCommunicator] = None def get_communicator(query_string=""): nonlocal communicator # use the outer communicator variable diff --git a/tests/integration/websocket.py b/tests/integration/websocket.py new file mode 100644 index 000000000..7ff15a133 --- /dev/null +++ b/tests/integration/websocket.py @@ -0,0 +1,25 @@ +import json + +import lz4.frame +from channels.testing import WebsocketCommunicator as ChannelsWebsocketCommunicator + + +class WebsocketCommunicator(ChannelsWebsocketCommunicator): + """ + Implements decompression when receiving JSON data. + """ + + async def receive_json_from(self, timeout=1): + """ + Receives a JSON text frame or a compressed JSON bytes object, decompresses and decodes it + """ + payload = await self.receive_from(timeout) + if isinstance(payload, bytes): + # try to decompress the message + uncompressed_data = lz4.frame.decompress(payload) + text_data = uncompressed_data.decode("utf-8") + else: + text_data = payload + + assert isinstance(text_data, str), "JSON data is not a text frame" + return json.loads(text_data)