import gradio as gr
import numpy as np
import torch
import time
import warnings
from dataclasses import dataclass
from typing import List, Tuple, Dict
import threading
import queue
import os
import requests
from pathlib import Path
import base64
# Suppress warnings
warnings.filterwarnings('ignore')
# Function to convert image to base64
def image_to_base64(image_path):
try:
with open(image_path, "rb") as img_file:
return base64.b64encode(img_file.read()).decode('utf-8')
except Exception as e:
print(f"Error loading image {image_path}: {e}")
return None
# Load logos as base64
def load_logos():
logos = {}
logo_files = {
'ai4s': 'ai4s_banner.png',
'surrey': 'surrey_logo.png',
'epsrc': 'EPSRC_logo.png',
'cvssp': 'CVSSP_logo.png'
}
for key, filename in logo_files.items():
if os.path.exists(filename):
logos[key] = image_to_base64(filename)
else:
print(f"Logo file {filename} not found")
logos[key] = None
return logos
# Optional imports with fallbacks
try:
import librosa
LIBROSA_AVAILABLE = True
print("✅ Librosa available")
except ImportError:
LIBROSA_AVAILABLE = False
print("⚠️ Librosa not available, using scipy fallback")
try:
import webrtcvad
WEBRTC_AVAILABLE = True
print("✅ WebRTC VAD available")
except ImportError:
WEBRTC_AVAILABLE = False
print("⚠️ WebRTC VAD not available, using fallback")
try:
import plotly.graph_objects as go
from plotly.subplots import make_subplots
PLOTLY_AVAILABLE = True
print("✅ Plotly available")
except ImportError:
PLOTLY_AVAILABLE = False
print("⚠️ Plotly not available")
# PANNs imports
try:
from panns_inference import AudioTagging, labels
PANNS_AVAILABLE = True
print("✅ PANNs available")
except ImportError:
PANNS_AVAILABLE = False
print("⚠️ PANNs not available, using fallback")
# Transformers for AST
try:
from transformers import ASTForAudioClassification, ASTFeatureExtractor
import transformers
AST_AVAILABLE = True
print("✅ AST (Transformers) available")
except ImportError:
AST_AVAILABLE = False
print("⚠️ AST not available, using fallback")
print("🚀 Creating Real-time VAD Demo...")
# ===== DATA STRUCTURES =====
@dataclass
class VADResult:
probability: float
is_speech: bool
model_name: str
processing_time: float
timestamp: float
@dataclass
class OnsetOffset:
onset_time: float
offset_time: float
model_name: str
confidence: float
# ===== MODEL IMPLEMENTATIONS =====
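# Each wrapper below exposes the same predict(audio, timestamp) -> VADResult
# interface so the UI can swap models freely; unavailable backends degrade to
# simple energy heuristics instead of failing.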
class OptimizedSileroVAD:
def __init__(self):
self.model = None
self.sample_rate = 16000
self.model_name = "Silero-VAD"
self.load_model()
def load_model(self):
try:
self.model, _ = torch.hub.load(
repo_or_dir='snakers4/silero-vad',
model='silero_vad',
force_reload=False,
onnx=False
)
self.model.eval()
print(f"✅ {self.model_name} loaded successfully")
except Exception as e:
print(f"❌ Error loading {self.model_name}: {e}")
self.model = None
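    # Recent Silero VAD releases consume fixed 512-sample (32 ms at 16 kHz)
    # chunks; predict() centre-crops longer input and zero-pads shorter input.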
def predict(self, audio: np.ndarray, timestamp: float = 0.0) -> VADResult:
start_time = time.time()
if self.model is None or len(audio) == 0:
return VADResult(0.0, False, f"{self.model_name} (unavailable)", time.time() - start_time, timestamp)
try:
if len(audio.shape) > 1:
audio = audio.mean(axis=1)
required_samples = 512
if len(audio) != required_samples:
if len(audio) > required_samples:
start_idx = (len(audio) - required_samples) // 2
audio_chunk = audio[start_idx:start_idx + required_samples]
else:
audio_chunk = np.pad(audio, (0, required_samples - len(audio)), 'constant')
else:
audio_chunk = audio
audio_tensor = torch.FloatTensor(audio_chunk).unsqueeze(0)
with torch.no_grad():
speech_prob = self.model(audio_tensor, self.sample_rate).item()
is_speech = speech_prob > 0.5
processing_time = time.time() - start_time
return VADResult(speech_prob, is_speech, self.model_name, processing_time, timestamp)
except Exception as e:
print(f"Error in {self.model_name}: {e}")
return VADResult(0.0, False, self.model_name, time.time() - start_time, timestamp)
class OptimizedWebRTCVAD:
def __init__(self):
self.model_name = "WebRTC-VAD"
self.sample_rate = 16000
self.frame_duration = 30
self.frame_size = int(self.sample_rate * self.frame_duration / 1000)
if WEBRTC_AVAILABLE:
try:
self.vad = webrtcvad.Vad(3)
print(f"✅ {self.model_name} loaded successfully")
            except Exception as e:
                print(f"❌ Error loading {self.model_name}: {e}")
                self.vad = None
else:
self.vad = None
def predict(self, audio: np.ndarray, timestamp: float = 0.0) -> VADResult:
start_time = time.time()
if self.vad is None or len(audio) == 0:
energy = np.sum(audio ** 2) if len(audio) > 0 else 0
threshold = 0.01
probability = min(energy / threshold, 1.0)
is_speech = energy > threshold
return VADResult(probability, is_speech, f"{self.model_name} (fallback)", time.time() - start_time, timestamp)
try:
if len(audio.shape) > 1:
audio = audio.mean(axis=1)
audio_int16 = (audio * 32767).astype(np.int16)
speech_frames = 0
total_frames = 0
for i in range(0, len(audio_int16) - self.frame_size, self.frame_size):
frame = audio_int16[i:i + self.frame_size].tobytes()
if self.vad.is_speech(frame, self.sample_rate):
speech_frames += 1
total_frames += 1
probability = speech_frames / max(total_frames, 1)
is_speech = probability > 0.3
return VADResult(probability, is_speech, self.model_name, time.time() - start_time, timestamp)
except Exception as e:
print(f"Error in {self.model_name}: {e}")
return VADResult(0.0, False, self.model_name, time.time() - start_time, timestamp)
class OptimizedEPANNs:
def __init__(self):
self.model_name = "E-PANNs"
self.sample_rate = 32000
print(f"✅ {self.model_name} initialized")
def predict(self, audio: np.ndarray, timestamp: float = 0.0) -> VADResult:
start_time = time.time()
try:
if len(audio) == 0:
return VADResult(0.0, False, self.model_name, time.time() - start_time, timestamp)
if len(audio.shape) > 1:
audio = audio.mean(axis=1)
# Convert audio to target sample rate for E-PANNs
if LIBROSA_AVAILABLE:
# Resample to E-PANNs sample rate if needed
audio_resampled = librosa.resample(audio.astype(float),
orig_sr=16000,
target_sr=self.sample_rate)
mel_spec = librosa.feature.melspectrogram(y=audio_resampled, sr=self.sample_rate, n_mels=64)
energy = np.mean(librosa.power_to_db(mel_spec, ref=np.max))
spectral_centroid = np.mean(librosa.feature.spectral_centroid(y=audio_resampled, sr=self.sample_rate))
# Better speech detection using multiple features
mfcc = librosa.feature.mfcc(y=audio_resampled, sr=self.sample_rate, n_mfcc=13)
mfcc_var = np.var(mfcc, axis=1).mean()
                # Combine features: log-mel energy in dB (≈ -80..0) rescaled to ~0..2,
                # spectral centroid normalised by a nominal 5 kHz speech ceiling,
                # and MFCC variance scaled by 100
                speech_score = (((energy + 80) / 40) * 0.4
                                + (spectral_centroid / 5000) * 0.3
                                + (mfcc_var / 100) * 0.3)
else:
from scipy import signal
# Basic fallback without librosa
f, t, Sxx = signal.spectrogram(audio, 16000) # Use original sample rate
energy = np.mean(10 * np.log10(Sxx + 1e-10))
# Simple energy-based detection as fallback
speech_score = (energy + 100) / 50
probability = np.clip(speech_score, 0, 1)
is_speech = probability > 0.6
return VADResult(probability, is_speech, self.model_name, time.time() - start_time, timestamp)
except Exception as e:
print(f"Error in {self.model_name}: {e}")
return VADResult(0.0, False, self.model_name, time.time() - start_time, timestamp)
class OptimizedPANNs:
def __init__(self):
self.model_name = "PANNs"
self.sample_rate = 32000
self.model = None
self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
self.load_model()
def load_model(self):
try:
if PANNS_AVAILABLE:
self.model = AudioTagging(checkpoint_path=None, device=self.device)
print(f"✅ {self.model_name} loaded successfully")
else:
print(f"⚠️ {self.model_name} not available, using fallback")
self.model = None
except Exception as e:
print(f"❌ Error loading {self.model_name}: {e}")
self.model = None
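    # panns_inference's AudioTagging (CNN14 by default) scores the 527 AudioSet
    # classes on 32 kHz audio; predict() averages the speech-related class outputs.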
def predict(self, audio: np.ndarray, timestamp: float = 0.0) -> VADResult:
start_time = time.time()
if self.model is None or len(audio) == 0:
if len(audio) > 0:
energy = np.sum(audio ** 2)
threshold = 0.01
probability = min(energy / threshold, 1.0)
is_speech = energy > threshold
else:
probability = 0.0
is_speech = False
return VADResult(probability, is_speech, f"{self.model_name} (fallback)", time.time() - start_time, timestamp)
try:
if len(audio.shape) > 1:
audio = audio.mean(axis=1)
# Convert audio to PANNs sample rate
if LIBROSA_AVAILABLE:
audio_resampled = librosa.resample(audio.astype(float),
orig_sr=16000,
target_sr=self.sample_rate)
else:
# Simple resampling fallback
resample_factor = self.sample_rate / 16000
audio_resampled = np.interp(
np.linspace(0, len(audio) - 1, int(len(audio) * resample_factor)),
np.arange(len(audio)),
audio
)
# Ensure minimum length for PANNs (need at least 1 second)
min_samples = self.sample_rate # 1 second
if len(audio_resampled) < min_samples:
audio_resampled = np.pad(audio_resampled, (0, min_samples - len(audio_resampled)), 'constant')
            # AudioTagging.inference takes a (batch, samples) float array that must
            # already be at the model's 32 kHz rate; it has no resampling keyword
            clip_probs, _ = self.model.inference(audio_resampled[np.newaxis, :].astype(np.float32))
# Find speech-related indices
speech_indices = []
for i, lbl in enumerate(labels):
if any(word in lbl.lower() for word in ['speech', 'voice', 'talk', 'conversation', 'speaking']):
speech_indices.append(i)
if not speech_indices:
# Fallback to a known speech index if available
try:
speech_indices = [labels.index('Speech')]
except ValueError:
# If 'Speech' label doesn't exist, use first 10 indices as approximation
speech_indices = list(range(min(10, len(labels))))
speech_prob = clip_probs[0, speech_indices].mean().item()
return VADResult(float(speech_prob), speech_prob > 0.5, self.model_name, time.time()-start_time, timestamp)
except Exception as e:
print(f"Error in {self.model_name}: {e}")
if len(audio) > 0:
energy = np.sum(audio ** 2)
threshold = 0.01
probability = min(energy / threshold, 1.0)
is_speech = energy > threshold
else:
probability = 0.0
is_speech = False
return VADResult(probability, is_speech, f"{self.model_name} (error)", time.time() - start_time, timestamp)
class OptimizedAST:
def __init__(self):
self.model_name = "AST"
self.sample_rate = 16000
self.model = None
self.feature_extractor = None
self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
self.load_model()
def load_model(self):
try:
if AST_AVAILABLE:
model_name = "MIT/ast-finetuned-audioset-10-10-0.4593"
self.feature_extractor = ASTFeatureExtractor.from_pretrained(model_name)
self.model = ASTForAudioClassification.from_pretrained(model_name)
self.model.to(self.device)
self.model.eval()
print(f"✅ {self.model_name} loaded successfully")
else:
print(f"⚠️ {self.model_name} not available, using fallback")
self.model = None
except Exception as e:
print(f"❌ Error loading {self.model_name}: {e}")
self.model = None
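    # AST is fine-tuned on ~10 s AudioSet clips, so predict() scores a wider
    # window around the current timestamp (when full_audio is provided) rather
    # than a single short chunk.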
def predict(self, audio: np.ndarray, timestamp: float = 0.0, full_audio: np.ndarray = None) -> VADResult:
start_time = time.time()
if self.model is None or len(audio) == 0:
# Enhanced fallback using spectral features
if len(audio) > 0:
energy = np.sum(audio ** 2)
                if LIBROSA_AVAILABLE:
                    spectral_centroid = np.mean(librosa.feature.spectral_centroid(y=audio, sr=self.sample_rate))
                    # Blend energy with the spectral centroid; voiced speech tends to
                    # concentrate energy in the lower few kHz
                    probability = min((energy * 100 + spectral_centroid / 500) / 2, 1.0)
else:
probability = min(energy * 50, 1.0)
is_speech = probability > 0.3
else:
probability = 0.0
is_speech = False
return VADResult(probability, is_speech, f"{self.model_name} (fallback)", time.time() - start_time, timestamp)
try:
if len(audio.shape) > 1:
audio = audio.mean(axis=1)
# Use longer context for AST - take from full audio if available
if full_audio is not None and len(full_audio) > self.sample_rate:
# Take 3-second window centered around current timestamp
center_pos = int(timestamp * self.sample_rate)
window_size = int(1.5 * self.sample_rate) # 1.5 seconds each side
start_pos = max(0, center_pos - window_size)
end_pos = min(len(full_audio), center_pos + window_size)
# Ensure we have at least 1 second
if end_pos - start_pos < self.sample_rate:
end_pos = min(len(full_audio), start_pos + self.sample_rate)
audio_for_ast = full_audio[start_pos:end_pos]
else:
audio_for_ast = audio
# Ensure minimum length for AST
if len(audio_for_ast) < self.sample_rate:
audio_for_ast = np.pad(audio_for_ast, (0, self.sample_rate - len(audio_for_ast)), 'constant')
# Feature extraction with proper AST parameters
inputs = self.feature_extractor(
audio_for_ast,
sampling_rate=self.sample_rate,
return_tensors="pt",
max_length=1024, # Proper AST context
truncation=True
)
inputs = {k: v.to(self.device) for k, v in inputs.items()}
with torch.no_grad():
outputs = self.model(**inputs)
logits = outputs.logits
probs = torch.sigmoid(logits)
# Find speech-related classes
label2id = self.model.config.label2id
speech_indices = []
speech_keywords = ['speech', 'voice', 'talk', 'conversation', 'speaking']
for lbl, idx in label2id.items():
if any(word in lbl.lower() for word in speech_keywords):
speech_indices.append(idx)
if speech_indices:
speech_prob = probs[0, speech_indices].mean().item()
# Boost the probability if it's too low but there's clear audio content
if speech_prob < 0.1 and np.sum(audio_for_ast ** 2) > 0.001:
speech_prob = min(speech_prob * 5, 0.8) # Boost but cap at 0.8
else:
# Fallback to energy-based detection
energy = np.sum(audio_for_ast ** 2)
speech_prob = min(energy * 20, 1.0)
return VADResult(float(speech_prob), speech_prob > 0.4, self.model_name, time.time()-start_time, timestamp)
except Exception as e:
print(f"Error in {self.model_name}: {e}")
# Enhanced fallback
if len(audio) > 0:
energy = np.sum(audio ** 2)
probability = min(energy * 30, 1.0) # More aggressive energy scaling
is_speech = energy > 0.002
else:
probability = 0.0
is_speech = False
return VADResult(probability, is_speech, f"{self.model_name} (error)", time.time() - start_time, timestamp)
# ===== AUDIO PROCESSOR =====
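# Handles input normalisation/resampling, high-resolution mel spectrograms,
# hysteresis-based onset/offset detection and VAD delay estimation.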
class AudioProcessor:
def __init__(self, sample_rate=16000):
self.sample_rate = sample_rate
self.chunk_duration = 4.0
self.chunk_size = int(sample_rate * self.chunk_duration)
        self.n_fft = 2048
        self.hop_length = 256
        self.n_mels = 128
        self.fmin = 20                    # Hz
        self.fmax = 8000                  # Hz
        self.window_size = 0.064          # s, energy-analysis window
        self.hop_size = 0.032             # s, energy-analysis hop
        self.delay_compensation = 0.0     # s, estimated VAD lag
        self.correlation_threshold = 0.7  # min. correlation to accept a delay estimate
def process_audio(self, audio):
if audio is None:
return np.array([])
try:
            if isinstance(audio, tuple):
                sample_rate, audio_data = audio
                audio_data = np.asarray(audio_data, dtype=float)
                if len(audio_data.shape) > 1:
                    audio_data = audio_data.mean(axis=1)
                if sample_rate != self.sample_rate:
                    if LIBROSA_AVAILABLE:
                        audio_data = librosa.resample(audio_data,
                                                      orig_sr=sample_rate,
                                                      target_sr=self.sample_rate)
                    else:
                        # Linear-interpolation resampling when librosa is unavailable
                        new_len = int(len(audio_data) * self.sample_rate / sample_rate)
                        audio_data = np.interp(
                            np.linspace(0, len(audio_data) - 1, new_len),
                            np.arange(len(audio_data)),
                            audio_data)
            else:
                audio_data = audio
if len(audio_data.shape) > 1:
audio_data = audio_data.mean(axis=1)
if np.max(np.abs(audio_data)) > 0:
audio_data = audio_data / np.max(np.abs(audio_data))
return audio_data
except Exception as e:
print(f"Audio processing error: {e}")
return np.array([])
def compute_high_res_spectrogram(self, audio_data):
try:
if LIBROSA_AVAILABLE and len(audio_data) > 0:
stft = librosa.stft(
audio_data,
n_fft=self.n_fft,
hop_length=self.hop_length,
win_length=self.n_fft,
window='hann',
center=False
)
power_spec = np.abs(stft) ** 2
mel_basis = librosa.filters.mel(
sr=self.sample_rate,
n_fft=self.n_fft,
n_mels=self.n_mels,
fmin=self.fmin,
fmax=self.fmax
)
mel_spec = np.dot(mel_basis, power_spec)
mel_spec_db = librosa.power_to_db(mel_spec, ref=np.max)
time_frames = np.arange(mel_spec_db.shape[1]) * self.hop_length / self.sample_rate
return mel_spec_db, time_frames
else:
from scipy import signal
f, t, Sxx = signal.spectrogram(
audio_data,
self.sample_rate,
nperseg=self.n_fft,
noverlap=self.n_fft - self.hop_length,
window='hann'
)
mel_spec_db = np.zeros((self.n_mels, Sxx.shape[1]))
mel_freqs = np.logspace(
np.log10(self.fmin),
np.log10(min(self.fmax, self.sample_rate/2)),
self.n_mels + 1
)
for i in range(self.n_mels):
f_start = mel_freqs[i]
f_end = mel_freqs[i + 1]
bin_start = int(f_start * len(f) / (self.sample_rate/2))
bin_end = int(f_end * len(f) / (self.sample_rate/2))
if bin_end > bin_start:
mel_spec_db[i, :] = np.mean(Sxx[bin_start:bin_end, :], axis=0)
mel_spec_db = 10 * np.log10(mel_spec_db + 1e-10)
return mel_spec_db, t
except Exception as e:
print(f"Spectrogram computation error: {e}")
dummy_spec = np.zeros((self.n_mels, 200))
dummy_time = np.linspace(0, len(audio_data) / self.sample_rate, 200)
return dummy_spec, dummy_time
def detect_onset_offset_advanced(self, vad_results: List[VADResult], threshold: float = 0.5) -> List[OnsetOffset]:
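        # Hysteresis detection: a segment opens when the smoothed probability
        # rises above threshold + 0.1 and closes when it falls below
        # threshold - 0.1, which suppresses rapid on/off chatter near the threshold.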
onsets_offsets = []
if len(vad_results) < 3:
return onsets_offsets
models = {}
for result in vad_results:
if result.model_name not in models:
models[result.model_name] = []
models[result.model_name].append(result)
for model_name, results in models.items():
if len(results) < 3:
continue
results.sort(key=lambda x: x.timestamp)
timestamps = np.array([r.timestamp for r in results])
probabilities = np.array([r.probability for r in results])
if len(probabilities) > 5:
window_size = min(5, len(probabilities) // 3)
probabilities = np.convolve(probabilities, np.ones(window_size)/window_size, mode='same')
upper_thresh = threshold + 0.1
lower_thresh = threshold - 0.1
in_speech_segment = False
current_onset_time = -1
for i in range(1, len(results)):
prev_prob = probabilities[i-1]
curr_prob = probabilities[i]
curr_time = timestamps[i]
if not in_speech_segment and prev_prob <= upper_thresh and curr_prob > upper_thresh:
in_speech_segment = True
current_onset_time = curr_time - self.delay_compensation
elif in_speech_segment and prev_prob >= lower_thresh and curr_prob < lower_thresh:
in_speech_segment = False
                    if current_onset_time >= 0:
                        offset_time = curr_time - self.delay_compensation
                        mask = (timestamps >= current_onset_time) & (timestamps <= offset_time)
                        onsets_offsets.append(OnsetOffset(
                            onset_time=max(0, current_onset_time),
                            offset_time=offset_time,
                            model_name=model_name,
                            # Guard against an empty window, which would yield NaN
                            confidence=float(np.mean(probabilities[mask])) if mask.any() else curr_prob
                        ))
                        current_onset_time = -1
if in_speech_segment and current_onset_time >= 0:
onsets_offsets.append(OnsetOffset(
onset_time=max(0, current_onset_time),
offset_time=timestamps[-1],
model_name=model_name,
confidence=np.mean(probabilities[-3:]) if len(probabilities) >= 3 else probabilities[-1]
))
return onsets_offsets
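    # Estimates the systematic lag of the VAD curve by cross-correlating
    # short-time frame energy with the interpolated probabilities; the
    # compensation applied to onsets/offsets is capped at ±100 ms.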
def estimate_delay_compensation(self, audio_data, vad_results):
try:
if len(audio_data) == 0 or len(vad_results) == 0:
return 0.0
window_size = int(self.sample_rate * self.window_size)
hop_size = int(self.sample_rate * self.hop_size)
energy_signal = []
for i in range(0, len(audio_data) - window_size, hop_size):
window = audio_data[i:i + window_size]
energy = np.sum(window ** 2)
energy_signal.append(energy)
energy_signal = np.array(energy_signal)
if len(energy_signal) == 0:
return 0.0
energy_signal = (energy_signal - np.mean(energy_signal)) / (np.std(energy_signal) + 1e-8)
vad_times = np.array([r.timestamp for r in vad_results])
vad_probs = np.array([r.probability for r in vad_results])
energy_times = np.arange(len(energy_signal)) * self.hop_size
vad_interp = np.interp(energy_times, vad_times, vad_probs)
vad_interp = (vad_interp - np.mean(vad_interp)) / (np.std(vad_interp) + 1e-8)
if len(energy_signal) > 10 and len(vad_interp) > 10:
correlation = np.correlate(energy_signal, vad_interp, mode='full')
delay_samples = np.argmax(correlation) - len(vad_interp) + 1
delay_seconds = delay_samples * self.hop_size
max_corr = np.max(correlation) / (len(vad_interp) * np.std(energy_signal) * np.std(vad_interp))
                if max_corr > self.correlation_threshold:
                    self.delay_compensation = np.clip(delay_seconds, -0.1, 0.1)
            # Fall through to the last accepted estimate (0.0 by default)
            return self.delay_compensation
except Exception as e:
print(f"Delay estimation error: {e}")
return 0.0
# ===== ENHANCED VISUALIZATION =====
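# Two stacked panels (one per selected model): a mel-spectrogram heatmap with
# that model's VAD probability and decision threshold on a secondary y-axis.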
def create_realtime_plot(audio_data: np.ndarray, vad_results: List[VADResult],
onsets_offsets: List[OnsetOffset], processor: AudioProcessor,
model_a: str, model_b: str, threshold: float):
if not PLOTLY_AVAILABLE:
return None
try:
mel_spec_db, time_frames = processor.compute_high_res_spectrogram(audio_data)
freq_axis = np.linspace(processor.fmin, processor.fmax, processor.n_mels)
fig = make_subplots(
rows=2, cols=1,
subplot_titles=(f"Model A: {model_a}", f"Model B: {model_b}"),
vertical_spacing=0.02,
shared_xaxes=True,
specs=[[{"secondary_y": True}], [{"secondary_y": True}]]
)
colorscale = 'Viridis'
fig.add_trace(
go.Heatmap(
z=mel_spec_db,
x=time_frames,
y=freq_axis,
colorscale=colorscale,
showscale=False,
                hovertemplate='Time: %{x:.2f}s<br>Freq: %{y:.0f}Hz<br>Power: %{z:.1f}dB<extra></extra>'
            ),
            row=1, col=1
        )
        fig.add_trace(
            go.Heatmap(
                z=mel_spec_db,
                x=time_frames,
                y=freq_axis,
                colorscale=colorscale,
                showscale=False,
                hovertemplate='Time: %{x:.2f}s<br>Freq: %{y:.0f}Hz<br>Power: %{z:.1f}dB<extra></extra>'
            ),
            row=2, col=1
        )
        # Overlay each model's VAD probability trace on its panel's secondary y-axis
        for row, name in ((1, model_a), (2, model_b)):
            results = sorted((r for r in vad_results if r.model_name.startswith(name)),
                             key=lambda r: r.timestamp)
            if results:
                fig.add_trace(
                    go.Scatter(
                        x=[r.timestamp for r in results],
                        y=[r.probability for r in results],
                        mode='lines',
                        name=name,
                        hovertemplate='Time: %{x:.2f}s<br>Probability: %{y:.3f}<extra></extra>'
                    ),
                    row=row, col=1, secondary_y=True
                )
            # Dashed decision-threshold reference line
            fig.add_trace(
                go.Scatter(
                    x=[0, time_frames[-1] if len(time_frames) else 1],
                    y=[threshold, threshold],
                    mode='lines',
                    line=dict(dash='dash', color='red'),
                    name='threshold',
                    showlegend=(row == 1),
                    hoverinfo='skip'
                ),
                row=row, col=1, secondary_y=True
            )
        # Shade detected speech segments
        for seg in onsets_offsets:
            fig.add_vrect(x0=seg.onset_time, x1=seg.offset_time,
                          fillcolor='green', opacity=0.15, line_width=0,
                          row=1 if seg.model_name.startswith(model_a) else 2, col=1)
        fig.update_yaxes(title_text='Speech probability', range=[0, 1.05], secondary_y=True)
        fig.update_xaxes(title_text='Time (s)', row=2, col=1)
        fig.update_layout(
            title='Multi-Model Real-time Speech Detection Framework',
            height=700
        )
        return fig
    except Exception as e:
        print(f"Plot creation error: {e}")
        return None