# novela/containers/novela/routers/backup.py
# (source-viewer header: 2067 lines, 71 KiB, Python)

import asyncio
import base64
import hashlib
import json
import os
import shutil
import subprocess
import time
from datetime import date, datetime, timezone
from pathlib import Path
from tempfile import NamedTemporaryFile
from urllib.parse import urlencode
import dropbox
import httpx
from dropbox.exceptions import ApiError, AuthError
from fastapi import APIRouter, File, Request, UploadFile
from fastapi.responses import HTMLResponse
from shared_templates import templates
from db import get_db_conn
from routers.common import (
scan_media,
upsert_book,
upsert_chapter,
upsert_cover_cache,
)
from security import decrypt_value, encrypt_value, is_encrypted_value
# APIRouter that this module's routes are registered on.
router = APIRouter()
# Root of the on-disk library that file backups walk (env LIBRARY_DIR, default "library").
LIBRARY_DIR = Path(os.environ.get("LIBRARY_DIR", "library"))
# Persistent configuration directory (env CONFIG_DIR, default "config"); created at import time.
CONFIG_DIR = Path(os.environ.get("CONFIG_DIR", "config"))
CONFIG_DIR.mkdir(parents=True, exist_ok=True)
# Per-file {mtime, size, sha256} cache used to skip re-hashing unchanged library files.
MANIFEST_PATH = CONFIG_DIR / "backup_manifest.json"
# Local copies of PostgreSQL dumps, so the database can be restored without a
# Dropbox token. Lives on the persistent config volume.
LOCAL_DUMP_DIR = CONFIG_DIR / "postgres_dumps"
# Fallbacks used when the corresponding credentials row is missing/unparseable.
DEFAULT_DROPBOX_ROOT = "/novela"
DEFAULT_RETENTION_COUNT = 14
DEFAULT_SCHEDULE_ENABLED = False
DEFAULT_SCHEDULE_INTERVAL_HOURS = 24
# In-process backup tasks keyed by backup_log id; backup_log rows marked
# 'running' whose id is not in here are treated as interrupted.
BACKUP_TASKS: dict[int, asyncio.Task] = {}
BACKUP_PROGRESS: dict[int, dict] = {}  # log_id → {done, total, phase}
# Handle of the periodic backup scheduler task, if one has been started.
SCHEDULER_TASK: asyncio.Task | None = None
# Full-database restore runs in the background (one at a time) and reports
# byte-level progress here, polled via GET /api/backup/restore/progress.
# Keys: active (a restore is in flight), phase/done/total/label (current step
# and its byte counters), name (what is being restored), size (bytes of the
# restored dump, set on success), ok/error (final outcome; None while running).
RESTORE_PROGRESS: dict = {
    "active": False,
    "phase": "idle",  # starting | downloading | safety_dump | resetting | loading | rolling_back | done | error
    "done": 0,
    "total": 0,
    "label": "",
    "name": "",
    "size": 0,
    "ok": None,
    "error": None,
}
# asyncio task wrapping the restore worker thread (set by _start_restore).
RESTORE_TASK: asyncio.Task | None = None
def _restore_prog(phase: str, done: int = 0, total: int = 0, label: str = "") -> None:
    """Publish the current restore step and its byte counters.

    Only the ``phase``, ``done``, ``total`` and ``label`` keys of
    RESTORE_PROGRESS are written; ``done`` and ``total`` are coerced to int.
    """
    done_bytes = int(done)
    total_bytes = int(total)
    RESTORE_PROGRESS.update(phase=phase, done=done_bytes, total=total_bytes, label=label)
def _now_iso() -> str:
    """Return the current time in UTC as an ISO-8601 string (``+00:00`` offset)."""
    utc_now = datetime.now(tz=timezone.utc)
    return utc_now.isoformat()
def _load_manifest() -> dict[str, dict[str, float | int | str]]:
if not MANIFEST_PATH.exists():
return {}
try:
data = json.loads(MANIFEST_PATH.read_text(encoding="utf-8"))
if isinstance(data, dict):
return data
except Exception:
pass
return {}
def _save_manifest(manifest: dict[str, dict[str, float | int | str]]) -> None:
MANIFEST_PATH.write_text(json.dumps(manifest, indent=2, sort_keys=True), encoding="utf-8")
def _dropbox_credential_details() -> dict:
    """Return the stored Dropbox token and when it was last updated.

    The token is the decrypted ``password`` column, falling back to
    ``username``, stripped of whitespace. If either column is still stored in
    plaintext, both are re-encrypted in place, which also bumps ``updated_at``.

    Returns:
        ``{"configured": bool, "token": str, "updated_at": ISO str | None}``.
    """
    with get_db_conn() as conn:
        with conn:
            with conn.cursor() as cur:
                cur.execute(
                    "SELECT username, password, updated_at FROM credentials WHERE site = 'dropbox' LIMIT 1"
                )
                row = cur.fetchone()
                if not row:
                    return {"configured": False, "token": "", "updated_at": None}
                stored_user, stored_pass, changed_at = row
                plain_user = decrypt_value(stored_user)
                plain_pass = decrypt_value(stored_pass)
                token = (plain_pass or plain_user or "").strip()
                # Opportunistic migration of legacy plaintext rows.
                if not (is_encrypted_value(stored_user) and is_encrypted_value(stored_pass)):
                    cur.execute(
                        """
UPDATE credentials
SET username = %s, password = %s, updated_at = NOW()
WHERE site = 'dropbox'
RETURNING updated_at
""",
                        (encrypt_value(plain_user), encrypt_value(plain_pass)),
                    )
                    refreshed = cur.fetchone()
                    if refreshed:
                        changed_at = refreshed[0]
                return {
                    "configured": bool(token),
                    "token": token,
                    "updated_at": changed_at.isoformat() if changed_at else None,
                }
def _load_dropbox_token() -> str:
    """Return the decrypted Dropbox token, or ``""`` when none is stored."""
    details = _dropbox_credential_details()
    return details.get("token", "")
def _load_dropbox_app_key() -> str:
    """Return the stored Dropbox app key (decrypted, stripped), or ``""`` if unset."""
    with get_db_conn() as conn:
        with conn:
            with conn.cursor() as cur:
                cur.execute(
                    "SELECT password FROM credentials WHERE site = 'dropbox_app_key' LIMIT 1"
                )
                found = cur.fetchone()
                return decrypt_value(found[0]).strip() if found else ""
def _load_dropbox_app_secret() -> str:
    """Return the stored Dropbox app secret (decrypted, stripped), or ``""`` if unset."""
    with get_db_conn() as conn:
        with conn:
            with conn.cursor() as cur:
                cur.execute(
                    "SELECT password FROM credentials WHERE site = 'dropbox_app_secret' LIMIT 1"
                )
                found = cur.fetchone()
                return decrypt_value(found[0]).strip() if found else ""
def _save_dropbox_app_key(app_key: str) -> None:
    """Upsert the Dropbox app key (credentials site 'dropbox_app_key').

    The key is whitespace-stripped and encrypted into ``password``; ``username``
    holds an encrypted empty string.
    """
    with get_db_conn() as conn:
        with conn:
            with conn.cursor() as cur:
                params = (encrypt_value(""), encrypt_value(app_key.strip()))
                cur.execute(
                    """
INSERT INTO credentials (site, username, password, updated_at)
VALUES ('dropbox_app_key', %s, %s, NOW())
ON CONFLICT (site) DO UPDATE
SET username = EXCLUDED.username,
password = EXCLUDED.password,
updated_at = NOW()
""",
                    params,
                )
def _save_dropbox_app_secret(app_secret: str) -> None:
    """Upsert the Dropbox app secret (credentials site 'dropbox_app_secret').

    The secret is whitespace-stripped and encrypted into ``password``;
    ``username`` holds an encrypted empty string.
    """
    with get_db_conn() as conn:
        with conn:
            with conn.cursor() as cur:
                params = (encrypt_value(""), encrypt_value(app_secret.strip()))
                cur.execute(
                    """
INSERT INTO credentials (site, username, password, updated_at)
VALUES ('dropbox_app_secret', %s, %s, NOW())
ON CONFLICT (site) DO UPDATE
SET username = EXCLUDED.username,
password = EXCLUDED.password,
updated_at = NOW()
""",
                    params,
                )
def _normalize_dropbox_root(value: str | None) -> str:
root = (value or "").strip() or DEFAULT_DROPBOX_ROOT
if not root.startswith("/"):
root = "/" + root
root = "/" + "/".join(part for part in root.split("/") if part)
return root or DEFAULT_DROPBOX_ROOT
def _dropbox_root_details() -> dict:
    """Return the configured Dropbox backup root folder and its update time.

    Lookup order: the ``dropbox_backup_root`` credentials row (password, then
    username), else the DROPBOX_BACKUP_ROOT environment variable, else
    DEFAULT_DROPBOX_ROOT; the value is always normalised. A row still stored in
    plaintext is rewritten encrypted (blank username, normalised root as
    password), which bumps ``updated_at``.

    Returns:
        ``{"root": str, "updated_at": ISO str | None}``.
    """
    with get_db_conn() as conn:
        with conn:
            with conn.cursor() as cur:
                cur.execute(
                    "SELECT username, password, updated_at FROM credentials WHERE site = 'dropbox_backup_root' LIMIT 1"
                )
                row = cur.fetchone()
                if not row:
                    fallback = os.environ.get("DROPBOX_BACKUP_ROOT", DEFAULT_DROPBOX_ROOT)
                    return {"root": _normalize_dropbox_root(fallback), "updated_at": None}
                stored_user, stored_pass, changed_at = row
                plain_user = decrypt_value(stored_user)
                plain_pass = decrypt_value(stored_pass)
                root = _normalize_dropbox_root(plain_pass or plain_user or DEFAULT_DROPBOX_ROOT)
                if not (is_encrypted_value(stored_user) and is_encrypted_value(stored_pass)):
                    cur.execute(
                        """
UPDATE credentials
SET username = %s, password = %s, updated_at = NOW()
WHERE site = 'dropbox_backup_root'
RETURNING updated_at
""",
                        (encrypt_value(""), encrypt_value(root)),
                    )
                    refreshed = cur.fetchone()
                    if refreshed:
                        changed_at = refreshed[0]
                return {
                    "root": root,
                    "updated_at": changed_at.isoformat() if changed_at else None,
                }
def _load_dropbox_root() -> str:
    """Return the normalised Dropbox backup root folder."""
    details = _dropbox_root_details()
    return details.get("root", DEFAULT_DROPBOX_ROOT)
def _dropbox_retention_details() -> dict:
    """Return how many backups to keep and when that setting last changed.

    Reads the ``dropbox_backup_retention`` credentials row (password, then
    username). Missing or non-integer values fall back to
    DEFAULT_RETENTION_COUNT; integers are clamped to at least 1. A plaintext
    row is rewritten encrypted with the parsed count, bumping ``updated_at``.

    Returns:
        ``{"retention_count": int, "updated_at": ISO str | None}``.
    """
    with get_db_conn() as conn:
        with conn:
            with conn.cursor() as cur:
                cur.execute(
                    "SELECT username, password, updated_at FROM credentials WHERE site = 'dropbox_backup_retention' LIMIT 1"
                )
                row = cur.fetchone()
                if not row:
                    return {"retention_count": DEFAULT_RETENTION_COUNT, "updated_at": None}
                stored_user, stored_pass, changed_at = row
                plain_user = decrypt_value(stored_user)
                plain_pass = decrypt_value(stored_pass)
                text = (plain_pass or plain_user or "").strip()
                try:
                    count = max(1, int(text))
                except Exception:
                    count = DEFAULT_RETENTION_COUNT
                if not (is_encrypted_value(stored_user) and is_encrypted_value(stored_pass)):
                    cur.execute(
                        """
UPDATE credentials
SET username = %s, password = %s, updated_at = NOW()
WHERE site = 'dropbox_backup_retention'
RETURNING updated_at
""",
                        (encrypt_value(""), encrypt_value(str(count))),
                    )
                    refreshed = cur.fetchone()
                    if refreshed:
                        changed_at = refreshed[0]
                return {
                    "retention_count": count,
                    "updated_at": changed_at.isoformat() if changed_at else None,
                }
def _load_dropbox_retention_count() -> int:
    """Return the number of backups/dumps to retain (always an int)."""
    details = _dropbox_retention_details()
    count = details.get("retention_count", DEFAULT_RETENTION_COUNT)
    return int(count)
def _dropbox_schedule_details() -> dict:
    """Return the scheduled-backup settings and when they last changed.

    The ``dropbox_backup_schedule`` credentials row (password, then username;
    stripped and lower-cased) is parsed either as a JSON object
    ``{"enabled": ..., "interval_hours": ...}`` or in the legacy
    ``"<flag>:<hours>"`` form, where flag is one of 1/true/yes/on. Unparseable
    parts fall back to disabled / DEFAULT_SCHEDULE_INTERVAL_HOURS, and the
    interval is clamped to at least 1. A plaintext row is rewritten encrypted
    in normalised JSON form, bumping ``updated_at``.

    Returns:
        ``{"enabled": bool, "interval_hours": int, "updated_at": ISO str | None}``.
    """

    def _parse(text: str) -> tuple[bool, int]:
        # JSON form, only attempted when the text looks like an object.
        obj = None
        if text.startswith("{"):
            try:
                obj = json.loads(text)
            except Exception:
                obj = None
        if isinstance(obj, dict):
            flag = bool(obj.get("enabled", DEFAULT_SCHEDULE_ENABLED))
            try:
                hours = max(1, int(obj.get("interval_hours", DEFAULT_SCHEDULE_INTERVAL_HOURS)))
            except Exception:
                hours = DEFAULT_SCHEDULE_INTERVAL_HOURS
            return flag, hours
        # Legacy "<flag>:<hours>" form.
        pieces = text.split(":")
        if len(pieces) != 2:
            return False, DEFAULT_SCHEDULE_INTERVAL_HOURS
        flag = pieces[0] in {"1", "true", "yes", "on"}
        try:
            hours = max(1, int(pieces[1]))
        except Exception:
            hours = DEFAULT_SCHEDULE_INTERVAL_HOURS
        return flag, hours

    with get_db_conn() as conn:
        with conn:
            with conn.cursor() as cur:
                cur.execute(
                    "SELECT username, password, updated_at FROM credentials WHERE site = 'dropbox_backup_schedule' LIMIT 1"
                )
                row = cur.fetchone()
                if not row:
                    return {
                        "enabled": DEFAULT_SCHEDULE_ENABLED,
                        "interval_hours": DEFAULT_SCHEDULE_INTERVAL_HOURS,
                        "updated_at": None,
                    }
                stored_user, stored_pass, changed_at = row
                plain_user = decrypt_value(stored_user)
                plain_pass = decrypt_value(stored_pass)
                enabled, interval_hours = _parse((plain_pass or plain_user or "").strip().lower())
                if not (is_encrypted_value(stored_user) and is_encrypted_value(stored_pass)):
                    normalised = json.dumps(
                        {"enabled": enabled, "interval_hours": interval_hours}, separators=(",", ":")
                    )
                    cur.execute(
                        """
UPDATE credentials
SET username = %s, password = %s, updated_at = NOW()
WHERE site = 'dropbox_backup_schedule'
RETURNING updated_at
""",
                        (encrypt_value(""), encrypt_value(normalised)),
                    )
                    refreshed = cur.fetchone()
                    if refreshed:
                        changed_at = refreshed[0]
                return {
                    "enabled": enabled,
                    "interval_hours": interval_hours,
                    "updated_at": changed_at.isoformat() if changed_at else None,
                }
def _load_backup_schedule() -> tuple[bool, int]:
    """Return ``(enabled, interval_hours)`` for the scheduled Dropbox backup."""
    details = _dropbox_schedule_details()
    enabled = bool(details.get("enabled", DEFAULT_SCHEDULE_ENABLED))
    interval = int(details.get("interval_hours", DEFAULT_SCHEDULE_INTERVAL_HOURS))
    return enabled, interval
def _save_backup_schedule(enabled: bool, interval_hours: int) -> None:
    """Store the backup schedule as encrypted compact JSON.

    The interval is clamped to at least 1 hour; the row lives under credentials
    site 'dropbox_backup_schedule'.
    """
    hours = max(1, int(interval_hours))
    encoded = json.dumps({"enabled": bool(enabled), "interval_hours": hours}, separators=(",", ":"))
    with get_db_conn() as conn:
        with conn:
            with conn.cursor() as cur:
                params = (encrypt_value(""), encrypt_value(encoded))
                cur.execute(
                    """
INSERT INTO credentials (site, username, password, updated_at)
VALUES ('dropbox_backup_schedule', %s, %s, NOW())
ON CONFLICT (site) DO UPDATE
SET username = EXCLUDED.username,
password = EXCLUDED.password,
updated_at = NOW()
""",
                    params,
                )
def _dropbox_join(root: str, *parts: str) -> str:
    """Join path segments onto a normalised Dropbox root.

    Empty parts, and parts made only of slashes, are dropped; leading and
    trailing slashes of each remaining part are stripped (inner ones are kept).
    """
    base = _normalize_dropbox_root(root)
    tail = "/".join(p.strip("/") for p in parts if p and p.strip("/"))
    if not tail:
        return base
    if base == "/":
        return "/" + tail
    return f"{base}/{tail}"
def _save_dropbox_root(root: str) -> None:
    """Store the normalised Dropbox backup root, encrypted, under site 'dropbox_backup_root'."""
    with get_db_conn() as conn:
        with conn:
            with conn.cursor() as cur:
                params = (encrypt_value(""), encrypt_value(_normalize_dropbox_root(root)))
                cur.execute(
                    """
INSERT INTO credentials (site, username, password, updated_at)
VALUES ('dropbox_backup_root', %s, %s, NOW())
ON CONFLICT (site) DO UPDATE
SET username = EXCLUDED.username,
password = EXCLUDED.password,
updated_at = NOW()
""",
                    params,
                )
def _save_dropbox_retention_count(retention_count: int) -> None:
    """Store the retention count (clamped to >= 1), encrypted, under site 'dropbox_backup_retention'."""
    keep = max(1, int(retention_count))
    with get_db_conn() as conn:
        with conn:
            with conn.cursor() as cur:
                params = (encrypt_value(""), encrypt_value(str(keep)))
                cur.execute(
                    """
INSERT INTO credentials (site, username, password, updated_at)
VALUES ('dropbox_backup_retention', %s, %s, NOW())
ON CONFLICT (site) DO UPDATE
SET username = EXCLUDED.username,
password = EXCLUDED.password,
updated_at = NOW()
""",
                    params,
                )
def _dbx() -> dropbox.Dropbox:
    """Create an authenticated Dropbox client.

    Preference order:
      1. App key + app secret + refresh token -> automatic token refresh.
      2. Legacy access token (backwards compatibility).

    The stored token (credentials site='dropbox') serves as the refresh token
    in case 1 and as the access token in case 2. The client is verified with
    ``users_get_current_account`` before it is returned.

    Raises:
        RuntimeError: if no token is stored or Dropbox rejects the credentials
            (the original ``AuthError`` is chained as ``__cause__``).
    """
    token = _load_dropbox_token()
    if not token:
        raise RuntimeError("Dropbox token not found in credentials (site='dropbox').")
    app_key = _load_dropbox_app_key()
    app_secret = _load_dropbox_app_secret()
    try:
        if app_key and app_secret:
            client = dropbox.Dropbox(
                oauth2_refresh_token=token,
                app_key=app_key,
                app_secret=app_secret,
                timeout=300,
            )
        else:
            # Fallback: legacy access token
            client = dropbox.Dropbox(token, timeout=300)
        client.users_get_current_account()
    except AuthError as e:
        # Keep the SDK's error as the cause so logs show the underlying reason.
        raise RuntimeError(f"Dropbox auth failed: {e}") from e
    return client
def _ensure_dropbox_dir(client: dropbox.Dropbox, path: str) -> None:
    """Create each folder along ``path`` on Dropbox, best-effort.

    Every ancestor is created in turn and any ``ApiError`` (e.g. the folder
    already exists) is ignored. ``""`` and ``"/"`` are no-ops.
    """
    if not path or path == "/":
        return
    prefix = ""
    for segment in filter(None, path.split("/")):
        prefix = f"{prefix}/{segment}"
        try:
            client.files_create_folder_v2(prefix)
        except ApiError:
            continue
# Size of each upload-session request (16 MiB) — keeps each chunk well within the request timeout.
_DROPBOX_UPLOAD_CHUNK = 16 * 1024 * 1024
# Payloads larger than this go through an upload session instead of one files_upload call.
_DROPBOX_UPLOAD_THRESHOLD = 16 * 1024 * 1024
def _dropbox_upload_bytes(client: dropbox.Dropbox, target_path: str, data: bytes) -> int:
    """Upload ``data`` to ``target_path`` on Dropbox, overwriting any existing file.

    Parent folders are created first. Payloads of at most
    _DROPBOX_UPLOAD_THRESHOLD bytes use a single ``files_upload`` call. Larger
    ones use an upload session in _DROPBOX_UPLOAD_CHUNK-sized pieces: start
    with the first chunk, append the middle ones, and finish (commit) with the
    last.

    Returns:
        The number of bytes uploaded (``len(data)``).
    """
    parent = str(Path(target_path).parent).replace("\\", "/")
    if not parent.startswith("/"):
        parent = "/" + parent
    _ensure_dropbox_dir(client, parent)
    mode = dropbox.files.WriteMode.overwrite
    total = len(data)
    if total <= _DROPBOX_UPLOAD_THRESHOLD:
        client.files_upload(data, target_path, mode=mode, mute=True)
        return total

    # Chunked upload session for large files. The session is ALWAYS finished,
    # even when the first chunk already covers the whole payload (possible if
    # the chunk size is configured above the threshold). In that case an empty
    # final chunk commits it; previously the file was never committed.
    first = data[:_DROPBOX_UPLOAD_CHUNK]
    session_id = client.files_upload_session_start(first).session_id
    offset = len(first)
    commit = dropbox.files.CommitInfo(path=target_path, mode=mode, mute=True)
    while True:
        chunk = data[offset : offset + _DROPBOX_UPLOAD_CHUNK]
        cursor = dropbox.files.UploadSessionCursor(session_id=session_id, offset=offset)
        if offset + len(chunk) >= total:
            client.files_upload_session_finish(chunk, cursor, commit)
            break
        client.files_upload_session_append_v2(chunk, cursor)
        offset += len(chunk)
    return total
def _dropbox_exists(client: dropbox.Dropbox, path: str) -> bool:
    """Return True if ``path`` exists on Dropbox.

    A lookup error whose text contains ``not_found`` yields False; any other
    ``ApiError`` is re-raised.
    """
    try:
        client.files_get_metadata(path)
    except ApiError as err:
        # "path/not_found" also contains "not_found", so one check covers both.
        if "not_found" in str(err).lower():
            return False
        raise
    return True
def _dropbox_list_files_recursive(client: dropbox.Dropbox, root: str) -> list[str]:
    """List every file (folders excluded) below ``root`` on Dropbox, recursively.

    Each path is ``path_lower`` when available, else ``path_display``; empty
    paths are dropped. A missing ``root`` yields ``[]``; other ``ApiError``s
    propagate. Pagination is followed until ``has_more`` is false.
    """
    try:
        page = client.files_list_folder(root, recursive=True)
    except ApiError as err:
        if "not_found" in str(err).lower():
            return []
        raise
    found: list[str] = []
    while True:
        found.extend(
            entry.path_lower or entry.path_display or ""
            for entry in page.entries
            if isinstance(entry, dropbox.files.FileMetadata)
        )
        if not page.has_more:
            break
        page = client.files_list_folder_continue(page.cursor)
    return [p for p in found if p]
def _dropbox_delete_paths(client: dropbox.Dropbox, paths: list[str]) -> int:
    """Delete each path on Dropbox one by one; return how many deletions succeeded.

    Paths that fail with an ``ApiError`` are skipped silently.
    """
    count = 0
    for target in paths:
        try:
            client.files_delete_v2(target)
        except ApiError:
            continue
        count += 1
    return count
def _iter_library_files() -> list[Path]:
    """Return every regular file below LIBRARY_DIR (recursively); ``[]`` if it is missing."""
    if not LIBRARY_DIR.exists():
        return []
    return list(filter(Path.is_file, LIBRARY_DIR.rglob("*")))
def _current_file_state(path: Path) -> dict[str, float | int]:
st = path.stat()
return {"mtime": st.st_mtime, "size": st.st_size}
def _sha256_file(path: Path) -> str:
    """Return the hex SHA-256 digest of the file at ``path``, read in 1 MiB pieces."""
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        while piece := fh.read(1024 * 1024):
            digest.update(piece)
    return digest.hexdigest()
def _snapshot_name() -> str:
    """Return a UTC-stamped snapshot file name, e.g. ``snapshot-20240131-235959.json``."""
    stamp = datetime.now(tz=timezone.utc).strftime("%Y%m%d-%H%M%S")
    return "snapshot-" + stamp + ".json"
# ── Database-stored books (storage_type='db') ────────────────────────────────
# These books have no file on disk: their content lives in PostgreSQL
# (library row + book_chapters + book_tags + library_cover_cache). Inline images
# referenced from chapters live on disk under library/images/ and are backed up
# as ordinary files. To make db-books restorable per-book (just like file books),
# each one is serialized to JSON and stored in the same content-addressed object
# store, then referenced from the snapshot with a "storage": "db" marker.
#
# Columns of `library` copied into / out of a db-book snapshot. These names are
# interpolated directly into SQL by _serialize_db_book and _restore_db_book, so
# this list doubles as the column whitelist: only add trusted identifiers.
_DB_LIBRARY_COLS = [
    "filename",
    "media_type",
    "storage_type",
    "title",
    "author",
    "publisher",
    "series",
    "series_index",
    "series_suffix",
    "series_volume",
    "publication_status",
    "has_cover",
    "description",
    "source_url",
    "publish_date",
    "archived",
    "want_to_read",
    "needs_review",
    "rating",
    "created_at",
]
def _db_book_filenames(cur) -> list[str]:
    """Return the filenames of all database-stored books, sorted ascending.

    A book counts as db-stored when ``storage_type = 'db'`` or its filename
    starts with ``db/``. ``cur`` is an open DB cursor (assumed DB-API style).
    """
    cur.execute(
        "SELECT filename FROM library WHERE storage_type = 'db' OR filename LIKE 'db/%' ORDER BY filename"
    )
    rows = cur.fetchall()
    return [row[0] for row in rows]
def _serialize_db_book(cur, filename: str) -> dict | None:
    """Serialise one database-stored book into a JSON-compatible dict.

    Collects:
      * the ``library`` row, restricted to _DB_LIBRARY_COLS, with date/datetime
        values converted to ISO strings;
      * its chapters ordered by index (NULL title/content become ``""``);
      * its tags;
      * the cached cover thumbnail as base64, if one exists.

    Returns None when no ``library`` row exists for ``filename``.
    """
    columns = _DB_LIBRARY_COLS
    cur.execute(
        f"SELECT {', '.join(columns)} FROM library WHERE filename = %s LIMIT 1",
        (filename,),
    )
    lib_row = cur.fetchone()
    if not lib_row:
        return None
    library = {
        name: value.isoformat() if isinstance(value, (datetime, date)) else value
        for name, value in zip(columns, lib_row)
    }

    cur.execute(
        "SELECT chapter_index, title, content FROM book_chapters WHERE filename = %s ORDER BY chapter_index",
        (filename,),
    )
    chapter_list = [
        {"chapter_index": rec[0], "title": rec[1] or "", "content": rec[2] or ""}
        for rec in cur.fetchall()
    ]

    cur.execute(
        "SELECT tag, tag_type FROM book_tags WHERE filename = %s ORDER BY tag, tag_type",
        (filename,),
    )
    tag_list = [{"tag": rec[0], "tag_type": rec[1]} for rec in cur.fetchall()]

    cur.execute(
        "SELECT mime_type, thumb_webp FROM library_cover_cache WHERE filename = %s LIMIT 1",
        (filename,),
    )
    cached = cur.fetchone()
    cover_info = None
    if cached and cached[1] is not None:
        mime = cached[0] or "image/webp"
        encoded = base64.b64encode(bytes(cached[1])).decode("ascii")
        cover_info = {"mime_type": mime, "thumb_webp_b64": encoded}

    return {
        "novela_db_book": 1,
        "filename": filename,
        "library": library,
        "chapters": chapter_list,
        "tags": tag_list,
        "cover": cover_info,
    }
def _restore_db_book(filename: str, payload: dict) -> None:
    """Re-create a db-stored book from a serialized snapshot object.

    ``payload`` has the shape produced by _serialize_db_book (keys ``library``,
    ``chapters``, ``tags``, ``cover``). In a single transaction this:

      * upserts the ``library`` row, restricted to columns in _DB_LIBRARY_COLS
        (the whitelist for the column names interpolated into SQL);
        ``storage_type`` defaults to "db" and ``media_type`` to "epub";
      * replaces all chapters (entries that are not dicts or lack an integer
        ``chapter_index`` are skipped);
      * replaces all tags (blank tag/type skipped; duplicates detected
        case-insensitively on the tag text);
      * restores the cached cover thumbnail when one was serialized.

    Undecodable cover data is skipped and the book is restored without it.
    Database errors are deliberately NOT swallowed. In PostgreSQL, a failed
    statement aborts the open transaction and its COMMIT turns into a rollback.
    Catching such an error would silently drop the whole restored book, so it
    propagates instead.
    """
    lib = dict(payload.get("library") or {})
    lib["filename"] = filename
    lib.setdefault("storage_type", "db")
    lib.setdefault("media_type", "epub")
    # "filename" was just set, so it is always among the selected columns.
    cols = [c for c in _DB_LIBRARY_COLS if c in lib]
    col_list = ", ".join(cols)
    placeholders = ", ".join(["%s"] * len(cols))
    updates = ", ".join(f"{c} = EXCLUDED.{c}" for c in cols if c != "filename")
    values = [lib.get(c) for c in cols]

    # Validate chapters up front so the transaction only does writes.
    chapter_rows: list[tuple] = []
    for ch in payload.get("chapters") or []:
        if not isinstance(ch, dict):
            continue
        try:
            idx = int(ch.get("chapter_index"))
        except (TypeError, ValueError):
            continue
        chapter_rows.append((idx, ch.get("title", ""), ch.get("content", "")))

    tag_rows: list[tuple[str, str, str]] = []
    seen: set[tuple[str, str]] = set()
    for t in payload.get("tags") or []:
        if not isinstance(t, dict):
            continue
        tag = str(t.get("tag") or "").strip()
        ttype = str(t.get("tag_type") or "").strip()
        if not tag or not ttype:
            continue
        key = (tag.casefold(), ttype)
        if key in seen:
            continue
        seen.add(key)
        tag_rows.append((filename, tag, ttype))

    # Decode the cover before opening the transaction; only decoding errors are tolerated.
    cover = payload.get("cover")
    thumb: bytes | None = None
    cover_mime = "image/webp"
    if isinstance(cover, dict) and cover.get("thumb_webp_b64"):
        try:
            thumb = base64.b64decode(cover["thumb_webp_b64"])
        except (ValueError, TypeError):  # binascii.Error is a ValueError
            thumb = None
        cover_mime = cover.get("mime_type", "image/webp")

    with get_db_conn() as conn:
        with conn:
            with conn.cursor() as cur:
                cur.execute(
                    f"""
INSERT INTO library ({col_list})
VALUES ({placeholders})
ON CONFLICT (filename) DO UPDATE SET
{updates},
updated_at = NOW()
""",
                    values,
                )
                cur.execute("DELETE FROM book_chapters WHERE filename = %s", (filename,))
            for idx, title, content in chapter_rows:
                upsert_chapter(conn, filename, idx, title, content)
            with conn.cursor() as cur:
                cur.execute("DELETE FROM book_tags WHERE filename = %s", (filename,))
                if tag_rows:
                    cur.executemany(
                        "INSERT INTO book_tags (filename, tag, tag_type) VALUES (%s, %s, %s) "
                        "ON CONFLICT (filename, tag, tag_type) DO NOTHING",
                        tag_rows,
                    )
            if thumb is not None:
                upsert_cover_cache(conn, filename, cover_mime, thumb)
def _object_path(objects_root: str, sha256: str) -> str:
    """Return the content-addressed location ``<root>/<sha[:2]>/<sha>`` of an object."""
    shard = sha256[:2]
    return _dropbox_join(objects_root, shard, sha256)
def _pg_dump_cmd(tmp_path: Path) -> list[str]:
    """Build the ``pg_dump`` argv that writes a dump of the app database to ``tmp_path``.

    Connection settings come from POSTGRES_HOST / POSTGRES_PORT / POSTGRES_USER /
    POSTGRES_DB (defaults postgres / 5432 / novela / novela). No password is
    placed on the command line.
    """
    env = os.environ
    host = env.get("POSTGRES_HOST", "postgres")
    port = str(env.get("POSTGRES_PORT", "5432"))
    user = env.get("POSTGRES_USER", "novela")
    dbname = env.get("POSTGRES_DB", "novela")
    return ["pg_dump", "-h", host, "-p", port, "-U", user, "-d", dbname, "-f", str(tmp_path)]
def _run_pg_dump() -> tuple[bytes, str]:
    """Dump the configured database and return ``(sql_bytes, filename)``.

    pg_dump writes to a temporary file that is always removed afterwards. The
    returned filename is ``<POSTGRES_DB>-<UTC YYYYmmdd-HHMMSS>.sql``. The
    password is supplied through PGPASSWORD in the child environment.

    Raises:
        RuntimeError: if pg_dump exits non-zero (message includes its stderr).
    """
    dbname = os.environ.get("POSTGRES_DB", "novela")
    child_env = {**os.environ, "PGPASSWORD": os.environ.get("POSTGRES_PASSWORD", "")}
    with NamedTemporaryFile(suffix=".sql", delete=False) as handle:
        dump_file = Path(handle.name)
    try:
        result = subprocess.run(_pg_dump_cmd(dump_file), env=child_env, capture_output=True, text=True)
        if result.returncode:
            message = (result.stderr or "").strip()
            raise RuntimeError(f"pg_dump failed: {message or 'unknown error'}")
        sql = dump_file.read_bytes()
        stamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
        return sql, f"{dbname}-{stamp}.sql"
    finally:
        dump_file.unlink(missing_ok=True)
def _psql_base_args() -> list[str]:
    """Return psql connection flags (-h/-p/-U/-d) built from the POSTGRES_* env vars."""
    settings = (
        ("-h", "POSTGRES_HOST", "postgres"),
        ("-p", "POSTGRES_PORT", "5432"),
        ("-U", "POSTGRES_USER", "novela"),
        ("-d", "POSTGRES_DB", "novela"),
    )
    args: list[str] = []
    for flag, var, default in settings:
        args += [flag, str(os.environ.get(var, default))]
    return args
# Session-GUC SET statements a dump may carry that an OLDER server doesn't know.
# These are emitted in the pg_dump header and are harmless to skip: they only
# affect the restoring session, not the data. (For example,
# `transaction_timeout` was introduced in PostgreSQL 17, so restoring such a
# dump into a pre-17 server errors on that line.) These must not abort an
# otherwise valid restore, but any OTHER error still fails the restore.
# Each entry is matched as a substring of a psql stderr `ERROR:` line.
_BENIGN_RESTORE_ERROR_MARKERS = (
    "unrecognized configuration parameter",
)
def _real_restore_errors(stderr: str) -> list[str]:
    """Return the stripped ``ERROR:`` lines from psql stderr that are not benign.

    Lines without ``ERROR:`` are ignored, as are lines containing any of
    _BENIGN_RESTORE_ERROR_MARKERS. ``None`` or empty input yields ``[]``.
    """
    candidates = (line for line in (stderr or "").splitlines() if "ERROR:" in line)
    return [
        line.strip()
        for line in candidates
        if not any(marker in line for marker in _BENIGN_RESTORE_ERROR_MARKERS)
    ]
def _apply_pg_dump_file(
    path: Path,
    total: int,
    *,
    load_phase: str = "loading",
    load_label: str = "Loading dump",
    reset_label: str = "Resetting schema",
) -> None:
    """Reset the public schema and load a dump file into it, reporting progress.

    1. ``DROP SCHEMA public CASCADE; CREATE SCHEMA public;`` runs via psql with
       ON_ERROR_STOP (phase "resetting", label ``reset_label``).
    2. ``path`` is streamed to ``psql -f -`` over stdin in 1 MiB chunks. The
       byte counter published under ``load_phase`` / ``load_label`` tracks how
       much has been fed, which closely follows psql's real progress thanks to
       pipe back-pressure.

    The load runs WITHOUT ON_ERROR_STOP so that benign header SETs an older
    server doesn't recognise (e.g. ``transaction_timeout``) don't abort it.
    stderr is collected in a temp file and inspected afterwards.

    Args:
        path: dump file to load.
        total: size reported as the progress total (normally ``path``'s size).

    Raises:
        RuntimeError: if the schema reset fails, if stderr contains a
            non-benign ``ERROR:`` line, or if psql exits non-zero.
    """
    env = os.environ.copy()
    env["PGPASSWORD"] = os.environ.get("POSTGRES_PASSWORD", "")
    base = _psql_base_args()
    _restore_prog("resetting", 0, 0, reset_label)
    reset = subprocess.run(
        ["psql", *base, "-v", "ON_ERROR_STOP=1", "-c", "DROP SCHEMA public CASCADE; CREATE SCHEMA public;"],
        env=env,
        capture_output=True,
        text=True,
    )
    if reset.returncode != 0:
        raise RuntimeError(f"schema reset failed: {(reset.stderr or '').strip()[:500] or 'unknown error'}")
    _restore_prog(load_phase, 0, total, load_label)
    with NamedTemporaryFile(suffix=".stderr", delete=False) as errtmp:
        err_path = Path(errtmp.name)
    try:
        with err_path.open("wb") as err_fh, path.open("rb") as src:
            proc = subprocess.Popen(
                ["psql", *base, "-q", "-f", "-"],
                env=env,
                stdin=subprocess.PIPE,
                stdout=subprocess.DEVNULL,
                stderr=err_fh,
            )
            sent = 0
            try:
                while chunk := src.read(1024 * 1024):
                    proc.stdin.write(chunk)
                    sent += len(chunk)
                    _restore_prog(load_phase, sent, total, load_label)
            except BrokenPipeError:
                # psql exited early (e.g. fatal error); the error is captured in stderr.
                pass
            finally:
                try:
                    proc.stdin.close()
                except OSError:
                    pass
                # Always reap psql, even if feeding it raised something other
                # than BrokenPipeError, so no zombie process is left behind.
                proc.wait()
        stderr_text = err_path.read_text(encoding="utf-8", errors="replace")
        real_errors = _real_restore_errors(stderr_text)
        if real_errors:
            raise RuntimeError("psql restore failed: " + " | ".join(real_errors)[:500])
        if proc.returncode != 0:
            raise RuntimeError(
                f"psql exited with code {proc.returncode}: {stderr_text.strip()[:300] or 'unknown error'}"
            )
    finally:
        err_path.unlink(missing_ok=True)
def _pg_dump_safety_to_local() -> Path:
    """Run pg_dump of the CURRENT database into the local store, with progress.

    The dump is written to LOCAL_DUMP_DIR as
    ``pre-restore-<POSTGRES_DB>-<UTC stamp>.sql``. While pg_dump runs, the
    growing file size is published every 0.5 s under phase "safety_dump"
    (total unknown, reported as 0).

    pg_dump's stderr goes to a temporary file instead of an undrained pipe,
    so a chatty pg_dump cannot block on a full pipe buffer while it is being
    polled. On failure the partial dump is deleted. Otherwise it would sit in
    LOCAL_DUMP_DIR as a ``*.sql`` file and be listed as a restorable dump.

    Returns:
        Path of the completed dump.

    Raises:
        RuntimeError: if pg_dump exits non-zero (message includes its stderr).
    """
    db = os.environ.get("POSTGRES_DB", "novela")
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
    LOCAL_DUMP_DIR.mkdir(parents=True, exist_ok=True)
    dest = LOCAL_DUMP_DIR / f"pre-restore-{db}-{stamp}.sql"
    env = os.environ.copy()
    env["PGPASSWORD"] = os.environ.get("POSTGRES_PASSWORD", "")
    with NamedTemporaryFile(suffix=".stderr", delete=False) as errtmp:
        err_path = Path(errtmp.name)
    try:
        with err_path.open("wb") as err_fh:
            proc = subprocess.Popen(
                _pg_dump_cmd(dest),
                env=env,
                stdout=subprocess.DEVNULL,
                stderr=err_fh,
            )
            while proc.poll() is None:
                try:
                    _restore_prog("safety_dump", dest.stat().st_size, 0, "Creating safety snapshot")
                except OSError:
                    # pg_dump may not have created the file yet.
                    pass
                time.sleep(0.5)
        if proc.returncode != 0:
            err = err_path.read_text(encoding="utf-8", errors="replace")
            dest.unlink(missing_ok=True)
            raise RuntimeError(f"pg_dump failed: {(err or '').strip()[:300] or 'unknown error'}")
        return dest
    finally:
        err_path.unlink(missing_ok=True)
def _download_dropbox_to_file(client: dropbox.Dropbox, src: str, dest: Path) -> None:
    """Stream the Dropbox file ``src`` into ``dest``, publishing download progress.

    Data is written to ``<dest>.part`` first and moved into place (``os.replace``)
    only after the full body has arrived. An interrupted or short download
    therefore never leaves a truncated file at ``dest``; for a ``.sql`` in the
    local dump store, such a file would otherwise look like a restorable dump.
    The HTTP response is always closed.

    Raises:
        RuntimeError: if Dropbox reported a size and fewer/more bytes arrived.
        Any error from the Dropbox SDK or the filesystem propagates.
    """
    md, res = client.files_download(src)
    try:
        total = int(getattr(md, "size", 0) or 0)
        done = 0
        _restore_prog("downloading", 0, total, "Downloading dump")
        dest.parent.mkdir(parents=True, exist_ok=True)
        part = dest.with_name(dest.name + ".part")
        try:
            with part.open("wb") as f:
                for chunk in res.iter_content(chunk_size=1024 * 1024):
                    if not chunk:
                        continue
                    f.write(chunk)
                    done += len(chunk)
                    _restore_prog("downloading", done, total, "Downloading dump")
            if total and done != total:
                raise RuntimeError(f"Download incomplete: received {done} of {total} bytes")
            os.replace(part, dest)
        finally:
            # No-op after a successful replace; removes the partial file otherwise.
            part.unlink(missing_ok=True)
    finally:
        res.close()
def _restore_worker_sync(
    *,
    source: str,
    dump_path: "Path | None" = None,
    dropbox_name: str | None = None,
    cleanup_path: str | None = None,
) -> None:
    """Background full-database restore with safety snapshot and rollback.

    Phases reported via RESTORE_PROGRESS: downloading (Dropbox only) ->
    safety_dump -> resetting -> loading -> done | error. If loading fails and a
    safety snapshot exists, the database is rolled back to it (phase
    rolling_back) and the recorded error says so.

    Args:
        source: "dropbox" to first download ``dropbox_name`` from the backup
            root's ``postgres`` folder; any other value restores ``dump_path``.
        dump_path: local dump to restore (replaced by the download for Dropbox).
        dropbox_name: dump name under ``<root>/postgres`` on Dropbox. Only its
            basename is used locally, so a crafted name such as ``../../x.sql``
            cannot write outside LOCAL_DUMP_DIR.
        cleanup_path: optional file deleted when the worker finishes.

    Local-dump retention for the safety snapshot is applied only AFTER the
    load (or rollback) has finished. Pruning earlier could delete the very
    dump being restored whenever it is among the oldest kept files.

    Errors (``Exception`` subclasses) are recorded in RESTORE_PROGRESS
    (``ok``/``error``) instead of being raised; ``active`` is always reset.
    """

    def _prune_local_dumps() -> None:
        # Best-effort: retention problems must never fail a restore.
        try:
            _enforce_local_dump_retention(_load_dropbox_retention_count())
        except Exception:
            pass

    def _fail(message: str, label: str) -> None:
        RESTORE_PROGRESS.update({"ok": False, "error": message})
        _restore_prog("error", 0, 0, label)

    safety_path: "Path | None" = None
    try:
        # 1. Obtain the dump as a local file.
        if source == "dropbox":
            client = _dbx()
            postgres_root = _dropbox_join(_load_dropbox_root(), "postgres")
            src = _dropbox_join(postgres_root, dropbox_name or "")
            LOCAL_DUMP_DIR.mkdir(parents=True, exist_ok=True)
            local_name = Path(dropbox_name or "").name or "download.sql"
            dump_path = LOCAL_DUMP_DIR / local_name
            _download_dropbox_to_file(client, src, dump_path)
            # The fresh download is the newest local dump, so it survives pruning.
            _prune_local_dumps()
        if dump_path is None or not Path(dump_path).is_file():
            raise RuntimeError("Dump file not available for restore")
        dump_path = Path(dump_path)
        total = dump_path.stat().st_size

        # 2. Safety snapshot of the current database (saved locally, token-free).
        try:
            safety_path = _pg_dump_safety_to_local()
        except Exception:
            safety_path = None

        # 3. + 4. Reset schema and load the dump; roll back on failure.
        try:
            _apply_pg_dump_file(dump_path, total)
        except Exception as restore_err:
            if safety_path is not None and safety_path.exists():
                try:
                    _apply_pg_dump_file(
                        safety_path,
                        safety_path.stat().st_size,
                        load_phase="rolling_back",
                        load_label="Rolling back (loading safety snapshot)",
                        reset_label="Rolling back (resetting schema)",
                    )
                except Exception as rollback_err:
                    _fail(f"restore failed: {restore_err}; AND rollback failed: {rollback_err}", "Failed")
                    return
                _fail(f"restore failed and was rolled back to the pre-restore state: {restore_err}", "Rolled back")
                return
            _fail(str(restore_err), "Failed")
            return
        RESTORE_PROGRESS.update({"ok": True, "error": None, "size": total})
        _restore_prog("done", total, total, "Done")
    except Exception as e:
        _fail(str(e), "Failed")
    finally:
        if safety_path is not None:
            _prune_local_dumps()
        RESTORE_PROGRESS["active"] = False
        if cleanup_path:
            try:
                Path(cleanup_path).unlink(missing_ok=True)
            except Exception:
                pass
def _start_restore(*, name: str, **kwargs) -> bool:
    """Start a background full-database restore unless one is already running.

    Resets RESTORE_PROGRESS (phase "starting", ``name`` recorded) and schedules
    ``_restore_worker_sync(**kwargs)`` in a worker thread via
    ``asyncio.to_thread``. The task is stored in RESTORE_TASK. Must be called
    while an event loop is running (``asyncio.create_task`` requires one).

    Returns:
        False if a restore is already active (nothing started), else True.
    """
    global RESTORE_TASK
    if RESTORE_PROGRESS.get("active"):
        return False
    fresh_state = {
        "active": True,
        "phase": "starting",
        "done": 0,
        "total": 0,
        "label": "Starting",
        "name": name,
        "size": 0,
        "ok": None,
        "error": None,
    }
    RESTORE_PROGRESS.update(fresh_state)
    worker = asyncio.to_thread(_restore_worker_sync, **kwargs)
    RESTORE_TASK = asyncio.create_task(worker)
    return True
def _list_pg_dump_paths(client: dropbox.Dropbox, postgres_root: str) -> list[str]:
    """Return the ``.sql`` dump paths under ``postgres_root`` on Dropbox, reverse-sorted.

    Reverse lexicographic order puts the newest first for names that end in a
    ``YYYYmmdd-HHMMSS`` stamp, as produced by _run_pg_dump.
    """
    candidates = _dropbox_list_files_recursive(client, postgres_root)
    dumps = [p for p in candidates if p.endswith(".sql")]
    dumps.sort(reverse=True)
    return dumps
def _save_local_dump(name: str, data: bytes) -> None:
    """Write ``data`` into LOCAL_DUMP_DIR under the basename of ``name``.

    Directory components in ``name`` are discarded and ``.sql`` is appended if
    missing. The directory is created on demand; existing files are overwritten.
    """
    LOCAL_DUMP_DIR.mkdir(parents=True, exist_ok=True)
    filename = Path(name).name
    if not filename.endswith(".sql"):
        filename = f"{filename}.sql"
    target = LOCAL_DUMP_DIR / filename
    target.write_bytes(data)
def _local_dump_mtime(p: Path) -> float:
    """Return ``p``'s modification time, or ``0.0`` if it cannot be stat'ed."""
    try:
        stat_result = p.stat()
    except OSError:
        return 0.0
    return stat_result.st_mtime
def _list_local_dumps() -> list[Path]:
    """Return the ``*.sql`` files directly in LOCAL_DUMP_DIR, newest (by mtime) first."""
    if not LOCAL_DUMP_DIR.exists():
        return []
    candidates = [p for p in LOCAL_DUMP_DIR.glob("*.sql") if p.is_file()]
    candidates.sort(key=_local_dump_mtime, reverse=True)
    return candidates
def _enforce_local_dump_retention(keep_count: int) -> None:
    """Delete all but the ``keep_count`` newest local dumps (at least one is always kept).

    Files that cannot be deleted are left in place silently.
    """
    keep = max(1, int(keep_count))
    surplus = _list_local_dumps()[keep:]
    for stale in surplus:
        try:
            stale.unlink()
        except OSError:
            continue
def _resolve_local_dump(name: str) -> Path | None:
safe = Path(name).name
if not safe.endswith(".sql"):
return None
candidate = (LOCAL_DUMP_DIR / safe).resolve()
try:
candidate.relative_to(LOCAL_DUMP_DIR.resolve())
except ValueError:
return None
return candidate if candidate.is_file() else None
def _has_running_backup() -> bool:
    """Return True if a backup tracked in BACKUP_TASKS is still running.

    Side effect: ``backup_log`` rows still marked 'running' (no finished_at)
    whose id is not in BACKUP_TASKS have no live task in this process. They
    are closed as 'error' with "Interrupted: service restart or crash" unless
    an error message is already set.
    """
    with get_db_conn() as conn:
        with conn:
            with conn.cursor() as cur:
                cur.execute(
                    """
SELECT id
FROM backup_log
WHERE status = 'running' AND finished_at IS NULL
ORDER BY started_at DESC
"""
                )
                running_ids = [int(rec[0]) for rec in cur.fetchall()]
                if not running_ids:
                    return False
                live = set(BACKUP_TASKS)
                orphaned = [rid for rid in running_ids if rid not in live]
                if orphaned:
                    cur.execute(
                        """
UPDATE backup_log
SET status = 'error',
error_msg = COALESCE(error_msg, 'Interrupted: service restart or crash'),
finished_at = NOW()
WHERE id = ANY(%s)
""",
                        (orphaned,),
                    )
                return any(rid in live for rid in running_ids)
def _insert_backup_log_running() -> int:
    """Insert a ``backup_log`` row with status 'running' (started now) and return its id."""
    with get_db_conn() as conn:
        with conn:
            with conn.cursor() as cur:
                cur.execute(
                    """
INSERT INTO backup_log (status, started_at)
VALUES ('running', NOW())
RETURNING id
"""
                )
                inserted = cur.fetchone()
                return int(inserted[0])
def _finish_backup_log(
    log_id: int,
    *,
    status: str,
    files_count: int | None,
    scanned_files: int | None,
    size_bytes: int | None,
    error_msg: str | None,
) -> None:
    """Close out a ``backup_log`` row: final status, counters, error text and finished_at = now."""
    params = (status, files_count, scanned_files, size_bytes, error_msg, log_id)
    with get_db_conn() as conn:
        with conn:
            with conn.cursor() as cur:
                cur.execute(
                    """
UPDATE backup_log
SET status = %s,
files_count = %s,
scanned_files = %s,
size_bytes = %s,
error_msg = %s,
finished_at = NOW()
WHERE id = %s
""",
                    params,
                )
def _list_snapshot_paths(client: dropbox.Dropbox, snapshots_root: str) -> list[str]:
    """Return the ``.json`` snapshot paths under ``snapshots_root``, reverse-sorted.

    For names produced by _snapshot_name (``snapshot-YYYYmmdd-HHMMSS.json``),
    reverse lexicographic order is newest first.
    """
    candidates = _dropbox_list_files_recursive(client, snapshots_root)
    snapshots = [p for p in candidates if p.endswith(".json")]
    snapshots.sort(reverse=True)
    return snapshots
def _load_snapshot_data(client: dropbox.Dropbox, snapshot_path: str) -> dict:
    """Download and parse one snapshot JSON file from Dropbox.

    The body is decoded as UTF-8 with undecodable bytes replaced. A top-level
    JSON value that is not an object yields ``{}``; invalid JSON raises.
    """
    _meta, response = client.files_download(snapshot_path)
    text = response.content.decode("utf-8", errors="replace")
    parsed = json.loads(text)
    if isinstance(parsed, dict):
        return parsed
    return {}
def _enforce_snapshot_retention(
    client: dropbox.Dropbox,
    snapshots_root: str,
    keep_count: int,
) -> tuple[list[str], list[str]]:
    """Keep the ``keep_count`` newest snapshots (at least one) and delete the rest.

    Returns:
        ``(kept_paths, deleted_paths)``. Deletion is best-effort, so a path
        listed as deleted may have failed to delete.
    """
    snapshots = _list_snapshot_paths(client, snapshots_root)
    keep = max(1, int(keep_count))
    kept, stale = snapshots[:keep], snapshots[keep:]
    if stale:
        _dropbox_delete_paths(client, stale)
    return kept, stale
def _collect_hashes_from_snapshots(client: dropbox.Dropbox, snapshot_paths: list[str]) -> set[str]:
    """Gather every object hash referenced by the given snapshots.

    For each snapshot, every dict entry in its ``files`` mapping contributes
    its ``sha256`` (lower-cased) if it is exactly 64 hex characters.
    Snapshots that fail to download or parse, or whose ``files`` is not a
    dict, are skipped.

    NOTE(review): a snapshot skipped here contributes no hashes. If the result
    feeds _prune_orphan_objects, a transient download error could get objects
    that snapshot still needs deleted. Consider aborting the prune instead;
    confirm against the caller.
    """
    hexdigits = frozenset("0123456789abcdef")
    referenced: set[str] = set()
    for snap_path in snapshot_paths:
        try:
            snapshot = _load_snapshot_data(client, snap_path)
        except Exception:
            continue
        entries = snapshot.get("files", {}) if isinstance(snapshot, dict) else {}
        if not isinstance(entries, dict):
            continue
        for entry in entries.values():
            if not isinstance(entry, dict):
                continue
            digest = str(entry.get("sha256") or "").lower()
            if len(digest) == 64 and set(digest) <= hexdigits:
                referenced.add(digest)
    return referenced
def _prune_orphan_objects(client: dropbox.Dropbox, objects_root: str, referenced_hashes: set[str]) -> int:
    """Delete objects under ``objects_root`` whose hash is not in ``referenced_hashes``.

    Only files whose lower-cased basename is a 64-character hex string are
    considered; anything else is left alone. Returns the number of objects
    successfully deleted.
    """
    hexdigits = frozenset("0123456789abcdef")
    orphans: list[str] = []
    for obj_path in _dropbox_list_files_recursive(client, objects_root):
        digest = Path(obj_path).name.lower()
        is_object = len(digest) == 64 and set(digest) <= hexdigits
        if is_object and digest not in referenced_hashes:
            orphans.append(obj_path)
    return _dropbox_delete_paths(client, orphans)
def _run_backup_internal(*, dry_run: bool, progress_key: int | None = None) -> tuple[int, int, int]:
    """Back up library files, database-stored books and a pg_dump to Dropbox.

    Library files and serialized db-books go into a content-addressed object
    store (``library_objects``, keyed by sha256) and are listed in a new
    snapshot under ``library_snapshots``. Snapshots beyond the retention count
    are then deleted, and objects no kept snapshot references are pruned.
    Finally a pg_dump is uploaded under ``postgres``.

    With ``dry_run`` no Dropbox client is created, nothing is uploaded or
    deleted, and the local manifest is not saved. The counters estimate the
    work a real run would do. pg_dump is still executed to measure its size.

    Args:
        dry_run: Estimate only; do not touch Dropbox or the manifest.
        progress_key: When given, progress is published to BACKUP_PROGRESS
            under this key.

    Returns:
        ``(scanned_library_files, uploaded_count, uploaded_bytes)``.
    """

    def _prog(done: int, total: int, phase: str) -> None:
        # Publish progress for the progress endpoint (only when keyed).
        if progress_key is not None:
            BACKUP_PROGRESS[progress_key] = {"done": done, "total": total, "phase": phase}

    client = None if dry_run else _dbx()
    manifest = _load_manifest()
    files = _iter_library_files()
    total_files = len(files)
    _prog(0, total_files, "scanning")
    uploaded_count = 0
    uploaded_size = 0
    new_manifest: dict[str, dict[str, float | int | str]] = {}
    dropbox_root = _load_dropbox_root()
    retention_count = _load_dropbox_retention_count()
    objects_root = _dropbox_join(dropbox_root, "library_objects")
    snapshots_root = _dropbox_join(dropbox_root, "library_snapshots")
    if client is not None:
        _ensure_dropbox_dir(client, objects_root)
        _ensure_dropbox_dir(client, snapshots_root)
    snapshot_files: dict[str, dict[str, float | int | str]] = {}

    # --- Library files on disk ------------------------------------------------
    for idx, path in enumerate(files):
        _prog(idx, total_files, "uploading")
        rel = path.relative_to(LIBRARY_DIR).as_posix()
        state = _current_file_state(path)
        prev = manifest.get(rel, {}) if isinstance(manifest.get(rel), dict) else {}
        # Reuse the cached hash when mtime and size are unchanged, so unchanged
        # files are not re-read on every run.
        if (
            prev
            and prev.get("mtime") == state["mtime"]
            and prev.get("size") == state["size"]
            and isinstance(prev.get("sha256"), str)
        ):
            sha256 = str(prev.get("sha256"))
        else:
            sha256 = _sha256_file(path)
        entry = {"mtime": state["mtime"], "size": state["size"], "sha256": sha256}
        new_manifest[rel] = entry
        snapshot_files[rel] = entry
        object_target = _object_path(objects_root, sha256)
        if client is not None:
            # Content-addressed store: an existing object needs no upload.
            if not _dropbox_exists(client, object_target):
                data = path.read_bytes()
                uploaded_size += _dropbox_upload_bytes(client, object_target, data)
                uploaded_count += 1
        else:
            # Dry run reports potential upload work for changed objects.
            if not prev or prev.get("sha256") != sha256:
                uploaded_size += int(state["size"])
                uploaded_count += 1

    # --- Database-stored books --------------------------------------------------
    # Serialize each into the content-addressed object store and reference it
    # from the snapshot so it can be restored per-book.
    with get_db_conn() as conn:
        with conn.cursor() as cur:
            db_filenames = _db_book_filenames(cur)
            for fn in db_filenames:
                payload = _serialize_db_book(cur, fn)
                if payload is None:
                    continue
                data = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8")
                sha256 = hashlib.sha256(data).hexdigest()
                snapshot_files[fn] = {"size": len(data), "sha256": sha256, "storage": "db"}
                object_target = _object_path(objects_root, sha256)
                if client is not None:
                    if not _dropbox_exists(client, object_target):
                        uploaded_size += _dropbox_upload_bytes(client, object_target, data)
                        uploaded_count += 1
                else:
                    uploaded_size += len(data)
                    uploaded_count += 1

    # --- Snapshot, retention and pruning --------------------------------------
    _prog(total_files, total_files, "snapshot")
    snapshot = {
        "created_at": _now_iso(),
        "retention_count": retention_count,
        "files": snapshot_files,
    }
    snapshot_data = json.dumps(snapshot, sort_keys=True, separators=(",", ":")).encode("utf-8")
    snapshot_name = _snapshot_name()
    snapshot_target = _dropbox_join(snapshots_root, snapshot_name)
    if client is not None:
        uploaded_size += _dropbox_upload_bytes(client, snapshot_target, snapshot_data)
        uploaded_count += 1
        kept_snapshots, _deleted_snapshots = _enforce_snapshot_retention(
            client, snapshots_root, retention_count
        )
        # Always protect the objects of the snapshot written just now, even if
        # the listing/reading of kept snapshots did not include it. Otherwise
        # objects uploaded in this very run could be pruned immediately.
        current_hashes = {
            str(item.get("sha256") or "").lower() for item in snapshot_files.values()
        }
        referenced_hashes = _collect_hashes_from_snapshots(client, kept_snapshots) | current_hashes
        _prune_orphan_objects(client, objects_root, referenced_hashes)
    else:
        uploaded_size += len(snapshot_data)
        uploaded_count += 1

    # --- PostgreSQL dump ----------------------------------------------------------
    _prog(total_files, total_files, "pg_dump")
    dump_data, dump_name = _run_pg_dump()
    dump_target = _dropbox_join(dropbox_root, "postgres", dump_name)
    if client is not None:
        uploaded_size += _dropbox_upload_bytes(client, dump_target, dump_data)
    else:
        uploaded_size += len(dump_data)
    uploaded_count += 1
    if not dry_run:
        _save_manifest(new_manifest)
    return total_files, uploaded_count, uploaded_size
@router.get("/backup", response_class=HTMLResponse)
async def backup_page(request: Request):
    """Render the backup page, falling back to settings.html without backup.html.

    The existence check uses a path relative to the process working directory.
    """
    has_backup_template = Path("templates/backup.html").exists()
    template = "backup.html" if has_backup_template else "settings.html"
    return templates.TemplateResponse(request, template, {"active": "backup"})
@router.get("/api/backup/credentials")
async def backup_dropbox_credentials():
    """Return the stored Dropbox backup configuration for the settings UI.

    The token itself is never returned. Only a short preview is returned
    (first/last four characters, or "(configured)" for short tokens).
    """
    details = _dropbox_credential_details()
    root_details = _dropbox_root_details()
    retention_details = _dropbox_retention_details()
    token = details.get("token", "")
    preview = ""
    if token:
        preview = f"{token[:4]}...{token[-4:]}" if len(token) >= 10 else "(configured)"
    app_key = _load_dropbox_app_key()
    app_secret = _load_dropbox_app_secret()
    # Read the schedule details once so all three schedule fields come from
    # the same read (instead of three separate calls).
    schedule_details = _dropbox_schedule_details()
    return {
        "configured": bool(token),
        "token_preview": preview,
        "updated_at": details.get("updated_at"),
        "app_key_configured": bool(app_key and app_secret),
        "dropbox_root": root_details.get("root", DEFAULT_DROPBOX_ROOT),
        "root_updated_at": root_details.get("updated_at"),
        "retention_count": int(retention_details.get("retention_count", DEFAULT_RETENTION_COUNT)),
        "retention_updated_at": retention_details.get("updated_at"),
        "schedule_enabled": schedule_details.get("enabled", DEFAULT_SCHEDULE_ENABLED),
        "schedule_interval_hours": schedule_details.get("interval_hours", DEFAULT_SCHEDULE_INTERVAL_HOURS),
        "schedule_updated_at": schedule_details.get("updated_at"),
    }
@router.post("/api/backup/credentials")
async def backup_dropbox_credentials_save(request: Request):
    """Save the Dropbox token and backup settings sent by the settings form.

    A token must be available, either in the body or already stored. Missing
    fields keep their stored value. Unparsable retention/interval values fall
    back to the module defaults. App key/secret are only written when
    provided.

    Returns:
        The effective settings with ``ok: True``, or ``{"ok": False, "error": ...}``
        on any failure.
    """
    body = {}
    try:
        body = await request.json()
    except Exception:
        pass
    try:
        existing_token = _load_dropbox_token()
        token = (body.get("token") or "").strip() or existing_token
        if not token:
            return {"ok": False, "error": "Dropbox token is required."}
        app_key = (body.get("app_key") or "").strip()
        app_secret = (body.get("app_secret") or "").strip()
        dropbox_root = _normalize_dropbox_root(body.get("dropbox_root") or _load_dropbox_root())
        raw_retention = body.get("retention_count", _load_dropbox_retention_count())
        try:
            retention_count = max(1, int(raw_retention))
        except Exception:
            retention_count = DEFAULT_RETENTION_COUNT
        # Load the stored schedule once; it supplies the defaults for both
        # schedule fields.
        stored_enabled, stored_interval = _load_backup_schedule()
        schedule_enabled = bool(body.get("schedule_enabled", stored_enabled))
        raw_interval = body.get("schedule_interval_hours", stored_interval)
        try:
            schedule_interval_hours = max(1, int(raw_interval))
        except Exception:
            schedule_interval_hours = DEFAULT_SCHEDULE_INTERVAL_HOURS
        with get_db_conn() as conn:
            with conn:
                with conn.cursor() as cur:
                    cur.execute(
                        """
                        INSERT INTO credentials (site, username, password, updated_at)
                        VALUES ('dropbox', %s, %s, NOW())
                        ON CONFLICT (site) DO UPDATE
                        SET username = EXCLUDED.username,
                            password = EXCLUDED.password,
                            updated_at = NOW()
                        """,
                        (encrypt_value(""), encrypt_value(token)),
                    )
        if app_key:
            _save_dropbox_app_key(app_key)
        if app_secret:
            _save_dropbox_app_secret(app_secret)
        _save_dropbox_root(dropbox_root)
        _save_dropbox_retention_count(retention_count)
        _save_backup_schedule(schedule_enabled, schedule_interval_hours)
        return {
            "ok": True,
            "dropbox_root": dropbox_root,
            "retention_count": retention_count,
            "schedule_enabled": schedule_enabled,
            "schedule_interval_hours": schedule_interval_hours,
        }
    except Exception as e:
        return {"ok": False, "error": str(e)}
@router.delete("/api/backup/credentials")
async def backup_dropbox_credentials_delete():
    """Forget the Dropbox connection.

    Deletes the token row and every stored backup setting (app key/secret,
    root, retention, schedule) from the credentials table, inside the
    connection's ``with conn:`` block. Always answers ``{"ok": True}``;
    database errors propagate to the framework.
    """
    with get_db_conn() as conn:
        with conn, conn.cursor() as cur:
            cur.execute(
                """DELETE FROM credentials WHERE site IN (
                    'dropbox',
                    'dropbox_app_key',
                    'dropbox_app_secret',
                    'dropbox_backup_root',
                    'dropbox_backup_retention',
                    'dropbox_backup_schedule'
                    )"""
            )
    response = {"ok": True}
    return response
@router.get("/api/backup/health")
async def backup_health():
    """Report whether the backup prerequisites are in place.

    Covers the Dropbox token plus a live client check, the configured root,
    retention and schedule, availability of the ``pg_dump``/``psql``
    binaries, and the library directory.
    """
    token_present = bool(_load_dropbox_token())
    pg_dump_path = shutil.which("pg_dump")
    psql_path = shutil.which("psql")
    dropbox_ok = False
    dropbox_error = None
    if token_present:
        try:
            # Build the client off the event loop, like the other handlers do.
            # Building it validates Dropbox access and would otherwise block the loop.
            await asyncio.to_thread(_dbx)
            dropbox_ok = True
        except Exception as e:
            dropbox_error = str(e)
    dropbox_root = _load_dropbox_root()
    retention_count = _load_dropbox_retention_count()
    schedule_enabled, schedule_interval_hours = _load_backup_schedule()
    return {
        "token_present": token_present,
        "dropbox_ok": dropbox_ok,
        "dropbox_error": dropbox_error,
        "dropbox_root": dropbox_root,
        "retention_count": retention_count,
        "schedule_enabled": schedule_enabled,
        "schedule_interval_hours": schedule_interval_hours,
        "pg_dump_available": bool(pg_dump_path),
        "pg_dump_path": pg_dump_path,
        "psql_available": bool(psql_path),
        "psql_path": psql_path,
        "library_exists": LIBRARY_DIR.exists(),
        "library_path": str(LIBRARY_DIR.resolve()),
    }
@router.get("/api/backup/status")
async def backup_status():
    """Return the most recent backup_log entry, or ``{"status": "never"}``.

    The ``files_count`` column is exposed as ``uploaded_files``. Timestamps are
    ISO-8601 strings, or None when unset.
    """
    with get_db_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT id, status, files_count, scanned_files, size_bytes, error_msg, started_at, finished_at
                FROM backup_log
                ORDER BY started_at DESC
                LIMIT 1
                """
            )
            latest = cur.fetchone()
    if not latest:
        return {"status": "never"}
    log_id, status, uploaded, scanned, size_bytes, error_msg, started, finished = latest
    return {
        "id": log_id,
        "status": status,
        "uploaded_files": uploaded,
        "scanned_files": scanned,
        "size_bytes": size_bytes,
        "error_msg": error_msg,
        "started_at": None if not started else started.isoformat(),
        "finished_at": None if not finished else finished.isoformat(),
    }
@router.get("/api/backup/history")
async def backup_history():
    """Return up to 20 backup_log entries, newest first.

    The ``files_count`` column is exposed as ``uploaded_files``. Timestamps are
    ISO-8601 strings, or None when unset.
    """
    with get_db_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT id, status, files_count, scanned_files, size_bytes, error_msg, started_at, finished_at
                FROM backup_log
                ORDER BY started_at DESC
                LIMIT 20
                """
            )
            rows = cur.fetchall()
    history = []
    for log_id, status, uploaded, scanned, size_bytes, error_msg, started, finished in rows:
        history.append(
            {
                "id": log_id,
                "status": status,
                "uploaded_files": uploaded,
                "scanned_files": scanned,
                "size_bytes": size_bytes,
                "error_msg": error_msg,
                "started_at": None if not started else started.isoformat(),
                "finished_at": None if not finished else finished.isoformat(),
            }
        )
    return history
def _start_backup_task(*, dry_run: bool) -> int:
    """Record a running backup in backup_log and launch it as an asyncio task.

    Returns:
        The backup_log id, which also keys BACKUP_TASKS / BACKUP_PROGRESS.
    """
    log_id = _insert_backup_log_running()
    # Drop bookkeeping only for tasks that have already finished (e.g. one
    # cancelled before it ever ran, whose own cleanup never executed). Running
    # tasks must not be evicted: BACKUP_TASKS is the only place this module
    # keeps a reference to them. The event loop holds tasks only weakly, and
    # evicting would also wipe the job's live progress entry.
    finished_ids = [key for key, task in BACKUP_TASKS.items() if task.done()]
    for key in finished_ids:
        BACKUP_TASKS.pop(key, None)
        BACKUP_PROGRESS.pop(key, None)
    task = asyncio.create_task(_run_backup_job(log_id, dry_run))
    BACKUP_TASKS[log_id] = task
    return log_id
def _is_scheduled_backup_due(interval_hours: int) -> bool:
    """Decide whether the scheduler should start a new backup.

    Returns True when there has never been a successful backup, or when at
    least ``max(1, interval_hours)`` hours have passed since the latest
    successful one finished. A naive ``finished_at`` is treated as UTC.
    """
    with get_db_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT finished_at
                FROM backup_log
                WHERE status = 'success' AND finished_at IS NOT NULL
                ORDER BY finished_at DESC
                LIMIT 1
                """
            )
            row = cur.fetchone()
    last_success = row[0] if row else None
    if not last_success:
        return True
    if last_success.tzinfo is None:
        last_success = last_success.replace(tzinfo=timezone.utc)
    elapsed = datetime.now(timezone.utc) - last_success
    threshold_seconds = max(1, int(interval_hours)) * 3600
    return elapsed.total_seconds() >= threshold_seconds
async def _scheduler_loop() -> None:
    """Every 60 seconds, start a real backup if the schedule says one is due.

    Runs until cancelled. A backup is started only when scheduling is enabled,
    no backup is currently running, and the interval has elapsed.
    """
    poll_seconds = 60
    while True:
        try:
            enabled, interval_hours = _load_backup_schedule()
            should_start = (
                enabled
                and not _has_running_backup()
                and _is_scheduled_backup_due(interval_hours)
            )
            if should_start:
                _start_backup_task(dry_run=False)
        except Exception:
            # Keep the scheduler alive; failed runs show up in backup history.
            pass
        await asyncio.sleep(poll_seconds)
async def start_backup_scheduler() -> None:
    """Launch the periodic backup scheduler unless one is already active."""
    global SCHEDULER_TASK
    already_running = SCHEDULER_TASK is not None and not SCHEDULER_TASK.done()
    if not already_running:
        SCHEDULER_TASK = asyncio.create_task(_scheduler_loop())
async def stop_backup_scheduler() -> None:
    """Cancel the scheduler task, if any, wait for it to exit, and clear it."""
    global SCHEDULER_TASK
    task = SCHEDULER_TASK
    if task is None:
        return
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    SCHEDULER_TASK = None
async def _run_backup_job(log_id: int, dry_run: bool) -> None:
    """Run one backup in a worker thread and record the outcome in backup_log.

    Progress is published in ``BACKUP_PROGRESS[log_id]`` while the job runs.
    The job's BACKUP_TASKS and BACKUP_PROGRESS entries are removed when it
    ends, whatever the outcome.
    """
    BACKUP_PROGRESS[log_id] = {"done": 0, "total": 0, "phase": "starting"}
    try:
        # _run_backup_internal does blocking work (hashing, Dropbox calls,
        # pg_dump), so it runs off the event loop.
        scanned_files, files_count, size_bytes = await asyncio.to_thread(
            _run_backup_internal, dry_run=dry_run, progress_key=log_id
        )
        _finish_backup_log(
            log_id,
            status="success",
            files_count=files_count,
            scanned_files=scanned_files,
            size_bytes=size_bytes,
            error_msg=None,
        )
    except Exception as e:
        _finish_backup_log(
            log_id,
            status="error",
            files_count=None,
            scanned_files=None,
            size_bytes=None,
            # Some exceptions stringify to "" (e.g. KeyError()); fall back to
            # the class name so the history entry is never blank.
            error_msg=str(e) or type(e).__name__,
        )
    finally:
        BACKUP_TASKS.pop(log_id, None)
        BACKUP_PROGRESS.pop(log_id, None)
@router.post("/api/backup/oauth/prepare")
async def oauth_prepare(request: Request):
    """Store the app key + secret and return the Dropbox authorization URL.

    The user opens this URL in a browser and is shown a code to paste into the
    exchange step. ``token_access_type=offline`` is requested so Dropbox issues
    a refresh token that does not expire.
    """
    try:
        body = await request.json()
    except Exception:
        body = {}
    key = (body.get("app_key") or "").strip()
    secret = (body.get("app_secret") or "").strip()
    if not (key and secret):
        return {"ok": False, "error": "app_key and app_secret are required."}
    _save_dropbox_app_key(key)
    _save_dropbox_app_secret(secret)
    query = urlencode(
        {"client_id": key, "response_type": "code", "token_access_type": "offline"}
    )
    return {"ok": True, "auth_url": "https://www.dropbox.com/oauth2/authorize?" + query}
@router.post("/api/backup/oauth/exchange")
async def oauth_exchange(request: Request):
    """Exchange the authorization code entered by the user for a refresh token.

    Requires the app key/secret saved by the prepare step. The refresh token is
    stored (encrypted) as the Dropbox token in the credentials table.

    Returns:
        ``{"ok": True, "message": ...}`` on success, otherwise
        ``{"ok": False, "error": ...}``.
    """
    body = {}
    try:
        body = await request.json()
    except Exception:
        pass
    code = (body.get("code") or "").strip()
    if not code:
        return {"ok": False, "error": "Authorization code is required."}
    app_key = _load_dropbox_app_key()
    app_secret = _load_dropbox_app_secret()
    if not app_key or not app_secret:
        return {"ok": False, "error": "App key and secret not found. Run prepare step first."}
    try:
        async with httpx.AsyncClient(timeout=30) as client:
            resp = await client.post(
                "https://api.dropbox.com/oauth2/token",
                data={
                    "code": code,
                    "grant_type": "authorization_code",
                },
                auth=(app_key, app_secret),
            )
            resp.raise_for_status()
            data = resp.json()
    except httpx.HTTPStatusError as e:
        return {"ok": False, "error": f"Dropbox API error: {e.response.status_code} {e.response.text[:200]}"}
    except Exception as e:
        return {"ok": False, "error": str(e)}
    # Tolerate a missing/null/non-string field or a non-object body: all of
    # these mean "no refresh token" rather than an unhandled AttributeError.
    raw_token = data.get("refresh_token") if isinstance(data, dict) else None
    refresh_token = str(raw_token or "").strip()
    if not refresh_token:
        return {"ok": False, "error": "No refresh token in Dropbox response. Make sure token_access_type=offline was used."}
    with get_db_conn() as conn:
        with conn:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    INSERT INTO credentials (site, username, password, updated_at)
                    VALUES ('dropbox', %s, %s, NOW())
                    ON CONFLICT (site) DO UPDATE
                    SET username = EXCLUDED.username,
                        password = EXCLUDED.password,
                        updated_at = NOW()
                    """,
                    (encrypt_value(""), encrypt_value(refresh_token)),
                )
    return {"ok": True, "message": "Refresh token saved. Dropbox is now connected."}
@router.get("/api/backup/progress")
async def backup_progress():
    """Report progress of the newest in-flight backup (highest log id).

    Returns ``{"running": False}`` when no backup is publishing progress.
    """
    if BACKUP_PROGRESS:
        newest_id = max(BACKUP_PROGRESS)
        state = BACKUP_PROGRESS[newest_id]
        return {
            "running": True,
            "log_id": newest_id,
            "done": state.get("done", 0),
            "total": state.get("total", 0),
            "phase": state.get("phase", ""),
        }
    return {"running": False}
@router.post("/api/backup/run")
async def run_backup(request: Request):
    """Start a backup in the background.

    Optional JSON body: ``{"dry_run": bool}``. Refuses to start while another
    backup is already running.
    """
    try:
        body = await request.json()
    except Exception:
        body = {}
    dry_run = bool(body.get("dry_run", False))
    if not _has_running_backup():
        log_id = _start_backup_task(dry_run=dry_run)
        return {
            "ok": True,
            "backup_id": log_id,
            "status": "running",
            "dry_run": dry_run,
            "message": "Backup started in background.",
            "started_at": _now_iso(),
        }
    return {
        "ok": False,
        "status": "running",
        "error": "A backup is already running.",
        "finished_at": _now_iso(),
    }
def _parse_snapshot_date(name: str) -> str:
"""Parse 'snapshot-20260329-123456.json''2026-03-29T12:34:56Z'."""
stem = Path(name).stem # snapshot-20260329-123456
parts = stem.split("-")
if len(parts) >= 3:
d, t = parts[1], parts[2]
if len(d) == 8 and len(t) == 6:
return f"{d[:4]}-{d[4:6]}-{d[6:]}T{t[:2]}:{t[2:4]}:{t[4:]}Z"
return ""
def _entry_storage(rel: str, info: dict) -> str:
storage = str(info.get("storage") or "").strip().lower()
if storage:
return storage
return "db" if rel.startswith("db/") else "file"
def _download_and_restore(client: dropbox.Dropbox, objects_root: str, rel: str, info: dict) -> None:
    """Restore one snapshot entry from the Dropbox object store.

    Downloads the object named by the entry's ``sha256`` and checks that its
    content really hashes to that value. Database-stored books (storage
    ``"db"``) are handed to ``_restore_db_book``. Plain files are written to
    ``LIBRARY_DIR / rel`` and re-indexed via ``scan_media`` / ``upsert_book``.

    Raises:
        ValueError: the entry has no sha256; a file path would escape
            LIBRARY_DIR; the downloaded object fails the hash check; or a
            db-book object is not a JSON object.
    """
    sha256 = str(info.get("sha256") or "")
    if not sha256:
        raise ValueError("No sha256 in snapshot entry")
    storage = _entry_storage(rel, info)
    if storage != "db":
        # rel comes from snapshot JSON stored remotely; never let it address a
        # location outside the library (absolute paths or '..' components).
        rel_path = Path(rel)
        if not rel_path.parts or rel_path.is_absolute() or ".." in rel_path.parts:
            raise ValueError(f"Unsafe path in snapshot entry: {rel!r}")
    obj_path = _object_path(objects_root, sha256)
    _meta, res = client.files_download(obj_path)
    data = res.content
    # Objects are stored under the SHA-256 of their content. A mismatch means
    # the object is corrupt or not the one this entry refers to.
    actual = hashlib.sha256(data).hexdigest()
    if actual != sha256.lower():
        raise ValueError(f"Object {sha256} failed integrity check (content hash {actual})")
    if storage == "db":
        try:
            payload = json.loads(data.decode("utf-8", errors="replace"))
        except Exception as e:
            raise ValueError(f"Invalid db-book snapshot object: {e}")
        if not isinstance(payload, dict):
            raise ValueError("db-book snapshot object is not an object")
        _restore_db_book(rel, payload)
        return
    dest = LIBRARY_DIR / rel
    dest.parent.mkdir(parents=True, exist_ok=True)
    dest.write_bytes(data)
    meta = scan_media(dest)
    tags = [(s, "subject") for s in meta.get("subjects", [])]
    with get_db_conn() as conn:
        with conn:
            upsert_book(conn, rel, meta, tags)
@router.get("/api/backup/snapshots")
async def list_snapshots():
    """List library snapshots stored in Dropbox, newest first.

    Each item carries the snapshot file name and the timestamp parsed from
    it ("" when the name has no parsable timestamp).
    """
    try:
        client = await asyncio.to_thread(_dbx)
    except Exception as e:
        return {"ok": False, "error": str(e), "snapshots": []}
    snapshots_root = _dropbox_join(_load_dropbox_root(), "library_snapshots")
    try:
        paths = await asyncio.to_thread(_list_snapshot_paths, client, snapshots_root)
    except Exception as e:
        return {"ok": False, "error": str(e), "snapshots": []}
    entries = []
    for path in paths:
        name = Path(path).name
        entries.append({"name": name, "created_at": _parse_snapshot_date(name)})
    return {"ok": True, "snapshots": entries}
@router.get("/api/backup/snapshots/{snapshot_name}/files")
async def snapshot_files(snapshot_name: str):
    """List the entries of one snapshot and whether each exists locally.

    A plain-file entry exists when the file is present under LIBRARY_DIR. A
    db-book entry exists when a row with that filename is in the library
    table.
    """
    try:
        client = await asyncio.to_thread(_dbx)
    except Exception as e:
        return {"ok": False, "error": str(e), "files": []}
    dropbox_root = _load_dropbox_root()
    snapshots_root = _dropbox_join(dropbox_root, "library_snapshots")
    snapshot_path = _dropbox_join(snapshots_root, snapshot_name)
    try:
        snap = await asyncio.to_thread(_load_snapshot_data, client, snapshot_path)
    except Exception as e:
        return {"ok": False, "error": str(e), "files": []}
    files_data = snap.get("files", {})
    # A malformed snapshot may carry a non-object "files" value; treat it as
    # empty instead of failing on .items().
    if not isinstance(files_data, dict):
        files_data = {}
    # db-books "exist" when their row is present in the library table, not on disk.
    db_rels = [
        rel
        for rel, info in files_data.items()
        if isinstance(info, dict) and _entry_storage(rel, info) == "db"
    ]
    existing_db: set[str] = set()
    if db_rels:
        with get_db_conn() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    "SELECT filename FROM library WHERE filename = ANY(%s)", (db_rels,)
                )
                existing_db = {r[0] for r in cur.fetchall()}
    result = []
    for rel, info in sorted(files_data.items()):
        if not isinstance(info, dict):
            continue
        storage = _entry_storage(rel, info)
        exists = rel in existing_db if storage == "db" else (LIBRARY_DIR / rel).exists()
        result.append(
            {
                "path": rel,
                "size": info.get("size", 0),
                "sha256": info.get("sha256", ""),
                "storage": storage,
                "exists_locally": exists,
            }
        )
    return {"ok": True, "snapshot": snapshot_name, "files": result}
@router.post("/api/backup/restore")
async def restore_files(request: Request):
    """Restore selected entries of a library snapshot from Dropbox.

    Body: ``{"snapshot_name": str, "files": [path, ...]}``. Each requested path
    is restored independently and gets its own entry in ``results``. ``ok`` is
    true whenever the snapshot itself could be loaded, even if individual
    paths failed.
    """
    body = {}
    try:
        body = await request.json()
    except Exception:
        pass
    snapshot_name = (body.get("snapshot_name") or "").strip()
    files_to_restore = body.get("files", [])
    if not snapshot_name:
        return {"ok": False, "error": "snapshot_name is required"}
    if not files_to_restore:
        return {"ok": False, "error": "No files specified"}
    # A bare string would otherwise be iterated character by character.
    if not isinstance(files_to_restore, list):
        return {"ok": False, "error": "files must be a list of paths"}
    try:
        client = await asyncio.to_thread(_dbx)
    except Exception as e:
        return {"ok": False, "error": str(e)}
    dropbox_root = _load_dropbox_root()
    snapshots_root = _dropbox_join(dropbox_root, "library_snapshots")
    objects_root = _dropbox_join(dropbox_root, "library_objects")
    snapshot_path = _dropbox_join(snapshots_root, snapshot_name)
    try:
        snap = await asyncio.to_thread(_load_snapshot_data, client, snapshot_path)
    except Exception as e:
        return {"ok": False, "error": f"Failed to load snapshot: {e}"}
    files_data = snap.get("files", {})
    if not isinstance(files_data, dict):
        files_data = {}
    results = []
    for rel in files_to_restore:
        if not isinstance(rel, str):
            # Non-string entries cannot name a snapshot entry. Unhashable ones
            # (objects/lists) would also crash the membership test below.
            results.append({"path": rel, "ok": False, "error": "Invalid path"})
            continue
        if rel not in files_data:
            results.append({"path": rel, "ok": False, "error": "Not found in snapshot"})
            continue
        try:
            await asyncio.to_thread(_download_and_restore, client, objects_root, rel, files_data[rel])
            results.append({"path": rel, "ok": True})
        except Exception as e:
            results.append({"path": rel, "ok": False, "error": str(e)})
    ok_count = sum(1 for r in results if r["ok"])
    return {"ok": True, "restored": ok_count, "total": len(results), "results": results}
@router.get("/api/backup/postgres/dumps")
async def list_pg_dumps():
    """List the PostgreSQL dump files stored under the Dropbox ``postgres`` folder."""
    try:
        client = await asyncio.to_thread(_dbx)
    except Exception as e:
        return {"ok": False, "error": str(e), "dumps": []}
    postgres_root = _dropbox_join(_load_dropbox_root(), "postgres")
    try:
        dump_paths = await asyncio.to_thread(_list_pg_dump_paths, client, postgres_root)
    except Exception as e:
        return {"ok": False, "error": str(e), "dumps": []}
    return {"ok": True, "dumps": [{"name": Path(p).name} for p in dump_paths]}
@router.post("/api/backup/postgres/restore")
async def restore_pg_dump(request: Request):
    """Restore the entire PostgreSQL database from a Dropbox pg_dump.

    DESTRUCTIVE: drops and recreates the public schema before applying the
    dump. This recovers everything, including database-stored books, but
    overwrites the current database. The restore runs in the background;
    this handler only validates the request and starts it.
    """
    try:
        body = await request.json()
    except Exception:
        body = {}
    requested = (body.get("name") or "").strip()
    name = Path(requested).name  # drop any directory components
    if not (name and name.endswith(".sql")):
        return {"ok": False, "error": "A valid .sql dump name is required"}
    if not shutil.which("psql"):
        return {"ok": False, "error": "psql is not available in this container"}
    if RESTORE_PROGRESS.get("active"):
        return {"ok": False, "error": "A restore is already running."}
    # Check Dropbox access now so a bad token fails fast; the download itself
    # happens in the background worker with progress reporting.
    try:
        await asyncio.to_thread(_dbx)
    except Exception as e:
        return {"ok": False, "error": str(e)}
    if not _start_restore(name=name, source="dropbox", dropbox_name=name):
        return {"ok": False, "error": "A restore is already running."}
    return {"ok": True, "started": True, "name": name}
@router.get("/api/backup/local/dumps")
async def list_local_dumps():
    """List PostgreSQL dumps stored locally on disk (no Dropbox token needed).

    A dump whose size cannot be read is reported with ``size_bytes`` 0.
    """

    def _size_of(dump_path) -> int:
        try:
            return dump_path.stat().st_size
        except OSError:
            return 0

    dumps = [{"name": p.name, "size_bytes": _size_of(p)} for p in _list_local_dumps()]
    return {"ok": True, "dumps": dumps, "dir": str(LOCAL_DUMP_DIR)}
@router.post("/api/backup/local/restore")
async def restore_local_dump(request: Request):
    """Restore the database from a local dump file. No Dropbox token required.

    The requested name is resolved with ``_resolve_local_dump``. The restore
    itself runs in the background.
    """
    try:
        payload = await request.json()
    except Exception:
        payload = {}
    dump_path = _resolve_local_dump((payload.get("name") or "").strip())
    if dump_path is None:
        return {"ok": False, "error": "Local dump not found"}
    if not shutil.which("psql"):
        return {"ok": False, "error": "psql is not available in this container"}
    busy = RESTORE_PROGRESS.get("active") or not _start_restore(
        name=dump_path.name, source="local", dump_path=dump_path
    )
    if busy:
        return {"ok": False, "error": "A restore is already running."}
    return {"ok": True, "started": True, "name": dump_path.name}
@router.post("/api/backup/upload-restore")
async def upload_restore_dump(file: UploadFile = File(...)):
    """Restore the database from an uploaded .sql dump. No Dropbox token required.

    The upload is streamed in chunks to a temp file and restored in the
    background (with progress); a local pre-restore safety snapshot is taken
    first. Once the restore has started, it owns the temp file (passed as
    ``cleanup_path``) and removes it afterwards. On every other outcome the
    temp file is deleted here. The uploaded dump itself is not persisted.
    """
    if not shutil.which("psql"):
        return {"ok": False, "error": "psql is not available in this container"}
    if RESTORE_PROGRESS.get("active"):
        return {"ok": False, "error": "A restore is already running."}
    filename = Path(file.filename or "uploaded.sql").name
    if not filename.endswith(".sql"):
        return {"ok": False, "error": "Please upload a .sql dump file"}
    chunk_size = 1024 * 1024  # stream instead of holding the whole dump in memory
    tmp = NamedTemporaryFile(suffix=".sql", delete=False)
    tmp_path = Path(tmp.name)
    started = False
    try:
        written = 0
        with tmp:
            while True:
                try:
                    chunk = await file.read(chunk_size)
                except Exception as e:
                    return {"ok": False, "error": f"Failed to read upload: {e}"}
                if not chunk:
                    break
                tmp.write(chunk)
                written += len(chunk)
        if written == 0:
            return {"ok": False, "error": "Uploaded file is empty"}
        started = _start_restore(
            name=filename, source="upload", dump_path=tmp_path, cleanup_path=str(tmp_path)
        )
        if not started:
            return {"ok": False, "error": "A restore is already running."}
        return {"ok": True, "started": True, "name": filename}
    finally:
        # Remove the temp file unless the background restore took ownership,
        # including when a write or _start_restore raised.
        if not started:
            tmp_path.unlink(missing_ok=True)
@router.get("/api/backup/restore/progress")
async def restore_progress():
    """Return a shallow copy of the current full-database restore progress."""
    return {**RESTORE_PROGRESS}