import json
import logging

from django.contrib.auth import (
    get_user_model,
    login as auth_login,
    logout as auth_logout,
)
from django.http import HttpResponse, HttpResponseRedirect, HttpResponseServerError
from django.views.generic import View
from onelogin.saml2.auth import OneLogin_Saml2_Auth
from onelogin.saml2.errors import OneLogin_Saml2_Error
from onelogin.saml2.utils import OneLogin_Saml2_Utils

from openslides.utils.autoupdate import inform_changed_data

from .settings import get_saml_settings


logger = logging.getLogger(__name__)


class SamlView(View):
    """
    View for the SAML Interface.

    Some SAML terminology:
    - IDP: Identity provider. The service providing the actual login.
    - SP: Service provider. That is OpenSlides.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def post(self, request, *args, **kwargs):
        """POST requests should do the same as GET requests."""
        return self.get(request, *args, **kwargs)

    def get(self, request, *args, **kwargs):
        """
        Switches specific saml types, selected by a key in the query string.

        First user-initiated requests:
        - sso: SingleSignOn -> Redirect to IDP
        - sso2: Also SingleSignOn with a special redirect url
        - slo: SingleLogOut: Logs the user out of OpenSlides and the IDP.
          To only log out from OpenSlides, use the standard logout url.

        Second, requests from the IDP:
        - acs: AssertionConsumerService: Response from the IDP to the SP.
          Contains login data for a valid user
        - sls: SingleLogoutService: Request to log the user out.

        Any other request is redirected to "/". SAML processing errors are
        returned as a plain-text HTTP 500 response.

        TODO: Nicer errors
        """
        url, auth = self.get_saml_auth(request)

        if "sso" in request.GET:
            return HttpResponseRedirect(auth.login())
        if "sso2" in request.GET:
            return_to = url + "/"
            return HttpResponseRedirect(auth.login(return_to))
        if "slo" in request.GET:
            return self._handle_slo(request, auth)
        if "acs" in request.GET:
            return self._handle_acs(request, auth, url)
        if "sls" in request.GET:
            return self._handle_sls(request, auth)
        return HttpResponseRedirect("/")

    @staticmethod
    def _run_saml_step(auth, step):
        """
        Calls `step()` and collects the SAML errors it produced.

        Returns a tuple `(result, error_msg)`. `error_msg` is an empty string
        on success. If `step` raises OneLogin_Saml2_Error, `result` is None
        and the message contains the exception detail and code, prefixed by
        the errors reported by `auth` (if any).
        """
        try:
            result = step()
        except OneLogin_Saml2_Error as e:
            auth_errors = auth.get_errors()
            error_msg = ""
            if auth_errors:
                joined = "".join(auth_errors)
                error_msg = f"auth: {joined}, "
            error_msg += f"detail: {str(e)}, code: {e.code}"
            return None, error_msg
        return result, "".join(auth.get_errors())

    def _handle_slo(self, request, auth):
        """Logs the user out of OpenSlides and, for SAML users, the IDP."""
        # Read the SAML identifiers first: auth_logout clears the session.
        name_id = request.session.get("samlNameId")
        session_index = request.session.get("samlSessionIndex")
        auth_logout(request)  # Logout from OpenSlides

        if name_id is None and session_index is None:
            # Not a SAML user
            return HttpResponseRedirect("/")

        request.session["samlNameId"] = None
        request.session["samlSessionIndex"] = None
        # Logout from IDP
        return HttpResponseRedirect(
            auth.logout(name_id=name_id, session_index=session_index)
        )

    def _handle_acs(self, request, auth, url):
        """Processes the login response of the IDP and logs the user in."""
        _, error_msg = self._run_saml_step(auth, auth.process_response)
        if error_msg:
            return HttpResponseServerError(content=error_msg)

        name_id = auth.get_nameid()
        session_index = auth.get_session_index()
        self.login_user(request, auth.get_attributes())

        # Store the SAML identifiers only after the login: Django's login()
        # flushes the session if it belonged to a different authenticated
        # user, which would silently drop values written before it.
        request.session["samlNameId"] = name_id
        request.session["samlSessionIndex"] = session_index

        # NOTE(review): RelayState is supplied by the client. Confirm that
        # redirecting to it cannot be abused as an open redirect.
        if "RelayState" in request.POST and url != request.POST["RelayState"]:
            return HttpResponseRedirect(auth.redirect_to(request.POST["RelayState"]))
        return HttpResponseRedirect("/")

    def _handle_sls(self, request, auth):
        """Processes a logout request/response coming from the IDP."""
        redirect_url, error_msg = self._run_saml_step(
            auth,
            lambda: auth.process_slo(delete_session_cb=request.session.flush),
        )
        if error_msg:
            return HttpResponseServerError(content=error_msg)
        return HttpResponseRedirect(redirect_url or "/")

    def login_user(self, request, attributes):
        """
        Logs in a user given by the attributes.

        The user is looked up (or created) with the arguments built by
        `get_queryargs`. A newly created user gets its groups assigned and is
        published via `inform_changed_data`; an existing user is updated with
        the non-lookup attributes.
        """
        verbose_attrs = ", ".join(attributes.keys())
        logger.info(f"Login saml user with these attributes: {verbose_attrs}")

        # Get arguments for querying the one user
        queryargs = self.get_queryargs(attributes)
        User = get_user_model()
        user, created = User.objects.get_or_create(**queryargs)
        if created:
            logger.info(
                f"Created new saml user with id {user.id} and username {user.username}"
            )
            self.add_groups_to_user(user, attributes)
            inform_changed_data(user)  # put the new user into the cache
        else:
            logger.info(
                f"Found saml user with id {user.id} and username {user.username}"
            )
            self.update_user(user, queryargs["defaults"])
        auth_login(request, user)

    def add_groups_to_user(self, user, attributes):
        """
        Adds groups to the user depending on the configured group matchers.

        Each matcher reads one attribute and searches its value(s) with the
        matcher's regex; on a hit, the matcher's group ids are collected. If
        no matcher matched, the configured default group ids are used.
        """
        groups = get_saml_settings().groups
        group_ids = set()
        for number, matcher in enumerate(groups["matchers"], start=1):
            raw_attr_value = attributes.get(matcher["attribute"], "")

            # Convert the raw_attr_value to a list of string.
            # It can be a string or list of strings. Other types will be ignored.
            attr_values = None
            if isinstance(raw_attr_value, str):
                attr_values = [raw_attr_value]
            elif isinstance(raw_attr_value, list) and all(
                isinstance(x, str) for x in raw_attr_value
            ):
                attr_values = raw_attr_value

            if attr_values is None:
                logger.warning(
                    f"Want to match matcher {number} but the attribute value is not a string or a list of strings but {type(raw_attr_value)}"
                )
                continue

            if any(matcher["regex"].search(x) for x in attr_values):
                group_ids.update(matcher["group_ids"])
                logger.info(
                    f"Matcher {number} matched. Adding groups: {matcher['group_ids']}"
                )
            else:
                logger.info(f"Matcher {number} did not match")

        # since a matcher must have groups, not having groups here means no matcher matched
        if not group_ids:
            logger.info(
                f"No matcher matched. Adding default groups: {groups['default_group_ids']}"
            )
            group_ids = groups["default_group_ids"]

        logger.info(f"Adding these groups to the user: {list(group_ids)}")
        if group_ids:
            user.groups.add(*group_ids)

    def get_queryargs(self, attributes):
        """
        Build the arguments for getting or creating a user.

        Attributes with lookup=True are "normal" queryargs. The rest are
        default values, returned under the "defaults" key. List values are
        joined with ", ". Ensures the auth_type to be "saml".
        """
        queryargs = {}
        defaults = {}
        mapping = get_saml_settings().attribute_mapping
        for key, (value, lookup) in mapping.items():
            attribute = attributes.get(key)
            if isinstance(attribute, list):
                attribute = ", ".join(attribute)

            if lookup:
                queryargs[value] = attribute
            else:
                defaults[value] = attribute

        # Add the auth_type to the defaults:
        defaults["auth_type"] = "saml"

        queryargs["defaults"] = defaults
        verbose_queryargs = json.dumps(queryargs)
        logger.debug(f"User queryargs: {verbose_queryargs}")
        return queryargs

    def update_user(self, user, attributes):
        """
        Updates a user with the new attributes.

        The "auth_type" key is never applied to an existing user. The given
        dict is not modified. The user is saved only if a value changed.
        """
        changed = False
        for key, value in attributes.items():
            if key == "auth_type":
                continue
            if getattr(user, key) != value:
                setattr(user, key, value)
                changed = True
        if changed:
            user.save()

    def get_saml_auth(self, request):
        """
        Builds the SAML request data for the given Django request.

        Values from the configured request settings take precedence; missing
        ones are derived from the request. Returns a tuple of the url of this
        view (as computed by OneLogin_Saml2_Utils) and the
        OneLogin_Saml2_Auth object.
        """
        settings = get_saml_settings()
        saml_request = dict(settings.request_settings)

        # Fill in keys that are not configured. The request is only consulted
        # for keys that are really missing, so an absent META entry does not
        # raise for an explicitly configured value.
        if "https" not in saml_request:
            saml_request["https"] = "on" if request.is_secure() else "off"
        meta_fallbacks = (
            ("http_host", "HTTP_HOST"),
            ("script_name", "PATH_INFO"),
            ("server_port", "SERVER_PORT"),
        )
        for key, meta_key in meta_fallbacks:
            if key not in saml_request:
                saml_request[key] = request.META[meta_key]

        # add get and post data
        saml_request["get_data"] = request.GET.copy()
        saml_request["post_data"] = request.POST.copy()
        return (
            OneLogin_Saml2_Utils.get_self_url(saml_request),
            OneLogin_Saml2_Auth(saml_request, settings.saml_settings),
        )


def serve_metadata(request, *args, **kwargs):
    """
    Serves the metadata of the service provider as XML.

    Responds with HTTP 500 and the comma-separated validation errors if the
    generated metadata is not valid.
    """
    settings = get_saml_settings().saml_settings
    metadata = settings.get_sp_metadata()
    errors = settings.validate_metadata(metadata)

    if errors:
        return HttpResponseServerError(content=", ".join(errors))
    return HttpResponse(content=metadata, content_type="text/xml")