auth: add User, UserSession, AuthAudit models
This commit is contained in:
parent
53e1094a10
commit
61ab979f5a
61
containers/clearview/src/clearview_app/auth/models.py
Normal file
61
containers/clearview/src/clearview_app/auth/models.py
Normal file
@ -0,0 +1,61 @@
|
||||
"""SQLAlchemy models for authentication, sessions, and audit log.
|
||||
|
||||
A dedicated ``Base`` is used so these tables can be created independently
|
||||
of the existing scan/tenant models in tests; in production they coexist
|
||||
in the same database under Alembic.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any
|
||||
|
||||
from sqlalchemy import Boolean, DateTime, ForeignKey, Integer, JSON, String, Text
|
||||
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
|
||||
|
||||
|
||||
def _utcnow() -> datetime:
|
||||
return datetime.now(timezone.utc)
|
||||
|
||||
|
||||
class Base(DeclarativeBase):
|
||||
pass
|
||||
|
||||
|
||||
class User(Base):
|
||||
__tablename__ = "users"
|
||||
|
||||
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
|
||||
username: Mapped[str] = mapped_column(String(128), unique=True, nullable=False, index=True)
|
||||
password_hash: Mapped[str] = mapped_column(Text, nullable=False)
|
||||
role: Mapped[str] = mapped_column(String(16), nullable=False)
|
||||
is_active: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
|
||||
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=_utcnow, nullable=False)
|
||||
updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=_utcnow, nullable=False)
|
||||
|
||||
|
||||
class UserSession(Base):
|
||||
__tablename__ = "user_sessions"
|
||||
|
||||
id: Mapped[str] = mapped_column(String(64), primary_key=True)
|
||||
user_id: Mapped[int] = mapped_column(
|
||||
Integer, ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True
|
||||
)
|
||||
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=_utcnow, nullable=False)
|
||||
expires_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False, index=True)
|
||||
last_seen_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=_utcnow, nullable=False)
|
||||
ip: Mapped[str | None] = mapped_column(String(64), nullable=True)
|
||||
user_agent: Mapped[str | None] = mapped_column(Text, nullable=True)
|
||||
remember: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
|
||||
|
||||
|
||||
class AuthAudit(Base):
|
||||
__tablename__ = "auth_audit"
|
||||
|
||||
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
|
||||
ts: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=_utcnow, nullable=False, index=True)
|
||||
user_id: Mapped[int | None] = mapped_column(
|
||||
Integer, ForeignKey("users.id", ondelete="SET NULL"), nullable=True
|
||||
)
|
||||
event: Mapped[str] = mapped_column(String(32), nullable=False, index=True)
|
||||
ip: Mapped[str | None] = mapped_column(String(64), nullable=True)
|
||||
detail: Mapped[dict[str, Any] | None] = mapped_column(JSON, nullable=True)
|
||||
34
containers/clearview/tests/test_models.py
Normal file
34
containers/clearview/tests/test_models.py
Normal file
@ -0,0 +1,34 @@
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
from clearview_app.auth.models import AuthAudit, User, UserSession
|
||||
|
||||
|
||||
def test_user_defaults(db_session):
|
||||
u = User(username="alice", password_hash="x", role="admin")
|
||||
db_session.add(u); db_session.commit(); db_session.refresh(u)
|
||||
assert u.id is not None
|
||||
assert u.is_active is True
|
||||
assert isinstance(u.created_at, datetime)
|
||||
|
||||
|
||||
def test_session_persists_with_expiry(db_session):
|
||||
u = User(username="bob", password_hash="x", role="user")
|
||||
db_session.add(u); db_session.commit(); db_session.refresh(u)
|
||||
|
||||
s = UserSession(
|
||||
id="abc123",
|
||||
user_id=u.id,
|
||||
expires_at=datetime.now(timezone.utc) + timedelta(hours=8),
|
||||
ip="1.2.3.4",
|
||||
user_agent="ua",
|
||||
remember=False,
|
||||
)
|
||||
db_session.add(s); db_session.commit()
|
||||
assert s.created_at is not None
|
||||
|
||||
|
||||
def test_audit_row(db_session):
|
||||
a = AuthAudit(event="login_ok", ip="9.9.9.9", detail={"k": "v"})
|
||||
db_session.add(a); db_session.commit()
|
||||
assert a.id is not None
|
||||
assert a.detail == {"k": "v"}
|
||||
Loading…
Reference in New Issue
Block a user