Add Microsoft Entra SSO authentication flow

This commit is contained in:
Ivo Oskamp 2026-02-23 14:20:22 +01:00
parent 47bb4ee4f0
commit 5274286c04
6 changed files with 397 additions and 5 deletions

View File

@ -1,5 +1,11 @@
import base64
import binascii
import hashlib
import os
import random import random
import secrets
from functools import wraps from functools import wraps
from urllib.parse import urlencode
from flask import ( from flask import (
Blueprint, Blueprint,
@ -11,9 +17,10 @@ from flask import (
session, session,
) )
from flask_login import login_user, logout_user, login_required, current_user from flask_login import login_user, logout_user, login_required, current_user
import requests
from ..database import db from ..database import db
from ..models import User from ..models import SystemSettings, User
auth_bp = Blueprint("auth", __name__, url_prefix="/auth") auth_bp = Blueprint("auth", __name__, url_prefix="/auth")
@ -31,6 +38,106 @@ def generate_captcha():
return question, answer return question, answer
def _entra_effective_config() -> dict:
"""Return effective Entra SSO config from DB settings with env fallback."""
settings = SystemSettings.query.first()
enabled = bool(getattr(settings, "entra_sso_enabled", False)) if settings else False
tenant_id = (getattr(settings, "entra_tenant_id", None) or "").strip() if settings else ""
client_id = (getattr(settings, "entra_client_id", None) or "").strip() if settings else ""
client_secret = (getattr(settings, "entra_client_secret", None) or "").strip() if settings else ""
redirect_uri = (getattr(settings, "entra_redirect_uri", None) or "").strip() if settings else ""
allowed_domain = (getattr(settings, "entra_allowed_domain", None) or "").strip().lower() if settings else ""
auto_provision = bool(getattr(settings, "entra_auto_provision_users", False)) if settings else False
if not tenant_id:
tenant_id = (os.environ.get("ENTRA_TENANT_ID", "") or "").strip()
if not client_id:
client_id = (os.environ.get("ENTRA_CLIENT_ID", "") or "").strip()
if not client_secret:
client_secret = (os.environ.get("ENTRA_CLIENT_SECRET", "") or "").strip()
if not redirect_uri:
redirect_uri = (os.environ.get("ENTRA_REDIRECT_URI", "") or "").strip()
if not allowed_domain:
allowed_domain = (os.environ.get("ENTRA_ALLOWED_DOMAIN", "") or "").strip().lower()
if not enabled:
env_enabled = (os.environ.get("ENTRA_SSO_ENABLED", "") or "").strip().lower()
enabled = env_enabled in ("1", "true", "yes", "on")
if not auto_provision:
env_auto = (os.environ.get("ENTRA_AUTO_PROVISION_USERS", "") or "").strip().lower()
auto_provision = env_auto in ("1", "true", "yes", "on")
return {
"enabled": enabled,
"tenant_id": tenant_id,
"client_id": client_id,
"client_secret": client_secret,
"redirect_uri": redirect_uri,
"allowed_domain": allowed_domain,
"auto_provision": auto_provision,
}
def _b64url_decode(data: str) -> bytes:
pad = "=" * (-len(data) % 4)
return base64.urlsafe_b64decode((data + pad).encode("ascii"))
def _decode_id_token_payload(id_token: str) -> dict:
"""Decode JWT payload without signature verification (token comes from Entra token endpoint)."""
if not id_token or "." not in id_token:
return {}
parts = id_token.split(".")
if len(parts) < 2:
return {}
try:
payload_raw = _b64url_decode(parts[1])
import json
payload = json.loads(payload_raw.decode("utf-8"))
if isinstance(payload, dict):
return payload
except (binascii.Error, ValueError, UnicodeDecodeError):
return {}
return {}
def _resolve_sso_user(claims: dict, auto_provision: bool) -> User | None:
"""Resolve or optionally create a local user from Entra claims."""
username = (
(claims.get("preferred_username") or "")
or (claims.get("upn") or "")
or (claims.get("email") or "")
).strip()
email = ((claims.get("email") or claims.get("preferred_username") or "") or "").strip() or None
if not username:
return None
user = User.query.filter_by(username=username).first()
if not user and email:
user = User.query.filter_by(email=email).first()
if user:
return user
if not auto_provision:
return None
new_username = username
if User.query.filter_by(username=new_username).first():
base = new_username
idx = 1
while User.query.filter_by(username=f"{base}.{idx}").first():
idx += 1
new_username = f"{base}.{idx}"
# Random local password as fallback; SSO users authenticate via Entra.
random_password = secrets.token_urlsafe(32)
new_user = User(username=new_username, email=email, role="viewer")
new_user.set_password(random_password)
db.session.add(new_user)
db.session.commit()
return new_user
def captcha_required(func): def captcha_required(func):
@wraps(func) @wraps(func)
def wrapper(*args, **kwargs): def wrapper(*args, **kwargs):
@ -42,10 +149,18 @@ def captcha_required(func):
# regenerate captcha for re-render # regenerate captcha for re-render
question, answer = generate_captcha() question, answer = generate_captcha()
session["captcha_answer"] = answer session["captcha_answer"] = answer
cfg = _entra_effective_config()
entra_ready = bool(
cfg.get("enabled")
and cfg.get("tenant_id")
and cfg.get("client_id")
and cfg.get("client_secret")
)
return render_template( return render_template(
"auth/login.html", "auth/login.html",
captcha_question=question, captcha_question=question,
username=request.form.get("username", ""), username=request.form.get("username", ""),
entra_sso_enabled=entra_ready,
) )
return func(*args, **kwargs) return func(*args, **kwargs)
@ -61,7 +176,18 @@ def login():
question, answer = generate_captcha() question, answer = generate_captcha()
session["captcha_answer"] = answer session["captcha_answer"] = answer
return render_template("auth/login.html", captcha_question=question) cfg = _entra_effective_config()
entra_ready = bool(
cfg.get("enabled")
and cfg.get("tenant_id")
and cfg.get("client_id")
and cfg.get("client_secret")
)
return render_template(
"auth/login.html",
captcha_question=question,
entra_sso_enabled=entra_ready,
)
# POST # POST
username = (request.form.get("username") or "").strip() username = (request.form.get("username") or "").strip()
@ -72,8 +198,18 @@ def login():
flash("Invalid username or password.", "danger") flash("Invalid username or password.", "danger")
question, answer = generate_captcha() question, answer = generate_captcha()
session["captcha_answer"] = answer session["captcha_answer"] = answer
cfg = _entra_effective_config()
entra_ready = bool(
cfg.get("enabled")
and cfg.get("tenant_id")
and cfg.get("client_id")
and cfg.get("client_secret")
)
return render_template( return render_template(
"auth/login.html", captcha_question=question, username=username "auth/login.html",
captcha_question=question,
username=username,
entra_sso_enabled=entra_ready,
) )
login_user(user) login_user(user)
@ -81,18 +217,154 @@ def login():
session["active_role"] = user.roles[0] session["active_role"] = user.roles[0]
except Exception: except Exception:
session["active_role"] = (getattr(user, "role", "viewer") or "viewer").split(",")[0].strip() or "viewer" session["active_role"] = (getattr(user, "role", "viewer") or "viewer").split(",")[0].strip() or "viewer"
session["auth_provider"] = "local"
flash("You are now logged in.", "success") flash("You are now logged in.", "success")
return redirect(url_for("main.dashboard")) return redirect(url_for("main.dashboard"))
@auth_bp.route("/entra/login")
def entra_login():
"""Start Microsoft Entra ID authorization code flow."""
cfg = _entra_effective_config()
if not cfg.get("enabled"):
flash("Microsoft Entra SSO is not enabled.", "warning")
return redirect(url_for("auth.login"))
if not cfg.get("tenant_id") or not cfg.get("client_id") or not cfg.get("client_secret"):
flash("Microsoft Entra SSO is not fully configured.", "danger")
return redirect(url_for("auth.login"))
redirect_uri = cfg.get("redirect_uri") or url_for("auth.entra_callback", _external=True)
state = secrets.token_urlsafe(24)
nonce = hashlib.sha256(secrets.token_bytes(32)).hexdigest()
session["entra_state"] = state
session["entra_nonce"] = nonce
params = {
"client_id": cfg["client_id"],
"response_type": "code",
"redirect_uri": redirect_uri,
"response_mode": "query",
"scope": "openid profile email",
"state": state,
"nonce": nonce,
"prompt": "select_account",
}
auth_url = (
f"https://login.microsoftonline.com/{cfg['tenant_id']}/oauth2/v2.0/authorize?"
f"{urlencode(params)}"
)
return redirect(auth_url)
@auth_bp.route("/entra/callback")
def entra_callback():
"""Handle Microsoft Entra ID callback and log in mapped local user."""
cfg = _entra_effective_config()
if not cfg.get("enabled"):
flash("Microsoft Entra SSO is not enabled.", "warning")
return redirect(url_for("auth.login"))
error = (request.args.get("error") or "").strip()
if error:
desc = (request.args.get("error_description") or "").strip()
flash(f"Microsoft Entra login failed: {error} {desc}".strip(), "danger")
return redirect(url_for("auth.login"))
state = (request.args.get("state") or "").strip()
expected_state = (session.get("entra_state") or "").strip()
if not state or not expected_state or state != expected_state:
flash("Invalid SSO state. Please try again.", "danger")
return redirect(url_for("auth.login"))
code = (request.args.get("code") or "").strip()
if not code:
flash("No authorization code returned by Microsoft Entra.", "danger")
return redirect(url_for("auth.login"))
redirect_uri = cfg.get("redirect_uri") or url_for("auth.entra_callback", _external=True)
token_url = f"https://login.microsoftonline.com/{cfg['tenant_id']}/oauth2/v2.0/token"
token_payload = {
"client_id": cfg["client_id"],
"client_secret": cfg["client_secret"],
"grant_type": "authorization_code",
"code": code,
"redirect_uri": redirect_uri,
"scope": "openid profile email",
}
try:
token_resp = requests.post(token_url, data=token_payload, timeout=30)
token_resp.raise_for_status()
token_data = token_resp.json()
except Exception as exc:
flash(f"Failed to fetch token from Microsoft Entra: {exc}", "danger")
return redirect(url_for("auth.login"))
id_token = token_data.get("id_token")
claims = _decode_id_token_payload(id_token or "")
if not claims:
flash("Could not read Microsoft Entra ID token.", "danger")
return redirect(url_for("auth.login"))
expected_nonce = (session.get("entra_nonce") or "").strip()
token_nonce = (claims.get("nonce") or "").strip()
if expected_nonce and token_nonce and token_nonce != expected_nonce:
flash("Invalid SSO nonce. Please try again.", "danger")
return redirect(url_for("auth.login"))
allowed_domain = (cfg.get("allowed_domain") or "").strip().lower()
if allowed_domain:
token_tid = (claims.get("tid") or "").strip().lower()
token_domain = ""
upn = (claims.get("preferred_username") or claims.get("email") or "").strip().lower()
if "@" in upn:
token_domain = upn.split("@", 1)[1]
if allowed_domain not in {token_tid, token_domain}:
flash("Your Microsoft account is not allowed for this instance.", "danger")
return redirect(url_for("auth.login"))
user = _resolve_sso_user(claims, auto_provision=bool(cfg.get("auto_provision")))
if not user:
flash(
"No local Backupchecks user is mapped to this Microsoft account. "
"Ask an admin to create or map your account.",
"danger",
)
return redirect(url_for("auth.login"))
login_user(user)
try:
session["active_role"] = user.roles[0]
except Exception:
session["active_role"] = (getattr(user, "role", "viewer") or "viewer").split(",")[0].strip() or "viewer"
session["auth_provider"] = "entra"
session.pop("entra_state", None)
session.pop("entra_nonce", None)
flash("You are now logged in with Microsoft Entra.", "success")
return redirect(url_for("main.dashboard"))
@auth_bp.route("/logout") @auth_bp.route("/logout")
@login_required @login_required
def logout(): def logout():
cfg = _entra_effective_config()
auth_provider = (session.get("auth_provider") or "").strip()
logout_user() logout_user()
try: try:
session.pop("active_role", None) session.pop("active_role", None)
session.pop("auth_provider", None)
session.pop("entra_state", None)
session.pop("entra_nonce", None)
except Exception: except Exception:
pass pass
if auth_provider == "entra" and cfg.get("enabled") and cfg.get("tenant_id"):
post_logout = url_for("auth.login", _external=True)
logout_url = (
f"https://login.microsoftonline.com/{cfg['tenant_id']}/oauth2/v2.0/logout?"
f"{urlencode({'post_logout_redirect_uri': post_logout})}"
)
return redirect(logout_url)
flash("You have been logged out.", "info") flash("You have been logged out.", "info")
return redirect(url_for("auth.login")) return redirect(url_for("auth.login"))

View File

@ -787,6 +787,7 @@ def settings():
if request.method == "POST": if request.method == "POST":
autotask_form_touched = any(str(k).startswith("autotask_") for k in (request.form or {}).keys()) autotask_form_touched = any(str(k).startswith("autotask_") for k in (request.form or {}).keys())
cove_form_touched = any(str(k).startswith("cove_") for k in (request.form or {}).keys()) cove_form_touched = any(str(k).startswith("cove_") for k in (request.form or {}).keys())
entra_form_touched = any(str(k).startswith("entra_") for k in (request.form or {}).keys())
import_form_touched = any(str(k).startswith("auto_import_") or str(k).startswith("manual_import_") or str(k).startswith("ingest_eml_") for k in (request.form or {}).keys()) import_form_touched = any(str(k).startswith("auto_import_") or str(k).startswith("manual_import_") or str(k).startswith("ingest_eml_") for k in (request.form or {}).keys())
general_form_touched = "ui_timezone" in request.form general_form_touched = "ui_timezone" in request.form
mail_form_touched = any(k in request.form for k in ["graph_tenant_id", "graph_client_id", "graph_mailbox", "incoming_folder", "processed_folder"]) mail_form_touched = any(k in request.form for k in ["graph_tenant_id", "graph_client_id", "graph_mailbox", "incoming_folder", "processed_folder"])
@ -934,6 +935,24 @@ def settings():
except (ValueError, TypeError): except (ValueError, TypeError):
pass pass
# Microsoft Entra SSO
if entra_form_touched:
settings.entra_sso_enabled = bool(request.form.get("entra_sso_enabled"))
settings.entra_auto_provision_users = bool(request.form.get("entra_auto_provision_users"))
if "entra_tenant_id" in request.form:
settings.entra_tenant_id = (request.form.get("entra_tenant_id") or "").strip() or None
if "entra_client_id" in request.form:
settings.entra_client_id = (request.form.get("entra_client_id") or "").strip() or None
if "entra_redirect_uri" in request.form:
settings.entra_redirect_uri = (request.form.get("entra_redirect_uri") or "").strip() or None
if "entra_allowed_domain" in request.form:
settings.entra_allowed_domain = (request.form.get("entra_allowed_domain") or "").strip() or None
if "entra_client_secret" in request.form:
pw = (request.form.get("entra_client_secret") or "").strip()
if pw:
settings.entra_client_secret = pw
# Daily Jobs # Daily Jobs
if "daily_jobs_start_date" in request.form: if "daily_jobs_start_date" in request.form:
daily_jobs_start_date_str = (request.form.get("daily_jobs_start_date") or "").strip() daily_jobs_start_date_str = (request.form.get("daily_jobs_start_date") or "").strip()
@ -1146,6 +1165,7 @@ def settings():
has_client_secret = bool(settings.graph_client_secret) has_client_secret = bool(settings.graph_client_secret)
has_autotask_password = bool(getattr(settings, "autotask_api_password", None)) has_autotask_password = bool(getattr(settings, "autotask_api_password", None))
has_cove_password = bool(getattr(settings, "cove_api_password", None)) has_cove_password = bool(getattr(settings, "cove_api_password", None))
has_entra_secret = bool(getattr(settings, "entra_client_secret", None))
# Common UI timezones (IANA names) # Common UI timezones (IANA names)
tz_options = [ tz_options = [
@ -1272,6 +1292,7 @@ def settings():
has_client_secret=has_client_secret, has_client_secret=has_client_secret,
has_autotask_password=has_autotask_password, has_autotask_password=has_autotask_password,
has_cove_password=has_cove_password, has_cove_password=has_cove_password,
has_entra_secret=has_entra_secret,
tz_options=tz_options, tz_options=tz_options,
users=users, users=users,
admin_users_count=admin_users_count, admin_users_count=admin_users_count,

View File

@ -1175,6 +1175,35 @@ def migrate_cove_integration() -> None:
print(f"[migrations] Failed to migrate Cove integration columns: {exc}") print(f"[migrations] Failed to migrate Cove integration columns: {exc}")
def migrate_entra_sso_settings() -> None:
"""Add Microsoft Entra SSO columns to system_settings if missing."""
try:
engine = db.get_engine()
except Exception as exc:
print(f"[migrations] Could not get engine for Entra SSO migration: {exc}")
return
columns = [
("entra_sso_enabled", "BOOLEAN NOT NULL DEFAULT FALSE"),
("entra_tenant_id", "VARCHAR(128) NULL"),
("entra_client_id", "VARCHAR(128) NULL"),
("entra_client_secret", "VARCHAR(255) NULL"),
("entra_redirect_uri", "VARCHAR(512) NULL"),
("entra_allowed_domain", "VARCHAR(255) NULL"),
("entra_auto_provision_users", "BOOLEAN NOT NULL DEFAULT FALSE"),
]
try:
with engine.begin() as conn:
for column, ddl in columns:
if _column_exists_on_conn(conn, "system_settings", column):
continue
conn.execute(text(f'ALTER TABLE "system_settings" ADD COLUMN {column} {ddl}'))
print("[migrations] migrate_entra_sso_settings completed.")
except Exception as exc:
print(f"[migrations] Failed to migrate Entra SSO columns: {exc}")
def run_migrations() -> None: def run_migrations() -> None:
print("[migrations] Starting migrations...") print("[migrations] Starting migrations...")
migrate_add_username_to_users() migrate_add_username_to_users()
@ -1219,6 +1248,7 @@ def run_migrations() -> None:
migrate_rename_admin_logs_to_audit_logs() migrate_rename_admin_logs_to_audit_logs()
migrate_cove_integration() migrate_cove_integration()
migrate_cove_accounts_table() migrate_cove_accounts_table()
migrate_entra_sso_settings()
print("[migrations] All migrations completed.") print("[migrations] All migrations completed.")

View File

@ -127,6 +127,15 @@ class SystemSettings(db.Model):
cove_partner_id = db.Column(db.Integer, nullable=True) # stored after successful login cove_partner_id = db.Column(db.Integer, nullable=True) # stored after successful login
cove_last_import_at = db.Column(db.DateTime, nullable=True) cove_last_import_at = db.Column(db.DateTime, nullable=True)
# Microsoft Entra SSO settings
entra_sso_enabled = db.Column(db.Boolean, nullable=False, default=False)
entra_tenant_id = db.Column(db.String(128), nullable=True)
entra_client_id = db.Column(db.String(128), nullable=True)
entra_client_secret = db.Column(db.String(255), nullable=True)
entra_redirect_uri = db.Column(db.String(512), nullable=True)
entra_allowed_domain = db.Column(db.String(255), nullable=True)
entra_auto_provision_users = db.Column(db.Boolean, nullable=False, default=False)
# Autotask integration settings # Autotask integration settings
autotask_enabled = db.Column(db.Boolean, nullable=False, default=False) autotask_enabled = db.Column(db.Boolean, nullable=False, default=False)
autotask_environment = db.Column(db.String(32), nullable=True) # sandbox | production autotask_environment = db.Column(db.String(32), nullable=True) # sandbox | production

View File

@ -11,7 +11,7 @@
class="form-control" class="form-control"
id="username" id="username"
name="username" name="username"
value="{{ email or '' }}" value="{{ username or '' }}"
required required
/> />
</div> </div>
@ -39,6 +39,12 @@
<a class="btn btn-link" href="{{ url_for('auth.password_reset_request') }}">Forgot password?</a> <a class="btn btn-link" href="{{ url_for('auth.password_reset_request') }}">Forgot password?</a>
</div> </div>
</form> </form>
{% if entra_sso_enabled %}
<div class="my-3"><hr /></div>
<a class="btn btn-outline-secondary w-100" href="{{ url_for('auth.entra_login') }}">
Sign in with Microsoft
</a>
{% endif %}
</div> </div>
</div> </div>
{% endblock %} {% endblock %}

View File

@ -615,6 +615,60 @@
}); });
})(); })();
</script> </script>
<form method="post" class="mb-4" id="entra-settings-form">
<div class="card mb-3">
<div class="card-header">Microsoft Entra SSO</div>
<div class="card-body">
<div class="form-check form-switch mb-3">
<input class="form-check-input" type="checkbox" id="entra_sso_enabled" name="entra_sso_enabled" {% if settings.entra_sso_enabled %}checked{% endif %} />
<label class="form-check-label" for="entra_sso_enabled">Enable Microsoft sign-in</label>
</div>
<div class="row g-3">
<div class="col-md-6">
<label for="entra_tenant_id" class="form-label">Tenant ID</label>
<input type="text" class="form-control" id="entra_tenant_id" name="entra_tenant_id"
value="{{ settings.entra_tenant_id or '' }}" placeholder="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" />
</div>
<div class="col-md-6">
<label for="entra_client_id" class="form-label">Client ID</label>
<input type="text" class="form-control" id="entra_client_id" name="entra_client_id"
value="{{ settings.entra_client_id or '' }}" placeholder="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" />
</div>
<div class="col-md-12">
<label for="entra_client_secret" class="form-label">Client Secret {% if not has_entra_secret %}<span class="text-danger">*</span>{% endif %}</label>
<input type="password" class="form-control" id="entra_client_secret" name="entra_client_secret"
placeholder="{% if has_entra_secret %}******** (stored){% else %}enter secret{% endif %}" />
<div class="form-text">Leave empty to keep the existing secret.</div>
</div>
<div class="col-md-12">
<label for="entra_redirect_uri" class="form-label">Redirect URI (optional override)</label>
<input type="url" class="form-control" id="entra_redirect_uri" name="entra_redirect_uri"
value="{{ settings.entra_redirect_uri or '' }}"
placeholder="https://your-domain.example/auth/entra/callback" />
<div class="form-text">If empty, Backupchecks uses its own external callback URL.</div>
</div>
<div class="col-md-6">
<label for="entra_allowed_domain" class="form-label">Allowed domain/tenant (optional)</label>
<input type="text" class="form-control" id="entra_allowed_domain" name="entra_allowed_domain"
value="{{ settings.entra_allowed_domain or '' }}" placeholder="contoso.com or tenant-id" />
<div class="form-text">Restrict sign-ins to one tenant id or one email domain.</div>
</div>
<div class="col-md-6">
<div class="form-check form-switch mt-4">
<input class="form-check-input" type="checkbox" id="entra_auto_provision_users" name="entra_auto_provision_users" {% if settings.entra_auto_provision_users %}checked{% endif %} />
<label class="form-check-label" for="entra_auto_provision_users">Auto-provision unknown users as Viewer</label>
</div>
</div>
</div>
<div class="d-flex justify-content-end mt-3">
<button type="submit" class="btn btn-primary">Save Entra Settings</button>
</div>
</div>
</div>
</form>
{% endif %} {% endif %}
{% if section == 'maintenance' %} {% if section == 'maintenance' %}