import os import re import json import shutil import subprocess import threading from pathlib import Path from urllib.parse import urlparse from flask import Flask, request, jsonify, send_from_directory import requests as http_requests YOUTUBE_HOSTS = ( "youtube.com", "www.youtube.com", "m.youtube.com", "music.youtube.com", "youtu.be", ) AUDIO_CONTENT_TYPES = ("audio/", "application/ogg") def is_youtube_url(url: str) -> bool: try: host = (urlparse(url).hostname or "").lower() except Exception: return False return host in YOUTUBE_HOSTS def fetch_youtube_title(url: str) -> str | None: if not shutil.which("yt-dlp"): return None try: proc = subprocess.run( ["yt-dlp", "--get-title", "--no-playlist", "--skip-download", "--no-warnings", url], capture_output=True, text=True, timeout=30, ) except Exception: return None if proc.returncode != 0: return None title = (proc.stdout or "").strip().splitlines() return title[0] if title else None SITE_DIR = Path(__file__).parent / "site" app = Flask(__name__) MP3_DIR = Path(os.environ.get("MP3_DIR", "/mp3")) DOWNLOADS_DIR = MP3_DIR / "downloads" AUDIO_EXT = {".mp3", ".m4a", ".aac", ".wav", ".ogg"} DOWNLOADS_DIR.mkdir(parents=True, exist_ok=True) _downloads = {} # track_id -> {"status": ..., "error": ...} _TITLES_FILE = DOWNLOADS_DIR / "titles.json" _SKIP_FILE = DOWNLOADS_DIR / "skip.json" _SOURCES_FILE = DOWNLOADS_DIR / "sources.json" def safe_name(title: str) -> str: name = re.sub(r"[^\w\s\-]", "", title).strip() name = re.sub(r"\s+", " ", name) return name[:100] or "track" def _load_titles() -> dict: if _TITLES_FILE.exists(): try: with open(_TITLES_FILE) as f: return json.load(f) except Exception: pass return {} def _save_titles(titles: dict) -> None: with open(_TITLES_FILE, "w") as f: json.dump(titles, f, indent=2, ensure_ascii=False) def _load_skip() -> dict: if _SKIP_FILE.exists(): try: with open(_SKIP_FILE) as f: return json.load(f) except Exception: pass return {} def _save_skip(skip: dict) -> None: with open(_SKIP_FILE, "w") as f: json.dump(skip, f, indent=2) def _load_sources() -> dict: if _SOURCES_FILE.exists(): try: with open(_SOURCES_FILE) as f: return json.load(f) except Exception: pass return {} def _save_sources(sources: dict) -> None: with open(_SOURCES_FILE, "w") as f: json.dump(sources, f, indent=2, ensure_ascii=False) def strip_mp3_prefix(src: str) -> str: return src.lstrip("/").removeprefix("mp3/").lstrip("/") # ── Downloads ──────────────────────────────────────────────────────────────── @app.route("/api/downloads") def list_downloads(): titles = _load_titles() skip = _load_skip() sources = _load_sources() files = [] for f in sorted(DOWNLOADS_DIR.iterdir()): if f.is_file() and f.suffix.lower() in AUDIO_EXT: files.append({ "title": titles.get(f.name, f.stem), "filename": f.name, "src": f"/mp3/downloads/{f.name}", "skip": skip.get(f.name, 0), "source": sources.get(f.name, ""), }) return jsonify(files) @app.route("/api/download", methods=["POST"]) def start_download(): data = request.get_json(silent=True) or {} url = (data.get("url") or "").strip() title = (data.get("title") or "").strip() if not url: return jsonify({"error": "URL is required"}), 400 if not url.startswith(("http://", "https://")): return jsonify({"error": "Only http/https URLs are supported"}), 400 if not title and is_youtube_url(url): fetched = fetch_youtube_title(url) if fetched: title = fetched if not title: title = "track" filename = safe_name(title) + ".mp3" dest = DOWNLOADS_DIR / filename track_id = filename if _downloads.get(track_id, {}).get("status") == "downloading": return jsonify({"status": "downloading", "track_id": track_id}) _downloads[track_id] = {"status": "downloading", "progress": 0, "phase": "starting"} titles = _load_titles() titles[filename] = title _save_titles(titles) sources = _load_sources() sources[filename] = url _save_sources(sources) def fail(msg: str): _downloads[track_id] = {"status": "error", "error": msg} dest.unlink(missing_ok=True) t = _load_titles() t.pop(filename, None) _save_titles(t) s = _load_sources() s.pop(filename, None) _save_sources(s) def download_youtube(): try: from yt_dlp import YoutubeDL except Exception: fail("YouTube download not available: yt-dlp is not installed") return out_template = str(DOWNLOADS_DIR / (Path(filename).stem + ".%(ext)s")) def hook(d): st = d.get("status") if st == "downloading": total = d.get("total_bytes") or d.get("total_bytes_estimate") done = d.get("downloaded_bytes") or 0 if total: pct = max(0, min(99, int(done * 100 / total))) _downloads[track_id]["progress"] = pct _downloads[track_id]["phase"] = "downloading" elif st == "finished": _downloads[track_id]["progress"] = 99 _downloads[track_id]["phase"] = "converting" opts = { "format": "bestaudio/best", "outtmpl": out_template, "noplaylist": True, "quiet": True, "no_warnings": True, "progress_hooks": [hook], "postprocessors": [{ "key": "FFmpegExtractAudio", "preferredcodec": "mp3", "preferredquality": "0", }], } try: with YoutubeDL(opts) as ydl: ydl.download([url]) except Exception as exc: tail = str(exc).strip().splitlines() fail(f"YouTube download failed: {tail[-1] if tail else exc}") return if not dest.is_file(): fail("YouTube download failed: output file missing") return _downloads[track_id]["progress"] = 100 _downloads[track_id]["status"] = "done" def download_direct(): try: resp = http_requests.get( url, stream=True, timeout=600, headers={"User-Agent": "Mozilla/5.0"}, ) resp.raise_for_status() except Exception as exc: fail(f"Download failed: {exc}") return ctype = (resp.headers.get("Content-Type") or "").split(";")[0].strip().lower() if ctype and not any(ctype.startswith(p) for p in AUDIO_CONTENT_TYPES): resp.close() fail( f"URL is not a supported audio file (Content-Type: {ctype}). " "For YouTube/streaming sites, paste the page URL directly — " "other sites are only supported if they link straight to an audio file." ) return total = 0 try: total = int(resp.headers.get("Content-Length") or 0) except ValueError: total = 0 try: done = 0 with open(dest, "wb") as f: for chunk in resp.iter_content(chunk_size=65536): f.write(chunk) done += len(chunk) if total: pct = max(0, min(99, int(done * 100 / total))) _downloads[track_id]["progress"] = pct _downloads[track_id]["phase"] = "downloading" except Exception as exc: fail(f"Download failed: {exc}") return _downloads[track_id]["progress"] = 100 _downloads[track_id]["status"] = "done" def do_download(): if is_youtube_url(url): download_youtube() else: download_direct() threading.Thread(target=do_download, daemon=True).start() return jsonify({"status": "downloading", "track_id": track_id}) @app.route("/api/download/status/") def download_status(track_id): return jsonify(_downloads.get(track_id, {"status": "unknown"})) @app.route("/api/downloads/", methods=["DELETE"]) def delete_download(filename): if not re.match(r"^[\w\s\-\.]+$", filename) or ".." in filename: return jsonify({"error": "Invalid filename"}), 400 dest = DOWNLOADS_DIR / filename if dest.is_file() and dest.suffix.lower() in AUDIO_EXT: dest.unlink() titles = _load_titles() titles.pop(filename, None) _save_titles(titles) skip = _load_skip() skip.pop(filename, None) _save_skip(skip) sources = _load_sources() sources.pop(filename, None) _save_sources(sources) return jsonify({"status": "deleted"}) return jsonify({"error": "Not found"}), 404 @app.route("/api/downloads//skip", methods=["POST"]) def set_skip(filename): if not re.match(r"^[\w\s\-\.]+$", filename) or ".." in filename: return jsonify({"error": "Invalid filename"}), 400 if not (DOWNLOADS_DIR / filename).is_file(): return jsonify({"error": "Not found"}), 404 data = request.get_json(silent=True) or {} seconds = data.get("seconds", 0) try: seconds = max(0, int(seconds)) except (TypeError, ValueError): return jsonify({"error": "seconds must be a number"}), 400 skip = _load_skip() if seconds == 0: skip.pop(filename, None) else: skip[filename] = seconds _save_skip(skip) return jsonify({"status": "saved", "skip": seconds}) @app.route("/api/downloads//rename", methods=["POST"]) def rename_download(filename): if not re.match(r"^[\w\s\-\.]+$", filename) or ".." in filename: return jsonify({"error": "Invalid filename"}), 400 data = request.get_json(silent=True) or {} new_title = (data.get("title") or "").strip() if not new_title: return jsonify({"error": "Title is required"}), 400 if not (DOWNLOADS_DIR / filename).is_file(): return jsonify({"error": "Not found"}), 404 titles = _load_titles() titles[filename] = new_title _save_titles(titles) return jsonify({"status": "renamed"}) # ── Playlist track rename ───────────────────────────────────────────────────── def _load_playlist(): path = MP3_DIR / "playlist.json" if path.exists(): try: with open(path) as f: return json.load(f) except Exception: pass tracks = [] for f in sorted(MP3_DIR.iterdir()): if f.is_file() and f.suffix.lower() in AUDIO_EXT: title = f.stem.replace("_", " ").replace("-", " ").strip() tracks.append({"src": f.name, "title": title}) return tracks def _save_playlist(tracks): with open(MP3_DIR / "playlist.json", "w") as f: json.dump(tracks, f, indent=2) @app.route("/api/tracks/rename", methods=["POST"]) def rename_track(): data = request.get_json(silent=True) or {} src = (data.get("src") or "").strip() new_title = (data.get("title") or "").strip() if not src or not new_title: return jsonify({"error": "src and title are required"}), 400 bare = strip_mp3_prefix(src) tracks = _load_playlist() for track in tracks: if strip_mp3_prefix(track.get("src", "")) == bare: track["title"] = new_title _save_playlist(tracks) return jsonify({"status": "renamed"}) tracks.append({"src": bare, "title": new_title}) _save_playlist(tracks) return jsonify({"status": "renamed"}) # ── Playlist tracks (read) ──────────────────────────────────────────────────── @app.route("/api/tracks") def list_tracks(): tracks = _load_playlist() return jsonify([ {"src": t.get("src", ""), "title": t.get("title", "")} for t in tracks ]) # ── UI ──────────────────────────────────────────────────────────────────────── @app.route("/") def ui_index(): return send_from_directory(SITE_DIR, "index.html") @app.route("/") def ui_static(filename): return send_from_directory(SITE_DIR, filename) if __name__ == "__main__": app.run(host="0.0.0.0", port=8001, debug=False)