"""HTTP API for batch audio transcription with faster-whisper.

Features exposed by this module:

- multi-file and (password-protected) ZIP upload,
- a medical-biased transcription mode,
- a persistent phrase-replacement "memory" stored in ``transcribe_memory.json``,
- a collected vocabulary stored in ``medical_terms.json``,
- JSON and DOCX output, plus a small static HTML page at ``/ui``.
"""

import json
import os
import re
import shutil
import tempfile
from typing import List, Literal, Optional

from fastapi import (
    BackgroundTasks,
    FastAPI,
    File,
    Form,
    HTTPException,
    UploadFile,
)
from fastapi.responses import (
    FileResponse,
    HTMLResponse,
    JSONResponse,
    PlainTextResponse,
)
from pydantic import BaseModel
from faster_whisper import WhisperModel
import pyzipper
import soundfile as sf  # noqa: F401
from docx import Document

# ===================== CONFIG =====================

MODEL_SIZE = os.getenv("WHISPER_MODEL_SIZE", "large-v3")
DEVICE = os.getenv("WHISPER_DEVICE", "cpu")  # "cpu" or "cuda"
COMPUTE_TYPE = os.getenv("WHISPER_COMPUTE_TYPE", "int8")

# Keep .dct as allowed, but handle decoder failure nicely
AUDIO_EXTENSIONS = (
    ".wav",
    ".mp3",
    ".m4a",
    ".flac",
    ".ogg",
    ".opus",
    ".webm",
    ".dct",
)

# Default ZIP password if user leaves password blank
DEFAULT_ZIP_PASSWORD = os.getenv("DEFAULT_ZIP_PASSWORD", "dietcoke1")

MEMORY_PATH = "transcribe_memory.json"
MEDICAL_TERMS_PATH = "medical_terms.json"

# Lazily populated process-wide caches (see get_model / load_memory /
# load_med_terms).
fw_model: Optional[WhisperModel] = None
memory_cache: Optional[dict] = None
medical_terms_cache: Optional[dict] = None

# A "word" is a letter followed by at least two letters/hyphens (no digits).
WORD_RE = re.compile(r"[A-Za-z][A-Za-z\-]{2,}")

STOPWORDS = {
    "the", "and", "for", "that", "with", "this", "have", "from", "into",
    "about", "will", "there", "their", "which", "your", "been", "were",
    "they", "them", "then", "than", "also", "some", "very", "over", "under",
    "after", "before", "because", "would", "could", "should", "when",
    "where", "what", "while", "here", "such", "much", "more", "most", "many",
    "each", "every", "other", "another", "those", "these", "ours", "yours",
    "doctor", "patient", "patients", "report", "note",
}

DOCX_MEDIA_TYPE = (
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
)


# ===================== MEMORY HELPERS =====================

def ensure_memory_file():
    """Create ``MEMORY_PATH`` with an empty rule list if it does not exist."""
    if not os.path.exists(MEMORY_PATH):
        data = {"replacements": []}
        with open(MEMORY_PATH, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)


def load_memory() -> dict:
    """Return the replacement memory, reading it from disk on first use.

    An unreadable, malformed or non-object JSON file yields the empty default
    ``{"replacements": []}`` instead of raising.
    """
    global memory_cache
    if memory_cache is not None:
        return memory_cache
    ensure_memory_file()
    try:
        with open(MEMORY_PATH, "r", encoding="utf-8") as f:
            loaded = json.load(f)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        loaded = None
    memory_cache = loaded if isinstance(loaded, dict) else {"replacements": []}
    return memory_cache


def save_memory(data: dict):
    """Replace the cached memory with *data* and persist it to ``MEMORY_PATH``."""
    global memory_cache
    memory_cache = data
    with open(MEMORY_PATH, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)


def apply_memory_to_text(text: str) -> str:
    """Apply every stored ``source -> target`` replacement to *text*, in order.

    Rules with an empty/missing source are skipped; a missing target deletes
    the source phrase.
    """
    out = text
    for rule in load_memory().get("replacements", []):
        src = rule.get("source") or ""
        dst = rule.get("target") or ""
        if src:
            out = out.replace(src, dst)
    return out


# ===================== MEDICAL TERMS HELPERS =====================

def ensure_med_terms_file():
    """Create ``MEDICAL_TERMS_PATH`` with an empty term map if it is missing."""
    if not os.path.exists(MEDICAL_TERMS_PATH):
        data = {"terms": {}}
        with open(MEDICAL_TERMS_PATH, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)


def load_med_terms() -> dict:
    """Return the collected-terms document, reading it from disk on first use.

    An unreadable, malformed or non-object JSON file yields the empty default
    ``{"terms": {}}`` instead of raising.
    """
    global medical_terms_cache
    if medical_terms_cache is not None:
        return medical_terms_cache
    ensure_med_terms_file()
    try:
        with open(MEDICAL_TERMS_PATH, "r", encoding="utf-8") as f:
            loaded = json.load(f)
    except (OSError, ValueError):
        loaded = None
    medical_terms_cache = loaded if isinstance(loaded, dict) else {"terms": {}}
    return medical_terms_cache


def save_med_terms(data: dict):
    """Replace the cached terms with *data* and persist to ``MEDICAL_TERMS_PATH``."""
    global medical_terms_cache
    medical_terms_cache = data
    with open(MEDICAL_TERMS_PATH, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)


def extract_candidate_terms(text: str) -> List[str]:
    """Return the distinct lower-cased candidate terms found in *text*.

    A candidate is a ``WORD_RE`` match of at least four characters that is not
    in ``STOPWORDS``.  The result is sorted so callers get a stable order.
    """
    # crude heuristic: longer, non-stopword words are usually domain terms / names
    # (WORD_RE cannot match digits, so no separate digit filter is needed).
    candidates = {
        word.lower()
        for word in WORD_RE.findall(text)
        if len(word) >= 4 and word.lower() not in STOPWORDS
    }
    return sorted(candidates)


def update_med_terms_from_text(text: str) -> List[str]:
    """Update medical_terms.json from given text; return list of NEW terms added.

    Each candidate term's counter is incremented once per call (not once per
    occurrence inside *text*).
    """
    data = load_med_terms()
    existing = data.get("terms", {})
    new_terms: List[str] = []
    for term in extract_candidate_terms(text):
        if term not in existing:
            new_terms.append(term)
        existing[term] = existing.get(term, 0) + 1
    data["terms"] = existing
    save_med_terms(data)
    return new_terms


# ===================== MODEL HELPERS =====================

class AudioDecodeError(RuntimeError):
    """The audio decoder could not read an input file (e.g. proprietary .dct).

    Subclasses ``RuntimeError`` so callers that catch ``RuntimeError`` keep
    working.
    """


# Substrings of ffmpeg / decoder error messages that indicate an unreadable
# input file rather than a transcription failure.
_DECODER_SIGNATURES = (
    "Invalid data found when processing input",
    "error opening",
    "Decoder",
    "demuxing failed",
    "Could not seek to",
)


def get_model() -> WhisperModel:
    """Return the shared ``WhisperModel``, constructing it on first call."""
    global fw_model
    if fw_model is None:
        fw_model = WhisperModel(
            MODEL_SIZE,
            device=DEVICE,
            compute_type=COMPUTE_TYPE,
        )
    return fw_model


def build_transcription_params(mode: str):
    """
    Fast, CPU-friendly settings:
    - greedy decoding (beam_size=1, best_of=1)

    ``mode == "medical_en"`` forces English and adds a medical initial prompt;
    any other mode leaves the language on auto-detect.
    """
    params = {
        "task": "transcribe",
        "beam_size": 1,  # was 5 → faster
        "best_of": 1,  # was 5 → faster
        "temperature": 0.0,
    }
    if mode == "medical_en":
        params["language"] = "en"
        params["initial_prompt"] = (
            "This is an English medical dictation. Use accurate medical terminology, "
            "including anatomy, diseases, lab values, imaging, and medications. "
            "Write in a formal clinical style."
        )
    else:
        # leave language autodetect for general mode
        params["language"] = None
    return params


def transcribe_file(path: str, mode: str) -> str:
    """
    Run faster-whisper on a single file, return plain text with memory applied.

    Raises:
        AudioDecodeError: the decoder could not read the file (e.g. a
            proprietary .dct); the message tells the user to convert it.
        RuntimeError: any other failure while transcribing.
    """
    model = get_model()
    params = build_transcription_params(mode)
    fname = os.path.basename(path)

    try:
        segments, _info = model.transcribe(
            path,
            task=params["task"],
            beam_size=params["beam_size"],
            best_of=params["best_of"],
            temperature=params["temperature"],
            language=params["language"],
            initial_prompt=params.get("initial_prompt"),
        )
        # `segments` is a lazy generator: inference runs while it is consumed,
        # so the iteration must stay inside the try for errors to be wrapped.
        raw_text = "".join(seg.text for seg in segments).strip()
    except Exception as e:
        msg = str(e)
        # ffmpeg / decoder-type failures
        if any(sig in msg for sig in _DECODER_SIGNATURES):
            raise AudioDecodeError(
                f"Audio decoder could not read file '{fname}'. "
                f"This often happens with proprietary .dct formats. "
                f"Please export/convert this dictation file to WAV or MP3 "
                f"using your dictation software, then upload the converted file."
            ) from e
        raise RuntimeError(
            f"Transcription failed for {fname}: {msg}"
        ) from e

    return apply_memory_to_text(raw_text)


# ===================== Pydantic models =====================

class FileTranscript(BaseModel):
    """Transcript of one audio file."""

    filename: str
    text: str


class TranscriptionResponse(BaseModel):
    """JSON payload returned by the transcription endpoints."""

    mode: Literal["general", "medical_en"]
    combined_transcript: str
    items: List[FileTranscript]
    file_count: int
    audio_files: List[str]
    new_medical_terms: List[str] = []


class MemoryRule(BaseModel):
    """One ``source -> target`` replacement rule."""

    source: str
    target: str


class MemoryResponse(BaseModel):
    """JSON payload listing all replacement rules."""

    replacements: List[MemoryRule]


class MedicalTermsResponse(BaseModel):
    """JSON payload with the collected term -> count map."""

    terms: dict


# ===================== OTHER HELPERS =====================

def filter_audio_files(paths: List[str]) -> List[str]:
    """Return the entries of *paths* whose extension is in ``AUDIO_EXTENSIONS``."""
    return [
        p for p in paths
        if os.path.splitext(p)[1].lower() in AUDIO_EXTENSIONS
    ]


def format_combined(results: List[FileTranscript]) -> str:
    """Render all transcripts as one text with a ``### File N: name`` heading each."""
    parts: List[str] = []
    for idx, item in enumerate(results, start=1):
        parts.append(f"### File {idx}: {item.filename}")
        parts.append("")
        parts.append(item.text if item.text else "[No transcript]")
        parts.append("")
    return "\n".join(parts).strip()


def build_docx(results: List[FileTranscript], title: str) -> str:
    """Write the transcripts to a DOCX in a fresh temp directory; return its path.

    The caller owns the returned file's parent directory and is responsible
    for deleting it.
    """
    doc = Document()
    doc.add_heading(title, level=1)
    for idx, item in enumerate(results, start=1):
        doc.add_heading(f"File {idx}: {item.filename}", level=2)
        doc.add_paragraph(item.text if item.text else "[No transcript]")
        doc.add_paragraph()
    tmpdir = tempfile.mkdtemp(prefix="docx_")
    out_path = os.path.join(tmpdir, "transcripts.docx")
    doc.save(out_path)
    return out_path


def _unique_path(directory: str, filename: str) -> str:
    """Return a path in *directory* for *filename* that does not exist yet.

    On a name clash a numeric suffix is inserted before the extension
    (``a.wav`` -> ``a_2.wav``) so earlier files are never overwritten.
    """
    candidate = os.path.join(directory, filename)
    if not os.path.exists(candidate):
        return candidate
    stem, ext = os.path.splitext(filename)
    counter = 2
    while True:
        candidate = os.path.join(directory, f"{stem}_{counter}{ext}")
        if not os.path.exists(candidate):
            return candidate
        counter += 1


def _remove_temp_dirs(paths: List[str]) -> None:
    """Delete the parent directories of *paths* (best effort).

    Only meant for paths returned by ``save_uploads_to_temp`` and
    ``extract_zip_to_temp``, whose parents are private ``mkdtemp`` directories.
    """
    for directory in {os.path.dirname(p) for p in paths}:
        shutil.rmtree(directory, ignore_errors=True)


def save_uploads_to_temp(files: List[UploadFile]) -> List[str]:
    """Copy the uploads into a fresh temp directory and return their paths.

    Only the basename of each client-supplied filename is used.  Uploads that
    share a basename get distinct names instead of overwriting each other, and
    an empty basename falls back to ``"audio"``.  The temp directory is
    removed if copying fails or if nothing was saved.
    """
    tmpdir = tempfile.mkdtemp(prefix="uploads_")
    local_paths: List[str] = []
    try:
        for uf in files:
            filename = os.path.basename(uf.filename or "") or "audio"
            local_path = _unique_path(tmpdir, filename)
            with open(local_path, "wb") as out_f:
                shutil.copyfileobj(uf.file, out_f)
            local_paths.append(local_path)
    except Exception:
        shutil.rmtree(tmpdir, ignore_errors=True)
        raise
    if not local_paths:
        shutil.rmtree(tmpdir, ignore_errors=True)
    return local_paths


def extract_zip_to_temp(zip_file: UploadFile, password: Optional[str]) -> List[str]:
    """Extract an uploaded (optionally encrypted) ZIP flat into a temp directory.

    Directory structure inside the archive is discarded (basenames only, which
    also prevents path traversal); entries that share a basename get distinct
    names.  Paths are returned in archive order.

    Raises:
        HTTPException: 400 when the archive cannot be opened or decrypted.
    """
    archive_dir = tempfile.mkdtemp(prefix="zip_")
    try:
        zip_path = os.path.join(
            archive_dir, os.path.basename(zip_file.filename or "") or "archive.zip"
        )
        with open(zip_path, "wb") as out_f:
            shutil.copyfileobj(zip_file.file, out_f)

        outdir = tempfile.mkdtemp(prefix="zip_files_")
        extracted: List[str] = []
        try:
            with pyzipper.AESZipFile(zip_path, "r") as zf:
                if password:
                    zf.setpassword(password.encode("utf-8"))
                for info in zf.infolist():
                    if info.is_dir():
                        continue
                    name = os.path.basename(info.filename)
                    if not name:
                        continue
                    out_path = _unique_path(outdir, name)
                    with zf.open(info) as src, open(out_path, "wb") as dst:
                        shutil.copyfileobj(src, dst)
                    extracted.append(out_path)
        except (pyzipper.BadZipFile, RuntimeError, KeyError) as e:
            shutil.rmtree(outdir, ignore_errors=True)
            raise HTTPException(
                status_code=400,
                detail=f"Failed to open ZIP file. Check password / integrity. {e}",
            ) from e
        except Exception:
            shutil.rmtree(outdir, ignore_errors=True)
            raise
    finally:
        # The uploaded archive itself is never needed after extraction.
        shutil.rmtree(archive_dir, ignore_errors=True)

    if not extracted:
        shutil.rmtree(outdir, ignore_errors=True)
    return extracted


def _no_audio_error(lead: str) -> HTTPException:
    """Build the 400 error used when no supported audio file was supplied."""
    return HTTPException(
        status_code=400,
        detail=f"{lead} Supported extensions: {', '.join(AUDIO_EXTENSIONS)}",
    )


def _transcribe_paths(
    paths: List[str], mode: str, failure_prefix: str
) -> List[FileTranscript]:
    """Transcribe *paths* in order, mapping failures to ``HTTPException``.

    Decoder failures become 400 (the client must convert the file); every
    other ``RuntimeError`` becomes 500 with *failure_prefix* in the detail.
    """
    items: List[FileTranscript] = []
    try:
        for path in paths:
            text = transcribe_file(path, mode)
            items.append(FileTranscript(filename=os.path.basename(path), text=text))
    except AudioDecodeError as e:
        # If decoder can't read (common for proprietary .dct), treat as 400
        raise HTTPException(status_code=400, detail=str(e)) from e
    except RuntimeError as e:
        raise HTTPException(
            status_code=500,
            detail=f"{failure_prefix}: {e}",
        ) from e
    return items


def _collect_terms(items: List[FileTranscript], extract_terms: bool) -> List[str]:
    """Update the term store from the raw transcript texts when requested.

    Only the transcript text is used, not the formatted headings or the
    ``[No transcript]`` placeholder, so file names are not counted as terms.
    """
    if not extract_terms or not items:
        return []
    return update_med_terms_from_text("\n".join(item.text for item in items))


def _json_result(
    mode: str, items: List[FileTranscript], extract_terms: bool
) -> TranscriptionResponse:
    """Assemble the JSON transcription payload for *items*."""
    return TranscriptionResponse(
        mode=mode,
        combined_transcript=format_combined(items),
        items=items,
        file_count=len(items),
        audio_files=[item.filename for item in items],
        new_medical_terms=_collect_terms(items, extract_terms),
    )


def _docx_result(
    items: List[FileTranscript], title: str, download_name: str
) -> FileResponse:
    """Build the DOCX and return it; its temp directory is removed after sending."""
    docx_path = build_docx(items, title)
    cleanup = BackgroundTasks()
    cleanup.add_task(shutil.rmtree, os.path.dirname(docx_path), ignore_errors=True)
    return FileResponse(
        docx_path,
        media_type=DOCX_MEDIA_TYPE,
        filename=download_name,
        background=cleanup,
    )


def _memory_response(mem: dict) -> MemoryResponse:
    """Convert the stored memory document into its response model."""
    return MemoryResponse(
        replacements=[
            MemoryRule(source=r.get("source", ""), target=r.get("target", ""))
            for r in mem.get("replacements", [])
        ]
    )


# ===================== FastAPI app =====================

app = FastAPI(
    title="Whisper Large V3 – Medical Batch Transcription API (faster-whisper CPU)",
    description="""
HTTP API for Whisper (via faster-whisper) with:

- Multi-file audio upload (including .dct where supported by ffmpeg)
- Password-protected ZIP upload (default password: dietcoke1)
- Option to ONLY extract ZIP and list audio names (no transcription)
- NEW: ZIP → choose selected files to transcribe
- Medical-biased transcription mode
- Persistent word/phrase memory (replacements)
- Extraction & saving of frequent 'medical terms' from transcripts
- Combined transcript + DOCX export
- Fast greedy decoding for CPU (beam_size=1, best_of=1)

If a .dct file uses a proprietary codec that ffmpeg cannot decode,
you will get a clear error suggesting to convert to WAV/MP3 first.

Use `/docs` for Swagger UI and `/ui` for the web interface.
""",
    version="2.5.0",
)


@app.get("/", response_class=PlainTextResponse)
def root():
    """Plain-text landing page pointing to /docs and /ui."""
    return (
        "Whisper Large V3 – Medical Batch Transcription API (faster-whisper)\n"
        "Open /docs for API documentation or /ui for the web interface.\n"
    )


@app.get("/health", response_class=PlainTextResponse)
def health():
    """Liveness probe; does not touch the model."""
    return "OK"


@app.get("/self-test")
def self_test():
    """
    Basic self-check:
    - can we create/load the faster-whisper model?
    - device & compute type
    - number of memory rules
    - number of collected medical terms
    """
    try:
        get_model()
        num_rules = len(load_memory().get("replacements", []))
        med_count = len(load_med_terms().get("terms", {}))
        return JSONResponse(
            {
                "status": "ok",
                "message": "Model loaded successfully.",
                "model_size": MODEL_SIZE,
                "device": DEVICE,
                "compute_type": COMPUTE_TYPE,
                "memory_rules": num_rules,
                "medical_terms_count": med_count,
                # NOTE(review): this publishes the default ZIP password; kept
                # because existing clients may read the field.
                "zip_default_password": DEFAULT_ZIP_PASSWORD,
                "decoding": "fast (beam_size=1, best_of=1)",
            }
        )
    except Exception as e:
        return JSONResponse(
            {
                "status": "error",
                "message": f"Model or memory failed to load: {e}",
            },
            status_code=500,
        )


# ---------- 1. Multi-file transcription (JSON) ----------

@app.post("/api/transcribe/files", response_model=TranscriptionResponse)
def transcribe_files(
    files: List[UploadFile] = File(..., description="One or more audio files"),
    mode: Literal["general", "medical_en"] = Form("medical_en"),
    extract_terms: bool = Form(False),
):
    """Transcribe the uploaded audio files and return the result as JSON."""
    if not files:
        raise HTTPException(status_code=400, detail="No files uploaded.")

    local_paths = save_uploads_to_temp(files)
    try:
        audio_paths = filter_audio_files(local_paths)
        if not audio_paths:
            raise _no_audio_error("No valid audio files found.")
        items = _transcribe_paths(audio_paths, mode, "Transcription failed")
    finally:
        _remove_temp_dirs(local_paths)

    return _json_result(mode, items, extract_terms)


# ---------- 2. Multi-file transcription (DOCX) ----------

@app.post("/api/transcribe/files/docx")
def transcribe_files_docx(
    files: List[UploadFile] = File(..., description="One or more audio files"),
    mode: Literal["general", "medical_en"] = Form("medical_en"),
    extract_terms: bool = Form(False),
):
    """Transcribe the uploaded audio files and return the result as a DOCX."""
    if not files:
        raise HTTPException(status_code=400, detail="No files uploaded.")

    local_paths = save_uploads_to_temp(files)
    try:
        audio_paths = filter_audio_files(local_paths)
        if not audio_paths:
            raise _no_audio_error("No valid audio files found.")
        items = _transcribe_paths(
            audio_paths, mode, "Transcription failed while building DOCX"
        )
    finally:
        _remove_temp_dirs(local_paths)

    _collect_terms(items, extract_terms)
    return _docx_result(items, "Multi-file transcription", "transcripts_files.docx")


# ---------- 3. ZIP EXTRACT ONLY (no transcription) ----------

@app.post("/api/zip/extract")
def zip_extract_only(
    file: UploadFile = File(..., description="ZIP file containing audio files"),
    password: str = Form(
        "",
        description="ZIP password. Leave blank to use default 'dietcoke1'.",
    ),
):
    """Extract the ZIP and list the audio file names it contains."""
    if file is None:
        raise HTTPException(status_code=400, detail="No ZIP uploaded.")

    effective_password = password if password else DEFAULT_ZIP_PASSWORD
    extracted_paths = extract_zip_to_temp(file, effective_password)
    try:
        audio_paths = filter_audio_files(extracted_paths)
    finally:
        # Only the names are reported; the extracted files are not kept.
        _remove_temp_dirs(extracted_paths)

    if not audio_paths:
        raise _no_audio_error("No valid audio files in ZIP.")

    filenames = [os.path.basename(p) for p in audio_paths]
    return JSONResponse(
        {
            "status": "ok",
            "count": len(filenames),
            "audio_files": filenames,
        }
    )


# ---------- 4. ZIP transcription (JSON) – ALL FILES ----------

@app.post("/api/transcribe/zip", response_model=TranscriptionResponse)
def transcribe_zip(
    file: UploadFile = File(..., description="ZIP file containing audio files"),
    password: str = Form(
        "",
        description="ZIP password. Leave blank to use default 'dietcoke1'.",
    ),
    mode: Literal["general", "medical_en"] = Form("medical_en"),
    extract_terms: bool = Form(False),
):
    """Transcribe every audio file inside the ZIP and return JSON."""
    if file is None:
        raise HTTPException(status_code=400, detail="No ZIP uploaded.")

    effective_password = password if password else DEFAULT_ZIP_PASSWORD
    extracted_paths = extract_zip_to_temp(file, effective_password)
    try:
        audio_paths = filter_audio_files(extracted_paths)
        if not audio_paths:
            raise _no_audio_error("No valid audio files in ZIP.")
        items = _transcribe_paths(audio_paths, mode, "Transcription failed (ZIP)")
    finally:
        _remove_temp_dirs(extracted_paths)

    return _json_result(mode, items, extract_terms)


# ---------- 5. ZIP transcription (JSON) – SELECTED FILES ONLY ----------

@app.post("/api/transcribe/zip/selected", response_model=TranscriptionResponse)
def transcribe_zip_selected(
    file: UploadFile = File(..., description="ZIP file containing audio files"),
    password: str = Form(
        "",
        description="ZIP password. Leave blank to use default 'dietcoke1'.",
    ),
    selected_files: str = Form(
        "",
        description="Comma-separated file names (inside ZIP) to transcribe",
    ),
    mode: Literal["general", "medical_en"] = Form("medical_en"),
    extract_terms: bool = Form(False),
):
    """
    Extract ZIP, then ONLY transcribe the subset of files whose basenames
    are passed in 'selected_files' (comma-separated).

    Files are transcribed in alphabetical order of their names.
    """
    if file is None:
        raise HTTPException(status_code=400, detail="No ZIP uploaded.")

    effective_password = password if password else DEFAULT_ZIP_PASSWORD

    selected_set = {
        name.strip()
        for name in (selected_files or "").split(",")
        if name.strip()
    }
    if not selected_set:
        raise HTTPException(
            status_code=400,
            detail="No selected_files provided. Please choose at least one file from the ZIP.",
        )

    extracted_paths = extract_zip_to_temp(file, effective_password)
    try:
        audio_paths = filter_audio_files(extracted_paths)
        if not audio_paths:
            raise _no_audio_error("No valid audio files in ZIP.")

        # Map names -> paths for quick lookup
        name_to_path = {
            os.path.basename(p): p
            for p in audio_paths
            if os.path.basename(p) in selected_set
        }
        if not name_to_path:
            raise HTTPException(
                status_code=400,
                detail="None of the selected_files were found as audio inside the ZIP.",
            )

        ordered_paths = [name_to_path[name] for name in sorted(name_to_path)]
        items = _transcribe_paths(
            ordered_paths, mode, "Transcription failed (ZIP selected)"
        )
    finally:
        _remove_temp_dirs(extracted_paths)

    return _json_result(mode, items, extract_terms)


# ---------- 6. ZIP transcription (DOCX) – ALL FILES ----------

@app.post("/api/transcribe/zip/docx")
def transcribe_zip_docx(
    file: UploadFile = File(..., description="ZIP file containing audio files"),
    password: str = Form(
        "",
        description="ZIP password. Leave blank to use default 'dietcoke1'.",
    ),
    mode: Literal["general", "medical_en"] = Form("medical_en"),
    extract_terms: bool = Form(False),
):
    """Transcribe every audio file inside the ZIP and return a DOCX."""
    if file is None:
        raise HTTPException(status_code=400, detail="No ZIP uploaded.")

    effective_password = password if password else DEFAULT_ZIP_PASSWORD
    extracted_paths = extract_zip_to_temp(file, effective_password)
    try:
        audio_paths = filter_audio_files(extracted_paths)
        if not audio_paths:
            raise _no_audio_error("No valid audio files in ZIP.")
        items = _transcribe_paths(
            audio_paths, mode, "Transcription failed while building ZIP DOCX"
        )
    finally:
        _remove_temp_dirs(extracted_paths)

    _collect_terms(items, extract_terms)
    return _docx_result(items, "ZIP transcription", "transcripts_zip.docx")


# ===================== MEMORY ENDPOINTS =====================

@app.get("/api/memory", response_model=MemoryResponse)
def get_memory():
    """List all stored replacement rules."""
    return _memory_response(load_memory())


@app.post("/api/memory/add", response_model=MemoryResponse)
def add_memory(rule: MemoryRule):
    """Add *rule*, replacing any existing rule with the same source."""
    mem = load_memory()
    repl = [
        r for r in mem.get("replacements", [])
        if r.get("source") != rule.source
    ]
    repl.append({"source": rule.source, "target": rule.target})
    mem["replacements"] = repl
    save_memory(mem)
    return _memory_response(mem)


@app.post("/api/memory/reset", response_model=MemoryResponse)
def reset_memory():
    """Delete all replacement rules."""
    save_memory({"replacements": []})
    return MemoryResponse(replacements=[])


# ===================== MEDICAL TERMS ENDPOINTS =====================

@app.get("/api/medical-terms", response_model=MedicalTermsResponse)
def get_med_terms():
    """Return the collected term -> count map."""
    data = load_med_terms()
    return MedicalTermsResponse(terms=data.get("terms", {}))


@app.post("/api/medical-terms/reset", response_model=MedicalTermsResponse)
def reset_med_terms():
    """Delete all collected terms."""
    save_med_terms({"terms": {}})
    return MedicalTermsResponse(terms={})


# ===================== Simple HTML UI (multi-tab) =====================

HTML_UI = r""" Whisper – Medical Batch Transcription (faster-whisper)

Whisper – Medical Batch Transcription (faster-whisper CPU)

Multi-file & ZIP transcription with medical mode, .dct support (where decodable), ZIP extract-only mode, selectable ZIP files for transcription, and memory of preferred terms + collected medical vocabulary. Default ZIP password: dietcoke1. API docs: /docs.

Transcription progress
Idle

Audio files JSON & DOCX

Inputs

You can select multiple audio files.
.dct dictation files are accepted when ffmpeg can decode them. If not, you will see a clear error asking you to convert.
Finds frequent longer words (likely medical terms) and adds them into a vocabulary list for future reference.

Combined transcript

Quick example audio

1. Download this public sample file
2. Upload it above and click Transcribe → JSON

👉 Download example audio (mlk.flac)

ZIP upload Extract & Transcribe

ZIP Inputs

ZIP should contain audio or .dct files.
If you don't type anything here, the server will try password dietcoke1.
Adds frequent longer words from all transcripts in the ZIP into the shared medical vocabulary list.

Files inside ZIP (select to transcribe)

Run "Extract only & list audio files" to see files and choose which ones to transcribe.

ZIP combined transcript

Self-check Model & memory status

Use this to verify that the model is loaded and memory rules are available.

Click "Run self-test" to see status...

Memory – preferred words & corrections

Add replacements such as diabetis → diabetes mellitus.
These are applied automatically to every new transcription.

Current memory rules

Loading memory...

Collected medical terms

When you enable "Extract & save medical terms", the app collects frequent longer words here. Use this vocabulary for future fine-tuning or dictionary building.

Click "Refresh terms" to see collected vocabulary...

API example

Multi-file JSON

curl -X POST \
  "https://staraks-whisper-large-v3.hf.space/api/transcribe/files" \
  -H "Accept: application/json" \
  -F "mode=medical_en" \
  -F "extract_terms=true" \
  -F "files=@path/to/audio1.flac"

ZIP selected files JSON

curl -X POST \
  "https://staraks-whisper-large-v3.hf.space/api/transcribe/zip/selected" \
  -H "Accept: application/json" \
  -F "mode=medical_en" \
  -F "extract_terms=true" \
  -F "selected_files=file1.wav,file3.dct" \
  -F "file=@path/to/archive.zip"
"""


@app.get("/ui", response_class=HTMLResponse)
def get_ui():
    """Serve the static web UI."""
    return HTML_UI


if __name__ == "__main__":
    import uvicorn

    port = int(os.getenv("PORT", "7860"))
    uvicorn.run("app:app", host="0.0.0.0", port=port, reload=True)