novela/containers/novela/migrate_paths.py

260 lines
8.3 KiB
Python

"""
One-time migration: move all library files to the correct path structure
and update all database references.
Target structure:
epub/{publisher}/{author}/Stories/{title}.epub
epub/{publisher}/{author}/Series/{series}/{idx:03d} - {title}.epub
pdf/{publisher}/{author}/{title}.pdf
comics/{publisher}/{author}/{title}.cbr|cbz
Run inside the novela container:
python migrate_paths.py [--execute]
Without --execute: dry-run only (no files moved, no DB changes).
"""
import os
import re
import sys
from pathlib import Path
import psycopg2
# Library location, relative to the working directory the script is started from.
LIBRARY_DIR = Path("library")
# Absolute form of LIBRARY_DIR, computed once at import time.
LIBRARY_ROOT = LIBRARY_DIR.resolve()
# True unless "--execute" appears among the command-line arguments.
DRY_RUN = "--execute" not in sys.argv
# ---------------------------------------------------------------------------
# Path helpers (mirrors common.py / reader.py logic)
# ---------------------------------------------------------------------------
def _clean(value: str, fallback: str, max_len: int) -> str:
txt = re.sub(r"\s+", " ", (value or "").strip())
txt = re.sub(r'[<>:"/\\|?*\x00-\x1f]', "", txt)
txt = re.sub(r"\.+$", "", txt).strip()
if not txt:
txt = fallback
return txt[:max_len]
def _coerce_index(value) -> int:
try:
return max(1, min(999, int(value or 1)))
except Exception:
return 1
def correct_rel_path(filename: str, title: str, author: str, publisher: str,
                     series: str, series_index: int) -> Path:
    """Build the library-relative path a book should have, given its metadata.

    The top-level folder is chosen from the lower-cased extension of
    *filename*: ``epub``, ``pdf``, or ``comics`` for everything else. EPUBs go
    under ``Series/<series>/`` with a zero-padded index prefix when a series
    name survives cleaning, otherwise under ``Stories/``. When *title* is
    empty, the stem of *filename* is used as the title.
    """
    suffix = Path(filename).suffix.lower()
    publisher_dir = _clean(publisher, "Unknown Publisher", 80)
    author_dir = _clean(author, "Unknown Author", 80)
    name = _clean(title or Path(filename).stem, "Untitled", 140)

    if suffix == ".pdf":
        return Path("pdf", publisher_dir, author_dir, f"{name}.pdf")

    if suffix != ".epub":
        # Anything that is not .cbr/.cbz is filed with a .cbr extension.
        comics_suffix = suffix if suffix in (".cbr", ".cbz") else ".cbr"
        return Path("comics", publisher_dir, author_dir, f"{name}{comics_suffix}")

    series_dir = _clean(series or "", "", 80)
    if not series_dir:
        return Path("epub", publisher_dir, author_dir, "Stories", f"{name}.epub")

    position = _coerce_index(series_index)
    return Path(
        "epub", publisher_dir, author_dir, "Series", series_dir,
        f"{position:03d} - {name}.epub",
    )
def ensure_unique(rel_path: Path, exclude_current: Path) -> Path:
    """Return *rel_path*, or a " (2)", " (3)", … variant that is free on disk.

    A candidate is accepted when it resolves to *exclude_current* (the file
    is already where it belongs) or when nothing exists at that location
    under ``LIBRARY_DIR``.
    """
    current = exclude_current.resolve()
    attempt = 1
    candidate = rel_path
    while True:
        target = (LIBRARY_DIR / candidate).resolve()
        if target == current or not target.exists():
            return candidate
        attempt += 1
        candidate = rel_path.with_name(
            f"{rel_path.stem} ({attempt}){rel_path.suffix}"
        )
def prune_empty_dirs(start: Path) -> None:
    """Remove *start* and then its ancestors for as long as they are empty.

    The walk stops at the first directory that cannot be removed (typically
    because it is not empty) and never touches ``LIBRARY_ROOT`` itself or
    anything outside it.
    """
    cur = start.resolve()
    # Continue only while strictly inside the library. A plain
    # ``cur != LIBRARY_ROOT`` test never becomes false for a path outside the
    # library, so the walk could delete empty directories all the way up.
    while LIBRARY_ROOT in cur.parents:
        try:
            cur.rmdir()
        except OSError:
            # Not empty, already removed, or not removable: stop climbing.
            return
        cur = cur.parent
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def _rename_library_row(conn, old_fn: str, new_fn: str) -> None:
    """Re-key one ``library`` row and its child rows from *old_fn* to *new_fn*.

    Everything runs in one transaction: ``with conn`` commits when the block
    finishes and rolls back if any statement raises.
    """
    with conn:
        with conn.cursor() as cur:
            # Copy library row with new filename
            cur.execute("""
                INSERT INTO library (
                    filename, title, author, publisher, has_cover, media_type,
                    series, series_index, publication_status, want_to_read,
                    source_url, archived, needs_review, updated_at,
                    publish_date, description, rating
                )
                SELECT %s, title, author, publisher, has_cover, media_type,
                       series, series_index, publication_status, want_to_read,
                       source_url, archived, needs_review, updated_at,
                       publish_date, description, rating
                FROM library WHERE filename = %s
            """, (new_fn, old_fn))
            # Update child tables (names come from this fixed tuple only)
            for table in ("book_tags", "reading_progress",
                          "reading_sessions", "library_cover_cache"):
                cur.execute(
                    f"UPDATE {table} SET filename = %s WHERE filename = %s",
                    (new_fn, old_fn)
                )
            # Delete old library row (cascade removes any remaining child rows)
            cur.execute("DELETE FROM library WHERE filename = %s", (old_fn,))


def _plan_moves(books):
    """Sort the library rows into moves, already-correct, missing and conflicts.

    Returns ``(moves, skipped_missing, skipped_same, conflicts)`` where
    *moves* holds ``(old_fn, old_path, new_fn, new_path)`` tuples and
    *conflicts* holds ``(old_fn, new_fn, reason)`` tuples.
    """
    moves = []
    skipped_missing = []
    skipped_same = []
    conflicts = []
    # Targets already handed out in this run. ensure_unique() only looks at
    # the disk, so without this two books with identical metadata would be
    # planned onto the same path and the second rename would overwrite the
    # first file.
    claimed = set()

    for (filename, title, author, publisher, series, series_index) in books:
        old_path = (LIBRARY_DIR / filename).resolve()
        if not old_path.exists():
            skipped_missing.append(filename)
            continue

        new_rel = correct_rel_path(filename, title or "", author or "",
                                   publisher or "", series or "", series_index or 0)
        new_rel = ensure_unique(new_rel, old_path)
        new_path = (LIBRARY_DIR / new_rel).resolve()

        if old_path == new_path:
            skipped_same.append(filename)
            continue
        if new_path in claimed:
            # A later run will see the first file on disk and pick " (2)".
            conflicts.append((filename, new_rel.as_posix(),
                              "target already planned for another book; re-run to resolve"))
            continue
        # Sanity: target already exists and is a different file
        if new_path.exists():
            conflicts.append((filename, new_rel.as_posix(), "target exists!"))
            continue

        claimed.add(new_path)
        moves.append((filename, old_path, new_rel.as_posix(), new_path))

    return moves, skipped_missing, skipped_same, conflicts


def _migrate(conn) -> None:
    """Plan, report and (unless DRY_RUN) apply the migration using *conn*."""
    with conn.cursor() as cur:
        cur.execute("""
            SELECT filename, title, author, publisher, series, series_index
            FROM library
            ORDER BY filename
        """)
        books = cur.fetchall()

    print(f"Total books in DB: {len(books)}")
    print(f"Mode: {'DRY RUN' if DRY_RUN else '*** EXECUTE ***'}")
    print()

    moves, skipped_missing, skipped_same, conflicts = _plan_moves(books)

    # Report
    print(f"Already correct: {len(skipped_same)}")
    print(f"File missing: {len(skipped_missing)}")
    print(f"Conflicts: {len(conflicts)}")
    print(f"To move: {len(moves)}")
    print()
    if skipped_missing:
        print("=== MISSING FILES (skipped) ===")
        for f in skipped_missing:
            print(f" {f}")
        print()
    if conflicts:
        print("=== CONFLICTS (skipped) ===")
        for old, new, reason in conflicts:
            print(f" {old}")
            print(f"     -> {new} ({reason})")
        print()
    if not moves:
        print("Nothing to do.")
        return

    print("=== MOVES ===")
    for old_fn, _old_path, new_fn, _new_path in moves:
        print(f" {old_fn}")
        print(f"     -> {new_fn}")
    print()
    if DRY_RUN:
        print("Dry run complete. Run with --execute to apply changes.")
        return

    # Execute
    print("Applying changes...")
    moved = 0
    errors = []
    prunable = set()
    for old_fn, old_path, new_fn, new_path in moves:
        file_moved = False
        try:
            # Path.rename() silently replaces an existing file on POSIX, so
            # re-check right before moving in case the target appeared since
            # planning.
            if new_path.exists():
                raise FileExistsError(f"target appeared since planning: {new_fn}")
            new_path.parent.mkdir(parents=True, exist_ok=True)
            old_path.rename(new_path)
            file_moved = True
            _rename_library_row(conn, old_fn, new_fn)
        except Exception as e:
            # Per-book boundary: record the failure and carry on with the rest.
            message = str(e)
            if file_moved:
                # The DB change was rolled back; put the file back as well.
                try:
                    old_path.parent.mkdir(parents=True, exist_ok=True)
                    new_path.rename(old_path)
                except OSError as undo_err:
                    message += (f" (ROLLBACK FAILED, file left at {new_fn}: "
                                f"{undo_err})")
            # Remove target directories that were created for nothing.
            prune_empty_dirs(new_path.parent)
            errors.append((old_fn, message))
            print(f" ERROR: {old_fn}: {message}")
            continue
        prunable.add(old_path.parent)
        moved += 1
        print(f" [{moved}/{len(moves)}] {old_fn} -> {new_fn}")

    # Prune empty directories
    print("\nPruning empty directories...")
    for d in prunable:
        prune_empty_dirs(d)
    print()
    print(f"Done. Moved: {moved}, Errors: {len(errors)}, Skipped (conflict): {len(conflicts)}, Missing: {len(skipped_missing)}")
    if errors:
        print("\nErrors:")
        for fn, err in errors:
            print(f" {fn}: {err}")


def main():
    """Connect to the database and run the migration (dry-run unless --execute).

    Connection settings come from the ``POSTGRES_DB``, ``POSTGRES_USER`` and
    ``POSTGRES_PASSWORD`` environment variables; a missing variable raises
    ``KeyError``. The connection is closed on every exit path.
    """
    db_url = (
        f"host=novela-db "
        f"dbname={os.environ['POSTGRES_DB']} "
        f"user={os.environ['POSTGRES_USER']} "
        f"password={os.environ['POSTGRES_PASSWORD']}"
    )
    conn = psycopg2.connect(db_url)
    try:
        _migrate(conn)
    finally:
        conn.close()
# Run the migration only when executed as a script, not when imported.
if __name__ == "__main__":
    main()