# SPDX-FileCopyrightText: WTF Kooperative eG # # SPDX-License-Identifier: AGPL-3.0-or-later from flask import make_response, request from sqlalchemy import not_ from ki.models import Address, Contact, ContactType, Language, User, Profile, ProfileLanguage, ProfileSearchtopic, \ ProfileSkill, Skill from app import db def update_address(profile, address_data): address = profile.address if (address is None): address = Address(profile=profile) db.session.add(address) address.name = address_data.get("name", "") address.street = address_data.get("street", "") address.house_number = address_data.get("house_number", "") address.additional = address_data.get("additional", "") address.postcode = address_data.get("postcode", "") address.city = address_data.get("city", "") address.country = address_data.get("country", "") def update_languages(profile, languages_data): profile_language_ids = [] for language_data in languages_data: if "id" not in language_data["language"]: continue language = Language.query.get(language_data["language"]["id"]) profile_language = ProfileLanguage.query.filter(ProfileLanguage.profile == profile, ProfileLanguage.language == language).first() if profile_language is None: profile_language = ProfileLanguage(profile=profile, language=language) db.session.add(profile_language) profile_language.level = language_data["level"] profile_language_ids.append(language.id) ProfileLanguage.query.filter(ProfileLanguage.profile == profile, not_(ProfileLanguage.language_id.in_(profile_language_ids))).delete() def update_skills(profile, skills_data): profile_skill_ids = [] for skill_data in skills_data: skill_name = skill_data["skill"]["name"] skill = Skill.query.filter(Skill.name == skill_name).first() if (skill is None): skill = Skill(name=skill_name) db.session.add(skill) profile_skill = ProfileSkill.query.filter(ProfileSkill.profile == profile, ProfileSkill.skill == skill).first() if (profile_skill is None): profile_skill = ProfileSkill(profile=profile, skill=skill) db.session.add(profile_skill) profile_skill.level = skill_data["level"] profile_skill_ids.append(skill.id) ProfileSkill.query.filter(ProfileSkill.profile == profile, not_(ProfileSkill.skill_id.in_(profile_skill_ids))).delete() def update_searchtopics(profile, searchtopics_data): profile_searchtopics_ids = [] for searchtopic_data in searchtopics_data: skill_name = searchtopic_data["skill"]["name"] skill = Skill.query.filter(Skill.name == skill_name).first() if (skill is None): skill = Skill(name=skill_name) db.session.add(skill) profile_searchtopic = ProfileSearchtopic.query.filter(ProfileSearchtopic.profile == profile, ProfileSearchtopic.skill == skill).first() if (profile_searchtopic is None): profile_searchtopic = ProfileSearchtopic(profile=profile, skill=skill) db.session.add(profile_searchtopic) profile_searchtopics_ids.append(skill.id) ProfileSearchtopic.query.filter(ProfileSearchtopic.profile == profile, not_(ProfileSearchtopic.skill_id.in_(profile_searchtopics_ids))).delete() def update_contacts(profile, contacts_data): contact_ids_to_be_deleted = list(map(lambda c: c.id, profile.contacts)) for contact_data in contacts_data: contacttype_name = contact_data["contacttype"]["name"] contacttype = ContactType.query.filter(ContactType.name == contacttype_name).first() if (contacttype is None): contacttype = ContactType(name=contacttype_name) db.session.add(contacttype) if "id" in contact_data: contact_id = int(contact_data["id"]) contact_ids_to_be_deleted.remove(contact_id) contact = Contact.query.get(contact_id) else: contact = Contact(profile=profile, contacttype=contacttype) db.session.add(contact) contact.contacttype_id = contacttype.id contact.content = contact_data["content"] Contact.query.filter(Contact.id.in_(contact_ids_to_be_deleted)).delete() def update_profile(user_id: int): user = User.query.get(user_id) if user is None: return make_response({}, 404) profile = user.profile if (profile is None): profile = Profile(user=user, nickname=user.auth_id) db.session.add(profile) profile.nickname = request.json.get("nickname", "") profile.pronouns = request.json.get("pronouns", "") profile.volunteerwork = request.json.get("volunteerwork", "") profile.availability_status = request.json.get("availability_status", False) profile.availability_text = request.json.get("availability_text", "") availability_hours_per_week_raw = request.json.get("availability_hours_per_week", 0) try: availability_hours_per_week = int(availability_hours_per_week_raw) except: availability_hours_per_week = None profile.availability_hours_per_week = availability_hours_per_week profile.freetext = request.json.get("freetext", "") profile.visible = request.json.get("visible", False) update_address(profile, request.json.get("address", {})) update_contacts(profile, request.json.get("contacts", [])) update_skills(profile, request.json.get("skills", [])) update_searchtopics(profile, request.json.get("searchtopics", [])) update_languages(profile, request.json.get("languages", [])) db.session.commit() return make_response({"profile": profile.to_dict()})