forked from kompetenzinventar/ki-backend
add ldap auth
This commit is contained in:
53
ki/auth.py
53
ki/auth.py
@ -5,11 +5,28 @@
|
||||
import uuid
|
||||
import yaml
|
||||
|
||||
from ldap3 import Server, Connection, ALL
|
||||
|
||||
from app import app, db
|
||||
from ki.models import User, Token
|
||||
|
||||
|
||||
def auth(username, password):
|
||||
def create_user_token(username):
|
||||
user = User.query.filter(User.auth_id.__eq__(username)).first()
|
||||
|
||||
if user is None:
|
||||
user = User(auth_id=username)
|
||||
db.session.add(user)
|
||||
|
||||
token = Token(token=str(uuid.uuid4()), user=user)
|
||||
db.session.add(token)
|
||||
db.session.commit()
|
||||
return token
|
||||
|
||||
|
||||
def file_auth(username, password):
|
||||
app.logger.debug("performing file authentication")
|
||||
|
||||
auth_file_path = app.config["KI_DATA_DIR"] + "/auth.yml"
|
||||
|
||||
with open(auth_file_path, "r") as auth_file_stream:
|
||||
@ -23,14 +40,32 @@ def auth(username, password):
|
||||
if auth_user["password"] != password:
|
||||
return None
|
||||
|
||||
user = User.query.filter(User.auth_id.__eq__(username)).first()
|
||||
return create_user_token(username)
|
||||
|
||||
if user is None:
|
||||
user = User(auth_id=username)
|
||||
db.session.add(user)
|
||||
|
||||
token = Token(token=str(uuid.uuid4()), user=user)
|
||||
db.session.add(token)
|
||||
db.session.commit()
|
||||
def ldap_auth(username, password):
|
||||
app.logger.debug("performing LDAP authentication")
|
||||
|
||||
return token
|
||||
server = Server(app.config['KI_LDAP_URL'], get_info=ALL)
|
||||
root_dn = app.config['KI_LDAP_ROOT_DN']
|
||||
ldap_user = f"cn={username},{root_dn}"
|
||||
|
||||
app.logger.debug(f"server: {server}")
|
||||
connection = Connection(server, user=ldap_user, password=password)
|
||||
|
||||
if connection.bind():
|
||||
connection.unbind()
|
||||
return create_user_token(username)
|
||||
|
||||
connection.unbind()
|
||||
return None
|
||||
|
||||
|
||||
def auth(username, password):
|
||||
if app.config['KI_AUTH'] == 'file':
|
||||
return file_auth(username, password)
|
||||
|
||||
if app.config['KI_AUTH'] == 'ldap':
|
||||
return ldap_auth(username, password)
|
||||
|
||||
raise RuntimeError('unknown auth method')
|
||||
|
@ -17,6 +17,7 @@ class ApiTest(unittest.TestCase):
|
||||
app.debug = True
|
||||
app.config["TESTING"] = True
|
||||
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///:memory:"
|
||||
|
||||
self.client = app.test_client()
|
||||
|
||||
with app.app_context():
|
||||
|
Reference in New Issue
Block a user