sleep-meditation/containers/sleep-meditation-downloader/api.py
Ivo Oskamp 0d9f20690f Release v0.1.6
- Switch to shared build-and-push.sh; version read from docs/changelog.md
- Add docs/changelog.md; remove version.txt, .last-branch, .gitignore
- Stack: image tag via SLEEP_MEDITATION_IMAGE_TAG
- Downloader: YouTube support (yt-dlp + ffmpeg), best audio to mp3
- Downloader: Content-Type validation for direct URLs
- Downloader: auto-fetch YouTube title; title field optional
- Downloader: progress bar with phase (downloading/converting)
- Downloader: store source URL per file; show Source link in manage list

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-10 15:13:10 +02:00

414 lines
13 KiB
Python

import os
import re
import json
import shutil
import subprocess
import threading
from pathlib import Path
from urllib.parse import urlparse
from flask import Flask, request, jsonify, send_from_directory
import requests as http_requests
# Hostnames (lower-case, exact match) that is_youtube_url() accepts.
YOUTUBE_HOSTS = (
    "youtube.com",
    "www.youtube.com",
    "m.youtube.com",
    "music.youtube.com",
    "youtu.be",
)

# Content-Type prefixes accepted for direct (non-YouTube) downloads.
AUDIO_CONTENT_TYPES = (
    "audio/",
    "application/ogg",
)
def is_youtube_url(url: str) -> bool:
    """Return True when the hostname of *url* is listed in ``YOUTUBE_HOSTS``.

    The hostname is lower-cased before the exact-match lookup. Any error
    raised while parsing *url* is treated as "not a YouTube URL".
    """
    try:
        parsed_host = urlparse(url).hostname
        normalized = parsed_host.lower() if parsed_host else ""
    except Exception:
        return False
    return normalized in YOUTUBE_HOSTS
def fetch_youtube_title(url: str) -> str | None:
if not shutil.which("yt-dlp"):
return None
try:
proc = subprocess.run(
["yt-dlp", "--get-title", "--no-playlist", "--skip-download",
"--no-warnings", url],
capture_output=True, text=True, timeout=30,
)
except Exception:
return None
if proc.returncode != 0:
return None
title = (proc.stdout or "").strip().splitlines()
return title[0] if title else None
app = Flask(__name__)

# Static UI assets are served from the "site" folder next to this module.
SITE_DIR = Path(__file__).parent / "site"

# Root of the audio library; overridable through the MP3_DIR env variable.
MP3_DIR = Path(os.getenv("MP3_DIR", "/mp3"))
DOWNLOADS_DIR = MP3_DIR / "downloads"
DOWNLOADS_DIR.mkdir(parents=True, exist_ok=True)

# File extensions (lower-case, including the dot) treated as audio tracks.
AUDIO_EXT = {".mp3", ".m4a", ".aac", ".wav", ".ogg"}

# In-memory download state:
# track_id -> {"status": ..., "progress": ..., "phase": ..., "error": ...}
_downloads = {}

# JSON files inside DOWNLOADS_DIR holding per-file metadata keyed by filename.
_TITLES_FILE = DOWNLOADS_DIR / "titles.json"
_SKIP_FILE = DOWNLOADS_DIR / "skip.json"
_SOURCES_FILE = DOWNLOADS_DIR / "sources.json"
def safe_name(title: str) -> str:
    """Turn *title* into a string that is safe to use as a file stem.

    Everything except word characters, whitespace and "-" is dropped, runs
    of whitespace collapse to a single space, and the result is limited to
    100 characters and 240 UTF-8 bytes. Returns "track" when nothing usable
    remains.
    """
    max_chars = 100
    # Common filesystems cap a file name at 255 *bytes*; stay well below so
    # an extension (and downloader temp suffixes) still fit.
    max_bytes = 240

    name = re.sub(r"[^\w\s\-]", "", title)
    name = re.sub(r"\s+", " ", name).strip()
    name = name[:max_chars]

    encoded = name.encode("utf-8")
    if len(encoded) > max_bytes:
        # Cut on the byte budget; errors="ignore" drops a trailing
        # partially-cut multi-byte character.
        name = encoded[:max_bytes].decode("utf-8", errors="ignore")

    # Truncation may have exposed a space at the end ("foo .mp3" otherwise).
    return name.rstrip() or "track"
def _load_titles() -> dict:
    """Read the filename -> display-title mapping from ``_TITLES_FILE``.

    Returns an empty dict when the file is missing, unreadable, not valid
    UTF-8 JSON, or does not contain a JSON object, so callers can always
    use ``.get`` and item assignment on the result.
    """
    try:
        # Explicit UTF-8: titles are stored with ensure_ascii=False, so the
        # locale default encoding must not decide how they are decoded.
        with open(_TITLES_FILE, encoding="utf-8") as f:
            loaded = json.load(f)
    except (OSError, ValueError):
        # Missing/unreadable file or corrupt content (JSONDecodeError and
        # UnicodeDecodeError are ValueError subclasses). Metadata is
        # best-effort, so fall back to "no titles".
        return {}
    return loaded if isinstance(loaded, dict) else {}
def _save_titles(titles: dict) -> None:
    """Write the filename -> display-title mapping to ``_TITLES_FILE``.

    The data is stored as indented UTF-8 JSON with non-ASCII characters kept
    verbatim.
    """
    # Serialize and encode before touching the file: if either step raises,
    # the existing file is left intact instead of truncated. Encoding
    # explicitly also keeps the output independent of the locale.
    payload = json.dumps(titles, indent=2, ensure_ascii=False).encode("utf-8")
    _TITLES_FILE.write_bytes(payload)
def _load_skip() -> dict:
    """Read the filename -> skip-seconds mapping from ``_SKIP_FILE``.

    Returns an empty dict when the file is missing, unreadable, not valid
    UTF-8 JSON, or does not contain a JSON object, so callers can always
    use ``.get`` and item assignment on the result.
    """
    try:
        with open(_SKIP_FILE, encoding="utf-8") as f:
            loaded = json.load(f)
    except (OSError, ValueError):
        # Missing/unreadable file or corrupt content (JSONDecodeError and
        # UnicodeDecodeError are ValueError subclasses). Metadata is
        # best-effort, so fall back to "no skip offsets".
        return {}
    return loaded if isinstance(loaded, dict) else {}
def _save_skip(skip: dict) -> None:
    """Write the skip mapping to ``_SKIP_FILE`` as indented JSON."""
    with _SKIP_FILE.open("w") as handle:
        json.dump(skip, handle, indent=2)
def _load_sources() -> dict:
    """Read the filename -> source-URL mapping from ``_SOURCES_FILE``.

    Returns an empty dict when the file is missing, unreadable, not valid
    UTF-8 JSON, or does not contain a JSON object, so callers can always
    use ``.get`` and item assignment on the result.
    """
    try:
        # Explicit UTF-8: sources are stored with ensure_ascii=False, so the
        # locale default encoding must not decide how they are decoded.
        with open(_SOURCES_FILE, encoding="utf-8") as f:
            loaded = json.load(f)
    except (OSError, ValueError):
        # Missing/unreadable file or corrupt content (JSONDecodeError and
        # UnicodeDecodeError are ValueError subclasses). Metadata is
        # best-effort, so fall back to "no sources".
        return {}
    return loaded if isinstance(loaded, dict) else {}
def _save_sources(sources: dict) -> None:
    """Write the filename -> source-URL mapping to ``_SOURCES_FILE``.

    The data is stored as indented UTF-8 JSON with non-ASCII characters kept
    verbatim.
    """
    # Serialize and encode before touching the file: if either step raises,
    # the existing file is left intact instead of truncated. Encoding
    # explicitly also keeps the output independent of the locale.
    payload = json.dumps(sources, indent=2, ensure_ascii=False).encode("utf-8")
    _SOURCES_FILE.write_bytes(payload)
def strip_mp3_prefix(src: str) -> str:
    """Return *src* without leading slashes and without one leading "mp3/".

    Slashes that directly follow the removed prefix are stripped as well, so
    "/mp3/downloads/a.mp3" becomes "downloads/a.mp3".
    """
    prefix = "mp3/"
    trimmed = src.lstrip("/")
    if trimmed.startswith(prefix):
        trimmed = trimmed[len(prefix):]
    return trimmed.lstrip("/")
# ── Downloads ────────────────────────────────────────────────────────────────
@app.route("/api/downloads")
def list_downloads():
    """Return a JSON array describing every audio file in ``DOWNLOADS_DIR``.

    Each entry carries the display title (falling back to the file stem),
    the filename, its "/mp3/downloads/..." path, the stored skip value
    (default 0) and the stored source URL (default "").
    """
    titles = _load_titles()
    skip = _load_skip()
    sources = _load_sources()
    audio_files = (
        entry
        for entry in sorted(DOWNLOADS_DIR.iterdir())
        if entry.is_file() and entry.suffix.lower() in AUDIO_EXT
    )
    return jsonify([
        {
            "title": titles.get(entry.name, entry.stem),
            "filename": entry.name,
            "src": f"/mp3/downloads/{entry.name}",
            "skip": skip.get(entry.name, 0),
            "source": sources.get(entry.name, ""),
        }
        for entry in audio_files
    ])
@app.route("/api/download", methods=["POST"])
def start_download():
    """Validate a download request and run it in a background thread.

    Request body (JSON object): ``url`` (required, http/https) and ``title``
    (optional). For YouTube URLs without a title, the title is looked up via
    ``fetch_youtube_title``; when no title can be determined, "track" is
    used. The target is always ``<safe_name(title)>.mp3`` inside
    ``DOWNLOADS_DIR`` and that filename doubles as the ``track_id`` used by
    the status endpoint.

    Returns 400 with an ``error`` message for a missing or non-http(s) URL,
    otherwise ``{"status": "downloading", "track_id": ...}``. Progress and
    failures are reported through the module-level ``_downloads`` dict.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # Valid JSON that is not an object (list, number, ...) is handled
        # like an empty body instead of crashing on ``.get``.
        data = {}
    raw_url = data.get("url")
    raw_title = data.get("title")
    # Non-string values are treated as absent rather than raising on .strip().
    url = raw_url.strip() if isinstance(raw_url, str) else ""
    title = raw_title.strip() if isinstance(raw_title, str) else ""
    if not url:
        return jsonify({"error": "URL is required"}), 400
    if not url.startswith(("http://", "https://")):
        return jsonify({"error": "Only http/https URLs are supported"}), 400

    if not title and is_youtube_url(url):
        fetched = fetch_youtube_title(url)
        if fetched:
            title = fetched
    if not title:
        title = "track"

    filename = safe_name(title) + ".mp3"
    dest = DOWNLOADS_DIR / filename
    # Direct downloads are written here first and moved onto ``dest`` only
    # once complete, so an aborted transfer never replaces a finished file.
    part = DOWNLOADS_DIR / (filename + ".part")
    track_id = filename

    if _downloads.get(track_id, {}).get("status") == "downloading":
        return jsonify({"status": "downloading", "track_id": track_id})
    _downloads[track_id] = {"status": "downloading", "progress": 0, "phase": "starting"}

    # Remember what was there before, so a failed re-download of an existing
    # title does not wipe the earlier file and its metadata.
    had_previous = dest.is_file()
    titles = _load_titles()
    previous_title = titles.get(filename)
    titles[filename] = title
    _save_titles(titles)
    sources = _load_sources()
    previous_source = sources.get(filename)
    sources[filename] = url
    _save_sources(sources)

    def fail(msg: str):
        """Record *msg* as the error for this download and undo its traces."""
        _downloads[track_id] = {"status": "error", "error": msg}
        part.unlink(missing_ok=True)
        if not had_previous:
            dest.unlink(missing_ok=True)
        t = _load_titles()
        if had_previous and previous_title is not None:
            t[filename] = previous_title
        else:
            t.pop(filename, None)
        _save_titles(t)
        s = _load_sources()
        if had_previous and previous_source is not None:
            s[filename] = previous_source
        else:
            s.pop(filename, None)
        _save_sources(s)

    def download_youtube():
        """Fetch the best audio stream with yt-dlp and convert it to mp3."""
        try:
            from yt_dlp import YoutubeDL
        except Exception:
            # Broad on purpose: any import problem means "not available",
            # and this runs in a worker thread with nobody to re-raise to.
            fail("YouTube download not available: yt-dlp is not installed")
            return
        out_template = str(DOWNLOADS_DIR / (Path(filename).stem + ".%(ext)s"))

        def hook(d):
            """Translate yt-dlp progress callbacks into ``_downloads`` state."""
            st = d.get("status")
            if st == "downloading":
                total = d.get("total_bytes") or d.get("total_bytes_estimate")
                done = d.get("downloaded_bytes") or 0
                if total:
                    # Capped at 99: 100 is reserved for "file is ready".
                    pct = max(0, min(99, int(done * 100 / total)))
                    _downloads[track_id]["progress"] = pct
                _downloads[track_id]["phase"] = "downloading"
            elif st == "finished":
                # The download is complete; the post-processor still has to run.
                _downloads[track_id]["progress"] = 99
                _downloads[track_id]["phase"] = "converting"

        opts = {
            "format": "bestaudio/best",
            "outtmpl": out_template,
            "noplaylist": True,
            "quiet": True,
            "no_warnings": True,
            "progress_hooks": [hook],
            "postprocessors": [{
                "key": "FFmpegExtractAudio",
                "preferredcodec": "mp3",
                "preferredquality": "0",
            }],
        }
        try:
            with YoutubeDL(opts) as ydl:
                ydl.download([url])
        except Exception as exc:
            # Only the last line of the error text is reported to the client.
            tail = str(exc).strip().splitlines()
            fail(f"YouTube download failed: {tail[-1] if tail else exc}")
            return
        if not dest.is_file():
            fail("YouTube download failed: output file missing")
            return
        _downloads[track_id]["progress"] = 100
        _downloads[track_id]["status"] = "done"

    def download_direct():
        """Stream *url* to disk after checking that it serves audio."""
        # NOTE(review): this fetches an arbitrary client-supplied URL from
        # the server side (SSRF exposure). Confirm the service is only
        # reachable by trusted users or add a host allow/deny list.
        try:
            resp = http_requests.get(
                url, stream=True, timeout=600,
                headers={"User-Agent": "Mozilla/5.0"},
            )
        except Exception as exc:
            fail(f"Download failed: {exc}")
            return
        try:
            try:
                resp.raise_for_status()
            except Exception as exc:
                fail(f"Download failed: {exc}")
                return
            ctype = (resp.headers.get("Content-Type") or "").split(";")[0].strip().lower()
            # A missing Content-Type is accepted; a present one must be audio.
            if ctype and not any(ctype.startswith(p) for p in AUDIO_CONTENT_TYPES):
                fail(
                    f"URL is not a supported audio file (Content-Type: {ctype}). "
                    "For YouTube/streaming sites, paste the page URL directly — "
                    "other sites are only supported if they link straight to an audio file."
                )
                return
            total = 0
            try:
                total = int(resp.headers.get("Content-Length") or 0)
            except ValueError:
                total = 0
            # Set the phase up front so it is also correct when the server
            # sends no Content-Length and no percentage can be computed.
            _downloads[track_id]["phase"] = "downloading"
            try:
                done = 0
                with open(part, "wb") as f:
                    for chunk in resp.iter_content(chunk_size=65536):
                        f.write(chunk)
                        done += len(chunk)
                        if total:
                            pct = max(0, min(99, int(done * 100 / total)))
                            _downloads[track_id]["progress"] = pct
                part.replace(dest)
            except Exception as exc:
                fail(f"Download failed: {exc}")
                return
            _downloads[track_id]["progress"] = 100
            _downloads[track_id]["status"] = "done"
        finally:
            # Streamed responses hold their connection until closed.
            resp.close()

    def do_download():
        """Thread entry point: pick the downloader that matches the URL."""
        if is_youtube_url(url):
            download_youtube()
        else:
            download_direct()

    threading.Thread(target=do_download, daemon=True).start()
    return jsonify({"status": "downloading", "track_id": track_id})
@app.route("/api/download/status/<path:track_id>")
def download_status(track_id):
    """Return the tracked state for *track_id* as JSON.

    Unknown ids yield ``{"status": "unknown"}``.
    """
    try:
        state = _downloads[track_id]
    except KeyError:
        state = {"status": "unknown"}
    return jsonify(state)
@app.route("/api/downloads/<filename>", methods=["DELETE"])
def delete_download(filename):
    """Delete an audio file from ``DOWNLOADS_DIR`` together with its metadata.

    Responds 400 for a filename with characters outside word characters,
    whitespace, "-" and "." (or containing ".."), 404 when no audio file of
    that name exists, and ``{"status": "deleted"}`` on success.
    """
    name_ok = re.match(r"^[\w\s\-\.]+$", filename) and ".." not in filename
    if not name_ok:
        return jsonify({"error": "Invalid filename"}), 400

    target = DOWNLOADS_DIR / filename
    if not (target.is_file() and target.suffix.lower() in AUDIO_EXT):
        return jsonify({"error": "Not found"}), 404

    target.unlink()
    # Drop the file's entry from each metadata store in turn.
    stores = (
        (_load_titles, _save_titles),
        (_load_skip, _save_skip),
        (_load_sources, _save_sources),
    )
    for load, save in stores:
        mapping = load()
        mapping.pop(filename, None)
        save(mapping)
    return jsonify({"status": "deleted"})
@app.route("/api/downloads/<filename>/skip", methods=["POST"])
def set_skip(filename):
    """Store how many seconds to skip for an audio file in ``DOWNLOADS_DIR``.

    Request body (JSON object): ``seconds`` (default 0). Negative values are
    clamped to 0, and 0 removes the stored entry. Responds 400 for an
    invalid filename or a non-numeric ``seconds``, 404 when no audio file of
    that name exists, and ``{"status": "saved", "skip": <seconds>}`` on
    success.
    """
    if not re.match(r"^[\w\s\-\.]+$", filename) or ".." in filename:
        return jsonify({"error": "Invalid filename"}), 400
    target = DOWNLOADS_DIR / filename
    # Same rule as delete_download: only audio files are addressable, so the
    # JSON metadata files in this directory can never get skip entries.
    if not (target.is_file() and target.suffix.lower() in AUDIO_EXT):
        return jsonify({"error": "Not found"}), 404

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # Valid JSON that is not an object is handled like an empty body.
        data = {}
    try:
        seconds = max(0, int(data.get("seconds", 0)))
    except (TypeError, ValueError, OverflowError):
        # OverflowError: the JSON parser turns literals such as 1e999 into
        # float infinity, which int() refuses.
        return jsonify({"error": "seconds must be a number"}), 400

    skip = _load_skip()
    if seconds == 0:
        skip.pop(filename, None)
    else:
        skip[filename] = seconds
    _save_skip(skip)
    return jsonify({"status": "saved", "skip": seconds})
@app.route("/api/downloads/<filename>/rename", methods=["POST"])
def rename_download(filename):
    """Change the display title stored for an audio file in ``DOWNLOADS_DIR``.

    Only the title mapping is updated; the file on disk keeps its name.
    Request body (JSON object): ``title`` (required, non-empty string).
    Responds 400 for an invalid filename or a missing title, 404 when no
    audio file of that name exists, and ``{"status": "renamed"}`` on success.
    """
    if not re.match(r"^[\w\s\-\.]+$", filename) or ".." in filename:
        return jsonify({"error": "Invalid filename"}), 400

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # Valid JSON that is not an object is handled like an empty body.
        data = {}
    raw_title = data.get("title")
    # A non-string title is treated as missing instead of raising on .strip().
    new_title = raw_title.strip() if isinstance(raw_title, str) else ""
    if not new_title:
        return jsonify({"error": "Title is required"}), 400

    target = DOWNLOADS_DIR / filename
    # Same rule as delete_download: only audio files are addressable, so the
    # JSON metadata files in this directory can never get title entries.
    if not (target.is_file() and target.suffix.lower() in AUDIO_EXT):
        return jsonify({"error": "Not found"}), 404

    titles = _load_titles()
    titles[filename] = new_title
    _save_titles(titles)
    return jsonify({"status": "renamed"})
# ── Playlist track rename ─────────────────────────────────────────────────────
def _load_playlist():
    """Return the playlist stored in ``MP3_DIR / "playlist.json"``.

    When that file is missing, unreadable, or not valid UTF-8 JSON, a
    playlist is built from the audio files directly inside ``MP3_DIR``
    (sorted by path), using the file stem with "_" and "-" replaced by
    spaces as the title.
    """
    path = MP3_DIR / "playlist.json"
    try:
        # Explicit UTF-8 so a playlist with non-ASCII titles is not silently
        # discarded when the process locale uses another encoding.
        with open(path, encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError):
        # Missing/unreadable file or corrupt content (JSONDecodeError and
        # UnicodeDecodeError are ValueError subclasses): fall back to
        # scanning the directory.
        pass
    return [
        {
            "src": entry.name,
            "title": entry.stem.replace("_", " ").replace("-", " ").strip(),
        }
        for entry in sorted(MP3_DIR.iterdir())
        if entry.is_file() and entry.suffix.lower() in AUDIO_EXT
    ]
def _save_playlist(tracks):
    """Write *tracks* to ``MP3_DIR / "playlist.json"`` as indented JSON."""
    target = MP3_DIR / "playlist.json"
    with target.open("w") as handle:
        json.dump(tracks, handle, indent=2)
@app.route("/api/tracks/rename", methods=["POST"])
def rename_track():
    """Set the title of a playlist track, adding the track if it is unknown.

    Request body (JSON object): ``src`` and ``title`` (both required,
    non-empty strings). Tracks are matched on ``src`` after
    ``strip_mp3_prefix`` normalisation of both sides. Responds 400 when
    either field is missing and ``{"status": "renamed"}`` otherwise.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # Valid JSON that is not an object is handled like an empty body.
        data = {}
    raw_src = data.get("src")
    raw_title = data.get("title")
    # Non-string values are treated as missing instead of raising on .strip().
    src = raw_src.strip() if isinstance(raw_src, str) else ""
    new_title = raw_title.strip() if isinstance(raw_title, str) else ""
    if not src or not new_title:
        return jsonify({"error": "src and title are required"}), 400

    bare = strip_mp3_prefix(src)
    tracks = _load_playlist()
    for track in tracks:
        # Entries that are not objects, or whose "src" is not a string,
        # cannot match and are left untouched.
        if not isinstance(track, dict):
            continue
        track_src = track.get("src", "")
        if isinstance(track_src, str) and strip_mp3_prefix(track_src) == bare:
            track["title"] = new_title
            _save_playlist(tracks)
            return jsonify({"status": "renamed"})

    # No existing entry matched: register the track under its bare path.
    tracks.append({"src": bare, "title": new_title})
    _save_playlist(tracks)
    return jsonify({"status": "renamed"})
# ── Playlist tracks (read) ────────────────────────────────────────────────────
@app.route("/api/tracks")
def list_tracks():
    """Return the playlist as a JSON array of ``{"src", "title"}`` objects.

    Missing keys are reported as empty strings; other keys are dropped.
    """
    payload = []
    for entry in _load_playlist():
        payload.append({
            "src": entry.get("src", ""),
            "title": entry.get("title", ""),
        })
    return jsonify(payload)
# ── UI ────────────────────────────────────────────────────────────────────────
@app.route("/")
def ui_index():
    """Serve the UI entry page, ``index.html`` from ``SITE_DIR``."""
    landing_page = "index.html"
    return send_from_directory(SITE_DIR, landing_page)
@app.route("/<path:filename>")
def ui_static(filename):
    """Serve the file at *filename* from ``SITE_DIR``."""
    response = send_from_directory(SITE_DIR, filename)
    return response
if __name__ == "__main__":
    # Bind to all interfaces on port 8001 with the Flask debugger disabled.
    app.run(debug=False, port=8001, host="0.0.0.0")