novela/containers/novela/routers/reader.py
Ivo Oskamp 92cd301658 Add PDF reader/editor support, fix metadata save and dir cleanup
- PDF reader: page-image rendering via /library/pdf/{filename}?page=N;
  new /api/pdf/info/{filename} endpoint returns page count; reader.html
  branches on FORMAT (epub/pdf) injected by server
- PDF metadata edit: PATCH /library/book now updates DB for all formats;
  _sync_epub_metadata only called for .epub; non-EPUB formats skip file write
- Fix file path on metadata save: _make_rel_path now includes format prefix
  (epub/, pdf/, comics/) matching common.make_rel_path used during import;
  previously files were moved outside their format directory
- Fix empty dir cleanup: prune_empty_dirs always runs after successful
  metadata save, not only when file was moved
- Hide Edit EPUB button for non-EPUB files in book detail
- Docs: TECHNICAL.md and changelog-develop.md updated

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-25 08:47:01 +01:00

"""reader.py — Reader and book detail routes for EPUB/PDF/CBR."""
import html as _html
import io
import posixpath
import re
import uuid
import zipfile as zf
from datetime import datetime
from pathlib import Path
from bs4 import BeautifulSoup
from fastapi import APIRouter, Request
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, Response
from fastapi.templating import Jinja2Templates
from cbr import cbr_get_page
from db import get_db_conn
from epub import read_epub_file, write_epub_file
from pdf import pdf_page_count, pdf_render_page
from routers.common import LIBRARY_DIR, prune_empty_dirs, resolve_library_path, scan_epub
router = APIRouter()
templates = Jinja2Templates(directory="templates")
# ---------------------------------------------------------------------------
# EPUB helpers
# ---------------------------------------------------------------------------
def _epub_spine(path: Path) -> list[dict]:
    """Return an ordered list of {index, title, href} for all spine items.

    Supports both EPUB2 (toc.ncx) and EPUB3 (nav.xhtml), and respects
    the OPF location declared in META-INF/container.xml.
    """
    def _norm(base_dir: str, rel: str) -> str:
        rel = (rel or '').split('#', 1)[0].strip()
        if not rel:
            return ''
        joined = posixpath.normpath(posixpath.join(base_dir, rel))
        return joined.lstrip('./')

    with zf.ZipFile(path, 'r') as z:
        names = set(z.namelist())
        opf_path = 'OEBPS/content.opf'
        try:
            container_xml = z.read('META-INF/container.xml').decode('utf-8', errors='replace')
            m = re.search(r"full-path\s*=\s*['\"]([^'\"]+)['\"]", container_xml)
            if m:
                opf_path = m.group(1)
        except Exception:
            pass
        if opf_path not in names:
            # Fallback for malformed books: pick the first .opf found in the archive.
            candidates = [n for n in names if n.lower().endswith('.opf')]
            if not candidates:
                return []
            opf_path = sorted(candidates)[0]
        opf_xml = z.read(opf_path).decode('utf-8', errors='replace')
        opf = BeautifulSoup(opf_xml, 'xml')
        opf_dir = posixpath.dirname(opf_path)
        manifest: dict[str, str] = {}
        for item in opf.find_all('item'):
            iid = item.get('id')
            href = item.get('href')
            if iid and href:
                manifest[iid] = _norm(opf_dir, href)
        spine_idrefs: list[str] = []
        spine_tag = opf.find('spine')
        toc_id = spine_tag.get('toc') if spine_tag else None
        if spine_tag:
            for ir in spine_tag.find_all('itemref'):
                rid = ir.get('idref')
                if rid:
                    spine_idrefs.append(rid)
        hrefs = [manifest[rid] for rid in spine_idrefs if rid in manifest]
        href_to_title: dict[str, str] = {}
        # EPUB2: NCX titles
        ncx_path = ''
        if toc_id and toc_id in manifest:
            ncx_path = manifest[toc_id]
        elif 'toc.ncx' in names:
            ncx_path = 'toc.ncx'
        elif 'OEBPS/toc.ncx' in names:
            ncx_path = 'OEBPS/toc.ncx'
        if ncx_path and ncx_path in names:
            try:
                ncx_xml = z.read(ncx_path).decode('utf-8', errors='replace')
                ncx = BeautifulSoup(ncx_xml, 'xml')
                ncx_dir = posixpath.dirname(ncx_path)
                for np in ncx.find_all('navPoint'):
                    content = np.find('content')
                    label_tag = np.find('text')
                    src = content.get('src') if content else ''
                    label = label_tag.get_text(strip=True) if label_tag else ''
                    if src and label:
                        href_to_title[_norm(ncx_dir, src)] = _html.unescape(label)
            except Exception:
                pass
        # EPUB3: nav.xhtml titles (fallback)
        if not href_to_title:
            nav_item = None
            for item in opf.find_all('item'):
                props = (item.get('properties') or '').split()
                if 'nav' in props:
                    nav_item = item
                    break
            if nav_item and nav_item.get('href'):
                nav_path = _norm(opf_dir, nav_item.get('href'))
                if nav_path in names:
                    try:
                        nav_xml = z.read(nav_path).decode('utf-8', errors='replace')
                        nav = BeautifulSoup(nav_xml, 'lxml')
                        nav_dir = posixpath.dirname(nav_path)
                        for a in nav.select('nav a[href]'):
                            src = a.get('href', '')
                            label = a.get_text(' ', strip=True)
                            if src and label:
                                href_to_title[_norm(nav_dir, src)] = _html.unescape(label)
                    except Exception:
                        pass
    chapters = []
    for i, href in enumerate(hrefs):
        base = posixpath.basename(href)
        title = href_to_title.get(href, re.sub(r'\.(xhtml|html|htm)$', '', base, flags=re.I))
        chapters.append({'index': i, 'title': title or f'Chapter {i+1}', 'href': href})
    return chapters
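
# Illustrative return shape for _epub_spine() (values assumed, not from a real book):
#     [{'index': 0, 'title': 'Cover', 'href': 'OEBPS/Text/cover.xhtml'},
#      {'index': 1, 'title': 'Chapter 1', 'href': 'OEBPS/Text/chapter01.xhtml'}]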

def _norm_href(base_dir: str, rel: str) -> str:
    rel = (rel or '').split('#', 1)[0].strip()
    if not rel:
        return ''
    return posixpath.normpath(posixpath.join(base_dir, rel)).lstrip('./')


def _find_opf_path(names: set[str], container_xml: str | None) -> str | None:
    opf_path = 'OEBPS/content.opf'
    if container_xml:
        m = re.search(r'full-path\s*=\s*[\'"]([^\'"]+)[\'"]', container_xml)
        if m:
            opf_path = m.group(1)
    if opf_path in names:
        return opf_path
    candidates = sorted(n for n in names if n.lower().endswith('.opf'))
    return candidates[0] if candidates else None
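
# The regex above targets the rootfile declaration in META-INF/container.xml,
# which in a typical EPUB looks like this (illustrative):
#     <rootfile full-path="OEBPS/content.opf" media-type="application/oebps-package+xml"/>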

def _make_new_chapter_xhtml(title: str) -> str:
    safe_title = _html.escape((title or 'New chapter').strip() or 'New chapter')
    return (
        '<?xml version="1.0" encoding="UTF-8"?>\n'
        '<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN"\n'
        ' "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">\n'
        '<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en">\n'
        '<head>\n'
        ' <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>\n'
        f' <title>{safe_title}</title>\n'
        ' <link rel="stylesheet" type="text/css" href="../Styles/style.css"/>\n'
        '</head>\n'
        '<body>\n'
        f' <h2 class="chapter-title">{safe_title}</h2>\n'
        ' <p></p>\n'
        '</body>\n'
        '</html>\n'
    )


def _tag_local(name: str | None) -> str:
    if not name:
        return ''
    return name.split(':', 1)[-1].lower()

def _write_epub_rating(epub_path: Path, rating: int) -> None:
    """Write (or remove) novela:rating in the EPUB OPF metadata."""
    with zf.ZipFile(epub_path, "r") as z:
        names = set(z.namelist())
        container_xml = (
            z.read("META-INF/container.xml").decode("utf-8", errors="replace")
            if "META-INF/container.xml" in names
            else None
        )
        opf_path = _find_opf_path(names, container_xml)
        if not opf_path or opf_path not in names:
            return
        opf_xml = z.read(opf_path).decode("utf-8", errors="replace")
    opf = BeautifulSoup(opf_xml, "xml")
    metadata = opf.find(lambda t: _tag_local(getattr(t, "name", None)) == "metadata")
    if not metadata:
        return
    # Drop any existing rating meta, then re-add it when rating > 0.
    for t in metadata.find_all(lambda t: _tag_local(getattr(t, "name", None)) == "meta"):
        if (t.get("name") or "").strip() == "novela:rating":
            t.decompose()
    if rating > 0:
        nt = opf.new_tag("meta")
        nt["name"] = "novela:rating"
        nt["content"] = str(rating)
        metadata.append(nt)
    _rewrite_epub_entries(epub_path, {opf_path: str(opf).encode("utf-8")})

def _write_cbz_rating(cbz_path: Path, rating: int) -> None:
    """Write (or remove) NovelaRating in ComicInfo.xml inside a CBZ."""
    with open(cbz_path, "rb") as f:
        original = f.read()
    out = io.BytesIO()
    ci_key: str | None = None
    ci_data: bytes | None = None
    with zf.ZipFile(io.BytesIO(original), "r") as zin:
        names = zin.namelist()
        for n in names:
            if n.lower() == "comicinfo.xml":
                ci_key = n
                ci_data = zin.read(n)
                break
    xml: str
    if ci_data is not None:
        xml = ci_data.decode("utf-8", errors="replace")
        xml = re.sub(r"<NovelaRating>\d+</NovelaRating>\s*", "", xml)
        if rating > 0:
            xml = xml.replace("</ComicInfo>", f" <NovelaRating>{rating}</NovelaRating>\n</ComicInfo>")
    else:
        ci_key = "ComicInfo.xml"
        if rating > 0:
            xml = f'<?xml version="1.0"?>\n<ComicInfo>\n <NovelaRating>{rating}</NovelaRating>\n</ComicInfo>\n'
        else:
            return  # nothing to write
    new_ci = xml.encode("utf-8")
    with zf.ZipFile(io.BytesIO(original), "r") as zin, zf.ZipFile(out, "w", zf.ZIP_DEFLATED) as zout:
        for item in zin.infolist():
            if item.filename.lower() == "comicinfo.xml":
                continue
            data = zin.read(item.filename)
            ctype = zf.ZIP_STORED if item.filename == "mimetype" else zf.ZIP_DEFLATED
            zout.writestr(item, data, compress_type=ctype)
        zout.writestr(ci_key, new_ci, compress_type=zf.ZIP_DEFLATED)
    with open(cbz_path, "wb") as f:
        f.write(out.getvalue())
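
# After a call with rating=4 on a CBZ that had no ComicInfo.xml, the new
# entry would contain (illustrative):
#     <?xml version="1.0"?>
#     <ComicInfo>
#      <NovelaRating>4</NovelaRating>
#     </ComicInfo>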

def _sync_epub_metadata(
    epub_path: Path,
    *,
    title: str,
    author: str,
    publisher: str,
    publication_status: str,
    source_url: str,
    publish_date: str,
    description: str,
    series: str,
    series_index: int | str | None,
    subjects: list[str],
) -> None:
    """Write edited metadata back into the OPF so DB and EPUB stay aligned."""
    with zf.ZipFile(epub_path, 'r') as z:
        names = set(z.namelist())
        container_xml = (
            z.read('META-INF/container.xml').decode('utf-8', errors='replace')
            if 'META-INF/container.xml' in names
            else None
        )
        opf_path = _find_opf_path(names, container_xml)
        if not opf_path or opf_path not in names:
            return
        opf_xml = z.read(opf_path).decode('utf-8', errors='replace')
    opf = BeautifulSoup(opf_xml, 'xml')
    metadata = opf.find(lambda t: _tag_local(getattr(t, 'name', None)) == 'metadata')
    if not metadata:
        return

    def set_dc(local_name: str, value: str) -> None:
        existing = [t for t in metadata.find_all(lambda t: _tag_local(getattr(t, 'name', None)) == local_name)]
        tag_name = existing[0].name if existing else f'dc:{local_name}'
        for t in existing:
            t.decompose()
        if value:
            nt = opf.new_tag(tag_name)
            nt.string = value
            metadata.append(nt)

    def set_named_meta(key: str, value: str) -> None:
        existing = [
            t for t in metadata.find_all(lambda t: _tag_local(getattr(t, 'name', None)) == 'meta')
            if (t.get('name') or '').strip() == key
        ]
        tag_name = existing[0].name if existing else 'meta'
        for t in existing:
            t.decompose()
        if value:
            nt = opf.new_tag(tag_name)
            nt['name'] = key
            nt['content'] = value
            metadata.append(nt)

    set_dc('title', (title or '').strip())
    set_dc('creator', (author or '').strip())
    set_dc('publisher', (publisher or '').strip())
    set_dc('source', (source_url or '').strip())
    date_value = (publish_date or '').strip()
    if date_value:
        date_candidate = date_value.split('T', 1)[0]
        try:
            parsed_date = datetime.fromisoformat(date_candidate).date()
            date_value = parsed_date.isoformat() if parsed_date.year >= 1900 else ''
        except Exception:
            date_value = ''
    set_dc('date', date_value)
    set_dc('description', (description or '').strip())
    # Replace subjects from editor tags (genres + subgenres + tags).
    for t in [t for t in metadata.find_all(lambda t: _tag_local(getattr(t, 'name', None)) == 'subject')]:
        t.decompose()
    seen: set[str] = set()
    for raw in subjects:
        val = (raw or '').strip()
        if not val:
            continue
        key = val.casefold()
        if key in seen:
            continue
        seen.add(key)
        nt = opf.new_tag('dc:subject')
        nt.string = val
        metadata.append(nt)
    set_named_meta('publication_status', (publication_status or '').strip())
    series_val = (series or '').strip()
    set_named_meta('calibre:series', series_val)
    if series_val:
        set_named_meta('calibre:series_index', str(_coerce_series_index(series_index)))
    else:
        set_named_meta('calibre:series_index', '')
    _rewrite_epub_entries(epub_path, {opf_path: str(opf).encode('utf-8')})
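
# For a book in a series, the OPF metadata written above ends up with
# Calibre-style meta tags (illustrative values):
#     <meta name="calibre:series" content="Example Saga"/>
#     <meta name="calibre:series_index" content="3"/>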

def _rewrite_epub_entries(epub_path: Path, updates: dict[str, bytes], remove_paths: set[str] | None = None) -> None:
    remove_paths = remove_paths or set()
    with open(epub_path, 'rb') as f:
        original = f.read()
    out = io.BytesIO()
    with zf.ZipFile(io.BytesIO(original), 'r') as zin, zf.ZipFile(out, 'w', zf.ZIP_DEFLATED) as zout:
        existing = set()
        for item in zin.infolist():
            name = item.filename
            existing.add(name)
            if name in remove_paths:
                continue
            data = updates.get(name)
            if data is None:
                data = zin.read(name)
            ctype = zf.ZIP_STORED if name == 'mimetype' else zf.ZIP_DEFLATED
            zout.writestr(name, data, compress_type=ctype)
        for name, data in updates.items():
            if name in existing or name in remove_paths:
                continue
            ctype = zf.ZIP_STORED if name == 'mimetype' else zf.ZIP_DEFLATED
            zout.writestr(name, data, compress_type=ctype)
    with open(epub_path, 'wb') as f:
        f.write(out.getvalue())
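
# Note: ZIP archives cannot be updated in place, so every edit rewrites the
# whole EPUB. The `mimetype` entry is kept ZIP_STORED because the EPUB OCF
# spec requires it to be uncompressed so readers can sniff it directly.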

def _clean_segment(value: str, fallback: str, max_len: int = 100) -> str:
    txt = re.sub(r"\s+", " ", (value or "").strip())
    txt = re.sub(r'[<>:"/\\|?*\x00-\x1f]', "", txt)
    txt = re.sub(r"\.+$", "", txt).strip()
    if not txt:
        txt = fallback
    return txt[:max_len]


def _coerce_series_index(value: int | str | None) -> int:
    try:
        return max(1, min(999, int(value or 1)))
    except (TypeError, ValueError):
        return 1

def _make_rel_path(
    *,
    publisher: str,
    author: str,
    title: str,
    series: str,
    series_index: int | str | None,
    ext: str = ".epub",
) -> Path:
    auth = _clean_segment(author, "Unknown Author", 80)
    ttl = _clean_segment(title, "Untitled", 140)
    if ext == ".epub":
        pub = _clean_segment(publisher, "Unknown Publisher", 80)
        series_name = _clean_segment(series, "", 120)
        if series_name:
            idx = _coerce_series_index(series_index)
            return Path("epub") / pub / auth / "Series" / series_name / f"{idx:03d} - {ttl}.epub"
        return Path("epub") / pub / auth / "Stories" / f"{ttl}.epub"
    if ext == ".pdf":
        return Path("pdf") / auth / f"{ttl}.pdf"
    # .cbr / .cbz
    return Path("comics") / auth / f"{ttl}{ext}"

def _ensure_unique_rel_path(rel_path: Path, *, exclude: Path | None = None) -> Path:
    base = rel_path.with_suffix(rel_path.suffix or ".epub")
    candidate = base
    counter = 2
    while True:
        full = (LIBRARY_DIR / candidate).resolve()
        if exclude is not None and full == exclude.resolve():
            return candidate
        if not full.exists():
            return candidate
        candidate = base.with_name(f"{base.stem} ({counter}){base.suffix}")
        counter += 1
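
# Collision handling (illustrative): if "Stories/Title.epub" already exists,
# the candidates tried next are "Stories/Title (2).epub", "Stories/Title (3).epub", ...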

def _guard(filename: str) -> bool:
    """Return True if the filename contains path-traversal characters."""
    return "/" in filename or "\\" in filename or ".." in filename

# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------
@router.get("/library/epub/{filename:path}")
async def library_epub(filename: str):
    """Serve the EPUB inline (no Content-Disposition: attachment) for the reader."""
    path = resolve_library_path(filename)
    if path is None:
        return Response(status_code=404)
    if not path.exists():
        return Response(status_code=404)
    return FileResponse(path, media_type="application/epub+zip")
@router.get("/library/chapters/{filename:path}")
async def get_chapter_list(filename: str):
path = resolve_library_path(filename)
if path is None:
return Response(status_code=404)
if not path.exists():
return Response(status_code=404)
return _epub_spine(path)
@router.get("/library/chapter/{index}/{filename:path}")
async def get_chapter_html(filename: str, index: int):
"""Extract a single chapter from the EPUB and return it as an HTML fragment."""
path = resolve_library_path(filename)
if path is None:
return Response(status_code=404)
if not path.exists():
return Response(status_code=404)
spine = _epub_spine(path)
if index < 0 or index >= len(spine):
return Response(status_code=404)
href = spine[index]["href"]
with zf.ZipFile(path, "r") as z:
xhtml = z.read(href).decode("utf-8", errors="replace")
soup = BeautifulSoup(xhtml, "lxml")
body = soup.find("body")
if not body:
return Response("<p>No content.</p>", media_type="text/html")
# Rewrite relative image paths to the chapter-image API endpoint
href_dir = href.rsplit("/", 1)[0] # e.g. "OEBPS/Text"
for img in body.find_all("img"):
src = img.get("src", "")
if src and not src.startswith("http") and not src.startswith("data:"):
parts = href_dir.split("/") + src.split("/")
resolved: list[str] = []
for p in parts:
if p == "..":
if resolved:
resolved.pop()
else:
resolved.append(p)
img["src"] = f"/library/chapter-img/{'/'.join(resolved[1:])}?filename={filename}"
return Response(str(body), media_type="text/html")
@router.get("/library/chapter-img/{path:path}")
async def get_chapter_image(path: str, filename: str):
"""Serve an image extracted from the EPUB zip."""
epub_path = resolve_library_path(filename)
if epub_path is None:
return Response(status_code=404)
if not epub_path.exists():
return Response(status_code=404)
try:
with zf.ZipFile(epub_path, "r") as z:
data = z.read("OEBPS/" + path)
except KeyError:
return Response(status_code=404)
ext = path.rsplit(".", 1)[-1].lower()
mt = {"jpg": "image/jpeg", "jpeg": "image/jpeg", "png": "image/png",
"webp": "image/webp", "gif": "image/gif"}.get(ext, "image/octet-stream")
return Response(content=data, media_type=mt)
@router.get("/library/progress/{filename:path}")
async def get_progress(filename: str):
if resolve_library_path(filename) is None:
return {"error": "Invalid filename"}
with get_db_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"SELECT cfi, page, progress FROM reading_progress WHERE filename = %s",
(filename,),
)
row = cur.fetchone()
return {"cfi": row[0], "page": row[1], "progress": row[2] or 0} if row else {"cfi": None, "page": None, "progress": 0}

@router.delete("/library/progress/{filename:path}")
async def clear_progress(filename: str):
    """Remove reading progress so the book returns to the unread state.

    Reading sessions (mark-as-read history) are intentionally left intact.
    """
    if resolve_library_path(filename) is None:
        return {"error": "Invalid filename"}
    with get_db_conn() as conn:
        with conn:
            with conn.cursor() as cur:
                cur.execute("DELETE FROM reading_progress WHERE filename = %s", (filename,))
    return {"ok": True}
@router.post("/library/progress/{filename:path}")
async def save_progress(filename: str, request: Request):
if resolve_library_path(filename) is None:
return {"error": "Invalid filename"}
body = await request.json()
cfi = body.get("cfi", "")
page = body.get("page")
if page is not None:
try:
page = int(page)
except Exception:
page = None
progress = max(0, min(100, int(body.get("progress", 0))))
with get_db_conn() as conn:
with conn:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO reading_progress (filename, cfi, page, progress, updated_at)
VALUES (%s, %s, %s, %s, NOW())
ON CONFLICT (filename) DO UPDATE
SET cfi = EXCLUDED.cfi,
page = EXCLUDED.page,
progress = EXCLUDED.progress,
updated_at = NOW()
""",
(filename, cfi, page, progress),
)
return {"ok": True}
@router.post("/library/mark-read/{filename:path}")
async def library_mark_read(filename: str, request: Request):
if resolve_library_path(filename) is None:
return {"error": "Invalid filename"}
path = resolve_library_path(filename)
if path is None or not path.exists():
return {"error": "File not found"}
body = {}
try:
body = await request.json()
except Exception:
pass
read_at = body.get("read_at") # ISO datetime string, or None for now
with get_db_conn() as conn:
with conn:
with conn.cursor() as cur:
if read_at:
cur.execute(
"INSERT INTO reading_sessions (filename, read_at) VALUES (%s, %s)",
(filename, read_at),
)
else:
cur.execute(
"INSERT INTO reading_sessions (filename) VALUES (%s)",
(filename,),
)
cur.execute("DELETE FROM reading_progress WHERE filename = %s", (filename,))
return {"ok": True}
@router.get("/library/book/{filename:path}", response_class=HTMLResponse)
async def book_detail_page(filename: str, request: Request):
path = resolve_library_path(filename)
if path is None:
return HTMLResponse("Not found", status_code=404)
if not path.exists():
return HTMLResponse("Not found", status_code=404)
with get_db_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"""
SELECT title, author, publisher, has_cover, series, series_index,
publication_status, want_to_read, source_url, archived, publish_date, description,
rating
FROM library WHERE filename = %s
""",
(filename,),
)
lib_row = cur.fetchone()
if lib_row:
entry = {
"title": lib_row[0] or "",
"author": lib_row[1] or "",
"publisher": lib_row[2] or "",
"has_cover": lib_row[3] or False,
"series": lib_row[4] or "",
"series_index": lib_row[5] or 0,
"publication_status": lib_row[6] or "",
"want_to_read": lib_row[7] or False,
"source_url": lib_row[8] or "",
"archived": lib_row[9] or False,
"publish_date": lib_row[10].isoformat() if lib_row[10] else "",
"description": lib_row[11] or "",
"rating": lib_row[12] or 0,
}
# Supplement empty fields from EPUB metadata
if not entry["source_url"] or not entry["publish_date"] or not entry["description"]:
epub_meta = scan_epub(path)
if not entry["source_url"]:
entry["source_url"] = epub_meta.get("source_url", "")
if not entry["publish_date"]:
entry["publish_date"] = epub_meta.get("publish_date", "")
if not entry["description"]:
entry["description"] = epub_meta.get("description", "")
else:
entry = scan_epub(path)
entry.setdefault("want_to_read", False)
entry.setdefault("archived", False)
entry.setdefault("publish_date", "")
entry.setdefault("description", "")
entry.setdefault("rating", 0)
cur.execute(
"SELECT tag, tag_type FROM book_tags WHERE filename = %s ORDER BY tag_type, tag",
(filename,),
)
genres: list[str] = []
subgenres: list[str] = []
tags_list: list[str] = []
rows = cur.fetchall()
for tag, tag_type in rows:
if tag_type == "genre":
genres.append(tag)
elif tag_type == "subgenre":
subgenres.append(tag)
else:
tags_list.append(tag)
if not rows:
# Fallback for books where tags only exist in OPF after DB loss/rebuild.
epub_meta = scan_epub(path)
for subject in epub_meta.get("subjects", []):
if subject not in tags_list:
tags_list.append(subject)
cur.execute(
"SELECT COUNT(*)::int, MAX(read_at) FROM reading_sessions WHERE filename = %s",
(filename,),
)
row = cur.fetchone()
read_count = row[0] or 0
last_read = row[1].isoformat() if row[1] else None
cur.execute(
"SELECT cfi, progress FROM reading_progress WHERE filename = %s",
(filename,),
)
row = cur.fetchone()
progress = row[1] or 0 if row else 0
cfi = row[0] if row else None
return templates.TemplateResponse(request, "book.html", {
"active": "book",
"filename": filename,
"title": entry["title"],
"author": entry["author"],
"series": entry["series"],
"series_index": entry["series_index"],
"genres": genres,
"subgenres": subgenres,
"tags": tags_list,
"publisher": entry["publisher"],
"publication_status": entry["publication_status"],
"publish_date": entry.get("publish_date", ""),
"has_cover": entry["has_cover"],
"want_to_read": entry["want_to_read"],
"archived": entry["archived"],
"source_url": entry.get("source_url", ""),
"description": entry.get("description", ""),
"read_count": read_count,
"last_read": last_read,
"progress": progress,
"cfi": cfi,
"rating": entry.get("rating", 0),
})
@router.get("/api/genres")
async def api_genres(type: str | None = None):
"""Return all distinct tags from book_tags, sorted alphabetically.
Optional ``type`` query parameter filters by tag_type (genre, subgenre, tag).
"""
with get_db_conn() as conn:
with conn.cursor() as cur:
if type == "tag":
cur.execute(
"SELECT DISTINCT tag FROM book_tags WHERE tag_type IN ('tag', 'subject') ORDER BY tag"
)
elif type:
cur.execute(
"SELECT DISTINCT tag FROM book_tags WHERE tag_type = %s ORDER BY tag",
(type,),
)
else:
cur.execute("SELECT DISTINCT tag FROM book_tags ORDER BY tag")
result = [r[0] for r in cur.fetchall()]
return JSONResponse(result)

@router.patch("/library/book/{filename:path}")
async def book_update(filename: str, request: Request):
    """Update book metadata and tags, and rename/move the file when needed."""
    old_path = resolve_library_path(filename)
    if old_path is None or not old_path.exists():
        return JSONResponse({"error": "not found"}, status_code=404)
    body = await request.json()
    title = body.get("title", "")
    author = body.get("author", "")
    publisher = body.get("publisher", "")
    series = body.get("series", "")
    series_index = _coerce_series_index(body.get("series_index", 1))
    ext = old_path.suffix.lower()
    target_rel = _make_rel_path(
        publisher=publisher,
        author=author,
        title=title,
        series=series,
        series_index=series_index,
        ext=ext,
    )
    target_rel = _ensure_unique_rel_path(target_rel, exclude=old_path)
    new_filename = target_rel.as_posix()
    new_path = (LIBRARY_DIR / target_rel).resolve()
    moved = False
    if new_path != old_path:
        new_path.parent.mkdir(parents=True, exist_ok=True)
        old_path.replace(new_path)
        moved = True
    try:
        if ext == ".epub":
            # Only EPUBs get metadata written back into the file itself;
            # other formats are DB-only updates.
            _sync_epub_metadata(
                new_path,
                title=title,
                author=author,
                publisher=publisher,
                publication_status=body.get("publication_status", ""),
                source_url=body.get("source_url", ""),
                publish_date=body.get("publish_date", ""),
                description=body.get("description", ""),
                series=series,
                series_index=series_index if series else 0,
                subjects=(body.get("genres", []) + body.get("subgenres", []) + body.get("tags", [])),
            )
        with get_db_conn() as conn:
            with conn:
                with conn.cursor() as cur:
                    cur.execute("SELECT has_cover FROM library WHERE filename = %s", (filename,))
                    row = cur.fetchone()
                    has_cover = (
                        bool(row[0])
                        if row and row[0] is not None
                        else bool(scan_epub(new_path if moved else old_path).get("has_cover", False))
                    )
                    cur.execute(
                        """
                        INSERT INTO library (
                            filename, title, author, publisher, has_cover,
                            series, series_index, publication_status,
                            source_url, publish_date, description,
                            archived, needs_review, updated_at
                        )
                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, FALSE, FALSE, NOW())
                        ON CONFLICT (filename) DO UPDATE SET
                            title = EXCLUDED.title,
                            author = EXCLUDED.author,
                            publisher = EXCLUDED.publisher,
                            series = EXCLUDED.series,
                            series_index = EXCLUDED.series_index,
                            publication_status = EXCLUDED.publication_status,
                            source_url = EXCLUDED.source_url,
                            publish_date = EXCLUDED.publish_date,
                            description = EXCLUDED.description,
                            needs_review = FALSE,
                            updated_at = NOW()
                        """,
                        (
                            new_filename,
                            title,
                            author,
                            publisher,
                            has_cover,
                            series,
                            series_index if series else 0,
                            body.get("publication_status", ""),
                            body.get("source_url", ""),
                            body.get("publish_date") or None,
                            body.get("description", ""),
                        ),
                    )
                    if new_filename != filename:
                        # Re-point dependent rows at the new filename, then drop the old library row.
                        cur.execute("UPDATE book_tags SET filename = %s WHERE filename = %s", (new_filename, filename))
                        cur.execute("UPDATE reading_progress SET filename = %s WHERE filename = %s", (new_filename, filename))
                        cur.execute("UPDATE reading_sessions SET filename = %s WHERE filename = %s", (new_filename, filename))
                        cur.execute("UPDATE library_cover_cache SET filename = %s WHERE filename = %s", (new_filename, filename))
                        cur.execute("DELETE FROM library WHERE filename = %s", (filename,))
                    # Replace tags wholesale from the request body.
                    cur.execute("DELETE FROM book_tags WHERE filename = %s", (new_filename,))
                    rows = (
                        [(new_filename, g, "genre") for g in body.get("genres", []) if g]
                        + [(new_filename, g, "subgenre") for g in body.get("subgenres", []) if g]
                        + [(new_filename, g, "tag") for g in body.get("tags", []) if g]
                    )
                    if rows:
                        cur.executemany(
                            "INSERT INTO book_tags (filename, tag, tag_type) VALUES (%s, %s, %s)"
                            " ON CONFLICT (filename, tag, tag_type) DO NOTHING",
                            rows,
                        )
        # Always prune empty directories after a successful save, not only on move.
        prune_empty_dirs(old_path.parent)
        return JSONResponse({"ok": True, "filename": new_filename, "renamed": new_filename != filename})
    except Exception as e:
        # Roll the file move back if anything after it failed.
        if moved and new_path.exists() and not old_path.exists():
            new_path.replace(old_path)
        return JSONResponse({"error": str(e)}, status_code=500)
@router.post("/library/rating/{filename:path}")
async def set_rating(filename: str, request: Request):
"""Set (or clear) a 1-5 star rating for a book. rating=0 removes it."""
path = resolve_library_path(filename)
if path is None or not path.exists():
return JSONResponse({"error": "not found"}, status_code=404)
body = await request.json()
try:
rating = max(0, min(5, int(body.get("rating", 0))))
except (TypeError, ValueError):
return JSONResponse({"error": "invalid rating"}, status_code=400)
ext = path.suffix.lower()
if ext == ".epub":
try:
_write_epub_rating(path, rating)
except Exception as e:
return JSONResponse({"error": f"epub write failed: {e}"}, status_code=500)
elif ext == ".cbz":
try:
_write_cbz_rating(path, rating)
except Exception as e:
return JSONResponse({"error": f"cbz write failed: {e}"}, status_code=500)
with get_db_conn() as conn:
with conn:
with conn.cursor() as cur:
cur.execute(
"UPDATE library SET rating = %s, updated_at = NOW() WHERE filename = %s",
(rating, filename),
)
return JSONResponse({"ok": True, "rating": rating})
@router.get("/library/read/{filename:path}", response_class=HTMLResponse)
async def reader_page(filename: str, request: Request):
path = resolve_library_path(filename)
if path is None:
return HTMLResponse("Not found", status_code=404)
if not path.exists():
return HTMLResponse("Not found", status_code=404)
with get_db_conn() as conn:
with conn.cursor() as cur:
cur.execute("SELECT title FROM library WHERE filename = %s", (filename,))
row = cur.fetchone()
title = row[0] if row and row[0] else filename
fmt = path.suffix.lower().lstrip(".")
return templates.TemplateResponse(request, "reader.html", {
"filename": filename,
"title": title,
"format": fmt,
"epub_url": f"/library/epub/{filename}",
})
@router.get("/api/pdf/info/{filename:path}")
async def pdf_info(filename: str):
path = resolve_library_path(filename)
if path is None or not path.exists():
return JSONResponse({"error": "not found"}, status_code=404)
try:
count = pdf_page_count(path)
return JSONResponse({"page_count": count})
except Exception as e:
return JSONResponse({"error": str(e)}, status_code=500)
@router.get("/library/pdf/{filename:path}")
async def library_pdf_page(filename: str, page: int = 0, dpi: int = 150):
path = resolve_library_path(filename)
if path is None:
return JSONResponse({"error": "Invalid filename"}, status_code=400)
if not path.exists():
return JSONResponse({"error": "File not found"}, status_code=404)
if path.suffix.lower() != ".pdf":
return JSONResponse({"error": "Not a PDF file"}, status_code=400)
try:
data = pdf_render_page(path, page, dpi=dpi)
return Response(content=data, media_type="image/png")
except IndexError:
return JSONResponse({"error": "Page out of range"}, status_code=416)
except Exception as e:
return JSONResponse({"error": str(e)}, status_code=500)
@router.get("/library/cbr/{filename:path}/{page:int}")
async def library_cbr_page(filename: str, page: int):
path = resolve_library_path(filename)
if path is None:
return JSONResponse({"error": "Invalid filename"}, status_code=400)
if not path.exists():
return JSONResponse({"error": "File not found"}, status_code=404)
if path.suffix.lower() not in {".cbr", ".cbz"}:
return JSONResponse({"error": "Not a CBR/CBZ file"}, status_code=400)
try:
data, mime = cbr_get_page(path, page)
return Response(content=data, media_type=mime)
except IndexError:
return JSONResponse({"error": "Page out of range"}, status_code=416)
except Exception as e:
return JSONResponse({"error": str(e)}, status_code=500)