# Clearview Authentication Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Add session-based authentication with admin/user roles, an initial-setup flow, user management UI, and audit log to the existing Clearview FastAPI + static-frontend app.

**Architecture:** New `clearview_app/auth/` package owns models, hashing, sessions, audit, and routers. A single Alembic migration adds `users`, `user_sessions`, `auth_audit`. Existing routers are gated with `Depends(require_user)`; user/audit endpoints with `Depends(require_admin)`. The static frontend gets `login.html` + `setup.html` and a new admin-only Users tab.

**Tech Stack:** FastAPI 0.115, SQLAlchemy 2.0 (mapped-style), Alembic 1.14, Postgres (via `psycopg`), `argon2-cffi` (new), vanilla JS frontend, pytest (new).

**Spec:** `docs/superpowers/specs/2026-05-28-authentication-design.md`

---

## File Map

**Backend — create:**
- `containers/clearview/src/clearview_app/auth/__init__.py`
- `containers/clearview/src/clearview_app/auth/models.py` — `User`, `UserSession`, `AuthAudit`
- `containers/clearview/src/clearview_app/auth/security.py` — Argon2id hashing + password policy
- `containers/clearview/src/clearview_app/auth/sessions.py` — create / lookup / refresh / revoke
- `containers/clearview/src/clearview_app/auth/audit.py` — `record_event()` helper
- `containers/clearview/src/clearview_app/auth/dependencies.py` — `current_session`, `require_user`, `require_admin`
- `containers/clearview/src/clearview_app/auth/schemas.py` — pydantic request/response models
- `containers/clearview/src/clearview_app/auth/router.py` — `/api/auth/*`
- `containers/clearview/src/clearview_app/auth/users_router.py` — `/api/users/*` and `/api/audit`
- `containers/clearview/src/clearview_app/migrations/versions/0003_auth_tables.py`

**Backend — modify:**
- `containers/clearview/requirements.txt` — add `argon2-cffi`, `pytest`, `httpx`
- `containers/clearview/src/clearview_app/config.py` — add cookie config
- `containers/clearview/src/clearview_app/main.py` — wire auth routers, gate existing routers
- `containers/clearview/src/clearview_app/worker.py` — periodic session purge

**Frontend — create:**
- `containers/clearview/site/login.html`
- `containers/clearview/site/setup.html`
- `containers/clearview/site/auth.js` — small shared module (fetch wrapper, helpers)

**Frontend — modify:**
- `containers/clearview/site/index.html` — Users nav link, header user-badge slot
- `containers/clearview/site/app.js` — boot calls `/api/auth/me`, wraps fetch, renders Users + Audit views
- `containers/clearview/site/styles.css` — auth-page, user-badge, and modal styles

**Tests — create:**
- `containers/clearview/tests/__init__.py`
- `containers/clearview/tests/conftest.py`
- `containers/clearview/tests/test_models.py`
- `containers/clearview/tests/test_security.py`
- `containers/clearview/tests/test_sessions.py`
- `containers/clearview/tests/test_dependencies.py`
- `containers/clearview/tests/test_auth_router.py`
- `containers/clearview/tests/test_users_router.py`
- `containers/clearview/tests/test_existing_routes_protected.py`

**Docs:**
- `docs/changelog-develop.md` — append entries per task (per project convention)

---

## Task 1: Dependencies + auth package skeleton + test scaffold

**Files:**
- Modify: `containers/clearview/requirements.txt`
- Create: `containers/clearview/src/clearview_app/auth/__init__.py`
- Create: `containers/clearview/tests/__init__.py`
- Create: `containers/clearview/tests/conftest.py`

- [ ] **Step 1: Add dependencies**

Append to `containers/clearview/requirements.txt`:

```
argon2-cffi==23.1.0
pytest==8.3.3
httpx==0.27.2
```

- [ ] **Step 2: Create auth package marker**

Create `containers/clearview/src/clearview_app/auth/__init__.py` containing:

```python
"""Authentication, session, and user-management subsystem."""
```

- [ ] **Step 3: Create test package and conftest**

Create `containers/clearview/tests/__init__.py` (empty file).

Create `containers/clearview/tests/conftest.py`:

```python
"""Pytest fixtures for Clearview tests.

Uses an in-memory SQLite database. Schema is created from the SQLAlchemy
metadata directly (the Alembic migrations target Postgres types like JSONB).
"""
from __future__ import annotations

import os
import sys
from pathlib import Path

import pytest
from sqlalchemy import create_engine, event
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool

SRC = Path(__file__).resolve().parents[1] / "src"
sys.path.insert(0, str(SRC))

os.environ.setdefault("DATABASE_URL", "sqlite+pysqlite:///:memory:")
os.environ.setdefault("COOKIE_SECURE", "false")


@pytest.fixture()
def db_engine():
    engine = create_engine(
        "sqlite+pysqlite:///:memory:",
        connect_args={"check_same_thread": False},
        # Share one connection across threads. FastAPI's TestClient runs sync
        # handlers in worker threads, and every new connection to :memory:
        # would otherwise see its own empty database.
        poolclass=StaticPool,
        future=True,
    )

    @event.listens_for(engine, "connect")
    def _fk_on(dbapi_conn, _):
        # SQLite only enforces foreign keys when asked to.
        cur = dbapi_conn.cursor()
        cur.execute("PRAGMA foreign_keys=ON")
        cur.close()

    # Imported lazily so collection works before Task 2 creates the module.
    from clearview_app.auth.models import Base as AuthBase

    AuthBase.metadata.create_all(engine)
    yield engine
    engine.dispose()


@pytest.fixture()
def db_session(db_engine):
    Session = sessionmaker(bind=db_engine, autoflush=False, autocommit=False, future=True)
    s = Session()
    try:
        yield s
    finally:
        s.close()
```

- [ ] **Step 4: Smoke-check pytest discovers the suite**

Run: `cd containers/clearview && python -m pytest tests/ -q`

Expected: "no tests ran", with no import or collection errors. pytest exits with status 5 when nothing is collected, which is acceptable at this stage. `conftest.py` imports `clearview_app.auth.models` inside the `db_engine` fixture rather than at module level, so collection succeeds even though that module is not created until Task 2; the fixture itself only becomes usable once Task 2 is done.
- [ ] **Step 5: Commit**

```bash
git add containers/clearview/requirements.txt \
        containers/clearview/src/clearview_app/auth/__init__.py \
        containers/clearview/tests/__init__.py \
        containers/clearview/tests/conftest.py
git commit -m "auth: add dependencies and pytest scaffold"
```

- [ ] **Step 6: Append changelog**

Per the project changelog convention, append to the **top** of `docs/changelog-develop.md` (above the most recent `## YYYY-MM-DD — Released as vX.Y.Z` marker):

```markdown
## 2026-05-28 — Authentication: scaffold

### Added
- `argon2-cffi`, `pytest`, `httpx` dependencies.
- New `clearview_app/auth/` package skeleton.
- `tests/` directory with SQLite-backed pytest fixtures.
```

Commit `docs/changelog-develop.md`.

---

## Task 2: Auth models

**Files:**
- Create: `containers/clearview/src/clearview_app/auth/models.py`
- Create: `containers/clearview/tests/test_models.py`

- [ ] **Step 1: Write failing tests**

Create `containers/clearview/tests/test_models.py`:

```python
from datetime import datetime, timedelta, timezone

from clearview_app.auth.models import AuthAudit, User, UserSession


def test_user_defaults(db_session):
    u = User(username="alice", password_hash="x", role="admin")
    db_session.add(u); db_session.commit(); db_session.refresh(u)
    assert u.id is not None
    assert u.is_active is True
    assert isinstance(u.created_at, datetime)


def test_session_persists_with_expiry(db_session):
    u = User(username="bob", password_hash="x", role="user")
    db_session.add(u); db_session.commit(); db_session.refresh(u)
    s = UserSession(
        id="abc123",
        user_id=u.id,
        expires_at=datetime.now(timezone.utc) + timedelta(hours=8),
        ip="1.2.3.4",
        user_agent="ua",
        remember=False,
    )
    db_session.add(s); db_session.commit()
    assert s.created_at is not None


def test_audit_row(db_session):
    a = AuthAudit(event="login_ok", ip="9.9.9.9", detail={"k": "v"})
    db_session.add(a); db_session.commit()
    assert a.id is not None
    assert a.detail == {"k": "v"}
```

- [ ] **Step 2: Verify it fails**

Run: `cd containers/clearview && python -m pytest tests/test_models.py -v`

Expected: ImportError (`auth.models` doesn't exist).

- [ ] **Step 3: Implement models**

Create `containers/clearview/src/clearview_app/auth/models.py`:

```python
"""SQLAlchemy models for authentication, sessions, and audit log.

A dedicated ``Base`` is used so these tables can be created independently of
the existing scan/tenant models in tests; in production they coexist in the
same database under Alembic.
"""
from __future__ import annotations

from datetime import datetime, timezone
from typing import Any

from sqlalchemy import Boolean, DateTime, ForeignKey, Integer, JSON, String, Text
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


def _utcnow() -> datetime:
    return datetime.now(timezone.utc)


class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    username: Mapped[str] = mapped_column(String(128), unique=True, nullable=False, index=True)
    password_hash: Mapped[str] = mapped_column(Text, nullable=False)
    role: Mapped[str] = mapped_column(String(16), nullable=False)  # 'admin' | 'user'
    is_active: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=_utcnow, nullable=False)
    # onupdate keeps the timestamp current on ORM-level updates.
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), default=_utcnow, onupdate=_utcnow, nullable=False
    )


class UserSession(Base):
    __tablename__ = "user_sessions"

    id: Mapped[str] = mapped_column(String(64), primary_key=True)
    user_id: Mapped[int] = mapped_column(
        Integer, ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True
    )
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=_utcnow, nullable=False)
    expires_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False, index=True)
    last_seen_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=_utcnow, nullable=False)
    ip: Mapped[str | None] = mapped_column(String(64), nullable=True)
    user_agent: Mapped[str | None] = mapped_column(Text, nullable=True)
    remember: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)


class AuthAudit(Base):
    __tablename__ = "auth_audit"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    ts: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=_utcnow, nullable=False, index=True)
    user_id: Mapped[int | None] = mapped_column(
        Integer, ForeignKey("users.id", ondelete="SET NULL"), nullable=True
    )
    event: Mapped[str] = mapped_column(String(32), nullable=False, index=True)
    ip: Mapped[str | None] = mapped_column(String(64), nullable=True)
    detail: Mapped[dict[str, Any] | None] = mapped_column(JSON, nullable=True)
```

Production uses Postgres `JSONB`; the Alembic migration in Task 4 declares that explicitly. `JSON` here keeps tests portable on SQLite.

- [ ] **Step 4: Verify tests pass**

Run: `cd containers/clearview && python -m pytest tests/test_models.py -v`

Expected: 3 passed.

- [ ] **Step 5: Commit**

```bash
git add containers/clearview/src/clearview_app/auth/models.py \
        containers/clearview/tests/test_models.py
git commit -m "auth: add User, UserSession, AuthAudit models"
```

- [ ] **Step 6: Append changelog**

```markdown
## 2026-05-28 — Authentication: data models

### Added
- `User`, `UserSession`, `AuthAudit` SQLAlchemy models.
- Model-level tests using SQLite in-memory engine.
```

Commit.
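
The `ondelete` options on the two foreign keys decide what happens to dependent rows when a user is deleted: sessions are removed with the user, while audit rows survive with `user_id` set to NULL. The sketch below shows that behaviour with the standard-library `sqlite3` module. The hand-written DDL and sample values only approximate the models and are assumptions made for illustration, not the project's schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys=ON")  # SQLite ignores FK actions unless enabled
conn.executescript(
    """
    CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT UNIQUE NOT NULL);
    CREATE TABLE user_sessions (
        id TEXT PRIMARY KEY,
        user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE
    );
    CREATE TABLE auth_audit (
        id INTEGER PRIMARY KEY,
        user_id INTEGER REFERENCES users(id) ON DELETE SET NULL,
        event TEXT NOT NULL
    );
    """
)

conn.execute("INSERT INTO users (id, username) VALUES (1, 'alice')")
conn.execute("INSERT INTO user_sessions (id, user_id) VALUES ('abc123', 1)")
conn.execute("INSERT INTO auth_audit (user_id, event) VALUES (1, 'login_ok')")

# Deleting the user cascades to sessions and nulls the audit reference.
conn.execute("DELETE FROM users WHERE id = 1")

sessions_left = conn.execute("SELECT COUNT(*) FROM user_sessions").fetchone()[0]
audit_rows = conn.execute("SELECT user_id, event FROM auth_audit").fetchall()
print(sessions_left, audit_rows)
```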

---

## Task 3: Password hashing + policy

**Files:**
- Create: `containers/clearview/src/clearview_app/auth/security.py`
- Create: `containers/clearview/tests/test_security.py`

- [ ] **Step 1: Write failing tests**

Create `containers/clearview/tests/test_security.py`:

```python
import pytest

from clearview_app.auth.security import (
    PasswordPolicyError,
    hash_password,
    new_session_id,
    validate_password,
    verify_password,
)


def test_hash_and_verify_roundtrip():
    h = hash_password("CorrectHorse42")
    assert verify_password("CorrectHorse42", h) is True
    assert verify_password("wrong", h) is False


def test_verify_returns_false_on_garbage_hash():
    assert verify_password("anything", "not-a-real-hash") is False


@pytest.mark.parametrize("pw", ["short1A", "alllowercase", "ALLUPPERCASE", "12345678901234"])
def test_policy_rejects(pw):
    with pytest.raises(PasswordPolicyError):
        validate_password(pw)


@pytest.mark.parametrize("pw", ["CorrectHorse42", "abcdefghij12"])
def test_policy_accepts(pw):
    validate_password(pw)


def test_new_session_id_unique_and_hex():
    a = new_session_id()
    b = new_session_id()
    assert a != b
    assert len(a) == 32 and all(c in "0123456789abcdef" for c in a)
```

- [ ] **Step 2: Verify it fails**

Run: `cd containers/clearview && python -m pytest tests/test_security.py -v`

Expected: ImportError.
- [ ] **Step 3: Implement `security.py`**

Create `containers/clearview/src/clearview_app/auth/security.py`:

```python
"""Password hashing, password-policy validation, and session-id generation."""
from __future__ import annotations

import secrets

from argon2 import PasswordHasher
from argon2.exceptions import InvalidHashError, VerificationError, VerifyMismatchError


class PasswordPolicyError(ValueError):
    """Raised when a candidate password does not meet the policy."""


_hasher = PasswordHasher()

MIN_LENGTH = 12


def validate_password(pw: str) -> None:
    """Enforce: length >= 12, at least one letter and one digit."""
    if len(pw) < MIN_LENGTH:
        raise PasswordPolicyError(f"Password must be at least {MIN_LENGTH} characters.")
    if not any(c.isalpha() for c in pw):
        raise PasswordPolicyError("Password must contain at least one letter.")
    if not any(c.isdigit() for c in pw):
        raise PasswordPolicyError("Password must contain at least one digit.")


def hash_password(pw: str) -> str:
    return _hasher.hash(pw)


def verify_password(pw: str, encoded: str) -> bool:
    try:
        return _hasher.verify(encoded, pw)
    except (VerifyMismatchError, InvalidHashError, VerificationError):
        return False
    except Exception:
        # Never let a malformed hash or library error surface as a 500.
        return False


def new_session_id() -> str:
    """Opaque 128-bit session identifier rendered as 32 hex chars."""
    # secrets draws all 16 bytes from the OS CSPRNG; uuid4() fixes 6 of its
    # 128 bits as version/variant markers and is not meant for security tokens.
    return secrets.token_hex(16)
```

- [ ] **Step 4: Verify tests pass**

Run: `cd containers/clearview && python -m pytest tests/test_security.py -v`

Expected: all parametrized cases pass.

- [ ] **Step 5: Commit**

```bash
git add containers/clearview/src/clearview_app/auth/security.py \
        containers/clearview/tests/test_security.py
git commit -m "auth: add Argon2id hashing and password policy"
```

- [ ] **Step 6: Append changelog**

```markdown
## 2026-05-28 — Authentication: hashing + password policy

### Added
- Argon2id password hashing (`hash_password`, `verify_password`).
- Server-side password policy (min 12, letter + digit).
- Opaque hex session-id generator.
```

Commit.
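
The password policy and the session-id format can be exercised without Argon2 installed. The sketch below restates the three policy checks as a hypothetical `policy_violations()` helper that collects every failed rule instead of raising on the first one, and builds a 32-character identifier with the standard-library `secrets` module. Both function names are invented for illustration and are not part of the plan's `security.py`.

```python
import secrets

MIN_LENGTH = 12  # same threshold as the policy in this task


def policy_violations(pw: str) -> list[str]:
    """Hypothetical helper: list every rule the candidate password breaks."""
    problems = []
    if len(pw) < MIN_LENGTH:
        problems.append("too short")
    if not any(c.isalpha() for c in pw):
        problems.append("no letter")
    if not any(c.isdigit() for c in pw):
        problems.append("no digit")
    return problems


def random_session_id() -> str:
    """16 bytes from the OS CSPRNG, rendered as 32 lowercase hex characters."""
    return secrets.token_hex(16)


# The same candidates the parametrized tests use.
for candidate in ["short1A", "alllowercase", "12345678901234", "CorrectHorse42"]:
    print(candidate, policy_violations(candidate))

print(len(random_session_id()))
```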

---

## Task 4: Alembic migration `0003_auth_tables`

**Files:**
- Create: `containers/clearview/src/clearview_app/migrations/versions/0003_auth_tables.py`

- [ ] **Step 1: Write the migration**

```python
"""Create users, user_sessions, auth_audit tables.

Revision ID: 0003_auth_tables
Revises: 0002_timestamptz
Create Date: 2026-05-28
"""
from __future__ import annotations

from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

revision = "0003_auth_tables"
down_revision = "0002_timestamptz"
branch_labels = None
depends_on = None


def upgrade() -> None:
    op.create_table(
        "users",
        sa.Column("id", sa.Integer(), primary_key=True, autoincrement=True),
        # Uniqueness is enforced by ix_users_username below; a column-level
        # unique=True would create a second, redundant unique constraint.
        sa.Column("username", sa.String(length=128), nullable=False),
        sa.Column("password_hash", sa.Text(), nullable=False),
        sa.Column("role", sa.String(length=16), nullable=False),
        sa.Column("is_active", sa.Boolean(), nullable=False, server_default=sa.true()),
        sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.text("now()")),
        sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.text("now()")),
    )
    op.create_index("ix_users_username", "users", ["username"], unique=True)

    op.create_table(
        "user_sessions",
        sa.Column("id", sa.String(length=64), primary_key=True),
        sa.Column("user_id", sa.Integer(), sa.ForeignKey("users.id", ondelete="CASCADE"), nullable=False),
        sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.text("now()")),
        sa.Column("expires_at", sa.DateTime(timezone=True), nullable=False),
        sa.Column("last_seen_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.text("now()")),
        sa.Column("ip", sa.String(length=64), nullable=True),
        sa.Column("user_agent", sa.Text(), nullable=True),
        sa.Column("remember", sa.Boolean(), nullable=False, server_default=sa.false()),
    )
    op.create_index("ix_user_sessions_user_id", "user_sessions", ["user_id"])
    op.create_index("ix_user_sessions_expires_at", "user_sessions", ["expires_at"])

    op.create_table(
        "auth_audit",
        sa.Column("id", sa.BigInteger(), primary_key=True, autoincrement=True),
        sa.Column("ts", sa.DateTime(timezone=True), nullable=False, server_default=sa.text("now()")),
        sa.Column("user_id", sa.Integer(), sa.ForeignKey("users.id", ondelete="SET NULL"), nullable=True),
        sa.Column("event", sa.String(length=32), nullable=False),
        sa.Column("ip", sa.String(length=64), nullable=True),
        sa.Column("detail", postgresql.JSONB(astext_type=sa.Text()), nullable=True),
    )
    op.create_index("ix_auth_audit_ts", "auth_audit", ["ts"])
    op.create_index("ix_auth_audit_event", "auth_audit", ["event"])


def downgrade() -> None:
    op.drop_index("ix_auth_audit_event", table_name="auth_audit")
    op.drop_index("ix_auth_audit_ts", table_name="auth_audit")
    op.drop_table("auth_audit")
    op.drop_index("ix_user_sessions_expires_at", table_name="user_sessions")
    op.drop_index("ix_user_sessions_user_id", table_name="user_sessions")
    op.drop_table("user_sessions")
    op.drop_index("ix_users_username", table_name="users")
    op.drop_table("users")
```

- [ ] **Step 2: Sanity-check the script loads**

Run:

```bash
cd containers/clearview && python -c "
import importlib.util, pathlib
p = pathlib.Path('src/clearview_app/migrations/versions/0003_auth_tables.py')
spec = importlib.util.spec_from_file_location('m', p)
m = importlib.util.module_from_spec(spec); spec.loader.exec_module(m)
print('ok', m.revision, m.down_revision)
"
```

Expected: `ok 0003_auth_tables 0002_timestamptz`

- [ ] **Step 3: Commit**

```bash
git add containers/clearview/src/clearview_app/migrations/versions/0003_auth_tables.py
git commit -m "auth: add 0003_auth_tables migration"
```

- [ ] **Step 4: Append changelog**

```markdown
## 2026-05-28 — Authentication: database migration

### Added
- Alembic migration `0003_auth_tables` creating `users`, `user_sessions`, `auth_audit`.
```

Commit.

---

## Task 5: Audit helper

**Files:**
- Create: `containers/clearview/src/clearview_app/auth/audit.py`
- Modify: `containers/clearview/tests/test_models.py`

- [ ] **Step 1: Append failing test to `test_models.py`**

```python
from clearview_app.auth.audit import record_event
from clearview_app.auth.models import AuthAudit


def test_record_event_persists(db_session):
    record_event(db_session, event="login_ok", user_id=None, ip="1.1.1.1", detail={"u": "x"})
    db_session.commit()
    rows = db_session.query(AuthAudit).all()
    assert len(rows) == 1
    assert rows[0].event == "login_ok"
    assert rows[0].detail == {"u": "x"}
```

- [ ] **Step 2: Verify it fails**

Run: `cd containers/clearview && python -m pytest tests/test_models.py::test_record_event_persists -v`

Expected: ImportError.

- [ ] **Step 3: Implement audit helper**

Create `containers/clearview/src/clearview_app/auth/audit.py`:

```python
"""Single-entry helper for writing rows to the auth audit log."""
from __future__ import annotations

from typing import Any

from sqlalchemy.orm import Session

from .models import AuthAudit


def record_event(
    db: Session,
    *,
    event: str,
    user_id: int | None = None,
    ip: str | None = None,
    detail: dict[str, Any] | None = None,
) -> None:
    """Add an AuthAudit row to the session. Caller commits."""
    db.add(AuthAudit(event=event, user_id=user_id, ip=ip, detail=detail))
```

- [ ] **Step 4: Verify it passes**

Run: `cd containers/clearview && python -m pytest tests/test_models.py -v`

Expected: all model tests pass.

- [ ] **Step 5: Commit**

```bash
git add containers/clearview/src/clearview_app/auth/audit.py \
        containers/clearview/tests/test_models.py
git commit -m "auth: add audit log helper"
```

- [ ] **Step 6: Append changelog**

```markdown
## 2026-05-28 — Authentication: audit helper

### Added
- `auth.audit.record_event()` for one-line writes to `auth_audit`.
```

Commit.
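
Because `record_event()` only adds the row and leaves the commit to the caller, an audit entry shares the fate of the surrounding transaction. A practical consequence is that a failed login which should be audited has to be committed before the error response is raised. The rough illustration below uses standard-library `sqlite3`; the table layout and the `record_event_sql` helper are stand-ins invented for this sketch, not the SQLAlchemy code from this task.

```python
from __future__ import annotations

import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE auth_audit (id INTEGER PRIMARY KEY, event TEXT NOT NULL, detail TEXT)")
conn.commit()


def record_event_sql(db: sqlite3.Connection, event: str, detail: dict | None = None) -> None:
    """Stand-in for record_event(): insert only, never commit."""
    db.execute(
        "INSERT INTO auth_audit (event, detail) VALUES (?, ?)",
        (event, json.dumps(detail) if detail is not None else None),
    )


def count_rows(db: sqlite3.Connection) -> int:
    return db.execute("SELECT COUNT(*) FROM auth_audit").fetchone()[0]


record_event_sql(conn, "login_failed", {"u": "x"})
conn.rollback()  # caller aborts: the audit row disappears with the transaction
after_rollback = count_rows(conn)

record_event_sql(conn, "login_ok", {"u": "x"})
conn.commit()  # caller commits: the row is now persisted
after_commit = count_rows(conn)

print(after_rollback, after_commit)
```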

---

## Task 6: Session lifecycle (create / lookup / refresh / revoke / purge)

**Files:**
- Create: `containers/clearview/src/clearview_app/auth/sessions.py`
- Create: `containers/clearview/tests/test_sessions.py`

- [ ] **Step 1: Write failing tests**

```python
from datetime import datetime, timedelta, timezone

import pytest

from clearview_app.auth import sessions as S
from clearview_app.auth.models import User, UserSession


def _aware(dt: datetime) -> datetime:
    """SQLite returns naive datetimes; treat them as UTC before comparing."""
    return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)


@pytest.fixture()
def user(db_session):
    u = User(username="alice", password_hash="x", role="admin")
    db_session.add(u); db_session.commit(); db_session.refresh(u)
    return u


def test_create_session_sliding(db_session, user):
    sid, expires = S.create_session(db_session, user_id=user.id, remember=False, ip=None, user_agent=None)
    db_session.commit()
    assert len(sid) == 32
    row = db_session.get(UserSession, sid)
    assert row.remember is False
    delta = _aware(row.expires_at) - datetime.now(timezone.utc)
    assert timedelta(hours=7, minutes=55) < delta < timedelta(hours=8, minutes=5)


def test_create_session_remember(db_session, user):
    sid, _ = S.create_session(db_session, user_id=user.id, remember=True, ip=None, user_agent=None)
    db_session.commit()
    row = db_session.get(UserSession, sid)
    delta = _aware(row.expires_at) - datetime.now(timezone.utc)
    assert delta > timedelta(days=29)


def test_lookup_refresh_sliding_extends(db_session, user):
    sid, _ = S.create_session(db_session, user_id=user.id, remember=False, ip=None, user_agent=None)
    db_session.commit()
    row = db_session.get(UserSession, sid)
    row.expires_at = datetime.now(timezone.utc) + timedelta(minutes=5)
    db_session.commit()
    looked = S.lookup_and_refresh(db_session, sid)
    db_session.commit()
    assert looked is not None
    assert _aware(looked.expires_at) - datetime.now(timezone.utc) > timedelta(hours=7)


def test_lookup_refresh_remember_does_not_slide(db_session, user):
    sid, _ = S.create_session(db_session, user_id=user.id, remember=True, ip=None, user_agent=None)
    db_session.commit()
    before = db_session.get(UserSession, sid).expires_at
    S.lookup_and_refresh(db_session, sid)
    db_session.commit()
    after = db_session.get(UserSession, sid).expires_at
    assert before == after


def test_expired_session_returns_none(db_session, user):
    sid, _ = S.create_session(db_session, user_id=user.id, remember=False, ip=None, user_agent=None)
    # The fixture session has autoflush off, so persist before get().
    db_session.commit()
    row = db_session.get(UserSession, sid)
    row.expires_at = datetime.now(timezone.utc) - timedelta(minutes=1)
    db_session.commit()
    assert S.lookup_and_refresh(db_session, sid) is None


def test_revoke(db_session, user):
    sid, _ = S.create_session(db_session, user_id=user.id, remember=False, ip=None, user_agent=None)
    db_session.commit()
    S.revoke(db_session, sid); db_session.commit()
    assert db_session.get(UserSession, sid) is None


def test_purge_expired(db_session, user):
    fresh, _ = S.create_session(db_session, user_id=user.id, remember=False, ip=None, user_agent=None)
    stale, _ = S.create_session(db_session, user_id=user.id, remember=False, ip=None, user_agent=None)
    # Persist both rows first; get() does not see pending objects without a flush.
    db_session.commit()
    db_session.get(UserSession, stale).expires_at = datetime.now(timezone.utc) - timedelta(hours=1)
    db_session.commit()
    removed = S.purge_expired(db_session); db_session.commit()
    assert removed == 1
    assert db_session.get(UserSession, fresh) is not None
    assert db_session.get(UserSession, stale) is None
```

- [ ] **Step 2: Verify it fails**

Run: `cd containers/clearview && python -m pytest tests/test_sessions.py -v`

Expected: ImportError.
- [ ] **Step 3: Implement `sessions.py`**

```python
"""Session lifecycle: create, look up + refresh, revoke, purge expired."""
from __future__ import annotations

from datetime import datetime, timedelta, timezone

from sqlalchemy import delete
from sqlalchemy.orm import Session

from .models import UserSession
from .security import new_session_id

SLIDING_TTL = timedelta(hours=8)
REMEMBER_TTL = timedelta(days=30)


def _utcnow() -> datetime:
    return datetime.now(timezone.utc)


def create_session(
    db: Session,
    *,
    user_id: int,
    remember: bool,
    ip: str | None,
    user_agent: str | None,
) -> tuple[str, datetime]:
    ttl = REMEMBER_TTL if remember else SLIDING_TTL
    expires = _utcnow() + ttl
    sid = new_session_id()
    db.add(
        UserSession(
            id=sid,
            user_id=user_id,
            expires_at=expires,
            ip=ip,
            user_agent=user_agent,
            remember=remember,
        )
    )
    return sid, expires


def lookup_and_refresh(db: Session, sid: str | None) -> UserSession | None:
    if not sid:
        return None
    row = db.get(UserSession, sid)
    if row is None:
        return None
    now = _utcnow()
    # SQLite hands back naive datetimes; treat them as UTC.
    expires = row.expires_at if row.expires_at.tzinfo else row.expires_at.replace(tzinfo=timezone.utc)
    if expires <= now:
        return None
    row.last_seen_at = now
    if not row.remember:
        # Sliding sessions restart their 8h window on every request.
        row.expires_at = now + SLIDING_TTL
    return row


def revoke(db: Session, sid: str) -> None:
    db.execute(delete(UserSession).where(UserSession.id == sid))


def revoke_all_for_user(db: Session, user_id: int) -> int:
    res = db.execute(delete(UserSession).where(UserSession.user_id == user_id))
    return res.rowcount or 0


def purge_expired(db: Session) -> int:
    res = db.execute(delete(UserSession).where(UserSession.expires_at <= _utcnow()))
    return res.rowcount or 0
```

- [ ] **Step 4: Verify tests pass**

Run: `cd containers/clearview && python -m pytest tests/test_sessions.py -v`

Expected: 7 passed.
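
The refresh rule in `lookup_and_refresh` reduces to a pure function of the current time, the stored expiry, and the `remember` flag. The sketch below isolates that rule so the three outcomes (slide, keep, reject) are easy to see; `next_expiry` is a hypothetical name used only for illustration and does not appear in `sessions.py`.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone

SLIDING_TTL = timedelta(hours=8)


def next_expiry(now: datetime, expires_at: datetime, remember: bool) -> datetime | None:
    """Return the expiry to store after a request, or None if the session is dead."""
    if expires_at.tzinfo is None:
        # Same guard as the implementation: naive values are treated as UTC.
        expires_at = expires_at.replace(tzinfo=timezone.utc)
    if expires_at <= now:
        return None
    if remember:
        return expires_at  # fixed window, never extended
    return now + SLIDING_TTL  # sliding window restarts on every request


now = datetime(2026, 5, 28, 12, 0, tzinfo=timezone.utc)
soon = now + timedelta(minutes=5)

print(next_expiry(now, soon, remember=False))  # 2026-05-28 20:00:00+00:00
print(next_expiry(now, soon, remember=True))   # 2026-05-28 12:05:00+00:00
print(next_expiry(now, now - timedelta(minutes=1), remember=False))  # None
```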
- [ ] **Step 5: Commit**

```bash
git add containers/clearview/src/clearview_app/auth/sessions.py \
        containers/clearview/tests/test_sessions.py
git commit -m "auth: add session lifecycle (create/lookup/refresh/revoke/purge)"
```

- [ ] **Step 6: Append changelog**

```markdown
## 2026-05-28 — Authentication: session lifecycle

### Added
- `auth.sessions` with 8h sliding / 30d remember TTLs, lookup-and-refresh, revoke, purge.
```

Commit.

---

## Task 7: Cookie config

**Files:**
- Modify: `containers/clearview/src/clearview_app/config.py`

- [ ] **Step 1: Read existing config**

Run: `cat containers/clearview/src/clearview_app/config.py`

- [ ] **Step 2: Append cookie settings**

If `os` is already imported, reuse it. Otherwise add `import os` at the top. Append:

```python
# Auth cookie settings (override via env)
COOKIE_NAME = "clearview_session"
COOKIE_SECURE = os.environ.get("COOKIE_SECURE", "true").lower() != "false"
COOKIE_SAMESITE = "lax"
```

- [ ] **Step 3: Commit**

```bash
git add containers/clearview/src/clearview_app/config.py
git commit -m "auth: add cookie config (name, Secure flag, SameSite)"
```

---

## Task 8: FastAPI dependencies (current_session, require_user, require_admin)

**Files:**
- Create: `containers/clearview/src/clearview_app/auth/dependencies.py`
- Create: `containers/clearview/tests/test_dependencies.py`

- [ ] **Step 1: Write failing tests**

```python
import pytest
from fastapi import Depends, FastAPI
from fastapi.testclient import TestClient
from sqlalchemy.orm import sessionmaker

from clearview_app.auth import sessions as S
from clearview_app.auth.dependencies import (
    AuthedUser,
    get_db,
    require_admin,
    require_user,
)
from clearview_app.auth.models import User


@pytest.fixture()
def app_and_client(db_engine):
    Session = sessionmaker(bind=db_engine, autoflush=False, autocommit=False, future=True)

    def override_get_db():
        s = Session()
        try:
            yield s
        finally:
            s.close()

    app = FastAPI()

    @app.get("/who")
    def who(u: AuthedUser = Depends(require_user)):
        return {"id": u.id, "role": u.role}

    @app.get("/admin-only")
    def admin_only(u: AuthedUser = Depends(require_admin)):
        return {"ok": True}

    app.dependency_overrides[get_db] = override_get_db
    return app, Session


def _make_user(Session, role: str, username: str = "x"):
    s = Session()
    u = User(username=username, password_hash="h", role=role)
    s.add(u); s.commit(); s.refresh(u); s.close()
    return u


def _login(Session, user_id: int) -> str:
    s = Session()
    sid, _ = S.create_session(s, user_id=user_id, remember=False, ip=None, user_agent=None)
    s.commit(); s.close()
    return sid


def test_anon_gets_401(app_and_client):
    app, _ = app_and_client
    assert TestClient(app).get("/who").status_code == 401


def test_user_can_access_require_user(app_and_client):
    app, Session = app_and_client
    u = _make_user(Session, "user")
    sid = _login(Session, u.id)
    c = TestClient(app); c.cookies.set("clearview_session", sid)
    r = c.get("/who")
    assert r.status_code == 200 and r.json()["role"] == "user"


def test_user_blocked_from_admin(app_and_client):
    app, Session = app_and_client
    u = _make_user(Session, "user")
    sid = _login(Session, u.id)
    c = TestClient(app); c.cookies.set("clearview_session", sid)
    assert c.get("/admin-only").status_code == 403


def test_admin_allowed(app_and_client):
    app, Session = app_and_client
    u = _make_user(Session, "admin")
    sid = _login(Session, u.id)
    c = TestClient(app); c.cookies.set("clearview_session", sid)
    assert c.get("/admin-only").status_code == 200


def test_inactive_user_rejected(app_and_client):
    app, Session = app_and_client
    u = _make_user(Session, "admin")
    s = Session(); s.get(User, u.id).is_active = False; s.commit(); s.close()
    sid = _login(Session, u.id)
    c = TestClient(app); c.cookies.set("clearview_session", sid)
    assert c.get("/who").status_code == 401
```

- [ ] **Step 2: Verify it fails**

Run: `cd containers/clearview && python -m pytest tests/test_dependencies.py -v`

Expected: ImportError.
- [ ] **Step 3: Implement dependencies**

```python
"""FastAPI dependencies that gate API endpoints behind a session."""
from __future__ import annotations

from typing import Annotated

from fastapi import Cookie, Depends, HTTPException, status
from sqlalchemy.orm import Session

from ..config import COOKIE_NAME  # noqa: F401 (kept for symmetry with router)
from ..db import SessionLocal
from . import sessions as S
from .models import User, UserSession

AuthedUser = User


def get_db():
    db: Session = SessionLocal()
    try:
        yield db
    finally:
        db.close()


def _load_session(db: Session, sid: str | None) -> tuple[User, UserSession]:
    session = S.lookup_and_refresh(db, sid)
    if session is None:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated")
    user = db.get(User, session.user_id)
    if user is None or not user.is_active:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated")
    # Persist the refreshed last_seen_at / expires_at.
    db.commit()
    return user, session


def require_user(
    db: Annotated[Session, Depends(get_db)],
    clearview_session: Annotated[str | None, Cookie()] = None,
) -> User:
    user, _ = _load_session(db, clearview_session)
    return user


def require_admin(
    db: Annotated[Session, Depends(get_db)],
    clearview_session: Annotated[str | None, Cookie()] = None,
) -> User:
    user, _ = _load_session(db, clearview_session)
    if user.role != "admin":
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin required")
    return user
```

FastAPI binds a `Cookie()` parameter to the cookie whose name matches the Python parameter name, so the parameter `clearview_session` must stay aligned with the value of `COOKIE_NAME`. If one is renamed, rename the other.

- [ ] **Step 4: Verify tests pass**

Run: `cd containers/clearview && python -m pytest tests/test_dependencies.py -v`

Expected: 5 passed.
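
The status codes produced by the two dependencies follow a small decision table: anything wrong with the session or the account yields 401, and only a valid, active, non-admin user on an admin route yields 403. The sketch below restates that table as a plain function. `auth_status` and its parameters are illustrative names, not part of `dependencies.py`.

```python
def auth_status(session_valid: bool, user_active: bool, role: str, admin_required: bool) -> int:
    """Return the HTTP status the gate would produce for this combination."""
    if not session_valid or not user_active:
        return 401  # missing, expired, or deactivated: not authenticated
    if admin_required and role != "admin":
        return 403  # authenticated, but not allowed on admin routes
    return 200


# The five scenarios covered by the tests in this task.
cases = [
    ("anonymous on /who", auth_status(False, False, "", False)),
    ("user on /who", auth_status(True, True, "user", False)),
    ("user on /admin-only", auth_status(True, True, "user", True)),
    ("admin on /admin-only", auth_status(True, True, "admin", True)),
    ("inactive admin on /who", auth_status(True, False, "admin", False)),
]
for label, code in cases:
    print(label, code)
```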
- [ ] **Step 5: Commit**

```bash
git add containers/clearview/src/clearview_app/auth/dependencies.py \
        containers/clearview/tests/test_dependencies.py
git commit -m "auth: add require_user / require_admin FastAPI dependencies"
```

- [ ] **Step 6: Append changelog**

```markdown
## 2026-05-28 — Authentication: FastAPI dependencies

### Added
- `require_user` / `require_admin` cookie-based session loading.
- 401 for missing/expired/inactive; 403 for non-admin on admin routes.
```

Commit.

---

## Task 9: Auth router (login, logout, me, setup-required, setup) + schemas

**Files:**
- Create: `containers/clearview/src/clearview_app/auth/schemas.py`
- Create: `containers/clearview/src/clearview_app/auth/router.py`
- Create: `containers/clearview/tests/test_auth_router.py`

- [ ] **Step 1: Write failing tests**

```python
from fastapi import FastAPI
from fastapi.testclient import TestClient
from sqlalchemy.orm import sessionmaker

from clearview_app.auth.dependencies import get_db
from clearview_app.auth.router import router as auth_router


def make_app(db_engine):
    Session = sessionmaker(bind=db_engine, autoflush=False, autocommit=False, future=True)

    def override_get_db():
        s = Session()
        try:
            yield s
        finally:
            s.close()

    app = FastAPI()
    app.include_router(auth_router)
    app.dependency_overrides[get_db] = override_get_db
    return app


def test_setup_required_when_empty(db_engine):
    c = TestClient(make_app(db_engine))
    assert c.get("/api/auth/setup-required").json() == {"setup_required": True}


def test_setup_creates_first_admin_and_logs_in(db_engine):
    c = TestClient(make_app(db_engine))
    r = c.post("/api/auth/setup", json={"username": "root", "password": "CorrectHorse42"})
    assert r.status_code == 200
    assert r.cookies.get("clearview_session")
    me = c.get("/api/auth/me")
    assert me.status_code == 200
    assert me.json() == {"username": "root", "role": "admin"}


def test_setup_rejects_when_users_exist(db_engine):
    app = make_app(db_engine)
    TestClient(app).post("/api/auth/setup", json={"username": "root", "password": "CorrectHorse42"})
    r = TestClient(app).post("/api/auth/setup", json={"username": "x", "password": "CorrectHorse42"})
    assert r.status_code == 409


def test_login_wrong_password_returns_401(db_engine):
    app = make_app(db_engine)
    TestClient(app).post("/api/auth/setup", json={"username": "root", "password": "CorrectHorse42"})
    r = TestClient(app).post("/api/auth/login", json={"username": "root", "password": "WrongPass00X", "remember": False})
    assert r.status_code == 401


def test_login_success_sets_cookie(db_engine):
    app = make_app(db_engine)
    TestClient(app).post("/api/auth/setup", json={"username": "root", "password": "CorrectHorse42"})
    c2 = TestClient(app)
    r = c2.post("/api/auth/login", json={"username": "root", "password": "CorrectHorse42", "remember": True})
    assert r.status_code == 200
    assert c2.cookies.get("clearview_session")
    assert c2.get("/api/auth/me").json()["username"] == "root"


def test_logout_invalidates_session(db_engine):
    app = make_app(db_engine)
    c = TestClient(app)
    c.post("/api/auth/setup", json={"username": "root", "password": "CorrectHorse42"})
    assert c.get("/api/auth/me").status_code == 200
    c.post("/api/auth/logout")
    assert c.get("/api/auth/me").status_code == 401


def test_password_policy_enforced_on_setup(db_engine):
    c = TestClient(make_app(db_engine))
    r = c.post("/api/auth/setup", json={"username": "root", "password": "short"})
    assert r.status_code == 400
```

- [ ] **Step 2: Verify it fails**

Run: `cd containers/clearview && python -m pytest tests/test_auth_router.py -v`

Expected: ImportError.
- [ ] **Step 3: Implement schemas**

Create `containers/clearview/src/clearview_app/auth/schemas.py`:

```python
"""Pydantic schemas for the auth and users routers."""
from __future__ import annotations

from datetime import datetime
from typing import Literal

from pydantic import BaseModel, Field


class LoginRequest(BaseModel):
    username: str = Field(min_length=1, max_length=128)
    password: str = Field(min_length=1, max_length=1024)
    remember: bool = False


class SetupRequest(BaseModel):
    username: str = Field(min_length=1, max_length=128)
    password: str = Field(min_length=1, max_length=1024)


class MeResponse(BaseModel):
    username: str
    role: Literal["admin", "user"]


class SetupRequiredResponse(BaseModel):
    setup_required: bool


class UserItem(BaseModel):
    id: int
    username: str
    role: Literal["admin", "user"]
    is_active: bool
    created_at: datetime


class CreateUserRequest(BaseModel):
    username: str = Field(min_length=1, max_length=128)
    password: str = Field(min_length=1, max_length=1024)
    role: Literal["admin", "user"] = "user"


class UpdateUserRequest(BaseModel):
    role: Literal["admin", "user"] | None = None
    is_active: bool | None = None


class ResetPasswordRequest(BaseModel):
    password: str = Field(min_length=1, max_length=1024)


class AuditItem(BaseModel):
    id: int
    ts: datetime
    user_id: int | None
    event: str
    ip: str | None
    detail: dict | None
```

- [ ] **Step 4: Implement auth router**

Create `containers/clearview/src/clearview_app/auth/router.py`:

```python
"""Routes for login, logout, identity, and initial setup."""
from __future__ import annotations

from typing import Annotated

from fastapi import APIRouter, Depends, HTTPException, Request, Response, status
from sqlalchemy import func, select
from sqlalchemy.orm import Session

from ..config import COOKIE_NAME, COOKIE_SAMESITE, COOKIE_SECURE
from . import sessions as S
from .audit import record_event
from .dependencies import get_db, require_user
from .models import User, UserSession
from .schemas import LoginRequest, MeResponse, SetupRequest, SetupRequiredResponse
from .security import (
    PasswordPolicyError,
    hash_password,
    validate_password,
    verify_password,
)

router = APIRouter()


def _ip(request: Request) -> str | None:
    return request.client.host if request.client else None


def _set_cookie(response: Response, sid: str, *, remember: bool) -> None:
    max_age = 30 * 24 * 3600 if remember else 8 * 3600
    response.set_cookie(
        key=COOKIE_NAME,
        value=sid,
        max_age=max_age,
        httponly=True,
        secure=COOKIE_SECURE,
        samesite=COOKIE_SAMESITE,
        path="/",
    )


def _clear_cookie(response: Response) -> None:
    response.delete_cookie(key=COOKIE_NAME, path="/")


def _users_count(db: Session) -> int:
    return db.execute(select(func.count(User.id))).scalar_one()


@router.get("/api/auth/setup-required", response_model=SetupRequiredResponse)
def setup_required(db: Annotated[Session, Depends(get_db)]) -> SetupRequiredResponse:
    return SetupRequiredResponse(setup_required=_users_count(db) == 0)


@router.post("/api/auth/setup", response_model=MeResponse)
def setup(
    payload: SetupRequest,
    request: Request,
    response: Response,
    db: Annotated[Session, Depends(get_db)],
) -> MeResponse:
    if _users_count(db) > 0:
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Setup already completed")
    try:
        validate_password(payload.password)
    except PasswordPolicyError as exc:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
    user = User(
        username=payload.username,
        password_hash=hash_password(payload.password),
        role="admin",
        is_active=True,
    )
    db.add(user)
    db.flush()
    sid, _ = S.create_session(
        db, user_id=user.id, remember=False,
        ip=_ip(request), user_agent=request.headers.get("user-agent")
    )
    record_event(db, event="setup", user_id=user.id, ip=_ip(request),
                 detail={"username": user.username})
    db.commit()
    _set_cookie(response, sid, remember=False)
    return MeResponse(username=user.username, role=user.role)  # type: ignore[arg-type]


@router.post("/api/auth/login", response_model=MeResponse)
def login(
    payload: LoginRequest,
    request: Request,
    response: Response,
    db: Annotated[Session, Depends(get_db)],
) -> MeResponse:
    user = db.execute(select(User).where(User.username == payload.username)).scalar_one_or_none()
    if user is None or not user.is_active or not verify_password(payload.password, user.password_hash):
        record_event(
            db,
            event="login_fail",
            user_id=user.id if user else None,
            ip=_ip(request),
            detail={"username": payload.username},
        )
        db.commit()
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials")
    sid, _ = S.create_session(
        db, user_id=user.id, remember=payload.remember,
        ip=_ip(request), user_agent=request.headers.get("user-agent")
    )
    record_event(db, event="login_ok", user_id=user.id, ip=_ip(request), detail=None)
    S.purge_expired(db)
    db.commit()
    _set_cookie(response, sid, remember=payload.remember)
    return MeResponse(username=user.username, role=user.role)  # type: ignore[arg-type]


@router.post("/api/auth/logout")
def logout(
    request: Request,
    response: Response,
    db: Annotated[Session, Depends(get_db)],
) -> dict[str, bool]:
    sid = request.cookies.get(COOKIE_NAME)
    user_id: int | None = None
    if sid:
        existing = db.get(UserSession, sid)
        if existing is not None:
            user_id = existing.user_id
        S.revoke(db, sid)
    record_event(db, event="logout", user_id=user_id, ip=_ip(request), detail=None)
    db.commit()
    _clear_cookie(response)
    return {"ok": True}


@router.get("/api/auth/me", response_model=MeResponse)
def me(user: Annotated[User, Depends(require_user)]) -> MeResponse:
    return MeResponse(username=user.username, role=user.role)  # type: ignore[arg-type]
```

- [ ] **Step 5: Verify tests pass**

Run: `cd containers/clearview && python -m pytest tests/test_auth_router.py -v`
Expected: 7 passed.
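To make the cookie lifetimes in `_set_cookie` concrete, here is a standard-library sketch of the `Set-Cookie` value that results. It is an illustration under stated assumptions, not the router's implementation: `build_set_cookie` is a hypothetical helper, the cookie name is taken from the tests, `SameSite=Lax` is assumed from the changelog entry, and `secure=True` is an assumed default standing in for `COOKIE_SECURE`.

```python
from http.cookies import SimpleCookie

REMEMBER_SECONDS = 30 * 24 * 3600  # 30 days, the "remember me" lifetime
DEFAULT_SECONDS = 8 * 3600         # 8 hours, the default lifetime


def build_set_cookie(sid: str, *, remember: bool, secure: bool = True) -> str:
    """Return a Set-Cookie header value for a session id (illustrative only)."""
    cookie = SimpleCookie()
    cookie["clearview_session"] = sid
    morsel = cookie["clearview_session"]
    morsel["max-age"] = REMEMBER_SECONDS if remember else DEFAULT_SECONDS
    morsel["httponly"] = True      # hide the cookie from page JavaScript
    morsel["secure"] = secure      # assumption: mirrors COOKIE_SECURE
    morsel["samesite"] = "Lax"     # assumption: mirrors COOKIE_SAMESITE
    morsel["path"] = "/"
    return morsel.OutputString()
```

Calling `build_set_cookie("abc", remember=True)` yields a header whose `Max-Age` is 2592000 seconds; with `remember=False` it is 28800.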
- [ ] **Step 6: Commit**

```bash
git add containers/clearview/src/clearview_app/auth/schemas.py \
        containers/clearview/src/clearview_app/auth/router.py \
        containers/clearview/tests/test_auth_router.py
git commit -m "auth: add /api/auth router (login, logout, me, setup)"
```

- [ ] **Step 7: Append changelog**

```markdown
## 2026-05-28 — Authentication: auth router

### Added
- `/api/auth/setup-required`, `/api/auth/setup`, `/api/auth/login`, `/api/auth/logout`, `/api/auth/me`.
- HttpOnly session cookie with SameSite=Lax; Secure flag controlled by `COOKIE_SECURE` env.
```

Commit.

---

## Task 10: Users + audit admin router

**Files:**
- Create: `containers/clearview/src/clearview_app/auth/users_router.py`
- Create: `containers/clearview/tests/test_users_router.py`

- [ ] **Step 1: Write failing tests**

```python
from fastapi import FastAPI
from fastapi.testclient import TestClient
from sqlalchemy.orm import sessionmaker

from clearview_app.auth.dependencies import get_db
from clearview_app.auth.router import router as auth_router
from clearview_app.auth.users_router import router as users_router


def make_app(db_engine):
    Session = sessionmaker(bind=db_engine, autoflush=False, autocommit=False, future=True)

    def override_get_db():
        s = Session()
        try:
            yield s
        finally:
            s.close()

    app = FastAPI()
    app.include_router(auth_router)
    app.include_router(users_router)
    app.dependency_overrides[get_db] = override_get_db
    return app


def _bootstrap_admin(app):
    c = TestClient(app)
    c.post("/api/auth/setup", json={"username": "root", "password": "CorrectHorse42"})
    return c


def test_non_admin_blocked(db_engine):
    app = make_app(db_engine)
    admin = _bootstrap_admin(app)
    admin.post("/api/users", json={"username": "joe", "password": "JoePassword42", "role": "user"})
    user = TestClient(app)
    user.post("/api/auth/login", json={"username": "joe", "password": "JoePassword42", "remember": False})
    assert user.get("/api/users").status_code == 403


def test_create_list_update_delete(db_engine):
    app = make_app(db_engine)
    c = _bootstrap_admin(app)
    r = c.post("/api/users", json={"username": "alice", "password": "AlicePassword42", "role": "user"})
    assert r.status_code == 200
    uid = r.json()["id"]
    assert {u["username"] for u in c.get("/api/users").json()} == {"root", "alice"}
    c.patch(f"/api/users/{uid}", json={"role": "admin"})
    assert next(u for u in c.get("/api/users").json() if u["id"] == uid)["role"] == "admin"
    c.post(f"/api/users/{uid}/reset-password", json={"password": "NewAlicePass42"})
    fresh = TestClient(app)
    assert fresh.post(
        "/api/auth/login",
        json={"username": "alice", "password": "NewAlicePass42", "remember": False},
    ).status_code == 200
    c.delete(f"/api/users/{uid}")
    assert {u["username"] for u in c.get("/api/users").json()} == {"root"}


def test_cannot_delete_self(db_engine):
    app = make_app(db_engine)
    c = _bootstrap_admin(app)
    me_id = c.get("/api/users").json()[0]["id"]
    assert c.delete(f"/api/users/{me_id}").status_code == 400


def test_audit_returns_rows(db_engine):
    app = make_app(db_engine)
    c = _bootstrap_admin(app)
    rows = c.get("/api/audit").json()
    assert any(r["event"] == "setup" for r in rows)
```

- [ ] **Step 2: Verify it fails**

Run: `cd containers/clearview && python -m pytest tests/test_users_router.py -v`
Expected: ImportError.

- [ ] **Step 3: Implement users router**

```python
"""Admin endpoints: user CRUD, password reset, audit log."""
from __future__ import annotations

from typing import Annotated

from fastapi import APIRouter, Depends, HTTPException, Request
from sqlalchemy import select
from sqlalchemy.orm import Session

from . import sessions as S
from .audit import record_event
from .dependencies import get_db, require_admin
from .models import AuthAudit, User
from .schemas import (
    AuditItem,
    CreateUserRequest,
    ResetPasswordRequest,
    UpdateUserRequest,
    UserItem,
)
from .security import PasswordPolicyError, hash_password, validate_password

router = APIRouter()


def _ip(request: Request) -> str | None:
    return request.client.host if request.client else None


def _to_item(u: User) -> UserItem:
    return UserItem(
        id=u.id, username=u.username, role=u.role,
        is_active=u.is_active, created_at=u.created_at  # type: ignore[arg-type]
    )


@router.get("/api/users", response_model=list[UserItem])
def list_users(
    db: Annotated[Session, Depends(get_db)],
    _: Annotated[User, Depends(require_admin)],
) -> list[UserItem]:
    rows = db.execute(select(User).order_by(User.created_at.asc())).scalars().all()
    return [_to_item(u) for u in rows]


@router.post("/api/users", response_model=UserItem)
def create_user(
    payload: CreateUserRequest,
    request: Request,
    db: Annotated[Session, Depends(get_db)],
    actor: Annotated[User, Depends(require_admin)],
) -> UserItem:
    try:
        validate_password(payload.password)
    except PasswordPolicyError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    if db.execute(select(User).where(User.username == payload.username)).scalar_one_or_none():
        raise HTTPException(status_code=409, detail="Username already exists")
    u = User(
        username=payload.username,
        password_hash=hash_password(payload.password),
        role=payload.role,
        is_active=True,
    )
    db.add(u)
    db.flush()
    record_event(
        db,
        event="user_create",
        user_id=actor.id,
        ip=_ip(request),
        detail={"target": u.id, "username": u.username, "role": u.role},
    )
    db.commit()
    db.refresh(u)
    return _to_item(u)


@router.patch("/api/users/{user_id}", response_model=UserItem)
def update_user(
    user_id: int,
    payload: UpdateUserRequest,
    request: Request,
    db: Annotated[Session, Depends(get_db)],
    actor: Annotated[User, Depends(require_admin)],
) -> UserItem:
    u = db.get(User, user_id)
    if u is None:
        raise HTTPException(status_code=404, detail="User not found")
    changed: dict = {}
    if payload.role is not None and payload.role != u.role:
        u.role = payload.role
        changed["role"] = payload.role
    if payload.is_active is not None and payload.is_active != u.is_active:
        u.is_active = payload.is_active
        changed["is_active"] = payload.is_active
        if not payload.is_active:
            S.revoke_all_for_user(db, u.id)
    if changed:
        record_event(db, event="user_update", user_id=actor.id, ip=_ip(request),
                     detail={"target": u.id, **changed})
    db.commit()
    db.refresh(u)
    return _to_item(u)


@router.delete("/api/users/{user_id}")
def delete_user(
    user_id: int,
    request: Request,
    db: Annotated[Session, Depends(get_db)],
    actor: Annotated[User, Depends(require_admin)],
) -> dict[str, bool]:
    if user_id == actor.id:
        raise HTTPException(status_code=400, detail="Cannot delete your own account")
    u = db.get(User, user_id)
    if u is None:
        raise HTTPException(status_code=404, detail="User not found")
    db.delete(u)
    record_event(db, event="user_delete", user_id=actor.id, ip=_ip(request),
                 detail={"target": user_id})
    db.commit()
    return {"ok": True}


@router.post("/api/users/{user_id}/reset-password")
def reset_password(
    user_id: int,
    payload: ResetPasswordRequest,
    request: Request,
    db: Annotated[Session, Depends(get_db)],
    actor: Annotated[User, Depends(require_admin)],
) -> dict[str, bool]:
    u = db.get(User, user_id)
    if u is None:
        raise HTTPException(status_code=404, detail="User not found")
    try:
        validate_password(payload.password)
    except PasswordPolicyError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    u.password_hash = hash_password(payload.password)
    S.revoke_all_for_user(db, u.id)
    record_event(db, event="password_reset", user_id=actor.id, ip=_ip(request),
                 detail={"target": u.id})
    db.commit()
    return {"ok": True}


@router.get("/api/audit", response_model=list[AuditItem])
def list_audit(
    db: Annotated[Session, Depends(get_db)],
    _: Annotated[User, Depends(require_admin)],
    limit: int = 200,
    event: str | None = None,
) -> list[AuditItem]:
    limit = max(1, min(limit, 1000))
    q = select(AuthAudit).order_by(AuthAudit.ts.desc()).limit(limit)
    if event:
        q = q.where(AuthAudit.event == event)
    rows = db.execute(q).scalars().all()
    return [
        AuditItem(id=r.id, ts=r.ts, user_id=r.user_id, event=r.event, ip=r.ip, detail=r.detail)
        for r in rows
    ]
```

- [ ] **Step 4: Verify tests pass**

Run: `cd containers/clearview && python -m pytest tests/test_users_router.py -v`
Expected: 4 passed.

- [ ] **Step 5: Commit**

```bash
git add containers/clearview/src/clearview_app/auth/users_router.py \
        containers/clearview/tests/test_users_router.py
git commit -m "auth: add users + audit admin router"
```

- [ ] **Step 6: Append changelog**

```markdown
## 2026-05-28 — Authentication: users + audit endpoints

### Added
- Admin endpoints: list/create/update/delete users, reset password, view audit log.
- Self-delete protection; deactivating or resetting password revokes existing sessions.
```

Commit.
---

## Task 11: Wire auth into `main.py` and protect existing routers

**Files:**
- Modify: `containers/clearview/src/clearview_app/main.py`
- Create: `containers/clearview/tests/test_existing_routes_protected.py`

- [ ] **Step 1: Write failing test**

```python
"""Smoke check that existing routers refuse anonymous requests once gated."""
from fastapi import Depends, FastAPI
from fastapi.testclient import TestClient
from sqlalchemy.orm import sessionmaker

from clearview_app.api_tenants import router as tenants_router
from clearview_app.auth.dependencies import get_db, require_user


def test_tenants_route_requires_auth(db_engine):
    Session = sessionmaker(bind=db_engine, autoflush=False, autocommit=False, future=True)

    def override_get_db():
        s = Session()
        try:
            yield s
        finally:
            s.close()

    app = FastAPI()
    app.include_router(tenants_router, dependencies=[Depends(require_user)])
    app.dependency_overrides[get_db] = override_get_db
    assert TestClient(app).get("/api/tenants").status_code == 401
```

- [ ] **Step 2: Verify it fails**

Run: `cd containers/clearview && python -m pytest tests/test_existing_routes_protected.py -v`
Expected: FAIL or error (depending on DB state; the fail mode is "not 401").

- [ ] **Step 3: Modify `main.py`**

Replace this block in `containers/clearview/src/clearview_app/main.py`:

```python
app.include_router(tenants_router)
app.include_router(jobs_router)
app.include_router(onboarding_router)
```

With:

```python
from fastapi import Depends

from .auth.dependencies import require_user
from .auth.router import router as auth_router
from .auth.users_router import router as users_router

# Public auth endpoints (login / setup / setup-required) — no dependency.
app.include_router(auth_router)

# Admin endpoints — already enforce require_admin internally.
app.include_router(users_router)

# Existing routers gated by an authenticated session.
_protected = [Depends(require_user)]
app.include_router(tenants_router, dependencies=_protected)
app.include_router(jobs_router, dependencies=_protected)
app.include_router(onboarding_router, dependencies=_protected)
```

- [ ] **Step 4: Verify protection test passes**

Run: `cd containers/clearview && python -m pytest tests/test_existing_routes_protected.py -v`
Expected: pass.

- [ ] **Step 5: Run full backend suite**

Run: `cd containers/clearview && python -m pytest tests/ -v`
Expected: all tests pass.

- [ ] **Step 6: Commit**

```bash
git add containers/clearview/src/clearview_app/main.py \
        containers/clearview/tests/test_existing_routes_protected.py
git commit -m "auth: gate existing routers behind require_user, wire auth + users routers"
```

- [ ] **Step 7: Append changelog**

```markdown
## 2026-05-28 — Authentication: API gating

### Changed
- Tenants, Jobs, and Onboarding routers now require an authenticated session.
- Auth and Users routers wired into the FastAPI app.
```

Commit.

---

## Task 12: Login page + shared `auth.js` + CSS

**Files:**
- Create: `containers/clearview/site/login.html`
- Create: `containers/clearview/site/auth.js`
- Modify: `containers/clearview/site/styles.css`

- [ ] **Step 1: Create `auth.js`**

```javascript
(function (global) {
  async function postJson(url, body) {
    const r = await fetch(url, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      credentials: 'same-origin',
      body: JSON.stringify(body),
    });
    let data = null;
    try { data = await r.json(); } catch (_) {}
    return { ok: r.ok, status: r.status, data };
  }

  async function getJson(url) {
    const r = await fetch(url, { credentials: 'same-origin' });
    let data = null;
    try { data = await r.json(); } catch (_) {}
    return { ok: r.ok, status: r.status, data };
  }

  global.ClearviewAuth = { postJson, getJson };
})(window);
```

- [ ] **Step 2: Create `login.html`**

```html
Clearview — Sign in

Clearview

Sign in to continue

```

- [ ] **Step 3: Append CSS**

Append to `containers/clearview/site/styles.css`:

```css
.auth-page { display: flex; align-items: center; justify-content: center;
             min-height: 100vh; background: #0f1115; margin: 0; }
.auth-card { width: 360px; max-width: 92vw; padding: 28px; background: #1a1d24;
             border-radius: 12px; box-shadow: 0 8px 28px rgba(0,0,0,.35);
             color: #e6e8ee; font-family: system-ui, sans-serif; }
.auth-card h1 { margin: 0 0 4px; font-size: 22px; }
.auth-sub { margin: 0 0 18px; opacity: .75; }
.auth-card form { display: flex; flex-direction: column; gap: 12px; }
.auth-card label { display: flex; flex-direction: column; gap: 4px; font-size: 13px; }
.auth-card input[type=text],
.auth-card input[type=password],
.auth-card input:not([type]) { padding: 8px 10px; background: #0e1116;
                               border: 1px solid #2a2f3a; border-radius: 6px; color: inherit; }
.auth-card .auth-remember { flex-direction: row; align-items: center; gap: 8px; font-size: 13px; }
.auth-card button { padding: 10px; background: #3b82f6; border: 0; border-radius: 6px;
                    color: #fff; font-weight: 600; cursor: pointer; }
.auth-card button:hover { background: #2563eb; }
.auth-error { background: #3a1f25; color: #fda4af; padding: 8px 10px;
              border-radius: 6px; font-size: 13px; }
.user-badge { display: inline-flex; align-items: center; gap: 8px; padding: 4px 10px;
              border: 1px solid #2a2f3a; border-radius: 999px; font-size: 12px; }
.user-badge button { background: transparent; border: 0; color: #93c5fd;
                     cursor: pointer; padding: 0; }
.user-badge button:hover { text-decoration: underline; }
```

(If `styles.css` already uses theme variables, the worker may adapt these colours to match. The literal values above guarantee a usable page.)

- [ ] **Step 4: Manual smoke check**

Bring up the stack and open `http://<host>:<port>/login.html`. Confirm that the page renders, that wrong credentials show an error, and that correct credentials redirect to `/`. If the local environment can't run the stack, mark this as a manual gate and continue.
- [ ] **Step 5: Commit**

```bash
git add containers/clearview/site/login.html \
        containers/clearview/site/auth.js \
        containers/clearview/site/styles.css
git commit -m "auth: add login page and shared auth.js"
```

- [ ] **Step 6: Append changelog**

```markdown
## 2026-05-28 — Authentication: login page

### Added
- `login.html` with username / password / remember-me.
- Shared `auth.js` helpers (postJson, getJson).
- CSS for auth pages and the header user-badge.
```

Commit.

---

## Task 13: Setup page

**Files:**
- Create: `containers/clearview/site/setup.html`

- [ ] **Step 1: Create `setup.html`**

```html
Clearview — First-time setup

Welcome to Clearview

Create the first administrator account.

```

- [ ] **Step 2: Commit**

```bash
git add containers/clearview/site/setup.html
git commit -m "auth: add first-run setup page"
```

- [ ] **Step 3: Append changelog**

```markdown
## 2026-05-28 — Authentication: setup page

### Added
- `setup.html` for first-run admin creation, reachable only while the `users` table is empty.
```

Commit.

---

## Task 14: app.js boot — auth gate + logout + user-badge + Users nav

**Files:**
- Modify: `containers/clearview/site/app.js`
- Modify: `containers/clearview/site/index.html`

- [ ] **Step 1: Inspect the current `app.js` entry and `requestJson` wrapper**

Run: `sed -n '100,140p' containers/clearview/site/app.js`

Identify `requestJson` (around line 116) and the bootstrap entry point.

- [ ] **Step 2: Modify `requestJson` so 401 redirects to login**

Locate the existing function. Add the 401 short-circuit immediately after the `fetch` call. Preserve all subsequent error/JSON handling that exists today:

```javascript
async function requestJson(url, options) {
  const response = await fetch(url, Object.assign({ credentials: 'same-origin' }, options || {}));
  if (response.status === 401) {
    window.location.replace('/login.html');
    throw new Error('unauthenticated');
  }
  // ...existing remainder of the function unchanged...
}
```

- [ ] **Step 3: Add an auth-gate immediately before the existing app boot**

```javascript
(async function authGate() {
  try {
    const r = await fetch('/api/auth/me', { credentials: 'same-origin' });
    if (r.status === 401) {
      const setup = await fetch('/api/auth/setup-required')
        .then(x => x.json())
        .catch(() => ({ setup_required: false }));
      window.location.replace(setup.setup_required ? '/setup.html' : '/login.html');
      return;
    }
    const me = await r.json();
    window.__clearviewUser = me;
    renderUserBadge(me);
    if (me.role !== 'admin') {
      const usersLink = document.querySelector('[data-route="users"]');
      if (usersLink) usersLink.style.display = 'none';
    }
  } catch (e) {
    window.location.replace('/login.html');
  }
})();

function renderUserBadge(me) {
  const slot = document.getElementById('userBadge');
  if (!slot) return;
  slot.innerHTML = '';
  const wrap = document.createElement('span');
  wrap.className = 'user-badge';
  wrap.append(document.createTextNode(`${me.username} (${me.role})`));
  const btn = document.createElement('button');
  btn.type = 'button';
  btn.textContent = 'Sign out';
  btn.addEventListener('click', async () => {
    await fetch('/api/auth/logout', { method: 'POST', credentials: 'same-origin' });
    window.location.replace('/login.html');
  });
  wrap.append(btn);
  slot.append(wrap);
}
```

- [ ] **Step 4: Add user-badge slot and Users nav link to `index.html`**

In `