Compression: Sending compressed messages
This commit is contained in:
parent
d9c88c02b3
commit
6d027f0f75
@ -5,7 +5,7 @@ import { Router } from '@angular/router';
|
|||||||
import { TranslateService } from '@ngx-translate/core';
|
import { TranslateService } from '@ngx-translate/core';
|
||||||
import { Observable, Subject } from 'rxjs';
|
import { Observable, Subject } from 'rxjs';
|
||||||
import { take } from 'rxjs/operators';
|
import { take } from 'rxjs/operators';
|
||||||
import { decompress } from 'lz4js';
|
import { compress, decompress } from 'lz4js';
|
||||||
|
|
||||||
import { formatQueryParams, QueryParams } from '../query-params';
|
import { formatQueryParams, QueryParams } from '../query-params';
|
||||||
import { OpenSlidesStatusService } from './openslides-status.service';
|
import { OpenSlidesStatusService } from './openslides-status.service';
|
||||||
@ -282,12 +282,10 @@ export class WebsocketService {
|
|||||||
);
|
);
|
||||||
const textDecoder = new TextDecoder();
|
const textDecoder = new TextDecoder();
|
||||||
data = textDecoder.decode(decompressedBuffer);
|
data = textDecoder.decode(decompressedBuffer);
|
||||||
} else {
|
|
||||||
const recvSize = new TextEncoder().encode(data).byteLength;
|
|
||||||
console.log(`Received ${recvSize} KB uncompressed`);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const message: IncommingWebsocketMessage = JSON.parse(data);
|
const message: IncommingWebsocketMessage = JSON.parse(data);
|
||||||
|
console.log('Received', message);
|
||||||
const type = message.type;
|
const type = message.type;
|
||||||
const inResponse = message.in_response;
|
const inResponse = message.in_response;
|
||||||
const callbacks = this.responseCallbacks[inResponse];
|
const callbacks = this.responseCallbacks[inResponse];
|
||||||
@ -464,10 +462,17 @@ export class WebsocketService {
|
|||||||
// Either send directly or add to queue, if not connected.
|
// Either send directly or add to queue, if not connected.
|
||||||
const jsonMessage = JSON.stringify(message);
|
const jsonMessage = JSON.stringify(message);
|
||||||
|
|
||||||
|
const textEncoder = new TextEncoder();
|
||||||
|
const bytesMessage = textEncoder.encode(jsonMessage);
|
||||||
|
const compressedMessage: ArrayBuffer = compress(bytesMessage);
|
||||||
|
const ratio = bytesMessage.byteLength / compressedMessage.byteLength;
|
||||||
|
|
||||||
|
const toSend = ratio > 1 ? compressedMessage : jsonMessage;
|
||||||
|
|
||||||
if (this.isConnected) {
|
if (this.isConnected) {
|
||||||
this.websocket.send(jsonMessage);
|
this.websocket.send(toSend);
|
||||||
} else {
|
} else {
|
||||||
this.sendQueueWhileNotConnected.push(jsonMessage);
|
this.sendQueueWhileNotConnected.push(toSend);
|
||||||
}
|
}
|
||||||
|
|
||||||
return message.id;
|
return message.id;
|
||||||
|
@ -52,12 +52,23 @@ class AsyncCompressedJsonWebsocketConsumer(AsyncWebsocketConsumer):
|
|||||||
|
|
||||||
async def send_json(self, content: Any, close: bool = False) -> None:
|
async def send_json(self, content: Any, close: bool = False) -> None:
|
||||||
text_data = json.dumps(content)
|
text_data = json.dumps(content)
|
||||||
|
bytes_data = None # type: ignore
|
||||||
|
|
||||||
b_text_data = text_data.encode("utf-8")
|
b_text_data = text_data.encode("utf-8")
|
||||||
uncompressed_len = len(b_text_data)
|
uncompressed_len = len(b_text_data)
|
||||||
|
|
||||||
|
if getattr(settings, "COMPRESSION", True):
|
||||||
|
compressed_data = lz4.frame.compress(b_text_data)
|
||||||
|
ratio = len(b_text_data) / len(compressed_data)
|
||||||
|
if ratio > 1:
|
||||||
|
bytes_data = compressed_data
|
||||||
|
text_data = None # type: ignore
|
||||||
|
await WebsocketThroughputLogger.send(uncompressed_len, len(bytes_data))
|
||||||
|
|
||||||
|
if not bytes_data:
|
||||||
await WebsocketThroughputLogger.send(uncompressed_len)
|
await WebsocketThroughputLogger.send(uncompressed_len)
|
||||||
|
|
||||||
await self.send(text_data=text_data, close=close)
|
await self.send(text_data=text_data, bytes_data=bytes_data, close=close)
|
||||||
|
|
||||||
async def receive_json(self, content: str, **kwargs: Dict[str, Any]) -> None:
|
async def receive_json(self, content: str, **kwargs: Dict[str, Any]) -> None:
|
||||||
pass
|
pass
|
||||||
|
@ -1,10 +1,10 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
from importlib import import_module
|
from importlib import import_module
|
||||||
|
from typing import Optional
|
||||||
from unittest.mock import patch
|
from unittest.mock import patch
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
from asgiref.sync import sync_to_async
|
from asgiref.sync import sync_to_async
|
||||||
from channels.testing import WebsocketCommunicator
|
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.contrib.auth import BACKEND_SESSION_KEY, HASH_SESSION_KEY, SESSION_KEY
|
from django.contrib.auth import BACKEND_SESSION_KEY, HASH_SESSION_KEY, SESSION_KEY
|
||||||
|
|
||||||
@ -20,6 +20,7 @@ from openslides.utils.websocket import WEBSOCKET_CHANGE_ID_TOO_HIGH
|
|||||||
|
|
||||||
from ...unit.utils.cache_provider import Collection1, Collection2, get_cachable_provider
|
from ...unit.utils.cache_provider import Collection1, Collection2, get_cachable_provider
|
||||||
from ..helpers import TConfig, TProjector, TUser
|
from ..helpers import TConfig, TProjector, TUser
|
||||||
|
from ..websocket import WebsocketCommunicator
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(autouse=True)
|
@pytest.fixture(autouse=True)
|
||||||
@ -45,7 +46,7 @@ async def prepare_element_cache(settings):
|
|||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
async def get_communicator():
|
async def get_communicator():
|
||||||
communicator: WebsocketCommunicator = None
|
communicator: Optional[WebsocketCommunicator] = None
|
||||||
|
|
||||||
def get_communicator(query_string=""):
|
def get_communicator(query_string=""):
|
||||||
nonlocal communicator # use the outer communicator variable
|
nonlocal communicator # use the outer communicator variable
|
||||||
|
25
tests/integration/websocket.py
Normal file
25
tests/integration/websocket.py
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
import json
|
||||||
|
|
||||||
|
import lz4.frame
|
||||||
|
from channels.testing import WebsocketCommunicator as ChannelsWebsocketCommunicator
|
||||||
|
|
||||||
|
|
||||||
|
class WebsocketCommunicator(ChannelsWebsocketCommunicator):
|
||||||
|
"""
|
||||||
|
Implements decompression when receiving JSON data.
|
||||||
|
"""
|
||||||
|
|
||||||
|
async def receive_json_from(self, timeout=1):
|
||||||
|
"""
|
||||||
|
Receives a JSON text frame or a compressed JSON bytes object, decompresses and decodes it
|
||||||
|
"""
|
||||||
|
payload = await self.receive_from(timeout)
|
||||||
|
if isinstance(payload, bytes):
|
||||||
|
# try to decompress the message
|
||||||
|
uncompressed_data = lz4.frame.decompress(payload)
|
||||||
|
text_data = uncompressed_data.decode("utf-8")
|
||||||
|
else:
|
||||||
|
text_data = payload
|
||||||
|
|
||||||
|
assert isinstance(text_data, str), "JSON data is not a text frame"
|
||||||
|
return json.loads(text_data)
|
Loading…
Reference in New Issue
Block a user