150 lines
5.3 KiB
Python
150 lines
5.3 KiB
Python
# SPDX-FileCopyrightText: WTF Kooperative eG <https://wtf-eg.de/>
|
|
#
|
|
# SPDX-License-Identifier: AGPL-3.0-or-later
|
|
|
|
from flask import make_response, request
|
|
from sqlalchemy import not_
|
|
|
|
from ki.models import Address, Contact, ContactType, Language, User, Profile, ProfileLanguage, ProfileSearchtopic, \
|
|
ProfileSkill, Skill
|
|
from app import db
|
|
|
|
|
|
def update_address(profile, address_data):
|
|
address = profile.address
|
|
|
|
if (address is None):
|
|
address = Address(profile=profile)
|
|
db.session.add(address)
|
|
|
|
address.name = address_data.get("name", "")
|
|
address.street = address_data.get("street", "")
|
|
address.house_number = address_data.get("house_number", "")
|
|
address.additional = address_data.get("additional", "")
|
|
address.postcode = address_data.get("postcode", "")
|
|
address.city = address_data.get("city", "")
|
|
address.country = address_data.get("country", "")
|
|
|
|
|
|
def update_languages(profile, languages_data):
|
|
profile_language_ids = []
|
|
|
|
for language_data in languages_data:
|
|
language_id = language_data["language"]["id"]
|
|
language = Language.query.get(language_id)
|
|
|
|
profile_language = ProfileLanguage.query.filter(ProfileLanguage.profile == profile,
|
|
ProfileLanguage.language_id == language_id).first()
|
|
|
|
if profile_language is None:
|
|
profile_language = ProfileLanguage(profile=profile, language=language)
|
|
db.session.add(profile_language)
|
|
|
|
profile_language.level = language_data["level"]
|
|
|
|
profile_language_ids.append(language_id)
|
|
|
|
ProfileLanguage.query.filter(ProfileLanguage.profile == profile,
|
|
not_(ProfileLanguage.language_id.in_(profile_language_ids))).delete()
|
|
|
|
|
|
def update_skills(profile, skills_data):
|
|
profile_skill_ids = []
|
|
|
|
for skill_data in skills_data:
|
|
skill_name = skill_data["skill"]["name"]
|
|
skill = Skill.query.filter(Skill.name == skill_name).first()
|
|
|
|
if (skill is None):
|
|
skill = Skill(name=skill_name)
|
|
db.session.add(skill)
|
|
|
|
profile_skill = ProfileSkill.query.filter(ProfileSkill.profile == profile, ProfileSkill.skill == skill).first()
|
|
|
|
if (profile_skill is None):
|
|
profile_skill = ProfileSkill(profile=profile, skill=skill)
|
|
db.session.add(profile_skill)
|
|
|
|
profile_skill.level = skill_data["level"]
|
|
profile_skill_ids.append(skill.id)
|
|
|
|
ProfileSkill.query.filter(ProfileSkill.profile == profile,
|
|
not_(ProfileSkill.skill_id.in_(profile_skill_ids))).delete()
|
|
|
|
|
|
def update_searchtopics(profile, searchtopics_data):
|
|
profile_searchtopics_ids = []
|
|
|
|
for searchtopic_data in searchtopics_data:
|
|
skill_name = searchtopic_data["skill"]["name"]
|
|
skill = Skill.query.filter(Skill.name == skill_name).first()
|
|
|
|
if (skill is None):
|
|
skill = Skill(name=skill_name)
|
|
db.session.add(skill)
|
|
|
|
profile_searchtopic = ProfileSearchtopic.query.filter(ProfileSearchtopic.profile == profile,
|
|
ProfileSearchtopic.skill == skill).first()
|
|
|
|
if (profile_searchtopic is None):
|
|
profile_searchtopic = ProfileSearchtopic(profile=profile, skill=skill)
|
|
db.session.add(profile_searchtopic)
|
|
|
|
profile_searchtopics_ids.append(skill.id)
|
|
|
|
ProfileSearchtopic.query.filter(ProfileSearchtopic.profile == profile,
|
|
not_(ProfileSearchtopic.skill_id.in_(profile_searchtopics_ids))).delete()
|
|
|
|
|
|
def update_contacts(profile, contacts_data):
|
|
contact_ids_to_be_deleted = list(map(lambda c: c.id, profile.contacts))
|
|
|
|
for contact_data in contacts_data:
|
|
contacttype_name = contact_data["contacttype"]["name"]
|
|
contacttype = ContactType.query.filter(ContactType.name == contacttype_name).first()
|
|
|
|
if (contacttype is None):
|
|
contacttype = ContactType(name=contacttype_name)
|
|
db.session.add(contacttype)
|
|
|
|
if "id" in contact_data:
|
|
contact_id = int(contact_data["id"])
|
|
contact_ids_to_be_deleted.remove(contact_id)
|
|
contact = Contact.query.get(contact_id)
|
|
else:
|
|
contact = Contact(profile=profile, contacttype=contacttype)
|
|
db.session.add(contact)
|
|
|
|
contact.contacttype_id = contacttype.id
|
|
contact.content = contact_data["content"]
|
|
|
|
Contact.query.filter(Contact.id.in_(contact_ids_to_be_deleted)).delete()
|
|
|
|
|
|
def update_profile(user_id: int):
|
|
user = User.query.get(user_id)
|
|
|
|
if user is None:
|
|
return make_response({}, 404)
|
|
|
|
profile = user.profile
|
|
|
|
if (profile is None):
|
|
profile = Profile(user=user, nickname=user.auth_id)
|
|
db.session.add(profile)
|
|
|
|
profile.pronouns = request.json.get("pronouns", "")
|
|
profile.volunteerwork = request.json.get("volunteerwork", "")
|
|
profile.freetext = request.json.get("freetext", "")
|
|
profile.visible = request.json.get("visible", False)
|
|
|
|
update_address(profile, request.json.get("address", {}))
|
|
update_contacts(profile, request.json.get("contacts", {}))
|
|
update_skills(profile, request.json.get("skills", {}))
|
|
update_searchtopics(profile, request.json.get("searchtopics"))
|
|
update_languages(profile, request.json.get("languages", {}))
|
|
|
|
db.session.commit()
|
|
|
|
return make_response({"profile": profile.to_dict()})
|