Compression: Recieving compressed messages

This commit is contained in:
FinnStutzenstein 2019-06-12 08:17:25 +02:00
parent 8f2160d44e
commit d9c88c02b3
5 changed files with 140 additions and 7 deletions

View File

@ -47,6 +47,7 @@
"exceljs": "1.9.1",
"file-saver": "^2.0.1",
"hammerjs": "^2.0.8",
"lz4js": "^0.2.0",
"material-icon-font": "git+https://github.com/petergng/materialIconFont.git",
"ng-pick-datetime": "^7.0.0",
"ng2-pdf-viewer": "^5.2.3",

View File

@ -5,6 +5,7 @@ import { Router } from '@angular/router';
import { TranslateService } from '@ngx-translate/core';
import { Observable, Subject } from 'rxjs';
import { take } from 'rxjs/operators';
import { decompress } from 'lz4js';
import { formatQueryParams, QueryParams } from '../query-params';
import { OpenSlidesStatusService } from './openslides-status.service';
@ -136,7 +137,7 @@ export class WebsocketService {
return this._connectionOpen;
}
private sendQueueWhileNotConnected: string[] = [];
private sendQueueWhileNotConnected: (string | ArrayBuffer)[] = [];
/**
* The websocket.
@ -219,6 +220,7 @@ export class WebsocketService {
socketPath += formatQueryParams(queryParams);
this.websocket = new WebSocket(socketPath);
this.websocket.binaryType = 'arraybuffer';
// connection established. If this connect attept was a retry,
// The error notice will be removed and the reconnectSubject is published.
@ -270,7 +272,21 @@ export class WebsocketService {
*
* @param data The message
*/
private handleMessage(data: string): void {
private handleMessage(data: string | ArrayBuffer): void {
if (data instanceof ArrayBuffer) {
const compressedSize = data.byteLength;
const decompressedBuffer: Uint8Array = decompress(new Uint8Array(data));
console.log(
`Recieved ${compressedSize / 1024} KB (${decompressedBuffer.byteLength /
1024} KB uncompressed), ratio ${decompressedBuffer.byteLength / compressedSize}`
);
const textDecoder = new TextDecoder();
data = textDecoder.decode(decompressedBuffer);
} else {
const recvSize = new TextEncoder().encode(data).byteLength;
console.log(`Received ${recvSize} KB uncompressed`);
}
const message: IncommingWebsocketMessage = JSON.parse(data);
const type = message.type;
const inResponse = message.in_response;
@ -447,6 +463,7 @@ export class WebsocketService {
// Either send directly or add to queue, if not connected.
const jsonMessage = JSON.stringify(message);
if (this.isConnected) {
this.websocket.send(jsonMessage);
} else {

View File

@ -1,7 +1,7 @@
import asyncio
import logging
import time
from typing import List
from typing import List, Optional
class WebsocketLatencyLogger:
@ -60,3 +60,79 @@ class WebsocketLatencyLogger:
""" Resets the stats. """
self.latencies: List[int] = []
self.time = time.time()
class WebsocketThroughputLogger:
"""
Usage (give values in bytes):
- WebsocketThroughputLogger.send(<uncompressed>, <compressed>)
- WebsocketThroughputLogger.recieve(<uncompressed>, <compressed>)
The compressed value is optional. If the data is not compressed, just
give the uncompressed value.
Note: Only the send values are logged in KB (received values in bytes).
"""
lock = asyncio.Lock()
""" To access the stats variables. """
instance = None
""" The only throughputlogger instance. """
logger = logging.getLogger("openslides.websocket.throughput")
""" The logger to log to. """
def __init__(self) -> None:
self.reset()
@classmethod
async def send(cls, uncompressed: int, compressed: Optional[int] = None) -> None:
# pass the latency value to the single instance
async with cls.lock:
if cls.instance is None:
cls.instance = cls()
if compressed is None:
compressed = uncompressed
cls.instance.send_uncompressed += int(uncompressed / 1024)
cls.instance.send_compressed += int(compressed / 1024)
await cls.instance.check_and_flush()
@classmethod
async def receive(cls, uncompressed: int, compressed: Optional[int] = None) -> None:
# pass the latency value to the single instance
async with cls.lock:
if cls.instance is None:
cls.instance = cls()
if compressed is None:
compressed = uncompressed
cls.instance.receive_uncompressed += uncompressed
cls.instance.receive_compressed += compressed
await cls.instance.check_and_flush()
async def check_and_flush(self) -> None:
# If we waited longer then 60 seconds, flush the data.
current_time = time.time()
if current_time > (self.time + 20):
send_ratio = receive_ratio = 1.0
if self.send_compressed > 0:
send_ratio = self.send_uncompressed / self.send_compressed
if self.receive_compressed > 0:
receive_ratio = self.receive_uncompressed / self.receive_compressed
self.logger.debug(
f"tx_uncompressed={self.send_uncompressed} KB, "
f"tx_compressed={self.send_compressed} KB, "
f"tx_ratio={send_ratio:.2f}, "
f"rx_uncompressed={self.receive_uncompressed} B, "
f"rx_compressed={self.receive_compressed} B, "
f"rx_ratio={receive_ratio:.2f}"
)
self.reset()
def reset(self) -> None:
""" Resets the stats. """
self.send_compressed = 0
self.send_uncompressed = 0
self.receive_compressed = 0
self.receive_uncompressed = 0
self.time = time.time()

View File

@ -1,12 +1,16 @@
import json
from collections import defaultdict
from typing import Any, Dict, List, Optional
import jsonschema
from channels.generic.websocket import AsyncJsonWebsocketConsumer
import lz4.frame
from channels.generic.websocket import AsyncWebsocketConsumer
from django.conf import settings
from websockets.exceptions import ConnectionClosed
from .autoupdate import AutoupdateFormat
from .cache import element_cache
from .stats import WebsocketThroughputLogger
from .utils import split_element_id
@ -25,12 +29,46 @@ WEBSOCKET_WRONG_FORMAT = 10
# If the recieved data has not the expected format.
class ProtocollAsyncJsonWebsocketConsumer(AsyncJsonWebsocketConsumer):
class AsyncCompressedJsonWebsocketConsumer(AsyncWebsocketConsumer):
async def receive(
self,
text_data: Optional[str] = None,
bytes_data: Optional[bytes] = None,
**kwargs: Dict[str, Any],
) -> None:
if bytes_data:
uncompressed_data = lz4.frame.decompress(bytes_data)
text_data = uncompressed_data.decode("utf-8")
recv_len = len(bytes_data)
uncompressed_len = len(uncompressed_data)
await WebsocketThroughputLogger.receive(uncompressed_len, recv_len)
elif text_data:
uncompressed_len = len(text_data.encode("utf-8"))
await WebsocketThroughputLogger.receive(uncompressed_len)
if text_data:
await self.receive_json(json.loads(text_data), **kwargs)
async def send_json(self, content: Any, close: bool = False) -> None:
text_data = json.dumps(content)
b_text_data = text_data.encode("utf-8")
uncompressed_len = len(b_text_data)
await WebsocketThroughputLogger.send(uncompressed_len)
await self.send(text_data=text_data, close=close)
async def receive_json(self, content: str, **kwargs: Dict[str, Any]) -> None:
pass
class ProtocollAsyncJsonWebsocketConsumer(AsyncCompressedJsonWebsocketConsumer):
"""
Mixin for JSONWebsocketConsumers, that speaks the a special protocol.
"""
async def send_json(
async def send_json( # type: ignore
self,
type: str,
content: Any,
@ -77,7 +115,7 @@ class ProtocollAsyncJsonWebsocketConsumer(AsyncJsonWebsocketConsumer):
silence_errors=silence_errors,
)
async def receive_json(self, content: Any) -> None:
async def receive_json(self, content: Any) -> None: # type: ignore
"""
Receives the json data, parses it and calls receive_content.
"""

View File

@ -9,6 +9,7 @@ Django>=2.1,<2.3
djangorestframework>=3.4,<3.10
jsonfield2>=3.0,<3.1
jsonschema>=3.0,<3.1
lz4>=2.1.6
mypy_extensions>=0.4,<0.5
PyPDF2>=1.26,<1.27
roman>=2.0,<3.2