# novela/containers/novela/routers/editor.py
import html as _html
import posixpath
import re
import uuid
import zipfile as zf
from pathlib import Path

from bs4 import BeautifulSoup
from fastapi import APIRouter, Request
from fastapi.responses import HTMLResponse, JSONResponse, Response

from shared_templates import templates
from db import get_db_conn
from epub import read_epub_file, write_epub_file
from routers.common import LIBRARY_DIR, is_db_filename, resolve_library_path, upsert_chapter

router = APIRouter()
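
# This router backs the in-browser chapter editor. It serves the editor page
# (/library/editor/{filename}) and a small JSON API for reading, saving, adding
# and deleting chapters, for both database-backed books (filenames recognised by
# is_db_filename) and EPUB files stored in the library directory.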


def _norm(base_dir: str, rel: str) -> str:
    """Resolve an href relative to base_dir, dropping any #fragment."""
    rel = (rel or "").split("#", 1)[0].strip()
    if not rel:
        return ""
    joined = posixpath.normpath(posixpath.join(base_dir, rel))
    return joined.lstrip("./")


def _epub_spine(path: Path) -> list[dict]:
    """Return the reading order of an EPUB as [{"index", "title", "href"}, ...]."""
    with zf.ZipFile(path, "r") as z:
        names = set(z.namelist())
        opf_path = "OEBPS/content.opf"
        try:
            container_xml = z.read("META-INF/container.xml").decode("utf-8", errors="replace")
            m = re.search(r"full-path\s*=\s*['\"]([^'\"]+)['\"]", container_xml)
            if m:
                opf_path = m.group(1)
        except Exception:
            pass
        if opf_path not in names:
            candidates = [n for n in names if n.lower().endswith(".opf")]
            if not candidates:
                return []
            opf_path = sorted(candidates)[0]
        opf_xml = z.read(opf_path).decode("utf-8", errors="replace")
        opf = BeautifulSoup(opf_xml, "xml")
        opf_dir = posixpath.dirname(opf_path)

        # Map manifest item ids to archive paths.
        manifest: dict[str, str] = {}
        for item in opf.find_all("item"):
            iid = item.get("id")
            href = item.get("href")
            if iid and href:
                manifest[iid] = _norm(opf_dir, href)

        # Collect the spine order.
        spine_idrefs: list[str] = []
        spine_tag = opf.find("spine")
        toc_id = spine_tag.get("toc") if spine_tag else None
        if spine_tag:
            for ir in spine_tag.find_all("itemref"):
                rid = ir.get("idref")
                if rid:
                    spine_idrefs.append(rid)
        hrefs = [manifest[rid] for rid in spine_idrefs if rid in manifest]

        # Pull chapter titles from the NCX table of contents when one is available.
        href_to_title: dict[str, str] = {}
        ncx_path = ""
        if toc_id and toc_id in manifest:
            ncx_path = manifest[toc_id]
        elif "toc.ncx" in names:
            ncx_path = "toc.ncx"
        elif "OEBPS/toc.ncx" in names:
            ncx_path = "OEBPS/toc.ncx"
        if ncx_path and ncx_path in names:
            try:
                ncx_xml = z.read(ncx_path).decode("utf-8", errors="replace")
                ncx = BeautifulSoup(ncx_xml, "xml")
                ncx_dir = posixpath.dirname(ncx_path)
                for np in ncx.find_all("navPoint"):
                    content = np.find("content")
                    label_tag = np.find("text")
                    src = content.get("src") if content else ""
                    label = label_tag.get_text(strip=True) if label_tag else ""
                    if src and label:
                        href_to_title[_norm(ncx_dir, src)] = _html.unescape(label)
            except Exception:
                pass

        chapters = []
        for i, href in enumerate(hrefs):
            base = posixpath.basename(href)
            title = href_to_title.get(href, re.sub(r"\.(xhtml|html|htm)$", "", base, flags=re.I))
            chapters.append({"index": i, "title": title or f"Chapter {i+1}", "href": href})
        return chapters
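
# Example of the list _epub_spine returns (illustrative values; the paths depend
# on the EPUB's own layout):
#   [{"index": 0, "title": "Prologue", "href": "OEBPS/Text/chapter_001.xhtml"},
#    {"index": 1, "title": "Chapter 1", "href": "OEBPS/Text/chapter_002.xhtml"}]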


def _norm_href(base_dir: str, rel: str) -> str:
    """Same normalisation as _norm; used by the add/delete endpoints below."""
    rel = (rel or "").split("#", 1)[0].strip()
    if not rel:
        return ""
    return posixpath.normpath(posixpath.join(base_dir, rel)).lstrip("./")


def _find_opf_path(names: set[str], container_xml: str | None) -> str | None:
    """Locate the OPF package document, preferring the path declared in container.xml."""
    opf_path = "OEBPS/content.opf"
    if container_xml:
        m = re.search(r"full-path\s*=\s*['\"]([^'\"]+)['\"]", container_xml)
        if m:
            opf_path = m.group(1)
    if opf_path in names:
        return opf_path
    candidates = sorted(n for n in names if n.lower().endswith(".opf"))
    return candidates[0] if candidates else None
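
# The full-path regex above targets the <rootfile> element of META-INF/container.xml,
# which in a typical EPUB looks roughly like (illustrative):
#   <rootfile full-path="OEBPS/content.opf" media-type="application/oebps-package+xml"/>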


def _make_new_chapter_xhtml(title: str) -> str:
    safe_title = _html.escape((title or "New chapter").strip() or "New chapter")
    return (
        "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
        "<!DOCTYPE html PUBLIC \"-//W3C//DTD XHTML 1.0 Transitional//EN\"\n"
        " \"http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd\">\n"
        "<html xmlns=\"http://www.w3.org/1999/xhtml\" xml:lang=\"en\">\n"
        "<head>\n"
        " <meta http-equiv=\"Content-Type\" content=\"text/html; charset=utf-8\"/>\n"
        f" <title>{safe_title}</title>\n"
        " <link rel=\"stylesheet\" type=\"text/css\" href=\"../Styles/style.css\"/>\n"
        "</head>\n"
        "<body>\n"
        f" <h2 class=\"chapter-title\">{safe_title}</h2>\n"
        " <p></p>\n"
        "</body>\n"
        "</html>\n"
    )


def _rewrite_epub_entries(epub_path: Path, updates: dict[str, bytes], remove_paths: set[str] | None = None) -> None:
    """Rewrite an EPUB in place: replace/add the entries in `updates`, drop those in `remove_paths`."""
    remove_paths = set(remove_paths or set())
    tmp = epub_path.with_suffix(".tmp.epub")
    with zf.ZipFile(epub_path, "r") as zin, zf.ZipFile(tmp, "w", compression=zf.ZIP_DEFLATED) as zout:
        names = zin.namelist()
        for name in names:
            if name in remove_paths:
                continue
            if name in updates:
                zout.writestr(name, updates[name])
            else:
                zout.writestr(name, zin.read(name))
        # Entries that are new to the archive are appended at the end.
        for name, data in updates.items():
            if name not in names:
                zout.writestr(name, data)
    tmp.replace(epub_path)
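
# Note: the rewrite recompresses every entry with ZIP_DEFLATED, including the
# "mimetype" entry. Strict EPUB readers and validators expect "mimetype" to be the
# first entry and to be stored uncompressed, so files rewritten this way may be
# flagged by a validator even though most readers still open them.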
@router.get("/library/editor/{filename:path}", response_class=HTMLResponse)
async def editor_page(filename: str, request: Request):
if not is_db_filename(filename):
path = resolve_library_path(filename)
if path is None or not path.exists():
return HTMLResponse("Not found", status_code=404)
with get_db_conn() as conn:
with conn.cursor() as cur:
cur.execute("SELECT title FROM library WHERE filename = %s", (filename,))
row = cur.fetchone()
if not row:
return HTMLResponse("Not found", status_code=404)
title = row[0] if row[0] else filename
return templates.TemplateResponse(request, "editor.html", {
"filename": filename,
"title": title,
"is_db": is_db_filename(filename),
})
@router.get("/api/edit/chapter/{index:int}/{filename:path}")
async def get_edit_chapter(filename: str, index: int):
if is_db_filename(filename):
with get_db_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"SELECT title, content FROM book_chapters WHERE filename = %s AND chapter_index = %s",
(filename, index),
)
row = cur.fetchone()
if not row:
return Response(status_code=404)
return JSONResponse({"index": index, "href": f"db:{index}", "title": row[0], "content": row[1]})
path = resolve_library_path(filename)
if path is None or not path.exists():
return Response(status_code=404)
spine = _epub_spine(path)
if index < 0 or index >= len(spine):
return Response(status_code=404)
ch = spine[index]
content = read_epub_file(path, ch["href"])
return JSONResponse({"index": index, "href": ch["href"], "title": ch["title"], "content": content})
@router.post("/api/edit/chapter/{index:int}/{filename:path}")
async def save_edit_chapter(filename: str, index: int, request: Request):
body = await request.json()
content = body.get("content", "")
if is_db_filename(filename):
with get_db_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"SELECT title FROM book_chapters WHERE filename = %s AND chapter_index = %s",
(filename, index),
)
row = cur.fetchone()
if not row:
return JSONResponse({"error": "Chapter not found"}, status_code=404)
new_title = (body.get("title") or "").strip() or row[0]
with conn:
upsert_chapter(conn, filename, index, new_title, content)
return JSONResponse({"ok": True})
path = resolve_library_path(filename)
if path is None:
return JSONResponse({"error": "not found"}, status_code=404)
if not path.exists():
return JSONResponse({"error": "File not found"}, status_code=404)
if not content:
return JSONResponse({"error": "No content"}, status_code=400)
spine = _epub_spine(path)
if index < 0 or index >= len(spine):
return JSONResponse({"error": "Chapter not found"}, status_code=404)
href = spine[index]["href"]
try:
write_epub_file(path, href, content)
except Exception as e:
return JSONResponse({"error": str(e)}, status_code=500)
return JSONResponse({"ok": True})
@router.post("/api/edit/chapter/add/{filename:path}")
async def add_edit_chapter(filename: str, request: Request):
body = await request.json()
title = (body.get("title") or "New chapter").strip() or "New chapter"
after_index = body.get("after_index", -1)
if is_db_filename(filename):
try:
after_index = int(after_index)
except Exception:
after_index = -1
with get_db_conn() as conn:
with conn.cursor() as cur:
cur.execute("SELECT COUNT(*) FROM book_chapters WHERE filename = %s", (filename,))
total = cur.fetchone()[0]
cur.execute("SELECT 1 FROM library WHERE filename = %s", (filename,))
if not cur.fetchone():
return JSONResponse({"error": "not found"}, status_code=404)
insert_idx = total if after_index < 0 or after_index >= total else after_index + 1
with conn:
with conn.cursor() as cur:
cur.execute(
"UPDATE book_chapters SET chapter_index = chapter_index + 1 WHERE filename = %s AND chapter_index >= %s",
(filename, insert_idx),
)
upsert_chapter(conn, filename, insert_idx, title, "")
return JSONResponse({"ok": True, "index": insert_idx, "count": total + 1})
path = resolve_library_path(filename)
if path is None:
return JSONResponse({"error": "not found"}, status_code=404)
if not path.exists():
return JSONResponse({"error": "File not found"}, status_code=404)
try:
after_index = int(after_index)
except Exception:
after_index = -1
try:
after_index = int(after_index)
except Exception:
after_index = -1
with zf.ZipFile(path, "r") as z:
names = set(z.namelist())
container_xml = z.read("META-INF/container.xml").decode("utf-8", errors="replace") if "META-INF/container.xml" in names else None
opf_path = _find_opf_path(names, container_xml)
if not opf_path:
return JSONResponse({"error": "OPF not found"}, status_code=400)
opf_xml = z.read(opf_path).decode("utf-8", errors="replace")
opf = BeautifulSoup(opf_xml, "xml")
opf_dir = posixpath.dirname(opf_path)
manifest = {}
for item in opf.find_all("item"):
iid = item.get("id")
href = item.get("href")
if iid and href:
manifest[iid] = _norm_href(opf_dir, href)
spine_tag = opf.find("spine")
if not spine_tag:
return JSONResponse({"error": "Invalid OPF spine"}, status_code=400)
itemrefs = spine_tag.find_all("itemref")
current_len = len(itemrefs)
if after_index < -1:
after_index = -1
if after_index >= current_len:
after_index = current_len - 1
ref_dir_rel = "Text"
if current_len > 0 and after_index >= 0:
ref_idref = itemrefs[after_index].get("idref", "")
ref_abs = manifest.get(ref_idref, "")
if ref_abs:
ref_rel = posixpath.relpath(ref_abs, opf_dir)
ref_dir_rel = posixpath.dirname(ref_rel) or ""
while True:
stem = f"chapter_added_{uuid.uuid4().hex[:8]}"
rel = posixpath.join(ref_dir_rel, f"{stem}.xhtml") if ref_dir_rel else f"{stem}.xhtml"
abs_path = _norm_href(opf_dir, rel)
if abs_path not in names:
break
existing_ids = {item.get("id") for item in opf.find_all("item") if item.get("id")}
i = 1
new_id = f"ch_add_{i:03d}"
while new_id in existing_ids:
i += 1
new_id = f"ch_add_{i:03d}"
manifest_tag = opf.find("manifest")
if not manifest_tag:
return JSONResponse({"error": "Invalid OPF manifest"}, status_code=400)
new_item = opf.new_tag("item")
new_item["id"] = new_id
new_item["href"] = rel
new_item["media-type"] = "application/xhtml+xml"
manifest_tag.append(new_item)
new_itemref = opf.new_tag("itemref")
new_itemref["idref"] = new_id
if after_index >= 0 and after_index + 1 < len(itemrefs):
itemrefs[after_index + 1].insert_before(new_itemref)
else:
spine_tag.append(new_itemref)
toc_id = spine_tag.get("toc")
ncx_path = manifest.get(toc_id, "") if toc_id else ""
if not ncx_path:
for item in opf.find_all("item"):
mt = (item.get("media-type") or "").lower()
if mt == "application/x-dtbncx+xml" and item.get("href"):
ncx_path = _norm_href(opf_dir, item.get("href"))
break
updates: dict[str, bytes] = {opf_path: str(opf).encode("utf-8")}
if ncx_path and ncx_path in names:
ncx_xml = z.read(ncx_path).decode("utf-8", errors="replace")
ncx = BeautifulSoup(ncx_xml, "xml")
nav_map = ncx.find("navMap")
if nav_map:
nav_points = nav_map.find_all("navPoint")
np = ncx.new_tag("navPoint")
np["id"] = f"{new_id}_nav"
label = ncx.new_tag("navLabel")
text = ncx.new_tag("text")
text.string = title
label.append(text)
content = ncx.new_tag("content")
ncx_dir = posixpath.dirname(ncx_path)
content["src"] = posixpath.relpath(abs_path, ncx_dir)
np.append(label)
np.append(content)
insert_pos = after_index + 1
if 0 <= insert_pos < len(nav_points):
nav_points[insert_pos].insert_before(np)
else:
nav_map.append(np)
for idx, node in enumerate(nav_map.find_all("navPoint"), 1):
node["playOrder"] = str(idx)
updates[ncx_path] = str(ncx).encode("utf-8")
updates[abs_path] = _make_new_chapter_xhtml(title).encode("utf-8")
_rewrite_epub_entries(path, updates)
new_spine = _epub_spine(path)
new_index = min(max(after_index + 1, 0), max(len(new_spine) - 1, 0))
return JSONResponse({"ok": True, "index": new_index, "count": len(new_spine)})
@router.delete("/api/edit/chapter/{index:int}/{filename:path}")
async def delete_edit_chapter(filename: str, index: int):
if is_db_filename(filename):
with get_db_conn() as conn:
with conn.cursor() as cur:
cur.execute("SELECT COUNT(*) FROM book_chapters WHERE filename = %s", (filename,))
total = cur.fetchone()[0]
if total <= 1:
return JSONResponse({"error": "Cannot delete the last chapter"}, status_code=400)
with conn:
with conn.cursor() as cur:
cur.execute(
"DELETE FROM book_chapters WHERE filename = %s AND chapter_index = %s",
(filename, index),
)
cur.execute(
"UPDATE book_chapters SET chapter_index = chapter_index - 1 WHERE filename = %s AND chapter_index > %s",
(filename, index),
)
new_total = total - 1
return JSONResponse({"ok": True, "index": min(index, new_total - 1), "count": new_total})
path = resolve_library_path(filename)
if path is None:
return JSONResponse({"error": "not found"}, status_code=404)
if not path.exists():
return JSONResponse({"error": "File not found"}, status_code=404)
with zf.ZipFile(path, "r") as z:
names = set(z.namelist())
container_xml = z.read("META-INF/container.xml").decode("utf-8", errors="replace") if "META-INF/container.xml" in names else None
opf_path = _find_opf_path(names, container_xml)
if not opf_path:
return JSONResponse({"error": "OPF not found"}, status_code=400)
opf_xml = z.read(opf_path).decode("utf-8", errors="replace")
opf = BeautifulSoup(opf_xml, "xml")
opf_dir = posixpath.dirname(opf_path)
manifest = {}
for item in opf.find_all("item"):
iid = item.get("id")
href = item.get("href")
if iid and href:
manifest[iid] = _norm_href(opf_dir, href)
spine_tag = opf.find("spine")
if not spine_tag:
return JSONResponse({"error": "Invalid OPF spine"}, status_code=400)
itemrefs = spine_tag.find_all("itemref")
if index < 0 or index >= len(itemrefs):
return JSONResponse({"error": "Chapter not found"}, status_code=404)
if len(itemrefs) <= 1:
return JSONResponse({"error": "Cannot delete the last chapter"}, status_code=400)
target_idref = itemrefs[index].get("idref", "")
target_href = manifest.get(target_idref, "")
if not target_href:
return JSONResponse({"error": "Chapter target missing in manifest"}, status_code=400)
itemrefs[index].decompose()
manifest_tag = opf.find("manifest")
if manifest_tag:
for item in manifest_tag.find_all("item"):
if item.get("id") == target_idref:
item.decompose()
break
toc_id = spine_tag.get("toc")
ncx_path = manifest.get(toc_id, "") if toc_id else ""
if not ncx_path:
for item in opf.find_all("item"):
mt = (item.get("media-type") or "").lower()
if mt == "application/x-dtbncx+xml" and item.get("href"):
ncx_path = _norm_href(opf_dir, item.get("href"))
break
updates: dict[str, bytes] = {opf_path: str(opf).encode("utf-8")}
remove_paths: set[str] = {target_href}
if ncx_path and ncx_path in names:
ncx_xml = z.read(ncx_path).decode("utf-8", errors="replace")
ncx = BeautifulSoup(ncx_xml, "xml")
nav_map = ncx.find("navMap")
if nav_map:
ncx_dir = posixpath.dirname(ncx_path)
for np in nav_map.find_all("navPoint"):
content = np.find("content")
src = content.get("src") if content else ""
if src and _norm_href(ncx_dir, src) == target_href:
np.decompose()
for idx, node in enumerate(nav_map.find_all("navPoint"), 1):
node["playOrder"] = str(idx)
updates[ncx_path] = str(ncx).encode("utf-8")
_rewrite_epub_entries(path, updates, remove_paths)
new_spine = _epub_spine(path)
new_index = min(index, max(len(new_spine) - 1, 0))
return JSONResponse({"ok": True, "index": new_index, "count": len(new_spine)})