From d1ed9cb4b5e0c20f0e142f3e5edea35e6a499bf4 Mon Sep 17 00:00:00 2001 From: Ivo Oskamp Date: Thu, 28 May 2026 16:00:51 +0200 Subject: [PATCH] auth: add /api/auth router (login, logout, me, setup) --- .../src/clearview_app/auth/router.py | 139 ++++++++++++++++++ .../src/clearview_app/auth/schemas.py | 59 ++++++++ .../clearview/tests/test_auth_router.py | 76 ++++++++++ 3 files changed, 274 insertions(+) create mode 100644 containers/clearview/src/clearview_app/auth/router.py create mode 100644 containers/clearview/src/clearview_app/auth/schemas.py create mode 100644 containers/clearview/tests/test_auth_router.py diff --git a/containers/clearview/src/clearview_app/auth/router.py b/containers/clearview/src/clearview_app/auth/router.py new file mode 100644 index 0000000..b28b858 --- /dev/null +++ b/containers/clearview/src/clearview_app/auth/router.py @@ -0,0 +1,139 @@ +"""Routes for login, logout, identity, and initial setup.""" +from __future__ import annotations + +from typing import Annotated + +from fastapi import APIRouter, Depends, HTTPException, Request, Response, status +from sqlalchemy import func, select +from sqlalchemy.orm import Session + +from ..config import COOKIE_NAME, COOKIE_SAMESITE, COOKIE_SECURE +from . import sessions as S +from .audit import record_event +from .dependencies import get_db, require_user +from .models import User, UserSession +from .schemas import LoginRequest, MeResponse, SetupRequest, SetupRequiredResponse +from .security import ( + PasswordPolicyError, + hash_password, + validate_password, + verify_password, +) + +router = APIRouter() + + +def _ip(request: Request) -> str | None: + return request.client.host if request.client else None + + +def _set_cookie(response: Response, sid: str, *, remember: bool) -> None: + max_age = 30 * 24 * 3600 if remember else 8 * 3600 + response.set_cookie( + key=COOKIE_NAME, + value=sid, + max_age=max_age, + httponly=True, + secure=COOKIE_SECURE, + samesite=COOKIE_SAMESITE, + path="/", + ) + + +def _clear_cookie(response: Response) -> None: + response.delete_cookie(key=COOKIE_NAME, path="/") + + +def _users_count(db: Session) -> int: + return db.execute(select(func.count(User.id))).scalar_one() + + +@router.get("/api/auth/setup-required", response_model=SetupRequiredResponse) +def setup_required(db: Annotated[Session, Depends(get_db)]) -> SetupRequiredResponse: + return SetupRequiredResponse(setup_required=_users_count(db) == 0) + + +@router.post("/api/auth/setup", response_model=MeResponse) +def setup( + payload: SetupRequest, + request: Request, + response: Response, + db: Annotated[Session, Depends(get_db)], +) -> MeResponse: + if _users_count(db) > 0: + raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Setup already completed") + try: + validate_password(payload.password) + except PasswordPolicyError as exc: + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc + + user = User( + username=payload.username, + password_hash=hash_password(payload.password), + role="admin", + is_active=True, + ) + db.add(user); db.flush() + + sid, _ = S.create_session( + db, user_id=user.id, remember=False, ip=_ip(request), user_agent=request.headers.get("user-agent") + ) + record_event(db, event="setup", user_id=user.id, ip=_ip(request), detail={"username": user.username}) + db.commit() + + _set_cookie(response, sid, remember=False) + return MeResponse(username=user.username, role=user.role) # type: ignore[arg-type] + + +@router.post("/api/auth/login", response_model=MeResponse) +def login( + payload: LoginRequest, + request: Request, + response: Response, + db: Annotated[Session, Depends(get_db)], +) -> MeResponse: + user = db.execute(select(User).where(User.username == payload.username)).scalar_one_or_none() + if user is None or not user.is_active or not verify_password(payload.password, user.password_hash): + record_event( + db, + event="login_fail", + user_id=user.id if user else None, + ip=_ip(request), + detail={"username": payload.username}, + ) + db.commit() + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials") + + sid, _ = S.create_session( + db, user_id=user.id, remember=payload.remember, ip=_ip(request), user_agent=request.headers.get("user-agent") + ) + record_event(db, event="login_ok", user_id=user.id, ip=_ip(request), detail=None) + S.purge_expired(db) + db.commit() + + _set_cookie(response, sid, remember=payload.remember) + return MeResponse(username=user.username, role=user.role) # type: ignore[arg-type] + + +@router.post("/api/auth/logout") +def logout( + request: Request, + response: Response, + db: Annotated[Session, Depends(get_db)], +) -> dict[str, bool]: + sid = request.cookies.get(COOKIE_NAME) + user_id: int | None = None + if sid: + existing = db.get(UserSession, sid) + if existing is not None: + user_id = existing.user_id + S.revoke(db, sid) + record_event(db, event="logout", user_id=user_id, ip=_ip(request), detail=None) + db.commit() + _clear_cookie(response) + return {"ok": True} + + +@router.get("/api/auth/me", response_model=MeResponse) +def me(user: Annotated[User, Depends(require_user)]) -> MeResponse: + return MeResponse(username=user.username, role=user.role) # type: ignore[arg-type] diff --git a/containers/clearview/src/clearview_app/auth/schemas.py b/containers/clearview/src/clearview_app/auth/schemas.py new file mode 100644 index 0000000..cb43ab1 --- /dev/null +++ b/containers/clearview/src/clearview_app/auth/schemas.py @@ -0,0 +1,59 @@ +"""Pydantic schemas for the auth and users routers.""" +from __future__ import annotations + +from datetime import datetime +from typing import Literal + +from pydantic import BaseModel, Field + + +class LoginRequest(BaseModel): + username: str = Field(min_length=1, max_length=128) + password: str = Field(min_length=1, max_length=1024) + remember: bool = False + + +class SetupRequest(BaseModel): + username: str = Field(min_length=1, max_length=128) + password: str = Field(min_length=1, max_length=1024) + + +class MeResponse(BaseModel): + username: str + role: Literal["admin", "user"] + + +class SetupRequiredResponse(BaseModel): + setup_required: bool + + +class UserItem(BaseModel): + id: int + username: str + role: Literal["admin", "user"] + is_active: bool + created_at: datetime + + +class CreateUserRequest(BaseModel): + username: str = Field(min_length=1, max_length=128) + password: str = Field(min_length=1, max_length=1024) + role: Literal["admin", "user"] = "user" + + +class UpdateUserRequest(BaseModel): + role: Literal["admin", "user"] | None = None + is_active: bool | None = None + + +class ResetPasswordRequest(BaseModel): + password: str = Field(min_length=1, max_length=1024) + + +class AuditItem(BaseModel): + id: int + ts: datetime + user_id: int | None + event: str + ip: str | None + detail: dict | None diff --git a/containers/clearview/tests/test_auth_router.py b/containers/clearview/tests/test_auth_router.py new file mode 100644 index 0000000..bd01fc0 --- /dev/null +++ b/containers/clearview/tests/test_auth_router.py @@ -0,0 +1,76 @@ +from fastapi import FastAPI +from fastapi.testclient import TestClient +from sqlalchemy.orm import sessionmaker + +from clearview_app.auth.dependencies import get_db +from clearview_app.auth.router import router as auth_router + + +def make_app(db_engine): + Session = sessionmaker(bind=db_engine, autoflush=False, autocommit=False, future=True) + + def override_get_db(): + s = Session() + try: + yield s + finally: + s.close() + + app = FastAPI() + app.include_router(auth_router) + app.dependency_overrides[get_db] = override_get_db + return app + + +def test_setup_required_when_empty(db_engine): + c = TestClient(make_app(db_engine)) + assert c.get("/api/auth/setup-required").json() == {"setup_required": True} + + +def test_setup_creates_first_admin_and_logs_in(db_engine): + c = TestClient(make_app(db_engine)) + r = c.post("/api/auth/setup", json={"username": "root", "password": "CorrectHorse42"}) + assert r.status_code == 200 + assert r.cookies.get("clearview_session") + me = c.get("/api/auth/me") + assert me.status_code == 200 + assert me.json() == {"username": "root", "role": "admin"} + + +def test_setup_rejects_when_users_exist(db_engine): + app = make_app(db_engine) + TestClient(app).post("/api/auth/setup", json={"username": "root", "password": "CorrectHorse42"}) + r = TestClient(app).post("/api/auth/setup", json={"username": "x", "password": "CorrectHorse42"}) + assert r.status_code == 409 + + +def test_login_wrong_password_returns_401(db_engine): + app = make_app(db_engine) + TestClient(app).post("/api/auth/setup", json={"username": "root", "password": "CorrectHorse42"}) + r = TestClient(app).post("/api/auth/login", json={"username": "root", "password": "WrongPass00X", "remember": False}) + assert r.status_code == 401 + + +def test_login_success_sets_cookie(db_engine): + app = make_app(db_engine) + TestClient(app).post("/api/auth/setup", json={"username": "root", "password": "CorrectHorse42"}) + c2 = TestClient(app) + r = c2.post("/api/auth/login", json={"username": "root", "password": "CorrectHorse42", "remember": True}) + assert r.status_code == 200 + assert c2.cookies.get("clearview_session") + assert c2.get("/api/auth/me").json()["username"] == "root" + + +def test_logout_invalidates_session(db_engine): + app = make_app(db_engine) + c = TestClient(app) + c.post("/api/auth/setup", json={"username": "root", "password": "CorrectHorse42"}) + assert c.get("/api/auth/me").status_code == 200 + c.post("/api/auth/logout") + assert c.get("/api/auth/me").status_code == 401 + + +def test_password_policy_enforced_on_setup(db_engine): + c = TestClient(make_app(db_engine)) + r = c.post("/api/auth/setup", json={"username": "root", "password": "short"}) + assert r.status_code == 400