Merge pull request #4753 from FinnStutzenstein/compression

Compression
This commit is contained in:
Emanuel Schütze 2019-06-12 14:36:31 +02:00 committed by GitHub
commit a570cf16b0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 186 additions and 11 deletions

View File

@ -47,6 +47,7 @@
"exceljs": "1.9.1",
"file-saver": "^2.0.1",
"hammerjs": "^2.0.8",
"lz4js": "^0.2.0",
"material-icon-font": "git+https://github.com/petergng/materialIconFont.git",
"ng-pick-datetime": "^7.0.0",
"ng2-pdf-viewer": "^5.2.3",

View File

@ -5,6 +5,7 @@ import { Router } from '@angular/router';
import { TranslateService } from '@ngx-translate/core';
import { Observable, Subject } from 'rxjs';
import { take } from 'rxjs/operators';
import { compress, decompress } from 'lz4js';
import { formatQueryParams, QueryParams } from '../query-params';
import { OpenSlidesStatusService } from './openslides-status.service';
@ -136,7 +137,7 @@ export class WebsocketService {
return this._connectionOpen;
}
private sendQueueWhileNotConnected: string[] = [];
private sendQueueWhileNotConnected: (string | ArrayBuffer)[] = [];
/**
* The websocket.
@ -219,6 +220,7 @@ export class WebsocketService {
socketPath += formatQueryParams(queryParams);
this.websocket = new WebSocket(socketPath);
this.websocket.binaryType = 'arraybuffer';
// connection established. If this connect attept was a retry,
// The error notice will be removed and the reconnectSubject is published.
@ -270,8 +272,20 @@ export class WebsocketService {
*
* @param data The message
*/
private handleMessage(data: string): void {
private handleMessage(data: string | ArrayBuffer): void {
if (data instanceof ArrayBuffer) {
const compressedSize = data.byteLength;
const decompressedBuffer: Uint8Array = decompress(new Uint8Array(data));
console.log(
`Recieved ${compressedSize / 1024} KB (${decompressedBuffer.byteLength /
1024} KB uncompressed), ratio ${decompressedBuffer.byteLength / compressedSize}`
);
const textDecoder = new TextDecoder();
data = textDecoder.decode(decompressedBuffer);
}
const message: IncommingWebsocketMessage = JSON.parse(data);
console.log('Received', message);
const type = message.type;
const inResponse = message.in_response;
const callbacks = this.responseCallbacks[inResponse];
@ -447,10 +461,18 @@ export class WebsocketService {
// Either send directly or add to queue, if not connected.
const jsonMessage = JSON.stringify(message);
const textEncoder = new TextEncoder();
const bytesMessage = textEncoder.encode(jsonMessage);
const compressedMessage: ArrayBuffer = compress(bytesMessage);
const ratio = bytesMessage.byteLength / compressedMessage.byteLength;
const toSend = ratio > 1 ? compressedMessage : jsonMessage;
if (this.isConnected) {
this.websocket.send(jsonMessage);
this.websocket.send(toSend);
} else {
this.sendQueueWhileNotConnected.push(jsonMessage);
this.sendQueueWhileNotConnected.push(toSend);
}
return message.id;

View File

@ -1,7 +1,7 @@
import asyncio
import logging
import time
from typing import List
from typing import List, Optional
class WebsocketLatencyLogger:
@ -60,3 +60,79 @@ class WebsocketLatencyLogger:
""" Resets the stats. """
self.latencies: List[int] = []
self.time = time.time()
class WebsocketThroughputLogger:
"""
Usage (give values in bytes):
- WebsocketThroughputLogger.send(<uncompressed>, <compressed>)
- WebsocketThroughputLogger.recieve(<uncompressed>, <compressed>)
The compressed value is optional. If the data is not compressed, just
give the uncompressed value.
Note: Only the send values are logged in KB (received values in bytes).
"""
lock = asyncio.Lock()
""" To access the stats variables. """
instance = None
""" The only throughputlogger instance. """
logger = logging.getLogger("openslides.websocket.throughput")
""" The logger to log to. """
def __init__(self) -> None:
self.reset()
@classmethod
async def send(cls, uncompressed: int, compressed: Optional[int] = None) -> None:
# pass the latency value to the single instance
async with cls.lock:
if cls.instance is None:
cls.instance = cls()
if compressed is None:
compressed = uncompressed
cls.instance.send_uncompressed += int(uncompressed / 1024)
cls.instance.send_compressed += int(compressed / 1024)
await cls.instance.check_and_flush()
@classmethod
async def receive(cls, uncompressed: int, compressed: Optional[int] = None) -> None:
# pass the latency value to the single instance
async with cls.lock:
if cls.instance is None:
cls.instance = cls()
if compressed is None:
compressed = uncompressed
cls.instance.receive_uncompressed += uncompressed
cls.instance.receive_compressed += compressed
await cls.instance.check_and_flush()
async def check_and_flush(self) -> None:
# If we waited longer then 60 seconds, flush the data.
current_time = time.time()
if current_time > (self.time + 20):
send_ratio = receive_ratio = 1.0
if self.send_compressed > 0:
send_ratio = self.send_uncompressed / self.send_compressed
if self.receive_compressed > 0:
receive_ratio = self.receive_uncompressed / self.receive_compressed
self.logger.debug(
f"tx_uncompressed={self.send_uncompressed} KB, "
f"tx_compressed={self.send_compressed} KB, "
f"tx_ratio={send_ratio:.2f}, "
f"rx_uncompressed={self.receive_uncompressed} B, "
f"rx_compressed={self.receive_compressed} B, "
f"rx_ratio={receive_ratio:.2f}"
)
self.reset()
def reset(self) -> None:
""" Resets the stats. """
self.send_compressed = 0
self.send_uncompressed = 0
self.receive_compressed = 0
self.receive_uncompressed = 0
self.time = time.time()

View File

@ -1,12 +1,16 @@
import json
from collections import defaultdict
from typing import Any, Dict, List, Optional
import jsonschema
from channels.generic.websocket import AsyncJsonWebsocketConsumer
import lz4.frame
from channels.generic.websocket import AsyncWebsocketConsumer
from django.conf import settings
from websockets.exceptions import ConnectionClosed
from .autoupdate import AutoupdateFormat
from .cache import element_cache
from .stats import WebsocketThroughputLogger
from .utils import split_element_id
@ -25,12 +29,57 @@ WEBSOCKET_WRONG_FORMAT = 10
# If the recieved data has not the expected format.
class ProtocollAsyncJsonWebsocketConsumer(AsyncJsonWebsocketConsumer):
class AsyncCompressedJsonWebsocketConsumer(AsyncWebsocketConsumer):
async def receive(
self,
text_data: Optional[str] = None,
bytes_data: Optional[bytes] = None,
**kwargs: Dict[str, Any],
) -> None:
if bytes_data:
uncompressed_data = lz4.frame.decompress(bytes_data)
text_data = uncompressed_data.decode("utf-8")
recv_len = len(bytes_data)
uncompressed_len = len(uncompressed_data)
await WebsocketThroughputLogger.receive(uncompressed_len, recv_len)
elif text_data:
uncompressed_len = len(text_data.encode("utf-8"))
await WebsocketThroughputLogger.receive(uncompressed_len)
if text_data:
await self.receive_json(json.loads(text_data), **kwargs)
async def send_json(self, content: Any, close: bool = False) -> None:
text_data = json.dumps(content)
bytes_data = None # type: ignore
b_text_data = text_data.encode("utf-8")
uncompressed_len = len(b_text_data)
if getattr(settings, "COMPRESSION", True):
compressed_data = lz4.frame.compress(b_text_data)
ratio = len(b_text_data) / len(compressed_data)
if ratio > 1:
bytes_data = compressed_data
text_data = None # type: ignore
await WebsocketThroughputLogger.send(uncompressed_len, len(bytes_data))
if not bytes_data:
await WebsocketThroughputLogger.send(uncompressed_len)
await self.send(text_data=text_data, bytes_data=bytes_data, close=close)
async def receive_json(self, content: str, **kwargs: Dict[str, Any]) -> None:
pass
class ProtocollAsyncJsonWebsocketConsumer(AsyncCompressedJsonWebsocketConsumer):
"""
Mixin for JSONWebsocketConsumers, that speaks the a special protocol.
"""
async def send_json(
async def send_json( # type: ignore
self,
type: str,
content: Any,
@ -77,7 +126,7 @@ class ProtocollAsyncJsonWebsocketConsumer(AsyncJsonWebsocketConsumer):
silence_errors=silence_errors,
)
async def receive_json(self, content: Any) -> None:
async def receive_json(self, content: Any) -> None: # type: ignore
"""
Receives the json data, parses it and calls receive_content.
"""

View File

@ -9,6 +9,7 @@ Django>=2.1,<2.3
djangorestframework>=3.4,<3.10
jsonfield2>=3.0,<3.1
jsonschema>=3.0,<3.1
lz4>=2.1.6
mypy_extensions>=0.4,<0.5
PyPDF2>=1.26,<1.27
roman>=2.0,<3.2

View File

@ -1,10 +1,10 @@
import asyncio
from importlib import import_module
from typing import Optional
from unittest.mock import patch
import pytest
from asgiref.sync import sync_to_async
from channels.testing import WebsocketCommunicator
from django.conf import settings
from django.contrib.auth import BACKEND_SESSION_KEY, HASH_SESSION_KEY, SESSION_KEY
@ -20,6 +20,7 @@ from openslides.utils.websocket import WEBSOCKET_CHANGE_ID_TOO_HIGH
from ...unit.utils.cache_provider import Collection1, Collection2, get_cachable_provider
from ..helpers import TConfig, TProjector, TUser
from ..websocket import WebsocketCommunicator
@pytest.fixture(autouse=True)
@ -45,7 +46,7 @@ async def prepare_element_cache(settings):
@pytest.fixture
async def get_communicator():
communicator: WebsocketCommunicator = None
communicator: Optional[WebsocketCommunicator] = None
def get_communicator(query_string=""):
nonlocal communicator # use the outer communicator variable

View File

@ -0,0 +1,25 @@
import json
import lz4.frame
from channels.testing import WebsocketCommunicator as ChannelsWebsocketCommunicator
class WebsocketCommunicator(ChannelsWebsocketCommunicator):
"""
Implements decompression when receiving JSON data.
"""
async def receive_json_from(self, timeout=1):
"""
Receives a JSON text frame or a compressed JSON bytes object, decompresses and decodes it
"""
payload = await self.receive_from(timeout)
if isinstance(payload, bytes):
# try to decompress the message
uncompressed_data = lz4.frame.decompress(payload)
text_data = uncompressed_data.decode("utf-8")
else:
text_data = payload
assert isinstance(text_data, str), "JSON data is not a text frame"
return json.loads(text_data)