Small improvements and first attempt to make to poll progress responsive
to massive autoupdates. The "optimization" didn't help, so this has to be continued in another PR.
This commit is contained in:
parent
d8b21c5fb5
commit
0eee839736
@ -60,6 +60,7 @@ export function isAutoupdateFormat(obj: any): obj is AutoupdateFormat {
|
||||
})
|
||||
export class AutoupdateService {
|
||||
private mutex = new Mutex();
|
||||
|
||||
/**
|
||||
* Constructor to create the AutoupdateService. Calls the constructor of the parent class.
|
||||
* @param websocketService
|
||||
@ -104,7 +105,7 @@ export class AutoupdateService {
|
||||
}
|
||||
|
||||
/**
|
||||
* Stores all data from the autoupdate. This means, that the DS is resettet and filled with just the
|
||||
* Stores all data from the autoupdate. This means, that the DS is resetted and filled with just the
|
||||
* given data from the autoupdate.
|
||||
* @param autoupdate The autoupdate
|
||||
*/
|
||||
@ -133,9 +134,10 @@ export class AutoupdateService {
|
||||
|
||||
// Normal autoupdate
|
||||
if (autoupdate.from_change_id <= maxChangeId + 1 && autoupdate.to_change_id > maxChangeId) {
|
||||
this.injectAutupdateIntoDS(autoupdate, true);
|
||||
await this.injectAutupdateIntoDS(autoupdate, true);
|
||||
} else {
|
||||
// autoupdate fully in the future. we are missing something!
|
||||
console.log('Autoupdate in the future', maxChangeId, autoupdate.from_change_id, autoupdate.to_change_id);
|
||||
this.requestChanges();
|
||||
}
|
||||
}
|
||||
@ -143,7 +145,7 @@ export class AutoupdateService {
|
||||
public async injectAutoupdateIgnoreChangeId(autoupdate: AutoupdateFormat): Promise<void> {
|
||||
const unlock = await this.mutex.lock();
|
||||
console.debug('inject autoupdate', autoupdate);
|
||||
this.injectAutupdateIntoDS(autoupdate, false);
|
||||
await this.injectAutupdateIntoDS(autoupdate, false);
|
||||
unlock();
|
||||
}
|
||||
|
||||
|
@ -258,6 +258,7 @@ export class DataStoreUpdateManagerService {
|
||||
|
||||
private serveNextSlot(): void {
|
||||
if (this.updateSlotRequests.length > 0) {
|
||||
console.log('Concurrent update slots');
|
||||
const request = this.updateSlotRequests.pop();
|
||||
request.resolve();
|
||||
}
|
||||
@ -665,4 +666,11 @@ export class DataStoreService {
|
||||
await this.storageService.set(DataStoreService.cachePrefix + 'DS', this.jsonStore);
|
||||
await this.storageService.set(DataStoreService.cachePrefix + 'maxChangeId', changeId);
|
||||
}
|
||||
|
||||
public print(): void {
|
||||
console.log('Max change id', this.maxChangeId);
|
||||
console.log('json storage');
|
||||
console.log(JSON.stringify(this.jsonStore));
|
||||
console.log(this.modelStore);
|
||||
}
|
||||
}
|
||||
|
@ -456,8 +456,7 @@ export class OperatorService implements OnAfterAppsLoaded {
|
||||
* Set the operators presence to isPresent
|
||||
*/
|
||||
public async setPresence(isPresent: boolean): Promise<void> {
|
||||
const r = await this.http.post(environment.urlPrefix + '/users/setpresence/', isPresent);
|
||||
console.log('operator', r);
|
||||
await this.http.post(environment.urlPrefix + '/users/setpresence/', isPresent);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -32,6 +32,26 @@
|
||||
<span>{{ 'Check for updates' | translate }}</span>
|
||||
</button>
|
||||
</div>
|
||||
|
||||
<div>
|
||||
<button type="button" mat-button (click)="showDevTools=!showDevTools">
|
||||
<span>{{ 'Show devtools' | translate }}</span>
|
||||
</button>
|
||||
</div>
|
||||
|
||||
<mat-card class="os-card" *ngIf="showDevTools">
|
||||
<div>
|
||||
<button type="button" mat-button (click)="printDS()">
|
||||
<span>Print DS</span>
|
||||
</button>
|
||||
</div>
|
||||
|
||||
<div>
|
||||
<button type="button" mat-button (click)="getThisComponent()">
|
||||
<span>Get this component</span>
|
||||
</button>
|
||||
</div>
|
||||
</mat-card>
|
||||
</mat-card>
|
||||
|
||||
<mat-card class="os-card" *osPerms="'users.can_manage'">
|
||||
|
@ -4,6 +4,7 @@ import { Title } from '@angular/platform-browser';
|
||||
|
||||
import { TranslateService } from '@ngx-translate/core';
|
||||
|
||||
import { DataStoreService } from 'app/core/core-services/data-store.service';
|
||||
import { OpenSlidesService } from 'app/core/core-services/openslides.service';
|
||||
import { OperatorService, Permission } from 'app/core/core-services/operator.service';
|
||||
import { ConfigRepositoryService } from 'app/core/repositories/config/config-repository.service';
|
||||
@ -25,6 +26,8 @@ export class LegalNoticeComponent extends BaseViewComponent implements OnInit {
|
||||
*/
|
||||
public legalNotice = '';
|
||||
|
||||
public showDevTools = false;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*/
|
||||
@ -35,7 +38,8 @@ export class LegalNoticeComponent extends BaseViewComponent implements OnInit {
|
||||
private openSlidesService: OpenSlidesService,
|
||||
private update: UpdateService,
|
||||
private configRepo: ConfigRepositoryService,
|
||||
private operator: OperatorService
|
||||
private operator: OperatorService,
|
||||
private DS: DataStoreService
|
||||
) {
|
||||
super(title, translate, matSnackbar);
|
||||
}
|
||||
@ -67,4 +71,12 @@ export class LegalNoticeComponent extends BaseViewComponent implements OnInit {
|
||||
public canManage(): boolean {
|
||||
return this.operator.hasPerms(Permission.coreCanManageConfig);
|
||||
}
|
||||
|
||||
public printDS(): void {
|
||||
this.DS.print();
|
||||
}
|
||||
|
||||
public getThisComponent(): void {
|
||||
console.log(this);
|
||||
}
|
||||
}
|
||||
|
@ -554,7 +554,7 @@ export class MotionDetailComponent extends BaseViewComponent implements OnInit,
|
||||
this.motionFilterService.initFilters(this.motionObserver);
|
||||
this.motionSortService.initSorting(this.motionFilterService.outputObservable);
|
||||
this.sortedMotionsObservable = this.motionSortService.outputObservable;
|
||||
} else if (this.motion.parent_id) {
|
||||
} else if (this.motion && this.motion.parent_id) {
|
||||
// only use the amendments for this motion
|
||||
this.amendmentFilterService.initFilters(this.repo.amendmentsTo(this.motion.parent_id));
|
||||
this.amendmentSortService.initSorting(this.amendmentFilterService.outputObservable);
|
||||
|
@ -1,6 +1,6 @@
|
||||
<div *ngIf="poll" class="poll-progress-wrapper">
|
||||
<div class="vote-number">
|
||||
<span>{{ poll.votescast }} / {{ max }}</span>
|
||||
<span>{{ votescast }} / {{ max }}</span>
|
||||
</div>
|
||||
|
||||
<span>{{ 'Received votes' | translate }}</span>
|
||||
|
@ -1,58 +1,98 @@
|
||||
import { Component, Input, OnInit } from '@angular/core';
|
||||
import { Component, Input } from '@angular/core';
|
||||
import { MatSnackBar } from '@angular/material/snack-bar';
|
||||
import { Title } from '@angular/platform-browser';
|
||||
|
||||
import { TranslateService } from '@ngx-translate/core';
|
||||
import { map } from 'rxjs/operators';
|
||||
import { Subscription } from 'rxjs';
|
||||
|
||||
import { MotionPollRepositoryService } from 'app/core/repositories/motions/motion-poll-repository.service';
|
||||
import { UserRepositoryService } from 'app/core/repositories/users/user-repository.service';
|
||||
import { BaseViewComponent } from 'app/site/base/base-view';
|
||||
import { ViewBasePoll } from 'app/site/polls/models/view-base-poll';
|
||||
import { ViewUser } from 'app/site/users/models/view-user';
|
||||
|
||||
@Component({
|
||||
selector: 'os-poll-progress',
|
||||
templateUrl: './poll-progress.component.html',
|
||||
styleUrls: ['./poll-progress.component.scss']
|
||||
})
|
||||
export class PollProgressComponent extends BaseViewComponent implements OnInit {
|
||||
@Input()
|
||||
public poll: ViewBasePoll;
|
||||
export class PollProgressComponent extends BaseViewComponent {
|
||||
private pollId: number = null;
|
||||
private pollSubscription: Subscription = null;
|
||||
|
||||
@Input()
|
||||
public set poll(value: ViewBasePoll) {
|
||||
if (value.id !== this.pollId) {
|
||||
this.pollId = value.id;
|
||||
|
||||
if (this.pollSubscription !== null) {
|
||||
this.pollSubscription.unsubscribe();
|
||||
this.pollSubscription = null;
|
||||
}
|
||||
|
||||
this.pollSubscription = this.pollRepo.getViewModelObservable(this.pollId).subscribe(poll => {
|
||||
if (poll) {
|
||||
this._poll = poll;
|
||||
|
||||
// We may cannot use this.poll.votescast during the voting, since it can
|
||||
// be reported with false values from the server
|
||||
// -> calculate the votes on our own.
|
||||
const ids = new Set();
|
||||
for (const option of this.poll.options) {
|
||||
for (const vote of option.votes) {
|
||||
if (vote.user_id) {
|
||||
ids.add(vote.user_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
this.votescast = ids.size;
|
||||
|
||||
// But sometimes there are not enough votes (poll.votescast is higher).
|
||||
// If this happens, take the value from the poll
|
||||
if (this.poll.votescast > this.votescast) {
|
||||
this.votescast = this.poll.votescast;
|
||||
}
|
||||
|
||||
this.calculateMaxUsers();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
public get poll(): ViewBasePoll {
|
||||
return this._poll;
|
||||
}
|
||||
private _poll: ViewBasePoll;
|
||||
|
||||
public votescast: number;
|
||||
public max: number;
|
||||
public valueInPercent: number;
|
||||
|
||||
public constructor(
|
||||
title: Title,
|
||||
protected translate: TranslateService,
|
||||
snackbar: MatSnackBar,
|
||||
private userRepo: UserRepositoryService
|
||||
private userRepo: UserRepositoryService,
|
||||
private pollRepo: MotionPollRepositoryService
|
||||
) {
|
||||
super(title, translate, snackbar);
|
||||
this.userRepo.getViewModelListObservable().subscribe(users => {
|
||||
if (users) {
|
||||
this.calculateMaxUsers(users);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public get valueInPercent(): number {
|
||||
if (this.poll) {
|
||||
return (this.poll.votesvalid / this.max) * 100;
|
||||
} else {
|
||||
return 0;
|
||||
private calculateMaxUsers(allUsers?: ViewUser[]): void {
|
||||
if (!this.poll) {
|
||||
return;
|
||||
}
|
||||
if (!allUsers) {
|
||||
allUsers = this.userRepo.getViewModelList();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* OnInit.
|
||||
* Sets the observable for groups.
|
||||
*/
|
||||
public ngOnInit(): void {
|
||||
if (this.poll) {
|
||||
this.userRepo
|
||||
.getViewModelListObservable()
|
||||
.pipe(
|
||||
map(users =>
|
||||
users.filter(user => user.is_present && this.poll.groups_id.intersect(user.groups_id).length)
|
||||
)
|
||||
)
|
||||
.subscribe(users => {
|
||||
this.max = users.length;
|
||||
});
|
||||
}
|
||||
allUsers = allUsers.filter(user => user.is_present && this.poll.groups_id.intersect(user.groups_id).length);
|
||||
|
||||
this.max = allUsers.length;
|
||||
this.valueInPercent = this.poll ? (this.votescast / this.max) * 100 : 0;
|
||||
}
|
||||
}
|
||||
|
@ -156,9 +156,11 @@ class BasePollViewSet(ModelViewSet):
|
||||
|
||||
poll.state = BasePoll.STATE_PUBLISHED
|
||||
poll.save()
|
||||
inform_changed_data(vote.user for vote in poll.get_votes().all() if vote.user)
|
||||
inform_changed_data(poll.get_votes())
|
||||
inform_changed_data(poll.get_options())
|
||||
inform_changed_data(
|
||||
(vote.user for vote in poll.get_votes().all() if vote.user), final_data=True
|
||||
)
|
||||
inform_changed_data(poll.get_votes(), final_data=True)
|
||||
inform_changed_data(poll.get_options(), final_data=True)
|
||||
return Response()
|
||||
|
||||
@detail_route(methods=["POST"])
|
||||
|
@ -111,7 +111,8 @@ class AutoupdateBundle:
|
||||
save_history(self.element_iterator)
|
||||
|
||||
# Update cache and send autoupdate using async code.
|
||||
return async_to_sync(self.dispatch_autoupdate)()
|
||||
change_id = async_to_sync(self.dispatch_autoupdate)()
|
||||
return change_id
|
||||
|
||||
@property
|
||||
def element_iterator(self) -> Iterable[AutoupdateElement]:
|
||||
@ -172,6 +173,7 @@ def inform_changed_data(
|
||||
user_id: Optional[int] = None,
|
||||
disable_history: bool = False,
|
||||
no_delete_on_restriction: bool = False,
|
||||
final_data: bool = False,
|
||||
) -> None:
|
||||
"""
|
||||
Informs the autoupdate system and the caching system about the creation or
|
||||
@ -187,8 +189,10 @@ def inform_changed_data(
|
||||
instances = (instances,)
|
||||
|
||||
root_instances = set(instance.get_root_rest_element() for instance in instances)
|
||||
elements = [
|
||||
AutoupdateElement(
|
||||
|
||||
elements = []
|
||||
for root_instance in root_instances:
|
||||
element = AutoupdateElement(
|
||||
id=root_instance.get_rest_pk(),
|
||||
collection_string=root_instance.get_collection_string(),
|
||||
disable_history=disable_history,
|
||||
@ -196,8 +200,9 @@ def inform_changed_data(
|
||||
user_id=user_id,
|
||||
no_delete_on_restriction=no_delete_on_restriction,
|
||||
)
|
||||
for root_instance in root_instances
|
||||
]
|
||||
if final_data:
|
||||
element["full_data"] = root_instance.get_full_data()
|
||||
elements.append(element)
|
||||
inform_elements(elements)
|
||||
|
||||
|
||||
|
@ -261,7 +261,7 @@ class RedisCacheProvider:
|
||||
|
||||
async def add_to_full_data(self, data: Dict[str, str]) -> None:
|
||||
async with get_connection() as redis:
|
||||
redis.hmset_dict(self.full_data_cache_key, data)
|
||||
await redis.hmset_dict(self.full_data_cache_key, data)
|
||||
|
||||
async def data_exists(self) -> bool:
|
||||
"""
|
||||
|
@ -16,7 +16,8 @@ class ConsumerAutoupdateStrategy:
|
||||
def __init__(self, consumer: ProtocollAsyncJsonWebsocketConsumer) -> None:
|
||||
self.consumer = consumer
|
||||
# client_change_id = None: unknown -> set on first autoupdate or request_change_id
|
||||
# client_change_id is int: the one
|
||||
# client_change_id is int: the change_id, the client knows about, so the next
|
||||
# update must be from client_change_id+1 .. <next clange_id>
|
||||
self.client_change_id: Optional[int] = None
|
||||
self.max_seen_change_id = 0
|
||||
self.next_send_time = None
|
||||
@ -26,11 +27,16 @@ class ConsumerAutoupdateStrategy:
|
||||
async def request_change_id(
|
||||
self, change_id: int, in_response: Optional[str] = None
|
||||
) -> None:
|
||||
"""
|
||||
The change id is not inclusive, so the client is on change_id and wants
|
||||
data from change_id+1 .. now
|
||||
"""
|
||||
# This resets the server side tracking of the client's change id.
|
||||
async with self.lock:
|
||||
await self.stop_timer()
|
||||
|
||||
self.max_seen_change_id = await element_cache.get_current_change_id()
|
||||
print(self.max_seen_change_id)
|
||||
self.client_change_id = change_id
|
||||
|
||||
if self.client_change_id == self.max_seen_change_id:
|
||||
@ -49,7 +55,9 @@ class ConsumerAutoupdateStrategy:
|
||||
async def new_change_id(self, change_id: int) -> None:
|
||||
async with self.lock:
|
||||
if self.client_change_id is None:
|
||||
self.client_change_id = change_id
|
||||
# The -1 is to send this autoupdate as the first one to he client.
|
||||
# Remember: the client_change_id is the change_id the client knows about
|
||||
self.client_change_id = change_id - 1
|
||||
if change_id > self.max_seen_change_id:
|
||||
self.max_seen_change_id = change_id
|
||||
|
||||
@ -84,12 +92,13 @@ class ConsumerAutoupdateStrategy:
|
||||
self.timer_task_handle = None
|
||||
|
||||
async def send_autoupdate(self, in_response: Optional[str] = None) -> None:
|
||||
max_change_id = (
|
||||
self.max_seen_change_id
|
||||
) # important to save this variable, because
|
||||
# it can change during runtime.
|
||||
# it is important to save this variable, because it can change during runtime.
|
||||
max_change_id = self.max_seen_change_id
|
||||
# here, 1 is added to the change_id, because the client_change_id is the id the client
|
||||
# *knows* about -> the client needs client_change_id+1 since get_autoupdate_data is
|
||||
# inclusive [change_id .. max_change_id].
|
||||
autoupdate = await get_autoupdate_data(
|
||||
cast(int, self.client_change_id), max_change_id, self.consumer.user_id
|
||||
cast(int, self.client_change_id) + 1, max_change_id, self.consumer.user_id
|
||||
)
|
||||
if autoupdate is not None:
|
||||
# It will be send, so we can set the client_change_id
|
||||
|
@ -175,7 +175,7 @@ class RESTModelMixin:
|
||||
current_time = time.time()
|
||||
if current_time > last_time + 5:
|
||||
last_time = current_time
|
||||
logger.info(f"\t{i+1}/{instances_length}...")
|
||||
logger.info(f" {i+1}/{instances_length}...")
|
||||
return full_data
|
||||
|
||||
@classmethod
|
||||
|
@ -50,7 +50,7 @@ async def prepare_element_cache(settings):
|
||||
]
|
||||
)
|
||||
element_cache._cachables = None
|
||||
await element_cache.async_ensure_cache(default_change_id=2)
|
||||
await element_cache.async_ensure_cache(default_change_id=10)
|
||||
yield
|
||||
# Reset the cachable_provider
|
||||
element_cache.cachable_provider = orig_cachable_provider
|
||||
@ -159,7 +159,7 @@ async def test_connection_with_too_big_change_id(get_communicator, set_config):
|
||||
@pytest.mark.asyncio
|
||||
async def test_changed_data_autoupdate(get_communicator, set_config):
|
||||
await set_config("general_system_enable_anonymous", True)
|
||||
communicator = get_communicator("autoupdate=on")
|
||||
communicator = get_communicator()
|
||||
await communicator.connect()
|
||||
|
||||
# Change a config value
|
||||
@ -201,7 +201,7 @@ async def create_user_session_cookie(user_id: int) -> Tuple[bytes, bytes]:
|
||||
@pytest.mark.asyncio
|
||||
async def test_with_user(get_communicator):
|
||||
cookie_header = await create_user_session_cookie(1)
|
||||
communicator = get_communicator("autoupdate=on", headers=[cookie_header])
|
||||
communicator = get_communicator(headers=[cookie_header])
|
||||
|
||||
connected, __ = await communicator.connect()
|
||||
|
||||
@ -211,7 +211,7 @@ async def test_with_user(get_communicator):
|
||||
@pytest.mark.asyncio
|
||||
async def test_skipping_autoupdate(set_config, get_communicator):
|
||||
cookie_header = await create_user_session_cookie(1)
|
||||
communicator = get_communicator("autoupdate=on", headers=[cookie_header])
|
||||
communicator = get_communicator(headers=[cookie_header])
|
||||
|
||||
await communicator.connect()
|
||||
|
||||
@ -254,7 +254,7 @@ async def test_skipping_autoupdate(set_config, get_communicator):
|
||||
@pytest.mark.asyncio
|
||||
async def test_receive_deleted_data(get_communicator, set_config):
|
||||
await set_config("general_system_enable_anonymous", True)
|
||||
communicator = get_communicator("autoupdate=on")
|
||||
communicator = get_communicator()
|
||||
await communicator.connect()
|
||||
|
||||
# Delete test element
|
||||
@ -384,6 +384,7 @@ async def test_send_get_elements_too_big_change_id(communicator, set_config):
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_get_elements_too_small_change_id(communicator, set_config):
|
||||
# Note: this test depends on the default_change_id set in prepare_element_cache
|
||||
await set_config("general_system_enable_anonymous", True)
|
||||
await communicator.connect()
|
||||
|
||||
@ -517,7 +518,7 @@ async def test_listen_to_projector(communicator, set_config):
|
||||
content = response.get("content")
|
||||
assert type == "projector"
|
||||
assert content == {
|
||||
"change_id": 3,
|
||||
"change_id": 11,
|
||||
"data": {
|
||||
"1": [
|
||||
{
|
||||
@ -555,7 +556,7 @@ async def test_update_projector(communicator, set_config):
|
||||
content = response.get("content")
|
||||
assert type == "projector"
|
||||
assert content == {
|
||||
"change_id": 4,
|
||||
"change_id": 12,
|
||||
"data": {
|
||||
"1": [
|
||||
{
|
||||
|
Loading…
Reference in New Issue
Block a user