- Switch to shared build-and-push.sh; version read from docs/changelog.md
- Add docs/changelog.md; remove version.txt, .last-branch, .gitignore
- Stack: image tag via SLEEP_MEDITATION_IMAGE_TAG
- Downloader: YouTube support (yt-dlp + ffmpeg), best audio to mp3
- Downloader: Content-Type validation for direct URLs
- Downloader: auto-fetch YouTube title; title field optional
- Downloader: progress bar with phase (downloading/converting)
- Downloader: store source URL per file; show Source link in manage list

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
414 lines
13 KiB
Python
414 lines
13 KiB
Python
import os
|
|
import re
|
|
import json
|
|
import shutil
|
|
import subprocess
|
|
import threading
|
|
from pathlib import Path
|
|
from urllib.parse import urlparse
|
|
from flask import Flask, request, jsonify, send_from_directory
|
|
import requests as http_requests
|
|
|
|
# Hostnames treated as YouTube; compared against the lowercased URL host.
YOUTUBE_HOSTS = (
    "youtube.com",
    "www.youtube.com",
    "m.youtube.com",
    "music.youtube.com",
    "youtu.be",
)
# Content-Type prefixes accepted when downloading a direct (non-YouTube) URL.
AUDIO_CONTENT_TYPES = (
    "audio/",
    "application/ogg",
)
|
|
|
|
|
|
def is_youtube_url(url: str) -> bool:
    """Return True when the hostname of *url* is listed in ``YOUTUBE_HOSTS``.

    The hostname is lowercased before the comparison.  Any exception raised
    while parsing the URL is treated as "not a YouTube URL".
    """
    try:
        hostname = urlparse(url).hostname
    except Exception:
        return False
    normalized = hostname.lower() if hostname else ""
    return normalized in YOUTUBE_HOSTS
|
|
|
|
|
|
def fetch_youtube_title(url: str) -> str | None:
|
|
if not shutil.which("yt-dlp"):
|
|
return None
|
|
try:
|
|
proc = subprocess.run(
|
|
["yt-dlp", "--get-title", "--no-playlist", "--skip-download",
|
|
"--no-warnings", url],
|
|
capture_output=True, text=True, timeout=30,
|
|
)
|
|
except Exception:
|
|
return None
|
|
if proc.returncode != 0:
|
|
return None
|
|
title = (proc.stdout or "").strip().splitlines()
|
|
return title[0] if title else None
|
|
|
|
# Static front-end files are served from the "site" folder next to this module.
SITE_DIR = Path(__file__).parent.joinpath("site")

app = Flask(__name__)

# Root of the audio library; overridable through the MP3_DIR environment variable.
MP3_DIR = Path(os.getenv("MP3_DIR", "/mp3"))
DOWNLOADS_DIR = MP3_DIR.joinpath("downloads")
# File suffixes (lowercase) treated as audio by the listing and delete code.
AUDIO_EXT = {".mp3", ".m4a", ".aac", ".wav", ".ogg"}

# Create the downloads folder at import time so later code can rely on it.
DOWNLOADS_DIR.mkdir(parents=True, exist_ok=True)

# In-memory download state, keyed by track_id:
# track_id -> {"status": ..., "progress": ..., "phase": ..., "error": ...}
_downloads = {}
# JSON side-car files kept inside the downloads folder.
_TITLES_FILE = DOWNLOADS_DIR.joinpath("titles.json")
_SKIP_FILE = DOWNLOADS_DIR.joinpath("skip.json")
_SOURCES_FILE = DOWNLOADS_DIR.joinpath("sources.json")
|
|
|
|
|
|
def safe_name(title: str) -> str:
    """Reduce *title* to a conservative file base name.

    Every character that is not a word character, whitespace or a hyphen is
    removed, surrounding whitespace is trimmed and each whitespace run is
    collapsed to one space.  The result is cut to at most 100 characters;
    if nothing is left, ``"track"`` is returned.
    """
    stripped = re.sub(r"[^\w\s\-]", "", title).strip()
    collapsed = re.sub(r"\s+", " ", stripped)
    truncated = collapsed[:100]
    if truncated:
        return truncated
    return "track"
|
|
|
|
|
|
def _load_titles() -> dict:
    """Load the filename -> display title mapping from ``_TITLES_FILE``.

    Returns:
        The stored mapping, or an empty dict when the file is missing,
        cannot be read, is not valid UTF-8 JSON, or does not hold a JSON
        object (callers rely on dict methods such as ``get`` and ``pop``).
    """
    try:
        # Titles are saved with ensure_ascii=False, so the encoding is pinned
        # instead of being left to the platform default.
        with open(_TITLES_FILE, encoding="utf-8") as f:
            loaded = json.load(f)
    except (OSError, ValueError):
        # Missing/unreadable file, malformed JSON or undecodable bytes:
        # loading stays best-effort and falls back to an empty mapping.
        return {}
    return loaded if isinstance(loaded, dict) else {}


def _save_titles(titles: dict) -> None:
    """Write *titles* to ``_TITLES_FILE`` as indented UTF-8 JSON.

    Non-ASCII characters are stored verbatim (``ensure_ascii=False``), which
    is why the file is opened with an explicit UTF-8 encoding.
    """
    with open(_TITLES_FILE, "w", encoding="utf-8") as f:
        json.dump(titles, f, indent=2, ensure_ascii=False)
|
|
|
|
|
|
def _load_skip() -> dict:
    """Load the filename -> skip seconds mapping from ``_SKIP_FILE``.

    Returns:
        The stored mapping, or an empty dict when the file is missing,
        cannot be read, is not valid JSON, or does not hold a JSON object
        (callers rely on dict methods such as ``get`` and ``pop``).
    """
    try:
        with open(_SKIP_FILE) as f:
            loaded = json.load(f)
    except (OSError, ValueError):
        # Missing/unreadable file or malformed JSON: loading stays
        # best-effort and falls back to an empty mapping.
        return {}
    return loaded if isinstance(loaded, dict) else {}
|
|
|
|
|
|
def _save_skip(skip: dict) -> None:
    """Overwrite ``_SKIP_FILE`` with *skip* serialised as indented JSON."""
    with _SKIP_FILE.open("w") as out:
        json.dump(skip, out, indent=2)
|
|
|
|
|
|
def _load_sources() -> dict:
    """Load the filename -> source URL mapping from ``_SOURCES_FILE``.

    Returns:
        The stored mapping, or an empty dict when the file is missing,
        cannot be read, is not valid UTF-8 JSON, or does not hold a JSON
        object (callers rely on dict methods such as ``get`` and ``pop``).
    """
    try:
        # Sources are saved with ensure_ascii=False, so the encoding is
        # pinned instead of being left to the platform default.
        with open(_SOURCES_FILE, encoding="utf-8") as f:
            loaded = json.load(f)
    except (OSError, ValueError):
        # Missing/unreadable file, malformed JSON or undecodable bytes:
        # loading stays best-effort and falls back to an empty mapping.
        return {}
    return loaded if isinstance(loaded, dict) else {}


def _save_sources(sources: dict) -> None:
    """Write *sources* to ``_SOURCES_FILE`` as indented UTF-8 JSON.

    Non-ASCII characters are stored verbatim (``ensure_ascii=False``), which
    is why the file is opened with an explicit UTF-8 encoding.
    """
    with open(_SOURCES_FILE, "w", encoding="utf-8") as f:
        json.dump(sources, f, indent=2, ensure_ascii=False)
|
|
|
|
|
|
def strip_mp3_prefix(src: str) -> str:
    """Return *src* without leading slashes and without one leading ``mp3/``.

    Slashes that follow the removed ``mp3/`` prefix are stripped as well, so
    ``"/mp3/downloads/a.mp3"`` becomes ``"downloads/a.mp3"``.
    """
    trimmed = src.lstrip("/")
    if trimmed.startswith("mp3/"):
        trimmed = trimmed[len("mp3/"):]
    return trimmed.lstrip("/")
|
|
|
|
|
|
# ── Downloads ────────────────────────────────────────────────────────────────
|
|
|
|
@app.route("/api/downloads")
def list_downloads():
    """Return a JSON array describing each audio file in ``DOWNLOADS_DIR``.

    Entries are ordered by sorted path and carry the stored title (falling
    back to the file stem), the file name, its ``/mp3/downloads/`` URL, the
    stored skip value (default 0) and the stored source URL (default "").
    """
    title_map = _load_titles()
    skip_map = _load_skip()
    source_map = _load_sources()

    audio_files = [
        entry
        for entry in sorted(DOWNLOADS_DIR.iterdir())
        if entry.is_file() and entry.suffix.lower() in AUDIO_EXT
    ]
    payload = [
        {
            "title": title_map.get(entry.name, entry.stem),
            "filename": entry.name,
            "src": f"/mp3/downloads/{entry.name}",
            "skip": skip_map.get(entry.name, 0),
            "source": source_map.get(entry.name, ""),
        }
        for entry in audio_files
    ]
    return jsonify(payload)
|
|
|
|
|
|
@app.route("/api/download", methods=["POST"])
def start_download():
    """Start a background download of an audio track.

    Reads a JSON body with ``url`` (required, must start with http:// or
    https://) and an optional ``title``.  When no title is given and the URL
    is a YouTube URL, the title is looked up with ``fetch_youtube_title``;
    if there is still no title, ``"track"`` is used.  The destination file
    name is ``safe_name(title) + ".mp3"`` and is also the ``track_id`` under
    which progress is published in ``_downloads``.

    Returns:
        ``{"status": "downloading", "track_id": ...}`` once a worker thread
        was started, or when a download for the same track is already
        running; ``{"error": ...}`` with HTTP 400 when the URL is missing or
        is not http/https.
    """
    data = request.get_json(silent=True) or {}
    url = (data.get("url") or "").strip()
    title = (data.get("title") or "").strip()

    if not url:
        return jsonify({"error": "URL is required"}), 400
    if not url.startswith(("http://", "https://")):
        return jsonify({"error": "Only http/https URLs are supported"}), 400

    # The title is optional: try the YouTube metadata, then a generic name.
    if not title and is_youtube_url(url):
        fetched = fetch_youtube_title(url)
        if fetched:
            title = fetched
    if not title:
        title = "track"

    filename = safe_name(title) + ".mp3"
    dest = DOWNLOADS_DIR / filename
    track_id = filename

    # Do not start a second worker while the same track is still in flight.
    if _downloads.get(track_id, {}).get("status") == "downloading":
        return jsonify({"status": "downloading", "track_id": track_id})

    _downloads[track_id] = {"status": "downloading", "progress": 0, "phase": "starting"}

    # Title and source are recorded up front; fail() removes them again.
    titles = _load_titles()
    titles[filename] = title
    _save_titles(titles)

    sources = _load_sources()
    sources[filename] = url
    _save_sources(sources)

    def fail(msg: str):
        """Publish *msg* as the error state and undo file and metadata changes."""
        _downloads[track_id] = {"status": "error", "error": msg}
        dest.unlink(missing_ok=True)
        t = _load_titles()
        t.pop(filename, None)
        _save_titles(t)
        s = _load_sources()
        s.pop(filename, None)
        _save_sources(s)

    def download_youtube():
        """Download via the yt_dlp library and extract the audio as mp3."""
        try:
            from yt_dlp import YoutubeDL
        except Exception:
            fail("YouTube download not available: yt-dlp is not installed")
            return

        # yt-dlp fills in %(ext)s; the mp3 post-processor output is expected
        # to end up at `dest`, which is checked after the download.
        out_template = str(DOWNLOADS_DIR / (Path(filename).stem + ".%(ext)s"))

        def hook(d):
            """Translate yt-dlp progress callbacks into ``_downloads`` state."""
            st = d.get("status")
            if st == "downloading":
                total = d.get("total_bytes") or d.get("total_bytes_estimate")
                done = d.get("downloaded_bytes") or 0
                if total:
                    # Capped at 99 so 100 is only reported once the file exists.
                    pct = max(0, min(99, int(done * 100 / total)))
                    _downloads[track_id]["progress"] = pct
                # The phase is reported even when the total size is unknown.
                _downloads[track_id]["phase"] = "downloading"
            elif st == "finished":
                _downloads[track_id]["progress"] = 99
                _downloads[track_id]["phase"] = "converting"

        opts = {
            "format": "bestaudio/best",
            "outtmpl": out_template,
            "noplaylist": True,
            "quiet": True,
            "no_warnings": True,
            "progress_hooks": [hook],
            "postprocessors": [{
                "key": "FFmpegExtractAudio",
                "preferredcodec": "mp3",
                "preferredquality": "0",
            }],
        }
        try:
            with YoutubeDL(opts) as ydl:
                ydl.download([url])
        except Exception as exc:
            # Only the last line of the error text is shown to the user.
            tail = str(exc).strip().splitlines()
            fail(f"YouTube download failed: {tail[-1] if tail else exc}")
            return
        if not dest.is_file():
            fail("YouTube download failed: output file missing")
            return
        _downloads[track_id]["progress"] = 100
        _downloads[track_id]["status"] = "done"

    def download_direct():
        """Stream a direct URL to ``dest`` after checking its Content-Type."""
        try:
            resp = http_requests.get(
                url, stream=True, timeout=600,
                headers={"User-Agent": "Mozilla/5.0"},
            )
        except Exception as exc:
            fail(f"Download failed: {exc}")
            return

        # With stream=True the connection stays checked out until the body
        # is fully read or the response is closed, so close it on every
        # exit path (HTTP error, rejected type, write error, success).
        try:
            try:
                resp.raise_for_status()
            except Exception as exc:
                fail(f"Download failed: {exc}")
                return

            ctype = (resp.headers.get("Content-Type") or "").split(";")[0].strip().lower()
            # A missing Content-Type is accepted; a present one must be audio.
            if ctype and not any(ctype.startswith(p) for p in AUDIO_CONTENT_TYPES):
                fail(
                    f"URL is not a supported audio file (Content-Type: {ctype}). "
                    "For YouTube/streaming sites, paste the page URL directly — "
                    "other sites are only supported if they link straight to an audio file."
                )
                return

            try:
                total = int(resp.headers.get("Content-Length") or 0)
            except ValueError:
                total = 0

            try:
                done = 0
                with open(dest, "wb") as f:
                    for chunk in resp.iter_content(chunk_size=65536):
                        f.write(chunk)
                        done += len(chunk)
                        if total:
                            # Capped at 99; 100 is set after the loop.
                            pct = max(0, min(99, int(done * 100 / total)))
                            _downloads[track_id]["progress"] = pct
                        # Reported even without a Content-Length header.
                        _downloads[track_id]["phase"] = "downloading"
            except Exception as exc:
                fail(f"Download failed: {exc}")
                return
            _downloads[track_id]["progress"] = 100
            _downloads[track_id]["status"] = "done"
        finally:
            resp.close()

    def do_download():
        """Thread entry point: pick the downloader and never die silently."""
        try:
            if is_youtube_url(url):
                download_youtube()
            else:
                download_direct()
        except Exception as exc:
            # An exception escaping the worker (for example from fail()
            # itself) would otherwise leave the track in "downloading"
            # forever, and the in-flight check above would block retries.
            _downloads[track_id] = {"status": "error", "error": f"Download failed: {exc}"}

    threading.Thread(target=do_download, daemon=True).start()
    return jsonify({"status": "downloading", "track_id": track_id})
|
|
|
|
|
|
@app.route("/api/download/status/<path:track_id>")
def download_status(track_id):
    """Return the in-memory state for *track_id* as JSON.

    Tracks with no entry in ``_downloads`` are reported as
    ``{"status": "unknown"}``.
    """
    try:
        state = _downloads[track_id]
    except KeyError:
        state = {"status": "unknown"}
    return jsonify(state)
|
|
|
|
|
|
@app.route("/api/downloads/<filename>", methods=["DELETE"])
def delete_download(filename):
    """Delete a downloaded audio file and its title, skip and source entries.

    Responds with 400 for a file name containing ``..`` or characters other
    than word characters, whitespace, hyphens and dots, and with 404 when
    the name is not an existing file with an audio extension.
    """
    name_ok = re.match(r"^[\w\s\-\.]+$", filename) and ".." not in filename
    if not name_ok:
        return jsonify({"error": "Invalid filename"}), 400

    target = DOWNLOADS_DIR / filename
    if not (target.is_file() and target.suffix.lower() in AUDIO_EXT):
        return jsonify({"error": "Not found"}), 404

    target.unlink()
    # Drop the file's entry from each side-car mapping, in the same order
    # as the mappings are declared: titles, skip, sources.
    stores = (
        (_load_titles, _save_titles),
        (_load_skip, _save_skip),
        (_load_sources, _save_sources),
    )
    for load, save in stores:
        mapping = load()
        mapping.pop(filename, None)
        save(mapping)
    return jsonify({"status": "deleted"})
|
|
|
|
|
|
@app.route("/api/downloads/<filename>/skip", methods=["POST"])
def set_skip(filename):
    """Store how many seconds to skip for a downloaded file.

    Reads ``seconds`` from the JSON body (default 0), converts it with
    ``int`` and clamps negative values to 0.  A value of 0 removes the
    stored entry; any other value is saved under the file name.

    Returns:
        ``{"status": "saved", "skip": seconds}`` on success; HTTP 400 for an
        invalid file name or a ``seconds`` value that cannot be converted to
        an integer; HTTP 404 when the file does not exist.
    """
    if not re.match(r"^[\w\s\-\.]+$", filename) or ".." in filename:
        return jsonify({"error": "Invalid filename"}), 400
    if not (DOWNLOADS_DIR / filename).is_file():
        return jsonify({"error": "Not found"}), 404

    data = request.get_json(silent=True) or {}
    seconds = data.get("seconds", 0)
    try:
        seconds = max(0, int(seconds))
    except (TypeError, ValueError, OverflowError):
        # OverflowError covers infinite floats: Python's JSON decoder
        # accepts the literal ``Infinity`` and int(inf) raises it, which
        # previously escaped as an unhandled server error.
        return jsonify({"error": "seconds must be a number"}), 400

    skip = _load_skip()
    if seconds == 0:
        skip.pop(filename, None)
    else:
        skip[filename] = seconds
    _save_skip(skip)
    return jsonify({"status": "saved", "skip": seconds})
|
|
|
|
|
|
@app.route("/api/downloads/<filename>/rename", methods=["POST"])
def rename_download(filename):
    """Change the stored display title of a downloaded file.

    Only the entry in the titles mapping is updated; the file on disk keeps
    its name.  Responds with 400 for an invalid file name or an empty
    title, and with 404 when the file does not exist.
    """
    name_ok = re.match(r"^[\w\s\-\.]+$", filename) and ".." not in filename
    if not name_ok:
        return jsonify({"error": "Invalid filename"}), 400

    payload = request.get_json(silent=True) or {}
    new_title = (payload.get("title") or "").strip()
    if not new_title:
        return jsonify({"error": "Title is required"}), 400

    target = DOWNLOADS_DIR / filename
    if not target.is_file():
        return jsonify({"error": "Not found"}), 404

    title_map = _load_titles()
    title_map[filename] = new_title
    _save_titles(title_map)
    return jsonify({"status": "renamed"})
|
|
|
|
|
|
# ── Playlist track rename ─────────────────────────────────────────────────────
|
|
|
|
def _load_playlist():
    """Return the playlist as a list of ``{"src": ..., "title": ...}`` dicts.

    The list stored in ``MP3_DIR / "playlist.json"`` is returned when that
    file exists and holds a JSON array.  Otherwise (file missing,
    unreadable, malformed, or not an array) a playlist is derived from the
    audio files directly inside ``MP3_DIR``: the file name is the ``src``
    and the stem, with underscores and hyphens replaced by spaces, is the
    title.
    """
    path = MP3_DIR / "playlist.json"
    try:
        with open(path) as f:
            loaded = json.load(f)
    except (OSError, ValueError):
        # Missing/unreadable file or malformed JSON: use the directory scan.
        loaded = None
    # Callers iterate and append, so anything but a list cannot be used.
    if isinstance(loaded, list):
        return loaded

    return [
        {
            "src": f.name,
            "title": f.stem.replace("_", " ").replace("-", " ").strip(),
        }
        for f in sorted(MP3_DIR.iterdir())
        if f.is_file() and f.suffix.lower() in AUDIO_EXT
    ]
|
|
|
|
|
|
def _save_playlist(tracks):
    """Overwrite ``MP3_DIR / "playlist.json"`` with *tracks* as indented JSON."""
    playlist_path = MP3_DIR / "playlist.json"
    with playlist_path.open("w") as out:
        json.dump(tracks, out, indent=2)
|
|
|
|
|
|
@app.route("/api/tracks/rename", methods=["POST"])
def rename_track():
    """Set the title of a playlist track, adding the track if it is absent.

    Reads ``src`` and ``title`` from the JSON body (both required, else
    HTTP 400).  Tracks are matched on their ``src`` after
    ``strip_mp3_prefix`` normalisation; the first match is retitled, and if
    none matches a new entry is appended.  The playlist is then saved.
    """
    payload = request.get_json(silent=True) or {}
    src = (payload.get("src") or "").strip()
    new_title = (payload.get("title") or "").strip()

    if not (src and new_title):
        return jsonify({"error": "src and title are required"}), 400

    wanted = strip_mp3_prefix(src)
    playlist = _load_playlist()

    existing = next(
        (
            entry
            for entry in playlist
            if strip_mp3_prefix(entry.get("src", "")) == wanted
        ),
        None,
    )
    if existing is None:
        playlist.append({"src": wanted, "title": new_title})
    else:
        existing["title"] = new_title

    _save_playlist(playlist)
    return jsonify({"status": "renamed"})
|
|
|
|
|
|
# ── Playlist tracks (read) ────────────────────────────────────────────────────
|
|
|
|
@app.route("/api/tracks")
def list_tracks():
    """Return the playlist as a JSON array of ``src``/``title`` objects.

    Keys missing from a playlist entry are reported as empty strings; any
    other keys are left out of the response.
    """
    payload = []
    for entry in _load_playlist():
        payload.append({
            "src": entry.get("src", ""),
            "title": entry.get("title", ""),
        })
    return jsonify(payload)
|
|
|
|
|
|
# ── UI ────────────────────────────────────────────────────────────────────────
|
|
|
|
@app.route("/")
def ui_index():
    """Serve ``index.html`` from ``SITE_DIR`` as the landing page."""
    landing_page = "index.html"
    return send_from_directory(SITE_DIR, landing_page)
|
|
|
|
|
|
@app.route("/<path:filename>")
def ui_static(filename):
    """Serve the file at *filename* from ``SITE_DIR``."""
    requested = filename
    return send_from_directory(SITE_DIR, requested)
|
|
|
|
|
|
if __name__ == "__main__":
    # Run Flask's built-in server on all interfaces, port 8001, debug off.
    app.run(debug=False, port=8001, host="0.0.0.0")
|