- Bulk Import page: filename pattern parsing, shared metadata, duplicate detection (volume-aware), batch upload with progress - Following page: track external author URLs; authors table; sidebar counter - Incomplete view: non-archived books with publication_status ≠ Complete - Status: added Temporary Hold, renamed Hiatus → Long-Term Hold; statusBadgeHtml() helper - Status/want-to-read badges: dark fill + ring for readability on any cover colour - Disk usage warning in sidebar (amber/red thresholds) - Bulk delete batched via POST /library/bulk-delete - CBR: magic bytes format detection + py7zr 7-zip support; unrar → proprietary unrar v6 - Performance: IntersectionObserver lazy covers, ETag 304, single DOM pass, json_agg tags - Duplicate detection in library and Convert page warning - All books Grid/List toggle; star ratings; reader text colour presets; bookmarks - Docs: TECHNICAL.md and changelog updated Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
146 lines
5.1 KiB
Python
146 lines
5.1 KiB
Python
import json
|
|
import uuid
|
|
from pathlib import Path
|
|
|
|
from fastapi import APIRouter, File, Form, Request, UploadFile
|
|
from fastapi.responses import HTMLResponse, JSONResponse
|
|
from fastapi.templating import Jinja2Templates
|
|
|
|
from cbr import cbr_page_count
|
|
from db import get_db_conn
|
|
from routers.common import (
|
|
LIBRARY_DIR,
|
|
ensure_cover_cache_for_book,
|
|
ensure_unique_rel_path,
|
|
make_rel_path,
|
|
media_type_from_suffix,
|
|
parse_volume_str,
|
|
upsert_book,
|
|
)
|
|
|
|
# Jinja2 template loader rooted at the relative "templates" directory; used by
# the page handler in this module to render bulk_import.html.
templates = Jinja2Templates(directory="templates")
|
|
# Router that the bulk-import routes in this module are registered on.
router = APIRouter()
|
|
|
|
|
|
@router.get("/bulk-import", response_class=HTMLResponse)
async def bulk_import_page(request: Request):
    """Render the Bulk Import page.

    Serves the ``bulk_import.html`` template with ``active`` set to
    ``"bulk_import"`` in the template context.
    """
    context = {"active": "bulk_import"}
    return templates.TemplateResponse(request, "bulk_import.html", context)
|
|
|
|
|
|
def _split_csv(value) -> list[str]:
    """Split a comma-separated string into stripped, non-empty items.

    ``None`` and the empty string both yield an empty list.
    """
    return [item.strip() for item in (value or "").split(",") if item.strip()]


@router.post("/library/bulk-import")
async def library_bulk_import(
    files: list[UploadFile] = File(...),
    rows: str = Form(...),
    shared: str = Form("{}"),
):
    """Import a batch of uploaded books into the library.

    Args:
        files: The uploaded files. Only ``.epub``, ``.pdf``, ``.cbr`` and
            ``.cbz`` suffixes are accepted.
        rows: JSON array of per-file metadata objects. Each object is matched
            to an upload through its ``original_filename`` and may carry
            ``title``, ``author``, ``publisher``, ``series``, ``volume``,
            ``status``, ``year``, ``genres`` and ``tags``.
        shared: JSON object with fallback values (``author``, ``publisher``,
            ``status``, ``series``, ``genres``, ``tags``) used whenever the
            per-file value is empty.

    Returns:
        ``{"ok": True, "imported": [...], "skipped": [...]}`` where
        ``imported`` lists the library-relative paths that were stored and
        ``skipped`` lists ``{"file", "reason"}`` entries. Malformed ``rows`` /
        ``shared`` payloads produce a 400 ``JSONResponse`` instead.
    """
    try:
        rows_data = json.loads(rows)
        shared_data = json.loads(shared)
    except Exception:
        return JSONResponse({"ok": False, "error": "Invalid JSON"}, status_code=400)

    # Valid JSON of the wrong shape would otherwise fail further down with an
    # unhandled exception (HTTP 500) instead of a client error.
    if not isinstance(rows_data, list) or not isinstance(shared_data, dict):
        return JSONResponse(
            {"ok": False, "error": "rows must be a JSON array and shared a JSON object"},
            status_code=400,
        )

    # Rows that cannot be matched to an upload (not an object, or no usable
    # filename) are ignored; an upload without a row falls back to {} below.
    rows_by_name = {
        r["original_filename"]: r
        for r in rows_data
        if isinstance(r, dict) and isinstance(r.get("original_filename"), str)
    }

    shared_author = shared_data.get("author", "")
    shared_publisher = shared_data.get("publisher", "")
    shared_status = shared_data.get("status", "")
    try:
        shared_genres = _split_csv(shared_data.get("genres"))
        shared_tags = _split_csv(shared_data.get("tags"))
    except AttributeError:
        # genres/tags were supplied as something other than a string.
        return JSONResponse(
            {"ok": False, "error": "Invalid shared metadata"}, status_code=400
        )

    allowed_suffixes = {".epub", ".pdf", ".cbr", ".cbz"}
    imported: list[str] = []
    skipped: list[dict] = []

    for upload in files:
        # Temp file that still has to be removed if this upload fails midway.
        tmp: Path | None = None
        try:
            name = upload.filename or "upload.bin"
            suffix = Path(name).suffix.lower()
            if suffix not in allowed_suffixes:
                skipped.append({"file": name, "reason": "Unsupported file type"})
                continue

            mt = media_type_from_suffix(Path(name))
            if not mt:
                skipped.append({"file": name, "reason": "Could not detect media type"})
                continue

            data = await upload.read()
            if not data:
                skipped.append({"file": name, "reason": "Empty upload"})
                continue

            row = rows_by_name.get(name, {})

            # Per-file values win; empty ones fall back to the shared metadata
            # (or, for the title, to the file name without its extension).
            title = (row.get("title") or "").strip() or Path(name).stem
            author = (row.get("author") or "").strip() or shared_author
            publisher = (row.get("publisher") or "").strip() or shared_publisher
            series = (row.get("series") or "").strip() or shared_data.get("series", "")
            series_index, series_suffix = parse_volume_str(row.get("volume") or "")
            status = (row.get("status") or "").strip() or shared_status

            # Only a four-digit year is accepted; it is stored as 1 January.
            year = str(row.get("year") or "").strip()
            publish_date: str | None = (
                f"{year}-01-01" if year.isdigit() and len(year) == 4 else None
            )

            genres = _split_csv(row.get("genres")) or shared_genres
            plain_tags = _split_csv(row.get("tags")) or shared_tags

            # Write under a unique hidden name first, then move into place.
            tmp = LIBRARY_DIR / f".import-{uuid.uuid4().hex}{suffix}"
            tmp.parent.mkdir(parents=True, exist_ok=True)
            tmp.write_bytes(data)

            rel = ensure_unique_rel_path(
                make_rel_path(
                    media_type=mt,
                    publisher=publisher,
                    author=author,
                    title=title,
                    series=series,
                    series_index=series_index,
                    series_suffix=series_suffix,
                    ext=suffix,
                )
            )
            dest = LIBRARY_DIR / rel
            dest.parent.mkdir(parents=True, exist_ok=True)
            tmp.replace(dest)
            tmp = None  # moved into place; nothing left to clean up

            rel_name = rel.as_posix()

            # Best-effort: has_cover is only probed for CBR archives, and a
            # failure to read the archive must not abort the import.
            has_cover = False
            try:
                if mt == "cbr":
                    has_cover = cbr_page_count(dest) > 0
            except Exception:
                pass

            meta = {
                "media_type": mt,
                "title": title,
                "author": author,
                "publisher": publisher,
                "series": series,
                "series_index": series_index,
                "series_suffix": series_suffix,
                "publication_status": status,
                "publish_date": publish_date,
                "has_cover": has_cover,
                "needs_review": True,
                "description": "",
                "source_url": "",
            }
            tag_tuples = [(g, "genre") for g in genres] + [(t, "tag") for t in plain_tags]

            # NOTE(review): if the DB write below fails, the file stays at
            # `dest` without a matching row — confirm whether that is
            # reconciled elsewhere.
            with get_db_conn() as conn:
                with conn:
                    upsert_book(conn, rel_name, meta, tag_tuples)
                    ensure_cover_cache_for_book(conn, rel_name, dest, mt)

            imported.append(rel_name)

        except Exception as e:
            # One bad file must not abort the batch; report it and carry on.
            skipped.append({"file": upload.filename or "upload", "reason": str(e)})
        finally:
            try:
                if tmp is not None:
                    # The import failed before the move: drop the stray
                    # .import-* file instead of leaking it in LIBRARY_DIR.
                    tmp.unlink(missing_ok=True)
            except OSError:
                pass  # cleanup is best-effort
            await upload.close()

    return {"ok": True, "imported": imported, "skipped": skipped}
|