Merge pull request #1526 from normanjaeckel/AgendaSpeakerREST

Added view to add and remove users from the list of speakers.
This commit is contained in:
Norman Jäckel 2015-05-29 13:08:10 +02:00
commit 5de7365c9a
3 changed files with 411 additions and 2 deletions

View File

@ -4,6 +4,7 @@ from cgi import escape
from collections import defaultdict from collections import defaultdict
from json import dumps from json import dumps
from django.contrib.auth import get_user_model
from django.contrib.contenttypes.models import ContentType from django.contrib.contenttypes.models import ContentType
from django.contrib.staticfiles.templatetags.staticfiles import static from django.contrib.staticfiles.templatetags.staticfiles import static
from django.template.loader import render_to_string from django.template.loader import render_to_string
@ -18,8 +19,15 @@ from openslides.projector.api import (
get_active_object, get_active_object,
get_projector_overlays_js, get_projector_overlays_js,
get_overlays) get_overlays)
from openslides.utils.exceptions import OpenSlidesError
from openslides.utils.pdf import stylesheet from openslides.utils.pdf import stylesheet
from openslides.utils.rest_api import ModelViewSet, list_route, Response from openslides.utils.rest_api import (
ModelViewSet,
Response,
ValidationError,
detail_route,
list_route,
)
from openslides.utils.views import ( from openslides.utils.views import (
AjaxMixin, AjaxMixin,
PDFView, PDFView,
@ -27,7 +35,7 @@ from openslides.utils.views import (
SingleObjectMixin, SingleObjectMixin,
TemplateView) TemplateView)
from .models import Item from .models import Item, Speaker
from .serializers import ItemSerializer from .serializers import ItemSerializer
@ -208,6 +216,137 @@ class ItemViewSet(ModelViewSet):
queryset = queryset.exclude(type__exact=Item.ORGANIZATIONAL_ITEM) queryset = queryset.exclude(type__exact=Item.ORGANIZATIONAL_ITEM)
return queryset return queryset
@detail_route(methods=['POST', 'DELETE'])
def manage_speaker(self, request, pk=None):
"""
Special view endpoint to add users to the list of speakers or remove
them. Send POST {'user': <user_id>} to add a new speaker. Omit
data to add yourself. Send DELETE {'speaker': <speaker_id>} to remove
someone from the list of speakers. Omit data to remove yourself.
Checks also whether the requesting user can do this. He needs at
least the permissions 'agenda.can_see' (see
self.check_permission()). In case of adding himself the permission
'agenda.can_be_speaker' is required. In case of adding someone else
the permission 'agenda.can_manage' is required. In case of removing
someone else 'agenda.can_manage' is required. In case of removing
himself no other permission is required.
"""
# Retrieve item.
item = self.get_object()
if request.method == 'POST':
# Retrieve user_id
user_id = request.data.get('user')
# Check permissions and other conditions. Get user instance.
if user_id is None:
# Add oneself
if not self.request.user.has_perm('agenda.can_be_speaker'):
self.permission_denied(request)
if item.speaker_list_closed:
raise ValidationError({'detail': _('The list of speakers is closed.')})
user = self.request.user
else:
# Add someone else.
if not self.request.user.has_perm('agenda.can_manage'):
self.permission_denied(request)
try:
user = get_user_model().objects.get(pk=int(user_id))
except (ValueError, get_user_model().DoesNotExist):
raise ValidationError({'detail': _('User does not exist.')})
# Try to add the user. This ensurse that a user is not twice in the
# list of coming speakers.
try:
Speaker.objects.add(user, item)
except OpenSlidesError as e:
raise ValidationError({'detail': e})
message = _('User %s was successfully added to the list of speakers.') % user
else:
# request.method == 'DELETE'
# Retrieve speaker_id
speaker_id = request.data.get('speaker')
# Check permissions and other conditions. Get speaker instance.
if speaker_id is None:
# Remove oneself
queryset = Speaker.objects.filter(
item=item, user=self.request.user).exclude(weight=None)
try:
# We assume that there aren't multiple entries because this
# is forbidden by the Manager's add method. We assume that
# there is only one speaker instance or none.
speaker = queryset.get()
except Speaker.DoesNotExist:
raise ValidationError({'detail': _('You are not on the list of speakers.')})
else:
# Remove someone else.
if not self.request.user.has_perm('agenda.can_manage'):
self.permission_denied(request)
try:
speaker = Speaker.objects.get(pk=int(speaker_id))
except (ValueError, Speaker.DoesNotExist):
raise ValidationError({'detail': _('Speaker does not exist.')})
# Delete the speaker.
speaker.delete()
message = _('Speaker %s was successfully removed from the list of speakers.') % speaker
# Initiate response.
return Response({'detail': message})
@detail_route(methods=['PUT', 'DELETE'])
def speak(self, request, pk=None):
"""
Special view endpoint to begin and end speach of speakers. Send PUT
{'speaker': <speaker_id>} to begin speach. Omit data to begin speach of
the next speaker. Send DELETE to end speach of current speaker.
Checks also whether the requesting user can do this. He needs at
least the permissions 'agenda.can_see' (see
self.check_permission()). Also the permission 'agenda.can_manage'
is required.
"""
# Check permission.
if not self.request.user.has_perm('agenda.can_manage'):
self.permission_denied(request)
# Retrieve item.
item = self.get_object()
if request.method == 'PUT':
# Retrieve speaker_id
speaker_id = request.data.get('speaker')
if speaker_id is None:
speaker = item.get_next_speaker()
if speaker is None:
raise ValidationError({'detail': _('The list of speakers is empty.')})
else:
try:
speaker = Speaker.objects.get(pk=int(speaker_id))
except (ValueError, Speaker.DoesNotExist):
raise ValidationError({'detail': _('Speaker does not exist.')})
speaker.begin_speach()
message = _('User is now speaking.')
else:
# request.method == 'DELETE'
try:
# We assume that there aren't multiple entries because this
# is forbidden by the Model's begin_speach method. We assume that
# there is only one speaker instance or none.
current_speaker = Speaker.objects.filter(item=item, end_time=None).exclude(begin_time=None).get()
except Speaker.DoesNotExist:
raise ValidationError(
{'detail': _('There is no one speaking at the moment according to %(item)s.') % {'item': item}})
current_speaker.end_speach()
message = _('The speach is finished now.')
# Initiate response.
return Response({'detail': message})
@list_route(methods=['get', 'put']) @list_route(methods=['get', 'put'])
def tree(self, request): def tree(self, request):
""" """

View File

@ -0,0 +1,179 @@
from django.contrib.auth import get_user_model
from django.core.urlresolvers import reverse
from rest_framework.test import APIClient
from openslides.agenda.models import Item, Speaker
from openslides.utils.test import TestCase
class ManageSpeaker(TestCase):
"""
Tests managing speakers.
"""
def setUp(self):
self.client = APIClient()
self.client.login(username='admin', password='admin')
self.item = Item.objects.create(title='test_title_aZaedij4gohn5eeQu8fe')
self.user = get_user_model().objects.create_user(
username='test_user_jooSaex1bo5ooPhuphae',
password='test_password_e6paev4zeeh9n')
def test_add_oneself(self):
response = self.client.post(
reverse('item-manage-speaker', args=[self.item.pk]))
self.assertEqual(response.status_code, 200)
self.assertTrue(Speaker.objects.all().exists())
def test_add_oneself_twice(self):
Speaker.objects.add(get_user_model().objects.get(username='admin'), self.item)
response = self.client.post(
reverse('item-manage-speaker', args=[self.item.pk]))
self.assertEqual(response.status_code, 400)
def test_add_oneself_when_closed(self):
self.item.speaker_list_closed = True
self.item.save()
response = self.client.post(
reverse('item-manage-speaker', args=[self.item.pk]))
self.assertEqual(response.status_code, 400)
def test_remove_oneself(self):
Speaker.objects.add(get_user_model().objects.get(username='admin'), self.item)
response = self.client.delete(
reverse('item-manage-speaker', args=[self.item.pk]))
self.assertEqual(response.status_code, 200)
self.assertFalse(Speaker.objects.all().exists())
def test_remove_self_not_on_list(self):
response = self.client.delete(
reverse('item-manage-speaker', args=[self.item.pk]))
self.assertEqual(response.status_code, 400)
def test_add_someone_else(self):
response = self.client.post(
reverse('item-manage-speaker', args=[self.item.pk]),
{'user': self.user.pk})
self.assertEqual(response.status_code, 200)
self.assertTrue(Speaker.objects.filter(item=self.item, user=self.user).exists())
def test_invalid_data_string_instead_of_integer(self):
response = self.client.post(
reverse('item-manage-speaker', args=[self.item.pk]),
{'user': 'string_instead_of_integer'})
self.assertEqual(response.status_code, 400)
def test_invalid_data_user_does_not_exist(self):
# ID of a user that does not exist.
# Be careful: Here we do not test that the user does not exist.
inexistent_user_pk = self.user.pk + 1000
response = self.client.post(
reverse('item-manage-speaker', args=[self.item.pk]),
{'user': inexistent_user_pk})
self.assertEqual(response.status_code, 400)
def test_add_someone_else_twice(self):
Speaker.objects.add(self.user, self.item)
response = self.client.post(
reverse('item-manage-speaker', args=[self.item.pk]),
{'user': self.user.pk})
self.assertEqual(response.status_code, 400)
def test_add_someone_else_non_admin(self):
admin = get_user_model().objects.get(username='admin')
group_staff = admin.groups.get(name='Staff')
group_delegates = type(group_staff).objects.get(name='Delegates')
admin.groups.add(group_delegates)
admin.groups.remove(group_staff)
response = self.client.post(
reverse('item-manage-speaker', args=[self.item.pk]),
{'user': self.user.pk})
self.assertEqual(response.status_code, 403)
def test_remove_someone_else(self):
speaker = Speaker.objects.add(self.user, self.item)
response = self.client.delete(
reverse('item-manage-speaker', args=[self.item.pk]),
{'speaker': speaker.pk})
self.assertEqual(response.status_code, 200)
self.assertFalse(Speaker.objects.filter(item=self.item, user=self.user).exists())
def test_remove_someone_else_not_on_list(self):
response = self.client.delete(
reverse('item-manage-speaker', args=[self.item.pk]),
{'speaker': '1'})
self.assertEqual(response.status_code, 400)
def test_remove_someone_else_invalid_data(self):
response = self.client.delete(
reverse('item-manage-speaker', args=[self.item.pk]),
{'speaker': 'invalid'})
self.assertEqual(response.status_code, 400)
def test_remove_someone_else_non_admin(self):
admin = get_user_model().objects.get(username='admin')
group_staff = admin.groups.get(name='Staff')
group_delegates = type(group_staff).objects.get(name='Delegates')
admin.groups.add(group_delegates)
admin.groups.remove(group_staff)
speaker = Speaker.objects.add(self.user, self.item)
response = self.client.delete(
reverse('item-manage-speaker', args=[self.item.pk]),
{'speaker': speaker.pk})
self.assertEqual(response.status_code, 403)
class Speak(TestCase):
"""
Tests view to begin or end speach.
"""
def setUp(self):
self.client = APIClient()
self.client.login(username='admin', password='admin')
self.item = Item.objects.create(title='test_title_KooDueco3zaiGhiraiho')
self.user = get_user_model().objects.create_user(
username='test_user_Aigh4vohb3seecha4aa4',
password='test_password_eneupeeVo5deilixoo8j')
def test_begin_speach(self):
Speaker.objects.add(self.user, self.item)
speaker = Speaker.objects.add(get_user_model().objects.get(username='admin'), self.item)
self.assertTrue(Speaker.objects.get(pk=speaker.pk).begin_time is None)
response = self.client.put(
reverse('item-speak', args=[self.item.pk]),
{'speaker': speaker.pk})
self.assertEqual(response.status_code, 200)
self.assertFalse(Speaker.objects.get(pk=speaker.pk).begin_time is None)
def test_begin_speach_next_speaker(self):
speaker = Speaker.objects.add(self.user, self.item)
Speaker.objects.add(get_user_model().objects.get(username='admin'), self.item)
self.assertTrue(Speaker.objects.get(pk=speaker.pk).begin_time is None)
response = self.client.put(reverse('item-speak', args=[self.item.pk]))
self.assertEqual(response.status_code, 200)
self.assertFalse(Speaker.objects.get(pk=speaker.pk).begin_time is None)
def test_begin_speach_invalid_speaker_id(self):
response = self.client.put(
reverse('item-speak', args=[self.item.pk]),
{'speaker': '1'})
self.assertEqual(response.status_code, 400)
def test_begin_speach_invalid_data(self):
response = self.client.put(
reverse('item-speak', args=[self.item.pk]),
{'speaker': 'invalid'})
self.assertEqual(response.status_code, 400)
def test_end_speach(self):
speaker = Speaker.objects.add(get_user_model().objects.get(username='admin'), self.item)
speaker.begin_speach()
self.assertFalse(Speaker.objects.get(pk=speaker.pk).begin_time is None)
self.assertTrue(Speaker.objects.get(pk=speaker.pk).end_time is None)
response = self.client.delete(reverse('item-speak', args=[self.item.pk]))
self.assertEqual(response.status_code, 200)
self.assertFalse(Speaker.objects.get(pk=speaker.pk).end_time is None)
def test_end_speach_no_current_speaker(self):
response = self.client.delete(reverse('item-speak', args=[self.item.pk]))
self.assertEqual(response.status_code, 400)

View File

@ -0,0 +1,91 @@
from unittest import TestCase
from unittest.mock import MagicMock, patch
from openslides.agenda.views import ItemViewSet
class ItemViewSetManageSpeaker(TestCase):
"""
Tests views of ItemViewSet to manage speakers.
"""
def setUp(self):
self.request = MagicMock()
self.view_instance = ItemViewSet()
self.view_instance.request = self.request
self.view_instance.get_object = get_object_mock = MagicMock()
get_object_mock.return_value = self.mock_item = MagicMock()
@patch('openslides.agenda.views.Speaker')
def test_add_oneself_as_speaker(self, mock_speaker):
self.request.method = 'POST'
self.request.user.has_perm.return_value = True
self.request.data = {}
self.mock_item.speaker_list_closed = False
self.view_instance.manage_speaker(self.request)
mock_speaker.objects.add.assert_called_with(self.request.user, self.mock_item)
@patch('openslides.agenda.views.get_user_model')
@patch('openslides.agenda.views.Speaker')
def test_add_someone_else_as_speaker(self, mock_speaker, mock_get_user_model):
self.request.method = 'POST'
self.request.user.has_perm.return_value = True
self.request.data = {'user': '2'} # It is assumed that the request user has pk!=2.
mock_get_user_model.return_value = MockUser = MagicMock()
MockUser.objects.get.return_value = mock_user = MagicMock()
self.view_instance.manage_speaker(self.request)
MockUser.objects.get.assert_called_with(pk=2)
mock_speaker.objects.add.assert_called_with(mock_user, self.mock_item)
@patch('openslides.agenda.views.Speaker')
def test_remove_oneself(self, mock_speaker):
self.request.method = 'DELETE'
self.request.data = {}
self.view_instance.manage_speaker(self.request)
mock_queryset = mock_speaker.objects.filter.return_value.exclude.return_value
mock_queryset.get.return_value.delete.assert_called_with()
@patch('openslides.agenda.views.Speaker')
def test_remove_someone_else(self, mock_speaker):
self.request.method = 'DELETE'
self.request.user.has_perm.return_value = True
self.request.data = {'speaker': '1'}
self.view_instance.manage_speaker(self.request)
mock_speaker.objects.get.assert_called_with(pk=1)
mock_speaker.objects.get.return_value.delete.assert_called_with()
class ItemViewSetSpeak(TestCase):
"""
Tests views of ItemViewSet to begin and end speach.
"""
def setUp(self):
self.request = MagicMock()
self.view_instance = ItemViewSet()
self.view_instance.request = self.request
self.view_instance.get_object = get_object_mock = MagicMock()
get_object_mock.return_value = self.mock_item = MagicMock()
def test_begin_speach(self):
self.request.method = 'PUT'
self.request.user.has_perm.return_value = True
self.request.data = {}
self.mock_item.get_next_speaker.return_value = mock_next_speaker = MagicMock()
self.view_instance.speak(self.request)
mock_next_speaker.begin_speach.assert_called_with()
@patch('openslides.agenda.views.Speaker')
def test_begin_speach_specific_speaker(self, mock_speaker):
self.request.method = 'PUT'
self.request.user.has_perm.return_value = True
self.request.data = {'speaker': '1'}
mock_speaker.objects.get.return_value = mock_next_speaker = MagicMock()
self.view_instance.speak(self.request)
mock_next_speaker.begin_speach.assert_called_with()
@patch('openslides.agenda.views.Speaker')
def test_end_speach(self, mock_speaker):
self.request.method = 'DELETE'
self.request.user.has_perm.return_value = True
mock_speaker.objects.filter.return_value.exclude.return_value.get.return_value = mock_speaker = MagicMock()
self.view_instance.speak(self.request)
mock_speaker.end_speach.assert_called_with()