# SPDX-FileCopyrightText: WTF Kooperative eG <https://wtf-eg.de/>
#
# SPDX-License-Identifier: AGPL-3.0-or-later

"""Flask routes for login, user profiles, completion lists and icons."""

import os
from functools import wraps

from flask import g, make_response, request, send_file

from ki.auth import auth
from ki.handlers import find_profiles as find_profiles_handler
from ki.handlers import update_profile as update_profile_handler
from ki.models import ContactType, Language, Skill, Token, User
from app import app, db

content_type_svg = "image/svg+xml"
content_type_png = "image/png"

# The scheme keyword including its separating space; everything after it in
# the Authorization header is treated as the token value.
_BEARER_PREFIX = "Bearer "


def token_auth(func):
    """Decorate a view so that it requires a valid bearer token.

    The wrapped view is only called when the ``Authorization`` header has the
    form ``Bearer <token>`` and ``<token>`` matches a stored ``Token``. In
    that case the token's user is made available as ``g.user``.

    Responses produced by the wrapper itself:
        401 -- header missing or not starting with ``"Bearer "``.
        403 -- no ``Token`` row matches the supplied value.
    """
    @wraps(func)
    def _token_auth(*args, **kwargs):
        auth_header = request.headers.get("Authorization")
        if auth_header is None or not auth_header.startswith(_BEARER_PREFIX):
            return make_response({}, 401)

        # Slice by the prefix length so the check above and the extraction
        # here can never disagree about where the token starts.
        token_value = auth_header[len(_BEARER_PREFIX):]
        token = Token.query.filter(Token.token == token_value).first()
        if token is None:
            return make_response({}, 403)

        g.user = token.user
        return func(*args, **kwargs)

    return _token_auth


def models_to_list(models):
    """Return a list with the ``to_dict()`` result of every item in *models*."""
    return [model.to_dict() for model in models]


def handle_completion_request(model, key):
    """Return up to ten rows of *model*, ordered by name, under *key*.

    If the request carries a ``search`` query argument, only rows whose
    ``name`` starts with that value are returned.

    Returns:
        A dict ``{key: [row.to_dict(), ...]}``.
    """
    query = model.query

    search = request.args.get("search")
    if search is not None:
        # autoescape keeps "%" and "_" in user input from acting as LIKE
        # wildcards, so the filter is a literal prefix match.
        query = query.filter(model.name.startswith(search, autoescape=True))

    results = query.order_by(model.name).limit(10).all()
    return {key: models_to_list(results)}


def handle_icon_request(model, id, path):  # noqa: A002 - keep parameter name
    """Send the icon file belonging to the *model* row with primary key *id*.

    Files are looked up below *path* (which is used as a plain string prefix)
    in this order: ``<id>.svg``, ``<id>.png``, ``unknown.svg``,
    ``unknown.png``. The first one that exists is sent.

    Returns:
        The file response, or a 404 response when the row does not exist or
        none of the candidate files is present.
    """
    if db.session.get(model, id) is None:
        return make_response({}, 404)

    icon_base_path = path + str(id)
    candidates = (
        (icon_base_path + ".svg", content_type_svg),
        (icon_base_path + ".png", content_type_png),
        (path + "unknown.svg", content_type_svg),
        (path + "unknown.png", content_type_png),
    )
    for icon_path, mimetype in candidates:
        if os.path.exists(icon_path):
            return send_file(icon_path, mimetype=mimetype)

    return make_response({"error": "icon not found"}, 404)


def _parse_user_id(user_id):
    """Return *user_id* converted with ``int()``, or ``None`` if that fails."""
    try:
        return int(user_id)
    except ValueError:
        return None


@app.route("/")
def hello_world():
    """Return the static string ``"KI"``."""
    return "KI"


@app.route("/users/login", methods=["POST"])
def login():
    """Authenticate with ``username`` and ``password`` from the JSON body.

    Returns:
        A response with ``token`` and ``user_id`` on success, 400 when the
        JSON body is not an object, 403 when ``auth`` yields no token.
    """
    payload = request.get_json()
    if not isinstance(payload, dict):
        # Valid JSON that is not an object (e.g. a list) has no fields to
        # read; answer with a client error instead of failing on ``.get``.
        return make_response({}, 400)

    username = payload.get("username", "")
    password = payload.get("password", "")
    token = auth(username, password)

    if token is None:
        return make_response({}, 403)

    return make_response({"token": token.token, "user_id": token.user_id})


@app.route("/users/<user_id>/profile")
@token_auth
def get_user_profile(user_id):
    """Return the profile of the user identified by *user_id*.

    Returns:
        ``{"profile": ...}`` on success; 404 when *user_id* is not an
        integer, the user does not exist or has no profile; 403 when the
        profile is not visible and belongs to someone other than ``g.user``.
    """
    numeric_id = _parse_user_id(user_id)
    if numeric_id is None:
        return make_response({}, 404)

    user = User.query.filter(User.id == numeric_id).first()
    if user is None:
        return make_response({}, 404)

    profile = user.profile
    if profile is None:
        return make_response({}, 404)

    if not profile.visible and profile.user.id != g.user.id:
        return make_response({}, 403)

    return make_response({
        "profile": profile.to_dict(),
    })


@app.route("/users/<user_id>/profile", methods=["POST"])
@token_auth
def update_profile(user_id):
    """Update the profile of *user_id*; only allowed for the user themselves.

    Returns:
        The result of the update handler, or 403 when *user_id* is not the
        id of ``g.user`` (a non-integer *user_id* can never match).
    """
    numeric_id = _parse_user_id(user_id)
    if numeric_id is None or g.user.id != numeric_id:
        return make_response({}, 403)

    return update_profile_handler(numeric_id)


@app.route("/contacttypes")
@token_auth
def get_contacttypes():
    """Return contact type completions under the key ``contacttypes``."""
    return handle_completion_request(ContactType, "contacttypes")


@app.route("/users/profiles")
@token_auth
def find_profiles():
    """Delegate to the ``find_profiles`` handler."""
    return find_profiles_handler()


@app.route("/skills")
@token_auth
def get_skills():
    """Return skill completions under the key ``skills``."""
    return handle_completion_request(Skill, "skills")


@app.route("/skills/<skill_id>/icon")
def get_skill_icon(skill_id):
    """Send the icon for *skill_id* from the skill icon directory."""
    skill_icons_path = app.config["KI_DATA_DIR"] + "/imgs/skill_icons/"
    return handle_icon_request(Skill, skill_id, skill_icons_path)


@app.route("/languages")
@token_auth
def get_languages():
    """Return language completions under the key ``languages``."""
    return handle_completion_request(Language, "languages")


@app.route("/languages/<language_id>/icon")
def get_language_icon(language_id):
    """Send the flag icon for *language_id* from the flags directory."""
    language_flags_path = app.config["KI_DATA_DIR"] + "/imgs/flags/"
    return handle_icon_request(Language, language_id, language_flags_path)