novela/containers/novela/routers/reader.py
Ivo Oskamp e4d2e2c636 DB-stored books, full-text search, backup restore, and AO3 scraper
- DB-stored books (Fase 1–6): chapters and images stored in PostgreSQL; grabber writes to DB, EPUB→DB conversion, DB→EPUB export, FTS search page (/search)
- Chapter editor: Monaco editor supports DB-stored books; inline title editing
- Grabber: DB/EPUB storage toggle on Convert page
- Backup: restore from Dropbox snapshot (browse snapshots, restore individual or selected files)
- AO3 scraper: initial implementation
- Changelog: v0.1.2 and v0.1.3 entries added to changelog.py and changelog.md

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-03 15:13:08 +02:00

1614 lines
64 KiB
Python

"""reader.py — Reader and book detail routes for EPUB/PDF/CBR."""
import html as _html
import io
import posixpath
import re
import uuid
import zipfile as zf
from datetime import datetime
from pathlib import Path
from urllib.parse import quote

from bs4 import BeautifulSoup
from fastapi import APIRouter, Request
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, Response
from fastapi.templating import Jinja2Templates

from cbr import cbr_get_page, cbr_page_count
from db import get_db_conn
from epub import make_chapter_xhtml, make_epub, read_epub_file, write_epub_file
from pdf import pdf_page_count, pdf_render_page
from routers.common import (
    IMAGES_DIR,
    LIBRARY_DIR,
    ensure_unique_db_filename,
    is_db_filename,
    make_cover_thumb_webp,
    make_rel_path,
    prune_empty_dirs,
    resolve_library_path,
    scan_epub,
    upsert_chapter,
    upsert_cover_cache,
    write_image_file,
)
router = APIRouter()
templates = Jinja2Templates(directory="templates")
# ---------------------------------------------------------------------------
# EPUB helpers
# ---------------------------------------------------------------------------
def _epub_spine(path: Path) -> list[dict]:
    """Return ordered list of {index, title, href} for all spine items.

    Supports both EPUB2 (toc.ncx) and EPUB3 (nav.xhtml), and respects
    the OPF location declared in META-INF/container.xml.
    """
    def _norm(base_dir: str, rel: str) -> str:
        # Drop any #fragment, then resolve rel against base_dir inside the zip.
        rel = (rel or '').split('#', 1)[0].strip()
        if not rel:
            return ''
        joined = posixpath.normpath(posixpath.join(base_dir, rel))
        return joined.lstrip('./')

    with zf.ZipFile(path, 'r') as z:
        names = set(z.namelist())
        opf_path = 'OEBPS/content.opf'
        try:
            container_xml = z.read('META-INF/container.xml').decode('utf-8', errors='replace')
            # BUG FIX: the pattern previously used r"\\s" (a literal backslash
            # followed by 's' inside a raw string), so it never matched a real
            # container.xml and the declared OPF path was silently ignored.
            m = re.search(r"full-path\s*=\s*['\"]([^'\"]+)['\"]", container_xml)
            if m:
                opf_path = m.group(1)
        except Exception:
            pass
        if opf_path not in names:
            # fallback for malformed books: first *.opf anywhere in the archive
            candidates = [n for n in names if n.lower().endswith('.opf')]
            if not candidates:
                return []
            opf_path = sorted(candidates)[0]
        opf_xml = z.read(opf_path).decode('utf-8', errors='replace')
        opf = BeautifulSoup(opf_xml, 'xml')
        opf_dir = posixpath.dirname(opf_path)
        # Manifest: item id -> zip-relative href.
        manifest: dict[str, str] = {}
        for item in opf.find_all('item'):
            iid = item.get('id')
            href = item.get('href')
            if iid and href:
                manifest[iid] = _norm(opf_dir, href)
        # Spine: reading order as manifest idrefs.
        spine_idrefs: list[str] = []
        spine_tag = opf.find('spine')
        toc_id = spine_tag.get('toc') if spine_tag else None
        if spine_tag:
            for ir in spine_tag.find_all('itemref'):
                rid = ir.get('idref')
                if rid:
                    spine_idrefs.append(rid)
        hrefs = [manifest[rid] for rid in spine_idrefs if rid in manifest]
        href_to_title: dict[str, str] = {}
        # EPUB2: NCX titles
        ncx_path = ''
        if toc_id and toc_id in manifest:
            ncx_path = manifest[toc_id]
        elif 'toc.ncx' in names:
            ncx_path = 'toc.ncx'
        elif 'OEBPS/toc.ncx' in names:
            ncx_path = 'OEBPS/toc.ncx'
        if ncx_path and ncx_path in names:
            try:
                ncx_xml = z.read(ncx_path).decode('utf-8', errors='replace')
                ncx = BeautifulSoup(ncx_xml, 'xml')
                ncx_dir = posixpath.dirname(ncx_path)
                for np in ncx.find_all('navPoint'):
                    content = np.find('content')
                    label_tag = np.find('text')
                    src = content.get('src') if content else ''
                    label = label_tag.get_text(strip=True) if label_tag else ''
                    if src and label:
                        href_to_title[_norm(ncx_dir, src)] = _html.unescape(label)
            except Exception:
                pass
        # EPUB3: nav.xhtml titles (fallback when no NCX titles were found)
        if not href_to_title:
            nav_item = None
            for item in opf.find_all('item'):
                props = (item.get('properties') or '').split()
                if 'nav' in props:
                    nav_item = item
                    break
            if nav_item and nav_item.get('href'):
                nav_path = _norm(opf_dir, nav_item.get('href'))
                if nav_path in names:
                    try:
                        nav_xml = z.read(nav_path).decode('utf-8', errors='replace')
                        nav = BeautifulSoup(nav_xml, 'lxml')
                        nav_dir = posixpath.dirname(nav_path)
                        for a in nav.select('nav a[href]'):
                            src = a.get('href', '')
                            label = a.get_text(' ', strip=True)
                            if src and label:
                                href_to_title[_norm(nav_dir, src)] = _html.unescape(label)
                    except Exception:
                        pass
    chapters = []
    for i, href in enumerate(hrefs):
        base = posixpath.basename(href)
        # Fall back to the file name (sans extension) when the TOC has no label.
        title = href_to_title.get(href, re.sub(r'\.(xhtml|html|htm)$', '', base, flags=re.I))
        chapters.append({'index': i, 'title': title or f'Chapter {i+1}', 'href': href})
    return chapters
def _norm_href(base_dir: str, rel: str) -> str:
rel = (rel or '').split('#', 1)[0].strip()
if not rel:
return ''
return posixpath.normpath(posixpath.join(base_dir, rel)).lstrip('./')
def _find_opf_path(names: set[str], container_xml: str | None) -> str | None:
opf_path = 'OEBPS/content.opf'
if container_xml:
m = re.search(r'full-path\s*=\s*[\'"]([^\'"]+)[\'"]', container_xml)
if m:
opf_path = m.group(1)
if opf_path in names:
return opf_path
candidates = sorted(n for n in names if n.lower().endswith('.opf'))
return candidates[0] if candidates else None
def _make_new_chapter_xhtml(title: str) -> str:
safe_title = _html.escape((title or 'New chapter').strip() or 'New chapter')
return (
'<?xml version="1.0" encoding="UTF-8"?>\n'
'<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN"\n'
' "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">\n'
'<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en">\n'
'<head>\n'
' <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>\n'
f' <title>{safe_title}</title>\n'
' <link rel="stylesheet" type="text/css" href="../Styles/style.css"/>\n'
'</head>\n'
'<body>\n'
f' <h2 class="chapter-title">{safe_title}</h2>\n'
' <p></p>\n'
'</body>\n'
'</html>\n'
)
def _tag_local(name: str | None) -> str:
if not name:
return ''
return name.split(':', 1)[-1].lower()
def _write_epub_rating(epub_path: Path, rating: int) -> None:
    """Write (or remove) novela:rating in the EPUB OPF metadata.

    A rating of 0 removes the tag. Best-effort: when the archive has no
    locatable OPF or no <metadata> element, the file is left unchanged.
    """
    with zf.ZipFile(epub_path, "r") as z:
        names = set(z.namelist())
        # Honour the OPF location declared in META-INF/container.xml when present.
        container_xml = z.read("META-INF/container.xml").decode("utf-8", errors="replace") if "META-INF/container.xml" in names else None
        opf_path = _find_opf_path(names, container_xml)
        if not opf_path or opf_path not in names:
            return
        opf_xml = z.read(opf_path).decode("utf-8", errors="replace")
    opf = BeautifulSoup(opf_xml, "xml")
    # Match by local name so namespace prefixes (opf:metadata etc.) still hit.
    metadata = opf.find(lambda t: _tag_local(getattr(t, "name", None)) == "metadata")
    if not metadata:
        return
    # Drop every existing rating meta before (optionally) adding the new one.
    for t in metadata.find_all(lambda t: _tag_local(getattr(t, "name", None)) == "meta"):
        if (t.get("name") or "").strip() == "novela:rating":
            t.decompose()
    if rating > 0:
        nt = opf.new_tag("meta")
        nt["name"] = "novela:rating"
        nt["content"] = str(rating)
        metadata.append(nt)
    # Rewrite only the OPF entry; all other zip members are preserved as-is.
    _rewrite_epub_entries(epub_path, {opf_path: str(opf).encode("utf-8")})
def _write_cbz_rating(cbz_path: Path, rating: int) -> None:
"""Write (or remove) NovelaRating in ComicInfo.xml inside a CBZ."""
with open(cbz_path, "rb") as f:
original = f.read()
out = io.BytesIO()
ci_key: str | None = None
ci_data: bytes | None = None
with zf.ZipFile(io.BytesIO(original), "r") as zin:
names = zin.namelist()
for n in names:
if n.lower() == "comicinfo.xml":
ci_key = n
ci_data = zin.read(n)
break
xml: str
if ci_data is not None:
xml = ci_data.decode("utf-8", errors="replace")
xml = re.sub(r"<NovelaRating>\d+</NovelaRating>\s*", "", xml)
if rating > 0:
xml = xml.replace("</ComicInfo>", f" <NovelaRating>{rating}</NovelaRating>\n</ComicInfo>")
else:
ci_key = "ComicInfo.xml"
if rating > 0:
xml = f'<?xml version="1.0"?>\n<ComicInfo>\n <NovelaRating>{rating}</NovelaRating>\n</ComicInfo>\n'
else:
return # nothing to write
new_ci = xml.encode("utf-8")
with zf.ZipFile(io.BytesIO(original), "r") as zin, zf.ZipFile(out, "w", zf.ZIP_DEFLATED) as zout:
for item in zin.infolist():
if item.filename.lower() == "comicinfo.xml":
continue
data = zin.read(item.filename)
ctype = zf.ZIP_STORED if item.filename == "mimetype" else zf.ZIP_DEFLATED
zout.writestr(item, data, compress_type=ctype)
zout.writestr(ci_key, new_ci, compress_type=zf.ZIP_DEFLATED)
with open(cbz_path, "wb") as f:
f.write(out.getvalue())
def _sync_epub_metadata(
    epub_path: Path,
    *,
    title: str,
    author: str,
    publisher: str,
    publication_status: str,
    source_url: str,
    publish_date: str,
    description: str,
    series: str,
    series_index: int | str | None,
    series_suffix: str = "",
    subjects: list[str],
) -> None:
    """Write edited metadata back into OPF so DB and EPUB stay aligned.

    Best-effort: silently returns when the archive has no locatable OPF or
    no <metadata> element. Only the OPF zip entry is rewritten.
    """
    with zf.ZipFile(epub_path, 'r') as z:
        names = set(z.namelist())
        container_xml = z.read('META-INF/container.xml').decode('utf-8', errors='replace') if 'META-INF/container.xml' in names else None
        opf_path = _find_opf_path(names, container_xml)
        if not opf_path or opf_path not in names:
            return
        opf_xml = z.read(opf_path).decode('utf-8', errors='replace')
    opf = BeautifulSoup(opf_xml, 'xml')
    # Match by local name so namespace prefixes (opf:metadata etc.) still hit.
    metadata = opf.find(lambda t: _tag_local(getattr(t, 'name', None)) == 'metadata')
    if not metadata:
        return

    def set_dc(local_name: str, value: str) -> None:
        # Replace all existing dc:* elements with a single one carrying *value*;
        # an empty value just removes them. Keeps the original prefixed tag name
        # when one existed so the document's namespace style is preserved.
        existing = [t for t in metadata.find_all(lambda t: _tag_local(getattr(t, 'name', None)) == local_name)]
        tag_name = existing[0].name if existing else f'dc:{local_name}'
        for t in existing:
            t.decompose()
        if value:
            nt = opf.new_tag(tag_name)
            nt.string = value
            metadata.append(nt)

    def set_named_meta(key: str, value: str) -> None:
        # Same replace-or-remove semantics for <meta name="..." content="..."> entries.
        existing = [
            t for t in metadata.find_all(lambda t: _tag_local(getattr(t, 'name', None)) == 'meta')
            if (t.get('name') or '').strip() == key
        ]
        tag_name = existing[0].name if existing else 'meta'
        for t in existing:
            t.decompose()
        if value:
            nt = opf.new_tag(tag_name)
            nt['name'] = key
            nt['content'] = value
            metadata.append(nt)

    set_dc('title', (title or '').strip())
    set_dc('creator', (author or '').strip())
    set_dc('publisher', (publisher or '').strip())
    set_dc('source', (source_url or '').strip())
    # Validate the date: keep only the YYYY-MM-DD part, drop anything that is
    # not ISO-parseable or predates 1900 (treated as a sentinel/garbage value).
    date_value = (publish_date or '').strip()
    if date_value:
        date_candidate = date_value.split('T', 1)[0]
        try:
            parsed_date = datetime.fromisoformat(date_candidate).date()
            date_value = parsed_date.isoformat() if parsed_date.year >= 1900 else ''
        except Exception:
            date_value = ''
    set_dc('date', date_value)
    set_dc('description', (description or '').strip())
    # Replace subjects from editor tags (genres + subgenres + tags).
    for t in [t for t in metadata.find_all(lambda t: _tag_local(getattr(t, 'name', None)) == 'subject')]:
        t.decompose()
    # Case-insensitive de-duplication while preserving first-seen casing/order.
    seen: set[str] = set()
    for raw in subjects:
        val = (raw or '').strip()
        if not val:
            continue
        key = val.casefold()
        if key in seen:
            continue
        seen.add(key)
        nt = opf.new_tag('dc:subject')
        nt.string = val
        metadata.append(nt)
    set_named_meta('publication_status', (publication_status or '').strip())
    series_val = (series or '').strip()
    set_named_meta('calibre:series', series_val)
    if series_val:
        set_named_meta('calibre:series_index', str(_coerce_series_index(series_index)))
        # Suffix is normalised to at most 5 lowercase ASCII letters.
        sfx = re.sub(r"[^a-z]", "", (series_suffix or "").lower())[:5]
        set_named_meta('novela:series_suffix', sfx)
    else:
        # No series: clear the series-related metas entirely.
        set_named_meta('calibre:series_index', '')
        set_named_meta('novela:series_suffix', '')
    _rewrite_epub_entries(epub_path, {opf_path: str(opf).encode('utf-8')})
def _rewrite_epub_entries(epub_path: Path, updates: dict[str, bytes], remove_paths: set[str] | None = None) -> None:
remove_paths = remove_paths or set()
with open(epub_path, 'rb') as f:
original = f.read()
out = io.BytesIO()
with zf.ZipFile(io.BytesIO(original), 'r') as zin, zf.ZipFile(out, 'w', zf.ZIP_DEFLATED) as zout:
existing = set()
for item in zin.infolist():
name = item.filename
existing.add(name)
if name in remove_paths:
continue
data = updates.get(name)
if data is None:
data = zin.read(name)
ctype = zf.ZIP_STORED if name == 'mimetype' else zf.ZIP_DEFLATED
zout.writestr(name, data, compress_type=ctype)
for name, data in updates.items():
if name in existing or name in remove_paths:
continue
ctype = zf.ZIP_STORED if name == 'mimetype' else zf.ZIP_DEFLATED
zout.writestr(name, data, compress_type=ctype)
with open(epub_path, 'wb') as f:
f.write(out.getvalue())
def _clean_segment(value: str, fallback: str, max_len: int = 100) -> str:
txt = re.sub(r"\s+", " ", (value or "").strip())
txt = re.sub(r'[<>:"/\\|?*\x00-\x1f]', "", txt)
txt = re.sub(r"\.+$", "", txt).strip()
if not txt:
txt = fallback
return txt[:max_len]
def _coerce_series_index(value: int | str | None) -> int:
try:
return max(0, min(999, int(value or 0)))
except (TypeError, ValueError):
return 0
def _make_rel_path(
    *,
    publisher: str,
    author: str,
    title: str,
    series: str,
    series_index: int | str | None,
    series_suffix: str = "",
    ext: str = ".epub",
) -> Path:
    """Build the canonical library-relative path for a book file.

    EPUBs land under epub/<publisher>/<author>/(Series/<series>|Stories)/,
    PDFs under pdf/<publisher>/<author>/, and any other extension is treated
    as a comic archive under comics/. Series entries are prefixed with a
    zero-padded index plus an optional lowercase suffix.
    """
    author_seg = _clean_segment(author, "Unknown Author", 80)
    title_seg = _clean_segment(title, "Untitled", 140)
    publisher_seg = _clean_segment(publisher, "Unknown Publisher", 80)

    if ext == ".pdf":
        return Path("pdf") / publisher_seg / author_seg / f"{title_seg}.pdf"

    is_epub = ext == ".epub"
    series_seg = _clean_segment(series, "", 120 if is_epub else 80)
    root = Path("epub") if is_epub else Path("comics")
    suffix_ext = ".epub" if is_epub else ext

    if series_seg:
        number = _coerce_series_index(series_index)
        tail = re.sub(r"[^a-z]", "", (series_suffix or "").lower())[:5]
        leaf = f"{number:03d}{tail} - {title_seg}{suffix_ext}"
        return root / publisher_seg / author_seg / "Series" / series_seg / leaf
    if is_epub:
        return root / publisher_seg / author_seg / "Stories" / f"{title_seg}.epub"
    return root / publisher_seg / author_seg / f"{title_seg}{ext}"
def _ensure_unique_rel_path(rel_path: Path, *, exclude: Path | None = None) -> Path:
    """Return *rel_path* or a ' (2)', ' (3)', ... variant with no collision.

    Uniqueness is checked against files under LIBRARY_DIR; *exclude* (the
    file currently being moved/renamed) never counts as a collision.
    A path with no suffix defaults to '.epub'.
    """
    base = rel_path.with_suffix(rel_path.suffix or ".epub")
    excluded = exclude.resolve() if exclude is not None else None
    attempt = base
    n = 2
    while True:
        absolute = (LIBRARY_DIR / attempt).resolve()
        if absolute == excluded or not absolute.exists():
            return attempt
        attempt = base.with_name(f"{base.stem} ({n}){base.suffix}")
        n += 1
def _guard(filename: str) -> bool:
"""Return True if the filename contains path-traversal characters."""
return "/" in filename or "\\" in filename or ".." in filename
# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------
@router.get("/library/db-images/{path:path}")
async def serve_db_image(path: str):
    """Serve an image from the content-addressed imagestore."""
    target = (IMAGES_DIR / path).resolve()
    try:
        # Reject anything that resolves outside the imagestore root.
        target.relative_to(IMAGES_DIR.resolve())
    except ValueError:
        return Response(status_code=404)
    if not target.exists():
        return Response(status_code=404)
    media = {
        ".jpg": "image/jpeg",
        ".png": "image/png",
        ".webp": "image/webp",
        ".gif": "image/gif",
    }.get(target.suffix.lower(), "application/octet-stream")
    return FileResponse(target, media_type=media)
@router.get("/library/epub/{filename:path}")
async def library_epub(filename: str):
    """Serve EPUB inline (no Content-Disposition: attachment) for the reader."""
    epub_file = resolve_library_path(filename)
    if epub_file is None or not epub_file.exists():
        return Response(status_code=404)
    return FileResponse(epub_file, media_type="application/epub+zip")
@router.get("/library/chapters/{filename:path}")
async def get_chapter_list(filename: str):
    """List chapters for a book.

    DB-stored books come from book_chapters (href "db:<index>"); file-based
    books come from the EPUB spine on disk.
    """
    if is_db_filename(filename):
        with get_db_conn() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    "SELECT chapter_index, title FROM book_chapters WHERE filename = %s ORDER BY chapter_index",
                    (filename,),
                )
                rows = cur.fetchall()
        if not rows:
            return Response(status_code=404)
        return [{"index": idx, "title": ttl, "href": f"db:{idx}"} for idx, ttl in rows]
    epub_file = resolve_library_path(filename)
    if epub_file is None or not epub_file.exists():
        return Response(status_code=404)
    return _epub_spine(epub_file)
@router.get("/library/chapter/{index}/{filename:path}")
async def get_chapter_html(filename: str, index: int):
    """Extract a single chapter from the EPUB (or DB) and return it as an HTML fragment."""
    if is_db_filename(filename):
        with get_db_conn() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    "SELECT title, content FROM book_chapters WHERE filename = %s AND chapter_index = %s",
                    (filename, index),
                )
                row = cur.fetchone()
        if not row:
            return Response(status_code=404)
        title, content = row
        safe_title = _html.escape(title or "")
        return Response(
            f'<body><h2 class="chapter-title">{safe_title}</h2>\n{content}\n</body>',
            media_type="text/html",
        )
    path = resolve_library_path(filename)
    if path is None:
        return Response(status_code=404)
    if not path.exists():
        return Response(status_code=404)
    spine = _epub_spine(path)
    if index < 0 or index >= len(spine):
        return Response(status_code=404)
    href = spine[index]["href"]
    with zf.ZipFile(path, "r") as z:
        xhtml = z.read(href).decode("utf-8", errors="replace")
    soup = BeautifulSoup(xhtml, "lxml")
    body = soup.find("body")
    if not body:
        return Response("<p>No content.</p>", media_type="text/html")
    # Rewrite relative image paths to the chapter-image API endpoint
    href_dir = href.rsplit("/", 1)[0]  # e.g. "OEBPS/Text"
    for img in body.find_all("img"):
        src = img.get("src", "")
        if src and not src.startswith("http") and not src.startswith("data:"):
            # Resolve ".." components against the chapter's directory in the zip.
            parts = href_dir.split("/") + src.split("/")
            resolved: list[str] = []
            for p in parts:
                if p == "..":
                    if resolved:
                        resolved.pop()
                else:
                    resolved.append(p)
            # BUG FIX: the filename query parameter was the literal placeholder
            # "(unknown)", so get_chapter_image could never resolve the book.
            # Pass the real library filename, URL-encoded.
            img["src"] = f"/library/chapter-img/{'/'.join(resolved)}?filename={quote(filename)}"
    return Response(str(body), media_type="text/html")
@router.get("/library/chapter-img/{path:path}")
async def get_chapter_image(path: str, filename: str):
    """Serve an image extracted from the EPUB zip.

    *filename* is the library-relative EPUB path (query parameter); *path* is
    the image's archive path inside that EPUB, matched case-insensitively as
    a fallback for books with inconsistent internal casing.
    """
    epub_path = resolve_library_path(filename)
    if epub_path is None:
        return Response(status_code=404)
    if not epub_path.exists():
        return Response(status_code=404)
    try:
        with zf.ZipFile(epub_path, "r") as z:
            names = z.namelist()
            if path in names:
                data = z.read(path)
            else:
                # Case-insensitive fallback
                target = path.lower()
                match = next((n for n in names if n.lower() == target), None)
                if match is None:
                    return Response(status_code=404)
                data = z.read(match)
    except KeyError:
        return Response(status_code=404)
    ext = path.rsplit(".", 1)[-1].lower()
    # BUG FIX: the generic fallback was the invalid "image/octet-stream";
    # the correct media type is "application/octet-stream" (as used by
    # serve_db_image above).
    mt = {"jpg": "image/jpeg", "jpeg": "image/jpeg", "png": "image/png",
          "webp": "image/webp", "gif": "image/gif"}.get(ext, "application/octet-stream")
    return Response(content=data, media_type=mt)
@router.get("/library/progress/{filename:path}")
async def get_progress(filename: str):
    """Return the saved reading position (cfi/page/progress) for a book."""
    if resolve_library_path(filename) is None:
        return {"error": "Invalid filename"}
    with get_db_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT cfi, page, progress FROM reading_progress WHERE filename = %s",
                (filename,),
            )
            row = cur.fetchone()
    if row is None:
        return {"cfi": None, "page": None, "progress": 0}
    cfi, page, progress = row
    return {"cfi": cfi, "page": page, "progress": progress or 0}
@router.delete("/library/progress/{filename:path}")
async def clear_progress(filename: str):
    """Remove reading progress so the book returns to unread state.

    Reading sessions (mark-as-read history) are intentionally left intact.
    """
    if resolve_library_path(filename) is None:
        return {"error": "Invalid filename"}
    with get_db_conn() as conn:
        with conn:  # commit the delete as one transaction
            with conn.cursor() as cur:
                cur.execute("DELETE FROM reading_progress WHERE filename = %s", (filename,))
    return {"ok": True}
@router.post("/library/progress/{filename:path}")
async def save_progress(filename: str, request: Request):
    """Upsert the reading position for a book.

    Body: {"cfi": str, "page": int | null, "progress": 0-100}. Invalid
    numeric fields are coerced (page -> NULL, progress -> 0) instead of
    raising.
    """
    if resolve_library_path(filename) is None:
        return {"error": "Invalid filename"}
    body = await request.json()
    cfi = body.get("cfi", "")
    page = body.get("page")
    if page is not None:
        try:
            page = int(page)
        except Exception:
            page = None
    # BUG FIX: a non-numeric "progress" previously raised an unhandled
    # ValueError/TypeError (HTTP 500); coerce it to 0 like "page".
    try:
        progress = max(0, min(100, int(body.get("progress", 0))))
    except (TypeError, ValueError):
        progress = 0
    with get_db_conn() as conn:
        with conn:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    INSERT INTO reading_progress (filename, cfi, page, progress, updated_at)
                    VALUES (%s, %s, %s, %s, NOW())
                    ON CONFLICT (filename) DO UPDATE
                    SET cfi = EXCLUDED.cfi,
                        page = EXCLUDED.page,
                        progress = EXCLUDED.progress,
                        updated_at = NOW()
                    """,
                    (filename, cfi, page, progress),
                )
    return {"ok": True}
@router.post("/library/mark-read/{filename:path}")
async def library_mark_read(filename: str, request: Request):
    """Record a completed read.

    Appends a reading_sessions row (optionally at a client-supplied
    "read_at" timestamp) and clears any in-flight reading progress.
    """
    if is_db_filename(filename):
        with get_db_conn() as conn:
            with conn.cursor() as cur:
                cur.execute("SELECT 1 FROM library WHERE filename = %s", (filename,))
                if not cur.fetchone():
                    return {"error": "Not found"}
    else:
        path = resolve_library_path(filename)
        if path is None or not path.exists():
            return {"error": "File not found"}
    try:
        body = await request.json()
    except Exception:
        body = {}
    read_at = body.get("read_at")  # ISO datetime string, or None for now
    with get_db_conn() as conn:
        with conn:
            with conn.cursor() as cur:
                if read_at:
                    cur.execute(
                        "INSERT INTO reading_sessions (filename, read_at) VALUES (%s, %s)",
                        (filename, read_at),
                    )
                else:
                    cur.execute(
                        "INSERT INTO reading_sessions (filename) VALUES (%s)",
                        (filename,),
                    )
                cur.execute("DELETE FROM reading_progress WHERE filename = %s", (filename,))
    return {"ok": True}
@router.get("/library/book/{filename:path}", response_class=HTMLResponse)
async def book_detail_page(filename: str, request: Request):
    """Render the book detail page (book.html) for a file- or DB-stored book.

    Metadata comes from the library table when present; for file-based books
    missing fields are supplemented from (or, when the DB row is absent,
    fully derived from) the EPUB's own OPF metadata via scan_epub.
    """
    db_book = is_db_filename(filename)
    if not db_book:
        path = resolve_library_path(filename)
        if path is None:
            return HTMLResponse("Not found", status_code=404)
        if not path.exists():
            return HTMLResponse("Not found", status_code=404)
    else:
        path = None  # DB-stored books have no file on disk
    with get_db_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT title, author, publisher, has_cover, series, series_index,
                       publication_status, want_to_read, source_url, archived, publish_date, description,
                       rating, COALESCE(series_suffix, '') AS series_suffix,
                       COALESCE(storage_type, 'file') AS storage_type
                FROM library WHERE filename = %s
                """,
                (filename,),
            )
            lib_row = cur.fetchone()
            if lib_row:
                # NOTE: indexes follow the SELECT column order above; suffix and
                # storage_type are the trailing COALESCEd columns (13, 14).
                entry = {
                    "title": lib_row[0] or "",
                    "author": lib_row[1] or "",
                    "publisher": lib_row[2] or "",
                    "has_cover": lib_row[3] or False,
                    "series": lib_row[4] or "",
                    "series_index": lib_row[5] or 0,
                    "series_suffix": lib_row[13] or "",
                    "publication_status": lib_row[6] or "",
                    "want_to_read": lib_row[7] or False,
                    "source_url": lib_row[8] or "",
                    "archived": lib_row[9] or False,
                    "publish_date": lib_row[10].isoformat() if lib_row[10] else "",
                    "description": lib_row[11] or "",
                    "rating": lib_row[12] or 0,
                    "storage_type": lib_row[14] or "file",
                }
                # Supplement empty fields from EPUB metadata (file-based books only)
                if not db_book and path and (
                    not entry["source_url"] or not entry["publish_date"] or not entry["description"]
                ):
                    epub_meta = scan_epub(path)
                    if not entry["source_url"]:
                        entry["source_url"] = epub_meta.get("source_url", "")
                    if not entry["publish_date"]:
                        entry["publish_date"] = epub_meta.get("publish_date", "")
                    if not entry["description"]:
                        entry["description"] = epub_meta.get("description", "")
            else:
                # No library row: a DB-stored book without one is simply gone;
                # a file-based book falls back entirely to the EPUB's metadata.
                if db_book:
                    return HTMLResponse("Not found", status_code=404)
                entry = scan_epub(path)
                entry.setdefault("want_to_read", False)
                entry.setdefault("archived", False)
                entry.setdefault("publish_date", "")
                entry.setdefault("description", "")
                entry.setdefault("rating", 0)
                entry.setdefault("storage_type", "file")
            cur.execute(
                "SELECT tag, tag_type FROM book_tags WHERE filename = %s ORDER BY tag_type, tag",
                (filename,),
            )
            # Split tags into the three editor buckets; anything that is not
            # genre/subgenre lands in the plain tag list.
            genres: list[str] = []
            subgenres: list[str] = []
            tags_list: list[str] = []
            rows = cur.fetchall()
            for tag, tag_type in rows:
                if tag_type == "genre":
                    genres.append(tag)
                elif tag_type == "subgenre":
                    subgenres.append(tag)
                else:
                    tags_list.append(tag)
            if not rows and not db_book and path:
                # Fallback for books where tags only exist in OPF after DB loss/rebuild.
                epub_meta = scan_epub(path)
                for subject in epub_meta.get("subjects", []):
                    if subject not in tags_list:
                        tags_list.append(subject)
            # Read history: total completed reads and the most recent one.
            cur.execute(
                "SELECT COUNT(*)::int, MAX(read_at) FROM reading_sessions WHERE filename = %s",
                (filename,),
            )
            row = cur.fetchone()
            read_count = row[0] or 0
            last_read = row[1].isoformat() if row[1] else None
            # Current in-progress position, if any.
            cur.execute(
                "SELECT cfi, progress FROM reading_progress WHERE filename = %s",
                (filename,),
            )
            row = cur.fetchone()
            progress = row[1] or 0 if row else 0
            cfi = row[0] if row else None
            # A series counts as "indexed" when at least one of its books has a
            # positive series_index (drives numbered display in the template).
            series_is_indexed = False
            if entry.get("series"):
                cur.execute(
                    "SELECT COUNT(*) FROM library WHERE series = %s AND series_index > 0",
                    (entry["series"],),
                )
                series_is_indexed = (cur.fetchone()[0] or 0) > 0
            return templates.TemplateResponse(request, "book.html", {
                "active": "book",
                "filename": filename,
                "title": entry["title"],
                "author": entry["author"],
                "series": entry["series"],
                "series_index": entry["series_index"],
                "series_suffix": entry["series_suffix"],
                "genres": genres,
                "subgenres": subgenres,
                "tags": tags_list,
                "publisher": entry["publisher"],
                "publication_status": entry["publication_status"],
                "publish_date": entry.get("publish_date", ""),
                "has_cover": entry["has_cover"],
                "want_to_read": entry["want_to_read"],
                "archived": entry["archived"],
                "source_url": entry.get("source_url", ""),
                "description": entry.get("description", ""),
                "read_count": read_count,
                "last_read": last_read,
                "progress": progress,
                "cfi": cfi,
                "rating": entry.get("rating", 0),
                "series_is_indexed": series_is_indexed,
                "storage_type": entry.get("storage_type", "file"),
            })
@router.get("/api/genres")
async def api_genres(type: str | None = None):
    """Return all distinct tags from book_tags, sorted alphabetically.

    Optional ``type`` query parameter filters by tag_type (genre, subgenre, tag).
    """
    with get_db_conn() as conn:
        with conn.cursor() as cur:
            if type == "tag":
                # 'subject' rows are grouped with plain tags here — presumably
                # legacy OPF-imported subjects; verify against the importer.
                cur.execute(
                    "SELECT DISTINCT tag FROM book_tags WHERE tag_type IN ('tag', 'subject') ORDER BY tag"
                )
            elif type:
                cur.execute(
                    "SELECT DISTINCT tag FROM book_tags WHERE tag_type = %s ORDER BY tag",
                    (type,),
                )
            else:
                cur.execute("SELECT DISTINCT tag FROM book_tags ORDER BY tag")
            tags = [row[0] for row in cur.fetchall()]
    return JSONResponse(tags)
@router.get("/api/suggestions")
async def api_suggestions(type: str | None = None):
    """Return distinct non-empty values for author, publisher, or series, sorted alphabetically."""
    # Whitelist the column name before interpolating it into SQL; anything
    # outside this map returns an empty list instead of touching the DB.
    allowed = {"author": "author", "publisher": "publisher", "series": "series"}
    column = allowed.get(type or "")
    if not column:
        return JSONResponse([])
    with get_db_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                f"SELECT DISTINCT {column} FROM library WHERE {column} IS NOT NULL AND {column} <> '' ORDER BY {column}"
            )
            values = [row[0] for row in cur.fetchall()]
    return JSONResponse(values)
@router.patch("/library/book/{filename:path}")
async def book_update(filename: str, request: Request):
    """Update book metadata and tags, and rename/move the file when needed.

    Two branches: DB-stored books (pure DB rename + upsert) and file-based
    books (move on disk, sync OPF metadata for EPUBs, then upsert the DB row,
    rolling the file move back on failure).
    """
    body = await request.json()
    title = body.get("title", "")
    author = body.get("author", "")
    publisher = body.get("publisher", "")
    series = body.get("series", "")
    # Local import — presumably avoids a circular import with routers.common;
    # TODO confirm.
    from routers.common import parse_volume_str
    series_index, series_suffix = parse_volume_str(body.get("series_index", ""))
    # --- DB-stored book branch (no file on disk) ---
    if is_db_filename(filename):
        # Canonical new "filename" key derived from the edited metadata.
        base_new = make_rel_path(
            media_type="db",
            publisher=publisher,
            author=author,
            title=title,
            series=series,
            series_index=series_index,
            series_suffix=series_suffix,
        ).as_posix()
        with get_db_conn() as conn:
            with conn.cursor() as cur:
                cur.execute("SELECT 1 FROM library WHERE filename = %s", (filename,))
                if not cur.fetchone():
                    return JSONResponse({"error": "not found"}, status_code=404)
                # Only disambiguate when the key actually changes.
                new_filename = ensure_unique_db_filename(conn, base_new) if base_new != filename else filename
            with conn:  # single transaction for the upsert + renames
                with conn.cursor() as cur:
                    # Preserve the existing cover flag; the editor does not touch covers.
                    cur.execute("SELECT has_cover FROM library WHERE filename = %s", (filename,))
                    row = cur.fetchone()
                    has_cover = bool(row[0]) if row else False
                    cur.execute(
                        """
                        INSERT INTO library (
                            filename, title, author, publisher, has_cover,
                            series, series_index, series_suffix, publication_status,
                            source_url, publish_date, description,
                            archived, needs_review, storage_type, updated_at
                        )
                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, FALSE, FALSE, 'db', NOW())
                        ON CONFLICT (filename) DO UPDATE SET
                            title = EXCLUDED.title,
                            author = EXCLUDED.author,
                            publisher = EXCLUDED.publisher,
                            series = EXCLUDED.series,
                            series_index = EXCLUDED.series_index,
                            series_suffix = EXCLUDED.series_suffix,
                            publication_status = EXCLUDED.publication_status,
                            source_url = EXCLUDED.source_url,
                            publish_date = EXCLUDED.publish_date,
                            description = EXCLUDED.description,
                            needs_review = FALSE,
                            updated_at = NOW()
                        """,
                        (
                            new_filename, title, author, publisher, has_cover,
                            # Index/suffix only make sense with a series set.
                            series, series_index if series else 0,
                            series_suffix if series else "",
                            body.get("publication_status", ""),
                            body.get("source_url", ""),
                            body.get("publish_date") or None,
                            body.get("description", ""),
                        ),
                    )
                    if new_filename != filename:
                        # Re-point every dependent table at the new key, then
                        # drop the old library row (the new one was inserted above).
                        cur.execute("UPDATE book_tags SET filename = %s WHERE filename = %s", (new_filename, filename))
                        cur.execute("UPDATE reading_progress SET filename = %s WHERE filename = %s", (new_filename, filename))
                        cur.execute("UPDATE reading_sessions SET filename = %s WHERE filename = %s", (new_filename, filename))
                        cur.execute("UPDATE library_cover_cache SET filename = %s WHERE filename = %s", (new_filename, filename))
                        cur.execute("UPDATE book_chapters SET filename = %s WHERE filename = %s", (new_filename, filename))
                        cur.execute("UPDATE bookmarks SET filename = %s WHERE filename = %s", (new_filename, filename))
                        cur.execute("DELETE FROM library WHERE filename = %s", (filename,))
                    # Replace the tag set wholesale with what the editor sent.
                    cur.execute("DELETE FROM book_tags WHERE filename = %s", (new_filename,))
                    rows = (
                        [(new_filename, g, "genre") for g in body.get("genres", []) if g]
                        + [(new_filename, g, "subgenre") for g in body.get("subgenres", []) if g]
                        + [(new_filename, g, "tag") for g in body.get("tags", []) if g]
                    )
                    if rows:
                        cur.executemany(
                            "INSERT INTO book_tags (filename, tag, tag_type) VALUES (%s, %s, %s)"
                            " ON CONFLICT (filename, tag, tag_type) DO NOTHING",
                            rows,
                        )
        return JSONResponse({"ok": True, "filename": new_filename, "renamed": new_filename != filename})
    # --- File-based book branch ---
    old_path = resolve_library_path(filename)
    if old_path is None or not old_path.exists():
        return JSONResponse({"error": "not found"}, status_code=404)
    ext = old_path.suffix.lower()
    target_rel = _make_rel_path(
        publisher=publisher,
        author=author,
        title=title,
        series=series,
        series_index=series_index,
        series_suffix=series_suffix,
        ext=ext,
    )
    target_rel = _ensure_unique_rel_path(target_rel, exclude=old_path)
    new_filename = target_rel.as_posix()
    new_path = (LIBRARY_DIR / target_rel).resolve()
    # Move the file first; `moved` lets the except-branch roll it back.
    moved = False
    if new_path != old_path:
        new_path.parent.mkdir(parents=True, exist_ok=True)
        old_path.replace(new_path)
        moved = True
    try:
        if ext == ".epub":
            # Keep the OPF in sync so a DB rebuild can recover this metadata.
            _sync_epub_metadata(
                new_path,
                title=title,
                author=author,
                publisher=publisher,
                publication_status=body.get("publication_status", ""),
                source_url=body.get("source_url", ""),
                publish_date=body.get("publish_date", ""),
                description=body.get("description", ""),
                series=series,
                series_index=series_index if series else 0,
                series_suffix=series_suffix if series else "",
                subjects=(body.get("genres", []) + body.get("subgenres", []) + body.get("tags", [])),
            )
        with get_db_conn() as conn:
            with conn:  # single transaction for the upsert + renames
                with conn.cursor() as cur:
                    # Prefer the DB's cover flag; fall back to scanning the EPUB
                    # when the row is missing or the flag is NULL.
                    cur.execute("SELECT has_cover FROM library WHERE filename = %s", (filename,))
                    row = cur.fetchone()
                    has_cover = bool(row[0]) if row and row[0] is not None else bool(scan_epub(new_path if moved else old_path).get("has_cover", False))
                    cur.execute(
                        """
                        INSERT INTO library (
                            filename, title, author, publisher, has_cover,
                            series, series_index, series_suffix, publication_status,
                            source_url, publish_date, description,
                            archived, needs_review, updated_at
                        )
                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, FALSE, FALSE, NOW())
                        ON CONFLICT (filename) DO UPDATE SET
                            title = EXCLUDED.title,
                            author = EXCLUDED.author,
                            publisher = EXCLUDED.publisher,
                            series = EXCLUDED.series,
                            series_index = EXCLUDED.series_index,
                            series_suffix = EXCLUDED.series_suffix,
                            publication_status = EXCLUDED.publication_status,
                            source_url = EXCLUDED.source_url,
                            publish_date = EXCLUDED.publish_date,
                            description = EXCLUDED.description,
                            needs_review = FALSE,
                            updated_at = NOW()
                        """,
                        (
                            new_filename,
                            title,
                            author,
                            publisher,
                            has_cover,
                            series,
                            series_index if series else 0,
                            series_suffix if series else "",
                            body.get("publication_status", ""),
                            body.get("source_url", ""),
                            body.get("publish_date") or None,
                            body.get("description", ""),
                        ),
                    )
                    if new_filename != filename:
                        # Re-point dependent tables, then drop the old row.
                        cur.execute("UPDATE book_tags SET filename = %s WHERE filename = %s", (new_filename, filename))
                        cur.execute("UPDATE reading_progress SET filename = %s WHERE filename = %s", (new_filename, filename))
                        cur.execute("UPDATE reading_sessions SET filename = %s WHERE filename = %s", (new_filename, filename))
                        cur.execute("UPDATE library_cover_cache SET filename = %s WHERE filename = %s", (new_filename, filename))
                        cur.execute("DELETE FROM library WHERE filename = %s", (filename,))
                    # Replace the tag set wholesale with what the editor sent.
                    cur.execute("DELETE FROM book_tags WHERE filename = %s", (new_filename,))
                    rows = (
                        [(new_filename, g, "genre") for g in body.get("genres", []) if g]
                        + [(new_filename, g, "subgenre") for g in body.get("subgenres", []) if g]
                        + [(new_filename, g, "tag") for g in body.get("tags", []) if g]
                    )
                    if rows:
                        cur.executemany(
                            "INSERT INTO book_tags (filename, tag, tag_type) VALUES (%s, %s, %s)"
                            " ON CONFLICT (filename, tag, tag_type) DO NOTHING",
                            rows,
                        )
        # Clean up now-empty directories left behind by the move.
        prune_empty_dirs(old_path.parent)
        return JSONResponse({"ok": True, "filename": new_filename, "renamed": new_filename != filename})
    except Exception as e:
        # Roll the file move back so disk state matches the (unchanged) DB.
        if moved and new_path.exists() and not old_path.exists():
            new_path.replace(old_path)
        return JSONResponse({"error": str(e)}, status_code=500)
@router.post("/library/rating/{filename:path}")
async def set_rating(filename: str, request: Request):
    """Set (or clear) a 1-5 star rating for a book. rating=0 removes it."""
    file_path = None
    # File-backed books must exist on disk; DB books are validated by the
    # library UPDATE below (file_path stays None for them).
    if not is_db_filename(filename):
        file_path = resolve_library_path(filename)
        if file_path is None or not file_path.exists():
            return JSONResponse({"error": "not found"}, status_code=404)
    payload = await request.json()
    try:
        stars = int(payload.get("rating", 0))
    except (TypeError, ValueError):
        return JSONResponse({"error": "invalid rating"}, status_code=400)
    stars = max(0, min(5, stars))  # clamp to the 0-5 range
    if file_path is not None:
        # Persist the rating inside the file itself where the format supports it.
        writers = {
            ".epub": (_write_epub_rating, "epub"),
            ".cbz": (_write_cbz_rating, "cbz"),
        }
        entry = writers.get(file_path.suffix.lower())
        if entry is not None:
            writer, label = entry
            try:
                writer(file_path, stars)
            except Exception as e:
                return JSONResponse({"error": f"{label} write failed: {e}"}, status_code=500)
    # Mirror the rating in the library table for fast listing/sorting.
    with get_db_conn() as conn, conn, conn.cursor() as cur:
        cur.execute(
            "UPDATE library SET rating = %s, updated_at = NOW() WHERE filename = %s",
            (stars, filename),
        )
    return JSONResponse({"ok": True, "rating": stars})
# ---------------------------------------------------------------------------
# Fase 4 — EPUB → DB conversion
# ---------------------------------------------------------------------------
def _epub_body_inner(xhtml: str, z: zf.ZipFile, href: str) -> tuple[str, list[dict]]:
    """Parse an EPUB chapter XHTML, rewrite inline images to imagestore URLs.

    Parameters:
        xhtml: raw chapter markup read from the EPUB archive.
        z: open ZipFile of the EPUB, used to read referenced image data.
        href: the chapter's own path inside the ZIP (anchors relative srcs).

    Returns (inner_html_without_body_tags, []). Images are written to disk but
    not registered in book_images here (that happens in the final DB transaction).
    """
    soup = BeautifulSoup(xhtml, "lxml")
    body = soup.find("body")
    if not body:
        return "", []
    href_dir = href.rsplit("/", 1)[0] if "/" in href else ""
    names = z.namelist()
    for img in body.find_all("img"):
        src = img.get("src", "")
        if not src or src.startswith("http") or src.startswith("data:"):
            continue
        # Resolve relative path inside ZIP (handle ".." segments manually).
        parts = (href_dir.split("/") if href_dir else []) + src.split("/")
        resolved: list[str] = []
        for p in parts:
            if p == "..":
                if resolved:
                    resolved.pop()
            elif p:
                resolved.append(p)
        zip_path = "/".join(resolved)
        img_data: bytes | None = None
        if zip_path in names:
            img_data = z.read(zip_path)
        else:
            # Some EPUBs reference images with mismatched case — retry
            # with a case-insensitive match before giving up.
            lo = zip_path.lower()
            match = next((n for n in names if n.lower() == lo), None)
            if match:
                img_data = z.read(match)
        if img_data:
            ext_s = zip_path.rsplit(".", 1)[-1].lower() if "." in zip_path else "jpg"
            mime = {"jpg": "image/jpeg", "jpeg": "image/jpeg", "png": "image/png",
                    "webp": "image/webp", "gif": "image/gif"}.get(ext_s, "image/jpeg")
            _, _, url = write_image_file(img_data, mime)
            img["src"] = url
        else:
            img.decompose()  # drop images whose data cannot be located
    # Strip leading heading — EPUB chapters often open with the chapter title as
    # an <h1>/<h2>/<h3>. The chapter endpoint always prepends its own
    # <h2 class="chapter-title">, so keep the stored content heading-free.
    for child in list(body.children):
        if getattr(child, "name", None) is None:
            continue  # NavigableString / text node — skip
        if not child.get_text(strip=True):
            child.decompose()  # drop empty leading wrappers (e.g. <div></div>)
            continue
        if child.name in ("h1", "h2", "h3"):
            child.decompose()
        # BUG FIX: always stop at the first element that has text. Previously
        # the loop only broke after removing a heading, so a chapter opening
        # with a paragraph kept scanning the whole body, deleting empty
        # elements and the first section heading found anywhere later on.
        break
    return body.decode_contents(), []
@router.post("/api/library/convert-to-db/{filename:path}")
async def convert_to_db(filename: str):
    """Convert a file-based EPUB to DB storage.

    Steps:
      1. Validate the target is an existing, non-DB EPUB with a library row.
      2. Extract chapter HTML (inline images are written to the imagestore).
      3. In one transaction: insert a new DB-storage library row, migrate all
         child tables to the new filename, store the chapters, then delete
         the old library row.
      4. Best-effort: remove the EPUB file from disk and prune empty dirs.

    Returns {"ok": True, "new_filename": ...} or an error JSON with an
    appropriate HTTP status.
    """
    if is_db_filename(filename):
        return JSONResponse({"error": "Already a DB book"}, status_code=400)
    old_path = resolve_library_path(filename)
    if old_path is None or not old_path.exists():
        return JSONResponse({"error": "File not found"}, status_code=404)
    if old_path.suffix.lower() != ".epub":
        return JSONResponse({"error": "Only EPUB files can be converted"}, status_code=400)
    # Fetch the metadata needed to derive the new DB filename.
    with get_db_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT title, author, publisher, series, series_index, series_suffix "
                "FROM library WHERE filename = %s",
                (filename,),
            )
            row = cur.fetchone()
    if not row:
        return JSONResponse({"error": "Book not in library"}, status_code=404)
    title, author, publisher, series, series_index, series_suffix = row
    # Extract chapters from EPUB
    try:
        spine = _epub_spine(old_path)
        chapters = []
        with zf.ZipFile(old_path, "r") as z:
            for entry in spine:
                try:
                    xhtml = z.read(entry["href"]).decode("utf-8", errors="replace")
                except KeyError:
                    # Spine references a member missing from the ZIP — skip it.
                    continue
                inner, _ = _epub_body_inner(xhtml, z, entry["href"])
                if inner.strip():
                    chapters.append({"title": entry["title"], "content_html": inner})
    except Exception as e:
        return JSONResponse({"error": f"Failed to extract EPUB: {e}"}, status_code=500)
    if not chapters:
        return JSONResponse({"error": "No chapters found"}, status_code=400)
    # Build the canonical relative path for a DB-stored book from metadata.
    base_fn = make_rel_path(
        media_type="db",
        publisher=publisher or "",
        author=author or "",
        title=title or "",
        series=series or "",
        series_index=series_index or 0,
        series_suffix=series_suffix or "",
    ).as_posix()
    with get_db_conn() as conn:
        with conn:  # single transaction: the whole migration lands or none of it
            new_fn = ensure_unique_db_filename(conn, base_fn)
            with conn.cursor() as cur:
                # Insert new library row
                cur.execute(
                    """
                    INSERT INTO library (filename, media_type, storage_type, title, author, publisher,
                        has_cover, series, series_index, series_suffix, publication_status,
                        source_url, publish_date, description, archived, want_to_read,
                        needs_review, rating, created_at, updated_at)
                    SELECT %s, media_type, 'db', title, author, publisher,
                        has_cover, series, series_index, series_suffix, publication_status,
                        source_url, publish_date, description, archived, want_to_read,
                        needs_review, rating, created_at, NOW()
                    FROM library WHERE filename = %s
                    """,
                    (new_fn, filename),
                )
                # Migrate child tables
                cur.execute("UPDATE book_tags SET filename = %s WHERE filename = %s", (new_fn, filename))
                cur.execute("UPDATE reading_progress SET filename = %s WHERE filename = %s", (new_fn, filename))
                # NOTE(review): reading_sessions and library_cover_cache are
                # copied then deleted rather than UPDATEd — presumably to avoid
                # key conflicts; confirm against the schema.
                cur.execute(
                    "INSERT INTO reading_sessions (filename, read_at) SELECT %s, read_at FROM reading_sessions WHERE filename = %s",
                    (new_fn, filename),
                )
                cur.execute("DELETE FROM reading_sessions WHERE filename = %s", (filename,))
                cur.execute("UPDATE bookmarks SET filename = %s WHERE filename = %s", (new_fn, filename))
                cur.execute(
                    "INSERT INTO library_cover_cache (filename, mime_type, thumb_webp, updated_at) "
                    "SELECT %s, mime_type, thumb_webp, updated_at FROM library_cover_cache WHERE filename = %s",
                    (new_fn, filename),
                )
                cur.execute("DELETE FROM library_cover_cache WHERE filename = %s", (filename,))
            # Insert chapters
            for idx, ch in enumerate(chapters):
                upsert_chapter(conn, new_fn, idx, ch["title"], ch["content_html"])
            with conn.cursor() as cur:
                cur.execute("DELETE FROM library WHERE filename = %s", (filename,))
    # Best-effort cleanup of the now-redundant EPUB file; failure is not fatal
    # since the DB copy is already committed.
    try:
        old_path.unlink()
        prune_empty_dirs(old_path.parent)
    except Exception:
        pass
    return JSONResponse({"ok": True, "new_filename": new_fn})
# ---------------------------------------------------------------------------
# Fase 5 — DB → EPUB export
# ---------------------------------------------------------------------------
def _rewrite_db_images_for_epub(content_html: str, seen: dict[str, str]) -> tuple[str, list[dict]]:
    """Replace /library/db-images/... img src with EPUB-internal paths.

    seen: sha256 → epub_path (deduplication across chapters)
    Returns (modified_html, new_image_dicts) where dicts have epub_path/data/media_type.
    """
    prefix = "/library/db-images/"
    mime_by_ext = {".jpg": "image/jpeg", ".png": "image/png",
                   ".webp": "image/webp", ".gif": "image/gif"}
    doc = BeautifulSoup(content_html, "html.parser")
    added: list[dict] = []
    for tag in doc.find_all("img"):
        source = tag.get("src", "")
        if not source.startswith(prefix):
            continue
        disk_file = IMAGES_DIR / source[len(prefix):]
        if not disk_file.exists():
            # Imagestore file is gone — drop the broken reference entirely.
            tag.decompose()
            continue
        digest = disk_file.stem
        suffix = disk_file.suffix.lower()
        if digest not in seen:
            # First time we meet this image: register it for inclusion in the EPUB.
            seen[digest] = f"OEBPS/Images/{digest}{suffix}"
            added.append({
                "epub_path": seen[digest],
                "data": disk_file.read_bytes(),
                "media_type": mime_by_ext.get(suffix, "image/jpeg"),
            })
        tag["src"] = f"../Images/{digest}{suffix}"
    return str(doc), added
@router.get("/api/library/export-epub/{filename:path}")
async def export_epub(filename: str):
    """Export a DB-stored book as an EPUB download (no file written to disk).

    Gathers metadata, tags, chapters, and the cached cover from the DB,
    rewrites imagestore image references to EPUB-internal paths (deduplicated
    across chapters), assembles the EPUB in memory, and streams it back as an
    attachment named after the book title.
    """
    if not is_db_filename(filename):
        return JSONResponse({"error": "Not a DB book"}, status_code=400)
    with get_db_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """SELECT title, author, publisher, series, series_index, publication_status,
                       source_url, description, publish_date
                   FROM library WHERE filename = %s""",
                (filename,),
            )
            meta_row = cur.fetchone()
            if not meta_row:
                return JSONResponse({"error": "Not found"}, status_code=404)
            cur.execute(
                "SELECT tag, tag_type FROM book_tags WHERE filename = %s ORDER BY tag_type, tag",
                (filename,),
            )
            tag_rows = cur.fetchall()
            cur.execute(
                "SELECT chapter_index, title, content FROM book_chapters "
                "WHERE filename = %s ORDER BY chapter_index",
                (filename,),
            )
            ch_rows = cur.fetchall()
            cur.execute(
                "SELECT thumb_webp FROM library_cover_cache WHERE filename = %s",
                (filename,),
            )
            cover_row = cur.fetchone()
    title, author, publisher, series, series_index, pub_status, source_url, description, pub_date = meta_row
    # NOTE(review): the cover comes from the cached thumbnail (webp) — confirm
    # no original-resolution cover is stored for DB books.
    cover_data: bytes | None = bytes(cover_row[0]) if cover_row and cover_row[0] else None
    genres = [t for t, tp in tag_rows if tp == "genre"]
    subgenres = [t for t, tp in tag_rows if tp == "subgenre"]
    tags = [t for t, tp in tag_rows if tp in ("tag", "subject")]
    book_info = {
        "genres": genres, "subgenres": subgenres, "tags": tags,
        "description": description or "",
        "source_url": source_url or "",
        "publisher": publisher or "",
        "series": series or "",
        "series_index": series_index or 1,
        "publication_status": pub_status or "",
        "updated_date": pub_date.isoformat() if pub_date else "",
    }
    # sha256 → EPUB-internal path, shared across chapters so each image is
    # embedded only once.
    seen_images: dict[str, str] = {}
    chapters = []
    for ch_idx, ch_title, ch_content in ch_rows:
        modified_html, new_imgs = _rewrite_db_images_for_epub(ch_content, seen_images)
        chapter_xhtml = make_chapter_xhtml(ch_title or f"Chapter {ch_idx + 1}", modified_html, ch_idx + 1)
        chapters.append({"title": ch_title or f"Chapter {ch_idx + 1}", "xhtml": chapter_xhtml, "images": new_imgs})
    # BUG FIX: read the scene-break image via Path.read_bytes() — the previous
    # open(...).read() never closed the file handle. Missing asset is fine.
    try:
        break_img_data = Path("static/break.png").read_bytes()
    except Exception:
        break_img_data = b""
    book_id = str(uuid.uuid4())
    epub_bytes = make_epub(
        title or "Untitled", author or "Unknown", chapters,
        cover_data, break_img_data, book_id, book_info,
    )
    # Sanitize the download filename (keep word chars, dash, dot, space).
    safe_title = re.sub(r'[^\w\-. ]', '', (title or "book")).strip() or "book"
    return Response(
        content=epub_bytes,
        media_type="application/epub+zip",
        headers={"Content-Disposition": f'attachment; filename="{safe_title}.epub"'},
    )
@router.get("/library/read/{filename:path}", response_class=HTMLResponse)
async def reader_page(filename: str, request: Request):
    """Serve the reader UI for a book, DB-stored or file-based.

    DB books render with format "epub" and an empty epub_url (chapters are
    fetched via API); file books pass the on-disk format and an EPUB URL.
    """
    with get_db_conn() as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT title FROM library WHERE filename = %s", (filename,))
            row = cur.fetchone()
    if is_db_filename(filename):
        # DB-stored books must have a library row.
        if not row:
            return HTMLResponse("Not found", status_code=404)
        title = row[0] if row[0] else filename
        return templates.TemplateResponse(request, "reader.html", {
            "filename": filename,
            "title": title,
            "format": "epub",
            "epub_url": "",
        })
    path = resolve_library_path(filename)
    if path is None:
        return HTMLResponse("Not found", status_code=404)
    if not path.exists():
        return HTMLResponse("Not found", status_code=404)
    title = row[0] if row and row[0] else filename
    fmt = path.suffix.lower().lstrip(".")
    return templates.TemplateResponse(request, "reader.html", {
        "filename": filename,
        "title": title,
        "format": fmt,
        # BUG FIX: the URL previously contained the literal "(unknown)"
        # instead of the actual filename, breaking EPUB loading for
        # file-based books.
        "epub_url": f"/library/epub/{filename}",
    })
@router.get("/api/pdf/info/{filename:path}")
async def pdf_info(filename: str):
    """Return the number of pages in a library PDF as JSON."""
    target = resolve_library_path(filename)
    if target is None or not target.exists():
        return JSONResponse({"error": "not found"}, status_code=404)
    try:
        return JSONResponse({"page_count": pdf_page_count(target)})
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)
@router.get("/library/pdf/{filename:path}")
async def library_pdf_page(filename: str, page: int = 0, dpi: int = 150):
    """Render one page of a library PDF as a PNG image."""
    target = resolve_library_path(filename)
    if target is None:
        return JSONResponse({"error": "Invalid filename"}, status_code=400)
    if not target.exists():
        return JSONResponse({"error": "File not found"}, status_code=404)
    if target.suffix.lower() != ".pdf":
        return JSONResponse({"error": "Not a PDF file"}, status_code=400)
    try:
        png = pdf_render_page(target, page, dpi=dpi)
    except IndexError:
        return JSONResponse({"error": "Page out of range"}, status_code=416)
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)
    return Response(content=png, media_type="image/png")
@router.get("/api/cbr/info/{filename:path}")
async def cbr_info(filename: str):
    """Return the page count of a CBR/CBZ archive as JSON."""
    target = resolve_library_path(filename)
    if target is None or not target.exists():
        return JSONResponse({"error": "not found"}, status_code=404)
    try:
        return JSONResponse({"page_count": cbr_page_count(target)})
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)
@router.get("/library/cbr/{filename:path}")
async def library_cbr_page(filename: str, page: int = 0):
    """Serve a single page image out of a CBR/CBZ archive."""
    target = resolve_library_path(filename)
    if target is None:
        return JSONResponse({"error": "Invalid filename"}, status_code=400)
    if not target.exists():
        return JSONResponse({"error": "File not found"}, status_code=404)
    if target.suffix.lower() not in {".cbr", ".cbz"}:
        return JSONResponse({"error": "Not a CBR/CBZ file"}, status_code=400)
    try:
        data, mime = cbr_get_page(target, page)
    except IndexError:
        return JSONResponse({"error": "Page out of range"}, status_code=416)
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)
    return Response(content=data, media_type=mime)
# ---------------------------------------------------------------------------
# Bookmark routes
# ---------------------------------------------------------------------------
@router.get("/library/bookmarks/{filename:path}")
async def get_bookmarks(filename: str):
    """List all bookmarks for one book, newest first."""
    if resolve_library_path(filename) is None:
        return JSONResponse({"error": "Invalid filename"}, status_code=400)
    with get_db_conn() as conn, conn.cursor() as cur:
        cur.execute(
            """
                SELECT id, chapter_index, scroll_frac, chapter_title, note,
                       created_at AT TIME ZONE 'UTC'
                FROM bookmarks WHERE filename = %s ORDER BY created_at DESC
                """,
            (filename,),
        )
        rows = cur.fetchall()
    result = []
    for bm_id, ch_idx, frac, ch_title, note_text, created in rows:
        result.append({
            "id": bm_id,
            "chapter_index": ch_idx,
            "scroll_frac": frac,
            "chapter_title": ch_title,
            "note": note_text,
            "created_at": created.isoformat() + "Z" if created else None,
        })
    return JSONResponse(result)
@router.post("/library/bookmarks/{filename:path}")
async def add_bookmark(filename: str, request: Request):
    """Create a bookmark at a chapter + scroll position for a book.

    Body: {chapter_index, scroll_frac, chapter_title, note}.
    Returns the new bookmark id and its UTC creation timestamp.
    """
    if resolve_library_path(filename) is None:
        return JSONResponse({"error": "Invalid filename"}, status_code=400)
    body = await request.json()
    # BUG FIX: validate client-supplied numbers instead of letting int()/float()
    # raise and surface as a 500 (consistent with set_rating's 400 handling).
    try:
        chapter_index = int(body.get("chapter_index", 0))
        scroll_frac = float(body.get("scroll_frac", 0.0))
    except (TypeError, ValueError):
        return JSONResponse({"error": "invalid bookmark position"}, status_code=400)
    chapter_title = str(body.get("chapter_title", ""))[:500]  # 500 presumably matches column width — TODO confirm
    note = str(body.get("note", ""))
    with get_db_conn() as conn:
        with conn:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    INSERT INTO bookmarks (filename, chapter_index, scroll_frac, chapter_title, note)
                    VALUES (%s, %s, %s, %s, %s)
                    RETURNING id, created_at AT TIME ZONE 'UTC'
                    """,
                    (filename, chapter_index, scroll_frac, chapter_title, note),
                )
                row = cur.fetchone()
    return JSONResponse({
        "ok": True,
        "id": row[0],
        "created_at": row[1].isoformat() + "Z" if row[1] else None,
    })
@router.patch("/library/bookmarks/{bookmark_id}")
async def update_bookmark(bookmark_id: int, request: Request):
    """Replace the note text of an existing bookmark; 404 when no row matches."""
    payload = await request.json()
    new_note = str(payload.get("note", ""))
    with get_db_conn() as conn, conn, conn.cursor() as cur:
        cur.execute(
            "UPDATE bookmarks SET note = %s WHERE id = %s",
            (new_note, bookmark_id),
        )
        updated = cur.rowcount
    if updated == 0:
        return JSONResponse({"error": "not found"}, status_code=404)
    return JSONResponse({"ok": True})
@router.delete("/library/bookmarks/{bookmark_id}")
async def delete_bookmark(bookmark_id: int):
    """Delete a bookmark by id; 404 when no row matches."""
    with get_db_conn() as conn, conn, conn.cursor() as cur:
        cur.execute("DELETE FROM bookmarks WHERE id = %s", (bookmark_id,))
        removed = cur.rowcount
    if removed == 0:
        return JSONResponse({"error": "not found"}, status_code=404)
    return JSONResponse({"ok": True})
@router.get("/api/bookmarks")
async def api_all_bookmarks():
    """Return all bookmarks across all books, enriched with book title/author."""
    with get_db_conn() as conn, conn.cursor() as cur:
        cur.execute(
            """
                SELECT b.id, b.filename, b.chapter_index, b.scroll_frac,
                       b.chapter_title, b.note,
                       b.created_at AT TIME ZONE 'UTC',
                       l.title, l.author
                FROM bookmarks b
                LEFT JOIN library l ON l.filename = b.filename
                ORDER BY b.created_at DESC
                """,
        )
        rows = cur.fetchall()
    payload = []
    for bm_id, fn, ch_idx, frac, ch_title, note_text, created, title, author in rows:
        payload.append({
            "id": bm_id,
            "filename": fn,
            "chapter_index": ch_idx,
            "scroll_frac": frac,
            "chapter_title": ch_title,
            "note": note_text,
            "created_at": created.isoformat() + "Z" if created else None,
            # Fall back to the filename when the book is no longer in library.
            "book_title": title or fn,
            "book_author": author or "",
        })
    return JSONResponse(payload)