# VOXLECT
# ============================================================================
# CELL 1: SETUP AND INSTALLATION (VERIFIED)
# ============================================================================
import os
import sys
import warnings
warnings.filterwarnings('ignore')
print("πŸš€ VoxLect Indic LID Whisper Large v3 - Final Setup")
print("=" * 60)
# Mount Google Drive (AUDIO_FOLDER and RESULTS_FOLDER below live on Drive)
from google.colab import drive
drive.mount('/content/drive')
# Install packages (the original Colab `!pip install` line is not included in this
# file; the list below covers the imports used by this script)
print("📦 Installing packages...")
os.system(f"{sys.executable} -m pip install -q transformers librosa pandas scikit-learn xlsxwriter")
# Clone the VoxLect repository into /content/voxlect so the `src` imports below
# resolve (the original Colab `!git clone` command is not included in this file)
print("📥 Cloning VoxLect repository...")
# Python path
sys.path.insert(0, '/content/voxlect')
sys.path.insert(0, '/content/voxlect/src')
print("βœ… Installation complete!")
# ============================================================================
# CELL 2: MANDATORY MONKEY PATCH (ATTENTION COMPATIBILITY)
# ============================================================================
import transformers.models.whisper.modeling_whisper as whisper_modeling
print("πŸ”§ Applying attention compatibility patch...")
_OriginalWhisperAttention = whisper_modeling.WhisperAttention
class PatchedWhisperAttention(_OriginalWhisperAttention):
def _get_attn_impl(self):
try:
attn_impl = super()._get_attn_impl()
if attn_impl is None:
return "eager"
return attn_impl
except AttributeError:
return "eager"
whisper_modeling.WhisperAttention = PatchedWhisperAttention
print("βœ… Monkey patch applied.")
# ============================================================================
# CELL 3: MODEL LOADING & LABEL LIST
# ============================================================================
import torch
import torch.nn.functional as F
import librosa
import pandas as pd
import numpy as np
from datetime import datetime
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
from IPython.display import display
# Import VoxLect after patch
from src.model.dialect.whisper_dialect import WhisperWrapper
# Folder code -> ground truth mapping
CUSTOM_FOLDER_MAPPING = {
"as": "assamese", "bn": "bengali", "br": "bodo", "doi": "dogri",
"en": "english", "gu": "gujarati", "hi": "hindi", "kn": "kannada",
"ks": "kashmiri", "kok": "konkani", "mai": "maithili", "ml": "malayalam",
"mni": "manipuri", "mr": "marathi", "ne": "nepali", "or": "odia",
"pa": "punjabi", "sa": "sanskrit", "sat": "santali", "sd": "sindhi",
"ta": "tamil", "te": "telugu", "ur": "urdu"
}
# IMPORTANT: label order used by the model (adjust if the model card lists a different order)
LABEL_LIST = [
"assamese", "bengali", "bodo", "dogri", "english", "gujarati",
"hindi", "kannada", "kashmiri", "konkani", "maithili", "malayalam",
"manipuri", "marathi", "nepali", "odia", "punjabi", "sanskrit",
"santali", "sindhi", "tamil", "telugu", "urdu"
]
# Update these paths
AUDIO_FOLDER = "/content/drive/MyDrive/Audio_files" # <-- set your path
RESULTS_FOLDER = "/content/drive/MyDrive/voxlect_1_results"
os.makedirs(RESULTS_FOLDER, exist_ok=True)
# Device and model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
MODEL_NAME = "tiantiaf/voxlect-indic-lid-whisper-large-v3"
print(f"πŸ”§ Device: {device}")
print(f"πŸ”„ Loading model: {MODEL_NAME}")
model = WhisperWrapper.from_pretrained(MODEL_NAME).to(device)
model.eval()
print("βœ… Model loaded successfully!")
# ============================================================================
# CELL 4: AUDIO IO & PREDICTION (ROBUST)
# ============================================================================
def trim_silence(audio, threshold=0.01, win=2048, hop=512):
rms = librosa.feature.rms(y=audio, frame_length=win, hop_length=hop)[0]
mask = rms > threshold * (rms.max() if rms.size else 1.0)
if not mask.any():
return audio
idx = np.where(mask)[0]
start = max(int(idx[0] * hop), 0)
end = min(int((idx[-1] + 1) * hop), len(audio))
return audio[start:end]
def load_audio(file_path, target_sr=16000, max_duration=15.0):
try:
audio, sr = librosa.load(file_path, sr=target_sr, mono=True)
# Optional: trim leading/trailing silence to improve discrimination
audio = trim_silence(audio, threshold=0.01)
# Duration control
max_samples = int(max_duration * target_sr)
if len(audio) > max_samples:
audio = audio[:max_samples]
min_samples = int(3.0 * target_sr)
if len(audio) < min_samples:
audio = np.pad(audio, (0, min_samples - len(audio)), 'constant')
# Normalize peak to 1 for stability across files
peak = np.abs(audio).max()
if peak > 0:
audio = audio / peak
# Diagnostics (optional; comment out after verifying)
# print(f"dbg: {os.path.basename(file_path)} len={len(audio)} mean={audio.mean():.4f} std={audio.std():.4f}")
return torch.from_numpy(audio).float().unsqueeze(0)
except Exception as e:
print(f" ❌ Error loading {os.path.basename(file_path)}: {e}")
return None
def predict_language(audio_tensor, k=5):
if audio_tensor is None:
return {"predicted_language": "error", "confidence": 0.0, "top_5_predictions": [], "error_message": "Audio load failed"}
try:
audio_tensor = audio_tensor.to(device)
with torch.no_grad():
logits, _ = model(audio_tensor, return_feature=True) # expect [1, C]
if logits.dim() == 1:
logits = logits.unsqueeze(0)
if logits.size(0) != 1:
logits = logits[:1, :]
probs = F.softmax(logits, dim=1)[0] # softmax over classes
top_probs, top_idx = torch.topk(probs, k)
top = []
for rank, (p, ix) in enumerate(zip(top_probs.tolist(), top_idx.tolist()), start=1):
idx = int(ix)
lang = LABEL_LIST[idx] if 0 <= idx < len(LABEL_LIST) else f"unknown_{idx}"
top.append({"rank": rank, "language": lang, "confidence": float(p)})
return {"predicted_language": top[0]["language"], "confidence": top[0]["confidence"], "top_5_predictions": top}
except Exception as e:
return {"predicted_language": "error", "confidence": 0.0, "top_5_predictions": [], "error_message": str(e)}
def find_audio_files(base_path):
if not os.path.exists(base_path):
print(f"❌ Path not found: {base_path}")
return []
audio_files = []
for root, _, files in os.walk(base_path):
folder = os.path.basename(root).lower()
gt = CUSTOM_FOLDER_MAPPING.get(folder, "unknown")
for file in files:
if file.lower().endswith(('.wav', '.mp3', '.m4a', '.flac', '.ogg')):
audio_files.append({
"file_path": os.path.join(root, file),
"filename": file,
"ground_truth": gt
})
print(f"βœ… Found {len(audio_files)} audio files.")
if audio_files:
print("πŸ“Š Ground Truth Distribution:")
print(pd.Series([f['ground_truth'] for f in audio_files]).value_counts())
return audio_files
print("βœ… Audio & prediction functions ready.")
# ============================================================================
# CELL 5: BATCH PROCESSING -> CSV
# ============================================================================
def run_batch_processing():
files = find_audio_files(AUDIO_FOLDER)
if not files:
return pd.DataFrame()
results = []
total = len(files)
print("\nπŸš€ Processing audio files...")
for i, f in enumerate(files, 1):
print(f" [{i}/{total}] Processing {f['filename']}...", end="")
audio_tensor = load_audio(f['file_path'])
pred = predict_language(audio_tensor)
if pred['predicted_language'] == 'error':
print(f" -> Error: {pred.get('error_message', 'Unknown error')}")
else:
print(f" -> Predicted: {pred['predicted_language']}")
results.append({**f, **pred})
df = pd.DataFrame(results)
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
out_csv = f"{RESULTS_FOLDER}/voxlect_results_{ts}.csv"
df.to_csv(out_csv, index=False)
print(f"\nβœ… Saved results to: {out_csv}")
return df
results_df = run_batch_processing()
# ============================================================================
# CELL 6: DETAILED ACCURACY ANALYSIS
# ============================================================================
def run_detailed_analysis(df):
print("\n" + "=" * 70)
print("πŸ“Š DETAILED ACCURACY ANALYSIS")
print("=" * 70)
valid = df[(df['ground_truth'] != 'unknown') & (df['predicted_language'] != 'error')].copy()
if valid.empty:
print("❌ No valid results for analysis.")
return
y_true = valid['ground_truth'].values
y_pred = valid['predicted_language'].values
print(f"\n🎯 Overall Accuracy (Top-1): {accuracy_score(y_true, y_pred):.2%}")
labels = sorted(set(list(y_true) + list(y_pred)))
print("\nπŸ“ˆ Classification Report:")
print(classification_report(y_true, y_pred, labels=labels, zero_division=0))
print("\nπŸ”€ Confusion Matrix:")
cm = confusion_matrix(y_true, y_pred, labels=labels)
cm_df = pd.DataFrame(cm, index=labels, columns=labels)
display(cm_df)
mis = valid[valid['ground_truth'] != valid['predicted_language']].copy()
if not mis.empty:
print("\n❌ Top 10 Most Common Misclassifications:")
top_errs = (mis.groupby(['ground_truth', 'predicted_language'])
.size().sort_values(ascending=False).head(10))
print(top_errs)
if 'top_5_predictions' in mis.columns:
def correct_rank(row):
true_lang = row['ground_truth']
preds = row['top_5_predictions']
if isinstance(preds, str):
try:
preds = eval(preds)
except Exception:
return None
for p in preds:
if p.get('language') == true_lang:
return p.get('rank')
return None
mis['correct_rank_in_top5'] = mis.apply(correct_rank, axis=1)
c_in_top5 = mis['correct_rank_in_top5'].notna().sum()
print(f"\nπŸ” Correct language in Top-5 for misclassified: {c_in_top5}/{len(mis)} ({c_in_top5/len(mis):.1%})")
top1_correct = (valid['ground_truth'] == valid['predicted_language']).sum()
top5_acc = (top1_correct + c_in_top5) / len(valid)
print(f"🎯 Overall Accuracy (Top-5): {top5_acc:.2%}")
print("\n🏁 Analysis complete!")
if 'results_df' in locals() and not results_df.empty:
run_detailed_analysis(results_df)
else:
print("Run Cell 5 first to generate results.")
# ============================================================================
# CELL: EXPORT DETAILED ANALYSIS TO EXCEL
# Requires: results_df (from Cell 5), LABEL_LIST, RESULTS_FOLDER
# ============================================================================
import json
import pandas as pd
import numpy as np
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
def export_detailed_excel(results_df, results_folder, label_order=None, filename_prefix="voxlect_analysis"):
# Guard
if results_df is None or results_df.empty:
print("❌ No results to export. Run batch processing first.")
return None
# Prepare valid subset
valid = results_df[
(results_df['ground_truth'] != 'unknown') &
(results_df['predicted_language'] != 'error') &
results_df['predicted_language'].notna()
].copy()
# Choose label order for confusion matrix/report
if label_order is None:
label_order = sorted(set(valid['ground_truth']) | set(valid['predicted_language']))
    # Overall Top-1 accuracy
    top1_acc = accuracy_score(valid['ground_truth'], valid['predicted_language'])
    # Top-5 accuracy (if top_5_predictions is available)
top5_acc = None
if 'top_5_predictions' in valid.columns:
def correct_in_top5(row):
target = row['ground_truth']
preds = row['top_5_predictions']
if isinstance(preds, str):
try:
preds = json.loads(preds)
except Exception:
try:
preds = eval(preds)
except Exception:
return False
for p in preds:
if p.get('language') == target:
return True
return False
valid['correct_in_top5'] = valid.apply(correct_in_top5, axis=1)
top5_acc = (valid['correct_in_top5'].sum() / len(valid)) if len(valid) else None
    # Classification report (dict -> DataFrame)
cls_report = classification_report(
valid['ground_truth'],
valid['predicted_language'],
labels=label_order,
zero_division=0,
output_dict=True
)
report_df = pd.DataFrame(cls_report).T.reset_index().rename(columns={"index": "label"})
# Confusion matrix
cm = confusion_matrix(valid['ground_truth'], valid['predicted_language'], labels=label_order)
cm_df = pd.DataFrame(cm, index=label_order, columns=label_order)
cm_df.index.name = "True\\Pred"
# Top misclassifications
mis = valid[valid['ground_truth'] != valid['predicted_language']].copy()
if not mis.empty:
top_mis = (mis.groupby(['ground_truth', 'predicted_language'])
.size()
.reset_index(name='count')
.sort_values('count', ascending=False))
else:
top_mis = pd.DataFrame(columns=['ground_truth', 'predicted_language', 'count'])
# Overview sheet
overview_rows = [
{"metric": "total_predictions", "value": int(len(results_df))},
{"metric": "valid_predictions_for_eval", "value": int(len(valid))},
{"metric": "top1_accuracy", "value": float(top1_acc) if len(valid) else None},
{"metric": "top5_accuracy", "value": float(top5_acc) if top5_acc is not None else None},
]
overview_df = pd.DataFrame(overview_rows)
# Full predictions (convert top_5_predictions to JSON string for readability)
full_preds = results_df.copy()
if 'top_5_predictions' in full_preds.columns:
full_preds['top_5_predictions'] = full_preds['top_5_predictions'].apply(
lambda x: json.dumps(x, ensure_ascii=False) if isinstance(x, (list, dict)) else str(x)
)
# Write to Excel with multiple sheets
ts = pd.Timestamp.now().strftime("%Y%m%d_%H%M%S")
out_xlsx = os.path.join(results_folder, f"{filename_prefix}_{ts}.xlsx")
with pd.ExcelWriter(out_xlsx, engine="xlsxwriter") as writer:
overview_df.to_excel(writer, sheet_name="overview", index=False)
report_df.to_excel(writer, sheet_name="per_language_metrics", index=False)
cm_df.to_excel(writer, sheet_name="confusion_matrix")
top_mis.to_excel(writer, sheet_name="top_misclassifications", index=False)
full_preds.to_excel(writer, sheet_name="full_predictions", index=False)
        # Optional: auto column widths computed from the in-memory DataFrames
        # (reading the workbook back before the writer closes would fail silently)
        sheet_frames = {
            "overview": overview_df,
            "per_language_metrics": report_df,
            "confusion_matrix": cm_df.reset_index(),
            "top_misclassifications": top_mis,
            "full_predictions": full_preds,
        }
        for sheet, frame in sheet_frames.items():
            try:
                ws = writer.sheets[sheet]
                for i, col in enumerate(frame.columns):
                    content_len = int(frame[col].astype(str).map(len).max()) if len(frame) else 0
                    ws.set_column(i, i, max(12, min(60, max(content_len, len(str(col))))))
            except Exception:
                pass
print(f"βœ… Excel report saved to: {out_xlsx}")
return out_xlsx
# Run export
excel_path = export_detailed_excel(results_df, RESULTS_FOLDER, label_order=sorted(set(LABEL_LIST)))
print("Excel path:", excel_path)
# ============================================================================
# CELL: TOP-5 RANK POSITION ANALYSIS
# Requires: results_df (from batch), where 'top_5_predictions' is a list/dict or JSON string
# ============================================================================
import json
import pandas as pd
import numpy as np
def parse_top5(cell):
"""Return a list of dicts [{'rank':int,'language':str,'confidence':float}, ...] from cell."""
if isinstance(cell, list):
return cell
if isinstance(cell, dict):
return [cell]
if isinstance(cell, str):
# Try JSON first, then eval as fallback
try:
v = json.loads(cell)
return v if isinstance(v, list) else [v]
except Exception:
try:
v = eval(cell)
return v if isinstance(v, list) else [v]
except Exception:
return []
return []
def compute_rank_in_top5(df):
"""Add 'correct_rank_in_top5' and a readable verdict column per row."""
df = df.copy()
def get_rank(row):
gt = row.get('ground_truth', None)
preds = parse_top5(row.get('top_5_predictions', []))
if not gt or not preds:
return None
for p in preds:
if isinstance(p, dict) and p.get('language') == gt:
# Ensure rank is int-like and 1-based
r = p.get('rank', None)
try:
r = int(r)
if 1 <= r <= 5:
return r
except Exception:
pass
return None
df['correct_rank_in_top5'] = df.apply(get_rank, axis=1)
df['top5_verdict'] = df['correct_rank_in_top5'].apply(
lambda r: f"Rank {int(r)}" if pd.notna(r) else "Not-in-Top-5"
)
return df
def per_language_rank_summary(df):
"""Build a per-language summary of rank distribution (1..5 and Not-in-Top-5)."""
# Consider only rows with known ground truth and a prediction attempt
    subset = df[(df['ground_truth'].notna()) & (df['ground_truth'] != 'unknown') & (df['predicted_language'] != 'error')].copy()
subset['rank_bin'] = subset['correct_rank_in_top5'].apply(lambda r: int(r) if pd.notna(r) else 0) # 0 = Not-in-Top-5
# Pivot counts per language vs rank_bin
counts = (subset
.groupby(['ground_truth', 'rank_bin'])
.size()
.reset_index(name='count'))
# Make a wide table with columns for Rank1..Rank5 and Not-in-Top-5
rank_cols = {0: "Not-in-Top-5", 1: "Rank 1", 2: "Rank 2", 3: "Rank 3", 4: "Rank 4", 5: "Rank 5"}
summary = (counts
.assign(rank_label=lambda x: x['rank_bin'].map(rank_cols))
.pivot(index='ground_truth', columns='rank_label', values='count')
.fillna(0)
.astype(int)
.reset_index()
.rename(columns={'ground_truth': 'language'}))
# Add totals and in-top-5 rate
summary['Total'] = summary[[c for c in summary.columns if c.startswith('Rank ') or c == 'Not-in-Top-5']].sum(axis=1)
in_top5_cols = [c for c in summary.columns if c.startswith('Rank ')]
summary['In-Top-5'] = summary[in_top5_cols].sum(axis=1)
summary['In-Top-5 Rate'] = (summary['In-Top-5'] / summary['Total']).replace([np.inf, np.nan], 0.0)
# Order columns nicely
ordered_cols = ['language', 'Total', 'In-Top-5', 'In-Top-5 Rate', 'Rank 1', 'Rank 2', 'Rank 3', 'Rank 4', 'Rank 5', 'Not-in-Top-5']
for c in ordered_cols:
if c not in summary.columns:
summary[c] = 0 if c != 'In-Top-5 Rate' else 0.0
summary = summary[ordered_cols]
return summary.sort_values(by=['In-Top-5 Rate','Rank 1','Rank 2','Rank 3','Rank 4','Rank 5'], ascending=False)
# 1) Compute per-row rank
results_ranked = compute_rank_in_top5(results_df)
# 2) Show first few rows with the new columns
display(results_ranked[['filename','ground_truth','predicted_language','top5_verdict','correct_rank_in_top5']].head(20))
# 3) Build per-language summary
rank_summary = per_language_rank_summary(results_ranked)
display(rank_summary)
# 4) Optionally save both to Excel or CSV
from datetime import datetime
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
rank_csv = os.path.join(RESULTS_FOLDER, f"top5_rank_per_file_{ts}.csv")
results_ranked.to_csv(rank_csv, index=False)
print("βœ… Per-file Top‑5 rank CSV:", rank_csv)
summary_csv = os.path.join(RESULTS_FOLDER, f"top5_rank_summary_{ts}.csv")
rank_summary.to_csv(summary_csv, index=False)
print("βœ… Per-language Top‑5 rank summary CSV:", summary_csv)
# ============================================================================
# CELL A: FEATURE EXTRACTION (DURATION, SNR, SILENCE RATIO)
# ============================================================================
import librosa
import numpy as np
import pandas as pd
import os
def compute_features(row, target_sr=16000):
p = row['file_path']
try:
y, sr = librosa.load(p, sr=target_sr, mono=True)
dur = len(y) / target_sr
# Energy-based SNR proxy: ratio of voiced/active RMS to global RMS (not true SNR but indicative)
rms = librosa.feature.rms(y=y, frame_length=2048, hop_length=512)[0]
global_rms = np.sqrt(np.mean(y**2) + 1e-12)
active_mask = rms > 0.1 * np.max(rms) if rms.size else np.array([False])
active_rms = np.mean(rms[active_mask]) if active_mask.any() else 0.0
snr_proxy = 20.0 * np.log10((active_rms + 1e-9) / (global_rms + 1e-9))
# Silence ratio: frames below threshold
thr = 0.02 * np.max(rms) if rms.size else 0.0
silence_ratio = float((rms < thr).mean() if rms.size else 1.0)
# Spectral centroid mean (proxy for brightness / channel)
sc = librosa.feature.spectral_centroid(y=y, sr=target_sr)[0]
sc_mean = float(np.mean(sc)) if sc.size else 0.0
return pd.Series({
'duration_s': dur,
'snr_proxy_db': float(snr_proxy),
'silence_ratio': silence_ratio,
'spec_centroid_mean': sc_mean
})
except Exception as e:
return pd.Series({
'duration_s': np.nan,
'snr_proxy_db': np.nan,
'silence_ratio': np.nan,
'spec_centroid_mean': np.nan
})
features = results_df.apply(compute_features, axis=1)
results_feat = pd.concat([results_df, features], axis=1)
print("βœ… Features added: ['duration_s','snr_proxy_db','silence_ratio','spec_centroid_mean']")
display(results_feat.head())
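# Optional (a small sketch): quick look at how each audio feature relates to
# correctness via the point-biserial correlation (correct = 1, incorrect = 0).
_eval = results_feat[(results_feat['ground_truth'] != 'unknown') &
                     (results_feat['predicted_language'] != 'error')].copy()
_eval['correct'] = (_eval['ground_truth'] == _eval['predicted_language']).astype(int)
for _col in ['duration_s', 'snr_proxy_db', 'silence_ratio', 'spec_centroid_mean']:
    print(f"{_col}: correlation with correctness = {_eval[_col].corr(_eval['correct']):.3f}")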
# ============================================================================
# CELL B: CALIBRATION & EXPECTED CALIBRATION ERROR (ECE)
# ============================================================================
import numpy as np
import pandas as pd
def extract_top1_conf(row):
preds = row.get('top_5_predictions', [])
if isinstance(preds, str):
try:
import json
preds = json.loads(preds)
except Exception:
preds = eval(preds)
if isinstance(preds, list) and preds:
return float(preds[0].get('confidence', np.nan))
return np.nan
def compute_ece(df, n_bins=15):
df = df.copy()
df['top1_conf'] = df.apply(extract_top1_conf, axis=1)
    df = df[(df['predicted_language'] != 'error') & (df['ground_truth'] != 'unknown') & df['top1_conf'].notna()]
conf = df['top1_conf'].to_numpy()
correct = (df['predicted_language'] == df['ground_truth']).to_numpy().astype(float)
bins = np.linspace(0.0, 1.0, n_bins + 1)
ece = 0.0
bin_stats = []
for i in range(n_bins):
m, M = bins[i], bins[i+1]
mask = (conf >= m) & (conf < M) if i < n_bins-1 else (conf >= m) & (conf <= M)
if mask.any():
acc = correct[mask].mean()
conf_mean = conf[mask].mean()
wt = mask.mean()
ece += wt * abs(acc - conf_mean)
bin_stats.append({'bin_low': m, 'bin_high': M, 'bin_acc': acc, 'bin_conf': conf_mean, 'weight': wt})
else:
bin_stats.append({'bin_low': m, 'bin_high': M, 'bin_acc': np.nan, 'bin_conf': np.nan, 'weight': 0.0})
return ece, pd.DataFrame(bin_stats)
ece, ece_bins = compute_ece(results_feat)
print(f"🎯 Expected Calibration Error (ECE): {ece:.4f}")
display(ece_bins)
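# Optional (a small sketch): visualize calibration as a reliability diagram built
# from the ece_bins table above. Assumes matplotlib is available in the runtime.
import matplotlib.pyplot as plt
_plot_bins = ece_bins.dropna(subset=['bin_acc'])
_centers = (_plot_bins['bin_low'] + _plot_bins['bin_high']) / 2.0
plt.figure(figsize=(5, 5))
plt.plot([0, 1], [0, 1], linestyle='--', color='gray', label='Perfect calibration')
plt.bar(_centers, _plot_bins['bin_acc'], width=1.0 / len(ece_bins),
        edgecolor='black', alpha=0.7, label='Bin accuracy')
plt.scatter(_centers, _plot_bins['bin_conf'], color='red', zorder=3, label='Bin mean confidence')
plt.xlabel('Predicted confidence (Top-1)')
plt.ylabel('Observed accuracy')
plt.title('Reliability diagram')
plt.legend()
plt.show()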
# ============================================================================
# CELL C: ROBUSTNESS SLICES (DURATION, SNR, SILENCE)
# ============================================================================
import numpy as np
import pandas as pd
def slice_acc(df, col, bins):
df = df.copy()
df = df[(df['predicted_language'] != 'error') & df[col].notna()]
labels = [f"[{bins[i]:.2f},{bins[i+1]:.2f})" for i in range(len(bins)-1)]
df['bin'] = pd.cut(df[col], bins=bins, labels=labels, include_lowest=True)
grp = df.groupby('bin').apply(lambda x: (x['ground_truth'] == x['predicted_language']).mean())
return grp.reset_index(name=f'accuracy_by_{col}')
dur_bins = [0, 2, 4, 6, 8, 12, np.inf]
snr_bins = [-40, -10, 0, 5, 10, 20, np.inf]
sil_bins = [0, 0.2, 0.4, 0.6, 0.8, 1.01]
acc_dur = slice_acc(results_feat, 'duration_s', dur_bins)
acc_snr = slice_acc(results_feat, 'snr_proxy_db', snr_bins)
acc_sil = slice_acc(results_feat, 'silence_ratio', sil_bins)
print("⏱️ Accuracy vs Duration:")
display(acc_dur)
print("πŸ”Š Accuracy vs SNR proxy:")
display(acc_snr)
print("🀫 Accuracy vs Silence ratio:")
display(acc_sil)
# ============================================================================
# CELL D: CONFUSION ASYMMETRY TABLE
# ============================================================================
from collections import defaultdict
import pandas as pd
valid = results_df[(results_df['ground_truth'] != 'unknown') & (results_df['predicted_language'] != 'error')].copy()
pairs = valid[valid['ground_truth'] != valid['predicted_language']][['ground_truth','predicted_language']]
flow = (pairs.groupby(['ground_truth','predicted_language']).size()
.reset_index(name='count')
.sort_values('count', ascending=False))
print("πŸ”€ Top asymmetric confusions:")
display(flow.head(30))
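# Optional (a small sketch): make the asymmetry explicit by pairing each confusion
# direction with its reverse, so A->B and B->A counts sit side by side.
_rev = flow.rename(columns={'ground_truth': 'predicted_language',
                            'predicted_language': 'ground_truth',
                            'count': 'reverse_count'})
asym = flow.merge(_rev, on=['ground_truth', 'predicted_language'], how='left')
asym['reverse_count'] = asym['reverse_count'].fillna(0).astype(int)
asym['net_asymmetry'] = asym['count'] - asym['reverse_count']
print("↔️ Directional confusions with reverse counts and net asymmetry:")
display(asym.sort_values('net_asymmetry', ascending=False).head(30))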
# ============================================================================
# CELL E: EMBEDDING CLUSTER QUALITY (SILHOUETTE)
# ============================================================================
import torch
import numpy as np
from sklearn.metrics import silhouette_score
import pandas as pd
def extract_embeddings(df, batch_size=16):
embs = []
labs = []
for _, row in df.iterrows():
t = load_audio(row['file_path'])
if t is None:
continue
with torch.no_grad():
# Many wrappers return (logits, features); if not, skip
try:
logits, feat = model(t.to(device), return_feature=True)
# Flatten feature vector (assume [1, D] or [1, T, D] -> take mean over time)
if feat is None:
continue
feat_np = feat.detach().cpu().numpy()
if feat_np.ndim == 3: # [B, T, D]
feat_np = feat_np.mean(axis=1)
elif feat_np.ndim == 2: # [B, D]
pass
else:
continue
embs.append(feat_np.squeeze(0))
labs.append(row['ground_truth'])
except Exception:
continue
if not embs:
return None, None
X = np.vstack(embs)
y = np.array(labs)
return X, y
sample = valid.groupby('ground_truth').head(20).reset_index(drop=True) if len(valid) > 0 else pd.DataFrame()
X, y = extract_embeddings(sample) if not sample.empty else (None, None)
if X is not None and len(np.unique(y)) > 1 and len(y) >= 10:
sil = silhouette_score(X, y, metric='euclidean')
print(f"πŸ“ Silhouette score (higher=better cluster separation): {sil:.3f}")
else:
print("ℹ️ Not enough data or embeddings to compute silhouette score.")
# ============================================================================
# CELL F: HARD-EXAMPLE MINING
# ============================================================================
import pandas as pd
import json
def top5_gap(row):
preds = row.get('top_5_predictions', [])
if isinstance(preds, str):
try:
preds = json.loads(preds)
except Exception:
preds = eval(preds)
if not preds or len(preds) < 2:
return np.nan
return float(preds[0]['confidence'] - preds[1]['confidence'])
valid = results_feat[(results_feat['ground_truth'] != 'unknown') & (results_feat['predicted_language'] != 'error')].copy()
valid['top5_gap'] = valid.apply(top5_gap, axis=1)
# Hardest misclassifications: small margin, wrong prediction
hard_mis = valid[valid['ground_truth'] != valid['predicted_language']].copy()
hard_mis = hard_mis.sort_values(['top5_gap','snr_proxy_db','duration_s'], ascending=[True, True, True]).head(30)
print("πŸ”₯ Hardest misclassifications (low margin, low SNR/duration):")
display(hard_mis[['filename','ground_truth','predicted_language','top5_gap','snr_proxy_db','duration_s','silence_ratio']])
# Ambiguous-but-correct: small margin but correct prediction
ambig_correct = valid[valid['ground_truth'] == valid['predicted_language']].copy()
ambig_correct = ambig_correct.sort_values(['top5_gap','snr_proxy_db','duration_s'], ascending=[True, True, True]).head(30)
print("πŸŒ€ Ambiguous but correct (low margin):")
display(ambig_correct[['filename','ground_truth','predicted_language','top5_gap','snr_proxy_db','duration_s','silence_ratio']])
# ============================================================================
# CELL G: SAVE EXTENDED ANALYSIS TO EXCEL
# ============================================================================
import sys, subprocess, os
def ensure_pkg(pkg):
try:
__import__(pkg)
except Exception:
subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", pkg])
ensure_pkg("xlsxwriter")
from pandas import ExcelWriter
ts = pd.Timestamp.now().strftime("%Y%m%d_%H%M%S")
xlsx_path = os.path.join(RESULTS_FOLDER, f"voxlect_extended_analysis_{ts}.xlsx")
with pd.ExcelWriter(xlsx_path, engine="xlsxwriter") as w:
results_feat.to_excel(w, sheet_name="results_with_features", index=False)
if 'ece' in locals():
pd.DataFrame([{'ECE': ece}]).to_excel(w, sheet_name="calibration_overview", index=False)
ece_bins.to_excel(w, sheet_name="calibration_bins", index=False)
acc_dur.to_excel(w, sheet_name="acc_vs_duration", index=False)
acc_snr.to_excel(w, sheet_name="acc_vs_snr", index=False)
acc_sil.to_excel(w, sheet_name="acc_vs_silence", index=False)
flow.to_excel(w, sheet_name="confusion_asymmetry", index=False)
if 'hard_mis' in locals():
hard_mis.to_excel(w, sheet_name="hard_misclassifications", index=False)
if 'ambig_correct' in locals():
ambig_correct.to_excel(w, sheet_name="ambiguous_correct", index=False)
print("βœ… Extended analysis Excel saved to:", xlsx_path)