import gradio as gr
import numpy as np
import torch
import time
import warnings
from dataclasses import dataclass
from typing import List, Tuple, Dict
import threading
import queue
import os
import requests
from pathlib import Path
import base64
# Suppress warnings
warnings.filterwarnings('ignore')
# Function to convert image to base64
def image_to_base64(image_path):
try:
with open(image_path, "rb") as img_file:
return base64.b64encode(img_file.read()).decode('utf-8')
except Exception as e:
print(f"Error loading image {image_path}: {e}")
return None
# Load logos as base64
def load_logos():
logos = {}
logo_files = {
'ai4s': 'ai4s_banner.png',
'surrey': 'surrey_logo.png',
'epsrc': 'EPSRC_logo.png',
'cvssp': 'CVSSP_logo.png'
}
for key, filename in logo_files.items():
if os.path.exists(filename):
logos[key] = image_to_base64(filename)
else:
print(f"Logo file {filename} not found")
logos[key] = None
return logos
# Optional imports with fallbacks
try:
import librosa
LIBROSA_AVAILABLE = True
print("✅ Librosa available")
except ImportError:
LIBROSA_AVAILABLE = False
print("⚠️ Librosa not available, using scipy fallback")
try:
import webrtcvad
WEBRTC_AVAILABLE = True
print("✅ WebRTC VAD available")
except ImportError:
WEBRTC_AVAILABLE = False
print("⚠️ WebRTC VAD not available, using fallback")
try:
import plotly.graph_objects as go
from plotly.subplots import make_subplots
PLOTLY_AVAILABLE = True
print("✅ Plotly available")
except ImportError:
PLOTLY_AVAILABLE = False
print("⚠️ Plotly not available")
# PANNs imports
try:
from panns_inference import AudioTagging, labels
PANNS_AVAILABLE = True
print("✅ PANNs available")
except ImportError:
PANNS_AVAILABLE = False
print("⚠️ PANNs not available, using fallback")
# Transformers for AST
try:
from transformers import ASTForAudioClassification, ASTFeatureExtractor
import transformers
AST_AVAILABLE = True
print("✅ AST (Transformers) available")
except ImportError:
AST_AVAILABLE = False
print("⚠️ AST not available, using fallback")
print("🚀 Creating Real-time VAD Demo...")
# ===== DATA STRUCTURES =====
@dataclass
class VADResult:
probability: float
is_speech: bool
model_name: str
processing_time: float
timestamp: float
@dataclass
class OnsetOffset:
onset_time: float
offset_time: float
model_name: str
confidence: float
# ===== MODEL IMPLEMENTATIONS =====
class OptimizedSileroVAD:
def __init__(self):
self.model = None
self.sample_rate = 16000
self.model_name = "Silero-VAD"
self.load_model()
def load_model(self):
try:
self.model, _ = torch.hub.load(
repo_or_dir='snakers4/silero-vad',
model='silero_vad',
force_reload=False,
onnx=False
)
self.model.eval()
print(f"✅ {self.model_name} loaded successfully")
except Exception as e:
print(f"❌ Error loading {self.model_name}: {e}")
self.model = None
def predict(self, audio: np.ndarray, timestamp: float = 0.0) -> VADResult:
start_time = time.time()
if self.model is None or len(audio) == 0:
return VADResult(0.0, False, f"{self.model_name} (unavailable)", time.time() - start_time, timestamp)
try:
if len(audio.shape) > 1:
audio = audio.mean(axis=1)
required_samples = 512
# Silero requires exactly 512 samples, handle this precisely
if len(audio) != required_samples:
if len(audio) > required_samples:
# Take center portion to avoid edge effects
start_idx = (len(audio) - required_samples) // 2
audio_chunk = audio[start_idx:start_idx + required_samples]
else:
# Pad symmetrically instead of just at the end
pad_total = required_samples - len(audio)
pad_left = pad_total // 2
pad_right = pad_total - pad_left
audio_chunk = np.pad(audio, (pad_left, pad_right), 'reflect')
else:
audio_chunk = audio
audio_tensor = torch.FloatTensor(audio_chunk).unsqueeze(0)
with torch.no_grad():
speech_prob = self.model(audio_tensor, self.sample_rate).item()
is_speech = speech_prob > 0.5
processing_time = time.time() - start_time
return VADResult(speech_prob, is_speech, self.model_name, processing_time, timestamp)
except Exception as e:
print(f"Error in {self.model_name}: {e}")
return VADResult(0.0, False, self.model_name, time.time() - start_time, timestamp)
class OptimizedWebRTCVAD:
def __init__(self):
self.model_name = "WebRTC-VAD"
self.sample_rate = 16000
self.frame_duration = 30
self.frame_size = int(self.sample_rate * self.frame_duration / 1000)
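# 30 ms at 16 kHz -> 480 samples; webrtcvad only accepts 10/20/30 ms frames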
if WEBRTC_AVAILABLE:
try:
self.vad = webrtcvad.Vad(3)
print(f"✅ {self.model_name} loaded successfully")
except Exception as e:
print(f"❌ Error initializing {self.model_name}: {e}")
self.vad = None
else:
self.vad = None
def predict(self, audio: np.ndarray, timestamp: float = 0.0) -> VADResult:
start_time = time.time()
if self.vad is None or len(audio) == 0:
energy = np.sum(audio ** 2) if len(audio) > 0 else 0
threshold = 0.01
probability = min(energy / threshold, 1.0)
is_speech = energy > threshold
return VADResult(probability, is_speech, f"{self.model_name} (fallback)", time.time() - start_time, timestamp)
try:
if len(audio.shape) > 1:
audio = audio.mean(axis=1)
# Properly convert to int16 with clipping to avoid saturation
audio_clipped = np.clip(audio, -1.0, 1.0)
audio_int16 = (audio_clipped * 32767).astype(np.int16)
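# webrtcvad expects 16-bit mono PCM bytes (8/16/32/48 kHz), hence the clip-and-scale above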
speech_frames = 0
total_frames = 0
for i in range(0, len(audio_int16) - self.frame_size, self.frame_size):
frame = audio_int16[i:i + self.frame_size].tobytes()
if self.vad.is_speech(frame, self.sample_rate):
speech_frames += 1
total_frames += 1
probability = speech_frames / max(total_frames, 1)
is_speech = probability > 0.3
return VADResult(probability, is_speech, self.model_name, time.time() - start_time, timestamp)
except Exception as e:
print(f"Error in {self.model_name}: {e}")
return VADResult(0.0, False, self.model_name, time.time() - start_time, timestamp)
class OptimizedEPANNs:
def __init__(self):
self.model_name = "E-PANNs"
self.sample_rate = 32000
print(f"✅ {self.model_name} initialized")
def predict(self, audio: np.ndarray, timestamp: float = 0.0) -> VADResult:
start_time = time.time()
try:
if len(audio) == 0:
return VADResult(0.0, False, self.model_name, time.time() - start_time, timestamp)
if len(audio.shape) > 1:
audio = audio.mean(axis=1)
# Convert audio to target sample rate for E-PANNs
if LIBROSA_AVAILABLE:
# Resample to E-PANNs sample rate if needed
audio_resampled = librosa.resample(audio.astype(float),
orig_sr=16000,
target_sr=self.sample_rate)
# Ensure minimum length (6 seconds) using wrap mode instead of zero padding
min_samples = 6 * self.sample_rate # 6 seconds
if len(audio_resampled) < min_samples:
if LIBROSA_AVAILABLE:
audio_resampled = librosa.util.fix_length(audio_resampled, size=min_samples, mode='wrap')
else:
# Fallback: repeat the signal
repeat_factor = int(np.ceil(min_samples / len(audio_resampled)))
audio_resampled = np.tile(audio_resampled, repeat_factor)[:min_samples]
mel_spec = librosa.feature.melspectrogram(y=audio_resampled, sr=self.sample_rate, n_mels=64)
energy = np.mean(librosa.power_to_db(mel_spec, ref=np.max))
spectral_centroid = np.mean(librosa.feature.spectral_centroid(y=audio_resampled, sr=self.sample_rate))
# Better speech detection using multiple features
mfcc = librosa.feature.mfcc(y=audio_resampled, sr=self.sample_rate, n_mfcc=13)
mfcc_var = np.var(mfcc, axis=1).mean()
# Combine features for better speech detection
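# Rough normalisation: dB energy mapped from ~[-80, -40] onto [0, 1], centroid scaled by 5 kHz,
# MFCC variance by 100, then weighted 0.4 / 0.3 / 0.3 into one score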
speech_score = ((energy + 80) / 40) * 0.4 + (spectral_centroid / 5000) * 0.3 + (mfcc_var / 100) * 0.3
else:
from scipy import signal
# Basic fallback without librosa
f, t, Sxx = signal.spectrogram(audio, 16000) # Use original sample rate
energy = np.mean(10 * np.log10(Sxx + 1e-10))
# Simple energy-based detection as fallback
speech_score = (energy + 100) / 50
probability = np.clip(speech_score, 0, 1)
is_speech = probability > 0.6
return VADResult(probability, is_speech, self.model_name, time.time() - start_time, timestamp)
except Exception as e:
print(f"Error in {self.model_name}: {e}")
return VADResult(0.0, False, self.model_name, time.time() - start_time, timestamp)
class OptimizedPANNs:
def __init__(self):
self.model_name = "PANNs"
self.sample_rate = 32000
self.model = None
self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
self.load_model()
def load_model(self):
try:
if PANNS_AVAILABLE:
self.model = AudioTagging(checkpoint_path=None, device=self.device)
print(f"✅ {self.model_name} loaded successfully")
else:
print(f"⚠️ {self.model_name} not available, using fallback")
self.model = None
except Exception as e:
print(f"❌ Error loading {self.model_name}: {e}")
self.model = None
def predict(self, audio: np.ndarray, timestamp: float = 0.0) -> VADResult:
start_time = time.time()
if self.model is None or len(audio) == 0:
if len(audio) > 0:
energy = np.sum(audio ** 2)
threshold = 0.01
probability = min(energy / threshold, 1.0)
is_speech = energy > threshold
else:
probability = 0.0
is_speech = False
return VADResult(probability, is_speech, f"{self.model_name} (fallback)", time.time() - start_time, timestamp)
try:
if len(audio.shape) > 1:
audio = audio.mean(axis=1)
# Convert audio to PANNs sample rate
if LIBROSA_AVAILABLE:
audio_resampled = librosa.resample(audio.astype(float),
orig_sr=16000,
target_sr=self.sample_rate)
else:
# Simple resampling fallback
resample_factor = self.sample_rate / 16000
audio_resampled = np.interp(
np.linspace(0, len(audio) - 1, int(len(audio) * resample_factor)),
np.arange(len(audio)),
audio
)
# Ensure minimum length for PANNs (10 seconds) using wrap mode instead of zero padding
min_samples = 10 * self.sample_rate # 10 seconds for optimal performance
if len(audio_resampled) < min_samples:
if LIBROSA_AVAILABLE:
audio_resampled = librosa.util.fix_length(audio_resampled, size=min_samples, mode='wrap')
else:
# Fallback: repeat the signal
repeat_factor = int(np.ceil(min_samples / len(audio_resampled)))
audio_resampled = np.tile(audio_resampled, repeat_factor)[:min_samples]
# panns_inference expects 32 kHz audio shaped (batch, samples)
clip_probs, _ = self.model.inference(audio_resampled[np.newaxis, :])
# Find speech-related indices
speech_indices = []
for i, lbl in enumerate(labels):
if any(word in lbl.lower() for word in ['speech', 'voice', 'talk', 'conversation', 'speaking']):
speech_indices.append(i)
if not speech_indices:
# Fallback to a known speech index if available
try:
speech_indices = [labels.index('Speech')]
except ValueError:
# If 'Speech' label doesn't exist, use first 10 indices as approximation
speech_indices = list(range(min(10, len(labels))))
speech_prob = clip_probs[0, speech_indices].mean().item()
return VADResult(float(speech_prob), speech_prob > 0.5, self.model_name, time.time()-start_time, timestamp)
except Exception as e:
print(f"Error in {self.model_name}: {e}")
if len(audio) > 0:
energy = np.sum(audio ** 2)
threshold = 0.01
probability = min(energy / threshold, 1.0)
is_speech = energy > threshold
else:
probability = 0.0
is_speech = False
return VADResult(probability, is_speech, f"{self.model_name} (error)", time.time() - start_time, timestamp)
class OptimizedAST:
def __init__(self):
self.model_name = "AST"
self.sample_rate = 16000
self.model = None
self.feature_extractor = None
self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
self.prediction_cache = {} # Cache to avoid recomputing predictions
self.cache_window = 1.0 # Cache results in one-second buckets
self.load_model()
def load_model(self):
try:
if AST_AVAILABLE:
model_name = "MIT/ast-finetuned-audioset-10-10-0.4593"
self.feature_extractor = ASTFeatureExtractor.from_pretrained(model_name)
self.model = ASTForAudioClassification.from_pretrained(model_name)
self.model.to(self.device)
# Use FP16 for faster inference on GPU
if self.device.type == 'cuda':
self.model = self.model.half()
print(f"✅ {self.model_name} loaded with FP16 optimization")
else:
print(f"✅ {self.model_name} loaded successfully")
self.model.eval()
else:
print(f"⚠️ {self.model_name} not available, using fallback")
self.model = None
except Exception as e:
print(f"❌ Error loading {self.model_name}: {e}")
self.model = None
def predict(self, audio: np.ndarray, timestamp: float = 0.0, full_audio: np.ndarray = None) -> VADResult:
start_time = time.time()
if self.model is None or len(audio) == 0:
# Enhanced fallback using spectral features
if len(audio) > 0:
energy = np.sum(audio ** 2)
if LIBROSA_AVAILABLE:
spectral_centroid = np.mean(librosa.feature.spectral_centroid(y=audio, sr=self.sample_rate))
# Combine energy and spectral centroid into a rough speech likelihood
probability = min((energy * 100 + spectral_centroid / 500) / 2, 1.0)
else:
probability = min(energy * 50, 1.0)
is_speech = probability > 0.3
else:
probability = 0.0
is_speech = False
return VADResult(probability, is_speech, f"{self.model_name} (fallback)", time.time() - start_time, timestamp)
try:
# Cache key based on timestamp rounded to cache window
cache_key = int(timestamp / self.cache_window)
# Check cache first
if cache_key in self.prediction_cache:
cached_result = self.prediction_cache[cache_key]
# Return cached result with updated timestamp
return VADResult(
cached_result.probability,
cached_result.is_speech,
cached_result.model_name + " (cached)",
time.time() - start_time,
timestamp
)
if len(audio.shape) > 1:
audio = audio.mean(axis=1)
# Use longer context for AST - preferably 6.4 seconds (1024 frames)
if full_audio is not None and len(full_audio) >= 6.4 * self.sample_rate:
# Take 6.4-second window centered around current timestamp
center_pos = int(timestamp * self.sample_rate)
window_size = int(3.2 * self.sample_rate) # 3.2 seconds each side
start_pos = max(0, center_pos - window_size)
end_pos = min(len(full_audio), center_pos + window_size)
# Ensure we have at least 6.4 seconds
if end_pos - start_pos < 6.4 * self.sample_rate:
end_pos = min(len(full_audio), start_pos + int(6.4 * self.sample_rate))
if end_pos - start_pos < 6.4 * self.sample_rate:
start_pos = max(0, end_pos - int(6.4 * self.sample_rate))
audio_for_ast = full_audio[start_pos:end_pos]
else:
audio_for_ast = audio
# Ensure minimum length for AST (6.4 seconds for 1024 frames)
min_samples = int(6.4 * self.sample_rate) # 6.4 seconds
if len(audio_for_ast) < min_samples:
if LIBROSA_AVAILABLE:
audio_for_ast = librosa.util.fix_length(audio_for_ast, size=min_samples, mode='wrap')
else:
# Fallback: repeat the signal
repeat_factor = int(np.ceil(min_samples / len(audio_for_ast)))
audio_for_ast = np.tile(audio_for_ast, repeat_factor)[:min_samples]
# Truncate if too long (AST can handle up to ~10s, but we'll use 8s max for efficiency)
max_samples = 8 * self.sample_rate
if len(audio_for_ast) > max_samples:
audio_for_ast = audio_for_ast[:max_samples]
# Feature extraction with proper AST parameters (closer to 1024 frames)
inputs = self.feature_extractor(
audio_for_ast,
sampling_rate=self.sample_rate,
return_tensors="pt",
max_length=1024, # Proper AST context
padding="max_length", # Ensure consistent length
truncation=True
)
# Move inputs to correct device and dtype
inputs = {k: v.to(self.device) for k, v in inputs.items()}
if self.device.type == 'cuda' and next(self.model.parameters()).dtype == torch.float16:
# Match input dtype to the half-precision model
inputs = {k: v.half() if v.dtype == torch.float32 else v for k, v in inputs.items()}
with torch.no_grad():
outputs = self.model(**inputs)
logits = outputs.logits
probs = torch.sigmoid(logits)
# Find speech-related classes
label2id = self.model.config.label2id
speech_indices = []
speech_keywords = ['speech', 'voice', 'talk', 'conversation', 'speaking']
for lbl, idx in label2id.items():
if any(word in lbl.lower() for word in speech_keywords):
speech_indices.append(idx)
if speech_indices:
speech_prob = probs[0, speech_indices].mean().item()
# Apply more reasonable thresholding for AST with lower threshold
if speech_prob < 0.15 and np.sum(audio_for_ast ** 2) > 0.001:
speech_prob = min(speech_prob * 2.5, 0.6) # Moderate boost, cap at 0.6
else:
# Fallback to energy-based detection with higher threshold
energy = np.sum(audio_for_ast ** 2) / len(audio_for_ast) # Normalize by length
speech_prob = min(energy * 50, 1.0)
# Use lower threshold specifically for AST (0.25 instead of 0.4)
is_speech_ast = speech_prob > 0.25
result = VADResult(float(speech_prob), is_speech_ast, self.model_name, time.time()-start_time, timestamp)
# Cache the result
self.prediction_cache[cache_key] = result
# Clean old cache entries (keep only last 30 seconds for longer sessions)
cache_keys_to_remove = [k for k in self.prediction_cache.keys() if k < cache_key - 30]
for k in cache_keys_to_remove:
del self.prediction_cache[k]
return result
except Exception as e:
print(f"Error in {self.model_name}: {e}")
# Enhanced fallback
if len(audio) > 0:
energy = np.sum(audio ** 2) / len(audio) # Normalize by length
probability = min(energy * 100, 1.0) # More conservative scaling
is_speech = energy > 0.001 # Lower threshold for fallback
else:
probability = 0.0
is_speech = False
return VADResult(probability, is_speech, f"{self.model_name} (error)", time.time() - start_time, timestamp)
# ===== AUDIO PROCESSOR =====
class AudioProcessor:
def __init__(self, sample_rate=16000):
self.sample_rate = sample_rate
self.chunk_duration = 4.0
self.chunk_size = int(sample_rate * self.chunk_duration)
self.n_fft = 2048
self.hop_length = 256
self.n_mels = 128
self.fmin = 20
self.fmax = 8000
self.base_window = 0.064
self.base_hop = 0.032
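# 64 ms window / 32 ms hop define the energy envelope used by estimate_delay_compensation()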
# Model-specific window sizes (each model gets appropriate context)
self.model_windows = {
"Silero-VAD": 0.032, # 32ms exactly as required (512 samples)
"WebRTC-VAD": 0.03, # 30ms frames
"E-PANNs": 6.0, # 6 seconds minimum for reliable results
"PANNs": 10.0, # 10 seconds for optimal performance
"AST": 6.4 # ~6.4 seconds (1024 frames * 6.25ms)
}
# Model-specific hop sizes for efficiency
self.model_hop_sizes = {
"Silero-VAD": 0.016, # 16ms hop for Silero
"WebRTC-VAD": 0.01, # 10ms hop for WebRTC
"E-PANNs": 1.0, # Process every 1s but with 6s window
"PANNs": 2.0, # Process every 2s but with 10s window
"AST": 1.0 # Process every 1s but with 6.4s window
}
# Model-specific thresholds for better detection
self.model_thresholds = {
"Silero-VAD": 0.5,
"WebRTC-VAD": 0.5,
"E-PANNs": 0.4,
"PANNs": 0.4,
"AST": 0.25
}
self.delay_compensation = 0.0
self.correlation_threshold = 0.7
def process_audio(self, audio):
if audio is None:
return np.array([])
try:
if isinstance(audio, tuple):
sample_rate, audio_data = audio
if sample_rate != self.sample_rate and LIBROSA_AVAILABLE:
audio_data = librosa.resample(audio_data.astype(float),
orig_sr=sample_rate,
target_sr=self.sample_rate)
else:
audio_data = audio
if len(audio_data.shape) > 1:
audio_data = audio_data.mean(axis=1)
if np.max(np.abs(audio_data)) > 0:
audio_data = audio_data / np.max(np.abs(audio_data))
return audio_data
except Exception as e:
print(f"Audio processing error: {e}")
return np.array([])
def compute_high_res_spectrogram(self, audio_data):
try:
if LIBROSA_AVAILABLE and len(audio_data) > 0:
stft = librosa.stft(
audio_data,
n_fft=self.n_fft,
hop_length=self.hop_length,
win_length=self.n_fft,
window='hann',
center=False
)
power_spec = np.abs(stft) ** 2
mel_basis = librosa.filters.mel(
sr=self.sample_rate,
n_fft=self.n_fft,
n_mels=self.n_mels,
fmin=self.fmin,
fmax=self.fmax
)
mel_spec = np.dot(mel_basis, power_spec)
mel_spec_db = librosa.power_to_db(mel_spec, ref=np.max)
time_frames = np.arange(mel_spec_db.shape[1]) * self.hop_length / self.sample_rate
return mel_spec_db, time_frames
else:
from scipy import signal
f, t, Sxx = signal.spectrogram(
audio_data,
self.sample_rate,
nperseg=self.n_fft,
noverlap=self.n_fft - self.hop_length,
window='hann'
)
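# Without librosa, approximate a mel scale by averaging linear-frequency bins into log-spaced bands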
mel_spec_db = np.zeros((self.n_mels, Sxx.shape[1]))
mel_freqs = np.logspace(
np.log10(self.fmin),
np.log10(min(self.fmax, self.sample_rate/2)),
self.n_mels + 1
)
for i in range(self.n_mels):
f_start = mel_freqs[i]
f_end = mel_freqs[i + 1]
bin_start = int(f_start * len(f) / (self.sample_rate/2))
bin_end = int(f_end * len(f) / (self.sample_rate/2))
if bin_end > bin_start:
mel_spec_db[i, :] = np.mean(Sxx[bin_start:bin_end, :], axis=0)
mel_spec_db = 10 * np.log10(mel_spec_db + 1e-10)
return mel_spec_db, t
except Exception as e:
print(f"Spectrogram computation error: {e}")
dummy_spec = np.zeros((self.n_mels, 200))
dummy_time = np.linspace(0, len(audio_data) / self.sample_rate, 200)
return dummy_spec, dummy_time
def detect_onset_offset_advanced(self, vad_results: List[VADResult], threshold: float = 0.5) -> List[OnsetOffset]:
onsets_offsets = []
if len(vad_results) < 3:
return onsets_offsets
models = {}
for result in vad_results:
if result.model_name not in models:
models[result.model_name] = []
models[result.model_name].append(result)
for model_name, results in models.items():
if len(results) < 3:
continue
results.sort(key=lambda x: x.timestamp)
timestamps = np.array([r.timestamp for r in results])
probabilities = np.array([r.probability for r in results])
if len(probabilities) > 5:
window_size = min(5, len(probabilities) // 3)
probabilities = np.convolve(probabilities, np.ones(window_size)/window_size, mode='same')
upper_thresh = threshold + 0.1
lower_thresh = threshold - 0.1
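# Hysteresis: onsets require the smoothed probability to rise above threshold+0.1,
# offsets require it to fall below threshold-0.1, suppressing rapid toggling near the threshold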
in_speech_segment = False
current_onset_time = -1
for i in range(1, len(results)):
prev_prob = probabilities[i-1]
curr_prob = probabilities[i]
curr_time = timestamps[i]
if not in_speech_segment and prev_prob <= upper_thresh and curr_prob > upper_thresh:
in_speech_segment = True
current_onset_time = curr_time - self.delay_compensation
elif in_speech_segment and prev_prob >= lower_thresh and curr_prob < lower_thresh:
in_speech_segment = False
if current_onset_time >= 0:
offset_time = curr_time - self.delay_compensation
onsets_offsets.append(OnsetOffset(
onset_time=max(0, current_onset_time),
offset_time=offset_time,
model_name=model_name,
confidence=np.mean(probabilities[
(timestamps >= current_onset_time) &
(timestamps <= offset_time)
]) if len(probabilities) > 0 else curr_prob
))
current_onset_time = -1
if in_speech_segment and current_onset_time >= 0:
onsets_offsets.append(OnsetOffset(
onset_time=max(0, current_onset_time),
offset_time=timestamps[-1],
model_name=model_name,
confidence=np.mean(probabilities[-3:]) if len(probabilities) >= 3 else probabilities[-1]
))
return onsets_offsets
def estimate_delay_compensation(self, audio_data, vad_results):
try:
if len(audio_data) == 0 or len(vad_results) == 0:
return 0.0
window_size = int(self.sample_rate * self.base_window)
hop_size = int(self.sample_rate * self.base_hop)
energy_signal = []
for i in range(0, len(audio_data) - window_size, hop_size):
window = audio_data[i:i + window_size]
energy = np.sum(window ** 2)
energy_signal.append(energy)
energy_signal = np.array(energy_signal)
if len(energy_signal) == 0:
return 0.0
energy_signal = (energy_signal - np.mean(energy_signal)) / (np.std(energy_signal) + 1e-8)
vad_times = np.array([r.timestamp for r in vad_results])
vad_probs = np.array([r.probability for r in vad_results])
energy_times = np.arange(len(energy_signal)) * self.base_hop
vad_interp = np.interp(energy_times, vad_times, vad_probs)
vad_interp = (vad_interp - np.mean(vad_interp)) / (np.std(vad_interp) + 1e-8)
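# Cross-correlate the normalised energy envelope with the interpolated VAD curve; the lag at the
# correlation peak estimates how far the VAD output trails the audio (clamped to ±100 ms below)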
if len(energy_signal) > 10 and len(vad_interp) > 10:
correlation = np.correlate(energy_signal, vad_interp, mode='full')
delay_samples = np.argmax(correlation) - len(vad_interp) + 1
delay_seconds = delay_samples * self.base_hop
max_corr = np.max(correlation) / (len(vad_interp) * np.std(energy_signal) * np.std(vad_interp))
if max_corr > self.correlation_threshold:
self.delay_compensation = np.clip(delay_seconds, -0.1, 0.1)
return self.delay_compensation
except Exception as e:
print(f"Delay estimation error: {e}")
return 0.0
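# Typical flow through this class (illustrative sketch; variable names are assumptions):
#
#   processor = AudioProcessor()
#   audio = processor.process_audio((sr, raw))                  # mono, 16 kHz, peak-normalised
#   mel_db, frame_times = processor.compute_high_res_spectrogram(audio)
#   segments = processor.detect_onset_offset_advanced(vad_results, threshold=0.5)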
# ===== ENHANCED VISUALIZATION =====
def create_realtime_plot(audio_data: np.ndarray, vad_results: List[VADResult],
onsets_offsets: List[OnsetOffset], processor: AudioProcessor,
model_a: str, model_b: str, threshold: float):
if not PLOTLY_AVAILABLE:
return None
try:
mel_spec_db, time_frames = processor.compute_high_res_spectrogram(audio_data)
freq_axis = np.linspace(processor.fmin, processor.fmax, processor.n_mels)
fig = make_subplots(
rows=2, cols=1,
subplot_titles=(f"Model A: {model_a}", f"Model B: {model_b}"),
vertical_spacing=0.02,
shared_xaxes=True,
specs=[[{"secondary_y": True}], [{"secondary_y": True}]]
)
colorscale = 'Viridis'
fig.add_trace(
go.Heatmap(
z=mel_spec_db,
x=time_frames,
y=freq_axis,
colorscale=colorscale,
showscale=False,
hovertemplate='Time: %{x:.2f}s<br>Freq: %{y:.0f}Hz<br>Power: %{z:.1f}dB'
),
row=1, col=1
)
# The rest of this function adds the matching spectrogram heatmap on row 2 (same
# Time/Freq/Power hover), the per-model probability traces on the secondary y-axes
# (hover 'Probability: %{y:.3f}'), and the figure title
# "Multi-Model Real-time Speech Detection Framework". (Remainder truncated here.)