ki-backend/ki/auth.py

89 lines
2.3 KiB
Python
Raw Permalink Normal View History

2021-07-05 19:37:05 +02:00
# SPDX-FileCopyrightText: WTF Kooperative eG <https://wtf-eg.de/>
#
# SPDX-License-Identifier: AGPL-3.0-or-later
2021-06-12 13:24:26 +02:00
import uuid
import yaml
2021-09-15 19:16:45 +02:00
from ldap3 import Server, Connection
from ldap3.utils.conv import escape_filter_chars
2021-07-11 12:16:41 +02:00
2021-06-12 13:24:26 +02:00
from app import app, db
from ki.models import User, Token
2021-07-11 12:16:41 +02:00
def create_user_token(username):
user = User.query.filter(User.auth_id.__eq__(username)).first()
if user is None:
user = User(auth_id=username)
db.session.add(user)
token = Token(token=str(uuid.uuid4()), user=user)
db.session.add(token)
db.session.commit()
return token
def file_auth(username, password):
app.logger.debug("performing file authentication")
2021-06-12 13:24:26 +02:00
auth_file_path = app.config["KI_DATA_DIR"] + "/auth.yml"
with open(auth_file_path, "r") as auth_file_stream:
users = yaml.safe_load(auth_file_stream)
if username not in users:
return None
auth_user = users[username]
if auth_user["password"] != password:
return None
2021-07-11 12:16:41 +02:00
return create_user_token(username)
2021-06-12 13:24:26 +02:00
2021-07-11 12:16:41 +02:00
def ldap_auth(username, password):
app.logger.debug("performing LDAP authentication")
2021-06-12 13:24:26 +02:00
2021-09-15 19:16:45 +02:00
escaped_username = escape_filter_chars(username)
server = Server(app.config['KI_LDAP_URL'])
2021-07-11 12:16:41 +02:00
2021-09-15 19:16:45 +02:00
try:
connection = Connection(server,
app.config['KI_LDAP_AUTH_USER'],
app.config['KI_LDAP_AUTH_PASSWORD'],
auto_bind=True)
except:
app.logger.error('ldap connection failed')
return None
2021-07-11 12:16:41 +02:00
2021-09-15 19:16:45 +02:00
if not connection.search(app.config['KI_LDAP_BASE_DN'], f"(&(objectClass=inetOrgPerson)(uid={escaped_username}))"):
2021-09-15 19:44:11 +02:00
app.logger.info("ldap search failed")
return None
if not connection.entries:
app.logger.info(f"no ldap search result for {username}")
2021-09-15 19:16:45 +02:00
return None
user_dn = connection.entries[0].entry_dn
if connection.rebind(user=user_dn, password=password):
2021-07-11 12:16:41 +02:00
connection.unbind()
return create_user_token(username)
connection.unbind()
2021-09-15 19:16:45 +02:00
app.logger.info(f"ldap login of {username} failed")
2021-07-11 12:16:41 +02:00
return None
def auth(username, password):
if app.config['KI_AUTH'] == 'file':
return file_auth(username, password)
if app.config['KI_AUTH'] == 'ldap':
return ldap_auth(username, password)
raise RuntimeError('unknown auth method')