"""Library storage helpers: safe path resolution under ./library, cover
thumbnail caching, EPUB/PDF/CBR metadata scanning, and database upserts."""

import html as _html
import io
import re
import zipfile as zf
from datetime import datetime, timezone
from pathlib import Path

import psycopg2
from PIL import Image, ImageOps, UnidentifiedImageError

from cbr import cbr_cover_thumb, cbr_page_count
from db import get_db_conn
from pdf import pdf_cover_thumb, pdf_scan_metadata

LIBRARY_DIR = Path("library")
LIBRARY_DIR.mkdir(exist_ok=True)
LIBRARY_ROOT = LIBRARY_DIR.resolve()

# Cover thumbnails are normalized to a 2:3 portrait aspect.
COVER_W = 300
COVER_H = 450


def clean_segment(value: str, fallback: str, max_len: int) -> str:
    """Collapse whitespace and strip filesystem-reserved characters from one path segment."""
    txt = re.sub(r"\s+", " ", (value or "").strip())
    txt = re.sub(r'[<>:"/\\|?*\x00-\x1f]', "", txt)
    txt = re.sub(r"\.+$", "", txt).strip()
    return (txt or fallback)[:max_len]
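
# Illustrative behavior, derived from the regexes above:
#   clean_segment("  My: Novel?  ", "Untitled", 140) -> "My Novel"
#   clean_segment("...", "Untitled", 140)            -> "Untitled"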


def resolve_library_path(filename: str) -> Path | None:
    """Resolve a client-supplied relative filename inside the library.

    Returns None for absolute paths or traversal attempts.
    """
    rel = Path(filename)
    if rel.is_absolute() or any(part in {"", ".", ".."} for part in rel.parts):
        return None
    candidate = (LIBRARY_DIR / rel).resolve()
    try:
        candidate.relative_to(LIBRARY_ROOT)
    except ValueError:
        return None
    return candidate
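
# Illustrative behavior (the book path is hypothetical):
#   resolve_library_path("epub/Pub/Author/Stories/Book.epub") -> absolute Path under LIBRARY_ROOT
#   resolve_library_path("../etc/passwd") -> None
#   resolve_library_path("/etc/passwd")   -> None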


def media_type_from_suffix(path: Path) -> str:
    ext = path.suffix.lower()
    if ext == ".epub":
        return "epub"
    if ext == ".pdf":
        return "pdf"
    if ext in {".cbr", ".cbz"}:
        return "cbr"
    return ""


def parse_volume_str(value: int | str | None) -> tuple[int, str]:
    """Parse a volume string like '21a' or '0' into (index, suffix).

    Returns (0, '') for anything unparseable.
    index is clamped to 0–999; suffix is lowercased alpha only, max 5 chars.
    """
    s = str(value or "").strip()
    m = re.match(r"^(\d+)([a-zA-Z]*)$", s)
    if m:
        idx = max(0, min(999, int(m.group(1))))
        suffix = m.group(2).lower()[:5]
        return idx, suffix
    try:
        return max(0, min(999, int(float(s)))), ""
    except Exception:
        return 0, ""
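
# Examples matching the docstring above:
#   parse_volume_str("21a")  -> (21, "a")
#   parse_volume_str("3.5")  -> (3, "")
#   parse_volume_str("vol1") -> (0, "")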


def coerce_series_index(value: int | str | None) -> int:
    try:
        return max(0, min(999, int(value or 0)))
    except Exception:
        return 0


def make_rel_path(
    *,
    media_type: str,
    publisher: str,
    author: str,
    title: str,
    series: str,
    series_index: int | str | None,
    series_suffix: str = "",
    ext: str = "",
) -> Path:
    """Build the canonical relative path for a book within the library tree."""
    if media_type == "epub":
        pub = clean_segment(publisher, "Unknown Publisher", 80)
        auth = clean_segment(author, "Unknown Author", 80)
        ttl = clean_segment(title, "Untitled", 140)
        series_name = clean_segment(series, "", 80)
        if series_name:
            idx = coerce_series_index(series_index)
            sfx = re.sub(r"[^a-z]", "", (series_suffix or "").lower())[:5]
            return Path("epub") / pub / auth / "Series" / series_name / f"{idx:03d}{sfx} - {ttl}.epub"
        return Path("epub") / pub / auth / "Stories" / f"{ttl}.epub"

    if media_type == "pdf":
        pub = clean_segment(publisher, "Unknown Publisher", 80)
        auth = clean_segment(author, "Unknown Author", 80)
        ttl = clean_segment(title, "Untitled", 140)
        return Path("pdf") / pub / auth / f"{ttl}.pdf"

    # CBR / CBZ — preserve the original extension; default to .cbr
    comics_ext = ext if ext in {".cbr", ".cbz"} else ".cbr"
    pub = clean_segment(publisher, "Unknown Publisher", 80)
    auth = clean_segment(author, "Unknown", 80)
    ttl = clean_segment(title, "Untitled", 140)
    return Path("comics") / pub / auth / f"{ttl}{comics_ext}"
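
# Resulting layouts, one per branch above (segment names are placeholders):
#   epub in a series: epub/<Publisher>/<Author>/Series/<Series>/005a - <Title>.epub
#   standalone epub:  epub/<Publisher>/<Author>/Stories/<Title>.epub
#   pdf:              pdf/<Publisher>/<Author>/<Title>.pdf
#   cbr/cbz:          comics/<Publisher>/<Author>/<Title><ext>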


def ensure_unique_rel_path(rel_path: Path) -> Path:
    """Append ' (2)', ' (3)', ... to the stem until the path is free on disk."""
    candidate = rel_path
    suffix = candidate.suffix
    stem = candidate.stem
    counter = 2
    while (LIBRARY_DIR / candidate).exists():
        candidate = rel_path.with_name(f"{stem} ({counter}){suffix}")
        counter += 1
    return candidate
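
# Illustrative: if "pdf/Pub/Auth/Title.pdf" already exists on disk, this yields
# "pdf/Pub/Auth/Title (2).pdf", then "Title (3).pdf", and so on.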


def extract_cover_from_epub(epub_path: Path) -> tuple[bytes, str] | None:
    try:
        with zf.ZipFile(epub_path, "r") as z:
            names = z.namelist()
            # Heuristic: look for an .../Images/cover.* entry or any */cover.jpg.
            cover = next((n for n in names if "/Images/cover." in n or n.lower().endswith("/cover.jpg")), "")
            if not cover:
                return None
            data = z.read(cover)
            ext = Path(cover).suffix.lower()
            mt = {
                ".jpg": "image/jpeg",
                ".jpeg": "image/jpeg",
                ".png": "image/png",
                ".webp": "image/webp",
                ".gif": "image/gif",
            }.get(ext, "image/jpeg")
            return data, mt
    except Exception:
        return None


def make_cover_thumb_webp(image_bytes: bytes) -> bytes:
    with Image.open(io.BytesIO(image_bytes)) as im:
        im = ImageOps.exif_transpose(im)
        if im.mode not in ("RGB", "RGBA"):
            im = im.convert("RGB")
        thumb = ImageOps.fit(im, (COVER_W, COVER_H), method=Image.Resampling.LANCZOS, centering=(0.5, 0.5))
        out = io.BytesIO()
        thumb.save(out, format="WEBP", quality=82, method=6)
        return out.getvalue()
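
# Illustrative usage (the source file is hypothetical):
#   thumb = make_cover_thumb_webp(Path("cover.png").read_bytes())  # 300x450 WEBP bytes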


def upsert_cover_cache(conn, filename: str, mime_type: str, thumb_webp: bytes) -> None:
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO library_cover_cache (filename, mime_type, thumb_webp, updated_at)
            VALUES (%s, %s, %s, NOW())
            ON CONFLICT (filename) DO UPDATE SET
                mime_type = EXCLUDED.mime_type,
                thumb_webp = EXCLUDED.thumb_webp,
                updated_at = NOW()
            """,
            (filename, mime_type, psycopg2.Binary(thumb_webp)),
        )


def ensure_cover_cache_for_book(conn, filename: str, full_path: Path, media_type: str) -> bool:
    """Generate and cache a cover thumbnail for one book; returns True on success."""
    try:
        if media_type == "epub":
            raw = extract_cover_from_epub(full_path)
            if not raw:
                return False
            data, mt = raw
            thumb = make_cover_thumb_webp(data)
            upsert_cover_cache(conn, filename, mt, thumb)
            return True
        if media_type == "pdf":
            thumb = pdf_cover_thumb(full_path)
            upsert_cover_cache(conn, filename, "image/webp", thumb)
            return True
        if media_type == "cbr":
            thumb = cbr_cover_thumb(full_path)
            upsert_cover_cache(conn, filename, "image/webp", thumb)
            return True
    except (UnidentifiedImageError, OSError, ValueError, RuntimeError):
        return False
    return False


def prune_empty_dirs(start_dir: Path) -> None:
    """Remove now-empty directories from start_dir up to (but not including) LIBRARY_ROOT."""
    cur = start_dir.resolve()
    try:
        cur.relative_to(LIBRARY_ROOT)
    except Exception:
        return
    while cur != LIBRARY_ROOT:
        try:
            cur.rmdir()
        except OSError:
            return
        cur = cur.parent
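
# Illustrative: after the last book in comics/Pub/Auth is deleted,
# prune_empty_dirs(LIBRARY_DIR / "comics" / "Pub" / "Auth") removes Auth, then
# Pub, then comics, stopping at LIBRARY_ROOT or at the first non-empty parent.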


def _find_opf_path(names: set[str], container_xml: str | None) -> str | None:
    """Locate the OPF package document: container.xml's full-path first, then any *.opf member."""
    opf_path = "OEBPS/content.opf"
    if container_xml:
        m = re.search(r"full-path\s*=\s*['\"]([^'\"]+)['\"]", container_xml)
        if m:
            opf_path = m.group(1)
    if opf_path in names:
        return opf_path
    candidates = sorted(n for n in names if n.lower().endswith(".opf"))
    return candidates[0] if candidates else None


def scan_epub(path: Path) -> dict:
    """Extract EPUB metadata by regex-scanning the OPF (no XML parser).

    The 'rating' key is only present when a novela:rating meta tag exists.
    """
    out = {
        "has_cover": False,
        "series": "",
        "series_index": 0,
        "series_suffix": "",
        "title": "",
        "publication_status": "",
        "author": "",
        "publisher": "",
        "source_url": "",
        "publish_date": "",
        "subjects": [],
        "description": "",
    }
    try:
        with zf.ZipFile(path, "r") as z:
            names = set(z.namelist())
            out["has_cover"] = extract_cover_from_epub(path) is not None
            container_xml = z.read("META-INF/container.xml").decode("utf-8", errors="replace") if "META-INF/container.xml" in names else None
            opf_path = _find_opf_path(names, container_xml)
            if not opf_path or opf_path not in names:
                return out
            opf = z.read(opf_path).decode("utf-8", errors="replace")

            def _find(pat: str) -> str:
                m = re.search(pat, opf, re.DOTALL | re.IGNORECASE)
                return _html.unescape(m.group(1).strip()) if m else ""

            out["title"] = _find(r"<(?:dc:)?title[^>]*>(.*?)</(?:dc:)?title>")
            out["author"] = _find(r"<(?:dc:)?creator[^>]*>(.*?)</(?:dc:)?creator>")
            out["publisher"] = _find(r"<(?:dc:)?publisher[^>]*>(.*?)</(?:dc:)?publisher>")
            out["source_url"] = _find(r"<(?:dc:)?source[^>]*>(.*?)</(?:dc:)?source>")
            out["description"] = _find(r"<(?:dc:)?description[^>]*>(.*?)</(?:dc:)?description>")

            m = re.search(r'<meta[^>]*name="calibre:series"[^>]*content="([^"]+)"', opf, re.IGNORECASE)
            if m:
                out["series"] = _html.unescape(m.group(1).strip())
            m = re.search(r'<meta[^>]*name="calibre:series_index"[^>]*content="([^"]+)"', opf, re.IGNORECASE)
            if m:
                try:
                    out["series_index"] = int(float(m.group(1)))
                except Exception:
                    out["series_index"] = 0
            m = re.search(r'<meta[^>]*name="novela:series_suffix"[^>]*content="([^"]+)"', opf, re.IGNORECASE)
            if m:
                out["series_suffix"] = re.sub(r"[^a-z]", "", m.group(1).lower())[:5]
            m = re.search(r'<meta[^>]*name="publication_status"[^>]*content="([^"]+)"', opf, re.IGNORECASE)
            if m:
                out["publication_status"] = _html.unescape(m.group(1).strip())

            pd = _find(r"<(?:dc:)?date[^>]*>(.*?)</(?:dc:)?date>")
            if pd:
                date_candidate = pd.split("T", 1)[0]
                try:
                    out["publish_date"] = datetime.fromisoformat(date_candidate).date().isoformat()
                except Exception:
                    out["publish_date"] = ""

            out["subjects"] = [
                _html.unescape(s.strip())
                for s in re.findall(r"<(?:dc:)?subject[^>]*>(.*?)</(?:dc:)?subject>", opf, re.DOTALL | re.IGNORECASE)
                if s.strip()
            ]
            m = re.search(r'<meta[^>]*name="novela:rating"[^>]*content="([^"]+)"', opf, re.IGNORECASE)
            if m:
                try:
                    out["rating"] = max(0, min(5, int(m.group(1))))
                except Exception:
                    pass
    except Exception:
        pass
    return out


def scan_cbz_rating(path: Path) -> int:
    """Read NovelaRating from ComicInfo.xml inside a CBZ (ZIP) file."""
    try:
        with zf.ZipFile(path, "r") as z:
            names = {n.lower(): n for n in z.namelist()}
            ci_key = names.get("comicinfo.xml")
            if ci_key is None:
                return 0
            xml = z.read(ci_key).decode("utf-8", errors="replace")
            m = re.search(r"<NovelaRating>(\d+)</NovelaRating>", xml)
            if m:
                return max(0, min(5, int(m.group(1))))
    except Exception:
        pass
    return 0


def scan_media(path: Path) -> dict:
    """Dispatch metadata scanning by file extension; returns {} plus media_type for unsupported files."""
    mt = media_type_from_suffix(path)
    if mt == "epub":
        meta = scan_epub(path)
    elif mt == "pdf":
        meta = pdf_scan_metadata(path)
    elif mt == "cbr":
        meta = {
            "title": path.stem,
            "author": "",
            "publisher": "",
            "series": "",
            "series_index": 0,
            "publication_status": "",
            "has_cover": cbr_page_count(path) > 0,
            "description": "",
            "source_url": "",
            "publish_date": "",
            "subjects": [],
        }
        if path.suffix.lower() == ".cbz":
            meta["rating"] = scan_cbz_rating(path)
    else:
        meta = {}
    meta["media_type"] = mt
    return meta
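
# Illustrative: dispatch is driven purely by the file extension.
#   scan_media(Path("book.epub"))["media_type"] -> "epub"
#   scan_media(Path("notes.txt"))["media_type"] -> ""  (unsupported type)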


def upsert_book(conn, filename: str, meta: dict, tags: list[tuple[str, str]] | None = None) -> None:
    """Insert or update one library row.

    On conflict, non-empty incoming fields win and empty ones keep existing
    values; want_to_read defaults to FALSE on first insert and is never
    updated here. When tags is not None, the book's tags are replaced.
    """
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO library (filename, media_type, title, author, publisher, has_cover,
                                 series, series_index, series_suffix, publication_status, source_url,
                                 publish_date, description, needs_review, want_to_read, rating, updated_at)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, FALSE, %s, NOW())
            ON CONFLICT (filename) DO UPDATE SET
                media_type = EXCLUDED.media_type,
                title = COALESCE(NULLIF(EXCLUDED.title, ''), library.title),
                author = COALESCE(NULLIF(EXCLUDED.author, ''), library.author),
                publisher = COALESCE(NULLIF(EXCLUDED.publisher, ''), library.publisher),
                has_cover = (library.has_cover OR EXCLUDED.has_cover),
                series = COALESCE(NULLIF(EXCLUDED.series, ''), library.series),
                series_index = CASE WHEN COALESCE(EXCLUDED.series_index, 0) > 0 THEN EXCLUDED.series_index ELSE library.series_index END,
                series_suffix = COALESCE(NULLIF(EXCLUDED.series_suffix, ''), library.series_suffix),
                publication_status = COALESCE(NULLIF(EXCLUDED.publication_status, ''), library.publication_status),
                source_url = COALESCE(NULLIF(EXCLUDED.source_url, ''), library.source_url),
                publish_date = COALESCE(EXCLUDED.publish_date, library.publish_date),
                description = COALESCE(NULLIF(EXCLUDED.description, ''), library.description),
                rating = CASE WHEN EXCLUDED.rating > 0 THEN EXCLUDED.rating ELSE library.rating END,
                updated_at = NOW()
            """,
            (
                filename,
                meta.get("media_type", "epub"),
                meta.get("title", ""),
                meta.get("author", ""),
                meta.get("publisher", ""),
                bool(meta.get("has_cover", False)),
                meta.get("series", ""),
                meta.get("series_index", 0),
                meta.get("series_suffix", ""),
                meta.get("publication_status", ""),
                meta.get("source_url", ""),
                meta.get("publish_date") or None,
                meta.get("description", ""),
                bool(meta.get("needs_review", False)),
                max(0, min(5, int(meta.get("rating", 0) or 0))),
            ),
        )

        if tags is not None:
            cur.execute("DELETE FROM book_tags WHERE filename = %s", (filename,))
            rows = []
            seen: set[tuple[str, str]] = set()
            for tag, ttype in tags:
                t = (tag or "").strip()
                tp = (ttype or "").strip()
                if not t or not tp:
                    continue
                key = (t.casefold(), tp)
                if key in seen:
                    continue
                seen.add(key)
                rows.append((filename, t, tp))
            if rows:
                cur.executemany(
                    "INSERT INTO book_tags (filename, tag, tag_type) VALUES (%s, %s, %s) ON CONFLICT (filename, tag, tag_type) DO NOTHING",
                    rows,
                )
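
# Typical ingestion flow (a sketch; the filename and tags are hypothetical):
#   rel = "epub/Pub/Author/Stories/Book.epub"
#   path = resolve_library_path(rel)
#   if path is not None:
#       meta = scan_media(path)
#       with get_db_conn() as conn:
#           upsert_book(conn, rel, meta, tags=[("fantasy", "genre")])
#           ensure_cover_cache_for_book(conn, rel, path, meta["media_type"])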


def list_library_json() -> list[dict]:
    """Return every library row as a JSON-ready dict, with reading progress,
    session counts, cover-cache status, and tags aggregated in a single query."""
    with get_db_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT l.filename, l.media_type, l.title, l.author, l.publisher, l.has_cover,
                       l.series, l.series_index, l.publication_status, l.want_to_read,
                       l.archived, l.needs_review, l.updated_at,
                       rp.progress, rp.cfi, rp.page,
                       COALESCE(rs.read_count, 0)::int AS read_count,
                       rs.last_read,
                       (cc.filename IS NOT NULL) AS has_cached_cover,
                       l.rating,
                       COALESCE(l.series_suffix, '') AS series_suffix,
                       json_agg(
                           json_build_object('tag', bt.tag, 'tag_type', bt.tag_type)
                       ) FILTER (WHERE bt.tag IS NOT NULL) AS tags
                FROM library l
                LEFT JOIN reading_progress rp ON rp.filename = l.filename
                LEFT JOIN (
                    SELECT filename, COUNT(*)::int AS read_count, MAX(read_at) AS last_read
                    FROM reading_sessions
                    GROUP BY filename
                ) rs ON rs.filename = l.filename
                LEFT JOIN library_cover_cache cc ON cc.filename = l.filename
                LEFT JOIN book_tags bt ON bt.filename = l.filename
                GROUP BY l.filename, l.media_type, l.title, l.author, l.publisher, l.has_cover,
                         l.series, l.series_index, l.publication_status, l.want_to_read,
                         l.archived, l.needs_review, l.updated_at,
                         rp.progress, rp.cfi, rp.page,
                         rs.read_count, rs.last_read,
                         cc.filename, l.rating, l.series_suffix
                ORDER BY COALESCE(l.publisher, ''), COALESCE(l.author, ''), COALESCE(l.series, ''), l.series_index, COALESCE(l.title, '')
                """
            )
            rows = cur.fetchall()

    out = []
    for r in rows:
        out.append(
            {
                "filename": r[0],
                "media_type": r[1],
                "title": r[2] or "",
                "author": r[3] or "",
                "publisher": r[4] or "",
                "has_cover": bool(r[5]),
                "has_cached_cover": bool(r[18]),
                "series": r[6] or "",
                "series_index": r[7] or 0,
                "series_suffix": r[20] or "",
                "publication_status": r[8] or "",
                "want_to_read": bool(r[9]),
                "archived": bool(r[10]),
                "needs_review": bool(r[11]),
                "updated_at": r[12].isoformat() if r[12] else None,
                "progress": r[13] or 0,
                "progress_cfi": r[14],
                "page": r[15],
                "read_count": r[16] or 0,
                "last_read": r[17].isoformat() if r[17] else None,
                "tags": r[21] or [],
                "rating": r[19] or 0,
            }
        )
    return out
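
# Shape of one returned entry (values illustrative):
#   {"filename": "epub/Pub/Author/Stories/Book.epub", "media_type": "epub",
#    "title": "Book", "author": "Author", "progress": 42, "read_count": 2,
#    "tags": [{"tag": "fantasy", "tag_type": "genre"}], ...}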


def normalize_site(raw: str) -> str:
    raw = (raw or "").strip()
    if "://" in raw:
        from urllib.parse import urlparse

        raw = urlparse(raw).netloc
    return re.sub(r"^www\.", "", raw).lower()
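
# Illustrative:
#   normalize_site("https://www.Example.com/path") -> "example.com"
#   normalize_site("forum.example.org")            -> "forum.example.org"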


def relative_file_info(path: Path) -> dict:
    stat = path.stat()
    return {
        "size": stat.st_size,
        "modified": datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat(),
    }