novela/containers/novela/routers/grabber.py
Commit b43366723c by Ivo Oskamp: Add Bulk Import, Following, Incomplete, status overhaul, performance, and CBR fixes
- Bulk Import page: filename pattern parsing, shared metadata, duplicate detection (volume-aware), batch upload with progress
- Following page: track external author URLs; authors table; sidebar counter
- Incomplete view: non-archived books with publication_status ≠ Complete
- Status: added Temporary Hold, renamed Hiatus → Long-Term Hold; statusBadgeHtml() helper
- Status/want-to-read badges: dark fill + ring for readability on any cover colour
- Disk usage warning in sidebar (amber/red thresholds)
- Bulk delete batched via POST /library/bulk-delete
- CBR: magic bytes format detection + py7zr 7-zip support; unrar → proprietary unrar v6
- Performance: IntersectionObserver lazy covers, ETag 304, single DOM pass, json_agg tags
- Duplicate detection in library and Convert page warning
- All books Grid/List toggle; star ratings; reader text colour presets; bookmarks
- Docs: TECHNICAL.md and changelog updated

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Date: 2026-03-29 14:20:25 +02:00

import asyncio
import base64
import json
import traceback
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import AsyncGenerator
from urllib.parse import urljoin, urlparse

import httpx
from bs4 import Tag
from fastapi import APIRouter, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.templating import Jinja2Templates

from db import get_db_conn
from epub import detect_image_format, make_chapter_xhtml, make_epub
from routers.common import (
    LIBRARY_DIR,
    ensure_cover_cache_for_book,
    ensure_unique_rel_path,
    make_rel_path,
    normalize_site,
    upsert_book,
)
from scrapers import get_scraper
from scrapers.base import HEADERS
from security import decrypt_value, encrypt_value, is_encrypted_value
from xhtml import configure_break_patterns, element_to_xhtml, is_break_element

templates = Jinja2Templates(directory="templates")
router = APIRouter()

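# In-memory job store: job_id -> {"events": [...], "done": bool, "filename": None | str}.
# The background scrape task appends events here; the /events SSE endpoint replays them.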
JOBS: dict[str, dict] = {}


def _load_all_credentials() -> dict:
    out = {}
    with get_db_conn() as conn:
        with conn:
            with conn.cursor() as cur:
                cur.execute("SELECT site, username, password FROM credentials ORDER BY site")
                rows = cur.fetchall()
                for site, username_raw, password_raw in rows:
                    username = decrypt_value(username_raw)
                    password = decrypt_value(password_raw)
                    out[site] = {"username": username, "password": password}
                    # Transparently migrate any legacy plaintext rows to encrypted storage.
                    if not is_encrypted_value(username_raw) or not is_encrypted_value(password_raw):
                        cur.execute(
                            """
                            UPDATE credentials
                            SET username = %s, password = %s, updated_at = NOW()
                            WHERE site = %s
                            """,
                            (encrypt_value(username), encrypt_value(password), site),
                        )
    return out


def _domain(url: str) -> str:
    raw = (url or "").strip()
    if "://" in raw:
        raw = urlparse(raw).netloc
    return normalize_site(raw)


def _load_break_patterns() -> None:
    with get_db_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT pattern_type, pattern FROM break_patterns WHERE enabled = TRUE ORDER BY id"
            )
            rows = cur.fetchall()
    configure_break_patterns(
        regex_strings=[r[1] for r in rows if r[0] == "regex"],
        css_classes=[r[1] for r in rows if r[0] == "css_class"],
    )


def _next_series_index(series: str) -> int:
    if not series:
        return 1
    with get_db_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT COALESCE(MAX(series_index), 0) FROM library WHERE series = %s",
                (series,),
            )
            return (cur.fetchone()[0] or 0) + 1
@router.get("/grabber", response_class=HTMLResponse)
async def grabber_page(request: Request):
from pathlib import Path
tmpl = "grabber.html" if Path("templates/grabber.html").exists() else "index.html"
return templates.TemplateResponse(request, tmpl, {"active": "grabber"})
@router.get("/convert", response_class=HTMLResponse)
async def convert_page(request: Request):
from pathlib import Path
tmpl = "grabber.html" if Path("templates/grabber.html").exists() else "index.html"
return templates.TemplateResponse(request, tmpl, {"active": "grabber"})
@router.get("/credentials-manager", response_class=HTMLResponse)
async def credentials_manager_page(request: Request):
return templates.TemplateResponse(request, "credentials.html", {"active": "credentials"})
@router.get("/debug", response_class=HTMLResponse)
async def debug_page(request: Request):
return templates.TemplateResponse(request, "debug.html", {"active": "debug"})
@router.post("/debug/run")
async def debug_run(request: Request):
body = await request.json()
url = (body.get("url") or "").strip()
if not url:
return {"error": "No URL provided"}
creds = _load_all_credentials().get(_domain(url), {})
username = creds.get("username", "")
password = creds.get("password", "")
try:
scraper = get_scraper(url)
except ValueError as e:
return {"error": str(e)}
result: dict = {}
try:
async with httpx.AsyncClient(headers=HEADERS, follow_redirects=True, timeout=30) as client:
if username:
await scraper.login(client, username, password)
book = await scraper.fetch_book_info(client, url)
result = {
"title": book.get("title", ""),
"author": book.get("author", ""),
"publisher": book.get("publisher", ""),
"series": book.get("series", ""),
"chapter_count": len(book.get("chapters", [])),
"chapter_method": book.get("chapter_method", ""),
"genres": book.get("genres", []),
"subgenres": book.get("subgenres", []),
"tags": book.get("tags", []),
"description": book.get("description", ""),
"publication_status": book.get("publication_status", ""),
}
except Exception:
result["error"] = traceback.format_exc()
return result
@router.get("/credentials")
async def get_credentials():
return _load_all_credentials()
@router.post("/credentials")
async def save_credential(request: Request):
body = await request.json()
site = normalize_site(body.get("site", ""))
if not site:
return {"error": "No site provided"}
with get_db_conn() as conn:
with conn:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO credentials (site, username, password, updated_at)
VALUES (%s, %s, %s, NOW())
ON CONFLICT (site) DO UPDATE
SET username = EXCLUDED.username,
password = EXCLUDED.password,
updated_at = NOW()
""",
(site, encrypt_value(body.get("username", "")), encrypt_value(body.get("password", ""))),
)
return {"ok": True}
@router.delete("/credentials/{site:path}")
async def delete_credential(site: str):
with get_db_conn() as conn:
with conn:
with conn.cursor() as cur:
cur.execute("DELETE FROM credentials WHERE site = %s", (normalize_site(site),))
return {"ok": True}
@router.post("/preload")
async def preload(request: Request):
body = await request.json()
url = (body.get("url") or "").strip()
if not url:
return {"error": "No URL provided"}
creds = _load_all_credentials().get(_domain(url), {})
username = creds.get("username", "")
password = creds.get("password", "")
try:
scraper = get_scraper(url)
except ValueError as e:
return {"error": str(e)}
async with httpx.AsyncClient(headers=HEADERS, follow_redirects=True, timeout=30) as client:
if username:
await scraper.login(client, username, password)
book = await scraper.fetch_book_info(client, url)
series = book.get("series", "")
hint = int(book.get("series_index_hint", 0) or 0)
title = book.get("title", "")
author = book.get("author", "")
existing_books = []
if title or author:
with get_db_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"""SELECT filename, title, author FROM library
WHERE LOWER(TRIM(title)) = LOWER(TRIM(%s))
AND LOWER(TRIM(author)) = LOWER(TRIM(%s))""",
(title, author),
)
existing_books = [
{"filename": r[0], "title": r[1] or "", "author": r[2] or ""}
for r in cur.fetchall()
]
return {
"title": title,
"author": author,
"publisher": book.get("publisher", ""),
"series": series,
"series_index_next": hint if hint else _next_series_index(series),
"genres": book.get("genres", []),
"subgenres": book.get("subgenres", []),
"tags": book.get("tags", []),
"description": book.get("description", ""),
"updated_date": book.get("updated_date", ""),
"publication_status": book.get("publication_status", ""),
"already_exists": bool(existing_books),
"existing_books": existing_books,
}
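

# scrape_book() is the background task entry point: send() buffers events into the
# job record, and any unexpected exception from _run_scrape() is reported as a
# final "error" event with the job marked done so the SSE stream can close.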
async def scrape_book(job_id: str, url: str, username: str, password: str) -> None:
    job = JOBS[job_id]

    def send(event: str, data: dict):
        job["events"].append({"event": event, "data": data})

    try:
        await _run_scrape(job_id, url, username, password, send)
    except Exception as e:
        send("error", {"message": f"Unexpected error: {e}"})
        job["done"] = True


async def _run_scrape(job_id: str, url: str, username: str, password: str, send) -> None:
    job = JOBS[job_id]
    send("status", {"message": "Connecting..."})
    scraper = get_scraper(url)
    async with httpx.AsyncClient(headers=HEADERS, follow_redirects=True, timeout=30) as client:
        if username:
            send("status", {"message": "Logging in..."})
            await scraper.login(client, username, password)
        book = await scraper.fetch_book_info(client, url)
        book_title = book["title"]
        author = book["author"]
        send("meta", {"title": book_title, "author": author})
        if not book.get("chapters"):
            send("error", {"message": "No chapters found. Check the URL or credentials."})
            job["done"] = True
            return
        send("chapters", {"chapters": [c["title"] for c in book["chapters"]]})
        send("status", {"message": f"Found {len(book['chapters'])} chapters. Downloading..."})
        cover_data: bytes | None = job.pop("cover_upload", None)
        tags = list(book.get("tags", []))
        # Very short works (fewer than 4 chapters) are tagged "Shorts" automatically.
        if len(book["chapters"]) < 4 and "Shorts" not in tags:
            tags.append("Shorts")
        status_map = {"Temporary-Hold": "Temporary Hold"}
        pub_status = status_map.get(book.get("publication_status", ""), book.get("publication_status", ""))
        series = book.get("series", "")
        series_index = int(job.get("series_index", 1) or 1)
        updated_date_override = (job.pop("updated_date_override", "") or "").strip()
        final_updated_date = (
            updated_date_override
            or book.get("updated_date", "")
            or datetime.now(timezone.utc).strftime("%Y-%m-%d")
        )
        book_info = {
            "genres": book.get("genres", []),
            "subgenres": book.get("subgenres", []),
            "tags": tags,
            "description": book.get("description", ""),
            "updated_date": final_updated_date,
            "source_url": book.get("source_url", ""),
            "publisher": book.get("publisher", ""),
            "series": series,
            "series_index": series_index,
            "publication_status": pub_status,
        }
        _load_break_patterns()
        break_img_data = Path("static/break.png").read_bytes()
        chapters = []
        for i, ch in enumerate(book["chapters"], 1):
            send("progress", {"current": i, "total": len(book["chapters"]), "title": ch["title"]})
            try:
                ch_data = await scraper.fetch_chapter(client, ch)
                content_el = ch_data["content_el"]
                chapter_images = []
                if content_el:
                    # Download inline images and rewrite their src to EPUB-relative paths.
                    img_counter = 1
                    for img_tag in content_el.find_all("img"):
                        if is_break_element(img_tag):
                            continue
                        src = img_tag.get("src", "")
                        if not src or src.startswith("data:"):
                            img_tag.decompose()
                            continue
                        try:
                            img_resp = await client.get(urljoin(ch["url"], src))
                            if img_resp.status_code == 200:
                                img_name, img_mime = detect_image_format(
                                    img_resp.content, f"ch{i:03d}_img{img_counter:03d}"
                                )
                                img_tag["src"] = f"../Images/{img_name}"
                                img_tag["alt"] = img_tag.get("alt", "")
                                chapter_images.append(
                                    {
                                        "epub_path": f"OEBPS/Images/{img_name}",
                                        "data": img_resp.content,
                                        "media_type": img_mime,
                                    }
                                )
                                img_counter += 1
                            else:
                                img_tag.decompose()
                        except Exception:
                            img_tag.decompose()
                xhtml_parts = []
                if content_el:
                    # Heuristic: if empty <p> tags number at least half the filled
                    # ones, treat empty paragraphs as intentional spacers.
                    all_p = content_el.find_all("p")
                    empty_p = sum(
                        1
                        for p in all_p
                        if not [c for c in p.children if isinstance(c, Tag)]
                        and not p.get_text().replace("\xa0", "").strip()
                    )
                    filled_p = len(all_p) - empty_p
                    empty_p_is_spacer = filled_p > 0 and empty_p >= filled_p * 0.5
                    for child in content_el.children:
                        part = element_to_xhtml(child, empty_p_is_spacer=empty_p_is_spacer)
                        if part.strip():
                            xhtml_parts.append(part)
                content_xhtml = "\n".join(xhtml_parts)
                chapter_xhtml = make_chapter_xhtml(ch_data["title"], content_xhtml, i)
                chapters.append({"title": ch_data["title"], "xhtml": chapter_xhtml, "images": chapter_images})
                # Small delay between chapters to be polite to the source site.
                await asyncio.sleep(0.2)
            except Exception as e:
                send("warning", {"message": f"Chapter {i} skipped: {e}"})
        if not chapters:
            send("error", {"message": "No chapters could be processed."})
            job["done"] = True
            return
        send("status", {"message": "Building EPUB..."})
        book_id = str(uuid.uuid4())
        epub_bytes = make_epub(book_title, author, chapters, cover_data, break_img_data, book_id, book_info)
        rel = ensure_unique_rel_path(
            make_rel_path(
                media_type="epub",
                publisher=book_info.get("publisher", ""),
                author=author,
                title=book_title,
                series=series,
                series_index=series_index,
            )
        )
        out_path = LIBRARY_DIR / rel
        out_path.parent.mkdir(parents=True, exist_ok=True)
        out_path.write_bytes(epub_bytes)
        rel_filename = rel.as_posix()
        job["filename"] = rel_filename
        book_meta = {
            "media_type": "epub",
            "has_cover": cover_data is not None,
            "series": book_info.get("series", ""),
            "series_index": series_index if book_info.get("series") else 0,
            "title": book_title,
            "publication_status": book_info.get("publication_status", ""),
            "author": author,
            "publisher": book_info.get("publisher", ""),
            "source_url": book_info.get("source_url", ""),
            "description": book_info.get("description", ""),
            "publish_date": final_updated_date,
            "needs_review": False,
        }
        book_tags = (
            [(g, "genre") for g in book_info.get("genres", [])]
            + [(g, "subgenre") for g in book_info.get("subgenres", [])]
            + [(g, "tag") for g in book_info.get("tags", [])]
        )
        with get_db_conn() as conn:
            with conn:
                upsert_book(conn, rel_filename, book_meta, book_tags)
                ensure_cover_cache_for_book(conn, rel_filename, out_path, "epub")
        send("done", {"filename": rel_filename, "title": book_title, "chapters": len(chapters)})
        job["done"] = True
@router.post("/convert")
async def convert(request: Request):
body = await request.json()
url = (body.get("url") or "").strip()
if not url:
return {"error": "No URL provided"}
creds = _load_all_credentials().get(_domain(url), {})
username = creds.get("username", "")
password = creds.get("password", "")
job_id = str(uuid.uuid4())
job: dict = {"events": [], "done": False, "filename": None}
cover_b64 = body.get("cover_b64")
if cover_b64:
try:
job["cover_upload"] = base64.b64decode(cover_b64)
except Exception:
pass
job["series_index"] = int(body.get("series_index", 1) or 1)
job["updated_date_override"] = (body.get("updated_date") or "").strip()
JOBS[job_id] = job
asyncio.create_task(scrape_book(job_id, url, username, password))
return {"job_id": job_id, "using_credentials": bool(username)}
@router.get("/events/{job_id}")
async def events(job_id: str):
if job_id not in JOBS:
return StreamingResponse(iter([]), media_type="text/event-stream")
async def stream() -> AsyncGenerator[str, None]:
sent = 0
while True:
job = JOBS.get(job_id, {})
evts = job.get("events", [])
while sent < len(evts):
evt = evts[sent]
yield f"event: {evt['event']}\ndata: {json.dumps(evt['data'])}\n\n"
sent += 1
if job.get("done") and sent >= len(evts):
break
await asyncio.sleep(0.2)
return StreamingResponse(stream(), media_type="text/event-stream")
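

# Example client flow (a sketch; host and port are assumptions): POST /convert with
# {"url": "..."} to receive {"job_id": ...}, then consume the SSE stream, e.g.
#   curl -N http://localhost:8000/events/<job_id>
# Events arrive as status/meta/chapters/progress (and possibly warning), then
# a final done or error event.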