Merge branch 'pollprogressCd'

This commit is contained in:
FinnStutzenstein 2020-06-04 15:16:20 +02:00
commit 429473dcf9
No known key found for this signature in database
GPG Key ID: 9042F605C6324654
13 changed files with 246 additions and 106 deletions

View File

@ -119,11 +119,19 @@
<span>{{ 'Ballot papers' | translate }}</span>
</button>
<mat-divider></mat-divider>
<!-- Refresh Button -->
<button mat-menu-item (click)="refreshPoll()">
<mat-icon>refresh</mat-icon>
<span>{{ 'Refresh' | translate }}</span>
</button>
<!-- Reset Button -->
<button mat-menu-item (click)="resetState()">
<mat-icon color="warn">replay</mat-icon>
<span>{{ 'Reset state' | translate }}</span>
</button>
<!-- Delete button -->
<button mat-menu-item class="red-warning-text" (click)="onDeletePoll()">
<mat-icon>delete</mat-icon>
<span>{{ 'Delete' | translate }}</span>

View File

@ -128,6 +128,12 @@
</button>
<div *osPerms="'motions.can_manage_polls'">
<mat-divider></mat-divider>
<!-- Refresh Button -->
<button mat-menu-item (click)="refreshPoll()">
<mat-icon>refresh</mat-icon>
<span>{{ 'Refresh' | translate }}</span>
</button>
<!-- Reset Button -->
<button mat-menu-item (click)="resetState()">
<mat-icon color="warn">replay</mat-icon>

View File

@ -103,4 +103,8 @@ export abstract class BasePollComponent<V extends ViewBasePoll, S extends PollSe
protected initPoll(model: V): void {
this._poll = model;
}
public refreshPoll(): void {
this.repo.refresh(this._poll);
}
}

View File

@ -1,4 +1,4 @@
import { Component, Input } from '@angular/core';
import { Component, Input, OnDestroy } from '@angular/core';
import { MatSnackBar } from '@angular/material/snack-bar';
import { Title } from '@angular/platform-browser';
@ -16,7 +16,7 @@ import { ViewUser } from 'app/site/users/models/view-user';
templateUrl: './poll-progress.component.html',
styleUrls: ['./poll-progress.component.scss']
})
export class PollProgressComponent extends BaseViewComponent {
export class PollProgressComponent extends BaseViewComponent implements OnDestroy {
private pollId: number = null;
private pollSubscription: Subscription = null;
@ -25,34 +25,11 @@ export class PollProgressComponent extends BaseViewComponent {
if (value.id !== this.pollId) {
this.pollId = value.id;
if (this.pollSubscription !== null) {
this.pollSubscription.unsubscribe();
this.pollSubscription = null;
}
this.unsubscribePoll();
this.pollSubscription = this.pollRepo.getViewModelObservable(this.pollId).subscribe(poll => {
if (poll) {
this._poll = poll;
// We may cannot use this.poll.votescast during the voting, since it can
// be reported with false values from the server
// -> calculate the votes on our own.
const ids = new Set();
for (const option of this.poll.options) {
for (const vote of option.votes) {
if (vote.user_id) {
ids.add(vote.user_id);
}
}
}
this.votescast = ids.size;
// But sometimes there are not enough votes (poll.votescast is higher).
// If this happens, take the value from the poll
if (this.poll.votescast > this.votescast) {
this.votescast = this.poll.votescast;
}
this.updateVotescast();
this.calculateMaxUsers();
}
});
@ -63,7 +40,7 @@ export class PollProgressComponent extends BaseViewComponent {
}
private _poll: ViewBasePoll;
public votescast: number;
public votescast = 0;
public max: number;
public valueInPercent: number;
@ -82,6 +59,34 @@ export class PollProgressComponent extends BaseViewComponent {
});
}
private updateVotescast(): void {
if (this.poll.votescast === 0) {
this.votescast = 0;
return;
}
// We may cannot use this.poll.votescast during the voting, since it can
// be reported with false values from the server
// -> calculate the votes on our own.
const ids = new Set();
for (const option of this.poll.options) {
for (const vote of option.votes) {
if (vote.user_id) {
ids.add(vote.user_id);
}
}
}
if (ids.size > this.votescast) {
this.votescast = ids.size;
}
// But sometimes there are not enough votes (poll.votescast is higher).
// If this happens, take the value from the poll
if (this.poll.votescast > this.votescast) {
this.votescast = this.poll.votescast;
}
}
private calculateMaxUsers(allUsers?: ViewUser[]): void {
if (!this.poll) {
return;
@ -95,4 +100,15 @@ export class PollProgressComponent extends BaseViewComponent {
this.max = allUsers.length;
this.valueInPercent = this.poll ? (this.votescast / this.max) * 100 : 0;
}
public ngOnDestroy(): void {
this.unsubscribePoll();
}
private unsubscribePoll(): void {
if (this.pollSubscription !== null) {
this.pollSubscription.unsubscribe();
this.pollSubscription = null;
}
}
}

View File

@ -72,16 +72,19 @@ export abstract class BasePollRepositoryService<
}
}
public resetPoll(poll: BasePoll): Promise<void> {
return this.http.post(`${this.restPath(poll)}/reset/`);
}
private restPath(poll: BasePoll): string {
return `/rest/${poll.collectionString}/${poll.id}`;
}
public resetPoll(poll: BasePoll): Promise<void> {
return this.http.post(`${this.restPath(poll)}/reset/`);
}
public pseudoanonymize(poll: BasePoll): Promise<void> {
const path = this.restPath(poll);
return this.http.post(`${path}/pseudoanonymize/`);
return this.http.post(`${this.restPath(poll)}/pseudoanonymize/`);
}
public refresh(poll: BasePoll): Promise<void> {
return this.http.post(`${this.restPath(poll)}/refresh/`);
}
}

View File

@ -222,6 +222,15 @@ class BasePollViewSet(ModelViewSet):
return Response()
@detail_route(methods=["POST"])
@transaction.atomic
def refresh(self, request, pk):
poll = self.get_object()
inform_changed_data(poll, final_data=True)
inform_changed_data(poll.get_options(), final_data=True)
inform_changed_data(poll.get_votes(), final_data=True)
return Response()
def assert_can_vote(self, poll, request):
"""
Raises a permission denied, if the user is not allowed to vote (or has already voted).
@ -263,6 +272,12 @@ class BasePollViewSet(ModelViewSet):
)
return value
def has_manage_permissions(self):
"""
Returns true, if the request user has manage perms.
"""
raise NotImplementedError()
def convert_option_data(self, poll, data):
"""
May be overwritten by subclass. Adjusts the option data based on the now existing poll

View File

@ -286,9 +286,7 @@ class AutoupdateBundleMiddleware:
user_id = request.user.pk or 0
# Inject the autoupdate in the response.
# The complete response body will be overwritten!
autoupdate = async_to_sync(get_autoupdate_data)(
change_id, change_id, user_id
)
_, autoupdate = async_to_sync(get_autoupdate_data)(change_id, user_id)
content = {"autoupdate": autoupdate, "data": response.data}
# Note: autoupdate may be none on skipped ones (which should not happen
# since the user has made the request....)
@ -299,17 +297,25 @@ class AutoupdateBundleMiddleware:
async def get_autoupdate_data(
from_change_id: int, to_change_id: int, user_id: int
) -> Optional[AutoupdateFormat]:
from_change_id: int, user_id: int
) -> Tuple[int, Optional[AutoupdateFormat]]:
"""
Returns the max_change_id and the autoupdate from from_change_id to max_change_id
"""
try:
changed_elements, deleted_element_ids = await element_cache.get_data_since(
user_id, from_change_id, to_change_id
)
(
max_change_id,
changed_elements,
deleted_element_ids,
) = await element_cache.get_data_since(user_id, from_change_id)
except ChangeIdTooLowError:
# The change_id is lower the the lowerst change_id in redis. Return all data
changed_elements = await element_cache.get_all_data_list(user_id)
all_data = True
(
max_change_id,
changed_elements,
) = await element_cache.get_all_data_list_with_max_change_id(user_id)
deleted_elements: Dict[str, List[int]] = {}
all_data = True
else:
all_data = False
deleted_elements = defaultdict(list)
@ -320,15 +326,18 @@ async def get_autoupdate_data(
# Check, if the autoupdate has any data.
if not changed_elements and not deleted_element_ids:
# Skip empty updates
return None
return max_change_id, None
else:
# Normal autoupdate with data
return AutoupdateFormat(
changed=changed_elements,
deleted=deleted_elements,
from_change_id=from_change_id,
to_change_id=to_change_id,
all_data=all_data,
return (
max_change_id,
AutoupdateFormat(
changed=changed_elements,
deleted=deleted_elements,
from_change_id=from_change_id,
to_change_id=max_change_id,
all_data=all_data,
),
)

View File

@ -239,8 +239,23 @@ class ElementCache:
}
If the user id is given the data will be restricted for this user.
"""
all_data = await self.cache_provider.get_all_data()
return await self.format_all_data(all_data, user_id)
async def get_all_data_list_with_max_change_id(
self, user_id: Optional[int] = None
) -> Tuple[int, Dict[str, List[Dict[str, Any]]]]:
(
max_change_id,
all_data,
) = await self.cache_provider.get_all_data_with_max_change_id()
return max_change_id, await self.format_all_data(all_data, user_id)
async def format_all_data(
self, all_data_bytes: Dict[bytes, bytes], user_id: Optional[int]
) -> Dict[str, List[Dict[str, Any]]]:
all_data: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
for element_id, data in (await self.cache_provider.get_all_data()).items():
for element_id, data in all_data_bytes.items():
collection, _ = split_element_id(element_id)
element = json.loads(data.decode())
element.pop(
@ -299,16 +314,15 @@ class ElementCache:
return restricted_elements[0] if restricted_elements else None
async def get_data_since(
self, user_id: Optional[int] = None, change_id: int = 0, max_change_id: int = -1
) -> Tuple[Dict[str, List[Dict[str, Any]]], List[str]]:
self, user_id: Optional[int] = None, change_id: int = 0
) -> Tuple[int, Dict[str, List[Dict[str, Any]]], List[str]]:
"""
Returns all data since change_id until max_change_id (included).
max_change_id -1 means the highest change_id. If the user id is given the
Returns all data since change_id until the max change id.cIf the user id is given the
data will be restricted for this user.
Returns two values inside a tuple. The first value is a dict where the
key is the collection and the value is a list of data. The second
is a list of element_ids with deleted elements.
Returns three values inside a tuple. The first value is the max change id. The second
value is a dict where the key is the collection and the value is a list of data.
The third is a list of element_ids with deleted elements.
Only returns elements with the change_id or newer. When change_id is 0,
all elements are returned.
@ -319,7 +333,11 @@ class ElementCache:
that the cache does not know about.
"""
if change_id == 0:
return (await self.get_all_data_list(user_id), [])
(
max_change_id,
changed_elements,
) = await self.get_all_data_list_with_max_change_id(user_id)
return (max_change_id, changed_elements, [])
# This raises a Runtime Exception, if there is no change_id
lowest_change_id = await self.get_lowest_change_id()
@ -332,11 +350,10 @@ class ElementCache:
)
(
max_change_id,
raw_changed_elements,
deleted_elements,
) = await self.cache_provider.get_data_since(
change_id, max_change_id=max_change_id
)
) = await self.cache_provider.get_data_since(change_id)
changed_elements = {
collection: [json.loads(value.decode()) for value in value_list]
for collection, value_list in raw_changed_elements.items()
@ -381,7 +398,7 @@ class ElementCache:
else:
changed_elements[collection] = restricted_elements
return (changed_elements, deleted_elements)
return (max_change_id, changed_elements, deleted_elements)
async def get_current_change_id(self) -> int:
"""

View File

@ -57,6 +57,9 @@ class ElementCacheProvider(Protocol):
async def get_all_data(self) -> Dict[bytes, bytes]:
...
async def get_all_data_with_max_change_id(self) -> Tuple[int, Dict[bytes, bytes]]:
...
async def get_collection_data(self, collection: str) -> Dict[int, bytes]:
...
@ -69,8 +72,8 @@ class ElementCacheProvider(Protocol):
...
async def get_data_since(
self, change_id: int, max_change_id: int = -1
) -> Tuple[Dict[str, List[bytes]], List[str]]:
self, change_id: int
) -> Tuple[int, Dict[str, List[bytes]], List[str]]:
...
async def get_current_change_id(self) -> int:
@ -137,6 +140,24 @@ class RedisCacheProvider:
False,
),
"get_all_data": ("return redis.call('hgetall', KEYS[1])", True),
"get_all_data_with_max_change_id": (
"""
local tmp = redis.call('zrevrangebyscore', KEYS[2], '+inf', '-inf', 'WITHSCORES', 'LIMIT', 0, 1)
local max_change_id
if next(tmp) == nil then
-- The key does not exist
return redis.error_reply("cache_reset")
else
max_change_id = tmp[2]
end
local all_data = redis.call('hgetall', KEYS[1])
table.insert(all_data, 'max_change_id')
table.insert(all_data, max_change_id)
return all_data
""",
True,
),
"get_collection_data": (
"""
local cursor = 0
@ -233,11 +254,24 @@ class RedisCacheProvider:
),
"get_data_since": (
"""
-- Get change ids of changed elements
local element_ids = redis.call('zrangebyscore', KEYS[2], ARGV[1], ARGV[2])
-- get max change id
local tmp = redis.call('zrevrangebyscore', KEYS[2], '+inf', '-inf', 'WITHSCORES', 'LIMIT', 0, 1)
local max_change_id
if next(tmp) == nil then
-- The key does not exist
return redis.error_reply("cache_reset")
else
max_change_id = tmp[2]
end
-- Save elements in array. Rotate element_id and element_json
-- Get change ids of changed elements
local element_ids = redis.call('zrangebyscore', KEYS[2], ARGV[1], max_change_id)
-- Save elements in array. First is the max_change_id with the key "max_change_id"
-- Than rotate element_id and element_json. This is ocnverted into a dict in python code.
local elements = {}
table.insert(elements, 'max_change_id')
table.insert(elements, max_change_id)
for _, element_id in pairs(element_ids) do
table.insert(elements, element_id)
table.insert(elements, redis.call('hget', KEYS[1], element_id))
@ -320,9 +354,25 @@ class RedisCacheProvider:
Returns all data from the full_data_cache in a mapping from element_id to the element.
"""
return await aioredis.util.wait_make_dict(
self.eval("get_all_data", [self.full_data_cache_key], read_only=True)
self.eval("get_all_data", keys=[self.full_data_cache_key], read_only=True)
)
@ensure_cache_wrapper()
async def get_all_data_with_max_change_id(self) -> Tuple[int, Dict[bytes, bytes]]:
"""
Returns all data from the full_data_cache in a mapping from element_id to the element and
the max change id.
"""
all_data = await aioredis.util.wait_make_dict(
self.eval(
"get_all_data_with_max_change_id",
keys=[self.full_data_cache_key, self.change_id_cache_key],
read_only=True,
)
)
max_change_id = int(all_data.pop(b"max_change_id"))
return max_change_id, all_data
@ensure_cache_wrapper()
async def get_collection_data(self, collection: str) -> Dict[int, bytes]:
"""
@ -376,8 +426,8 @@ class RedisCacheProvider:
@ensure_cache_wrapper()
async def get_data_since(
self, change_id: int, max_change_id: int = -1
) -> Tuple[Dict[str, List[bytes]], List[str]]:
self, change_id: int
) -> Tuple[int, Dict[str, List[bytes]], List[str]]:
"""
Returns all elements since a change_id (included) and until the max_change_id (included).
@ -388,8 +438,6 @@ class RedisCacheProvider:
changed_elements: Dict[str, List[bytes]] = defaultdict(list)
deleted_elements: List[str] = []
# Convert max_change_id to a string. If its negative, use the string '+inf'
redis_max_change_id = "+inf" if max_change_id < 0 else str(max_change_id)
# lua script that returns gets all element_ids from change_id_cache_key
# and then uses each element_id on full_data or restricted_data.
# It returns a list where the odd values are the change_id and the
@ -399,13 +447,14 @@ class RedisCacheProvider:
self.eval(
"get_data_since",
keys=[self.full_data_cache_key, self.change_id_cache_key],
args=[change_id, redis_max_change_id],
args=[change_id],
read_only=True,
)
)
max_change_id = int(elements[b"max_change_id"].decode()) # type: ignore
for element_id, element_json in elements.items():
if element_id.startswith(b"_config"):
if element_id.startswith(b"_config") or element_id == b"max_change_id":
# Ignore config values from the change_id cache key
continue
if element_json is None:
@ -414,7 +463,7 @@ class RedisCacheProvider:
else:
collection, id = split_element_id(element_id)
changed_elements[collection].append(element_json)
return changed_elements, deleted_elements
return max_change_id, changed_elements, deleted_elements
@ensure_cache_wrapper()
async def get_current_change_id(self) -> int:
@ -562,6 +611,11 @@ class MemoryCacheProvider:
async def get_all_data(self) -> Dict[bytes, bytes]:
return str_dict_to_bytes(self.full_data)
async def get_all_data_with_max_change_id(self) -> Tuple[int, Dict[bytes, bytes]]:
all_data = await self.get_all_data()
max_change_id = await self.get_current_change_id()
return max_change_id, all_data
async def get_collection_data(self, collection: str) -> Dict[int, bytes]:
out = {}
query = f"{collection}:"
@ -602,16 +656,14 @@ class MemoryCacheProvider:
return change_id
async def get_data_since(
self, change_id: int, max_change_id: int = -1
) -> Tuple[Dict[str, List[bytes]], List[str]]:
self, change_id: int
) -> Tuple[int, Dict[str, List[bytes]], List[str]]:
changed_elements: Dict[str, List[bytes]] = defaultdict(list)
deleted_elements: List[str] = []
all_element_ids: Set[str] = set()
for data_change_id, element_ids in self.change_id_data.items():
if data_change_id >= change_id and (
max_change_id == -1 or data_change_id <= max_change_id
):
if data_change_id >= change_id:
all_element_ids.update(element_ids)
for element_id in all_element_ids:
@ -621,7 +673,8 @@ class MemoryCacheProvider:
else:
collection, id = split_element_id(element_id)
changed_elements[collection].append(element_json.encode())
return changed_elements, deleted_elements
max_change_id = await self.get_current_change_id()
return (max_change_id, changed_elements, deleted_elements)
async def get_current_change_id(self) -> int:
if self.change_id_data:

View File

@ -19,7 +19,6 @@ class ConsumerAutoupdateStrategy:
# client_change_id is int: the change_id, the client knows about, so the next
# update must be from client_change_id+1 .. <next clange_id>
self.client_change_id: Optional[int] = None
self.max_seen_change_id = 0
self.next_send_time = None
self.timer_task_handle: Optional[Task[None]] = None
self.lock = asyncio.Lock()
@ -35,18 +34,17 @@ class ConsumerAutoupdateStrategy:
async with self.lock:
await self.stop_timer()
self.max_seen_change_id = await element_cache.get_current_change_id()
print(self.max_seen_change_id)
max_change_id = await element_cache.get_current_change_id()
self.client_change_id = change_id
if self.client_change_id == self.max_seen_change_id:
if self.client_change_id == max_change_id:
# The client is up-to-date, so nothing will be done
return None
if self.client_change_id > self.max_seen_change_id:
if self.client_change_id > max_change_id:
message = (
f"Requested change_id {self.client_change_id} is higher than the "
+ f"highest change_id {self.max_seen_change_id}."
+ f"highest change_id {max_change_id}."
)
raise ChangeIdTooHighException(message, in_response=in_response)
@ -58,8 +56,6 @@ class ConsumerAutoupdateStrategy:
# The -1 is to send this autoupdate as the first one to he client.
# Remember: the client_change_id is the change_id the client knows about
self.client_change_id = change_id - 1
if change_id > self.max_seen_change_id:
self.max_seen_change_id = change_id
if AUTOUPDATE_DELAY is None: # feature deactivated, send directly
await self.send_autoupdate()
@ -92,17 +88,15 @@ class ConsumerAutoupdateStrategy:
self.timer_task_handle = None
async def send_autoupdate(self, in_response: Optional[str] = None) -> None:
# it is important to save this variable, because it can change during runtime.
max_change_id = self.max_seen_change_id
# here, 1 is added to the change_id, because the client_change_id is the id the client
# *knows* about -> the client needs client_change_id+1 since get_autoupdate_data is
# inclusive [change_id .. max_change_id].
autoupdate = await get_autoupdate_data(
cast(int, self.client_change_id) + 1, max_change_id, self.consumer.user_id
max_change_id, autoupdate = await get_autoupdate_data(
cast(int, self.client_change_id) + 1, self.consumer.user_id
)
if autoupdate is not None:
# It will be send, so we can set the client_change_id
self.client_change_id = max_change_id
# It will be send, so we can set the client_change_id
await self.consumer.send_json(
type="autoupdate", content=autoupdate, in_response=in_response,
)

View File

@ -139,9 +139,9 @@ class ErrorLoggingMixin:
prefix = f"{path} {user_id}"
if isinstance(exc, APIException):
detail = self._detail_to_string(exc.detail)
error_logger.warn(f"{prefix} {str(detail)}")
error_logger.warning(f"{prefix} {str(detail)}")
else:
error_logger.warn(f"{prefix} unknown exception: {exc}")
error_logger.warning(f"{prefix} unknown exception: {exc}")
return super().handle_exception(exc) # type: ignore
def _detail_to_string(self, detail: Any) -> Any:

View File

@ -42,7 +42,7 @@ class TestCase(_TestCase):
"""
user_id = None if user is None else user.id
current_change_id = async_to_sync(element_cache.get_current_change_id)()
_changed_elements, deleted_element_ids = async_to_sync(
_, _changed_elements, deleted_element_ids = async_to_sync(
element_cache.get_data_since
)(user_id=user_id, change_id=current_change_id)

View File

@ -150,9 +150,14 @@ async def test_get_data_since_change_id_0(element_cache):
"app/personalized-collection:2": '{"id": 2, "key": "value2", "user_id": 2}',
}
result = await element_cache.get_data_since(None, 0)
(
max_change_id,
changed_elements,
deleted_element_ids,
) = await element_cache.get_data_since(None, 0)
assert sort_dict(result[0]) == sort_dict(example_data())
assert sort_dict(changed_elements) == sort_dict(example_data())
assert max_change_id == 0
@pytest.mark.asyncio
@ -184,6 +189,7 @@ async def test_get_data_since_change_id_data_in_redis(element_cache):
result = await element_cache.get_data_since(None, 1)
assert result == (
1,
{"app/collection1": [{"id": 1, "value": "value1"}]},
["app/collection1:3"],
)
@ -198,6 +204,7 @@ async def test_get_data_since_change_id_data_in_db(element_cache):
result = await element_cache.get_data_since(None, 1)
assert result == (
1,
{"app/collection1": [{"id": 1, "value": "value1"}]},
["app/collection1:3"],
)
@ -207,7 +214,7 @@ async def test_get_data_since_change_id_data_in_db(element_cache):
async def test_get_gata_since_change_id_data_in_db_empty_change_id(element_cache):
result = await element_cache.get_data_since(None, 1)
assert result == ({}, [])
assert result == (0, {}, [])
@pytest.mark.asyncio
@ -261,9 +268,14 @@ async def test_get_all_restricted_data(element_cache):
@pytest.mark.asyncio
async def test_get_restricted_data_change_id_0(element_cache):
result = await element_cache.get_data_since(2, 0)
(
max_change_id,
changed_elements,
deleted_element_ids,
) = await element_cache.get_data_since(2, 0)
assert sort_dict(result[0]) == sort_dict(
assert max_change_id == 0
assert sort_dict(changed_elements) == sort_dict(
{
"app/collection1": [
{"id": 1, "value": "restricted_value1"},
@ -276,6 +288,7 @@ async def test_get_restricted_data_change_id_0(element_cache):
"app/personalized-collection": [{"id": 2, "key": "value2", "user_id": 2}],
}
)
assert deleted_element_ids == []
@pytest.mark.asyncio
@ -287,6 +300,7 @@ async def test_get_restricted_data_2(element_cache):
result = await element_cache.get_data_since(0, 1)
assert result == (
1,
{"app/collection1": [{"id": 1, "value": "restricted_value1"}]},
["app/collection1:3"],
)
@ -298,7 +312,7 @@ async def test_get_restricted_data_from_personalized_cacheable(element_cache):
result = await element_cache.get_data_since(0, 1)
assert result == ({}, [])
assert result == (1, {}, [])
@pytest.mark.asyncio
@ -310,12 +324,13 @@ async def test_get_restricted_data_change_id_lower_than_in_redis(element_cache):
@pytest.mark.asyncio
async def test_get_restricted_data_change_with_id(element_cache):
async def test_get_restricted_data_with_change_id(element_cache):
element_cache.cache_provider.change_id_data = {2: {"app/collection1:1"}}
result = await element_cache.get_data_since(0, 2)
assert result == (
2,
{"app/collection1": [{"id": 1, "value": "restricted_value1"}]},
[],
)