# novela/containers/novela/scrapers/nifty.py

import re
from email.utils import parsedate
from html import escape as he
from time import mktime
from datetime import date as _date
from urllib.parse import urljoin, urlparse
import httpx
from bs4 import BeautifulSoup
from .base import BaseScraper

# Email header field names that appear at the top of Nifty classic chapters.
_HEADER_RE = re.compile(
    r"^(Date|From|Subject|Reply-To|Message-ID|MIME-Version|Content-Type|X-[\w-]+):",
    re.I,
)

# Scene-break patterns in plain text (subset of xhtml.BREAK_PATTERNS for text matching).
_BREAK_RE = re.compile(
    r"^("
    r"[\*\-]{3,}"
    r"|[~=]{3,}"
    r"|#{3,}"
    r"|[·•◦‣⁃]\s*[·•◦‣⁃]\s*[·•◦‣⁃]"
    r"|[-–—]\s*[oO0]\s*[-–—]"
    r")$"
)
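# Examples of paragraph lines _BREAK_RE treats as scene breaks (fetch_chapter
# renders each as <hr/>): "***", "~~~~", "###", "• • •", "- o -".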
class NiftyScraper(BaseScraper):
    _LEAD_MARKERS = (
        "notice this is a work of fiction",
        "if it is illegal to read stories",
        "if you enjoy this story",
        "for my other stories",
        "nifty archive",
        "code of conduct",
        "author note",
        "author's note",
        "disclaimer",
        "this story contains",
        "this story includes",
        "all characters are",
        "all characters depicted",
    )
    _TAIL_MARKERS = (
        "please remember to donate",
        "donate",
        "support nifty",
        "support the archive",
        "nifty archive alliance",
        "donate.nifty.org",
        "nifty.org/donate",
        "nifty.org/support",
        "patreon",
        "buy me a coffee",
        "tip jar",
        "become a supporter",
    )

    @classmethod
    def matches(cls, url: str) -> bool:
        return "nifty.org" in url and "new.nifty.org" not in url

    async def login(self, client: httpx.AsyncClient, username: str, password: str) -> bool:
        return True  # no login required

    # ── Helpers ───────────────────────────────────────────────────────────────

    def _to_index_url(self, url: str) -> str:
        """Return the story index URL for any Nifty URL (index or chapter).

        Nifty path structure:
            /nifty/{category}/{subcategory}/{story}/           ← index (4 segments)
            /nifty/{category}/{subcategory}/{story}/{chapter}  ← chapter (5 segments)
        """
        parsed = urlparse(url)
        parts = [p for p in parsed.path.split("/") if p]
        if len(parts) >= 5:
            path = "/" + "/".join(parts[:4]) + "/"
        else:
            path = parsed.path.rstrip("/") + "/"
        return f"{parsed.scheme}://{parsed.netloc}{path}"
    def _slug_to_title(self, slug: str) -> str:
        return slug.replace("-", " ").title()

    def _parse_date_header(self, text: str) -> str:
        """Return YYYY-MM-DD from a 'Date: …' line, or ''."""
        m = re.search(r"^Date:\s+(.+)$", text, re.M)
        if not m:
            return ""
        try:
            parsed = parsedate(m.group(1).strip())
            if parsed:
                return _date.fromtimestamp(mktime(parsed)).isoformat()
        except Exception:
            pass
        return ""
    def _parse_author_header(self, text: str) -> str:
        """Return author name from 'From: Name <email>' line, or ''."""
        m = re.search(r"^From:\s+([^<\n]+?)(?:\s*<[^>]+>)?\s*$", text, re.M)
        return m.group(1).strip() if m else ""

    def _parse_subject_header(self, text: str) -> str:
        """Return the Subject header value, or ''."""
        m = re.search(r"^Subject:\s+(.+)$", text, re.M)
        return m.group(1).strip() if m else ""

    def _normalize(self, text: str) -> str:
        """Normalise text for boilerplate comparison (lowercase, collapsed whitespace)."""
        return re.sub(r"\s+", " ", text.lower()).strip()

    async def _get_text(self, client: httpx.AsyncClient, url: str) -> tuple[BeautifulSoup, str]:
        """Fetch *url* and return (soup, raw_text).

        Nifty classic pages wrap the story content in a <pre> element.
        Falls back to the full body text if no <pre> is found.
        """
        r = await client.get(url)
        soup = BeautifulSoup(r.text, "html.parser")
        pre = soup.find("pre")
        if pre:
            raw = pre.get_text()
        else:
            body = soup.find("body")
            raw = body.get_text("\n") if body else soup.get_text("\n")
        return soup, raw

    def _strip_email_headers(self, text: str) -> str:
        """Remove the leading email header block (Date/From/Subject/…) from chapter text.

        Tolerates blank lines between header fields — some Nifty pages place the
        Subject on a separate line after a blank line:

            Date: …
            From: …

            Subject: …
        """
        lines = text.splitlines()
        i = 0
        # Skip leading blank lines.
        while i < len(lines) and not lines[i].strip():
            i += 1
        # Only strip if this actually looks like an email header block.
        if not any(_HEADER_RE.match(lines[j]) for j in range(i, min(i + 12, len(lines)))):
            return text
        # Skip header lines, tolerating blank lines between them.
        # A blank line ends the block only when no further header line follows.
        while i < len(lines):
            stripped = lines[i].strip()
            if _HEADER_RE.match(stripped):
                i += 1
            elif not stripped:
                # Peek ahead past any blank lines.
                j = i + 1
                while j < len(lines) and not lines[j].strip():
                    j += 1
                if j < len(lines) and _HEADER_RE.match(lines[j].strip()):
                    i = j  # more headers follow — jump over the blank line(s)
                else:
                    i += 1
                    break  # no more headers — end of block
            else:
                break  # non-header, non-blank line — end of block
        # Skip blank lines immediately after the header block.
        while i < len(lines) and not lines[i].strip():
            i += 1
        return "\n".join(lines[i:])
    def _text_to_paragraphs(self, text: str) -> list[str]:
        """Split plain text into paragraphs; join hard-wrapped lines within each paragraph.

        Nifty classic stories are stored as email submissions: paragraphs are
        separated by blank lines, and each line is wrapped at ~70 characters.
        This function merges those wrapped lines back into a single line per
        paragraph.
        """
        text = text.replace("\r\n", "\n").replace("\r", "\n")
        blocks = re.split(r"\n{2,}", text)
        result = []
        for block in blocks:
            lines = [l.strip() for l in block.splitlines() if l.strip()]
            if lines:
                result.append(" ".join(lines))
        return result
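    # For example, "One line of a\nwrapped paragraph.\n\nNext paragraph." becomes
    # ["One line of a wrapped paragraph.", "Next paragraph."].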
    def _comment_safe(self, text: str) -> str:
        return text.replace("--", "- -")

    def _plain_text(self, text: str) -> str:
        if "<" in text and ">" in text:
            return BeautifulSoup(text, "html.parser").get_text(" ", strip=True)
        return text

    def _looks_like_lead_boilerplate(self, text: str) -> bool:
        t = re.sub(r"\s+", " ", self._plain_text(text).lower()).strip()
        if not t or len(t) > 4000:
            return False
        return any(m in t for m in self._LEAD_MARKERS)

    def _looks_like_tail_boilerplate(self, text: str) -> bool:
        t = re.sub(r"\s+", " ", self._plain_text(text).lower()).strip()
        if not t or len(t) > 4000:
            return False
        return any(m in t for m in self._TAIL_MARKERS)

    def _extract_hidden_boilerplate(self, paragraphs: list[str]) -> tuple[list[str], list[str], list[str]]:
        visible = list(paragraphs)
        leading: list[str] = []
        trailing: list[str] = []
        while visible and len(leading) < 6 and self._looks_like_lead_boilerplate(visible[0]):
            leading.append(visible.pop(0))
        while visible and len(trailing) < 6 and self._looks_like_tail_boilerplate(visible[-1]):
            trailing.insert(0, visible.pop())
        if not visible:
            return list(paragraphs), [], []
        return visible, leading, trailing
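    # For example, a first paragraph containing "Disclaimer" and a final one
    # containing "please remember to donate" are both moved out of the visible
    # text; fetch_chapter re-emits them as NIFTY_HIDDEN_LEAD / NIFTY_HIDDEN_TAIL
    # HTML comments.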
    # ── BaseScraper interface ─────────────────────────────────────────────────

    async def fetch_book_info(self, client: httpx.AsyncClient, url: str) -> dict:
        index_url = self._to_index_url(url)
        r = await client.get(index_url)
        soup = BeautifulSoup(r.text, "html.parser")

        # Title from URL slug.
        slug = urlparse(index_url).path.rstrip("/").rsplit("/", 1)[-1]
        book_title = self._slug_to_title(slug)

        # Genres from URL path: /nifty/{category}/{subcategory}/{story}/
        path_parts = [p for p in urlparse(index_url).path.split("/") if p]
        category = self._slug_to_title(path_parts[1]) if len(path_parts) > 1 else ""
        subcategory = self._slug_to_title(path_parts[2]) if len(path_parts) > 2 else ""

        # Chapter links: all <a> tags pointing one level deeper than the index.
        chapter_links: list[dict] = []
        seen: set[str] = set()
        for a in soup.find_all("a", href=True):
            full = urljoin(index_url, a["href"])
            if (
                full.startswith(index_url)
                and full.rstrip("/") != index_url.rstrip("/")
                and full not in seen
            ):
                seen.add(full)
                chapter_links.append({"url": full, "title": a.get_text(strip=True)})

        # Sort by trailing chapter number.
        def _num(ch: dict) -> int:
            m = re.search(r"-(\d+)$", ch["url"].rstrip("/"))
            return int(m.group(1)) if m else 0

        chapter_links.sort(key=_num)
        for i, ch in enumerate(chapter_links, 1):
            ch["title"] = f"Chapter {i}"

        # Author and dates: extract from email headers in first and last chapters.
        author = "Unknown author"
        updated_date = ""
        preamble_count = 0
        if chapter_links:
            _, first_text = await self._get_text(client, chapter_links[0]["url"])
            author = self._parse_author_header(first_text) or author
            pub_date = self._parse_date_header(first_text)
            if len(chapter_links) > 1:
                _, last_text = await self._get_text(client, chapter_links[-1]["url"])
                updated_date = self._parse_date_header(last_text) or pub_date
            else:
                updated_date = pub_date

            # Boilerplate detection: compare leading paragraphs of chapters 1 and 2.
            # Paragraphs present in both (after header strip) are repeated preamble.
            if len(chapter_links) >= 2:
                _, ch2_text = await self._get_text(client, chapter_links[1]["url"])
                paras1 = self._text_to_paragraphs(self._strip_email_headers(first_text))
                paras2 = self._text_to_paragraphs(self._strip_email_headers(ch2_text))
                for p1, p2 in zip(paras1, paras2):
                    if self._normalize(p1) == self._normalize(p2):
                        preamble_count += 1
                    else:
                        break

        for ch in chapter_links:
            ch["preamble_count"] = preamble_count
        return {
            "title": book_title,
            "author": author,
            "publisher": "nifty.org",
            "series": "",
            "series_index_hint": 0,
            "genres": [],
            "subgenres": [],
            "tags": [t for t in [category, subcategory] if t],
            "description": "",
            "updated_date": updated_date,
            "publication_status": "",
            "source_url": index_url,
            "chapters": chapter_links,
            "chapter_method": "html_scan",
            "index_image_url": None,
        }

    async def fetch_chapter(self, client: httpx.AsyncClient, ch: dict) -> dict:
        _, raw_text = await self._get_text(client, ch["url"])

        # Extract Subject before stripping headers; store as invisible comment.
        subject = self._parse_subject_header(raw_text)

        # Remove email header block.
        story_text = self._strip_email_headers(raw_text)

        # Convert hard-wrapped plain text to paragraphs.
        paragraphs = self._text_to_paragraphs(story_text)

        # Skip repeated boilerplate paragraphs at the top of each chapter.
        preamble_count = ch.get("preamble_count", 0)
        if preamble_count:
            paragraphs = paragraphs[preamble_count:]

        paragraphs, hidden_lead, hidden_tail = self._extract_hidden_boilerplate(paragraphs)

        # Build an HTML fragment: subject as comment, scene-breaks as <hr/>, rest as <p>.
        html_parts: list[str] = []
        if subject:
            html_parts.append(f"<!-- Subject: {self._comment_safe(subject)} -->")
        if hidden_lead:
            lead_text = " || ".join(re.sub(r"\s+", " ", p).strip() for p in hidden_lead if p.strip())
            if lead_text:
                html_parts.append(f"<!-- NIFTY_HIDDEN_LEAD: {self._comment_safe(lead_text)} -->")
        for para in paragraphs:
            if _BREAK_RE.match(para.strip()):
                html_parts.append("<hr/>")
            else:
                html_parts.append(f"<p>{he(para)}</p>")
        if hidden_tail:
            tail_text = " || ".join(re.sub(r"\s+", " ", p).strip() for p in hidden_tail if p.strip())
            if tail_text:
                html_parts.append(f"<!-- NIFTY_HIDDEN_TAIL: {self._comment_safe(tail_text)} -->")

        wrapper = BeautifulSoup(
            "<div>" + "".join(html_parts) + "</div>",
            "html.parser",
        )
        content_el = wrapper.find("div")

        return {
            "title": ch["title"],
            "content_el": content_el,
            "selector_id": None,
            "selector_class": None,
        }
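
# A minimal manual smoke test (a sketch, not part of the app's entry points).
# It assumes NiftyScraper can be constructed with no arguments — BaseScraper's
# __init__ lives elsewhere and may require more — and uses a hypothetical
# story URL.
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        async with httpx.AsyncClient(follow_redirects=True, timeout=30.0) as client:
            scraper = NiftyScraper()  # assumption: no-argument constructor
            info = await scraper.fetch_book_info(
                client, "https://www.nifty.org/nifty/gay/college/example-story/"
            )
            print(info["title"], "by", info["author"], "-", len(info["chapters"]), "chapters")
            if info["chapters"]:
                ch = await scraper.fetch_chapter(client, info["chapters"][0])
                print(ch["title"], "-", len(str(ch["content_el"])), "characters of HTML")

    asyncio.run(_demo())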