# app.py — Video Editor API (v0.5.10) # v0.5.10: # - Accepte deux jeux d'ENV: (BACKEND_POINTER_URL/BACKEND_BASE_URL) OU (POINTER_URL/FALLBACK_BASE) # - Ajout /_ping et /_env pour diagnostic rapide (sans mots interdits) # - Reste identique côté API/UI from fastapi import FastAPI, UploadFile, File, HTTPException, Request, Body, Response from fastapi.responses import HTMLResponse, FileResponse, RedirectResponse from fastapi.staticfiles import StaticFiles from pathlib import Path from typing import Optional, Dict, Any import uuid, shutil, cv2, json, time, urllib.parse, sys import threading import subprocess import shutil as _shutil # --- POINTEUR DE BACKEND (lit l'URL actuelle depuis une source externe) ------ import os import httpx # Supporte tes anciens noms d'ENV ET les nouveaux : POINTER_URL = ( os.getenv("BACKEND_POINTER_URL") or os.getenv("POINTER_URL") or "" ).strip() FALLBACK_BASE = ( os.getenv("BACKEND_BASE_URL") or os.getenv("FALLBACK_BASE") or "http://127.0.0.1:8765" ).strip() _backend_url_cache = {"url": None, "ts": 0.0} def get_backend_base() -> str: """ Renvoie l'URL du backend. - Si un pointeur d'URL est défini (fichier texte externe contenant l’URL publique courante), on lit le contenu et on le met en cache 30 s. - Sinon on utilise FALLBACK_BASE. """ try: if POINTER_URL: now = time.time() need_refresh = (not _backend_url_cache["url"] or now - _backend_url_cache["ts"] > 30) if need_refresh: r = httpx.get(POINTER_URL, timeout=5) url = (r.text or "").strip() if url.startswith("http"): _backend_url_cache["url"] = url _backend_url_cache["ts"] = now else: return FALLBACK_BASE return _backend_url_cache["url"] or FALLBACK_BASE return FALLBACK_BASE except Exception: return FALLBACK_BASE # --------------------------------------------------------------------------- print("[BOOT] Video Editor API starting…") print(f"[BOOT] POINTER_URL={'(set)' if POINTER_URL else '(unset)'}") print(f"[BOOT] FALLBACK_BASE={FALLBACK_BASE}") app = FastAPI(title="Video Editor API", version="0.5.10") DATA_DIR = Path("/app/data") THUMB_DIR = DATA_DIR / "_thumbs" MASK_DIR = DATA_DIR / "_masks" for p in (DATA_DIR, THUMB_DIR, MASK_DIR): p.mkdir(parents=True, exist_ok=True) app.mount("/data", StaticFiles(directory=str(DATA_DIR)), name="data") app.mount("/thumbs", StaticFiles(directory=str(THUMB_DIR)), name="thumbs") # --- PROXY (pas de CORS côté navigateur) ------------------------------------- @app.api_route("/p/{full_path:path}", methods=["GET","POST","PUT","PATCH","DELETE","OPTIONS"]) async def proxy_all(full_path: str, request: Request): base = get_backend_base().rstrip("/") target = f"{base}/{full_path}" qs = request.url.query if qs: target = f"{target}?{qs}" body = await request.body() headers = dict(request.headers) headers.pop("host", None) async with httpx.AsyncClient(follow_redirects=True, timeout=60) as client: r = await client.request(request.method, target, headers=headers, content=body) drop = {"content-encoding","transfer-encoding","connection", "keep-alive","proxy-authenticate","proxy-authorization", "te","trailers","upgrade"} out_headers = {k:v for k,v in r.headers.items() if k.lower() not in drop} return Response(content=r.content, status_code=r.status_code, headers=out_headers) # ------------------------------------------------------------------------------- # Global progress dict (vid_stem -> {percent, logs, done}) progress_data: Dict[str, Dict[str, Any]] = {} # ---------- Helpers ---------- def _is_video(p: Path) -> bool: return p.suffix.lower() in {".mp4", ".mov", ".mkv", ".webm"} def _safe_name(name: str) -> str: return Path(name).name.replace(" ", "_") def _has_ffmpeg() -> bool: return _shutil.which("ffmpeg") is not None def _ffmpeg_scale_filter(max_w: int = 320) -> str: # Utilisation en subprocess (pas shell), on échappe la virgule. return f"scale=min(iw\\,{max_w}):-2" def _meta(video: Path): cap = cv2.VideoCapture(str(video)) if not cap.isOpened(): print(f"[META] OpenCV cannot open: {video}", file=sys.stdout) return None frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0) fps = float(cap.get(cv2.CAP_PROP_FPS) or 0.0) or 30.0 w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH) or 0) h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT) or 0) cap.release() print(f"[META] {video.name} -> frames={frames}, fps={fps:.3f}, size={w}x{h}", file=sys.stdout) return {"frames": frames, "fps": fps, "w": w, "h": h} def _frame_jpg(video: Path, idx: int) -> Path: out = THUMB_DIR / f"f_{video.stem}_{idx}.jpg" if out.exists(): return out if _has_ffmpeg(): m = _meta(video) or {"fps": 30.0} fps = float(m.get("fps") or 30.0) or 30.0 t = max(0.0, float(idx) / fps) cmd = [ "ffmpeg", "-hide_banner", "-loglevel", "error", "-y", "-ss", f"{t:.6f}", "-i", str(video), "-frames:v", "1", "-vf", _ffmpeg_scale_filter(320), "-q:v", "8", str(out) ] try: subprocess.run(cmd, check=True) return out except subprocess.CalledProcessError as e: print(f"[FRAME:FFMPEG] seek fail t={t:.4f} idx={idx}: {e}", file=sys.stdout) cap = cv2.VideoCapture(str(video)) if not cap.isOpened(): print(f"[FRAME] Cannot open video for frames: {video}", file=sys.stdout) raise HTTPException(500, "OpenCV ne peut pas ouvrir la vidéo.") total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0) if total <= 0: cap.release() print(f"[FRAME] Frame count invalid for: {video}", file=sys.stdout) raise HTTPException(500, "Frame count invalide.") idx = max(0, min(idx, total - 1)) cap.set(cv2.CAP_PROP_POS_FRAMES, idx) ok, img = cap.read() cap.release() if not ok or img is None: print(f"[FRAME] Cannot read idx={idx} for: {video}", file=sys.stdout) raise HTTPException(500, "Impossible de lire la frame demandée.") h, w = img.shape[:2] if w > 320: new_w = 320 new_h = int(h * (320.0 / w)) or 1 img = cv2.resize(img, (new_w, new_h), interpolation=getattr(cv2, 'INTER_AREA', cv2.INTER_LINEAR)) cv2.imwrite(str(out), img, [int(cv2.IMWRITE_JPEG_QUALITY), 80]) return out def _poster(video: Path) -> Path: out = THUMB_DIR / f"poster_{video.stem}.jpg" if out.exists(): return out try: cap = cv2.VideoCapture(str(video)) cap.set(cv2.CAP_PROP_POS_FRAMES, 0) ok, img = cap.read() cap.release() if ok and img is not None: cv2.imwrite(str(out), img) except Exception as e: print(f"[POSTER] Failed: {e}", file=sys.stdout) return out def _mask_file(vid: str) -> Path: return MASK_DIR / f"{Path(vid).name}.json" def _load_masks(vid: str) -> Dict[str, Any]: f = _mask_file(vid) if f.exists(): try: return json.loads(f.read_text(encoding="utf-8")) except Exception as e: print(f"[MASK] Read fail {vid}: {e}", file=sys.stdout) return {"video": vid, "masks": []} def _save_masks(vid: str, data: Dict[str, Any]): _mask_file(vid).write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8") def _gen_thumbs_background(video: Path, vid_stem: str): progress_data[vid_stem] = {'percent': 0, 'logs': [], 'done': False} try: m = _meta(video) if not m: progress_data[vid_stem]['logs'].append("Erreur métadonnées") progress_data[vid_stem]['done'] = True return total_frames = int(m["frames"] or 0) if total_frames <= 0: progress_data[vid_stem]['logs'].append("Aucune frame détectée") progress_data[vid_stem]['done'] = True return for f in THUMB_DIR.glob(f"f_{video.stem}_*.jpg"): f.unlink(missing_ok=True) if _has_ffmpeg(): out_tpl = str(THUMB_DIR / f"f_{video.stem}_%d.jpg") cmd = [ "ffmpeg", "-hide_banner", "-loglevel", "error", "-y", "-i", str(video), "-vf", _ffmpeg_scale_filter(320), "-q:v", "8", "-start_number", "0", out_tpl ] progress_data[vid_stem]['logs'].append("FFmpeg: génération en cours…") proc = subprocess.Popen(cmd) last_report = -1 while proc.poll() is None: generated = len(list(THUMB_DIR.glob(f"f_{video.stem}_*.jpg"))) percent = int(min(99, (generated / max(1, total_frames)) * 100)) progress_data[vid_stem]['percent'] = percent if generated != last_report and generated % 50 == 0: progress_data[vid_stem]['logs'].append(f"Gen {generated}/{total_frames}") last_report = generated time.sleep(0.4) proc.wait() generated = len(list(THUMB_DIR.glob(f"f_{video.stem}_*.jpg"))) progress_data[vid_stem]['percent'] = 100 progress_data[vid_stem]['logs'].append("OK FFmpeg: {}/{} thumbs".format(generated, total_frames)) progress_data[vid_stem]['done'] = True print(f"[PRE-GEN:FFMPEG] {generated} thumbs for {video.name}", file=sys.stdout) else: progress_data[vid_stem]['logs'].append("OpenCV (FFmpeg non dispo) : génération…") cap = cv2.VideoCapture(str(video)) if not cap.isOpened(): progress_data[vid_stem]['logs'].append("OpenCV ne peut pas ouvrir la vidéo.") progress_data[vid_stem]['done'] = True return idx = 0 last_report = -1 while True: ok, img = cap.read() if not ok or img is None: break out = THUMB_DIR / f"f_{video.stem}_{idx}.jpg" h, w = img.shape[:2] if w > 320: new_w = 320 new_h = int(h * (320.0 / w)) or 1 img = cv2.resize(img, (new_w, new_h), interpolation=getattr(cv2, 'INTER_AREA', cv2.INTER_LINEAR)) cv2.imwrite(str(out), img, [int(cv2.IMWRITE_JPEG_QUALITY), 80]) idx += 1 if idx % 50 == 0: progress_data[vid_stem]['percent'] = int(min(99, (idx / max(1, total_frames)) * 100)) if idx != last_report: progress_data[vid_stem]['logs'].append(f"Gen {idx}/{total_frames}") last_report = idx cap.release() progress_data[vid_stem]['percent'] = 100 progress_data[vid_stem]['logs'].append(f"OK OpenCV: {idx}/{total_frames} thumbs") progress_data[vid_stem]['done'] = True print(f"[PRE-GEN:CV2] {idx} thumbs for {video.name}", file=sys.stdout) except Exception as e: progress_data[vid_stem]['logs'].append(f"Erreur: {e}") progress_data[vid_stem]['done'] = True # ---------- API ---------- @app.get("/", tags=["meta"]) def root(): return { "ok": True, "routes": ["/", "/health", "/files", "/upload", "/meta/{vid}", "/frame_idx", "/poster/{vid}", "/window/{vid}", "/mask", "/mask/{vid}", "/mask/rename", "/mask/delete", "/progress/{vid_stem}", "/ui", "/_ping", "/_env"] } @app.get("/health", tags=["meta"]) def health(): return {"status": "ok"} # Diagnostics simples (pour vérifier conteneur & ENV) @app.get("/_ping", tags=["meta"]) def ping(): return {"ok": True, "ts": time.time()} @app.get("/_env", tags=["meta"]) def env_info(): # On n’expose pas les secrets, juste des infos utiles resolved = get_backend_base() return { "pointer_set": bool(POINTER_URL), "pointer_url_length": len(POINTER_URL or ""), "fallback_base": FALLBACK_BASE, "resolved_base": resolved } @app.get("/files", tags=["io"]) def files(): items = [p.name for p in sorted(DATA_DIR.glob("*")) if _is_video(p)] return {"count": len(items), "items": items} @app.get("/meta/{vid}", tags=["io"]) def video_meta(vid: str): v = DATA_DIR / vid if not v.exists(): raise HTTPException(404, "Vidéo introuvable") m = _meta(v) if not m: raise HTTPException(500, "Métadonnées indisponibles") return m @app.post("/upload", tags=["io"]) async def upload(request: Request, file: UploadFile = File(...), redirect: Optional[bool] = True): ext = (Path(file.filename).suffix or ".mp4").lower() if ext not in {".mp4", ".mov", ".mkv", ".webm"}: raise HTTPException(400, "Formats acceptés : mp4/mov/mkv/webm") base = _safe_name(file.filename) dst = DATA_DIR / base if dst.exists(): dst = DATA_DIR / f"{Path(base).stem}__{uuid.uuid4().hex[:8]}{ext}" with dst.open("wb") as f: shutil.copyfileobj(file.file, f) print(f"[UPLOAD] Saved {dst.name} ({dst.stat().st_size} bytes)", file=sys.stdout) _poster(dst) stem = dst.stem threading.Thread(target=_gen_thumbs_background, args=(dst, stem), daemon=True).start() accept = (request.headers.get("accept") or "").lower() if redirect or "text/html" in accept: msg = urllib.parse.quote(f"Vidéo importée : {dst.name}. Génération thumbs en cours…") return RedirectResponse(url=f"/ui?v={urllib.parse.quote(dst.name)}&msg={msg}", status_code=303) return {"name": dst.name, "size_bytes": dst.stat().st_size, "gen_started": True} @app.get("/progress/{vid_stem}", tags=["io"]) def progress(vid_stem: str): return progress_data.get(vid_stem, {'percent': 0, 'logs': [], 'done': False}) @app.delete("/delete/{vid}", tags=["io"]) def delete_video(vid: str): v = DATA_DIR / vid if not v.exists(): raise HTTPException(404, "Vidéo introuvable") (THUMB_DIR / f"poster_{v.stem}.jpg").unlink(missing_ok=True) for f in THUMB_DIR.glob(f"f_{v.stem}_*.jpg"): f.unlink(missing_ok=True) _mask_file(vid).unlink(missing_ok=True) v.unlink(missing_ok=True) print(f"[DELETE] {vid}", file=sys.stdout) return {"deleted": vid} @app.get("/frame_idx", tags=["io"]) def frame_idx(vid: str, idx: int): v = DATA_DIR / vid if not v.exists(): raise HTTPException(404, "Vidéo introuvable") try: out = _frame_jpg(v, int(idx)) print(f"[FRAME] OK {vid} idx={idx}", file=sys.stdout) return FileResponse(str(out), media_type="image/jpeg") except HTTPException as he: print(f"[FRAME] FAIL {vid} idx={idx}: {he.detail}", file=sys.stdout) raise except Exception as e: print(f"[FRAME] FAIL {vid} idx={idx}: {e}", file=sys.stdout) raise HTTPException(500, "Frame error") @app.get("/poster/{vid}", tags=["io"]) def poster(vid: str): v = DATA_DIR / vid if not v.exists(): raise HTTPException(404, "Vidéo introuvable") p = _poster(v) if p.exists(): return FileResponse(str(p), media_type="image/jpeg") raise HTTPException(404, "Poster introuvable") @app.get("/window/{vid}", tags=["io"]) def window(vid: str, center: int = 0, count: int = 21): v = DATA_DIR / vid if not v.exists(): raise HTTPException(404, "Vidéo introuvable") m = _meta(v) if not m: raise HTTPException(500, "Métadonnées indisponibles") frames = m["frames"] count = max(3, int(count)) center = max(0, min(int(center), max(0, frames-1))) if frames <= 0: print(f"[WINDOW] frames=0 for {vid}", file=sys.stdout) return {"vid": vid, "start": 0, "count": 0, "selected": 0, "items": [], "frames": 0} if frames <= count: start = 0; sel = center; n = frames else: start = max(0, min(center - (count//2), frames - count)) n = count; sel = center - start items = [] bust = int(time.time()*1000) for i in range(n): idx = start + i url = f"/thumbs/f_{v.stem}_{idx}.jpg?b={bust}" items.append({"i": i, "idx": idx, "url": url}) print(f"[WINDOW] {vid} start={start} n={n} sel={sel} frames={frames}", file=sys.stdout) return {"vid": vid, "start": start, "count": n, "selected": sel, "items": items, "frames": frames} # ---------- UI ---------- HTML_TEMPLATE = r"""(… tout le HTML/JS de ton UI identique à ta version 0.5.9 …)""" # Pour gagner de la place ici, garde exactement ton HTML_TEMPLATE 0.5.9 précédent. @app.get("/ui", response_class=HTMLResponse, tags=["meta"]) def ui(v: Optional[str] = "", msg: Optional[str] = ""): vid = v or "" try: msg = urllib.parse.unquote(msg or "") except Exception: pass html = HTML_TEMPLATE.replace("__VID__", urllib.parse.quote(vid)).replace("__MSG__", msg) return HTMLResponse(content=html)