# SPDX-FileCopyrightText: WTF Kooperative eG # # SPDX-License-Identifier: AGPL-3.0-or-later import uuid import yaml from ldap3 import Server, Connection from ldap3.utils.conv import escape_filter_chars from app import app, db from ki.models import User, Token def create_user_token(username): user = User.query.filter(User.auth_id.__eq__(username)).first() if user is None: user = User(auth_id=username) db.session.add(user) token = Token(token=str(uuid.uuid4()), user=user) db.session.add(token) db.session.commit() return token def file_auth(username, password): app.logger.debug("performing file authentication") auth_file_path = app.config["KI_DATA_DIR"] + "/auth.yml" with open(auth_file_path, "r") as auth_file_stream: users = yaml.safe_load(auth_file_stream) if username not in users: return None auth_user = users[username] if auth_user["password"] != password: return None return create_user_token(username) def ldap_auth(username, password): app.logger.debug("performing LDAP authentication") escaped_username = escape_filter_chars(username) server = Server(app.config['KI_LDAP_URL']) try: connection = Connection(server, app.config['KI_LDAP_AUTH_USER'], app.config['KI_LDAP_AUTH_PASSWORD'], auto_bind=True) except: app.logger.error('ldap connection failed') return None if not connection.search(app.config['KI_LDAP_BASE_DN'], f"(&(objectClass=inetOrgPerson)(uid={escaped_username}))"): app.logger.info("ldap search failed") return None if not connection.entries: app.logger.info(f"no ldap search result for {username}") return None user_dn = connection.entries[0].entry_dn if connection.rebind(user=user_dn, password=password): connection.unbind() return create_user_token(username) connection.unbind() app.logger.info(f"ldap login of {username} failed") return None def auth(username, password): if app.config['KI_AUTH'] == 'file': return file_auth(username, password) if app.config['KI_AUTH'] == 'ldap': return ldap_auth(username, password) raise RuntimeError('unknown auth method')