auth: add users + audit admin router

This commit is contained in:
Ivo Oskamp 2026-05-28 16:02:56 +02:00
parent 12609ba2a4
commit b6a8a76f7b
2 changed files with 226 additions and 0 deletions

View File

@ -0,0 +1,152 @@
"""Admin endpoints: user CRUD, password reset, audit log."""
from __future__ import annotations
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, Request
from sqlalchemy import select
from sqlalchemy.orm import Session
from . import sessions as S
from .audit import record_event
from .dependencies import get_db, require_admin
from .models import AuthAudit, User
from .schemas import (
AuditItem,
CreateUserRequest,
ResetPasswordRequest,
UpdateUserRequest,
UserItem,
)
from .security import PasswordPolicyError, hash_password, validate_password
router = APIRouter()
def _ip(request: Request) -> str | None:
return request.client.host if request.client else None
def _to_item(u: User) -> UserItem:
return UserItem(
id=u.id, username=u.username, role=u.role, is_active=u.is_active, created_at=u.created_at # type: ignore[arg-type]
)
@router.get("/api/users", response_model=list[UserItem])
def list_users(
db: Annotated[Session, Depends(get_db)],
_: Annotated[User, Depends(require_admin)],
) -> list[UserItem]:
rows = db.execute(select(User).order_by(User.created_at.asc())).scalars().all()
return [_to_item(u) for u in rows]
@router.post("/api/users", response_model=UserItem)
def create_user(
payload: CreateUserRequest,
request: Request,
db: Annotated[Session, Depends(get_db)],
actor: Annotated[User, Depends(require_admin)],
) -> UserItem:
try:
validate_password(payload.password)
except PasswordPolicyError as exc:
raise HTTPException(status_code=400, detail=str(exc)) from exc
if db.execute(select(User).where(User.username == payload.username)).scalar_one_or_none():
raise HTTPException(status_code=409, detail="Username already exists")
u = User(
username=payload.username,
password_hash=hash_password(payload.password),
role=payload.role,
is_active=True,
)
db.add(u); db.flush()
record_event(
db, event="user_create", user_id=actor.id, ip=_ip(request),
detail={"target": u.id, "username": u.username, "role": u.role},
)
db.commit(); db.refresh(u)
return _to_item(u)
@router.patch("/api/users/{user_id}", response_model=UserItem)
def update_user(
user_id: int,
payload: UpdateUserRequest,
request: Request,
db: Annotated[Session, Depends(get_db)],
actor: Annotated[User, Depends(require_admin)],
) -> UserItem:
u = db.get(User, user_id)
if u is None:
raise HTTPException(status_code=404, detail="User not found")
changed: dict = {}
if payload.role is not None and payload.role != u.role:
u.role = payload.role; changed["role"] = payload.role
if payload.is_active is not None and payload.is_active != u.is_active:
u.is_active = payload.is_active; changed["is_active"] = payload.is_active
if not payload.is_active:
S.revoke_all_for_user(db, u.id)
if changed:
record_event(db, event="user_update", user_id=actor.id, ip=_ip(request), detail={"target": u.id, **changed})
db.commit(); db.refresh(u)
return _to_item(u)
@router.delete("/api/users/{user_id}")
def delete_user(
user_id: int,
request: Request,
db: Annotated[Session, Depends(get_db)],
actor: Annotated[User, Depends(require_admin)],
) -> dict[str, bool]:
if user_id == actor.id:
raise HTTPException(status_code=400, detail="Cannot delete your own account")
u = db.get(User, user_id)
if u is None:
raise HTTPException(status_code=404, detail="User not found")
db.delete(u)
record_event(db, event="user_delete", user_id=actor.id, ip=_ip(request), detail={"target": user_id})
db.commit()
return {"ok": True}
@router.post("/api/users/{user_id}/reset-password")
def reset_password(
user_id: int,
payload: ResetPasswordRequest,
request: Request,
db: Annotated[Session, Depends(get_db)],
actor: Annotated[User, Depends(require_admin)],
) -> dict[str, bool]:
u = db.get(User, user_id)
if u is None:
raise HTTPException(status_code=404, detail="User not found")
try:
validate_password(payload.password)
except PasswordPolicyError as exc:
raise HTTPException(status_code=400, detail=str(exc)) from exc
u.password_hash = hash_password(payload.password)
S.revoke_all_for_user(db, u.id)
record_event(db, event="password_reset", user_id=actor.id, ip=_ip(request), detail={"target": u.id})
db.commit()
return {"ok": True}
@router.get("/api/audit", response_model=list[AuditItem])
def list_audit(
db: Annotated[Session, Depends(get_db)],
_: Annotated[User, Depends(require_admin)],
limit: int = 200,
event: str | None = None,
) -> list[AuditItem]:
limit = max(1, min(limit, 1000))
q = select(AuthAudit).order_by(AuthAudit.ts.desc()).limit(limit)
if event:
q = q.where(AuthAudit.event == event)
rows = db.execute(q).scalars().all()
return [
AuditItem(id=r.id, ts=r.ts, user_id=r.user_id, event=r.event, ip=r.ip, detail=r.detail)
for r in rows
]

View File

@ -0,0 +1,74 @@
from fastapi import FastAPI
from fastapi.testclient import TestClient
from sqlalchemy.orm import sessionmaker
from clearview_app.auth.dependencies import get_db
from clearview_app.auth.router import router as auth_router
from clearview_app.auth.users_router import router as users_router
def make_app(db_engine):
Session = sessionmaker(bind=db_engine, autoflush=False, autocommit=False, future=True)
def override_get_db():
s = Session()
try:
yield s
finally:
s.close()
app = FastAPI()
app.include_router(auth_router)
app.include_router(users_router)
app.dependency_overrides[get_db] = override_get_db
return app
def _bootstrap_admin(app):
c = TestClient(app)
c.post("/api/auth/setup", json={"username": "root", "password": "CorrectHorse42"})
return c
def test_non_admin_blocked(db_engine):
app = make_app(db_engine)
admin = _bootstrap_admin(app)
admin.post("/api/users", json={"username": "joe", "password": "JoePassword42", "role": "user"})
user = TestClient(app)
user.post("/api/auth/login", json={"username": "joe", "password": "JoePassword42", "remember": False})
assert user.get("/api/users").status_code == 403
def test_create_list_update_delete(db_engine):
app = make_app(db_engine)
c = _bootstrap_admin(app)
r = c.post("/api/users", json={"username": "alice", "password": "AlicePassword42", "role": "user"})
assert r.status_code == 200
uid = r.json()["id"]
assert {u["username"] for u in c.get("/api/users").json()} == {"root", "alice"}
c.patch(f"/api/users/{uid}", json={"role": "admin"})
assert next(u for u in c.get("/api/users").json() if u["id"] == uid)["role"] == "admin"
c.post(f"/api/users/{uid}/reset-password", json={"password": "NewAlicePass42"})
fresh = TestClient(app)
assert fresh.post("/api/auth/login", json={"username": "alice", "password": "NewAlicePass42", "remember": False}).status_code == 200
c.delete(f"/api/users/{uid}")
assert {u["username"] for u in c.get("/api/users").json()} == {"root"}
def test_cannot_delete_self(db_engine):
app = make_app(db_engine)
c = _bootstrap_admin(app)
me_id = c.get("/api/users").json()[0]["id"]
assert c.delete(f"/api/users/{me_id}").status_code == 400
def test_audit_returns_rows(db_engine):
app = make_app(db_engine)
c = _bootstrap_admin(app)
rows = c.get("/api/audit").json()
assert any(r["event"] == "setup" for r in rows)