Fixed incomplete autoupdates

A conceptional issue in `get_data_since` leads to incomplete
autoupdates. The behaviour was long time in the code, but only with a
lot of autoupdates (high concurrency) and the autoupdate delay I noticed
the bug during testing. I'm sure, that this issue might have caused
incomplete autoupdates (which the user may experience as "lost
autoupdates") in previous productive instances. Instead of quering a
range (from_change_id to to_change_id) one now can only get data from a
change id up to the max change id in the element cache. The max change
id gets now returned by `get_data_since`.

I also added a get_all_data with the capability of returning the
max_change_id at this point of time.

As a usability-"fix" (more like a fix the result of a bug, not the bug
itself) a refresh button for a poll was added, that issues an autoupdate
for the poll and all options.
This commit is contained in:
FinnStutzenstein 2020-06-03 14:11:25 +02:00
parent c4f482b70c
commit c186a575f6
No known key found for this signature in database
GPG Key ID: 9042F605C6324654
13 changed files with 246 additions and 106 deletions

View File

@ -119,11 +119,19 @@
<span>{{ 'Ballot papers' | translate }}</span> <span>{{ 'Ballot papers' | translate }}</span>
</button> </button>
<mat-divider></mat-divider> <mat-divider></mat-divider>
<!-- Refresh Button -->
<button mat-menu-item (click)="refreshPoll()">
<mat-icon>refresh</mat-icon>
<span>{{ 'Refresh' | translate }}</span>
</button>
<!-- Reset Button --> <!-- Reset Button -->
<button mat-menu-item (click)="resetState()"> <button mat-menu-item (click)="resetState()">
<mat-icon color="warn">replay</mat-icon> <mat-icon color="warn">replay</mat-icon>
<span>{{ 'Reset state' | translate }}</span> <span>{{ 'Reset state' | translate }}</span>
</button> </button>
<!-- Delete button -->
<button mat-menu-item class="red-warning-text" (click)="onDeletePoll()"> <button mat-menu-item class="red-warning-text" (click)="onDeletePoll()">
<mat-icon>delete</mat-icon> <mat-icon>delete</mat-icon>
<span>{{ 'Delete' | translate }}</span> <span>{{ 'Delete' | translate }}</span>

View File

@ -128,6 +128,12 @@
</button> </button>
<div *osPerms="'motions.can_manage_polls'"> <div *osPerms="'motions.can_manage_polls'">
<mat-divider></mat-divider> <mat-divider></mat-divider>
<!-- Refresh Button -->
<button mat-menu-item (click)="refreshPoll()">
<mat-icon>refresh</mat-icon>
<span>{{ 'Refresh' | translate }}</span>
</button>
<!-- Reset Button --> <!-- Reset Button -->
<button mat-menu-item (click)="resetState()"> <button mat-menu-item (click)="resetState()">
<mat-icon color="warn">replay</mat-icon> <mat-icon color="warn">replay</mat-icon>

View File

@ -103,4 +103,8 @@ export abstract class BasePollComponent<V extends ViewBasePoll, S extends PollSe
protected initPoll(model: V): void { protected initPoll(model: V): void {
this._poll = model; this._poll = model;
} }
public refreshPoll(): void {
this.repo.refresh(this._poll);
}
} }

View File

@ -1,4 +1,4 @@
import { Component, Input } from '@angular/core'; import { Component, Input, OnDestroy } from '@angular/core';
import { MatSnackBar } from '@angular/material/snack-bar'; import { MatSnackBar } from '@angular/material/snack-bar';
import { Title } from '@angular/platform-browser'; import { Title } from '@angular/platform-browser';
@ -16,7 +16,7 @@ import { ViewUser } from 'app/site/users/models/view-user';
templateUrl: './poll-progress.component.html', templateUrl: './poll-progress.component.html',
styleUrls: ['./poll-progress.component.scss'] styleUrls: ['./poll-progress.component.scss']
}) })
export class PollProgressComponent extends BaseViewComponent { export class PollProgressComponent extends BaseViewComponent implements OnDestroy {
private pollId: number = null; private pollId: number = null;
private pollSubscription: Subscription = null; private pollSubscription: Subscription = null;
@ -25,34 +25,11 @@ export class PollProgressComponent extends BaseViewComponent {
if (value.id !== this.pollId) { if (value.id !== this.pollId) {
this.pollId = value.id; this.pollId = value.id;
if (this.pollSubscription !== null) { this.unsubscribePoll();
this.pollSubscription.unsubscribe();
this.pollSubscription = null;
}
this.pollSubscription = this.pollRepo.getViewModelObservable(this.pollId).subscribe(poll => { this.pollSubscription = this.pollRepo.getViewModelObservable(this.pollId).subscribe(poll => {
if (poll) { if (poll) {
this._poll = poll; this._poll = poll;
this.updateVotescast();
// We may cannot use this.poll.votescast during the voting, since it can
// be reported with false values from the server
// -> calculate the votes on our own.
const ids = new Set();
for (const option of this.poll.options) {
for (const vote of option.votes) {
if (vote.user_id) {
ids.add(vote.user_id);
}
}
}
this.votescast = ids.size;
// But sometimes there are not enough votes (poll.votescast is higher).
// If this happens, take the value from the poll
if (this.poll.votescast > this.votescast) {
this.votescast = this.poll.votescast;
}
this.calculateMaxUsers(); this.calculateMaxUsers();
} }
}); });
@ -63,7 +40,7 @@ export class PollProgressComponent extends BaseViewComponent {
} }
private _poll: ViewBasePoll; private _poll: ViewBasePoll;
public votescast: number; public votescast = 0;
public max: number; public max: number;
public valueInPercent: number; public valueInPercent: number;
@ -82,6 +59,34 @@ export class PollProgressComponent extends BaseViewComponent {
}); });
} }
private updateVotescast(): void {
if (this.poll.votescast === 0) {
this.votescast = 0;
return;
}
// We may cannot use this.poll.votescast during the voting, since it can
// be reported with false values from the server
// -> calculate the votes on our own.
const ids = new Set();
for (const option of this.poll.options) {
for (const vote of option.votes) {
if (vote.user_id) {
ids.add(vote.user_id);
}
}
}
if (ids.size > this.votescast) {
this.votescast = ids.size;
}
// But sometimes there are not enough votes (poll.votescast is higher).
// If this happens, take the value from the poll
if (this.poll.votescast > this.votescast) {
this.votescast = this.poll.votescast;
}
}
private calculateMaxUsers(allUsers?: ViewUser[]): void { private calculateMaxUsers(allUsers?: ViewUser[]): void {
if (!this.poll) { if (!this.poll) {
return; return;
@ -95,4 +100,15 @@ export class PollProgressComponent extends BaseViewComponent {
this.max = allUsers.length; this.max = allUsers.length;
this.valueInPercent = this.poll ? (this.votescast / this.max) * 100 : 0; this.valueInPercent = this.poll ? (this.votescast / this.max) * 100 : 0;
} }
public ngOnDestroy(): void {
this.unsubscribePoll();
}
private unsubscribePoll(): void {
if (this.pollSubscription !== null) {
this.pollSubscription.unsubscribe();
this.pollSubscription = null;
}
}
} }

View File

@ -72,16 +72,19 @@ export abstract class BasePollRepositoryService<
} }
} }
public resetPoll(poll: BasePoll): Promise<void> {
return this.http.post(`${this.restPath(poll)}/reset/`);
}
private restPath(poll: BasePoll): string { private restPath(poll: BasePoll): string {
return `/rest/${poll.collectionString}/${poll.id}`; return `/rest/${poll.collectionString}/${poll.id}`;
} }
public resetPoll(poll: BasePoll): Promise<void> {
return this.http.post(`${this.restPath(poll)}/reset/`);
}
public pseudoanonymize(poll: BasePoll): Promise<void> { public pseudoanonymize(poll: BasePoll): Promise<void> {
const path = this.restPath(poll); return this.http.post(`${this.restPath(poll)}/pseudoanonymize/`);
return this.http.post(`${path}/pseudoanonymize/`); }
public refresh(poll: BasePoll): Promise<void> {
return this.http.post(`${this.restPath(poll)}/refresh/`);
} }
} }

View File

@ -222,6 +222,15 @@ class BasePollViewSet(ModelViewSet):
return Response() return Response()
@detail_route(methods=["POST"])
@transaction.atomic
def refresh(self, request, pk):
poll = self.get_object()
inform_changed_data(poll, final_data=True)
inform_changed_data(poll.get_options(), final_data=True)
inform_changed_data(poll.get_votes(), final_data=True)
return Response()
def assert_can_vote(self, poll, request): def assert_can_vote(self, poll, request):
""" """
Raises a permission denied, if the user is not allowed to vote (or has already voted). Raises a permission denied, if the user is not allowed to vote (or has already voted).
@ -263,6 +272,12 @@ class BasePollViewSet(ModelViewSet):
) )
return value return value
def has_manage_permissions(self):
"""
Returns true, if the request user has manage perms.
"""
raise NotImplementedError()
def convert_option_data(self, poll, data): def convert_option_data(self, poll, data):
""" """
May be overwritten by subclass. Adjusts the option data based on the now existing poll May be overwritten by subclass. Adjusts the option data based on the now existing poll

View File

@ -286,9 +286,7 @@ class AutoupdateBundleMiddleware:
user_id = request.user.pk or 0 user_id = request.user.pk or 0
# Inject the autoupdate in the response. # Inject the autoupdate in the response.
# The complete response body will be overwritten! # The complete response body will be overwritten!
autoupdate = async_to_sync(get_autoupdate_data)( _, autoupdate = async_to_sync(get_autoupdate_data)(change_id, user_id)
change_id, change_id, user_id
)
content = {"autoupdate": autoupdate, "data": response.data} content = {"autoupdate": autoupdate, "data": response.data}
# Note: autoupdate may be none on skipped ones (which should not happen # Note: autoupdate may be none on skipped ones (which should not happen
# since the user has made the request....) # since the user has made the request....)
@ -299,17 +297,25 @@ class AutoupdateBundleMiddleware:
async def get_autoupdate_data( async def get_autoupdate_data(
from_change_id: int, to_change_id: int, user_id: int from_change_id: int, user_id: int
) -> Optional[AutoupdateFormat]: ) -> Tuple[int, Optional[AutoupdateFormat]]:
"""
Returns the max_change_id and the autoupdate from from_change_id to max_change_id
"""
try: try:
changed_elements, deleted_element_ids = await element_cache.get_data_since( (
user_id, from_change_id, to_change_id max_change_id,
) changed_elements,
deleted_element_ids,
) = await element_cache.get_data_since(user_id, from_change_id)
except ChangeIdTooLowError: except ChangeIdTooLowError:
# The change_id is lower the the lowerst change_id in redis. Return all data # The change_id is lower the the lowerst change_id in redis. Return all data
changed_elements = await element_cache.get_all_data_list(user_id) (
all_data = True max_change_id,
changed_elements,
) = await element_cache.get_all_data_list_with_max_change_id(user_id)
deleted_elements: Dict[str, List[int]] = {} deleted_elements: Dict[str, List[int]] = {}
all_data = True
else: else:
all_data = False all_data = False
deleted_elements = defaultdict(list) deleted_elements = defaultdict(list)
@ -320,15 +326,18 @@ async def get_autoupdate_data(
# Check, if the autoupdate has any data. # Check, if the autoupdate has any data.
if not changed_elements and not deleted_element_ids: if not changed_elements and not deleted_element_ids:
# Skip empty updates # Skip empty updates
return None return max_change_id, None
else: else:
# Normal autoupdate with data # Normal autoupdate with data
return AutoupdateFormat( return (
changed=changed_elements, max_change_id,
deleted=deleted_elements, AutoupdateFormat(
from_change_id=from_change_id, changed=changed_elements,
to_change_id=to_change_id, deleted=deleted_elements,
all_data=all_data, from_change_id=from_change_id,
to_change_id=max_change_id,
all_data=all_data,
),
) )

View File

@ -239,8 +239,23 @@ class ElementCache:
} }
If the user id is given the data will be restricted for this user. If the user id is given the data will be restricted for this user.
""" """
all_data = await self.cache_provider.get_all_data()
return await self.format_all_data(all_data, user_id)
async def get_all_data_list_with_max_change_id(
self, user_id: Optional[int] = None
) -> Tuple[int, Dict[str, List[Dict[str, Any]]]]:
(
max_change_id,
all_data,
) = await self.cache_provider.get_all_data_with_max_change_id()
return max_change_id, await self.format_all_data(all_data, user_id)
async def format_all_data(
self, all_data_bytes: Dict[bytes, bytes], user_id: Optional[int]
) -> Dict[str, List[Dict[str, Any]]]:
all_data: Dict[str, List[Dict[str, Any]]] = defaultdict(list) all_data: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
for element_id, data in (await self.cache_provider.get_all_data()).items(): for element_id, data in all_data_bytes.items():
collection, _ = split_element_id(element_id) collection, _ = split_element_id(element_id)
element = json.loads(data.decode()) element = json.loads(data.decode())
element.pop( element.pop(
@ -299,16 +314,15 @@ class ElementCache:
return restricted_elements[0] if restricted_elements else None return restricted_elements[0] if restricted_elements else None
async def get_data_since( async def get_data_since(
self, user_id: Optional[int] = None, change_id: int = 0, max_change_id: int = -1 self, user_id: Optional[int] = None, change_id: int = 0
) -> Tuple[Dict[str, List[Dict[str, Any]]], List[str]]: ) -> Tuple[int, Dict[str, List[Dict[str, Any]]], List[str]]:
""" """
Returns all data since change_id until max_change_id (included). Returns all data since change_id until the max change id.cIf the user id is given the
max_change_id -1 means the highest change_id. If the user id is given the
data will be restricted for this user. data will be restricted for this user.
Returns two values inside a tuple. The first value is a dict where the Returns three values inside a tuple. The first value is the max change id. The second
key is the collection and the value is a list of data. The second value is a dict where the key is the collection and the value is a list of data.
is a list of element_ids with deleted elements. The third is a list of element_ids with deleted elements.
Only returns elements with the change_id or newer. When change_id is 0, Only returns elements with the change_id or newer. When change_id is 0,
all elements are returned. all elements are returned.
@ -319,7 +333,11 @@ class ElementCache:
that the cache does not know about. that the cache does not know about.
""" """
if change_id == 0: if change_id == 0:
return (await self.get_all_data_list(user_id), []) (
max_change_id,
changed_elements,
) = await self.get_all_data_list_with_max_change_id(user_id)
return (max_change_id, changed_elements, [])
# This raises a Runtime Exception, if there is no change_id # This raises a Runtime Exception, if there is no change_id
lowest_change_id = await self.get_lowest_change_id() lowest_change_id = await self.get_lowest_change_id()
@ -332,11 +350,10 @@ class ElementCache:
) )
( (
max_change_id,
raw_changed_elements, raw_changed_elements,
deleted_elements, deleted_elements,
) = await self.cache_provider.get_data_since( ) = await self.cache_provider.get_data_since(change_id)
change_id, max_change_id=max_change_id
)
changed_elements = { changed_elements = {
collection: [json.loads(value.decode()) for value in value_list] collection: [json.loads(value.decode()) for value in value_list]
for collection, value_list in raw_changed_elements.items() for collection, value_list in raw_changed_elements.items()
@ -381,7 +398,7 @@ class ElementCache:
else: else:
changed_elements[collection] = restricted_elements changed_elements[collection] = restricted_elements
return (changed_elements, deleted_elements) return (max_change_id, changed_elements, deleted_elements)
async def get_current_change_id(self) -> int: async def get_current_change_id(self) -> int:
""" """

View File

@ -57,6 +57,9 @@ class ElementCacheProvider(Protocol):
async def get_all_data(self) -> Dict[bytes, bytes]: async def get_all_data(self) -> Dict[bytes, bytes]:
... ...
async def get_all_data_with_max_change_id(self) -> Tuple[int, Dict[bytes, bytes]]:
...
async def get_collection_data(self, collection: str) -> Dict[int, bytes]: async def get_collection_data(self, collection: str) -> Dict[int, bytes]:
... ...
@ -69,8 +72,8 @@ class ElementCacheProvider(Protocol):
... ...
async def get_data_since( async def get_data_since(
self, change_id: int, max_change_id: int = -1 self, change_id: int
) -> Tuple[Dict[str, List[bytes]], List[str]]: ) -> Tuple[int, Dict[str, List[bytes]], List[str]]:
... ...
async def get_current_change_id(self) -> int: async def get_current_change_id(self) -> int:
@ -137,6 +140,24 @@ class RedisCacheProvider:
False, False,
), ),
"get_all_data": ("return redis.call('hgetall', KEYS[1])", True), "get_all_data": ("return redis.call('hgetall', KEYS[1])", True),
"get_all_data_with_max_change_id": (
"""
local tmp = redis.call('zrevrangebyscore', KEYS[2], '+inf', '-inf', 'WITHSCORES', 'LIMIT', 0, 1)
local max_change_id
if next(tmp) == nil then
-- The key does not exist
return redis.error_reply("cache_reset")
else
max_change_id = tmp[2]
end
local all_data = redis.call('hgetall', KEYS[1])
table.insert(all_data, 'max_change_id')
table.insert(all_data, max_change_id)
return all_data
""",
True,
),
"get_collection_data": ( "get_collection_data": (
""" """
local cursor = 0 local cursor = 0
@ -233,11 +254,24 @@ class RedisCacheProvider:
), ),
"get_data_since": ( "get_data_since": (
""" """
-- Get change ids of changed elements -- get max change id
local element_ids = redis.call('zrangebyscore', KEYS[2], ARGV[1], ARGV[2]) local tmp = redis.call('zrevrangebyscore', KEYS[2], '+inf', '-inf', 'WITHSCORES', 'LIMIT', 0, 1)
local max_change_id
if next(tmp) == nil then
-- The key does not exist
return redis.error_reply("cache_reset")
else
max_change_id = tmp[2]
end
-- Save elements in array. Rotate element_id and element_json -- Get change ids of changed elements
local element_ids = redis.call('zrangebyscore', KEYS[2], ARGV[1], max_change_id)
-- Save elements in array. First is the max_change_id with the key "max_change_id"
-- Than rotate element_id and element_json. This is ocnverted into a dict in python code.
local elements = {} local elements = {}
table.insert(elements, 'max_change_id')
table.insert(elements, max_change_id)
for _, element_id in pairs(element_ids) do for _, element_id in pairs(element_ids) do
table.insert(elements, element_id) table.insert(elements, element_id)
table.insert(elements, redis.call('hget', KEYS[1], element_id)) table.insert(elements, redis.call('hget', KEYS[1], element_id))
@ -320,9 +354,25 @@ class RedisCacheProvider:
Returns all data from the full_data_cache in a mapping from element_id to the element. Returns all data from the full_data_cache in a mapping from element_id to the element.
""" """
return await aioredis.util.wait_make_dict( return await aioredis.util.wait_make_dict(
self.eval("get_all_data", [self.full_data_cache_key], read_only=True) self.eval("get_all_data", keys=[self.full_data_cache_key], read_only=True)
) )
@ensure_cache_wrapper()
async def get_all_data_with_max_change_id(self) -> Tuple[int, Dict[bytes, bytes]]:
"""
Returns all data from the full_data_cache in a mapping from element_id to the element and
the max change id.
"""
all_data = await aioredis.util.wait_make_dict(
self.eval(
"get_all_data_with_max_change_id",
keys=[self.full_data_cache_key, self.change_id_cache_key],
read_only=True,
)
)
max_change_id = int(all_data.pop(b"max_change_id"))
return max_change_id, all_data
@ensure_cache_wrapper() @ensure_cache_wrapper()
async def get_collection_data(self, collection: str) -> Dict[int, bytes]: async def get_collection_data(self, collection: str) -> Dict[int, bytes]:
""" """
@ -376,8 +426,8 @@ class RedisCacheProvider:
@ensure_cache_wrapper() @ensure_cache_wrapper()
async def get_data_since( async def get_data_since(
self, change_id: int, max_change_id: int = -1 self, change_id: int
) -> Tuple[Dict[str, List[bytes]], List[str]]: ) -> Tuple[int, Dict[str, List[bytes]], List[str]]:
""" """
Returns all elements since a change_id (included) and until the max_change_id (included). Returns all elements since a change_id (included) and until the max_change_id (included).
@ -388,8 +438,6 @@ class RedisCacheProvider:
changed_elements: Dict[str, List[bytes]] = defaultdict(list) changed_elements: Dict[str, List[bytes]] = defaultdict(list)
deleted_elements: List[str] = [] deleted_elements: List[str] = []
# Convert max_change_id to a string. If its negative, use the string '+inf'
redis_max_change_id = "+inf" if max_change_id < 0 else str(max_change_id)
# lua script that returns gets all element_ids from change_id_cache_key # lua script that returns gets all element_ids from change_id_cache_key
# and then uses each element_id on full_data or restricted_data. # and then uses each element_id on full_data or restricted_data.
# It returns a list where the odd values are the change_id and the # It returns a list where the odd values are the change_id and the
@ -399,13 +447,14 @@ class RedisCacheProvider:
self.eval( self.eval(
"get_data_since", "get_data_since",
keys=[self.full_data_cache_key, self.change_id_cache_key], keys=[self.full_data_cache_key, self.change_id_cache_key],
args=[change_id, redis_max_change_id], args=[change_id],
read_only=True, read_only=True,
) )
) )
max_change_id = int(elements[b"max_change_id"].decode()) # type: ignore
for element_id, element_json in elements.items(): for element_id, element_json in elements.items():
if element_id.startswith(b"_config"): if element_id.startswith(b"_config") or element_id == b"max_change_id":
# Ignore config values from the change_id cache key # Ignore config values from the change_id cache key
continue continue
if element_json is None: if element_json is None:
@ -414,7 +463,7 @@ class RedisCacheProvider:
else: else:
collection, id = split_element_id(element_id) collection, id = split_element_id(element_id)
changed_elements[collection].append(element_json) changed_elements[collection].append(element_json)
return changed_elements, deleted_elements return max_change_id, changed_elements, deleted_elements
@ensure_cache_wrapper() @ensure_cache_wrapper()
async def get_current_change_id(self) -> int: async def get_current_change_id(self) -> int:
@ -562,6 +611,11 @@ class MemoryCacheProvider:
async def get_all_data(self) -> Dict[bytes, bytes]: async def get_all_data(self) -> Dict[bytes, bytes]:
return str_dict_to_bytes(self.full_data) return str_dict_to_bytes(self.full_data)
async def get_all_data_with_max_change_id(self) -> Tuple[int, Dict[bytes, bytes]]:
all_data = await self.get_all_data()
max_change_id = await self.get_current_change_id()
return max_change_id, all_data
async def get_collection_data(self, collection: str) -> Dict[int, bytes]: async def get_collection_data(self, collection: str) -> Dict[int, bytes]:
out = {} out = {}
query = f"{collection}:" query = f"{collection}:"
@ -602,16 +656,14 @@ class MemoryCacheProvider:
return change_id return change_id
async def get_data_since( async def get_data_since(
self, change_id: int, max_change_id: int = -1 self, change_id: int
) -> Tuple[Dict[str, List[bytes]], List[str]]: ) -> Tuple[int, Dict[str, List[bytes]], List[str]]:
changed_elements: Dict[str, List[bytes]] = defaultdict(list) changed_elements: Dict[str, List[bytes]] = defaultdict(list)
deleted_elements: List[str] = [] deleted_elements: List[str] = []
all_element_ids: Set[str] = set() all_element_ids: Set[str] = set()
for data_change_id, element_ids in self.change_id_data.items(): for data_change_id, element_ids in self.change_id_data.items():
if data_change_id >= change_id and ( if data_change_id >= change_id:
max_change_id == -1 or data_change_id <= max_change_id
):
all_element_ids.update(element_ids) all_element_ids.update(element_ids)
for element_id in all_element_ids: for element_id in all_element_ids:
@ -621,7 +673,8 @@ class MemoryCacheProvider:
else: else:
collection, id = split_element_id(element_id) collection, id = split_element_id(element_id)
changed_elements[collection].append(element_json.encode()) changed_elements[collection].append(element_json.encode())
return changed_elements, deleted_elements max_change_id = await self.get_current_change_id()
return (max_change_id, changed_elements, deleted_elements)
async def get_current_change_id(self) -> int: async def get_current_change_id(self) -> int:
if self.change_id_data: if self.change_id_data:

View File

@ -19,7 +19,6 @@ class ConsumerAutoupdateStrategy:
# client_change_id is int: the change_id, the client knows about, so the next # client_change_id is int: the change_id, the client knows about, so the next
# update must be from client_change_id+1 .. <next clange_id> # update must be from client_change_id+1 .. <next clange_id>
self.client_change_id: Optional[int] = None self.client_change_id: Optional[int] = None
self.max_seen_change_id = 0
self.next_send_time = None self.next_send_time = None
self.timer_task_handle: Optional[Task[None]] = None self.timer_task_handle: Optional[Task[None]] = None
self.lock = asyncio.Lock() self.lock = asyncio.Lock()
@ -35,18 +34,17 @@ class ConsumerAutoupdateStrategy:
async with self.lock: async with self.lock:
await self.stop_timer() await self.stop_timer()
self.max_seen_change_id = await element_cache.get_current_change_id() max_change_id = await element_cache.get_current_change_id()
print(self.max_seen_change_id)
self.client_change_id = change_id self.client_change_id = change_id
if self.client_change_id == self.max_seen_change_id: if self.client_change_id == max_change_id:
# The client is up-to-date, so nothing will be done # The client is up-to-date, so nothing will be done
return None return None
if self.client_change_id > self.max_seen_change_id: if self.client_change_id > max_change_id:
message = ( message = (
f"Requested change_id {self.client_change_id} is higher than the " f"Requested change_id {self.client_change_id} is higher than the "
+ f"highest change_id {self.max_seen_change_id}." + f"highest change_id {max_change_id}."
) )
raise ChangeIdTooHighException(message, in_response=in_response) raise ChangeIdTooHighException(message, in_response=in_response)
@ -58,8 +56,6 @@ class ConsumerAutoupdateStrategy:
# The -1 is to send this autoupdate as the first one to he client. # The -1 is to send this autoupdate as the first one to he client.
# Remember: the client_change_id is the change_id the client knows about # Remember: the client_change_id is the change_id the client knows about
self.client_change_id = change_id - 1 self.client_change_id = change_id - 1
if change_id > self.max_seen_change_id:
self.max_seen_change_id = change_id
if AUTOUPDATE_DELAY is None: # feature deactivated, send directly if AUTOUPDATE_DELAY is None: # feature deactivated, send directly
await self.send_autoupdate() await self.send_autoupdate()
@ -92,17 +88,15 @@ class ConsumerAutoupdateStrategy:
self.timer_task_handle = None self.timer_task_handle = None
async def send_autoupdate(self, in_response: Optional[str] = None) -> None: async def send_autoupdate(self, in_response: Optional[str] = None) -> None:
# it is important to save this variable, because it can change during runtime.
max_change_id = self.max_seen_change_id
# here, 1 is added to the change_id, because the client_change_id is the id the client # here, 1 is added to the change_id, because the client_change_id is the id the client
# *knows* about -> the client needs client_change_id+1 since get_autoupdate_data is # *knows* about -> the client needs client_change_id+1 since get_autoupdate_data is
# inclusive [change_id .. max_change_id]. # inclusive [change_id .. max_change_id].
autoupdate = await get_autoupdate_data( max_change_id, autoupdate = await get_autoupdate_data(
cast(int, self.client_change_id) + 1, max_change_id, self.consumer.user_id cast(int, self.client_change_id) + 1, self.consumer.user_id
) )
if autoupdate is not None: if autoupdate is not None:
# It will be send, so we can set the client_change_id
self.client_change_id = max_change_id self.client_change_id = max_change_id
# It will be send, so we can set the client_change_id
await self.consumer.send_json( await self.consumer.send_json(
type="autoupdate", content=autoupdate, in_response=in_response, type="autoupdate", content=autoupdate, in_response=in_response,
) )

View File

@ -139,9 +139,9 @@ class ErrorLoggingMixin:
prefix = f"{path} {user_id}" prefix = f"{path} {user_id}"
if isinstance(exc, APIException): if isinstance(exc, APIException):
detail = self._detail_to_string(exc.detail) detail = self._detail_to_string(exc.detail)
error_logger.warn(f"{prefix} {str(detail)}") error_logger.warning(f"{prefix} {str(detail)}")
else: else:
error_logger.warn(f"{prefix} unknown exception: {exc}") error_logger.warning(f"{prefix} unknown exception: {exc}")
return super().handle_exception(exc) # type: ignore return super().handle_exception(exc) # type: ignore
def _detail_to_string(self, detail: Any) -> Any: def _detail_to_string(self, detail: Any) -> Any:

View File

@ -42,7 +42,7 @@ class TestCase(_TestCase):
""" """
user_id = None if user is None else user.id user_id = None if user is None else user.id
current_change_id = async_to_sync(element_cache.get_current_change_id)() current_change_id = async_to_sync(element_cache.get_current_change_id)()
_changed_elements, deleted_element_ids = async_to_sync( _, _changed_elements, deleted_element_ids = async_to_sync(
element_cache.get_data_since element_cache.get_data_since
)(user_id=user_id, change_id=current_change_id) )(user_id=user_id, change_id=current_change_id)

View File

@ -150,9 +150,14 @@ async def test_get_data_since_change_id_0(element_cache):
"app/personalized-collection:2": '{"id": 2, "key": "value2", "user_id": 2}', "app/personalized-collection:2": '{"id": 2, "key": "value2", "user_id": 2}',
} }
result = await element_cache.get_data_since(None, 0) (
max_change_id,
changed_elements,
deleted_element_ids,
) = await element_cache.get_data_since(None, 0)
assert sort_dict(result[0]) == sort_dict(example_data()) assert sort_dict(changed_elements) == sort_dict(example_data())
assert max_change_id == 0
@pytest.mark.asyncio @pytest.mark.asyncio
@ -184,6 +189,7 @@ async def test_get_data_since_change_id_data_in_redis(element_cache):
result = await element_cache.get_data_since(None, 1) result = await element_cache.get_data_since(None, 1)
assert result == ( assert result == (
1,
{"app/collection1": [{"id": 1, "value": "value1"}]}, {"app/collection1": [{"id": 1, "value": "value1"}]},
["app/collection1:3"], ["app/collection1:3"],
) )
@ -198,6 +204,7 @@ async def test_get_data_since_change_id_data_in_db(element_cache):
result = await element_cache.get_data_since(None, 1) result = await element_cache.get_data_since(None, 1)
assert result == ( assert result == (
1,
{"app/collection1": [{"id": 1, "value": "value1"}]}, {"app/collection1": [{"id": 1, "value": "value1"}]},
["app/collection1:3"], ["app/collection1:3"],
) )
@ -207,7 +214,7 @@ async def test_get_data_since_change_id_data_in_db(element_cache):
async def test_get_gata_since_change_id_data_in_db_empty_change_id(element_cache): async def test_get_gata_since_change_id_data_in_db_empty_change_id(element_cache):
result = await element_cache.get_data_since(None, 1) result = await element_cache.get_data_since(None, 1)
assert result == ({}, []) assert result == (0, {}, [])
@pytest.mark.asyncio @pytest.mark.asyncio
@ -261,9 +268,14 @@ async def test_get_all_restricted_data(element_cache):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_get_restricted_data_change_id_0(element_cache): async def test_get_restricted_data_change_id_0(element_cache):
result = await element_cache.get_data_since(2, 0) (
max_change_id,
changed_elements,
deleted_element_ids,
) = await element_cache.get_data_since(2, 0)
assert sort_dict(result[0]) == sort_dict( assert max_change_id == 0
assert sort_dict(changed_elements) == sort_dict(
{ {
"app/collection1": [ "app/collection1": [
{"id": 1, "value": "restricted_value1"}, {"id": 1, "value": "restricted_value1"},
@ -276,6 +288,7 @@ async def test_get_restricted_data_change_id_0(element_cache):
"app/personalized-collection": [{"id": 2, "key": "value2", "user_id": 2}], "app/personalized-collection": [{"id": 2, "key": "value2", "user_id": 2}],
} }
) )
assert deleted_element_ids == []
@pytest.mark.asyncio @pytest.mark.asyncio
@ -287,6 +300,7 @@ async def test_get_restricted_data_2(element_cache):
result = await element_cache.get_data_since(0, 1) result = await element_cache.get_data_since(0, 1)
assert result == ( assert result == (
1,
{"app/collection1": [{"id": 1, "value": "restricted_value1"}]}, {"app/collection1": [{"id": 1, "value": "restricted_value1"}]},
["app/collection1:3"], ["app/collection1:3"],
) )
@ -298,7 +312,7 @@ async def test_get_restricted_data_from_personalized_cacheable(element_cache):
result = await element_cache.get_data_since(0, 1) result = await element_cache.get_data_since(0, 1)
assert result == ({}, []) assert result == (1, {}, [])
@pytest.mark.asyncio @pytest.mark.asyncio
@ -310,12 +324,13 @@ async def test_get_restricted_data_change_id_lower_than_in_redis(element_cache):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_get_restricted_data_change_with_id(element_cache): async def test_get_restricted_data_with_change_id(element_cache):
element_cache.cache_provider.change_id_data = {2: {"app/collection1:1"}} element_cache.cache_provider.change_id_data = {2: {"app/collection1:1"}}
result = await element_cache.get_data_since(0, 2) result = await element_cache.get_data_since(0, 2)
assert result == ( assert result == (
2,
{"app/collection1": [{"id": 1, "value": "restricted_value1"}]}, {"app/collection1": [{"id": 1, "value": "restricted_value1"}]},
[], [],
) )