diff --git a/containers/clearview/src/clearview_app/auth/models.py b/containers/clearview/src/clearview_app/auth/models.py new file mode 100644 index 0000000..7317812 --- /dev/null +++ b/containers/clearview/src/clearview_app/auth/models.py @@ -0,0 +1,61 @@ +"""SQLAlchemy models for authentication, sessions, and audit log. + +A dedicated ``Base`` is used so these tables can be created independently +of the existing scan/tenant models in tests; in production they coexist +in the same database under Alembic. +""" +from __future__ import annotations + +from datetime import datetime, timezone +from typing import Any + +from sqlalchemy import Boolean, DateTime, ForeignKey, Integer, JSON, String, Text +from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column + + +def _utcnow() -> datetime: + return datetime.now(timezone.utc) + + +class Base(DeclarativeBase): + pass + + +class User(Base): + __tablename__ = "users" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + username: Mapped[str] = mapped_column(String(128), unique=True, nullable=False, index=True) + password_hash: Mapped[str] = mapped_column(Text, nullable=False) + role: Mapped[str] = mapped_column(String(16), nullable=False) + is_active: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True) + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=_utcnow, nullable=False) + updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=_utcnow, nullable=False) + + +class UserSession(Base): + __tablename__ = "user_sessions" + + id: Mapped[str] = mapped_column(String(64), primary_key=True) + user_id: Mapped[int] = mapped_column( + Integer, ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True + ) + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=_utcnow, nullable=False) + expires_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False, index=True) + last_seen_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=_utcnow, nullable=False) + ip: Mapped[str | None] = mapped_column(String(64), nullable=True) + user_agent: Mapped[str | None] = mapped_column(Text, nullable=True) + remember: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False) + + +class AuthAudit(Base): + __tablename__ = "auth_audit" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + ts: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=_utcnow, nullable=False, index=True) + user_id: Mapped[int | None] = mapped_column( + Integer, ForeignKey("users.id", ondelete="SET NULL"), nullable=True + ) + event: Mapped[str] = mapped_column(String(32), nullable=False, index=True) + ip: Mapped[str | None] = mapped_column(String(64), nullable=True) + detail: Mapped[dict[str, Any] | None] = mapped_column(JSON, nullable=True) diff --git a/containers/clearview/tests/test_models.py b/containers/clearview/tests/test_models.py new file mode 100644 index 0000000..ef7e0f9 --- /dev/null +++ b/containers/clearview/tests/test_models.py @@ -0,0 +1,34 @@ +from datetime import datetime, timedelta, timezone + +from clearview_app.auth.models import AuthAudit, User, UserSession + + +def test_user_defaults(db_session): + u = User(username="alice", password_hash="x", role="admin") + db_session.add(u); db_session.commit(); db_session.refresh(u) + assert u.id is not None + assert u.is_active is True + assert isinstance(u.created_at, datetime) + + +def test_session_persists_with_expiry(db_session): + u = User(username="bob", password_hash="x", role="user") + db_session.add(u); db_session.commit(); db_session.refresh(u) + + s = UserSession( + id="abc123", + user_id=u.id, + expires_at=datetime.now(timezone.utc) + timedelta(hours=8), + ip="1.2.3.4", + user_agent="ua", + remember=False, + ) + db_session.add(s); db_session.commit() + assert s.created_at is not None + + +def test_audit_row(db_session): + a = AuthAudit(event="login_ok", ip="9.9.9.9", detail={"k": "v"}) + db_session.add(a); db_session.commit() + assert a.id is not None + assert a.detail == {"k": "v"}