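"""Real-time Voice Activity Detection (VAD) demo.

Runs up to five detectors (Silero-VAD, WebRTC-VAD, an energy-based E-PANNs
stand-in, PANNs and AST) over recorded audio in a Gradio app, plotting each
selected model's speech probability over a mel spectrogram together with the
detected speech onsets and offsets.
"""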
import gradio as gr
import numpy as np
import torch
import time
import warnings
from dataclasses import dataclass
from typing import List, Tuple, Dict
import threading
import queue
import os
import requests
from pathlib import Path

warnings.filterwarnings('ignore')

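# Optional third-party dependencies are probed below so the demo can degrade
# gracefully when one is missing. A typical environment (an assumption; the
# original script pins nothing) would be:
#   pip install gradio numpy torch plotly librosa webrtcvad panns-inference transformers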
try:
    import librosa
    LIBROSA_AVAILABLE = True
    print("✅ Librosa available")
except ImportError:
    LIBROSA_AVAILABLE = False
    print("⚠️ Librosa not available, using scipy fallback")

try:
    import webrtcvad
    WEBRTC_AVAILABLE = True
    print("✅ WebRTC VAD available")
except ImportError:
    WEBRTC_AVAILABLE = False
    print("⚠️ WebRTC VAD not available, using fallback")

try:
    import plotly.graph_objects as go
    from plotly.subplots import make_subplots
    PLOTLY_AVAILABLE = True
    print("✅ Plotly available")
except ImportError:
    PLOTLY_AVAILABLE = False
    print("⚠️ Plotly not available")

try:
    from panns_inference import AudioTagging, labels
    PANNS_AVAILABLE = True
    print("✅ PANNs available")
except ImportError:
    PANNS_AVAILABLE = False
    print("⚠️ PANNs not available, using fallback")

try:
    from transformers import ASTForAudioClassification, ASTFeatureExtractor
    import transformers
    AST_AVAILABLE = True
    print("✅ AST (Transformers) available")
except ImportError:
    AST_AVAILABLE = False
    print("⚠️ AST not available, using fallback")

print("🚀 Creating Real-time VAD Demo...")

@dataclass
class VADResult:
    """Single VAD prediction for one audio chunk."""
    probability: float
    is_speech: bool
    model_name: str
    processing_time: float
    timestamp: float


@dataclass
class OnsetOffset:
    """A detected speech segment with its bounding times."""
    onset_time: float
    offset_time: float
    model_name: str
    confidence: float

class OptimizedSileroVAD:
    """Streaming VAD based on the pretrained Silero-VAD model from torch.hub."""

    def __init__(self):
        self.model = None
        self.sample_rate = 16000
        self.model_name = "Silero-VAD"
        self.load_model()

    def load_model(self):
        try:
            # torch.hub returns (model, utils); the utility functions are not needed here.
            self.model, _ = torch.hub.load(
                repo_or_dir='snakers4/silero-vad',
                model='silero_vad',
                force_reload=False,
                onnx=False
            )
            self.model.eval()
            print(f"✅ {self.model_name} loaded successfully")
        except Exception as e:
            print(f"❌ Error loading {self.model_name}: {e}")
            self.model = None

    def reset_states(self):
        # Clear the model's internal recurrent state between independent recordings.
        if self.model:
            self.model.reset_states()

    def predict(self, audio: np.ndarray, timestamp: float = 0.0) -> VADResult:
        start_time = time.time()

        if self.model is None or len(audio) == 0:
            return VADResult(0.0, False, f"{self.model_name} (unavailable)", time.time() - start_time, timestamp)

        try:
            # Downmix multi-channel input to mono.
            if len(audio.shape) > 1:
                audio = audio.mean(axis=1)

            audio_tensor = torch.FloatTensor(audio).unsqueeze(0)

            with torch.no_grad():
                speech_prob = self.model(audio_tensor, self.sample_rate).item()

            is_speech = speech_prob > 0.5
            processing_time = time.time() - start_time

            return VADResult(speech_prob, is_speech, self.model_name, processing_time, timestamp)

        except Exception:
            return VADResult(0.0, False, self.model_name, time.time() - start_time, timestamp)

class OptimizedWebRTCVAD:
    """Frame-based VAD using the WebRTC voice activity detector."""

    def __init__(self):
        self.model_name = "WebRTC-VAD"
        self.sample_rate = 16000
        self.frame_duration = 10  # ms; WebRTC VAD accepts 10, 20 or 30 ms frames
        self.frame_size = int(self.sample_rate * self.frame_duration / 1000)

        if WEBRTC_AVAILABLE:
            try:
                self.vad = webrtcvad.Vad(3)  # aggressiveness 3 = most aggressive filtering
                print(f"✅ {self.model_name} loaded successfully")
            except Exception:
                self.vad = None
        else:
            self.vad = None

    def predict(self, audio: np.ndarray, timestamp: float = 0.0) -> VADResult:
        start_time = time.time()

        if self.vad is None or len(audio) == 0:
            return VADResult(0.0, False, f"{self.model_name} (fallback)", time.time() - start_time, timestamp)

        try:
            if len(audio.shape) > 1:
                audio = audio.mean(axis=1)
            # WebRTC VAD expects 16-bit PCM bytes.
            audio_int16 = (audio * 32767).astype(np.int16)

            speech_frames, total_frames = 0, 0

            # Classify each complete frame and report the fraction flagged as speech.
            for i in range(0, len(audio_int16) - self.frame_size + 1, self.frame_size):
                frame = audio_int16[i:i + self.frame_size].tobytes()
                if self.vad.is_speech(frame, self.sample_rate):
                    speech_frames += 1
                total_frames += 1

            probability = speech_frames / max(total_frames, 1)
            is_speech = probability > 0.5

            return VADResult(probability, is_speech, self.model_name, time.time() - start_time, timestamp)

        except Exception:
            return VADResult(0.0, False, self.model_name, time.time() - start_time, timestamp)

class OptimizedEPANNs:
    """Lightweight spectral-energy heuristic used as a stand-in for the full E-PANNs model."""

    def __init__(self):
        self.model_name = "E-PANNs"
        self.sample_rate = 16000
        print(f"✅ {self.model_name} initialized")

    def predict(self, audio: np.ndarray, timestamp: float = 0.0) -> VADResult:
        start_time = time.time()
        if len(audio) == 0:
            return VADResult(0.0, False, self.model_name, time.time() - start_time, timestamp)

        try:
            # Mean log-mel (or plain spectrogram) energy serves as a crude speech score.
            if LIBROSA_AVAILABLE:
                mel_spec = librosa.feature.melspectrogram(y=audio, sr=self.sample_rate, n_mels=64)
                energy = np.mean(librosa.power_to_db(mel_spec, ref=np.max))
            else:
                from scipy import signal
                _, _, Sxx = signal.spectrogram(audio, self.sample_rate)
                energy = np.mean(10 * np.log10(Sxx + 1e-10))

            # Map the dB energy (roughly -100 to -50 dB) onto [0, 1].
            speech_score = (energy + 100) / 50
            probability = np.clip(speech_score, 0, 1)

            return VADResult(probability, probability > 0.6, self.model_name, time.time() - start_time, timestamp)
        except Exception:
            return VADResult(0.0, False, self.model_name, time.time() - start_time, timestamp)

class OptimizedPANNs:
    """Clip-level speech detection using PANNs audio tagging (AudioSet labels)."""

    def __init__(self):
        self.model_name = "PANNs"
        self.sample_rate = 32000  # panns_inference checkpoints expect 32 kHz audio
        self.model = None
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.cached_clip_prob = None  # one inference per clip; reused for every chunk
        self.load_model()

    def load_model(self):
        try:
            if PANNS_AVAILABLE:
                self.model = AudioTagging(checkpoint_path=None, device=self.device)
                print(f"✅ {self.model_name} loaded successfully")
            else:
                self.model = None
        except Exception as e:
            print(f"❌ Error loading {self.model_name}: {e}")
            self.model = None

    def predict(self, audio: np.ndarray, timestamp: float = 0.0) -> VADResult:
        # PANNs is too slow for per-chunk inference, so the clip-level probability
        # is computed once and then returned for every subsequent chunk.
        if self.cached_clip_prob is not None:
            return VADResult(self.cached_clip_prob, self.cached_clip_prob > 0.5, self.model_name, 0.0, timestamp)

        start_time = time.time()
        if self.model is None or len(audio) == 0:
            return VADResult(0.0, False, f"{self.model_name} (fallback)", time.time() - start_time, timestamp)

        try:
            # inference() takes a (batch, samples) array already at 32 kHz; the
            # caller resamples before invoking this model.
            clip_probs, _ = self.model.inference(audio[np.newaxis, :])

            # Average the probabilities of all speech/voice-related AudioSet classes.
            speech_idx = [i for i, lbl in enumerate(labels) if 'speech' in lbl.lower() or 'voice' in lbl.lower()]
            if not speech_idx:
                speech_idx = [labels.index('Speech')]

            speech_prob = clip_probs[0, speech_idx].mean().item()
            self.cached_clip_prob = float(speech_prob)

            return VADResult(self.cached_clip_prob, self.cached_clip_prob > 0.5, self.model_name, time.time() - start_time, timestamp)
        except Exception:
            return VADResult(0.0, False, f"{self.model_name} (error)", time.time() - start_time, timestamp)

class OptimizedAST:
    """Clip-level speech detection with the Audio Spectrogram Transformer (AudioSet fine-tune)."""

    def __init__(self):
        self.model_name = "AST"
        self.sample_rate = 16000
        self.model = None
        self.feature_extractor = None
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.cached_clip_prob = None  # one inference per clip; reused for every chunk
        self.load_model()

    def load_model(self):
        try:
            if AST_AVAILABLE:
                model_path = "MIT/ast-finetuned-audioset-10-10-0.4593"
                self.feature_extractor = ASTFeatureExtractor.from_pretrained(model_path)
                self.model = ASTForAudioClassification.from_pretrained(model_path).to(self.device).eval()
                print(f"✅ {self.model_name} loaded successfully")
            else:
                self.model = None
        except Exception as e:
            print(f"❌ Error loading {self.model_name}: {e}")
            self.model = None

    def predict(self, audio: np.ndarray, timestamp: float = 0.0) -> VADResult:
        if self.cached_clip_prob is not None:
            return VADResult(self.cached_clip_prob, self.cached_clip_prob > 0.5, self.model_name, 0.0, timestamp)

        start_time = time.time()
        # AST needs a reasonably long context; require at least two seconds of audio.
        if self.model is None or len(audio) < self.sample_rate * 2:
            return VADResult(0.0, False, f"{self.model_name} (fallback)", time.time() - start_time, timestamp)

        try:
            inputs = self.feature_extractor(audio, sampling_rate=self.sample_rate, return_tensors="pt").to(self.device)
            with torch.no_grad():
                # AudioSet tagging is multi-label, so sigmoid (not softmax) gives per-class probabilities.
                probs = torch.sigmoid(self.model(**inputs).logits)

            # Average the probabilities of all speech/voice-related classes.
            label2id = self.model.config.label2id
            speech_idx = [idx for lbl, idx in label2id.items() if 'speech' in lbl.lower() or 'voice' in lbl.lower()]

            speech_prob = probs[0, speech_idx].mean().item()
            self.cached_clip_prob = float(speech_prob)

            return VADResult(self.cached_clip_prob, self.cached_clip_prob > 0.5, self.model_name, time.time() - start_time, timestamp)
        except Exception:
            return VADResult(0.0, False, f"{self.model_name} (error)", time.time() - start_time, timestamp)

class AudioProcessor:
    """Shared audio preprocessing, spectrogram computation and event detection."""

    def __init__(self, sample_rate=16000):
        self.sample_rate = sample_rate

        # Analysis frame: 64 ms window (1024 samples) with a 16 ms hop (256 samples).
        self.window_size = 0.064
        self.hop_size = 0.016
        self.n_fft = int(self.sample_rate * self.window_size)
        self.hop_length = int(self.sample_rate * self.hop_size)

        # Mel-spectrogram parameters for the visualizer.
        self.n_mels = 128
        self.fmin = 20
        self.fmax = 8000

    def process_audio(self, audio):
        """Convert a Gradio (sample_rate, data) tuple to mono, 16 kHz, peak-normalized float32."""
        if audio is None:
            return np.array([])
        try:
            sample_rate, audio_data = audio
            # Cast to float first so resampling and in-place normalization never
            # fail on the int16 arrays Gradio typically delivers.
            audio_data = audio_data.astype(np.float32)
            if sample_rate != self.sample_rate and LIBROSA_AVAILABLE:
                audio_data = librosa.resample(audio_data, orig_sr=sample_rate, target_sr=self.sample_rate)
            if len(audio_data.shape) > 1:
                audio_data = audio_data.mean(axis=1)
            if np.max(np.abs(audio_data)) > 0:
                audio_data /= np.max(np.abs(audio_data))
            return audio_data
        except Exception:
            return np.array([])

    def compute_high_res_spectrogram(self, audio_data):
        """Return (mel_spectrogram_dB, frame_times) for plotting."""
        try:
            if LIBROSA_AVAILABLE and len(audio_data) > 0:
                stft = librosa.stft(audio_data, n_fft=self.n_fft, hop_length=self.hop_length, center=False)
                mel_spec = librosa.feature.melspectrogram(S=np.abs(stft)**2, sr=self.sample_rate, n_fft=self.n_fft, hop_length=self.hop_length, n_mels=self.n_mels)
                mel_spec_db = librosa.power_to_db(mel_spec, ref=np.max)
                time_frames = librosa.times_like(mel_spec_db, sr=self.sample_rate, hop_length=self.hop_length, n_fft=self.n_fft)
                return mel_spec_db, time_frames
            return np.array([[]]), np.array([])
        except Exception:
            return np.array([[]]), np.array([])

    def detect_onset_offset_advanced(self, vad_results: List[VADResult], threshold: float = 0.5) -> List[OnsetOffset]:
        """Extract speech onset/offset events per model using hysteresis thresholding."""
        onsets_offsets = []
        models = {res.model_name for res in vad_results}

        for model_name in models:
            results = sorted([r for r in vad_results if r.model_name == model_name], key=lambda x: x.timestamp)
            if len(results) < 2:
                continue

            timestamps = np.array([r.timestamp for r in results])
            probabilities = np.array([r.probability for r in results])

            # Light smoothing (3-point moving average) suppresses single-frame flicker.
            probs_smooth = np.convolve(probabilities, np.ones(3) / 3, mode='same')

            # Hysteresis: enter speech above the threshold, leave only below half of it.
            upper = threshold
            lower = threshold * 0.5

            in_speech = False
            onset_time = -1
            for i, prob in enumerate(probs_smooth):
                if not in_speech and prob > upper:
                    in_speech = True
                    onset_time = timestamps[i]
                elif in_speech and prob < lower:
                    in_speech = False
                    segment = probabilities[(timestamps >= onset_time) & (timestamps <= timestamps[i])]
                    onsets_offsets.append(OnsetOffset(onset_time, timestamps[i], model_name, np.mean(segment)))
            # Close any segment still open at the end of the clip.
            if in_speech:
                onsets_offsets.append(OnsetOffset(onset_time, timestamps[-1], model_name, np.mean(probabilities[timestamps >= onset_time])))

        return onsets_offsets

def create_realtime_plot(audio_data: np.ndarray, vad_results: List[VADResult],
                         onsets_offsets: List[OnsetOffset], processor: AudioProcessor,
                         model_a: str, model_b: str, threshold: float):
    """Build a two-panel figure: spectrogram plus VAD probability curve for each model."""
    # Returning None (not go.Figure()) keeps this safe when Plotly failed to import.
    if not PLOTLY_AVAILABLE or len(audio_data) == 0:
        return None

    mel_spec_db, time_frames = processor.compute_high_res_spectrogram(audio_data)
    if mel_spec_db.size == 0:
        return go.Figure()

    fig = make_subplots(rows=2, cols=1, subplot_titles=(f"Model A: {model_a}", f"Model B: {model_b}"),
                        vertical_spacing=0.05, shared_xaxes=True,
                        specs=[[{"secondary_y": True}], [{"secondary_y": True}]])

    # Note: the y axis is labeled linearly in Hz as an approximation of the mel scale.
    heatmap_args = dict(z=mel_spec_db, x=time_frames,
                        y=np.linspace(processor.fmin, processor.fmax, processor.n_mels),
                        colorscale='Viridis', showscale=False)
    fig.add_trace(go.Heatmap(**heatmap_args, name=f'Spectrogram {model_a}'), row=1, col=1)
    fig.add_trace(go.Heatmap(**heatmap_args, name=f'Spectrogram {model_b}'), row=2, col=1)

    data_a = [r for r in vad_results if r.model_name.startswith(model_a)]
    data_b = [r for r in vad_results if r.model_name.startswith(model_b)]

    if data_a:
        fig.add_trace(go.Scatter(x=[r.timestamp for r in data_a], y=[r.probability for r in data_a],
                                 mode='lines', line=dict(color='yellow', width=3),
                                 name=f'{model_a} Prob.'), row=1, col=1, secondary_y=True)
    if data_b:
        fig.add_trace(go.Scatter(x=[r.timestamp for r in data_b], y=[r.probability for r in data_b],
                                 mode='lines', line=dict(color='orange', width=3),
                                 name=f'{model_b} Prob.'), row=2, col=1, secondary_y=True)

    # Decision threshold as a dashed reference line on both probability axes.
    fig.add_hline(y=threshold, line=dict(color='cyan', width=2, dash='dash'), row=1, col=1, secondary_y=True)
    fig.add_hline(y=threshold, line=dict(color='cyan', width=2, dash='dash'), row=2, col=1, secondary_y=True)

    # Mark detected events: onsets in green, offsets in red.
    events_a = [e for e in onsets_offsets if e.model_name.startswith(model_a)]
    events_b = [e for e in onsets_offsets if e.model_name.startswith(model_b)]

    for event in events_a:
        fig.add_vline(x=event.onset_time, line=dict(color='lime', width=3), row=1, col=1)
        fig.add_vline(x=event.offset_time, line=dict(color='red', width=3), row=1, col=1)
    for event in events_b:
        fig.add_vline(x=event.onset_time, line=dict(color='lime', width=3), row=2, col=1)
        fig.add_vline(x=event.offset_time, line=dict(color='red', width=3), row=2, col=1)

    fig.update_layout(height=600, title_text="Real-Time Speech Visualizer",
                      plot_bgcolor='black', paper_bgcolor='white', font_color='black')
    fig.update_yaxes(title_text="Frequency (Hz)", range=[processor.fmin, processor.fmax], secondary_y=False)
    fig.update_yaxes(title_text="Probability", range=[0, 1], secondary_y=True)
    fig.update_xaxes(title_text="Time (seconds)", row=2, col=1)

    return fig

class VADDemo:
    """Ties the models, the processor and the plotting code together for Gradio."""

    def __init__(self):
        self.processor = AudioProcessor()
        self.models = {
            'Silero-VAD': OptimizedSileroVAD(), 'WebRTC-VAD': OptimizedWebRTCVAD(),
            'E-PANNs': OptimizedEPANNs(), 'PANNs': OptimizedPANNs(), 'AST': OptimizedAST()
        }
        print("🎤 VAD Demo initialized with all modules.")

    def process_audio_with_events(self, audio, model_a, model_b, threshold):
        """Run the two selected models over a recording and return (figure, status, details)."""
        if audio is None:
            return None, "🔇 No audio detected", "Ready..."

        try:
            processed_audio = self.processor.process_audio(audio)
            if len(processed_audio) == 0:
                return None, "Audio empty", "No data"

            # Clear per-clip caches and streaming state from any previous recording.
            for model in self.models.values():
                if hasattr(model, 'cached_clip_prob'):
                    model.cached_clip_prob = None
                if hasattr(model, 'reset_states'):
                    model.reset_states()

            # Warm the clip-level models once; their predictions are cached.
            if 'PANNs' in self.models and LIBROSA_AVAILABLE:
                audio_32k = librosa.resample(processed_audio, orig_sr=self.processor.sample_rate, target_sr=32000)
                self.models['PANNs'].predict(audio_32k, 0.0)
            if 'AST' in self.models:
                self.models['AST'].predict(processed_audio, 0.0)

            # Slide a 64 ms window (1024 samples) with a 16 ms hop over the clip.
            vad_results = []
            window = int(self.processor.sample_rate * self.processor.window_size)
            hop = int(self.processor.sample_rate * self.processor.hop_size)
            silero_chunk_size = 512  # Silero-VAD expects exactly 512 samples at 16 kHz

            for i in range(0, len(processed_audio) - window + 1, hop):
                timestamp = i / self.processor.sample_rate
                chunk_1024 = processed_audio[i : i + window]
                # Silero gets only the most recent 512 samples of the window.
                chunk_512 = chunk_1024[-silero_chunk_size:]

                for model_name in {model_a, model_b}:
                    model = self.models[model_name]
                    current_chunk = chunk_512 if model_name == 'Silero-VAD' else chunk_1024

                    result = model.predict(current_chunk, timestamp)
                    result.is_speech = result.probability > threshold
                    vad_results.append(result)

            onsets_offsets = self.processor.detect_onset_offset_advanced(vad_results, threshold)
            fig = create_realtime_plot(processed_audio, vad_results, onsets_offsets, self.processor, model_a, model_b, threshold)

            status_msg = "🎙️ Speech detected" if any(e.offset_time > e.onset_time for e in onsets_offsets) else "🔇 No speech detected"
            details_text = f"Analyzed {len(processed_audio)/self.processor.sample_rate:.2f}s. Found {len(onsets_offsets)} speech events."

            return fig, status_msg, details_text
        except Exception as e:
            import traceback
            traceback.print_exc()
            return None, f"❌ Error: {e}", traceback.format_exc()

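# NOTE: the original snippet calls create_interface() without defining it.
# The sketch below is a minimal, assumed reconstruction of that interface
# (Gradio 4.x API; widget names and layout are hypothetical), wiring the two
# model pickers and the threshold slider into VADDemo.process_audio_with_events.
def create_interface():
    model_names = list(demo_app.models.keys())
    with gr.Blocks(title="Real-time VAD Demo") as demo:
        gr.Markdown("# 🎤 Real-Time Voice Activity Detection Demo")
        audio_input = gr.Audio(sources=["microphone", "upload"], type="numpy", label="Audio input")
        with gr.Row():
            model_a = gr.Dropdown(model_names, value="Silero-VAD", label="Model A")
            model_b = gr.Dropdown(model_names, value="WebRTC-VAD", label="Model B")
            threshold = gr.Slider(0.0, 1.0, value=0.5, step=0.05, label="Speech threshold")
        plot = gr.Plot(label="Spectrogram and VAD probabilities")
        status = gr.Textbox(label="Status")
        details = gr.Textbox(label="Details")
        # Re-run the comparison whenever a new recording arrives.
        audio_input.change(
            demo_app.process_audio_with_events,
            inputs=[audio_input, model_a, model_b, threshold],
            outputs=[plot, status, details],
        )
    return demo
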
demo_app = VADDemo()
interface = create_interface()
interface.launch(share=True, debug=False)