import gradio as gr
import numpy as np
import torch
import time
import warnings
from dataclasses import dataclass
from typing import List, Tuple, Dict
import threading
import queue
import os
import requests
from pathlib import Path
import base64

# Suppress warnings
warnings.filterwarnings('ignore')


# Function to convert an image file to a base64 string
def image_to_base64(image_path):
    try:
        with open(image_path, "rb") as img_file:
            return base64.b64encode(img_file.read()).decode('utf-8')
    except Exception as e:
        print(f"Error loading image {image_path}: {e}")
        return None


# Load logos as base64
def load_logos():
    logos = {}
    logo_files = {
        'ai4s': 'ai4s_banner.png',
        'surrey': 'surrey_logo.png',
        'epsrc': 'EPSRC_logo.png',
        'cvssp': 'CVSSP_logo.png'
    }
    for key, filename in logo_files.items():
        if os.path.exists(filename):
            logos[key] = image_to_base64(filename)
        else:
            print(f"Logo file {filename} not found")
            logos[key] = None
    return logos


# Optional imports with fallbacks
try:
    import librosa
    LIBROSA_AVAILABLE = True
    print("✅ Librosa available")
except ImportError:
    LIBROSA_AVAILABLE = False
    print("⚠️ Librosa not available, using scipy fallback")

try:
    import webrtcvad
    WEBRTC_AVAILABLE = True
    print("✅ WebRTC VAD available")
except ImportError:
    WEBRTC_AVAILABLE = False
    print("⚠️ WebRTC VAD not available, using fallback")

try:
    import plotly.graph_objects as go
    from plotly.subplots import make_subplots
    PLOTLY_AVAILABLE = True
    print("✅ Plotly available")
except ImportError:
    PLOTLY_AVAILABLE = False
    print("⚠️ Plotly not available")

# PANNs imports
try:
    from panns_inference import AudioTagging, labels
    PANNS_AVAILABLE = True
    print("✅ PANNs available")
except ImportError:
    PANNS_AVAILABLE = False
    print("⚠️ PANNs not available, using fallback")

# Transformers for AST
try:
    from transformers import ASTForAudioClassification, ASTFeatureExtractor
    import transformers
    AST_AVAILABLE = True
    print("✅ AST (Transformers) available")
except ImportError:
    AST_AVAILABLE = False
    print("⚠️ AST not available, using fallback")

print("🚀 Creating Real-time VAD Demo...")


# ===== DATA STRUCTURES =====
@dataclass
class VADResult:
    probability: float
    is_speech: bool
    model_name: str
    processing_time: float
    timestamp: float


@dataclass
class OnsetOffset:
    onset_time: float
    offset_time: float
    model_name: str
    confidence: float


# ===== MODEL IMPLEMENTATIONS =====
class OptimizedSileroVAD:
    def __init__(self):
        self.model = None
        self.sample_rate = 16000
        self.model_name = "Silero-VAD"
        self.load_model()

    def load_model(self):
        try:
            self.model, _ = torch.hub.load(
                repo_or_dir='snakers4/silero-vad',
                model='silero_vad',
                force_reload=False,
                onnx=False
            )
            self.model.eval()
            print(f"✅ {self.model_name} loaded successfully")
        except Exception as e:
            print(f"❌ Error loading {self.model_name}: {e}")
            self.model = None

    def predict(self, audio: np.ndarray, timestamp: float = 0.0) -> VADResult:
        start_time = time.time()
        if self.model is None or len(audio) == 0:
            return VADResult(0.0, False, f"{self.model_name} (unavailable)",
                             time.time() - start_time, timestamp)
        try:
            if len(audio.shape) > 1:
                audio = audio.mean(axis=1)
            # Silero expects exactly 512 samples at 16 kHz: centre-crop or zero-pad
            required_samples = 512
            if len(audio) != required_samples:
                if len(audio) > required_samples:
                    start_idx = (len(audio) - required_samples) // 2
                    audio_chunk = audio[start_idx:start_idx + required_samples]
                else:
                    audio_chunk = np.pad(audio, (0, required_samples - len(audio)), 'constant')
            else:
                audio_chunk = audio
            audio_tensor = torch.FloatTensor(audio_chunk).unsqueeze(0)
            with torch.no_grad():
                speech_prob = self.model(audio_tensor, self.sample_rate).item()
            is_speech = speech_prob > 0.5
            processing_time = time.time() - start_time
            return VADResult(speech_prob, is_speech, self.model_name, processing_time, timestamp)
        except Exception as e:
            print(f"Error in {self.model_name}: {e}")
            return VADResult(0.0, False, self.model_name, time.time() - start_time, timestamp)


class OptimizedWebRTCVAD:
    def __init__(self):
        self.model_name = "WebRTC-VAD"
        self.sample_rate = 16000
        self.frame_duration = 30  # ms; webrtcvad accepts only 10/20/30 ms frames
        self.frame_size = int(self.sample_rate * self.frame_duration / 1000)
        if WEBRTC_AVAILABLE:
            try:
                self.vad = webrtcvad.Vad(3)  # aggressiveness mode 0-3
                print(f"✅ {self.model_name} loaded successfully")
            except Exception:
                self.vad = None
        else:
            self.vad = None

    def predict(self, audio: np.ndarray, timestamp: float = 0.0) -> VADResult:
        start_time = time.time()
        if self.vad is None or len(audio) == 0:
            # Energy-based fallback when webrtcvad is unavailable
            energy = np.sum(audio ** 2) if len(audio) > 0 else 0
            threshold = 0.01
            probability = min(energy / threshold, 1.0)
            is_speech = energy > threshold
            return VADResult(probability, is_speech, f"{self.model_name} (fallback)",
                             time.time() - start_time, timestamp)
        try:
            if len(audio.shape) > 1:
                audio = audio.mean(axis=1)
            audio_int16 = (audio * 32767).astype(np.int16)
            speech_frames = 0
            total_frames = 0
            for i in range(0, len(audio_int16) - self.frame_size, self.frame_size):
                frame = audio_int16[i:i + self.frame_size].tobytes()
                if self.vad.is_speech(frame, self.sample_rate):
                    speech_frames += 1
                total_frames += 1
            probability = speech_frames / max(total_frames, 1)
            is_speech = probability > 0.3
            return VADResult(probability, is_speech, self.model_name,
                             time.time() - start_time, timestamp)
        except Exception as e:
            print(f"Error in {self.model_name}: {e}")
            return VADResult(0.0, False, self.model_name, time.time() - start_time, timestamp)


class OptimizedEPANNs:
    def __init__(self):
        self.model_name = "E-PANNs"
        self.sample_rate = 32000
        print(f"✅ {self.model_name} initialized")

    def predict(self, audio: np.ndarray, timestamp: float = 0.0) -> VADResult:
        start_time = time.time()
        try:
            if len(audio) == 0:
                return VADResult(0.0, False, self.model_name, time.time() - start_time, timestamp)
            if len(audio.shape) > 1:
                audio = audio.mean(axis=1)
            if LIBROSA_AVAILABLE:
                # Resample to the E-PANNs sample rate if needed
                audio_resampled = librosa.resample(audio.astype(float), orig_sr=16000,
                                                   target_sr=self.sample_rate)
                mel_spec = librosa.feature.melspectrogram(y=audio_resampled,
                                                          sr=self.sample_rate, n_mels=64)
                energy = np.mean(librosa.power_to_db(mel_spec, ref=np.max))
                spectral_centroid = np.mean(librosa.feature.spectral_centroid(
                    y=audio_resampled, sr=self.sample_rate))
                # Better speech detection using multiple features
                mfcc = librosa.feature.mfcc(y=audio_resampled, sr=self.sample_rate, n_mfcc=13)
                mfcc_var = np.var(mfcc, axis=1).mean()
                # Combine features for better speech detection
                speech_score = (((energy + 80) / 40) * 0.4
                                + (spectral_centroid / 5000) * 0.3
                                + (mfcc_var / 100) * 0.3)
            else:
                from scipy import signal
                # Basic fallback without librosa, at the original 16 kHz rate
                f, t, Sxx = signal.spectrogram(audio, 16000)
                energy = np.mean(10 * np.log10(Sxx + 1e-10))
                # Simple energy-based detection as fallback
                speech_score = (energy + 100) / 50
            probability = np.clip(speech_score, 0, 1)
            is_speech = probability > 0.6
            return VADResult(probability, is_speech, self.model_name,
                             time.time() - start_time, timestamp)
        except Exception as e:
            print(f"Error in {self.model_name}: {e}")
            return VADResult(0.0, False, self.model_name, time.time() - start_time, timestamp)
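
# ---------------------------------------------------------------------------
# Hypothetical usage sketch (not part of the demo pipeline): feed one second
# of synthetic audio through the two lightweight detectors defined above and
# inspect the VADResult fields. `_demo_chunk_predict` is an illustrative name
# introduced here; the printed numbers depend on which optional backends are
# installed, so treat them as indicative only.
def _demo_chunk_predict():
    rng = np.random.default_rng(0)
    chunk = rng.normal(0.0, 0.1, 16000).astype(np.float32)  # 1 s of noise at 16 kHz
    for detector in (OptimizedWebRTCVAD(), OptimizedEPANNs()):
        result = detector.predict(chunk, timestamp=0.0)
        print(f"{result.model_name}: p={result.probability:.3f} "
              f"speech={result.is_speech} ({result.processing_time * 1000:.1f} ms)")
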
class OptimizedPANNs:
    def __init__(self):
        self.model_name = "PANNs"
        self.sample_rate = 32000
        self.model = None
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.load_model()

    def load_model(self):
        try:
            if PANNS_AVAILABLE:
                self.model = AudioTagging(checkpoint_path=None, device=self.device)
                print(f"✅ {self.model_name} loaded successfully")
            else:
                print(f"⚠️ {self.model_name} not available, using fallback")
                self.model = None
        except Exception as e:
            print(f"❌ Error loading {self.model_name}: {e}")
            self.model = None

    def predict(self, audio: np.ndarray, timestamp: float = 0.0) -> VADResult:
        start_time = time.time()
        if self.model is None or len(audio) == 0:
            if len(audio) > 0:
                energy = np.sum(audio ** 2)
                threshold = 0.01
                probability = min(energy / threshold, 1.0)
                is_speech = energy > threshold
            else:
                probability = 0.0
                is_speech = False
            return VADResult(probability, is_speech, f"{self.model_name} (fallback)",
                             time.time() - start_time, timestamp)
        try:
            if len(audio.shape) > 1:
                audio = audio.mean(axis=1)
            # Convert audio to the PANNs sample rate
            if LIBROSA_AVAILABLE:
                audio_resampled = librosa.resample(audio.astype(float), orig_sr=16000,
                                                   target_sr=self.sample_rate)
            else:
                # Simple linear-interpolation resampling fallback
                resample_factor = self.sample_rate / 16000
                audio_resampled = np.interp(
                    np.linspace(0, len(audio) - 1, int(len(audio) * resample_factor)),
                    np.arange(len(audio)),
                    audio
                )
            # Ensure minimum length for PANNs (need at least 1 second)
            min_samples = self.sample_rate  # 1 second
            if len(audio_resampled) < min_samples:
                audio_resampled = np.pad(audio_resampled,
                                         (0, min_samples - len(audio_resampled)), 'constant')
            # panns_inference expects 32 kHz input of shape (batch, samples);
            # its inference() takes no sample-rate argument
            clip_probs, _ = self.model.inference(audio_resampled[np.newaxis, :])
            # Find speech-related indices by keyword search over the label names
            speech_indices = []
            for i, lbl in enumerate(labels):
                if any(word in lbl.lower() for word in
                       ['speech', 'voice', 'talk', 'conversation', 'speaking']):
                    speech_indices.append(i)
            if not speech_indices:
                # Fallback to a known speech index if available
                try:
                    speech_indices = [labels.index('Speech')]
                except ValueError:
                    # If the 'Speech' label doesn't exist, use the first 10 indices
                    # as an approximation
                    speech_indices = list(range(min(10, len(labels))))
            speech_prob = clip_probs[0, speech_indices].mean().item()
            return VADResult(float(speech_prob), speech_prob > 0.5, self.model_name,
                             time.time() - start_time, timestamp)
        except Exception as e:
            print(f"Error in {self.model_name}: {e}")
            if len(audio) > 0:
                energy = np.sum(audio ** 2)
                threshold = 0.01
                probability = min(energy / threshold, 1.0)
                is_speech = energy > threshold
            else:
                probability = 0.0
                is_speech = False
            return VADResult(probability, is_speech, f"{self.model_name} (error)",
                             time.time() - start_time, timestamp)


class OptimizedAST:
    def __init__(self):
        self.model_name = "AST"
        self.sample_rate = 16000
        self.model = None
        self.feature_extractor = None
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.load_model()

    def load_model(self):
        try:
            if AST_AVAILABLE:
                model_name = "MIT/ast-finetuned-audioset-10-10-0.4593"
                self.feature_extractor = ASTFeatureExtractor.from_pretrained(model_name)
                self.model = ASTForAudioClassification.from_pretrained(model_name)
                self.model.to(self.device)
                self.model.eval()
                print(f"✅ {self.model_name} loaded successfully")
            else:
                print(f"⚠️ {self.model_name} not available, using fallback")
                self.model = None
        except Exception as e:
            print(f"❌ Error loading {self.model_name}: {e}")
            self.model = None

    def predict(self, audio: np.ndarray, timestamp: float = 0.0,
                full_audio: np.ndarray = None) -> VADResult:
        start_time = time.time()
        if self.model is None or len(audio) == 0:
            # Enhanced fallback using spectral features
            if len(audio) > 0:
                energy = np.sum(audio ** 2)
                if LIBROSA_AVAILABLE:
                    spectral_features = librosa.feature.spectral_rolloff(y=audio, sr=self.sample_rate)
                    spectral_centroid = np.mean(librosa.feature.spectral_centroid(y=audio, sr=self.sample_rate))
                    # Combine multiple features for better speech detection
                    probability = min((energy * 100 + spectral_centroid / 500) / 2, 1.0)
                else:
                    probability = min(energy * 50, 1.0)
                is_speech = probability > 0.3
            else:
                probability = 0.0
                is_speech = False
            return VADResult(probability, is_speech, f"{self.model_name} (fallback)",
                             time.time() - start_time, timestamp)
        try:
            if len(audio.shape) > 1:
                audio = audio.mean(axis=1)
            # Use longer context for AST - take from the full audio if available
            if full_audio is not None and len(full_audio) > self.sample_rate:
                # Take a 3-second window centred on the current timestamp
                center_pos = int(timestamp * self.sample_rate)
                window_size = int(1.5 * self.sample_rate)  # 1.5 seconds each side
                start_pos = max(0, center_pos - window_size)
                end_pos = min(len(full_audio), center_pos + window_size)
                # Ensure we have at least 1 second
                if end_pos - start_pos < self.sample_rate:
                    end_pos = min(len(full_audio), start_pos + self.sample_rate)
                audio_for_ast = full_audio[start_pos:end_pos]
            else:
                audio_for_ast = audio
            # Ensure minimum length for AST
            if len(audio_for_ast) < self.sample_rate:
                audio_for_ast = np.pad(audio_for_ast,
                                       (0, self.sample_rate - len(audio_for_ast)), 'constant')
            # Feature extraction with AST parameters
            inputs = self.feature_extractor(
                audio_for_ast,
                sampling_rate=self.sample_rate,
                return_tensors="pt",
                max_length=1024,  # AST context length
                truncation=True
            )
            inputs = {k: v.to(self.device) for k, v in inputs.items()}
            with torch.no_grad():
                outputs = self.model(**inputs)
                logits = outputs.logits
                probs = torch.sigmoid(logits)
            # Find speech-related classes by keyword search over the label map
            label2id = self.model.config.label2id
            speech_indices = []
            speech_keywords = ['speech', 'voice', 'talk', 'conversation', 'speaking']
            for lbl, idx in label2id.items():
                if any(word in lbl.lower() for word in speech_keywords):
                    speech_indices.append(idx)
            if speech_indices:
                speech_prob = probs[0, speech_indices].mean().item()
                # Boost the probability if it's too low but there's clear audio content
                if speech_prob < 0.1 and np.sum(audio_for_ast ** 2) > 0.001:
                    speech_prob = min(speech_prob * 5, 0.8)  # boost but cap at 0.8
            else:
                # Fallback to energy-based detection
                energy = np.sum(audio_for_ast ** 2)
                speech_prob = min(energy * 20, 1.0)
            return VADResult(float(speech_prob), speech_prob > 0.4, self.model_name,
                             time.time() - start_time, timestamp)
        except Exception as e:
            print(f"Error in {self.model_name}: {e}")
            # Enhanced fallback
            if len(audio) > 0:
                energy = np.sum(audio ** 2)
                probability = min(energy * 30, 1.0)  # more aggressive energy scaling
                is_speech = energy > 0.002
            else:
                probability = 0.0
                is_speech = False
            return VADResult(probability, is_speech, f"{self.model_name} (error)",
                             time.time() - start_time, timestamp)
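
# ---------------------------------------------------------------------------
# Both AudioSet-based models above locate their speech classes the same way:
# a keyword search over the label names. A minimal stand-alone sketch of that
# technique (the helper name `_speech_label_indices` is ours, not the app's;
# the app keeps the logic inline in each predict method):
def _speech_label_indices(label_names):
    keywords = ('speech', 'voice', 'talk', 'conversation', 'speaking')
    return [i for i, name in enumerate(label_names)
            if any(k in name.lower() for k in keywords)]

# e.g. _speech_label_indices(['Music', 'Speech', 'Narration, monologue']) -> [1]
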
# ===== AUDIO PROCESSOR =====
class AudioProcessor:
    def __init__(self, sample_rate=16000):
        self.sample_rate = sample_rate
        self.chunk_duration = 4.0
        self.chunk_size = int(sample_rate * self.chunk_duration)
        self.n_fft = 2048
        self.hop_length = 256
        self.n_mels = 128
        self.fmin = 20
        self.fmax = 8000
        self.window_size = 0.064  # analysis window (seconds)
        self.hop_size = 0.032  # hop (seconds)
        self.delay_compensation = 0.0
        self.correlation_threshold = 0.7

    def process_audio(self, audio):
        if audio is None:
            return np.array([])
        try:
            if isinstance(audio, tuple):
                sample_rate, audio_data = audio
                if sample_rate != self.sample_rate and LIBROSA_AVAILABLE:
                    audio_data = librosa.resample(audio_data.astype(float),
                                                  orig_sr=sample_rate,
                                                  target_sr=self.sample_rate)
            else:
                audio_data = audio
            if len(audio_data.shape) > 1:
                audio_data = audio_data.mean(axis=1)
            # Peak-normalise
            if np.max(np.abs(audio_data)) > 0:
                audio_data = audio_data / np.max(np.abs(audio_data))
            return audio_data
        except Exception as e:
            print(f"Audio processing error: {e}")
            return np.array([])

    def compute_high_res_spectrogram(self, audio_data):
        try:
            if LIBROSA_AVAILABLE and len(audio_data) > 0:
                stft = librosa.stft(
                    audio_data,
                    n_fft=self.n_fft,
                    hop_length=self.hop_length,
                    win_length=self.n_fft,
                    window='hann',
                    center=False
                )
                power_spec = np.abs(stft) ** 2
                mel_basis = librosa.filters.mel(
                    sr=self.sample_rate,
                    n_fft=self.n_fft,
                    n_mels=self.n_mels,
                    fmin=self.fmin,
                    fmax=self.fmax
                )
                mel_spec = np.dot(mel_basis, power_spec)
                mel_spec_db = librosa.power_to_db(mel_spec, ref=np.max)
                time_frames = np.arange(mel_spec_db.shape[1]) * self.hop_length / self.sample_rate
                return mel_spec_db, time_frames
            else:
                from scipy import signal
                f, t, Sxx = signal.spectrogram(
                    audio_data,
                    self.sample_rate,
                    nperseg=self.n_fft,
                    noverlap=self.n_fft - self.hop_length,
                    window='hann'
                )
                # Approximate a mel scale by averaging linear bins into
                # log-spaced bands
                mel_spec_db = np.zeros((self.n_mels, Sxx.shape[1]))
                mel_freqs = np.logspace(
                    np.log10(self.fmin),
                    np.log10(min(self.fmax, self.sample_rate / 2)),
                    self.n_mels + 1
                )
                for i in range(self.n_mels):
                    f_start = mel_freqs[i]
                    f_end = mel_freqs[i + 1]
                    bin_start = int(f_start * len(f) / (self.sample_rate / 2))
                    bin_end = int(f_end * len(f) / (self.sample_rate / 2))
                    if bin_end > bin_start:
                        mel_spec_db[i, :] = np.mean(Sxx[bin_start:bin_end, :], axis=0)
                mel_spec_db = 10 * np.log10(mel_spec_db + 1e-10)
                return mel_spec_db, t
        except Exception as e:
            print(f"Spectrogram computation error: {e}")
            dummy_spec = np.zeros((self.n_mels, 200))
            dummy_time = np.linspace(0, len(audio_data) / self.sample_rate, 200)
            return dummy_spec, dummy_time

    def detect_onset_offset_advanced(self, vad_results: List[VADResult],
                                     threshold: float = 0.5) -> List[OnsetOffset]:
        onsets_offsets = []
        if len(vad_results) < 3:
            return onsets_offsets
        # Group results per model
        models = {}
        for result in vad_results:
            if result.model_name not in models:
                models[result.model_name] = []
            models[result.model_name].append(result)
        for model_name, results in models.items():
            if len(results) < 3:
                continue
            results.sort(key=lambda x: x.timestamp)
            timestamps = np.array([r.timestamp for r in results])
            probabilities = np.array([r.probability for r in results])
            # Moving-average smoothing before thresholding
            if len(probabilities) > 5:
                window_size = min(5, len(probabilities) // 3)
                probabilities = np.convolve(probabilities,
                                            np.ones(window_size) / window_size, mode='same')
            # Hysteresis: open a segment above threshold+0.1, close below threshold-0.1
            upper_thresh = threshold + 0.1
            lower_thresh = threshold - 0.1
            in_speech_segment = False
            current_onset_time = -1
            for i in range(1, len(results)):
                prev_prob = probabilities[i - 1]
                curr_prob = probabilities[i]
                curr_time = timestamps[i]
                if not in_speech_segment and prev_prob <= upper_thresh and curr_prob > upper_thresh:
                    in_speech_segment = True
                    current_onset_time = curr_time - self.delay_compensation
                elif in_speech_segment and prev_prob >= lower_thresh and curr_prob < lower_thresh:
                    in_speech_segment = False
                    if current_onset_time >= 0:
                        offset_time = curr_time - self.delay_compensation
                        onsets_offsets.append(OnsetOffset(
                            onset_time=max(0, current_onset_time),
                            offset_time=offset_time,
                            model_name=model_name,
                            confidence=np.mean(probabilities[
                                (timestamps >= current_onset_time) & (timestamps <= offset_time)
                            ]) if len(probabilities) > 0 else curr_prob
                        ))
                        current_onset_time = -1
            # Close any segment still open at the end of the clip
            if in_speech_segment and current_onset_time >= 0:
                onsets_offsets.append(OnsetOffset(
                    onset_time=max(0, current_onset_time),
                    offset_time=timestamps[-1],
                    model_name=model_name,
                    confidence=np.mean(probabilities[-3:]) if len(probabilities) >= 3
                    else probabilities[-1]
                ))
        return onsets_offsets

    def estimate_delay_compensation(self, audio_data, vad_results):
        try:
            if len(audio_data) == 0 or len(vad_results) == 0:
                return 0.0
            window_size = int(self.sample_rate * self.window_size)
            hop_size = int(self.sample_rate * self.hop_size)
            # Short-time energy envelope of the raw audio
            energy_signal = []
            for i in range(0, len(audio_data) - window_size, hop_size):
                window = audio_data[i:i + window_size]
                energy = np.sum(window ** 2)
                energy_signal.append(energy)
            energy_signal = np.array(energy_signal)
            if len(energy_signal) == 0:
                return 0.0
            energy_signal = (energy_signal - np.mean(energy_signal)) / (np.std(energy_signal) + 1e-8)
            # Interpolate VAD probabilities onto the energy time grid
            vad_times = np.array([r.timestamp for r in vad_results])
            vad_probs = np.array([r.probability for r in vad_results])
            energy_times = np.arange(len(energy_signal)) * self.hop_size
            vad_interp = np.interp(energy_times, vad_times, vad_probs)
            vad_interp = (vad_interp - np.mean(vad_interp)) / (np.std(vad_interp) + 1e-8)
            if len(energy_signal) > 10 and len(vad_interp) > 10:
                # Cross-correlate to find the lag between energy and VAD output
                correlation = np.correlate(energy_signal, vad_interp, mode='full')
                delay_samples = np.argmax(correlation) - len(vad_interp) + 1
                delay_seconds = delay_samples * self.hop_size
                max_corr = np.max(correlation) / (len(vad_interp) * np.std(energy_signal) * np.std(vad_interp))
                if max_corr > self.correlation_threshold:
                    self.delay_compensation = np.clip(delay_seconds, -0.1, 0.1)
            return self.delay_compensation
        except Exception as e:
            print(f"Delay estimation error: {e}")
            return 0.0
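
# ---------------------------------------------------------------------------
# detect_onset_offset_advanced above uses hysteresis: a segment opens only
# when the smoothed probability rises above threshold+0.1 and closes only
# when it falls below threshold-0.1, which suppresses chatter around the
# threshold. A toy illustration on a hand-made probability track, using the
# same 0.6/0.4 margins that a 0.5 threshold would produce (`_demo_hysteresis`
# is an illustrative helper, not used by the app):
def _demo_hysteresis():
    probs = [0.2, 0.55, 0.62, 0.58, 0.55, 0.3, 0.2]  # hovers near 0.5
    upper, lower = 0.6, 0.4
    in_speech = False
    for i in range(1, len(probs)):
        if not in_speech and probs[i - 1] <= upper < probs[i]:
            in_speech = True
            print(f"onset at index {i}")   # fires once, at index 2
        elif in_speech and probs[i - 1] >= lower > probs[i]:
            in_speech = False
            print(f"offset at index {i}")  # fires once, at index 5
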
# ===== ENHANCED VISUALIZATION =====
def create_realtime_plot(audio_data: np.ndarray, vad_results: List[VADResult],
                         onsets_offsets: List[OnsetOffset], processor: AudioProcessor,
                         model_a: str, model_b: str, threshold: float):
    if not PLOTLY_AVAILABLE:
        return None
    try:
        mel_spec_db, time_frames = processor.compute_high_res_spectrogram(audio_data)
        freq_axis = np.linspace(processor.fmin, processor.fmax, processor.n_mels)
        fig = make_subplots(
            rows=2, cols=1,
            subplot_titles=(f"Model A: {model_a}", f"Model B: {model_b}"),
            vertical_spacing=0.02,
            shared_xaxes=True,
            specs=[[{"secondary_y": True}], [{"secondary_y": True}]]
        )
        colorscale = 'Viridis'
        fig.add_trace(
            go.Heatmap(
                z=mel_spec_db,
                x=time_frames,
                y=freq_axis,
                colorscale=colorscale,
                showscale=False,
                hovertemplate='Time: %{x:.2f}s<br>Freq: %{y:.0f}Hz<br>Power: %{z:.1f}dB',
                name=f'Spectrogram {model_a}'
            ),
            row=1, col=1
        )
        fig.add_trace(
            go.Heatmap(
                z=mel_spec_db,
                x=time_frames,
                y=freq_axis,
                colorscale=colorscale,
                showscale=False,
                hovertemplate='Time: %{x:.2f}s<br>Freq: %{y:.0f}Hz<br>Power: %{z:.1f}dB',
                name=f'Spectrogram {model_b}'
            ),
            row=2, col=1
        )
        if len(time_frames) > 0:
            # Add threshold lines to both panels
            fig.add_hline(
                y=threshold,
                line=dict(color='cyan', width=2, dash='dash'),
                annotation_text=f'Threshold: {threshold:.2f}',
                annotation_position="top right",
                row=1, col=1, secondary_y=True
            )
            fig.add_hline(
                y=threshold,
                line=dict(color='cyan', width=2, dash='dash'),
                annotation_text=f'Threshold: {threshold:.2f}',
                annotation_position="top right",
                row=2, col=1, secondary_y=True
            )
        model_a_data = {'times': [], 'probs': []}
        model_b_data = {'times': [], 'probs': []}
        for result in vad_results:
            if result.model_name.startswith(model_a):
                model_a_data['times'].append(result.timestamp)
                model_a_data['probs'].append(result.probability)
            elif result.model_name.startswith(model_b):
                model_b_data['times'].append(result.timestamp)
                model_b_data['probs'].append(result.probability)
        if len(model_a_data['times']) > 1:
            fig.add_trace(
                go.Scatter(
                    x=model_a_data['times'],
                    y=model_a_data['probs'],
                    mode='lines',
                    line=dict(color='yellow', width=3),
                    name=f'{model_a} Probability',
                    hovertemplate='Time: %{x:.2f}s<br>Probability: %{y:.3f}',
                    showlegend=True
                ),
                row=1, col=1, secondary_y=True
            )
        if len(model_b_data['times']) > 1:
            fig.add_trace(
                go.Scatter(
                    x=model_b_data['times'],
                    y=model_b_data['probs'],
                    mode='lines',
                    line=dict(color='orange', width=3),
                    name=f'{model_b} Probability',
                    hovertemplate='Time: %{x:.2f}s<br>Probability: %{y:.3f}',
                    showlegend=True
                ),
                row=2, col=1, secondary_y=True
            )
        model_a_events = [e for e in onsets_offsets if e.model_name.startswith(model_a)]
        model_b_events = [e for e in onsets_offsets if e.model_name.startswith(model_b)]
        for event in model_a_events:
            if 0 <= event.onset_time <= time_frames[-1]:
                fig.add_vline(
                    x=event.onset_time,
                    line=dict(color='lime', width=3),
                    annotation_text='▲',
                    annotation_position="top",
                    row=1, col=1
                )
            if 0 <= event.offset_time <= time_frames[-1]:
                fig.add_vline(
                    x=event.offset_time,
                    line=dict(color='red', width=3),
                    annotation_text='▼',
                    annotation_position="bottom",
                    row=1, col=1
                )
        for event in model_b_events:
            if 0 <= event.onset_time <= time_frames[-1]:
                fig.add_vline(
                    x=event.onset_time,
                    line=dict(color='lime', width=3),
                    annotation_text='▲',
                    annotation_position="top",
                    row=2, col=1
                )
            if 0 <= event.offset_time <= time_frames[-1]:
                fig.add_vline(
                    x=event.offset_time,
                    line=dict(color='red', width=3),
                    annotation_text='▼',
                    annotation_position="bottom",
                    row=2, col=1
                )
        fig.update_layout(
            height=500,
            title_text="Real-Time Speech Visualizer",
            showlegend=True,
            legend=dict(
                x=1.02, y=1,
                bgcolor="rgba(255,255,255,0.8)",
                bordercolor="Black",
                borderwidth=1
            ),
            font=dict(size=10),
            margin=dict(l=60, r=120, t=50, b=50),
            plot_bgcolor='black',
            paper_bgcolor='white',
            yaxis2=dict(overlaying='y', side='right', title='Probability', range=[0, 1]),
            yaxis4=dict(overlaying='y3', side='right', title='Probability', range=[0, 1])
        )
        fig.update_xaxes(
            title_text="Time (seconds)",
            row=2, col=1,
            gridcolor='gray', gridwidth=1, griddash='dot'
        )
        fig.update_yaxes(
            title_text="Frequency (Hz)",
            range=[processor.fmin, processor.fmax],
            gridcolor='gray', gridwidth=1, griddash='dot',
            secondary_y=False
        )
        fig.update_yaxes(
            title_text="Probability",
            range=[0, 1],
            secondary_y=True
        )
        return fig
    except Exception as e:
        print(f"Visualization error: {e}")
        import traceback
        traceback.print_exc()
        fig = go.Figure()
        fig.add_trace(go.Scatter(x=[0, 1], y=[0, 1], mode='lines', name='Error'))
        fig.update_layout(title=f"Visualization Error: {str(e)}")
        return fig
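
# ---------------------------------------------------------------------------
# Hypothetical offline check for the figure builder above: fabricate a short
# clip and a synthetic probability track, then write the figure to an HTML
# file for inspection. The names `_demo_plot` and 'vad_demo.html' are
# illustrative, not part of the app; requires plotly.
def _demo_plot():
    if not PLOTLY_AVAILABLE:
        return
    processor = AudioProcessor()
    sr = processor.sample_rate
    t = np.linspace(0, 2.0, 2 * sr, endpoint=False)
    audio = (0.3 * np.sin(2 * np.pi * 220 * t)).astype(np.float32)  # 2 s tone
    # Alternating low/high probabilities every 100 ms, attributed to Model A
    results = [VADResult(p, p > 0.5, 'Silero-VAD', 0.0, ts)
               for ts, p in zip(np.arange(0, 2.0, 0.1), np.tile([0.1, 0.9], 10))]
    fig = create_realtime_plot(audio, results, [], processor,
                               'Silero-VAD', 'PANNs', 0.5)
    if fig is not None:
        fig.write_html('vad_demo.html')
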
# ===== MAIN APPLICATION =====
class VADDemo:
    def __init__(self):
        print("🎤 Initializing Real-time VAD Demo with 5 models...")
        self.processor = AudioProcessor()
        self.models = {
            'Silero-VAD': OptimizedSileroVAD(),
            'WebRTC-VAD': OptimizedWebRTCVAD(),
            'E-PANNs': OptimizedEPANNs(),
            'PANNs': OptimizedPANNs(),
            'AST': OptimizedAST()
        }
        print("🎤 Real-time VAD Demo initialized successfully")
        print(f"📊 Available models: {list(self.models.keys())}")

    def process_audio_with_events(self, audio, model_a, model_b, threshold):
        if audio is None:
            return None, "🔇 No audio detected", "Ready to process audio..."
        try:
            processed_audio = self.processor.process_audio(audio)
            if len(processed_audio) == 0:
                return None, "🎵 Processing audio...", "No audio data processed"
            window_samples = int(self.processor.sample_rate * self.processor.window_size)
            hop_samples = int(self.processor.sample_rate * self.processor.hop_size)
            vad_results = []
            selected_models = list(set([model_a, model_b]))
            # Process each window individually for all selected models
            for i in range(0, len(processed_audio) - window_samples, hop_samples):
                timestamp = i / self.processor.sample_rate
                chunk = processed_audio[i:i + window_samples]
                for model_name in selected_models:
                    if model_name in self.models:
                        # Special handling for AST - pass the full audio for context
                        if model_name == 'AST':
                            result = self.models[model_name].predict(chunk, timestamp,
                                                                     full_audio=processed_audio)
                        else:
                            result = self.models[model_name].predict(chunk, timestamp)
                        result.is_speech = result.probability > threshold
                        vad_results.append(result)
            delay_compensation = self.processor.estimate_delay_compensation(processed_audio, vad_results)
            onsets_offsets = self.processor.detect_onset_offset_advanced(vad_results, threshold)
            fig = create_realtime_plot(
                processed_audio, vad_results, onsets_offsets,
                self.processor, model_a, model_b, threshold
            )
            speech_detected = any(result.is_speech for result in vad_results)
            total_speech_time = sum(1 for r in vad_results if r.is_speech) * self.processor.hop_size
            if speech_detected:
                status_msg = f"🎙️ SPEECH DETECTED - {total_speech_time:.1f}s total"
            else:
                status_msg = "🔇 No speech detected"
            # Simplified details
            model_summaries = {}
            for result in vad_results:
                name = result.model_name.split(' ')[0]
                if name not in model_summaries:
                    model_summaries[name] = {'probs': [], 'speech_chunks': 0, 'total_chunks': 0}
                summary = model_summaries[name]
                summary['probs'].append(result.probability)
                summary['total_chunks'] += 1
                if result.is_speech:
                    summary['speech_chunks'] += 1
            details_lines = [f"**Analysis Results** (Threshold: {threshold:.2f})"]
            for model_name, summary in model_summaries.items():
                avg_prob = np.mean(summary['probs']) if summary['probs'] else 0
                speech_ratio = (summary['speech_chunks'] / summary['total_chunks']) if summary['total_chunks'] > 0 else 0
                status_icon = "🟢" if speech_ratio > 0.5 else "🟡" if speech_ratio > 0.2 else "🔴"
                details_lines.append(f"{status_icon} **{model_name}**: {avg_prob:.3f} avg prob, "
                                     f"{speech_ratio * 100:.1f}% speech")
            if onsets_offsets:
                details_lines.append(f"\n**Speech Events**: {len(onsets_offsets)} detected")
                for i, event in enumerate(onsets_offsets[:5]):  # show first 5 only
                    duration = event.offset_time - event.onset_time if event.offset_time > event.onset_time else 0
                    details_lines.append(f"• {event.model_name}: {event.onset_time:.2f}s - "
                                         f"{event.offset_time:.2f}s ({duration:.2f}s)")
            details_text = "\n".join(details_lines)
            return fig, status_msg, details_text
        except Exception as e:
            print(f"Processing error: {e}")
            import traceback
            traceback.print_exc()
            return None, f"❌ Error: {str(e)}", f"Error details: {traceback.format_exc()}"


# Initialize demo
print("🎤 Initializing VAD Demo...")
demo_app = VADDemo()
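
# ---------------------------------------------------------------------------
# Hypothetical smoke test (not wired into the UI): run the full pipeline on
# two seconds of synthetic audio exactly as the "Analyze" button would.
# `_smoke_test` is an illustrative name; it assumes the (sample_rate, data)
# tuple format that gr.Audio(type="numpy") delivers.
def _smoke_test():
    sr = 16000
    rng = np.random.default_rng(1)
    audio = (sr, rng.normal(0.0, 0.1, 2 * sr).astype(np.float32))
    fig, status, details = demo_app.process_audio_with_events(
        audio, 'WebRTC-VAD', 'E-PANNs', 0.5)
    print(status)
    print(details)
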
""" logo_info = [ ('ai4s', 'AI4S'), ('surrey', 'University of Surrey'), ('epsrc', 'EPSRC'), ('cvssp', 'CVSSP') ] for key, alt_text in logo_info: if logos[key]: logo_html += f'{alt_text}' else: logo_html += f'{alt_text}' logo_html += "
" with gr.Blocks(title="VAD Demo - Voice Activity Detection", theme=gr.themes.Soft()) as interface: # Header with logos gr.Markdown("""

🎤 VAD Demo - Voice Activity Detection

Multi-Model Real-time Speech Detection Framework

""") # Logos section with gr.Row(): gr.HTML(logo_html) # Main interface with gr.Row(): with gr.Column(scale=2): gr.Markdown("### 🎛️ Controls") audio_input = gr.Audio( sources=["microphone"], type="numpy", label="Record Audio" ) model_a = gr.Dropdown( choices=["Silero-VAD", "WebRTC-VAD", "E-PANNs", "PANNs", "AST"], value="Silero-VAD", label="Model A (Top Panel)" ) model_b = gr.Dropdown( choices=["Silero-VAD", "WebRTC-VAD", "E-PANNs", "PANNs", "AST"], value="PANNs", label="Model B (Bottom Panel)" ) threshold_slider = gr.Slider( minimum=0.0, maximum=1.0, value=0.5, step=0.01, label="Detection Threshold" ) process_btn = gr.Button("🎤 Analyze", variant="primary", size="lg") with gr.Column(scale=3): status_display = gr.Textbox( label="Status", value="🔇 Ready to analyze audio", interactive=False, lines=2 ) # Results gr.Markdown("### 📊 Results") with gr.Row(): plot_output = gr.Plot(label="Speech Detection Visualization") with gr.Row(): details_output = gr.Textbox( label="Analysis Details", lines=10, interactive=False ) # Event handlers process_btn.click( fn=demo_app.process_audio_with_events, inputs=[audio_input, model_a, model_b, threshold_slider], outputs=[plot_output, status_display, details_output] ) # Footer gr.Markdown(""" --- **Models**: Silero-VAD, WebRTC-VAD, E-PANNs, PANNs, AST | **Research**: WASPAA 2025 | **Institution**: University of Surrey, CVSSP """) return interface # Create and launch interface if __name__ == "__main__": # Initialize demo print("🎤 Initializing VAD Demo...") demo_app = VADDemo() interface = create_interface() interface.launch(share=True, debug=False)