2021-07-05 19:37:05 +02:00
|
|
|
# SPDX-FileCopyrightText: WTF Kooperative eG <https://wtf-eg.de/>
|
|
|
|
#
|
|
|
|
# SPDX-License-Identifier: AGPL-3.0-or-later
|
|
|
|
|
2021-06-12 13:24:26 +02:00
|
|
|
import uuid
|
|
|
|
import yaml
|
|
|
|
|
2021-07-11 12:16:41 +02:00
|
|
|
from ldap3 import Server, Connection, ALL
|
|
|
|
|
2021-06-12 13:24:26 +02:00
|
|
|
from app import app, db
|
|
|
|
from ki.models import User, Token
|
|
|
|
|
|
|
|
|
2021-07-11 12:16:41 +02:00
|
|
|
def create_user_token(username):
|
|
|
|
user = User.query.filter(User.auth_id.__eq__(username)).first()
|
|
|
|
|
|
|
|
if user is None:
|
|
|
|
user = User(auth_id=username)
|
|
|
|
db.session.add(user)
|
|
|
|
|
|
|
|
token = Token(token=str(uuid.uuid4()), user=user)
|
|
|
|
db.session.add(token)
|
|
|
|
db.session.commit()
|
|
|
|
return token
|
|
|
|
|
|
|
|
|
|
|
|
def file_auth(username, password):
|
|
|
|
app.logger.debug("performing file authentication")
|
|
|
|
|
2021-06-12 13:24:26 +02:00
|
|
|
auth_file_path = app.config["KI_DATA_DIR"] + "/auth.yml"
|
|
|
|
|
|
|
|
with open(auth_file_path, "r") as auth_file_stream:
|
|
|
|
users = yaml.safe_load(auth_file_stream)
|
|
|
|
|
|
|
|
if username not in users:
|
|
|
|
return None
|
|
|
|
|
|
|
|
auth_user = users[username]
|
|
|
|
|
|
|
|
if auth_user["password"] != password:
|
|
|
|
return None
|
|
|
|
|
2021-07-11 12:16:41 +02:00
|
|
|
return create_user_token(username)
|
2021-06-12 13:24:26 +02:00
|
|
|
|
|
|
|
|
2021-07-11 12:16:41 +02:00
|
|
|
def ldap_auth(username, password):
|
|
|
|
app.logger.debug("performing LDAP authentication")
|
2021-06-12 13:24:26 +02:00
|
|
|
|
2021-07-11 12:16:41 +02:00
|
|
|
server = Server(app.config['KI_LDAP_URL'], get_info=ALL)
|
|
|
|
root_dn = app.config['KI_LDAP_ROOT_DN']
|
|
|
|
ldap_user = f"cn={username},{root_dn}"
|
|
|
|
|
|
|
|
app.logger.debug(f"server: {server}")
|
|
|
|
connection = Connection(server, user=ldap_user, password=password)
|
|
|
|
|
|
|
|
if connection.bind():
|
|
|
|
connection.unbind()
|
|
|
|
return create_user_token(username)
|
|
|
|
|
|
|
|
connection.unbind()
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
def auth(username, password):
|
|
|
|
if app.config['KI_AUTH'] == 'file':
|
|
|
|
return file_auth(username, password)
|
|
|
|
|
|
|
|
if app.config['KI_AUTH'] == 'ldap':
|
|
|
|
return ldap_auth(username, password)
|
|
|
|
|
|
|
|
raise RuntimeError('unknown auth method')
|