import base64
import hashlib
import html as _html
import io
import posixpath
import re
import zipfile as zf
from datetime import datetime, timezone
from pathlib import Path
import psycopg2
from bs4 import BeautifulSoup
from PIL import Image, ImageOps, UnidentifiedImageError
from cbr import cbr_cover_thumb, cbr_page_count
from db import get_db_conn
from pdf import pdf_cover_thumb, pdf_page_count, pdf_scan_metadata
# On-disk library layout. LIBRARY_DIR is created eagerly at import time so
# later path operations can assume it exists.
LIBRARY_DIR = Path("library")
LIBRARY_DIR.mkdir(exist_ok=True)
# Resolved absolute root; used to reject path-traversal escapes (see
# resolve_library_path / prune_empty_dirs).
LIBRARY_ROOT = LIBRARY_DIR.resolve()
# Content-addressed image store for DB-backed books (sharded by sha256 prefix).
IMAGES_DIR = LIBRARY_DIR / "images"
# Cover thumbnail dimensions (pixels) for cached WEBP covers.
COVER_W = 300
COVER_H = 450
def is_db_filename(filename: str) -> bool:
    """Return True for synthetic DB-stored book paths (no file on disk)."""
    name = filename if filename else ""
    return name.startswith("db/")
def clean_segment(value: str, fallback: str, max_len: int) -> str:
    """Sanitize one path segment for on-disk use.

    Collapses runs of whitespace, strips characters that are illegal on
    common filesystems, removes trailing dots, substitutes *fallback* when
    nothing usable remains, and truncates the result to *max_len*.
    """
    text = (value or "").strip()
    text = re.sub(r"\s+", " ", text)
    text = re.sub(r'[<>:"/\\|?*\x00-\x1f]', "", text)
    text = re.sub(r"\.+$", "", text).strip()
    if not text:
        text = fallback
    return text[:max_len]
def resolve_library_path(filename: str) -> Path | None:
    """Map a user-supplied relative filename to an absolute path inside the
    library, or return None for absolute paths and traversal attempts.
    """
    rel = Path(filename)
    if rel.is_absolute():
        return None
    # Reject empty, '.' and '..' components outright.
    if any(part in {"", ".", ".."} for part in rel.parts):
        return None
    resolved = (LIBRARY_DIR / rel).resolve()
    # Belt-and-braces: the resolved path must still live under the library root.
    try:
        resolved.relative_to(LIBRARY_ROOT)
    except ValueError:
        return None
    return resolved
def media_type_from_suffix(path: Path) -> str:
    """Classify a file by extension: 'epub', 'pdf', 'cbr' (both .cbr and
    .cbz archives), or '' for anything unrecognized."""
    by_ext = {
        ".epub": "epub",
        ".pdf": "pdf",
        ".cbr": "cbr",
        ".cbz": "cbr",
    }
    return by_ext.get(path.suffix.lower(), "")
def parse_volume_str(value: int | str | None) -> tuple[int, str]:
"""Parse a volume string like '21a' or '0' into (index, suffix).
Returns (0, '') for anything unparseable.
index is clamped to 0–999; suffix is lowercased alpha only, max 5 chars.
"""
s = str(value or "").strip()
m = re.match(r"^(\d+)([a-zA-Z]*)$", s)
if m:
idx = max(0, min(999, int(m.group(1))))
suffix = m.group(2).lower()[:5]
return idx, suffix
try:
return max(0, min(999, int(float(s)))), ""
except Exception:
return 0, ""
def coerce_series_index(value: int | str | None) -> int:
try:
return max(0, min(999, int(value or 0)))
except Exception:
return 0
def _series_file_stem(series_index: int | str | None, series_suffix: str, title: str) -> str:
    """Build the 'NNNsfx - Title' filename stem used for series entries."""
    idx = coerce_series_index(series_index)
    sfx = re.sub(r"[^a-z]", "", (series_suffix or "").lower())[:5]
    return f"{idx:03d}{sfx} - {title}"


def make_rel_path(*, media_type: str, publisher: str, author: str, title: str, series: str, series_index: int | str | None, series_suffix: str = "", ext: str = "") -> Path:
    """Build the library-relative path for a book from its metadata.

    Layout by media type:
      db    -> db/<pub>/<auth>/[Series/<series>/NNNsfx - Title | Title]
      epub  -> epub/<pub>/<auth>/[Series/<series>/... .epub | Stories/Title.epub]
      pdf   -> pdf/<pub>/<auth>/Title.pdf
      other -> comics/<pub>/<auth>/[Series/... | Title] with .cbr/.cbz ext

    All segments are sanitized via clean_segment; every unknown media type
    falls through to the comics layout.
    """
    pub = clean_segment(publisher, "Unknown Publisher", 80)
    # NOTE: the comics layout historically used the shorter "Unknown" author
    # fallback; preserved for path stability.
    auth_fallback = "Unknown Author" if media_type in {"db", "epub", "pdf"} else "Unknown"
    auth = clean_segment(author, auth_fallback, 80)
    ttl = clean_segment(title, "Untitled", 140)
    series_name = clean_segment(series, "", 80)
    if media_type == "db":
        if series_name:
            return Path("db") / pub / auth / "Series" / series_name / _series_file_stem(series_index, series_suffix, ttl)
        return Path("db") / pub / auth / ttl
    if media_type == "epub":
        if series_name:
            return Path("epub") / pub / auth / "Series" / series_name / f"{_series_file_stem(series_index, series_suffix, ttl)}.epub"
        return Path("epub") / pub / auth / "Stories" / f"{ttl}.epub"
    if media_type == "pdf":
        return Path("pdf") / pub / auth / f"{ttl}.pdf"
    # CBR / CBZ — preserve the original extension; default to .cbr
    comics_ext = ext if ext in {".cbr", ".cbz"} else ".cbr"
    if series_name:
        return Path("comics") / pub / auth / "Series" / series_name / f"{_series_file_stem(series_index, series_suffix, ttl)}{comics_ext}"
    return Path("comics") / pub / auth / f"{ttl}{comics_ext}"
def ensure_unique_rel_path(rel_path: Path) -> Path:
    """Return rel_path, or a ' (N)' variant of it, such that the target
    does not yet exist under LIBRARY_DIR."""
    base_stem, base_suffix = rel_path.stem, rel_path.suffix
    candidate = rel_path
    n = 2
    while (LIBRARY_DIR / candidate).exists():
        candidate = rel_path.with_name(f"{base_stem} ({n}){base_suffix}")
        n += 1
    return candidate
def extract_cover_from_epub(epub_path: Path) -> tuple[bytes, str] | None:
try:
with zf.ZipFile(epub_path, "r") as z:
names = z.namelist()
cover = next((n for n in names if "/Images/cover." in n or n.lower().endswith("/cover.jpg")), "")
if not cover:
return None
data = z.read(cover)
ext = Path(cover).suffix.lower()
mt = {
".jpg": "image/jpeg",
".jpeg": "image/jpeg",
".png": "image/png",
".webp": "image/webp",
".gif": "image/gif",
}.get(ext, "image/jpeg")
return data, mt
except Exception:
return None
def make_cover_thumb_webp(image_bytes: bytes) -> bytes:
    """Decode image bytes and return a COVER_W x COVER_H WEBP thumbnail.

    Applies the EXIF orientation, converts exotic modes to RGB, and
    center-crops/scales to the exact cover aspect ratio.
    """
    with Image.open(io.BytesIO(image_bytes)) as source:
        source = ImageOps.exif_transpose(source)
        if source.mode not in ("RGB", "RGBA"):
            source = source.convert("RGB")
        fitted = ImageOps.fit(source, (COVER_W, COVER_H), method=Image.Resampling.LANCZOS, centering=(0.5, 0.5))
        buffer = io.BytesIO()
        fitted.save(buffer, format="WEBP", quality=82, method=6)
        return buffer.getvalue()
def upsert_cover_cache(conn, filename: str, mime_type: str, thumb_webp: bytes) -> None:
    """Insert or refresh the cached cover thumbnail row for one book.

    `thumb_webp` holds pre-rendered WEBP thumbnail bytes.
    NOTE(review): callers pass the ORIGINAL cover's MIME for EPUBs but
    "image/webp" for PDF/CBR (see ensure_cover_cache_for_book) — confirm
    which one consumers of `mime_type` actually expect.
    """
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO library_cover_cache (filename, mime_type, thumb_webp, updated_at)
            VALUES (%s, %s, %s, NOW())
            ON CONFLICT (filename) DO UPDATE SET
                mime_type = EXCLUDED.mime_type,
                thumb_webp = EXCLUDED.thumb_webp,
                updated_at = NOW()
            """,
            (filename, mime_type, psycopg2.Binary(thumb_webp)),
        )
def ensure_cover_cache_for_book(conn, filename: str, full_path: Path, media_type: str) -> bool:
    """Generate and cache a cover thumbnail for one book.

    Returns True when a thumbnail was produced and stored; False when the
    media type is unsupported, no cover exists, or thumbnailing failed.
    """
    try:
        if media_type == "epub":
            extracted = extract_cover_from_epub(full_path)
            if extracted is None:
                return False
            raw_bytes, mime = extracted
            upsert_cover_cache(conn, filename, mime, make_cover_thumb_webp(raw_bytes))
            return True
        if media_type == "pdf":
            upsert_cover_cache(conn, filename, "image/webp", pdf_cover_thumb(full_path))
            return True
        if media_type == "cbr":
            upsert_cover_cache(conn, filename, "image/webp", cbr_cover_thumb(full_path))
            return True
    except (UnidentifiedImageError, OSError, ValueError, RuntimeError):
        # Best-effort: a broken image or unreadable file just means "no cover".
        return False
    return False
def prune_empty_dirs(start_dir: Path) -> None:
    """Remove empty directories from start_dir upward, stopping at (and
    never removing) LIBRARY_ROOT or at the first non-empty directory."""
    node = start_dir.resolve()
    # Refuse to touch anything outside the library tree.
    try:
        node.relative_to(LIBRARY_ROOT)
    except Exception:
        return
    while node != LIBRARY_ROOT:
        try:
            node.rmdir()
        except OSError:
            # Directory not empty (or already gone) — stop pruning here.
            return
        node = node.parent
def _find_opf_path(names: set[str], container_xml: str | None) -> str | None:
opf_path = "OEBPS/content.opf"
if container_xml:
m = re.search(r"full-path\s*=\s*['\"]([^'\"]+)['\"]", container_xml)
if m:
opf_path = m.group(1)
if opf_path in names:
return opf_path
candidates = sorted(n for n in names if n.lower().endswith(".opf"))
return candidates[0] if candidates else None
def scan_epub(path: Path) -> dict:
    """Best-effort extraction of metadata from an EPUB's OPF package document.

    Returns a dict whose keys always exist (empty-string / 0 / [] defaults);
    any parse or archive error yields whatever was gathered so far. A
    "rating" key is added ONLY when a novela:rating meta element is present.
    """
    out = {
        "has_cover": False,
        "series": "",
        "series_index": 0,
        "series_suffix": "",
        "title": "",
        "publication_status": "",
        "author": "",
        "publisher": "",
        "source_url": "",
        "publish_date": "",
        "subjects": [],
        "description": "",
    }
    try:
        with zf.ZipFile(path, "r") as z:
            names = set(z.namelist())
            out["has_cover"] = extract_cover_from_epub(path) is not None
            container_xml = (
                z.read("META-INF/container.xml").decode("utf-8", errors="replace")
                if "META-INF/container.xml" in names
                else None
            )
            opf_path = _find_opf_path(names, container_xml)
            if not opf_path or opf_path not in names:
                return out
            opf = z.read(opf_path).decode("utf-8", errors="replace")

            def _find(pat: str) -> str:
                """First capture group of pat in the OPF, HTML-unescaped."""
                m = re.search(pat, opf, re.DOTALL | re.IGNORECASE)
                return _html.unescape(m.group(1).strip()) if m else ""

            # Dublin Core elements may carry the dc: prefix or not. The
            # closing tags were previously missing their "</", which made the
            # non-greedy capture swallow a trailing "</" into every value.
            out["title"] = _find(r"<(?:dc:)?title[^>]*>(.*?)</(?:dc:)?title>")
            out["author"] = _find(r"<(?:dc:)?creator[^>]*>(.*?)</(?:dc:)?creator>")
            out["publisher"] = _find(r"<(?:dc:)?publisher[^>]*>(.*?)</(?:dc:)?publisher>")
            out["source_url"] = _find(r"<(?:dc:)?source[^>]*>(.*?)</(?:dc:)?source>")
            out["description"] = _find(r"<(?:dc:)?description[^>]*>(.*?)</(?:dc:)?description>")
            # Calibre/novela extensions live in <meta name="..." content="...">
            # elements; the "<meta[^>" prefix had been lost from these patterns,
            # so they could never match. Restored.
            m = re.search(r'<meta[^>]*name="calibre:series"[^>]*content="([^"]+)"', opf, re.IGNORECASE)
            if m:
                out["series"] = _html.unescape(m.group(1).strip())
            m = re.search(r'<meta[^>]*name="calibre:series_index"[^>]*content="([^"]+)"', opf, re.IGNORECASE)
            if m:
                try:
                    # Calibre stores floats like "1.0"; keep the integer part.
                    out["series_index"] = int(float(m.group(1)))
                except Exception:
                    out["series_index"] = 0
            m = re.search(r'<meta[^>]*name="novela:series_suffix"[^>]*content="([^"]+)"', opf, re.IGNORECASE)
            if m:
                out["series_suffix"] = re.sub(r"[^a-z]", "", m.group(1).lower())[:5]
            m = re.search(r'<meta[^>]*name="publication_status"[^>]*content="([^"]+)"', opf, re.IGNORECASE)
            if m:
                out["publication_status"] = _html.unescape(m.group(1).strip())
            pd = _find(r"<(?:dc:)?date[^>]*>(.*?)</(?:dc:)?date>")
            if pd:
                # Keep only the date portion of an ISO datetime ("T" separator).
                date_candidate = pd.split("T", 1)[0]
                try:
                    out["publish_date"] = datetime.fromisoformat(date_candidate).date().isoformat()
                except Exception:
                    out["publish_date"] = ""
            out["subjects"] = [
                _html.unescape(s.strip())
                for s in re.findall(r"<(?:dc:)?subject[^>]*>(.*?)</(?:dc:)?subject>", opf, re.DOTALL | re.IGNORECASE)
                if s.strip()
            ]
            m = re.search(r'<meta[^>]*name="novela:rating"[^>]*content="([^"]+)"', opf, re.IGNORECASE)
            if m:
                try:
                    out["rating"] = max(0, min(5, int(m.group(1))))
                except Exception:
                    pass
    except Exception:
        pass
    return out
def scan_cbz_rating(path: Path) -> int:
    """Read NovelaRating from ComicInfo.xml inside a CBZ (ZIP) file.

    Returns the rating clamped to 0..5, or 0 when the archive has no
    ComicInfo.xml, no rating element, or cannot be read at all.
    """
    try:
        with zf.ZipFile(path, "r") as z:
            # ComicInfo.xml may be stored with arbitrary casing.
            names = {n.lower(): n for n in z.namelist()}
            ci_key = names.get("comicinfo.xml")
            if ci_key is None:
                return 0
            xml = z.read(ci_key).decode("utf-8", errors="replace")
            # Match the full element: the bare "(\d+)" pattern used before
            # picked up the FIRST digit run anywhere in the document (e.g.
            # a PageCount value), not the rating.
            m = re.search(r"<NovelaRating>\s*(\d+)\s*</NovelaRating>", xml)
            if m:
                return max(0, min(5, int(m.group(1))))
    except Exception:
        pass
    return 0
def scan_media(path: Path) -> dict:
    """Dispatch metadata scanning by media type; always sets 'media_type'.

    Unknown extensions produce {'media_type': ''} so callers can detect
    unsupported files.
    """
    media_type = media_type_from_suffix(path)
    if media_type == "epub":
        meta = scan_epub(path)
    elif media_type == "pdf":
        meta = pdf_scan_metadata(path)
    elif media_type == "cbr":
        # Comic archives get a minimal record synthesized from the filename;
        # has_cover is inferred from the archive containing any pages.
        meta = {
            "title": path.stem,
            "author": "",
            "publisher": "",
            "series": "",
            "series_index": 0,
            "publication_status": "",
            "has_cover": cbr_page_count(path) > 0,
            "description": "",
            "source_url": "",
            "publish_date": "",
            "subjects": [],
        }
        # Only ZIP-based archives can carry a ComicInfo.xml rating.
        if path.suffix.lower() == ".cbz":
            meta["rating"] = scan_cbz_rating(path)
    else:
        meta = {}
    meta["media_type"] = media_type
    return meta
def upsert_book(conn, filename: str, meta: dict, tags: list[tuple[str, str]] | None = None) -> None:
    """Insert or update one library row from scanned metadata.

    Merge policy on conflict: non-empty scanned fields win, otherwise the
    existing DB value is kept; has_cover is sticky (once TRUE it stays TRUE);
    rating 0 means "unrated" and never overwrites an existing rating.
    want_to_read starts FALSE and needs_review comes from meta on first
    insert; neither is changed on update.

    When `tags` is not None, the book's tag rows are fully replaced,
    de-duplicated case-insensitively per (tag, tag_type).
    """
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO library (filename, media_type, storage_type, title, author, publisher, has_cover,
                                 series, series_index, series_suffix, publication_status, source_url,
                                 publish_date, description, needs_review, want_to_read, rating, updated_at)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, FALSE, %s, NOW())
            ON CONFLICT (filename) DO UPDATE SET
                media_type = EXCLUDED.media_type,
                storage_type = EXCLUDED.storage_type,
                title = COALESCE(NULLIF(EXCLUDED.title, ''), library.title),
                author = COALESCE(NULLIF(EXCLUDED.author, ''), library.author),
                publisher = COALESCE(NULLIF(EXCLUDED.publisher, ''), library.publisher),
                has_cover = (library.has_cover OR EXCLUDED.has_cover),
                series = COALESCE(NULLIF(EXCLUDED.series, ''), library.series),
                series_index = CASE WHEN COALESCE(EXCLUDED.series_index, 0) > 0 THEN EXCLUDED.series_index ELSE library.series_index END,
                series_suffix = COALESCE(NULLIF(EXCLUDED.series_suffix, ''), library.series_suffix),
                publication_status = COALESCE(NULLIF(EXCLUDED.publication_status, ''), library.publication_status),
                source_url = COALESCE(NULLIF(EXCLUDED.source_url, ''), library.source_url),
                publish_date = COALESCE(EXCLUDED.publish_date, library.publish_date),
                description = COALESCE(NULLIF(EXCLUDED.description, ''), library.description),
                rating = CASE WHEN EXCLUDED.rating > 0 THEN EXCLUDED.rating ELSE library.rating END,
                updated_at = NOW()
            """,
            (
                filename,
                meta.get("media_type", "epub"),
                meta.get("storage_type", "file"),
                meta.get("title", ""),
                meta.get("author", ""),
                meta.get("publisher", ""),
                bool(meta.get("has_cover", False)),
                meta.get("series", ""),
                meta.get("series_index", 0),
                meta.get("series_suffix", ""),
                meta.get("publication_status", ""),
                meta.get("source_url", ""),
                # Empty string must become NULL for the DATE column.
                meta.get("publish_date") or None,
                meta.get("description", ""),
                bool(meta.get("needs_review", False)),
                # Clamp to the 0..5 rating scale; tolerate missing/None.
                max(0, min(5, int(meta.get("rating", 0) or 0))),
            ),
        )
        if tags is not None:
            # Full replacement: drop existing tags, then re-insert the
            # de-duplicated list (first occurrence's casing wins).
            cur.execute("DELETE FROM book_tags WHERE filename = %s", (filename,))
            rows = []
            seen: set[tuple[str, str]] = set()
            for tag, ttype in tags:
                t = (tag or "").strip()
                tp = (ttype or "").strip()
                if not t or not tp:
                    continue
                key = (t.casefold(), tp)
                if key in seen:
                    continue
                seen.add(key)
                rows.append((filename, t, tp))
            if rows:
                cur.executemany(
                    "INSERT INTO book_tags (filename, tag, tag_type) VALUES (%s, %s, %s) ON CONFLICT (filename, tag, tag_type) DO NOTHING",
                    rows,
                )
def list_library_json() -> list[dict]:
    """Load every library row — with reading progress, session counts,
    cover-cache presence and tags joined in — as JSON-serializable dicts.

    Tags are aggregated per book via json_agg (books without tags get []).
    Result order: publisher, author, series, series_index, title.
    """
    with get_db_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT l.filename, l.media_type, l.title, l.author, l.publisher, l.has_cover,
                       l.series, l.series_index, l.publication_status, l.want_to_read,
                       l.archived, l.needs_review, l.updated_at,
                       rp.progress, rp.cfi, rp.page,
                       COALESCE(rs.read_count, 0)::int AS read_count,
                       rs.last_read,
                       (cc.filename IS NOT NULL) AS has_cached_cover,
                       l.rating,
                       COALESCE(l.series_suffix, '') AS series_suffix,
                       COALESCE(l.storage_type, 'file') AS storage_type,
                       json_agg(
                           json_build_object('tag', bt.tag, 'tag_type', bt.tag_type)
                       ) FILTER (WHERE bt.tag IS NOT NULL) AS tags
                FROM library l
                LEFT JOIN reading_progress rp ON rp.filename = l.filename
                LEFT JOIN (
                    SELECT filename, COUNT(*)::int AS read_count, MAX(read_at) AS last_read
                    FROM reading_sessions
                    GROUP BY filename
                ) rs ON rs.filename = l.filename
                LEFT JOIN library_cover_cache cc ON cc.filename = l.filename
                LEFT JOIN book_tags bt ON bt.filename = l.filename
                GROUP BY l.filename, l.media_type, l.title, l.author, l.publisher, l.has_cover,
                         l.series, l.series_index, l.publication_status, l.want_to_read,
                         l.archived, l.needs_review, l.updated_at,
                         rp.progress, rp.cfi, rp.page,
                         rs.read_count, rs.last_read,
                         cc.filename, l.rating, l.series_suffix, l.storage_type
                ORDER BY COALESCE(l.publisher, ''), COALESCE(l.author, ''), COALESCE(l.series, ''), l.series_index, COALESCE(l.title, '')
                """
            )
            rows = cur.fetchall()
    out = []
    # Positional mapping: indices 0-17 follow the SELECT list order above;
    # 18=has_cached_cover, 19=rating, 20=series_suffix, 21=storage_type,
    # 22=tags. Keep the SELECT and this mapping in sync.
    for r in rows:
        out.append(
            {
                "filename": r[0],
                "media_type": r[1],
                "title": r[2] or "",
                "author": r[3] or "",
                "publisher": r[4] or "",
                "has_cover": bool(r[5]),
                "has_cached_cover": bool(r[18]),
                "series": r[6] or "",
                "series_index": r[7] or 0,
                "series_suffix": r[20] or "",
                "publication_status": r[8] or "",
                "want_to_read": bool(r[9]),
                "archived": bool(r[10]),
                "needs_review": bool(r[11]),
                "updated_at": r[12].isoformat() if r[12] else None,
                "progress": r[13] or 0,
                "progress_cfi": r[14],
                "page": r[15],
                "read_count": r[16] or 0,
                "last_read": r[17].isoformat() if r[17] else None,
                "storage_type": r[21] or "file",
                "tags": r[22] or [],
                "rating": r[19] or 0,
            }
        )
    return out
# Maps an image MIME type to the file extension used in the imagestore;
# write_image_file falls back to ".jpg" for unrecognized types.
_IMAGE_EXT_MAP = {
    "image/jpeg": ".jpg",
    "image/png": ".png",
    "image/webp": ".webp",
    "image/gif": ".gif",
}
def write_image_file(data: bytes, media_type: str) -> tuple[str, str, str]:
    """Write image bytes to the content-addressed imagestore (no DB).

    Returns (sha256, ext, url). Files are sharded into two-hex-char
    subdirectories; identical content is written only once.
    """
    digest = hashlib.sha256(data).hexdigest()
    ext = _IMAGE_EXT_MAP.get(media_type, ".jpg")
    shard = digest[:2]
    target = IMAGES_DIR / shard / f"{digest}{ext}"
    if not target.exists():
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_bytes(data)
    return digest, ext, f"/library/db-images/{shard}/{digest}{ext}"
def store_db_image(conn, data: bytes, media_type: str) -> tuple[str, str, str]:
    """Write image to imagestore and register in book_images table.

    Returns (sha256, ext, url). Registration is idempotent: an image that
    already exists (same sha256) is left untouched.
    """
    sha256, ext, url = write_image_file(data, media_type)
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO book_images (sha256, ext, media_type, size_bytes)
            VALUES (%s, %s, %s, %s)
            ON CONFLICT (sha256) DO NOTHING
            """,
            (sha256, ext, media_type, len(data)),
        )
    return sha256, ext, url
def html_to_plain(html: str) -> str:
    """Strip HTML tags for tsvector input.

    Tags are replaced with a single space and each text fragment is
    stripped, via BeautifulSoup's get_text.
    """
    # BeautifulSoup is imported at module level; the former function-local
    # re-import was redundant.
    return BeautifulSoup(html, "html.parser").get_text(" ", strip=True)
def upsert_chapter(conn, filename: str, chapter_index: int, title: str, content_html: str) -> None:
    """Insert or replace a chapter in book_chapters and update its tsvector."""
    # Full-text index input is the plain-text body prefixed by the title so
    # title words are searchable too.
    plain = html_to_plain(content_html)
    tsv_input = (title or "") + " " + plain
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO book_chapters (filename, chapter_index, title, content, content_tsv)
            VALUES (%s, %s, %s, %s, to_tsvector('simple', %s))
            ON CONFLICT (filename, chapter_index) DO UPDATE SET
                title = EXCLUDED.title,
                content = EXCLUDED.content,
                content_tsv = EXCLUDED.content_tsv
            """,
            (filename, chapter_index, title, content_html, tsv_input),
        )
def ensure_unique_db_filename(conn, base_filename: str) -> str:
    """Return a filename that doesn't yet exist in the library table.

    Appends ' (2)', ' (3)', ... to base_filename until the name is free.
    """
    candidate = base_filename
    n = 2
    while True:
        with conn.cursor() as cur:
            cur.execute("SELECT 1 FROM library WHERE filename = %s", (candidate,))
            taken = cur.fetchone() is not None
        if not taken:
            return candidate
        candidate = f"{base_filename} ({n})"
        n += 1
def normalize_site(raw: str) -> str:
    """Normalize a URL or bare hostname to a lowercase domain.

    Accepts either a full URL (host is extracted via urlparse) or a plain
    hostname; returns it lowercased with any leading "www." removed.
    Empty/None input yields "".
    """
    raw = (raw or "").strip()
    if "://" in raw:
        from urllib.parse import urlparse
        raw = urlparse(raw).netloc
    # Lowercase BEFORE stripping the prefix: previously the case-sensitive
    # sub ran first, so "WWW.Example.com" kept its "www." prefix.
    return re.sub(r"^www\.", "", raw.lower())
def relative_file_info(path: Path) -> dict:
    """Return {'size': bytes, 'modified': UTC ISO-8601 mtime} for a file.

    Raises OSError (via Path.stat) when the file does not exist.
    """
    st = path.stat()
    modified = datetime.fromtimestamp(st.st_mtime, tz=timezone.utc)
    return {"size": st.st_size, "modified": modified.isoformat()}