novela/containers/novela/routers/common.py
Ivo Oskamp b43366723c Add Bulk Import, Following, Incomplete, status overhaul, performance, and CBR fixes
- Bulk Import page: filename pattern parsing, shared metadata, duplicate detection (volume-aware), batch upload with progress
- Following page: track external author URLs; authors table; sidebar counter
- Incomplete view: non-archived books with publication_status ≠ Complete
- Status: added Temporary Hold, renamed Hiatus → Long-Term Hold; statusBadgeHtml() helper
- Status/want-to-read badges: dark fill + ring for readability on any cover colour
- Disk usage warning in sidebar (amber/red thresholds)
- Bulk delete batched via POST /library/bulk-delete
- CBR: magic bytes format detection + py7zr 7-zip support; unrar → proprietary unrar v6
- Performance: IntersectionObserver lazy covers, ETag 304, single DOM pass, json_agg tags
- Duplicate detection in library and Convert page warning
- All books Grid/List toggle; star ratings; reader text colour presets; bookmarks
- Docs: TECHNICAL.md and changelog updated

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-29 14:20:25 +02:00


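"""Shared helpers for the novela routers: library path layout and sanitisation,
cover thumbnail generation and caching, EPUB/PDF/CBR metadata scanning, and
library/tag upserts."""
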
import base64
import html as _html
import io
import posixpath
import re
import zipfile as zf
from datetime import datetime, timezone
from pathlib import Path

import psycopg2
from bs4 import BeautifulSoup
from PIL import Image, ImageOps, UnidentifiedImageError

from cbr import cbr_cover_thumb, cbr_page_count
from db import get_db_conn
from pdf import pdf_cover_thumb, pdf_page_count, pdf_scan_metadata

LIBRARY_DIR = Path("library")
LIBRARY_DIR.mkdir(exist_ok=True)
LIBRARY_ROOT = LIBRARY_DIR.resolve()

COVER_W = 300
COVER_H = 450
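# COVER_W x COVER_H above is the 2:3 portrait box that make_cover_thumb_webp()
# below centre-crops every cover into before encoding it as WebP.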


def clean_segment(value: str, fallback: str, max_len: int) -> str:
    """Collapse whitespace, strip filesystem-unsafe characters and trailing dots, truncate."""
    txt = re.sub(r"\s+", " ", (value or "").strip())
    txt = re.sub(r'[<>:"/\\|?*\x00-\x1f]', "", txt)
    txt = re.sub(r"\.+$", "", txt).strip()
    return (txt or fallback)[:max_len]


def resolve_library_path(filename: str) -> Path | None:
    """Map a stored relative filename to an absolute path inside the library,
    rejecting absolute paths and any traversal outside LIBRARY_ROOT."""
    rel = Path(filename)
    if rel.is_absolute() or any(part in {"", ".", ".."} for part in rel.parts):
        return None
    candidate = (LIBRARY_DIR / rel).resolve()
    try:
        candidate.relative_to(LIBRARY_ROOT)
    except ValueError:
        return None
    return candidate
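
# Illustrative behaviour (traceable from the checks above):
#   resolve_library_path("epub/Pub/Auth/Book.epub") -> absolute path under library/
#   resolve_library_path("../../etc/passwd")        -> None (".." segment rejected)
#   resolve_library_path("/etc/passwd")             -> None (absolute path rejected)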


def media_type_from_suffix(path: Path) -> str:
    ext = path.suffix.lower()
    if ext == ".epub":
        return "epub"
    if ext == ".pdf":
        return "pdf"
    if ext in {".cbr", ".cbz"}:
        return "cbr"
    return ""


def parse_volume_str(value: int | str | None) -> tuple[int, str]:
    """Parse a volume string like '21a' or '0' into (index, suffix).

    Returns (0, '') for anything unparseable.
    index is clamped to 0–999; suffix is lowercased alpha only, max 5 chars.
    """
    s = str(value or "").strip()
    m = re.match(r"^(\d+)([a-zA-Z]*)$", s)
    if m:
        idx = max(0, min(999, int(m.group(1))))
        suffix = m.group(2).lower()[:5]
        return idx, suffix
    try:
        return max(0, min(999, int(float(s)))), ""
    except Exception:
        return 0, ""
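
# Illustrative: parse_volume_str("21a") -> (21, "a"); parse_volume_str("3.5") -> (3, "")
# (the float fallback truncates); parse_volume_str("1500") -> (999, "") (clamped);
# parse_volume_str(None) -> (0, "").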


def coerce_series_index(value: int | str | None) -> int:
    try:
        return max(0, min(999, int(value or 0)))
    except Exception:
        return 0


def make_rel_path(
    *,
    media_type: str,
    publisher: str,
    author: str,
    title: str,
    series: str,
    series_index: int | str | None,
    series_suffix: str = "",
    ext: str = "",
) -> Path:
    if media_type == "epub":
        pub = clean_segment(publisher, "Unknown Publisher", 80)
        auth = clean_segment(author, "Unknown Author", 80)
        ttl = clean_segment(title, "Untitled", 140)
        series_name = clean_segment(series, "", 80)
        if series_name:
            idx = coerce_series_index(series_index)
            sfx = re.sub(r"[^a-z]", "", (series_suffix or "").lower())[:5]
            return Path("epub") / pub / auth / "Series" / series_name / f"{idx:03d}{sfx} - {ttl}.epub"
        return Path("epub") / pub / auth / "Stories" / f"{ttl}.epub"
    if media_type == "pdf":
        pub = clean_segment(publisher, "Unknown Publisher", 80)
        auth = clean_segment(author, "Unknown Author", 80)
        ttl = clean_segment(title, "Untitled", 140)
        return Path("pdf") / pub / auth / f"{ttl}.pdf"
    # CBR / CBZ — preserve the original extension; default to .cbr
    comics_ext = ext if ext in {".cbr", ".cbz"} else ".cbr"
    pub = clean_segment(publisher, "Unknown Publisher", 80)
    auth = clean_segment(author, "Unknown", 80)
    ttl = clean_segment(title, "Untitled", 140)
    series_name = clean_segment(series, "", 80)
    if series_name:
        idx = coerce_series_index(series_index)
        sfx = re.sub(r"[^a-z]", "", (series_suffix or "").lower())[:5]
        return Path("comics") / pub / auth / "Series" / series_name / f"{idx:03d}{sfx} - {ttl}{comics_ext}"
    return Path("comics") / pub / auth / f"{ttl}{comics_ext}"


def ensure_unique_rel_path(rel_path: Path) -> Path:
    candidate = rel_path
    suffix = candidate.suffix
    stem = candidate.stem
    counter = 2
    while (LIBRARY_DIR / candidate).exists():
        candidate = rel_path.with_name(f"{stem} ({counter}){suffix}")
        counter += 1
    return candidate
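
# Illustrative: if "epub/.../Book.epub" already exists on disk, successive calls
# yield "Book (2).epub", then "Book (3).epub", and so on.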


def extract_cover_from_epub(epub_path: Path) -> tuple[bytes, str] | None:
    """Return (image_bytes, mime_type) for a conventionally named cover image, or None."""
    try:
        with zf.ZipFile(epub_path, "r") as z:
            names = z.namelist()
            cover = next((n for n in names if "/Images/cover." in n or n.lower().endswith("/cover.jpg")), "")
            if not cover:
                return None
            data = z.read(cover)
            ext = Path(cover).suffix.lower()
            mt = {
                ".jpg": "image/jpeg",
                ".jpeg": "image/jpeg",
                ".png": "image/png",
                ".webp": "image/webp",
                ".gif": "image/gif",
            }.get(ext, "image/jpeg")
            return data, mt
    except Exception:
        return None


def make_cover_thumb_webp(image_bytes: bytes) -> bytes:
    with Image.open(io.BytesIO(image_bytes)) as im:
        im = ImageOps.exif_transpose(im)
        if im.mode not in ("RGB", "RGBA"):
            im = im.convert("RGB")
        thumb = ImageOps.fit(im, (COVER_W, COVER_H), method=Image.Resampling.LANCZOS, centering=(0.5, 0.5))
        out = io.BytesIO()
        thumb.save(out, format="WEBP", quality=82, method=6)
        return out.getvalue()


def upsert_cover_cache(conn, filename: str, mime_type: str, thumb_webp: bytes) -> None:
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO library_cover_cache (filename, mime_type, thumb_webp, updated_at)
            VALUES (%s, %s, %s, NOW())
            ON CONFLICT (filename) DO UPDATE SET
                mime_type = EXCLUDED.mime_type,
                thumb_webp = EXCLUDED.thumb_webp,
                updated_at = NOW()
            """,
            (filename, mime_type, psycopg2.Binary(thumb_webp)),
        )


def ensure_cover_cache_for_book(conn, filename: str, full_path: Path, media_type: str) -> bool:
    try:
        if media_type == "epub":
            raw = extract_cover_from_epub(full_path)
            if not raw:
                return False
            data, mt = raw
            thumb = make_cover_thumb_webp(data)
            upsert_cover_cache(conn, filename, mt, thumb)
            return True
        if media_type == "pdf":
            thumb = pdf_cover_thumb(full_path)
            upsert_cover_cache(conn, filename, "image/webp", thumb)
            return True
        if media_type == "cbr":
            thumb = cbr_cover_thumb(full_path)
            upsert_cover_cache(conn, filename, "image/webp", thumb)
            return True
    except (UnidentifiedImageError, OSError, ValueError, RuntimeError):
        return False
    return False


def prune_empty_dirs(start_dir: Path) -> None:
    """Remove now-empty directories from start_dir up to, but not including, LIBRARY_ROOT."""
    cur = start_dir.resolve()
    try:
        cur.relative_to(LIBRARY_ROOT)
    except Exception:
        return
    while cur != LIBRARY_ROOT:
        try:
            cur.rmdir()
        except OSError:
            return
        cur = cur.parent
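
# Illustrative: after deleting library/epub/Pub/Auth/Book.epub, calling
# prune_empty_dirs(LIBRARY_DIR / "epub/Pub/Auth") removes Auth, then Pub, then
# epub while each is empty, stopping at the first rmdir() that fails on a
# non-empty directory.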


def _find_opf_path(names: set[str], container_xml: str | None) -> str | None:
    opf_path = "OEBPS/content.opf"
    if container_xml:
        m = re.search(r"full-path\s*=\s*['\"]([^'\"]+)['\"]", container_xml)
        if m:
            opf_path = m.group(1)
    if opf_path in names:
        return opf_path
    candidates = sorted(n for n in names if n.lower().endswith(".opf"))
    return candidates[0] if candidates else None


def scan_epub(path: Path) -> dict:
    out = {
        "has_cover": False,
        "series": "",
        "series_index": 0,
        "series_suffix": "",
        "title": "",
        "publication_status": "",
        "author": "",
        "publisher": "",
        "source_url": "",
        "publish_date": "",
        "subjects": [],
        "description": "",
    }
    try:
        with zf.ZipFile(path, "r") as z:
            names = set(z.namelist())
            out["has_cover"] = extract_cover_from_epub(path) is not None
            container_xml = z.read("META-INF/container.xml").decode("utf-8", errors="replace") if "META-INF/container.xml" in names else None
            opf_path = _find_opf_path(names, container_xml)
            if not opf_path or opf_path not in names:
                return out
            opf = z.read(opf_path).decode("utf-8", errors="replace")

            def _find(pat: str) -> str:
                m = re.search(pat, opf, re.DOTALL | re.IGNORECASE)
                return _html.unescape(m.group(1).strip()) if m else ""

            out["title"] = _find(r"<(?:dc:)?title[^>]*>(.*?)</(?:dc:)?title>")
            out["author"] = _find(r"<(?:dc:)?creator[^>]*>(.*?)</(?:dc:)?creator>")
            out["publisher"] = _find(r"<(?:dc:)?publisher[^>]*>(.*?)</(?:dc:)?publisher>")
            out["source_url"] = _find(r"<(?:dc:)?source[^>]*>(.*?)</(?:dc:)?source>")
            out["description"] = _find(r"<(?:dc:)?description[^>]*>(.*?)</(?:dc:)?description>")
            m = re.search(r'<meta[^>]*name="calibre:series"[^>]*content="([^"]+)"', opf, re.IGNORECASE)
            if m:
                out["series"] = _html.unescape(m.group(1).strip())
            m = re.search(r'<meta[^>]*name="calibre:series_index"[^>]*content="([^"]+)"', opf, re.IGNORECASE)
            if m:
                try:
                    out["series_index"] = int(float(m.group(1)))
                except Exception:
                    out["series_index"] = 0
            m = re.search(r'<meta[^>]*name="novela:series_suffix"[^>]*content="([^"]+)"', opf, re.IGNORECASE)
            if m:
                out["series_suffix"] = re.sub(r"[^a-z]", "", m.group(1).lower())[:5]
            m = re.search(r'<meta[^>]*name="publication_status"[^>]*content="([^"]+)"', opf, re.IGNORECASE)
            if m:
                out["publication_status"] = _html.unescape(m.group(1).strip())
            pd = _find(r"<(?:dc:)?date[^>]*>(.*?)</(?:dc:)?date>")
            if pd:
                date_candidate = pd.split("T", 1)[0]
                try:
                    out["publish_date"] = datetime.fromisoformat(date_candidate).date().isoformat()
                except Exception:
                    out["publish_date"] = ""
            out["subjects"] = [
                _html.unescape(s.strip())
                for s in re.findall(r"<(?:dc:)?subject[^>]*>(.*?)</(?:dc:)?subject>", opf, re.DOTALL | re.IGNORECASE)
                if s.strip()
            ]
            m = re.search(r'<meta[^>]*name="novela:rating"[^>]*content="([^"]+)"', opf, re.IGNORECASE)
            if m:
                try:
                    out["rating"] = max(0, min(5, int(m.group(1))))
                except Exception:
                    pass
    except Exception:
        pass
    return out


def scan_cbz_rating(path: Path) -> int:
    """Read NovelaRating from ComicInfo.xml inside a CBZ (ZIP) file."""
    try:
        with zf.ZipFile(path, "r") as z:
            names = {n.lower(): n for n in z.namelist()}
            ci_key = names.get("comicinfo.xml")
            if ci_key is None:
                return 0
            xml = z.read(ci_key).decode("utf-8", errors="replace")
            m = re.search(r"<NovelaRating>(\d+)</NovelaRating>", xml)
            if m:
                return max(0, min(5, int(m.group(1))))
    except Exception:
        pass
    return 0


def scan_media(path: Path) -> dict:
    mt = media_type_from_suffix(path)
    if mt == "epub":
        meta = scan_epub(path)
    elif mt == "pdf":
        meta = pdf_scan_metadata(path)
    elif mt == "cbr":
        meta = {
            "title": path.stem,
            "author": "",
            "publisher": "",
            "series": "",
            "series_index": 0,
            "publication_status": "",
            "has_cover": cbr_page_count(path) > 0,
            "description": "",
            "source_url": "",
            "publish_date": "",
            "subjects": [],
        }
        if path.suffix.lower() == ".cbz":
            meta["rating"] = scan_cbz_rating(path)
    else:
        meta = {}
    meta["media_type"] = mt
    return meta


def upsert_book(conn, filename: str, meta: dict, tags: list[tuple[str, str]] | None = None) -> None:
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO library (filename, media_type, title, author, publisher, has_cover,
                                 series, series_index, series_suffix, publication_status, source_url,
                                 publish_date, description, needs_review, want_to_read, rating, updated_at)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, FALSE, %s, NOW())
            ON CONFLICT (filename) DO UPDATE SET
                media_type = EXCLUDED.media_type,
                title = COALESCE(NULLIF(EXCLUDED.title, ''), library.title),
                author = COALESCE(NULLIF(EXCLUDED.author, ''), library.author),
                publisher = COALESCE(NULLIF(EXCLUDED.publisher, ''), library.publisher),
                has_cover = (library.has_cover OR EXCLUDED.has_cover),
                series = COALESCE(NULLIF(EXCLUDED.series, ''), library.series),
                series_index = CASE WHEN COALESCE(EXCLUDED.series_index, 0) > 0 THEN EXCLUDED.series_index ELSE library.series_index END,
                series_suffix = COALESCE(NULLIF(EXCLUDED.series_suffix, ''), library.series_suffix),
                publication_status = COALESCE(NULLIF(EXCLUDED.publication_status, ''), library.publication_status),
                source_url = COALESCE(NULLIF(EXCLUDED.source_url, ''), library.source_url),
                publish_date = COALESCE(EXCLUDED.publish_date, library.publish_date),
                description = COALESCE(NULLIF(EXCLUDED.description, ''), library.description),
                rating = CASE WHEN EXCLUDED.rating > 0 THEN EXCLUDED.rating ELSE library.rating END,
                updated_at = NOW()
            """,
            (
                filename,
                meta.get("media_type", "epub"),
                meta.get("title", ""),
                meta.get("author", ""),
                meta.get("publisher", ""),
                bool(meta.get("has_cover", False)),
                meta.get("series", ""),
                meta.get("series_index", 0),
                meta.get("series_suffix", ""),
                meta.get("publication_status", ""),
                meta.get("source_url", ""),
                meta.get("publish_date") or None,
                meta.get("description", ""),
                bool(meta.get("needs_review", False)),
                max(0, min(5, int(meta.get("rating", 0) or 0))),
            ),
        )
        if tags is not None:
            cur.execute("DELETE FROM book_tags WHERE filename = %s", (filename,))
            rows = []
            seen: set[tuple[str, str]] = set()
            for tag, ttype in tags:
                t = (tag or "").strip()
                tp = (ttype or "").strip()
                if not t or not tp:
                    continue
                key = (t.casefold(), tp)
                if key in seen:
                    continue
                seen.add(key)
                rows.append((filename, t, tp))
            if rows:
                cur.executemany(
                    "INSERT INTO book_tags (filename, tag, tag_type) VALUES (%s, %s, %s) ON CONFLICT (filename, tag, tag_type) DO NOTHING",
                    rows,
                )
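
# Illustrative: tags=[("Fantasy", "genre"), ("fantasy", "genre")] stores a single
# ("Fantasy", "genre") row; duplicates are folded case-insensitively via
# str.casefold(), keeping the first-seen casing. tags=None leaves existing rows
# untouched.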


def list_library_json() -> list[dict]:
    with get_db_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT l.filename, l.media_type, l.title, l.author, l.publisher, l.has_cover,
                       l.series, l.series_index, l.publication_status, l.want_to_read,
                       l.archived, l.needs_review, l.updated_at,
                       rp.progress, rp.cfi, rp.page,
                       COALESCE(rs.read_count, 0)::int AS read_count,
                       rs.last_read,
                       (cc.filename IS NOT NULL) AS has_cached_cover,
                       l.rating,
                       COALESCE(l.series_suffix, '') AS series_suffix,
                       json_agg(
                           json_build_object('tag', bt.tag, 'tag_type', bt.tag_type)
                       ) FILTER (WHERE bt.tag IS NOT NULL) AS tags
                FROM library l
                LEFT JOIN reading_progress rp ON rp.filename = l.filename
                LEFT JOIN (
                    SELECT filename, COUNT(*)::int AS read_count, MAX(read_at) AS last_read
                    FROM reading_sessions
                    GROUP BY filename
                ) rs ON rs.filename = l.filename
                LEFT JOIN library_cover_cache cc ON cc.filename = l.filename
                LEFT JOIN book_tags bt ON bt.filename = l.filename
                GROUP BY l.filename, l.media_type, l.title, l.author, l.publisher, l.has_cover,
                         l.series, l.series_index, l.publication_status, l.want_to_read,
                         l.archived, l.needs_review, l.updated_at,
                         rp.progress, rp.cfi, rp.page,
                         rs.read_count, rs.last_read,
                         cc.filename, l.rating, l.series_suffix
                ORDER BY COALESCE(l.publisher, ''), COALESCE(l.author, ''), COALESCE(l.series, ''), l.series_index, COALESCE(l.title, '')
                """
            )
            rows = cur.fetchall()
            out = []
            for r in rows:
                out.append(
                    {
                        "filename": r[0],
                        "media_type": r[1],
                        "title": r[2] or "",
                        "author": r[3] or "",
                        "publisher": r[4] or "",
                        "has_cover": bool(r[5]),
                        "has_cached_cover": bool(r[18]),
                        "series": r[6] or "",
                        "series_index": r[7] or 0,
                        "series_suffix": r[20] or "",
                        "publication_status": r[8] or "",
                        "want_to_read": bool(r[9]),
                        "archived": bool(r[10]),
                        "needs_review": bool(r[11]),
                        "updated_at": r[12].isoformat() if r[12] else None,
                        "progress": r[13] or 0,
                        "progress_cfi": r[14],
                        "page": r[15],
                        "read_count": r[16] or 0,
                        "last_read": r[17].isoformat() if r[17] else None,
                        "tags": r[21] or [],
                        "rating": r[19] or 0,
                    }
                )
            return out


def normalize_site(raw: str) -> str:
    raw = (raw or "").strip()
    if "://" in raw:
        from urllib.parse import urlparse

        raw = urlparse(raw).netloc
    return re.sub(r"^www\.", "", raw).lower()
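
# Illustrative: normalize_site("https://www.royalroad.com/fiction/1") -> "royalroad.com";
# normalize_site("Example.COM") -> "example.com".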


def relative_file_info(path: Path) -> dict:
    stat = path.stat()
    return {
        "size": stat.st_size,
        "modified": datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat(),
    }