OpenSlides/openslides/agenda/projector.py

267 lines
8.7 KiB
Python
Raw Normal View History

2018-12-23 11:05:38 +01:00
from collections import defaultdict
2019-02-21 17:04:59 +01:00
from typing import Any, Dict, List, Union
2019-02-15 12:17:08 +01:00
from ..users.projector import get_user_name
from ..utils.projector import (
AllData,
ProjectorElementException,
2019-02-15 12:17:08 +01:00
get_config,
register_projector_slide,
)
2015-06-24 23:36:36 +02:00
2018-12-23 11:05:38 +01:00
# Important: All functions have to be pure. This means that they can only
# access the data that they get as arguments and must not have any
# side effects.
2018-12-23 11:05:38 +01:00
async def get_sorted_agenda_items(all_data: AllData) -> List[Dict[str, Any]]:
    """
    Returns all agenda items sorted by weight.

    Items that share the same weight are ordered by their id, so the result
    is deterministic. If all_data does not contain the "agenda/item"
    collection, an empty list is returned instead of raising a KeyError.
    """
    # One sort with a composite key yields the same order as sorting by id
    # first and then (stably) by weight.
    return sorted(
        all_data.get("agenda/item", {}).values(),
        key=lambda item: (item["weight"], item["id"]),
    )
async def get_flat_tree(all_data: AllData, parent_id: int = 0) -> List[Dict[str, Any]]:
    """
    Build a flattened item tree from all_data.

    Only items underneath parent_id are included; a parent_id of 0 stands
    for the root level. Only items with type 1 are taken into account.

    Returns a flat list of dicts in depth-first order. Each dict holds the
    "item_number", "title_information" and "collection" of one item plus
    its "depth" relative to parent_id (direct children have depth 0).
    """
    # Map every parent id (0 for root items) to the ids of its children,
    # keeping the sorted agenda order.
    child_ids: Dict[int, List[int]] = defaultdict(list)
    if "agenda/item" in all_data:
        for agenda_item in await get_sorted_agenda_items(all_data):
            if agenda_item["type"] != 1:  # only normal items
                continue
            child_ids[agenda_item["parent_id"] or 0].append(agenda_item["id"])

    flat_tree: List[Dict[str, Any]] = []

    def collect(ids: List[int], depth: int) -> None:
        # Append every item followed directly by its own subtree.
        for current_id in ids:
            agenda_item = all_data["agenda/item"][current_id]
            flat_tree.append(
                {
                    "item_number": agenda_item["item_number"],
                    "title_information": agenda_item["title_information"],
                    "collection": agenda_item["content_object"]["collection"],
                    "depth": depth,
                }
            )
            collect(child_ids[current_id], depth + 1)

    collect(child_ids[parent_id], 0)
    return flat_tree
async def item_list_slide(
    all_data: AllData, element: Dict[str, Any], projector_id: int
) -> Dict[str, Any]:
    """
    Item list slide.

    If the element option "only_main_items" is true (the default), returns
    all root items of type 1 in sorted order. Otherwise returns the flat
    tree of all items, where every entry also carries its depth.
    """
    if not element.get("only_main_items", True):
        return {"items": await get_flat_tree(all_data)}

    main_items = [
        {
            "item_number": entry["item_number"],
            "title_information": entry["title_information"],
            "collection": entry["content_object"]["collection"],
        }
        for entry in await get_sorted_agenda_items(all_data)
        if entry["parent_id"] is None and entry["type"] == 1
    ]
    return {"items": main_items}
2016-10-05 18:25:50 +02:00
2016-09-12 11:05:34 +02:00
async def list_of_speakers_slide(
    all_data: AllData, element: Dict[str, Any], projector_id: int
) -> Dict[str, Any]:
    """
    List of speakers slide.

    Returns the list of speakers data for the agenda item whose id is given
    in the element. Raises a ProjectorElementException if the element has
    no id.
    """
    agenda_item_id = element.get("id")
    if agenda_item_id is not None:
        return await get_list_of_speakers_slide_data(all_data, agenda_item_id)

    raise ProjectorElementException("id is required for list of speakers slide")
2019-02-21 14:40:07 +01:00
async def get_list_of_speakers_slide_data(
    all_data: AllData, item_id: int
) -> Dict[str, Any]:
    """
    Collects the list of speakers data for the agenda item with the given id.

    The speakers are split into waiting speakers (sorted by weight), the
    current speaker and finished speakers (sorted by end time). Only as many
    finished speakers as configured in "agenda_show_last_speakers" are
    returned. Raises a ProjectorElementException if the item does not exist.
    """
    try:
        agenda_item = all_data["agenda/item"][item_id]
    except KeyError:
        raise ProjectorElementException(f"Item {item_id} does not exist")

    waiting: List[Dict[str, Any]] = []
    finished: List[Dict[str, Any]] = []
    current = None
    for entry in agenda_item["speakers"]:
        formatted = {
            "user": await get_user_name(all_data, entry["user_id"]),
            "marked": entry["marked"],
            "weight": entry["weight"],
            "end_time": entry["end_time"],
        }
        if entry["end_time"] is not None:
            # Speech is over.
            finished.append(formatted)
        elif entry["begin_time"] is not None:
            # Speech has begun but not ended yet.
            current = formatted
        else:
            # Speech has not begun.
            waiting.append(formatted)

    waiting.sort(key=lambda formatted: formatted["weight"])
    finished.sort(key=lambda formatted: formatted["end_time"])

    last_speakers_count = await get_config(all_data, "agenda_show_last_speakers")
    # A count of 0 needs its own case, because the slice [-0:] would return
    # the complete list instead of an empty one.
    shown_finished = (
        [] if last_speakers_count == 0 else finished[-last_speakers_count:]
    )

    return {
        "waiting": waiting,
        "current": current,
        "finished": shown_finished,
        "content_object_collection": agenda_item["content_object"]["collection"],
        "title_information": agenda_item["title_information"],
        "item_number": agenda_item["item_number"],
    }
async def get_current_item_id_for_projector(
    all_data: AllData, projector: Dict[str, Any]
) -> Union[int, None]:
    """
    Search for the first projector element that has an agenda item.

    For every element the model is looked up by the element's name (used as
    collection) and id. The model needs an "agenda_item_id" and the
    referenced agenda item must exist in all_data. Elements that do not
    fulfil this (including elements without name or id, or a missing
    "agenda/item" collection) are skipped.

    Returns the agenda item id of the first matching element, or None if no
    element matches.
    """
    # Looked up once; a missing collection means that no item can match.
    agenda_items = all_data.get("agenda/item", {})

    for element in projector["elements"]:
        if "id" not in element:
            continue

        model = all_data.get(element.get("name"), {}).get(element["id"])
        if model is None or "agenda_item_id" not in model:
            continue

        agenda_item_id = model["agenda_item_id"]
        if agenda_item_id in agenda_items:
            return agenda_item_id

    return None
async def get_reference_projector(
    all_data: AllData, projector_id: int
) -> Dict[str, Any]:
    """
    Returns the reference projector of the projector with the given id.

    If the projector has no reference projector set, the projector itself is
    returned. Raises a ProjectorElementException if the projector or its
    reference projector does not exist.
    """

    def lookup(wanted_id: int) -> Dict[str, Any]:
        # Fetch one projector or fail with the slide exception.
        try:
            return all_data["core/projector"][wanted_id]
        except KeyError:
            raise ProjectorElementException(f"Projector {wanted_id} does not exist")

    own_projector = lookup(projector_id)
    # A falsy reference id means that the projector references itself.
    return lookup(own_projector["reference_projector_id"] or projector_id)
async def current_list_of_speakers_slide(
    all_data: AllData, element: Dict[str, Any], projector_id: int
) -> Dict[str, Any]:
    """
    The current list of speakers slide.

    Determines the current agenda item from the elements of the reference
    projector of the given projector and returns its list of speakers data.
    Returns an empty dict if no element with an agenda item is found.
    """
    reference = await get_reference_projector(all_data, projector_id)
    current_item_id = await get_current_item_id_for_projector(all_data, reference)
    if current_item_id is not None:
        return await get_list_of_speakers_slide_data(all_data, current_item_id)

    return {}  # no element found
async def current_speaker_chyron_slide(
    all_data: AllData, element: Dict[str, Any], projector_id: int
) -> Dict[str, Any]:
    """
    Returns the username of the current speaker.

    The agenda item is determined from the elements of the reference
    projector of the given projector. Returns an empty dict if no element
    with an agenda item is found. Raises a ProjectorElementException if the
    found item does not exist.
    """
    reference = await get_reference_projector(all_data, projector_id)
    current_item_id = await get_current_item_id_for_projector(all_data, reference)
    if current_item_id is None:  # no element found
        return {}

    try:
        agenda_item = all_data["agenda/item"][current_item_id]
    except KeyError:
        raise ProjectorElementException(f"Item {current_item_id} does not exist")

    # A speaker is the current one if the speech has begun but not ended.
    # If several speakers match, the last one wins.
    speaker_name = None
    for entry in agenda_item["speakers"]:
        if entry["begin_time"] is None or entry["end_time"] is not None:
            continue
        speaker_name = await get_user_name(all_data, entry["user_id"])

    return {"current_speaker": speaker_name}
def register_projector_slides() -> None:
    """
    Registers all agenda slides under their projector element names.
    """
    slides = (
        ("agenda/item-list", item_list_slide),
        ("agenda/list-of-speakers", list_of_speakers_slide),
        ("agenda/current-list-of-speakers", current_list_of_speakers_slide),
        # The overlay is served by the same slide function.
        ("agenda/current-list-of-speakers-overlay", current_list_of_speakers_slide),
        ("agenda/current-speaker-chyron", current_speaker_chyron_slide),
    )
    for slide_name, slide_function in slides:
        register_projector_slide(slide_name, slide_function)