auth: add /api/auth router (login, logout, me, setup)

This commit is contained in:
Ivo Oskamp 2026-05-28 16:00:51 +02:00
parent 7c9431c615
commit d1ed9cb4b5
3 changed files with 274 additions and 0 deletions

View File

@ -0,0 +1,139 @@
"""Routes for login, logout, identity, and initial setup."""
from __future__ import annotations
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, Request, Response, status
from sqlalchemy import func, select
from sqlalchemy.orm import Session
from ..config import COOKIE_NAME, COOKIE_SAMESITE, COOKIE_SECURE
from . import sessions as S
from .audit import record_event
from .dependencies import get_db, require_user
from .models import User, UserSession
from .schemas import LoginRequest, MeResponse, SetupRequest, SetupRequiredResponse
from .security import (
PasswordPolicyError,
hash_password,
validate_password,
verify_password,
)
router = APIRouter()
def _ip(request: Request) -> str | None:
return request.client.host if request.client else None
def _set_cookie(response: Response, sid: str, *, remember: bool) -> None:
max_age = 30 * 24 * 3600 if remember else 8 * 3600
response.set_cookie(
key=COOKIE_NAME,
value=sid,
max_age=max_age,
httponly=True,
secure=COOKIE_SECURE,
samesite=COOKIE_SAMESITE,
path="/",
)
def _clear_cookie(response: Response) -> None:
response.delete_cookie(key=COOKIE_NAME, path="/")
def _users_count(db: Session) -> int:
return db.execute(select(func.count(User.id))).scalar_one()
@router.get("/api/auth/setup-required", response_model=SetupRequiredResponse)
def setup_required(db: Annotated[Session, Depends(get_db)]) -> SetupRequiredResponse:
return SetupRequiredResponse(setup_required=_users_count(db) == 0)
@router.post("/api/auth/setup", response_model=MeResponse)
def setup(
payload: SetupRequest,
request: Request,
response: Response,
db: Annotated[Session, Depends(get_db)],
) -> MeResponse:
if _users_count(db) > 0:
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Setup already completed")
try:
validate_password(payload.password)
except PasswordPolicyError as exc:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
user = User(
username=payload.username,
password_hash=hash_password(payload.password),
role="admin",
is_active=True,
)
db.add(user); db.flush()
sid, _ = S.create_session(
db, user_id=user.id, remember=False, ip=_ip(request), user_agent=request.headers.get("user-agent")
)
record_event(db, event="setup", user_id=user.id, ip=_ip(request), detail={"username": user.username})
db.commit()
_set_cookie(response, sid, remember=False)
return MeResponse(username=user.username, role=user.role) # type: ignore[arg-type]
@router.post("/api/auth/login", response_model=MeResponse)
def login(
payload: LoginRequest,
request: Request,
response: Response,
db: Annotated[Session, Depends(get_db)],
) -> MeResponse:
user = db.execute(select(User).where(User.username == payload.username)).scalar_one_or_none()
if user is None or not user.is_active or not verify_password(payload.password, user.password_hash):
record_event(
db,
event="login_fail",
user_id=user.id if user else None,
ip=_ip(request),
detail={"username": payload.username},
)
db.commit()
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials")
sid, _ = S.create_session(
db, user_id=user.id, remember=payload.remember, ip=_ip(request), user_agent=request.headers.get("user-agent")
)
record_event(db, event="login_ok", user_id=user.id, ip=_ip(request), detail=None)
S.purge_expired(db)
db.commit()
_set_cookie(response, sid, remember=payload.remember)
return MeResponse(username=user.username, role=user.role) # type: ignore[arg-type]
@router.post("/api/auth/logout")
def logout(
request: Request,
response: Response,
db: Annotated[Session, Depends(get_db)],
) -> dict[str, bool]:
sid = request.cookies.get(COOKIE_NAME)
user_id: int | None = None
if sid:
existing = db.get(UserSession, sid)
if existing is not None:
user_id = existing.user_id
S.revoke(db, sid)
record_event(db, event="logout", user_id=user_id, ip=_ip(request), detail=None)
db.commit()
_clear_cookie(response)
return {"ok": True}
@router.get("/api/auth/me", response_model=MeResponse)
def me(user: Annotated[User, Depends(require_user)]) -> MeResponse:
return MeResponse(username=user.username, role=user.role) # type: ignore[arg-type]

View File

@ -0,0 +1,59 @@
"""Pydantic schemas for the auth and users routers."""
from __future__ import annotations
from datetime import datetime
from typing import Literal
from pydantic import BaseModel, Field
class LoginRequest(BaseModel):
username: str = Field(min_length=1, max_length=128)
password: str = Field(min_length=1, max_length=1024)
remember: bool = False
class SetupRequest(BaseModel):
username: str = Field(min_length=1, max_length=128)
password: str = Field(min_length=1, max_length=1024)
class MeResponse(BaseModel):
username: str
role: Literal["admin", "user"]
class SetupRequiredResponse(BaseModel):
setup_required: bool
class UserItem(BaseModel):
id: int
username: str
role: Literal["admin", "user"]
is_active: bool
created_at: datetime
class CreateUserRequest(BaseModel):
username: str = Field(min_length=1, max_length=128)
password: str = Field(min_length=1, max_length=1024)
role: Literal["admin", "user"] = "user"
class UpdateUserRequest(BaseModel):
role: Literal["admin", "user"] | None = None
is_active: bool | None = None
class ResetPasswordRequest(BaseModel):
password: str = Field(min_length=1, max_length=1024)
class AuditItem(BaseModel):
id: int
ts: datetime
user_id: int | None
event: str
ip: str | None
detail: dict | None

View File

@ -0,0 +1,76 @@
from fastapi import FastAPI
from fastapi.testclient import TestClient
from sqlalchemy.orm import sessionmaker
from clearview_app.auth.dependencies import get_db
from clearview_app.auth.router import router as auth_router
def make_app(db_engine):
Session = sessionmaker(bind=db_engine, autoflush=False, autocommit=False, future=True)
def override_get_db():
s = Session()
try:
yield s
finally:
s.close()
app = FastAPI()
app.include_router(auth_router)
app.dependency_overrides[get_db] = override_get_db
return app
def test_setup_required_when_empty(db_engine):
c = TestClient(make_app(db_engine))
assert c.get("/api/auth/setup-required").json() == {"setup_required": True}
def test_setup_creates_first_admin_and_logs_in(db_engine):
c = TestClient(make_app(db_engine))
r = c.post("/api/auth/setup", json={"username": "root", "password": "CorrectHorse42"})
assert r.status_code == 200
assert r.cookies.get("clearview_session")
me = c.get("/api/auth/me")
assert me.status_code == 200
assert me.json() == {"username": "root", "role": "admin"}
def test_setup_rejects_when_users_exist(db_engine):
app = make_app(db_engine)
TestClient(app).post("/api/auth/setup", json={"username": "root", "password": "CorrectHorse42"})
r = TestClient(app).post("/api/auth/setup", json={"username": "x", "password": "CorrectHorse42"})
assert r.status_code == 409
def test_login_wrong_password_returns_401(db_engine):
app = make_app(db_engine)
TestClient(app).post("/api/auth/setup", json={"username": "root", "password": "CorrectHorse42"})
r = TestClient(app).post("/api/auth/login", json={"username": "root", "password": "WrongPass00X", "remember": False})
assert r.status_code == 401
def test_login_success_sets_cookie(db_engine):
app = make_app(db_engine)
TestClient(app).post("/api/auth/setup", json={"username": "root", "password": "CorrectHorse42"})
c2 = TestClient(app)
r = c2.post("/api/auth/login", json={"username": "root", "password": "CorrectHorse42", "remember": True})
assert r.status_code == 200
assert c2.cookies.get("clearview_session")
assert c2.get("/api/auth/me").json()["username"] == "root"
def test_logout_invalidates_session(db_engine):
app = make_app(db_engine)
c = TestClient(app)
c.post("/api/auth/setup", json={"username": "root", "password": "CorrectHorse42"})
assert c.get("/api/auth/me").status_code == 200
c.post("/api/auth/logout")
assert c.get("/api/auth/me").status_code == 401
def test_password_policy_enforced_on_setup(db_engine):
c = TestClient(make_app(db_engine))
r = c.post("/api/auth/setup", json={"username": "root", "password": "short"})
assert r.status_code == 400