Added autoupdate bundle middleware.

This commit is contained in:
Norman Jäckel 2018-01-20 13:57:25 +01:00
parent 3ac072d1a2
commit d381ca36dd
11 changed files with 105 additions and 183 deletions

View File

@ -82,7 +82,7 @@ Core:
- Fixing error when clearing empty chat [#3199].
- Added notify system [#3212].
- Enhanced performance esp. for server restart and first connection of all
clients by refactoring autoupdate, Collection and AccessPermission [#3223].
clients by refactoring autoupdate, Collection and AccessPermission [#3223, #3539].
- Fixes autoupdate bug for a user without user.can_see_name permission [#3233].
- Improved reconnect handling if the server was flushed [#3297].
- Highlight list entries in a light blue, if a related object is projected

View File

@ -33,6 +33,7 @@ MIDDLEWARE = [
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
'openslides.utils.autoupdate.AutoupdateBundleMiddleware',
]
ROOT_URLCONF = 'openslides.urls'

View File

@ -1,7 +1,8 @@
import json
import threading
import time
import warnings
from collections import defaultdict
from collections import OrderedDict, defaultdict
from typing import Any, Dict, Generator, Iterable, List, Tuple, Union
from channels import Channel, Group
@ -316,12 +317,23 @@ def send_data_site(message: ChannelMessageFormat) -> None:
send_or_wait(Channel(channel_name).send, {'text': json.dumps(output)})
def to_ordered_dict(d):
"""
Little helper to hash information dict in inform_*_data.
"""
if isinstance(d, dict):
result = OrderedDict([(key, to_ordered_dict(d[key])) for key in sorted(d.keys())])
else:
result = d
return result
def inform_changed_data(instances: Union[Iterable[Model], Model], information: Dict[str, Any]=None) -> None:
"""
Informs the autoupdate system and the caching system about the creation or
update of an element.
update of an element. This is done via the AutoupdateBundleMiddleware.
The argument instances can be one instance or an interable over instances.
The argument instances can be one instance or an iterable over instances.
"""
root_instances = set()
if not isinstance(instances, Iterable):
@ -334,41 +346,37 @@ def inform_changed_data(instances: Union[Iterable[Model], Model], information: D
# Instance has no method get_root_rest_element. Just ignore it.
pass
# Generates an collection element list for the root_instances.
collection_elements = [] # type: List[CollectionElement]
for root_instance in root_instances:
collection_elements.append(
CollectionElement.from_instance(
# Put all collection elements into the autoupdate_bundle.
bundle = autoupdate_bundle.get(threading.get_ident())
if bundle is not None:
# Run autoupdate only if the bundle exists because we are in a request-response-cycle.
for root_instance in root_instances:
collection_element = CollectionElement.from_instance(
root_instance,
information=information))
# If currently there is an open database transaction, then the
# send_autoupdate function is only called, when the transaction is
# commited. If there is currently no transaction, then the function
# is called immediately.
transaction.on_commit(lambda: send_autoupdate(collection_elements))
information=information)
key = root_instance.get_collection_string() + str(root_instance.get_rest_pk()) + str(to_ordered_dict(information))
bundle[key] = collection_element
def inform_deleted_data(elements: Iterable[Tuple[str, int]], information: Dict[str, Any]=None) -> None:
"""
Informs the autoupdate system and the caching system about the deletion of
elements.
elements. This is done via the AutoupdateBundleMiddleware.
The argument information is added to each collection element.
"""
# Go through each pair of collection_string and id and generate a collection
# element from it.
collection_elements = [] # type: List[CollectionElement]
for element in elements:
collection_elements.append(CollectionElement.from_values(
collection_string=element[0],
id=element[1],
deleted=True,
information=information))
# If currently there is an open database transaction, then the
# send_autoupdate function is only called, when the transaction is
# commited. If there is currently no transaction, then the function
# is called immediately.
transaction.on_commit(lambda: send_autoupdate(collection_elements))
# Put all stuff to be deleted into the autoupdate_bundle.
bundle = autoupdate_bundle.get(threading.get_ident())
if bundle is not None:
# Run autoupdate only if the bundle exists because we are in a request-response-cycle.
for element in elements:
collection_element = CollectionElement.from_values(
collection_string=element[0],
id=element[1],
deleted=True,
information=information)
key = element[0] + str(element[1]) + str(to_ordered_dict(information))
bundle[key] = collection_element
def inform_data_collection_element_list(collection_elements: List[CollectionElement],
@ -377,14 +385,45 @@ def inform_data_collection_element_list(collection_elements: List[CollectionElem
Informs the autoupdate system about some collection elements. This is
used just to send some data to all users.
"""
# If currently there is an open database transaction, then the
# send_autoupdate function is only called, when the transaction is
# commited. If there is currently no transaction, then the function
# is called immediately.
transaction.on_commit(lambda: send_autoupdate(collection_elements))
# Put all stuff into the autoupdate_bundle.
bundle = autoupdate_bundle.get(threading.get_ident())
if bundle is not None:
# Run autoupdate only if the bundle exists because we are in a request-response-cycle.
for collection_element in collection_elements:
key = collection_element.collection_string + str(collection_element.id) + str(to_ordered_dict(information))
bundle[key] = collection_element
def send_autoupdate(collection_elements: List[CollectionElement]) -> None:
"""
Global container for autoupdate bundles
"""
autoupdate_bundle = {} # type: Dict[int, Dict[str, CollectionElement]]
class AutoupdateBundleMiddleware:
"""
Middleware to handle autoupdate bundling.
"""
def __init__(self, get_response):
self.get_response = get_response
# One-time configuration and initialization.
def __call__(self, request):
thread_id = threading.get_ident()
autoupdate_bundle[thread_id] = {}
response = self.get_response(request)
bundle = autoupdate_bundle.pop(thread_id) # type: Dict[str, CollectionElement]
# If currently there is an open database transaction, then the
# send_autoupdate function is only called, when the transaction is
# commited. If there is currently no transaction, then the function
# is called immediately.
transaction.on_commit(lambda: send_autoupdate(bundle.values()))
return response
def send_autoupdate(collection_elements: Iterable[CollectionElement]) -> None:
"""
Helper function, that sends collection_elements through a channel to the
autoupdate system.

View File

@ -3,6 +3,7 @@ from typing import (
Any,
Dict,
Generator,
Iterable,
List,
Optional,
Type,
@ -317,7 +318,7 @@ def format_for_autoupdate(collection_string: str, id: int, action: str, data: Di
return output
def to_channel_message(elements: List[CollectionElement]) -> ChannelMessageFormat:
def to_channel_message(elements: Iterable[CollectionElement]) -> ChannelMessageFormat:
"""
Converts a list of collection elements to a dict, that can be send to the
channels system.

View File

@ -12,7 +12,7 @@ from openslides.core.models import Countdown
from openslides.motions.models import Motion
from openslides.topics.models import Topic
from openslides.users.models import User
from openslides.utils.autoupdate import inform_changed_data
from openslides.utils.collection import CollectionElement
from openslides.utils.test import TestCase
@ -207,7 +207,7 @@ class ManageSpeaker(TestCase):
group_delegates = type(group_staff).objects.get(name='Delegates')
admin.groups.add(group_delegates)
admin.groups.remove(group_staff)
inform_changed_data(admin)
CollectionElement.from_instance(admin)
response = self.client.post(
reverse('item-manage-speaker', args=[self.item.pk]),
@ -244,7 +244,7 @@ class ManageSpeaker(TestCase):
group_delegates = type(group_staff).objects.get(name='Delegates')
admin.groups.add(group_delegates)
admin.groups.remove(group_staff)
inform_changed_data(admin)
CollectionElement.from_instance(admin)
speaker = Speaker.objects.add(self.user, self.item)
response = self.client.delete(

View File

@ -7,7 +7,6 @@ from rest_framework.test import APIClient
from openslides.assignments.models import Assignment
from openslides.core.config import config
from openslides.users.models import User
from openslides.utils.autoupdate import inform_changed_data
from openslides.utils.test import TestCase
@ -111,7 +110,7 @@ class CanidatureSelf(TestCase):
group_delegates = type(group_staff).objects.get(name='Delegates')
admin.groups.add(group_delegates)
admin.groups.remove(group_staff)
inform_changed_data(admin)
get_redis_connection('default').flushall()
response = self.client.post(reverse('assignment-candidature-self', args=[self.assignment.pk]))
@ -158,7 +157,7 @@ class CanidatureSelf(TestCase):
group_delegates = type(group_staff).objects.get(name='Delegates')
admin.groups.add(group_delegates)
admin.groups.remove(group_staff)
inform_changed_data(admin)
get_redis_connection('default').flushall()
response = self.client.delete(reverse('assignment-candidature-self', args=[self.assignment.pk]))
@ -239,7 +238,7 @@ class CandidatureOther(TestCase):
group_delegates = type(group_staff).objects.get(name='Delegates')
admin.groups.add(group_delegates)
admin.groups.remove(group_staff)
inform_changed_data(admin)
get_redis_connection('default').flushall()
response = self.client.post(
reverse('assignment-candidature-other', args=[self.assignment.pk]),
@ -295,7 +294,7 @@ class CandidatureOther(TestCase):
group_delegates = type(group_staff).objects.get(name='Delegates')
admin.groups.add(group_delegates)
admin.groups.remove(group_staff)
inform_changed_data(admin)
get_redis_connection('default').flushall()
response = self.client.delete(
reverse('assignment-candidature-other', args=[self.assignment.pk]),

View File

@ -17,7 +17,7 @@ from openslides.motions.models import (
Workflow,
)
from openslides.users.models import Group
from openslides.utils.autoupdate import inform_changed_data
from openslides.utils.collection import CollectionElement
from openslides.utils.test import TestCase
@ -381,7 +381,7 @@ class CreateMotion(TestCase):
self.admin = get_user_model().objects.get(username='admin')
self.admin.groups.add(2)
self.admin.groups.remove(3)
inform_changed_data(self.admin)
get_redis_connection('default').flushall()
response = self.client.post(
reverse('motion-list'),
@ -448,7 +448,7 @@ class RetrieveMotion(TestCase):
state.save()
# The cache has to be cleared, see:
# https://github.com/OpenSlides/OpenSlides/issues/3396
get_redis_connection("default").flushall()
get_redis_connection('default').flushall()
response = guest_client.get(reverse('motion-detail', args=[self.motion.pk]))
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
@ -474,15 +474,14 @@ class RetrieveMotion(TestCase):
def test_user_without_can_see_user_permission_to_see_motion_and_submitter_data(self):
self.motion.submitters.add(get_user_model().objects.get(username='admin'))
inform_changed_data(self.motion)
group = Group.objects.get(pk=1) # Group with pk 1 is for anonymous and default users.
permission_string = 'users.can_see_name'
app_label, codename = permission_string.split('.')
permission = group.permissions.get(content_type__app_label=app_label, codename=codename)
group.permissions.remove(permission)
inform_changed_data(group)
config['general_system_enable_anonymous'] = True
guest_client = APIClient()
get_redis_connection('default').flushall()
response_1 = guest_client.get(reverse('motion-detail', args=[self.motion.pk]))
self.assertEqual(response_1.status_code, status.HTTP_200_OK)
@ -492,6 +491,7 @@ class RetrieveMotion(TestCase):
extra_user = get_user_model().objects.create_user(
username='username_wequePhieFoom0hai3wa',
password='password_ooth7taechai5Oocieya')
get_redis_connection('default').flushall()
response_3 = guest_client.get(reverse('user-detail', args=[extra_user.pk]))
self.assertEqual(response_3.status_code, status.HTTP_403_FORBIDDEN)
@ -561,10 +561,10 @@ class UpdateMotion(TestCase):
self.assertFalse(motion.supporters.exists())
def test_removal_of_supporters(self):
# No cache used here.
admin = get_user_model().objects.get(username='admin')
group_staff = admin.groups.get(name='Staff')
admin.groups.remove(group_staff)
inform_changed_data(admin)
self.motion.submitters.add(admin)
supporter = get_user_model().objects.create_user(
username='test_username_ahshi4oZin0OoSh9chee',
@ -572,6 +572,7 @@ class UpdateMotion(TestCase):
self.motion.supporters.add(supporter)
config['motions_remove_supporters'] = True
self.assertEqual(self.motion.supporters.count(), 1)
get_redis_connection('default').flushall()
response = self.client.patch(
reverse('motion-detail', args=[self.motion.pk]),
@ -629,7 +630,7 @@ class DeleteMotion(TestCase):
group_delegates = Group.objects.get(name='Delegates')
self.admin.groups.remove(group_staff)
self.admin.groups.add(group_delegates)
inform_changed_data(self.admin)
CollectionElement.from_instance(self.admin)
def put_motion_in_complex_workflow(self):
workflow = Workflow.objects.get(name='Complex Workflow')
@ -793,6 +794,7 @@ class SupportMotion(TestCase):
def test_support(self):
config['motions_min_supporters'] = 1
get_redis_connection('default').flushall()
response = self.client.post(reverse('motion-support', args=[self.motion.pk]))
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data, {'detail': 'You have supported this motion successfully.'})

View File

@ -1,131 +0,0 @@
from unittest.mock import patch
from channels import DEFAULT_CHANNEL_LAYER
from channels.asgi import channel_layers
from channels.tests import ChannelTestCase
from django.contrib.auth.models import Group
from openslides.assignments.models import Assignment
from openslides.topics.models import Topic
from openslides.utils.autoupdate import (
inform_changed_data,
inform_deleted_data,
)
@patch('openslides.utils.autoupdate.transaction.on_commit', lambda func: func())
class TestsInformChangedData(ChannelTestCase):
# on_commit does not work with Djangos TestCase, see:
# https://docs.djangoproject.com/en/1.10/topics/db/transactions/#use-in-tests
# In this case it is also not possible to use a TransactionTestCase because
# we have to use the ChannelTestCase.
# The patch in the class decorator changes on_commit, so the given callable
# is called immediately. This is the same behavior as if there would be
# no transaction at all.
def test_change_one_element(self):
topic = Topic.objects.create(title='test_topic')
channel_layers[DEFAULT_CHANNEL_LAYER].flush()
inform_changed_data(topic)
channel_message = self.get_next_message('autoupdate.send_data_projector', require=True)
self.assertEqual(len(channel_message['elements']), 1)
self.assertEqual(
channel_message['elements'][0]['collection_string'],
'topics/topic')
channel_message = self.get_next_message('autoupdate.send_data_site', require=True)
self.assertEqual(len(channel_message['elements']), 1)
self.assertEqual(
channel_message['elements'][0]['collection_string'],
'topics/topic')
def test_change_many_elements(self):
topics = (
Topic.objects.create(title='test_topic1'),
Topic.objects.create(title='test_topic2'),
Topic.objects.create(title='test_topic3'))
channel_layers[DEFAULT_CHANNEL_LAYER].flush()
inform_changed_data(topics)
channel_message = self.get_next_message('autoupdate.send_data_projector', require=True)
self.assertEqual(len(channel_message['elements']), 3)
channel_message = self.get_next_message('autoupdate.send_data_site', require=True)
self.assertEqual(len(channel_message['elements']), 3)
def test_change_with_non_root_rest_elements(self):
"""
Tests that if an root_rest_element is called together with one of its
child elements, then there is only the root_rest_element in the channel
message.
"""
assignment = Assignment.objects.create(title='test_assignment', open_posts=1)
poll = assignment.create_poll()
channel_layers[DEFAULT_CHANNEL_LAYER].flush()
inform_changed_data((assignment, poll))
channel_message = self.get_next_message('autoupdate.send_data_projector', require=True)
self.assertEqual(len(channel_message['elements']), 1)
channel_message = self.get_next_message('autoupdate.send_data_site', require=True)
self.assertEqual(len(channel_message['elements']), 1)
def test_change_only_non_root_rest_element(self):
"""
Tests that if only a non root_rest_element is called, then only the
root_rest_element is in the channel.
"""
assignment = Assignment.objects.create(title='test_assignment', open_posts=1)
poll = assignment.create_poll()
channel_layers[DEFAULT_CHANNEL_LAYER].flush()
inform_changed_data(poll)
channel_message = self.get_next_message('autoupdate.send_data_projector', require=True)
self.assertEqual(len(channel_message['elements']), 1)
channel_message = self.get_next_message('autoupdate.send_data_site', require=True)
self.assertEqual(len(channel_message['elements']), 1)
def test_change_no_autoupdate_model(self):
"""
Tests that if inform_changed_data() is called with a model that does
not support autoupdate, nothing happens. We use the django Group for
this (not the OpenSlides Group)
"""
group = Group.objects.create(name='test_group')
channel_layers[DEFAULT_CHANNEL_LAYER].flush()
inform_changed_data(group)
with self.assertRaises(AssertionError):
# self.get_next_message() with require=True raises a AssertionError
# if there is no message in the channel
self.get_next_message('autoupdate.send_data_projector', require=True)
with self.assertRaises(AssertionError):
# self.get_next_message() with require=True raises a AssertionError
# if there is no message in the channel
self.get_next_message('autoupdate.send_data_site', require=True)
def test_delete_one_element(self):
channel_layers[DEFAULT_CHANNEL_LAYER].flush()
inform_deleted_data([('topics/topic', 1)])
channel_message = self.get_next_message('autoupdate.send_data_projector', require=True)
self.assertEqual(len(channel_message['elements']), 1)
self.assertTrue(channel_message['elements'][0]['deleted'])
channel_message = self.get_next_message('autoupdate.send_data_site', require=True)
self.assertEqual(len(channel_message['elements']), 1)
self.assertTrue(channel_message['elements'][0]['deleted'])
def test_delete_many_elements(self):
channel_layers[DEFAULT_CHANNEL_LAYER].flush()
inform_deleted_data([('topics/topic', 1), ('topics/topic', 2), ('testmodule/model', 1)])
channel_message = self.get_next_message('autoupdate.send_data_projector', require=True)
self.assertEqual(len(channel_message['elements']), 3)
channel_message = self.get_next_message('autoupdate.send_data_site', require=True)
self.assertEqual(len(channel_message['elements']), 3)

View File

@ -62,6 +62,7 @@ class TestCollectionCache(TestCase):
"""
Tests that no db query is used when the list is received twice.
"""
get_redis_connection("default").flushall()
Topic.objects.create(title='test topic1')
Topic.objects.create(title='test topic2')
Topic.objects.create(title='test topic3')
@ -83,6 +84,7 @@ class TestCollectionCache(TestCase):
topic_collection = collection.Collection('topics/topic')
list(topic_collection.get_full_data())
collection.CollectionElement.from_instance(topic3, deleted=True)
topic3.delete()
with self.assertNumQueries(0):

View File

@ -1,3 +1,5 @@
from django_redis import get_redis_connection
from openslides.core.config import ConfigVariable, config
from openslides.core.exceptions import ConfigError, ConfigNotFound
from openslides.utils.test import TestCase
@ -55,6 +57,7 @@ class HandleConfigTest(TestCase):
def test_change_config_value(self):
self.assertEqual(config['string_var'], 'default_string_rien4ooCZieng6ah')
config['string_var'] = 'other_special_unique_string dauTex9eAiy7jeen'
get_redis_connection('default').flushall()
self.assertEqual(config['string_var'], 'other_special_unique_string dauTex9eAiy7jeen')
def test_missing_cache_(self):

View File

@ -1,3 +1,5 @@
from django_redis import get_redis_connection
from openslides.core.config import config
from openslides.motions.exceptions import WorkflowError
from openslides.motions.models import Motion, State, Workflow
@ -130,6 +132,7 @@ class ModelTest(TestCase):
def test_is_amendment(self):
config['motions_amendments_enabled'] = True
get_redis_connection('default').flushall()
amendment = Motion.objects.create(title='amendment', parent=self.motion)
self.assertTrue(amendment.is_amendment())
@ -150,6 +153,7 @@ class ModelTest(TestCase):
If the config is set to manually, the method does nothing.
"""
config['motions_identifier'] = 'manually'
get_redis_connection("default").flushall()
motion = Motion()
motion.set_identifier()
@ -165,6 +169,7 @@ class ModelTest(TestCase):
config['motions_amendments_enabled'] = True
self.motion.identifier = 'Parent identifier'
self.motion.save()
get_redis_connection("default").flushall()
motion = Motion(parent=self.motion)
motion.set_identifier()
@ -179,6 +184,7 @@ class ModelTest(TestCase):
config['motions_amendments_enabled'] = True
self.motion.identifier = 'Parent identifier'
self.motion.save()
get_redis_connection("default").flushall()
Motion.objects.create(title='Amendment1', parent=self.motion)
motion = Motion(parent=self.motion)