import gradio as gr import numpy as np import torch import time import warnings from dataclasses import dataclass from typing import List, Tuple, Dict import threading import queue import os import requests from pathlib import Path import base64 # Suppress warnings warnings.filterwarnings('ignore') # Function to convert image to base64 def image_to_base64(image_path): try: with open(image_path, "rb") as img_file: return base64.b64encode(img_file.read()).decode('utf-8') except Exception as e: print(f"Error loading image {image_path}: {e}") return None # Load logos as base64 def load_logos(): logos = {} logo_files = { 'ai4s': 'ai4s_banner.png', 'surrey': 'surrey_logo.png', 'epsrc': 'EPSRC_logo.png', 'cvssp': 'CVSSP_logo.png' } for key, filename in logo_files.items(): if os.path.exists(filename): logos[key] = image_to_base64(filename) else: print(f"Logo file {filename} not found") logos[key] = None return logos # Optional imports with fallbacks try: import librosa LIBROSA_AVAILABLE = True print("✅ Librosa available") except ImportError: LIBROSA_AVAILABLE = False print("⚠️ Librosa not available, using scipy fallback") try: import webrtcvad WEBRTC_AVAILABLE = True print("✅ WebRTC VAD available") except ImportError: WEBRTC_AVAILABLE = False print("⚠️ WebRTC VAD not available, using fallback") try: import plotly.graph_objects as go from plotly.subplots import make_subplots PLOTLY_AVAILABLE = True print("✅ Plotly available") except ImportError: PLOTLY_AVAILABLE = False print("⚠️ Plotly not available") # PANNs imports try: from panns_inference import AudioTagging, labels PANNS_AVAILABLE = True print("✅ PANNs available") except ImportError: PANNS_AVAILABLE = False print("⚠️ PANNs not available, using fallback") # Transformers for AST try: from transformers import ASTForAudioClassification, ASTFeatureExtractor import transformers AST_AVAILABLE = True print("✅ AST (Transformers) available") except ImportError: AST_AVAILABLE = False print("⚠️ AST not available, using fallback") print("🚀 Creating Real-time VAD Demo...") # ===== DATA STRUCTURES ===== @dataclass class VADResult: probability: float is_speech: bool model_name: str processing_time: float timestamp: float @dataclass class OnsetOffset: onset_time: float offset_time: float model_name: str confidence: float # ===== MODEL IMPLEMENTATIONS ===== class OptimizedSileroVAD: def __init__(self): self.model = None self.sample_rate = 16000 self.model_name = "Silero-VAD" self.load_model() def load_model(self): try: self.model, _ = torch.hub.load( repo_or_dir='snakers4/silero-vad', model='silero_vad', force_reload=False, onnx=False ) self.model.eval() print(f"✅ {self.model_name} loaded successfully") except Exception as e: print(f"❌ Error loading {self.model_name}: {e}") self.model = None def predict(self, audio: np.ndarray, timestamp: float = 0.0) -> VADResult: start_time = time.time() if self.model is None or len(audio) == 0: return VADResult(0.0, False, f"{self.model_name} (unavailable)", time.time() - start_time, timestamp) try: if len(audio.shape) > 1: audio = audio.mean(axis=1) required_samples = 512 # Silero requires exactly 512 samples, handle this precisely if len(audio) != required_samples: if len(audio) > required_samples: # Take center portion to avoid edge effects start_idx = (len(audio) - required_samples) // 2 audio_chunk = audio[start_idx:start_idx + required_samples] else: # Pad symmetrically instead of just at the end pad_total = required_samples - len(audio) pad_left = pad_total // 2 pad_right = pad_total - pad_left audio_chunk = 
np.pad(audio, (pad_left, pad_right), 'reflect') else: audio_chunk = audio audio_tensor = torch.FloatTensor(audio_chunk).unsqueeze(0) with torch.no_grad(): speech_prob = self.model(audio_tensor, self.sample_rate).item() is_speech = speech_prob > 0.5 processing_time = time.time() - start_time return VADResult(speech_prob, is_speech, self.model_name, processing_time, timestamp) except Exception as e: print(f"Error in {self.model_name}: {e}") return VADResult(0.0, False, self.model_name, time.time() - start_time, timestamp) class OptimizedWebRTCVAD: def __init__(self): self.model_name = "WebRTC-VAD" self.sample_rate = 16000 self.frame_duration = 30 self.frame_size = int(self.sample_rate * self.frame_duration / 1000) if WEBRTC_AVAILABLE: try: self.vad = webrtcvad.Vad(3) print(f"✅ {self.model_name} loaded successfully") except: self.vad = None else: self.vad = None def predict(self, audio: np.ndarray, timestamp: float = 0.0) -> VADResult: start_time = time.time() if self.vad is None or len(audio) == 0: energy = np.sum(audio ** 2) if len(audio) > 0 else 0 threshold = 0.01 probability = min(energy / threshold, 1.0) is_speech = energy > threshold return VADResult(probability, is_speech, f"{self.model_name} (fallback)", time.time() - start_time, timestamp) try: if len(audio.shape) > 1: audio = audio.mean(axis=1) # Properly convert to int16 with clipping to avoid saturation audio_clipped = np.clip(audio, -1.0, 1.0) audio_int16 = (audio_clipped * 32767).astype(np.int16) speech_frames = 0 total_frames = 0 for i in range(0, len(audio_int16) - self.frame_size, self.frame_size): frame = audio_int16[i:i + self.frame_size].tobytes() if self.vad.is_speech(frame, self.sample_rate): speech_frames += 1 total_frames += 1 probability = speech_frames / max(total_frames, 1) is_speech = probability > 0.3 return VADResult(probability, is_speech, self.model_name, time.time() - start_time, timestamp) except Exception as e: print(f"Error in {self.model_name}: {e}") return VADResult(0.0, False, self.model_name, time.time() - start_time, timestamp) class OptimizedEPANNs: def __init__(self): self.model_name = "E-PANNs" self.sample_rate = 32000 print(f"✅ {self.model_name} initialized") def predict(self, audio: np.ndarray, timestamp: float = 0.0) -> VADResult: start_time = time.time() try: if len(audio) == 0: return VADResult(0.0, False, self.model_name, time.time() - start_time, timestamp) if len(audio.shape) > 1: audio = audio.mean(axis=1) # Convert audio to target sample rate for E-PANNs if LIBROSA_AVAILABLE: # Resample to E-PANNs sample rate if needed audio_resampled = librosa.resample(audio.astype(float), orig_sr=16000, target_sr=self.sample_rate) # Ensure minimum length (6 seconds) using wrap mode instead of zero padding min_samples = 6 * self.sample_rate # 6 seconds if len(audio_resampled) < min_samples: if LIBROSA_AVAILABLE: audio_resampled = librosa.util.fix_length(audio_resampled, size=min_samples, mode='wrap') else: # Fallback: repeat the signal repeat_factor = int(np.ceil(min_samples / len(audio_resampled))) audio_resampled = np.tile(audio_resampled, repeat_factor)[:min_samples] mel_spec = librosa.feature.melspectrogram(y=audio_resampled, sr=self.sample_rate, n_mels=64) energy = np.mean(librosa.power_to_db(mel_spec, ref=np.max)) spectral_centroid = np.mean(librosa.feature.spectral_centroid(y=audio_resampled, sr=self.sample_rate)) # Better speech detection using multiple features mfcc = librosa.feature.mfcc(y=audio_resampled, sr=self.sample_rate, n_mfcc=13) mfcc_var = np.var(mfcc, axis=1).mean() # Combine features 
for better speech detection speech_score = ((energy + 80) / 40) * 0.4 + (spectral_centroid / 5000) * 0.3 + (mfcc_var / 100) * 0.3 else: from scipy import signal # Basic fallback without librosa f, t, Sxx = signal.spectrogram(audio, 16000) # Use original sample rate energy = np.mean(10 * np.log10(Sxx + 1e-10)) # Simple energy-based detection as fallback speech_score = (energy + 100) / 50 probability = np.clip(speech_score, 0, 1) is_speech = probability > 0.6 return VADResult(probability, is_speech, self.model_name, time.time() - start_time, timestamp) except Exception as e: print(f"Error in {self.model_name}: {e}") return VADResult(0.0, False, self.model_name, time.time() - start_time, timestamp) class OptimizedPANNs: def __init__(self): self.model_name = "PANNs" self.sample_rate = 32000 self.model = None self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') self.load_model() def load_model(self): try: if PANNS_AVAILABLE: self.model = AudioTagging(checkpoint_path=None, device=self.device) print(f"✅ {self.model_name} loaded successfully") else: print(f"⚠️ {self.model_name} not available, using fallback") self.model = None except Exception as e: print(f"❌ Error loading {self.model_name}: {e}") self.model = None def predict(self, audio: np.ndarray, timestamp: float = 0.0) -> VADResult: start_time = time.time() if self.model is None or len(audio) == 0: if len(audio) > 0: energy = np.sum(audio ** 2) threshold = 0.01 probability = min(energy / threshold, 1.0) is_speech = energy > threshold else: probability = 0.0 is_speech = False return VADResult(probability, is_speech, f"{self.model_name} (fallback)", time.time() - start_time, timestamp) try: if len(audio.shape) > 1: audio = audio.mean(axis=1) # Convert audio to PANNs sample rate if LIBROSA_AVAILABLE: audio_resampled = librosa.resample(audio.astype(float), orig_sr=16000, target_sr=self.sample_rate) else: # Simple resampling fallback resample_factor = self.sample_rate / 16000 audio_resampled = np.interp( np.linspace(0, len(audio) - 1, int(len(audio) * resample_factor)), np.arange(len(audio)), audio ) # Ensure minimum length for PANNs (10 seconds) using wrap mode instead of zero padding min_samples = 10 * self.sample_rate # 10 seconds for optimal performance if len(audio_resampled) < min_samples: if LIBROSA_AVAILABLE: audio_resampled = librosa.util.fix_length(audio_resampled, size=min_samples, mode='wrap') else: # Fallback: repeat the signal repeat_factor = int(np.ceil(min_samples / len(audio_resampled))) audio_resampled = np.tile(audio_resampled, repeat_factor)[:min_samples] clip_probs, _ = self.model.inference(audio_resampled[np.newaxis, :], input_sr=self.sample_rate) # Find speech-related indices speech_indices = [] for i, lbl in enumerate(labels): if any(word in lbl.lower() for word in ['speech', 'voice', 'talk', 'conversation', 'speaking']): speech_indices.append(i) if not speech_indices: # Fallback to a known speech index if available try: speech_indices = [labels.index('Speech')] except ValueError: # If 'Speech' label doesn't exist, use first 10 indices as approximation speech_indices = list(range(min(10, len(labels)))) speech_prob = clip_probs[0, speech_indices].mean().item() return VADResult(float(speech_prob), speech_prob > 0.5, self.model_name, time.time()-start_time, timestamp) except Exception as e: print(f"Error in {self.model_name}: {e}") if len(audio) > 0: energy = np.sum(audio ** 2) threshold = 0.01 probability = min(energy / threshold, 1.0) is_speech = energy > threshold else: probability = 0.0 is_speech = 
False return VADResult(probability, is_speech, f"{self.model_name} (error)", time.time() - start_time, timestamp) class OptimizedAST: def __init__(self): self.model_name = "AST" self.sample_rate = 16000 self.model = None self.feature_extractor = None self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') self.prediction_cache = {} # Cache para evitar recálculos self.cache_window = 1.0 # Cachear resultados por segundo self.load_model() def load_model(self): try: if AST_AVAILABLE: model_name = "MIT/ast-finetuned-audioset-10-10-0.4593" self.feature_extractor = ASTFeatureExtractor.from_pretrained(model_name) self.model = ASTForAudioClassification.from_pretrained(model_name) self.model.to(self.device) # Use FP16 for faster inference on GPU if self.device.type == 'cuda': self.model = self.model.half() print(f"✅ {self.model_name} loaded with FP16 optimization") else: print(f"✅ {self.model_name} loaded successfully") self.model.eval() else: print(f"⚠️ {self.model_name} not available, using fallback") self.model = None except Exception as e: print(f"❌ Error loading {self.model_name}: {e}") self.model = None def predict(self, audio: np.ndarray, timestamp: float = 0.0, full_audio: np.ndarray = None) -> VADResult: start_time = time.time() if self.model is None or len(audio) == 0: # Enhanced fallback using spectral features if len(audio) > 0: energy = np.sum(audio ** 2) if LIBROSA_AVAILABLE: spectral_features = librosa.feature.spectral_rolloff(y=audio, sr=self.sample_rate) spectral_centroid = np.mean(librosa.feature.spectral_centroid(y=audio, sr=self.sample_rate)) # Combine multiple features for better speech detection probability = min((energy * 100 + spectral_centroid / 500) / 2, 1.0) else: probability = min(energy * 50, 1.0) is_speech = probability > 0.3 else: probability = 0.0 is_speech = False return VADResult(probability, is_speech, f"{self.model_name} (fallback)", time.time() - start_time, timestamp) try: # Cache key based on timestamp rounded to cache window cache_key = int(timestamp / self.cache_window) # Check cache first if cache_key in self.prediction_cache: cached_result = self.prediction_cache[cache_key] # Return cached result with updated timestamp return VADResult( cached_result.probability, cached_result.is_speech, cached_result.model_name + " (cached)", time.time() - start_time, timestamp ) if len(audio.shape) > 1: audio = audio.mean(axis=1) # Use longer context for AST - preferably 6.4 seconds (1024 frames) if full_audio is not None and len(full_audio) >= 6.4 * self.sample_rate: # Take 6.4-second window centered around current timestamp center_pos = int(timestamp * self.sample_rate) window_size = int(3.2 * self.sample_rate) # 3.2 seconds each side start_pos = max(0, center_pos - window_size) end_pos = min(len(full_audio), center_pos + window_size) # Ensure we have at least 6.4 seconds if end_pos - start_pos < 6.4 * self.sample_rate: end_pos = min(len(full_audio), start_pos + int(6.4 * self.sample_rate)) if end_pos - start_pos < 6.4 * self.sample_rate: start_pos = max(0, end_pos - int(6.4 * self.sample_rate)) audio_for_ast = full_audio[start_pos:end_pos] else: audio_for_ast = audio # Ensure minimum length for AST (6.4 seconds for 1024 frames) min_samples = int(6.4 * self.sample_rate) # 6.4 seconds if len(audio_for_ast) < min_samples: if LIBROSA_AVAILABLE: audio_for_ast = librosa.util.fix_length(audio_for_ast, size=min_samples, mode='wrap') else: # Fallback: repeat the signal repeat_factor = int(np.ceil(min_samples / len(audio_for_ast))) audio_for_ast = 
np.tile(audio_for_ast, repeat_factor)[:min_samples] # Truncate if too long (AST can handle up to ~10s, but we'll use 8s max for efficiency) max_samples = 8 * self.sample_rate if len(audio_for_ast) > max_samples: audio_for_ast = audio_for_ast[:max_samples] # Feature extraction with proper AST parameters (closer to 1024 frames) inputs = self.feature_extractor( audio_for_ast, sampling_rate=self.sample_rate, return_tensors="pt", max_length=1024, # Proper AST context padding="max_length", # Ensure consistent length truncation=True ) # Move inputs to correct device and dtype inputs = {k: v.to(self.device) for k, v in inputs.items()} if self.device.type == 'cuda' and hasattr(self.model, 'half'): # Convert inputs to FP16 if model is in FP16 inputs = {k: v.half() if v.dtype == torch.float32 else v for k, v in inputs.items()} with torch.no_grad(): outputs = self.model(**inputs) logits = outputs.logits probs = torch.sigmoid(logits) # Find speech-related classes label2id = self.model.config.label2id speech_indices = [] speech_keywords = ['speech', 'voice', 'talk', 'conversation', 'speaking'] for lbl, idx in label2id.items(): if any(word in lbl.lower() for word in speech_keywords): speech_indices.append(idx) if speech_indices: speech_prob = probs[0, speech_indices].mean().item() # Apply more reasonable thresholding for AST with lower threshold if speech_prob < 0.15 and np.sum(audio_for_ast ** 2) > 0.001: speech_prob = min(speech_prob * 2.5, 0.6) # Moderate boost, cap at 0.6 else: # Fallback to energy-based detection with higher threshold energy = np.sum(audio_for_ast ** 2) / len(audio_for_ast) # Normalize by length speech_prob = min(energy * 50, 1.0) # Use lower threshold specifically for AST (0.25 instead of 0.4) is_speech_ast = speech_prob > 0.25 result = VADResult(float(speech_prob), is_speech_ast, self.model_name, time.time()-start_time, timestamp) # Cache the result self.prediction_cache[cache_key] = result # Clean old cache entries (keep only last 30 seconds for longer sessions) cache_keys_to_remove = [k for k in self.prediction_cache.keys() if k < cache_key - 30] for k in cache_keys_to_remove: del self.prediction_cache[k] return result except Exception as e: print(f"Error in {self.model_name}: {e}") # Enhanced fallback if len(audio) > 0: energy = np.sum(audio ** 2) / len(audio) # Normalize by length probability = min(energy * 100, 1.0) # More conservative scaling is_speech = energy > 0.001 # Lower threshold for fallback else: probability = 0.0 is_speech = False return VADResult(probability, is_speech, f"{self.model_name} (error)", time.time() - start_time, timestamp) # ===== AUDIO PROCESSOR ===== class AudioProcessor: def __init__(self, sample_rate=16000): self.sample_rate = sample_rate self.chunk_duration = 4.0 self.chunk_size = int(sample_rate * self.chunk_duration) self.n_fft = 2048 self.hop_length = 256 self.n_mels = 128 self.fmin = 20 self.fmax = 8000 self.base_window = 0.064 self.base_hop = 0.032 # Model-specific window sizes (each model gets appropriate context) self.model_windows = { "Silero-VAD": 0.032, # 32ms exactly as required (512 samples) "WebRTC-VAD": 0.03, # 30ms frames "E-PANNs": 6.0, # 6 seconds minimum for reliable results "PANNs": 10.0, # 10 seconds for optimal performance "AST": 6.4 # ~6.4 seconds (1024 frames * 6.25ms) } # Model-specific hop sizes for efficiency self.model_hop_sizes = { "Silero-VAD": 0.016, # 16ms hop for Silero "WebRTC-VAD": 0.01, # 10ms hop for WebRTC "E-PANNs": 1.0, # Process every 1s but with 6s window "PANNs": 2.0, # Process every 2s but with 10s 
window "AST": 1.0 # Process every 1s but with 6.4s window } # Model-specific thresholds for better detection self.model_thresholds = { "Silero-VAD": 0.5, "WebRTC-VAD": 0.5, "E-PANNs": 0.4, "PANNs": 0.4, "AST": 0.25 } self.delay_compensation = 0.0 self.correlation_threshold = 0.7 def process_audio(self, audio): if audio is None: return np.array([]) try: if isinstance(audio, tuple): sample_rate, audio_data = audio if sample_rate != self.sample_rate and LIBROSA_AVAILABLE: audio_data = librosa.resample(audio_data.astype(float), orig_sr=sample_rate, target_sr=self.sample_rate) else: audio_data = audio if len(audio_data.shape) > 1: audio_data = audio_data.mean(axis=1) if np.max(np.abs(audio_data)) > 0: audio_data = audio_data / np.max(np.abs(audio_data)) return audio_data except Exception as e: print(f"Audio processing error: {e}") return np.array([]) def compute_high_res_spectrogram(self, audio_data): try: if LIBROSA_AVAILABLE and len(audio_data) > 0: stft = librosa.stft( audio_data, n_fft=self.n_fft, hop_length=self.hop_length, win_length=self.n_fft, window='hann', center=False ) power_spec = np.abs(stft) ** 2 mel_basis = librosa.filters.mel( sr=self.sample_rate, n_fft=self.n_fft, n_mels=self.n_mels, fmin=self.fmin, fmax=self.fmax ) mel_spec = np.dot(mel_basis, power_spec) mel_spec_db = librosa.power_to_db(mel_spec, ref=np.max) time_frames = np.arange(mel_spec_db.shape[1]) * self.hop_length / self.sample_rate return mel_spec_db, time_frames else: from scipy import signal f, t, Sxx = signal.spectrogram( audio_data, self.sample_rate, nperseg=self.n_fft, noverlap=self.n_fft - self.hop_length, window='hann' ) mel_spec_db = np.zeros((self.n_mels, Sxx.shape[1])) mel_freqs = np.logspace( np.log10(self.fmin), np.log10(min(self.fmax, self.sample_rate/2)), self.n_mels + 1 ) for i in range(self.n_mels): f_start = mel_freqs[i] f_end = mel_freqs[i + 1] bin_start = int(f_start * len(f) / (self.sample_rate/2)) bin_end = int(f_end * len(f) / (self.sample_rate/2)) if bin_end > bin_start: mel_spec_db[i, :] = np.mean(Sxx[bin_start:bin_end, :], axis=0) mel_spec_db = 10 * np.log10(mel_spec_db + 1e-10) return mel_spec_db, t except Exception as e: print(f"Spectrogram computation error: {e}") dummy_spec = np.zeros((self.n_mels, 200)) dummy_time = np.linspace(0, len(audio_data) / self.sample_rate, 200) return dummy_spec, dummy_time def detect_onset_offset_advanced(self, vad_results: List[VADResult], threshold: float = 0.5) -> List[OnsetOffset]: onsets_offsets = [] if len(vad_results) < 3: return onsets_offsets models = {} for result in vad_results: if result.model_name not in models: models[result.model_name] = [] models[result.model_name].append(result) for model_name, results in models.items(): if len(results) < 3: continue results.sort(key=lambda x: x.timestamp) timestamps = np.array([r.timestamp for r in results]) probabilities = np.array([r.probability for r in results]) if len(probabilities) > 5: window_size = min(5, len(probabilities) // 3) probabilities = np.convolve(probabilities, np.ones(window_size)/window_size, mode='same') upper_thresh = threshold + 0.1 lower_thresh = threshold - 0.1 in_speech_segment = False current_onset_time = -1 for i in range(1, len(results)): prev_prob = probabilities[i-1] curr_prob = probabilities[i] curr_time = timestamps[i] if not in_speech_segment and prev_prob <= upper_thresh and curr_prob > upper_thresh: in_speech_segment = True current_onset_time = curr_time - self.delay_compensation elif in_speech_segment and prev_prob >= lower_thresh and curr_prob < lower_thresh: 
in_speech_segment = False if current_onset_time >= 0: offset_time = curr_time - self.delay_compensation onsets_offsets.append(OnsetOffset( onset_time=max(0, current_onset_time), offset_time=offset_time, model_name=model_name, confidence=np.mean(probabilities[ (timestamps >= current_onset_time) & (timestamps <= offset_time) ]) if len(probabilities) > 0 else curr_prob )) current_onset_time = -1 if in_speech_segment and current_onset_time >= 0: onsets_offsets.append(OnsetOffset( onset_time=max(0, current_onset_time), offset_time=timestamps[-1], model_name=model_name, confidence=np.mean(probabilities[-3:]) if len(probabilities) >= 3 else probabilities[-1] )) return onsets_offsets def estimate_delay_compensation(self, audio_data, vad_results): try: if len(audio_data) == 0 or len(vad_results) == 0: return 0.0 window_size = int(self.sample_rate * self.base_window) hop_size = int(self.sample_rate * self.base_hop) energy_signal = [] for i in range(0, len(audio_data) - window_size, hop_size): window = audio_data[i:i + window_size] energy = np.sum(window ** 2) energy_signal.append(energy) energy_signal = np.array(energy_signal) if len(energy_signal) == 0: return 0.0 energy_signal = (energy_signal - np.mean(energy_signal)) / (np.std(energy_signal) + 1e-8) vad_times = np.array([r.timestamp for r in vad_results]) vad_probs = np.array([r.probability for r in vad_results]) energy_times = np.arange(len(energy_signal)) * self.base_hop vad_interp = np.interp(energy_times, vad_times, vad_probs) vad_interp = (vad_interp - np.mean(vad_interp)) / (np.std(vad_interp) + 1e-8) if len(energy_signal) > 10 and len(vad_interp) > 10: correlation = np.correlate(energy_signal, vad_interp, mode='full') delay_samples = np.argmax(correlation) - len(vad_interp) + 1 delay_seconds = delay_samples * self.base_hop max_corr = np.max(correlation) / (len(vad_interp) * np.std(energy_signal) * np.std(vad_interp)) if max_corr > self.correlation_threshold: self.delay_compensation = np.clip(delay_seconds, -0.1, 0.1) return self.delay_compensation except Exception as e: print(f"Delay estimation error: {e}") return 0.0 # ===== ENHANCED VISUALIZATION ===== def create_realtime_plot(audio_data: np.ndarray, vad_results: List[VADResult], onsets_offsets: List[OnsetOffset], processor: AudioProcessor, model_a: str, model_b: str, threshold: float): if not PLOTLY_AVAILABLE: return None try: mel_spec_db, time_frames = processor.compute_high_res_spectrogram(audio_data) freq_axis = np.linspace(processor.fmin, processor.fmax, processor.n_mels) fig = make_subplots( rows=2, cols=1, subplot_titles=(f"Model A: {model_a}", f"Model B: {model_b}"), vertical_spacing=0.02, shared_xaxes=True, specs=[[{"secondary_y": True}], [{"secondary_y": True}]] ) colorscale = 'Viridis' fig.add_trace( go.Heatmap( z=mel_spec_db, x=time_frames, y=freq_axis, colorscale=colorscale, showscale=False, hovertemplate='Time: %{x:.2f}s
<br>Freq: %{y:.0f}Hz<br>
Power: %{z:.1f}dB', name=f'Spectrogram {model_a}' ), row=1, col=1 ) fig.add_trace( go.Heatmap( z=mel_spec_db, x=time_frames, y=freq_axis, colorscale=colorscale, showscale=False, hovertemplate='Time: %{x:.2f}s
<br>Freq: %{y:.0f}Hz<br>
Power: %{z:.1f}dB', name=f'Spectrogram {model_b}' ), row=2, col=1 ) if len(time_frames) > 0: # Add threshold lines using add_shape to avoid secondary axis bug fig.add_shape( type="line", x0=time_frames[0], x1=time_frames[-1], y0=threshold, y1=threshold, line=dict(color='cyan', width=2, dash='dash'), row=1, col=1, yref="y2" # Reference to secondary y-axis ) fig.add_shape( type="line", x0=time_frames[0], x1=time_frames[-1], y0=threshold, y1=threshold, line=dict(color='cyan', width=2, dash='dash'), row=2, col=1, yref="y4" # Reference to secondary y-axis of second subplot ) # Add threshold annotations fig.add_annotation( x=time_frames[-1] * 0.95, y=threshold, text=f'Threshold: {threshold:.2f}', showarrow=False, font=dict(color='cyan', size=10), row=1, col=1, yref="y2" ) fig.add_annotation( x=time_frames[-1] * 0.95, y=threshold, text=f'Threshold: {threshold:.2f}', showarrow=False, font=dict(color='cyan', size=10), row=2, col=1, yref="y4" ) model_a_data = {'times': [], 'probs': []} model_b_data = {'times': [], 'probs': []} for result in vad_results: # Fix model name filtering - remove suffixes properly and consistently base_name = result.model_name.split('(')[0].strip() if base_name == model_a: model_a_data['times'].append(result.timestamp) model_a_data['probs'].append(result.probability) elif base_name == model_b: model_b_data['times'].append(result.timestamp) model_b_data['probs'].append(result.probability) if len(model_a_data['times']) > 0: fig.add_trace( go.Scatter( x=model_a_data['times'], y=model_a_data['probs'], mode='lines+markers', # Add markers to show single points line=dict(color='yellow', width=3), marker=dict(size=6, color='yellow'), name=f'{model_a} Probability', hovertemplate='Time: %{x:.2f}s
<br>Probability: %{y:.3f}', showlegend=True ), row=1, col=1, secondary_y=True ) if len(model_b_data['times']) > 0: fig.add_trace( go.Scatter( x=model_b_data['times'], y=model_b_data['probs'], mode='lines+markers', # Add markers to show single points line=dict(color='orange', width=3), marker=dict(size=6, color='orange'), name=f'{model_b} Probability', hovertemplate='Time: %{x:.2f}s<br>
Probability: %{y:.3f}', showlegend=True ), row=2, col=1, secondary_y=True ) model_a_events = [e for e in onsets_offsets if e.model_name.split('(')[0].strip() == model_a] model_b_events = [e for e in onsets_offsets if e.model_name.split('(')[0].strip() == model_b] for event in model_a_events: if event.onset_time >= 0 and event.onset_time <= time_frames[-1]: fig.add_vline( x=event.onset_time, line=dict(color='lime', width=3), annotation_text='▲', annotation_position="top", row=1, col=1 ) if event.offset_time >= 0 and event.offset_time <= time_frames[-1]: fig.add_vline( x=event.offset_time, line=dict(color='red', width=3), annotation_text='▼', annotation_position="bottom", row=1, col=1 ) for event in model_b_events: if event.onset_time >= 0 and event.onset_time <= time_frames[-1]: fig.add_vline( x=event.onset_time, line=dict(color='lime', width=3), annotation_text='▲', annotation_position="top", row=2, col=1 ) if event.offset_time >= 0 and event.offset_time <= time_frames[-1]: fig.add_vline( x=event.offset_time, line=dict(color='red', width=3), annotation_text='▼', annotation_position="bottom", row=2, col=1 ) fig.update_layout( height=500, title_text="Real-Time Speech Visualizer", showlegend=True, legend=dict( x=1.02, y=1, bgcolor="rgba(255,255,255,0.8)", bordercolor="Black", borderwidth=1 ), font=dict(size=10), margin=dict(l=60, r=120, t=50, b=50), plot_bgcolor='black', paper_bgcolor='white', yaxis2=dict(overlaying='y', side='right', title='Probability', range=[0, 1]), yaxis4=dict(overlaying='y3', side='right', title='Probability', range=[0, 1]) ) fig.update_xaxes( title_text="Time (seconds)", row=2, col=1, gridcolor='gray', gridwidth=1, griddash='dot' ) fig.update_yaxes( title_text="Frequency (Hz)", range=[processor.fmin, processor.fmax], gridcolor='gray', gridwidth=1, griddash='dot', secondary_y=False ) fig.update_yaxes( title_text="Probability", range=[0, 1], secondary_y=True ) return fig except Exception as e: print(f"Visualization error: {e}") import traceback traceback.print_exc() fig = go.Figure() fig.add_trace(go.Scatter(x=[0, 1], y=[0, 1], mode='lines', name='Error')) fig.update_layout(title=f"Visualization Error: {str(e)}") return fig # ===== MAIN APPLICATION ===== class VADDemo: def __init__(self): print("🎤 Initializing Real-time VAD Demo with 5 models...") self.processor = AudioProcessor() self.models = { 'Silero-VAD': OptimizedSileroVAD(), 'WebRTC-VAD': OptimizedWebRTCVAD(), 'E-PANNs': OptimizedEPANNs(), 'PANNs': OptimizedPANNs(), 'AST': OptimizedAST() } print("🎤 Real-time VAD Demo initialized successfully") print(f"📊 Available models: {list(self.models.keys())}") def process_audio_with_events(self, audio, model_a, model_b, threshold): if audio is None: return None, "🔇 No audio detected", "Ready to process audio..." 
try: processed_audio = self.processor.process_audio(audio) if len(processed_audio) == 0: return None, "🎵 Processing audio...", "No audio data processed" vad_results = [] selected_models = list(set([model_a, model_b])) # Process each model with its specific window and hop size for model_name in selected_models: if model_name in self.models: window_size = self.processor.model_windows[model_name] hop_size = self.processor.model_hop_sizes[model_name] model_threshold = self.processor.model_thresholds.get(model_name, threshold) window_samples = int(self.processor.sample_rate * window_size) hop_samples = int(self.processor.sample_rate * hop_size) # For large models, ensure we have enough audio if len(processed_audio) < window_samples: # If audio is too short, repeat it to reach minimum length repeat_factor = int(np.ceil(window_samples / len(processed_audio))) extended_audio = np.tile(processed_audio, repeat_factor)[:window_samples] else: extended_audio = processed_audio for i in range(0, len(extended_audio) - window_samples, hop_samples): timestamp = i / self.processor.sample_rate # Extract window centered around current position start_pos = max(0, i) end_pos = min(len(extended_audio), i + window_samples) chunk = extended_audio[start_pos:end_pos] # Ensure chunk has the right length if len(chunk) < window_samples: chunk = np.pad(chunk, (0, window_samples - len(chunk)), 'wrap') # Special handling for different models if model_name == 'AST': result = self.models[model_name].predict(chunk, timestamp, full_audio=extended_audio) else: result = self.models[model_name].predict(chunk, timestamp) # Use model-specific threshold result.is_speech = result.probability > model_threshold vad_results.append(result) delay_compensation = self.processor.estimate_delay_compensation(processed_audio, vad_results) onsets_offsets = self.processor.detect_onset_offset_advanced(vad_results, threshold) fig = create_realtime_plot( processed_audio, vad_results, onsets_offsets, self.processor, model_a, model_b, threshold ) speech_detected = any(result.is_speech for result in vad_results) total_speech_chunks = sum(1 for r in vad_results if r.is_speech) if speech_detected: status_msg = f"🎙️ SPEECH DETECTED - {total_speech_chunks} active chunks" else: status_msg = f"🔇 No speech detected" # Simplified details model_summaries = {} for result in vad_results: # Fix model name filtering - remove suffixes properly base_name = result.model_name.split('(')[0].strip() if base_name not in model_summaries: model_summaries[base_name] = {'probs': [], 'speech_chunks': 0, 'total_chunks': 0} summary = model_summaries[base_name] summary['probs'].append(result.probability) summary['total_chunks'] += 1 if result.is_speech: summary['speech_chunks'] += 1 details_lines = [f"**Analysis Results** (Global Threshold: {threshold:.2f})"] for model_name, summary in model_summaries.items(): avg_prob = np.mean(summary['probs']) if summary['probs'] else 0 speech_ratio = (summary['speech_chunks'] / summary['total_chunks']) if summary['total_chunks'] > 0 else 0 model_thresh = self.processor.model_thresholds.get(model_name, threshold) status_icon = "🟢" if speech_ratio > 0.5 else "🟡" if speech_ratio > 0.2 else "🔴" details_lines.append(f"{status_icon} **{model_name}**: {avg_prob:.3f} avg prob, {speech_ratio*100:.1f}% speech (thresh: {model_thresh:.2f})") if onsets_offsets: details_lines.append(f"\n**Speech Events**: {len(onsets_offsets)} detected") for i, event in enumerate(onsets_offsets[:5]): # Show first 5 only duration = event.offset_time - event.onset_time if 
event.offset_time > event.onset_time else 0 event_model = event.model_name.split('(')[0].strip() details_lines.append(f"• {event_model}: {event.onset_time:.2f}s - {event.offset_time:.2f}s ({duration:.2f}s)") details_text = "\n".join(details_lines) return fig, status_msg, details_text except Exception as e: print(f"Processing error: {e}") import traceback traceback.print_exc() return None, f"❌ Error: {str(e)}", f"Error details: {traceback.format_exc()}" # ===== GRADIO INTERFACE ===== def create_interface(): # Load logos logos = load_logos() # Create logo HTML with base64 images logo_html = """
""" logo_info = [ ('ai4s', 'AI4S'), ('surrey', 'University of Surrey'), ('epsrc', 'EPSRC'), ('cvssp', 'CVSSP') ] for key, alt_text in logo_info: if logos[key]: logo_html += f'{alt_text}' else: logo_html += f'{alt_text}' logo_html += "
" with gr.Blocks(title="VAD Demo - Voice Activity Detection", theme=gr.themes.Soft()) as interface: # Header with logos gr.Markdown("""

<div style="text-align: center;">
<h1>🎤 VAD Demo - Voice Activity Detection</h1>
<h3>Multi-Model Real-time Speech Detection Framework</h3>
</div>

""") # Logos section with gr.Row(): gr.HTML(logo_html) # Main interface with gr.Row(): with gr.Column(scale=2): gr.Markdown("### 🎛️ Controls") audio_input = gr.Audio( sources=["microphone"], type="numpy", label="Record Audio" ) model_a = gr.Dropdown( choices=["Silero-VAD", "WebRTC-VAD", "E-PANNs", "PANNs", "AST"], value="Silero-VAD", label="Model A (Top Panel)" ) model_b = gr.Dropdown( choices=["Silero-VAD", "WebRTC-VAD", "E-PANNs", "PANNs", "AST"], value="E-PANNs", label="Model B (Bottom Panel)" ) threshold_slider = gr.Slider( minimum=0.0, maximum=1.0, value=0.5, step=0.01, label="Detection Threshold" ) process_btn = gr.Button("🎤 Analyze", variant="primary", size="lg") with gr.Column(scale=3): status_display = gr.Textbox( label="Status", value="🔇 Ready to analyze audio", interactive=False, lines=2 ) # Results gr.Markdown("### 📊 Results") with gr.Row(): plot_output = gr.Plot(label="Speech Detection Visualization") with gr.Row(): details_output = gr.Textbox( label="Analysis Details", lines=10, interactive=False ) # Event handlers process_btn.click( fn=demo_app.process_audio_with_events, inputs=[audio_input, model_a, model_b, threshold_slider], outputs=[plot_output, status_display, details_output] ) # Footer gr.Markdown(""" --- **Models**: Silero-VAD, WebRTC-VAD, E-PANNs, PANNs, AST | **Research**: WASPAA 2025 | **Institution**: University of Surrey, CVSSP """) return interface # Initialize demo only once demo_app = VADDemo() # Create and launch interface if __name__ == "__main__": interface = create_interface() interface.launch(share=True, debug=False)