65 lines
2.3 KiB
Python
65 lines
2.3 KiB
Python
from flask import Blueprint, render_template, request, redirect, url_for, flash, session
|
|
import os
|
|
from functools import wraps
|
|
from ldap3 import Server, Connection, ALL
|
|
from ldap3.core.exceptions import LDAPException
|
|
|
|
|
|
class LoginAuth:
|
|
def __init__(self):
|
|
# Create Blueprint
|
|
self.bp = Blueprint("auth", __name__)
|
|
|
|
# LDAP CONFIG
|
|
self.LDAP_SERVER = os.getenv("LDAP_SERVER", "ldap://host.docker.internal:389")
|
|
self.BASE_DN = "ou=users,dc=lcepl,dc=org"
|
|
|
|
# Register Routes
|
|
self.bp.add_url_rule("/login", view_func=self.login, methods=["GET", "POST"])
|
|
self.bp.add_url_rule("/logout", view_func=self.logout)
|
|
|
|
# ================= LOGIN =================
|
|
def login(self):
|
|
if request.method == "POST":
|
|
username = request.form.get("username")
|
|
password = request.form.get("password")
|
|
|
|
if not username or not password:
|
|
flash("Username and password are required!", "danger")
|
|
return render_template("login.html")
|
|
|
|
user_dn = f"uid={username},{self.BASE_DN}"
|
|
server = Server(self.LDAP_SERVER, get_info=ALL)
|
|
|
|
try:
|
|
conn = Connection(server, user=user_dn, password=password, auto_bind=True)
|
|
|
|
if conn.bound:
|
|
session["user"] = username
|
|
flash(f"Login successful! Welcome {username}", "success")
|
|
conn.unbind()
|
|
return redirect(url_for("welcome"))
|
|
else:
|
|
flash("Invalid username or password!", "danger")
|
|
|
|
except LDAPException as e:
|
|
flash(f"LDAP login failed: {str(e)}", "danger")
|
|
|
|
return render_template("login.html")
|
|
|
|
# ================= LOGOUT =================
|
|
def logout(self):
|
|
session.clear()
|
|
flash("Logged out successfully!", "success")
|
|
return redirect(url_for("auth.login"))
|
|
|
|
# ================= LOGIN REQUIRED =================
|
|
def login_required(self, f):
|
|
@wraps(f)
|
|
def wrapper(*args, **kwargs):
|
|
if "user" not in session:
|
|
flash("Please login first!", "danger")
|
|
return redirect(url_for("auth.login"))
|
|
return f(*args, **kwargs)
|
|
return wrapper
|