import os
import sys
import warnings

warnings.filterwarnings('ignore')

print("VoxLect Indic LID Whisper Large v3 - Final Setup")
print("=" * 60)

# Mount Google Drive (the audio files and results folders live under /content/drive/MyDrive).
from google.colab import drive
drive.mount('/content/drive')

print("Installing packages...")

print("Cloning VoxLect repository...")

# Make the cloned VoxLect sources importable.
sys.path.insert(0, '/content/voxlect')
sys.path.insert(0, '/content/voxlect/src')

print("Installation complete!")

import transformers.models.whisper.modeling_whisper as whisper_modeling

print("Applying attention compatibility patch...")

# Guard against transformers versions where the attention-implementation helper is missing
# or returns None: fall back to the "eager" attention implementation in either case.
_OriginalWhisperAttention = whisper_modeling.WhisperAttention


class PatchedWhisperAttention(_OriginalWhisperAttention):
    def _get_attn_impl(self):
        try:
            attn_impl = super()._get_attn_impl()
            return "eager" if attn_impl is None else attn_impl
        except AttributeError:
            return "eager"


whisper_modeling.WhisperAttention = PatchedWhisperAttention

print("Monkey patch applied.")

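# Optional sanity check (not part of the original notebook): the module should now expose
# the patched class, so any Whisper model built from this point on inherits the "eager"
# fallback defined above.
assert whisper_modeling.WhisperAttention is PatchedWhisperAttention
print("WhisperAttention patched:", whisper_modeling.WhisperAttention.__name__)
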
import torch
import torch.nn.functional as F
import librosa
import pandas as pd
import numpy as np
from datetime import datetime
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
from IPython.display import display

from src.model.dialect.whisper_dialect import WhisperWrapper

# Map folder names (language codes) to the ground-truth language labels.
CUSTOM_FOLDER_MAPPING = {
    "as": "assamese", "bn": "bengali", "br": "bodo", "doi": "dogri",
    "en": "english", "gu": "gujarati", "hi": "hindi", "kn": "kannada",
    "ks": "kashmiri", "kok": "konkani", "mai": "maithili", "ml": "malayalam",
    "mni": "manipuri", "mr": "marathi", "ne": "nepali", "or": "odia",
    "pa": "punjabi", "sa": "sanskrit", "sat": "santali", "sd": "sindhi",
    "ta": "tamil", "te": "telugu", "ur": "urdu"
}

# Label order assumed to match the model's output indices (used to map logits to names below).
LABEL_LIST = [
    "assamese", "bengali", "bodo", "dogri", "english", "gujarati",
    "hindi", "kannada", "kashmiri", "konkani", "maithili", "malayalam",
    "manipuri", "marathi", "nepali", "odia", "punjabi", "sanskrit",
    "santali", "sindhi", "tamil", "telugu", "urdu"
]

AUDIO_FOLDER = "/content/drive/MyDrive/Audio_files"
RESULTS_FOLDER = "/content/drive/MyDrive/voxlect_1_results"
os.makedirs(RESULTS_FOLDER, exist_ok=True)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
MODEL_NAME = "tiantiaf/voxlect-indic-lid-whisper-large-v3"
print(f"Device: {device}")

print(f"Loading model: {MODEL_NAME}")
model = WhisperWrapper.from_pretrained(MODEL_NAME).to(device)
model.eval()
print("Model loaded successfully!")

def trim_silence(audio, threshold=0.01, win=2048, hop=512):
    """Trim leading/trailing low-energy frames based on a relative RMS threshold."""
    rms = librosa.feature.rms(y=audio, frame_length=win, hop_length=hop)[0]
    mask = rms > threshold * (rms.max() if rms.size else 1.0)
    if not mask.any():
        return audio
    idx = np.where(mask)[0]
    start = max(int(idx[0] * hop), 0)
    end = min(int((idx[-1] + 1) * hop), len(audio))
    return audio[start:end]


def load_audio(file_path, target_sr=16000, max_duration=15.0):
    """Load a file, trim silence, clip/pad to a usable length, and peak-normalize."""
    try:
        audio, sr = librosa.load(file_path, sr=target_sr, mono=True)

        audio = trim_silence(audio, threshold=0.01)

        # Clip to at most max_duration seconds and pad to at least 3 seconds.
        max_samples = int(max_duration * target_sr)
        if len(audio) > max_samples:
            audio = audio[:max_samples]
        min_samples = int(3.0 * target_sr)
        if len(audio) < min_samples:
            audio = np.pad(audio, (0, min_samples - len(audio)), 'constant')

        # Peak-normalize to [-1, 1].
        peak = np.abs(audio).max()
        if peak > 0:
            audio = audio / peak

        return torch.from_numpy(audio).float().unsqueeze(0)
    except Exception as e:
        print(f"  Error loading {os.path.basename(file_path)}: {e}")
        return None


def predict_language(audio_tensor, k=5):
    """Run the model on one audio tensor and return the top-k language predictions."""
    if audio_tensor is None:
        return {"predicted_language": "error", "confidence": 0.0,
                "top_5_predictions": [], "error_message": "Audio load failed"}
    try:
        audio_tensor = audio_tensor.to(device)
        with torch.no_grad():
            logits, _ = model(audio_tensor, return_feature=True)
        if logits.dim() == 1:
            logits = logits.unsqueeze(0)
        if logits.size(0) != 1:
            logits = logits[:1, :]

        probs = F.softmax(logits, dim=1)[0]
        top_probs, top_idx = torch.topk(probs, k)

        top = []
        for rank, (p, ix) in enumerate(zip(top_probs.tolist(), top_idx.tolist()), start=1):
            idx = int(ix)
            lang = LABEL_LIST[idx] if 0 <= idx < len(LABEL_LIST) else f"unknown_{idx}"
            top.append({"rank": rank, "language": lang, "confidence": float(p)})

        return {"predicted_language": top[0]["language"],
                "confidence": top[0]["confidence"],
                "top_5_predictions": top}
    except Exception as e:
        return {"predicted_language": "error", "confidence": 0.0,
                "top_5_predictions": [], "error_message": str(e)}


def find_audio_files(base_path):
    """Walk base_path; each file's parent folder name (a language code) gives its ground truth."""
    if not os.path.exists(base_path):
        print(f"Path not found: {base_path}")
        return []
    audio_files = []
    for root, _, files in os.walk(base_path):
        folder = os.path.basename(root).lower()
        gt = CUSTOM_FOLDER_MAPPING.get(folder, "unknown")
        for file in files:
            if file.lower().endswith(('.wav', '.mp3', '.m4a', '.flac', '.ogg')):
                audio_files.append({
                    "file_path": os.path.join(root, file),
                    "filename": file,
                    "ground_truth": gt
                })
    print(f"Found {len(audio_files)} audio files.")
    if audio_files:
        print("Ground Truth Distribution:")
        print(pd.Series([f['ground_truth'] for f in audio_files]).value_counts())
    return audio_files


print("Audio & prediction functions ready.")

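# Optional smoke test (not part of the original notebook): flip the flag to run the
# load_audio -> predict_language pipeline on a single clip from AUDIO_FOLDER before
# committing to the full batch run below.
RUN_SMOKE_TEST = False
if RUN_SMOKE_TEST:
    _smoke_files = find_audio_files(AUDIO_FOLDER)
    if _smoke_files:
        _clip = _smoke_files[0]
        _res = predict_language(load_audio(_clip["file_path"]))
        print(f"Smoke test on {_clip['filename']}: "
              f"{_res['predicted_language']} ({_res['confidence']:.2%})")
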
def run_batch_processing():
    """Predict the language of every file under AUDIO_FOLDER and save the results to CSV."""
    files = find_audio_files(AUDIO_FOLDER)
    if not files:
        return pd.DataFrame()

    results = []
    total = len(files)
    print("\nProcessing audio files...")
    for i, f in enumerate(files, 1):
        print(f"  [{i}/{total}] Processing {f['filename']}...", end="")
        audio_tensor = load_audio(f['file_path'])
        pred = predict_language(audio_tensor)
        if pred['predicted_language'] == 'error':
            print(f" -> Error: {pred.get('error_message', 'Unknown error')}")
        else:
            print(f" -> Predicted: {pred['predicted_language']}")
        results.append({**f, **pred})

    df = pd.DataFrame(results)
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    out_csv = f"{RESULTS_FOLDER}/voxlect_results_{ts}.csv"
    df.to_csv(out_csv, index=False)
    print(f"\nSaved results to: {out_csv}")
    return df


results_df = run_batch_processing()

def run_detailed_analysis(df):
    print("\n" + "=" * 70)
    print("DETAILED ACCURACY ANALYSIS")
    print("=" * 70)

    valid = df[(df['ground_truth'] != 'unknown') & (df['predicted_language'] != 'error')].copy()
    if valid.empty:
        print("No valid results for analysis.")
        return

    y_true = valid['ground_truth'].values
    y_pred = valid['predicted_language'].values

    print(f"\nOverall Accuracy (Top-1): {accuracy_score(y_true, y_pred):.2%}")

    labels = sorted(set(list(y_true) + list(y_pred)))
    print("\nClassification Report:")
    print(classification_report(y_true, y_pred, labels=labels, zero_division=0))

    print("\nConfusion Matrix:")
    cm = confusion_matrix(y_true, y_pred, labels=labels)
    cm_df = pd.DataFrame(cm, index=labels, columns=labels)
    display(cm_df)

    mis = valid[valid['ground_truth'] != valid['predicted_language']].copy()
    if not mis.empty:
        print("\nTop 10 Most Common Misclassifications:")
        top_errs = (mis.groupby(['ground_truth', 'predicted_language'])
                       .size().sort_values(ascending=False).head(10))
        print(top_errs)

        if 'top_5_predictions' in mis.columns:
            def correct_rank(row):
                """Rank (1-5) of the true label within the stored top-5 list, else None."""
                true_lang = row['ground_truth']
                preds = row['top_5_predictions']
                if isinstance(preds, str):
                    try:
                        preds = eval(preds)
                    except Exception:
                        return None
                for p in preds:
                    if p.get('language') == true_lang:
                        return p.get('rank')
                return None

            mis['correct_rank_in_top5'] = mis.apply(correct_rank, axis=1)
            c_in_top5 = mis['correct_rank_in_top5'].notna().sum()
            print(f"\nCorrect language in Top-5 for misclassified: "
                  f"{c_in_top5}/{len(mis)} ({c_in_top5/len(mis):.1%})")

            top1_correct = (valid['ground_truth'] == valid['predicted_language']).sum()
            top5_acc = (top1_correct + c_in_top5) / len(valid)
            print(f"Overall Accuracy (Top-5): {top5_acc:.2%}")

    print("\nAnalysis complete!")


if 'results_df' in locals() and not results_df.empty:
    run_detailed_analysis(results_df)
else:
    print("Run Cell 5 first to generate results.")

import json
import pandas as pd
import numpy as np
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report


def export_detailed_excel(results_df, results_folder, label_order=None, filename_prefix="voxlect_analysis"):
    """Write an Excel workbook with an overview, per-language metrics, confusion matrix,
    top misclassifications and the full prediction table."""
    if results_df is None or results_df.empty:
        print("No results to export. Run batch processing first.")
        return None

    # Keep only rows that can be evaluated against a known ground truth.
    valid = results_df[
        (results_df['ground_truth'] != 'unknown') &
        (results_df['predicted_language'] != 'error') &
        results_df['predicted_language'].notna()
    ].copy()

    if label_order is None:
        label_order = sorted(set(valid['ground_truth']) | set(valid['predicted_language']))

    top1_acc = accuracy_score(valid['ground_truth'], valid['predicted_language'])

    # Top-5 accuracy: the true label appears anywhere in the stored top-5 list.
    top5_acc = None
    if 'top_5_predictions' in valid.columns:
        def correct_in_top5(row):
            target = row['ground_truth']
            preds = row['top_5_predictions']
            if isinstance(preds, str):
                try:
                    preds = json.loads(preds)
                except Exception:
                    try:
                        preds = eval(preds)
                    except Exception:
                        return False
            for p in preds:
                if p.get('language') == target:
                    return True
            return False

        valid['correct_in_top5'] = valid.apply(correct_in_top5, axis=1)
        top5_acc = (valid['correct_in_top5'].sum() / len(valid)) if len(valid) else None

    cls_report = classification_report(
        valid['ground_truth'],
        valid['predicted_language'],
        labels=label_order,
        zero_division=0,
        output_dict=True
    )
    report_df = pd.DataFrame(cls_report).T.reset_index().rename(columns={"index": "label"})

    cm = confusion_matrix(valid['ground_truth'], valid['predicted_language'], labels=label_order)
    cm_df = pd.DataFrame(cm, index=label_order, columns=label_order)
    cm_df.index.name = "True\\Pred"

    mis = valid[valid['ground_truth'] != valid['predicted_language']].copy()
    if not mis.empty:
        top_mis = (mis.groupby(['ground_truth', 'predicted_language'])
                      .size()
                      .reset_index(name='count')
                      .sort_values('count', ascending=False))
    else:
        top_mis = pd.DataFrame(columns=['ground_truth', 'predicted_language', 'count'])

    overview_rows = [
        {"metric": "total_predictions", "value": int(len(results_df))},
        {"metric": "valid_predictions_for_eval", "value": int(len(valid))},
        {"metric": "top1_accuracy", "value": float(top1_acc) if len(valid) else None},
        {"metric": "top5_accuracy", "value": float(top5_acc) if top5_acc is not None else None},
    ]
    overview_df = pd.DataFrame(overview_rows)

    # Serialize the top-5 list so it survives the round trip to Excel.
    full_preds = results_df.copy()
    if 'top_5_predictions' in full_preds.columns:
        full_preds['top_5_predictions'] = full_preds['top_5_predictions'].apply(
            lambda x: json.dumps(x, ensure_ascii=False) if isinstance(x, (list, dict)) else str(x)
        )

    ts = pd.Timestamp.now().strftime("%Y%m%d_%H%M%S")
    out_xlsx = os.path.join(results_folder, f"{filename_prefix}_{ts}.xlsx")

    with pd.ExcelWriter(out_xlsx, engine="xlsxwriter") as writer:
        overview_df.to_excel(writer, sheet_name="overview", index=False)
        report_df.to_excel(writer, sheet_name="per_language_metrics", index=False)
        cm_df.to_excel(writer, sheet_name="confusion_matrix")
        top_mis.to_excel(writer, sheet_name="top_misclassifications", index=False)
        full_preds.to_excel(writer, sheet_name="full_predictions", index=False)

        # Best-effort column widths. Widths are derived from the in-memory frames rather
        # than by re-reading the workbook (it is not on disk while the writer is still
        # open); failures here should never abort the export.
        sheet_frames = {
            "overview": overview_df,
            "per_language_metrics": report_df,
            "confusion_matrix": cm_df.reset_index(),
            "top_misclassifications": top_mis,
            "full_predictions": full_preds,
        }
        for sheet, frame in sheet_frames.items():
            try:
                ws = writer.sheets[sheet]
                for i, col in enumerate(frame.columns):
                    width = max(12, min(60, int(frame[col].astype(str).map(len).max())
                                        if sheet == "full_predictions" else 20))
                    ws.set_column(i, i, width)
            except Exception:
                pass

    print(f"Excel report saved to: {out_xlsx}")
    return out_xlsx


excel_path = export_detailed_excel(results_df, RESULTS_FOLDER, label_order=sorted(set(LABEL_LIST)))
print("Excel path:", excel_path)

import json
import pandas as pd
import numpy as np


def parse_top5(cell):
    """Return a list of dicts [{'rank': int, 'language': str, 'confidence': float}, ...] from cell."""
    if isinstance(cell, list):
        return cell
    if isinstance(cell, dict):
        return [cell]
    if isinstance(cell, str):
        # The column may hold a JSON string (fresh run) or a Python repr (reloaded CSV).
        try:
            v = json.loads(cell)
            return v if isinstance(v, list) else [v]
        except Exception:
            try:
                v = eval(cell)
                return v if isinstance(v, list) else [v]
            except Exception:
                return []
    return []


def compute_rank_in_top5(df):
    """Add 'correct_rank_in_top5' and a readable verdict column per row."""
    df = df.copy()

    def get_rank(row):
        gt = row.get('ground_truth', None)
        preds = parse_top5(row.get('top_5_predictions', []))
        if not gt or not preds:
            return None
        for p in preds:
            if isinstance(p, dict) and p.get('language') == gt:
                r = p.get('rank', None)
                try:
                    r = int(r)
                    if 1 <= r <= 5:
                        return r
                except Exception:
                    pass
        return None

    df['correct_rank_in_top5'] = df.apply(get_rank, axis=1)
    df['top5_verdict'] = df['correct_rank_in_top5'].apply(
        lambda r: f"Rank {int(r)}" if pd.notna(r) else "Not-in-Top-5"
    )
    return df


def per_language_rank_summary(df):
    """Build a per-language summary of rank distribution (1..5 and Not-in-Top-5)."""
    subset = df[(df['ground_truth'].notna()) & (df['predicted_language'] != 'error')].copy()
    subset['rank_bin'] = subset['correct_rank_in_top5'].apply(lambda r: int(r) if pd.notna(r) else 0)

    counts = (subset
              .groupby(['ground_truth', 'rank_bin'])
              .size()
              .reset_index(name='count'))

    rank_cols = {0: "Not-in-Top-5", 1: "Rank 1", 2: "Rank 2", 3: "Rank 3", 4: "Rank 4", 5: "Rank 5"}
    summary = (counts
               .assign(rank_label=lambda x: x['rank_bin'].map(rank_cols))
               .pivot(index='ground_truth', columns='rank_label', values='count')
               .fillna(0)
               .astype(int)
               .reset_index()
               .rename(columns={'ground_truth': 'language'}))

    summary['Total'] = summary[[c for c in summary.columns
                                if c.startswith('Rank ') or c == 'Not-in-Top-5']].sum(axis=1)
    in_top5_cols = [c for c in summary.columns if c.startswith('Rank ')]
    summary['In-Top-5'] = summary[in_top5_cols].sum(axis=1)
    summary['In-Top-5 Rate'] = (summary['In-Top-5'] / summary['Total']).replace([np.inf, np.nan], 0.0)

    # Guarantee a stable column layout even if some ranks never occur.
    ordered_cols = ['language', 'Total', 'In-Top-5', 'In-Top-5 Rate',
                    'Rank 1', 'Rank 2', 'Rank 3', 'Rank 4', 'Rank 5', 'Not-in-Top-5']
    for c in ordered_cols:
        if c not in summary.columns:
            summary[c] = 0 if c != 'In-Top-5 Rate' else 0.0
    summary = summary[ordered_cols]
    return summary.sort_values(by=['In-Top-5 Rate', 'Rank 1', 'Rank 2', 'Rank 3', 'Rank 4', 'Rank 5'],
                               ascending=False)


results_ranked = compute_rank_in_top5(results_df)

display(results_ranked[['filename', 'ground_truth', 'predicted_language',
                        'top5_verdict', 'correct_rank_in_top5']].head(20))

rank_summary = per_language_rank_summary(results_ranked)
display(rank_summary)

from datetime import datetime
ts = datetime.now().strftime("%Y%m%d_%H%M%S")

rank_csv = os.path.join(RESULTS_FOLDER, f"top5_rank_per_file_{ts}.csv")
results_ranked.to_csv(rank_csv, index=False)
print("Per-file Top-5 rank CSV:", rank_csv)

summary_csv = os.path.join(RESULTS_FOLDER, f"top5_rank_summary_{ts}.csv")
rank_summary.to_csv(summary_csv, index=False)
print("Per-language Top-5 rank summary CSV:", summary_csv)

import librosa
import numpy as np
import pandas as pd
import os


def compute_features(row, target_sr=16000):
    """Simple audio-quality features per result row: duration, an SNR proxy,
    silence ratio, and mean spectral centroid."""
    p = row['file_path']
    try:
        y, sr = librosa.load(p, sr=target_sr, mono=True)
        dur = len(y) / target_sr

        # SNR proxy: energy of "active" frames relative to the global RMS, in dB.
        rms = librosa.feature.rms(y=y, frame_length=2048, hop_length=512)[0]
        global_rms = np.sqrt(np.mean(y**2) + 1e-12)
        active_mask = rms > 0.1 * np.max(rms) if rms.size else np.array([False])
        active_rms = np.mean(rms[active_mask]) if active_mask.any() else 0.0
        snr_proxy = 20.0 * np.log10((active_rms + 1e-9) / (global_rms + 1e-9))

        # Fraction of frames whose RMS falls below 2% of the peak RMS.
        thr = 0.02 * np.max(rms) if rms.size else 0.0
        silence_ratio = float((rms < thr).mean() if rms.size else 1.0)

        sc = librosa.feature.spectral_centroid(y=y, sr=target_sr)[0]
        sc_mean = float(np.mean(sc)) if sc.size else 0.0

        return pd.Series({
            'duration_s': dur,
            'snr_proxy_db': float(snr_proxy),
            'silence_ratio': silence_ratio,
            'spec_centroid_mean': sc_mean
        })
    except Exception:
        return pd.Series({
            'duration_s': np.nan,
            'snr_proxy_db': np.nan,
            'silence_ratio': np.nan,
            'spec_centroid_mean': np.nan
        })


features = results_df.apply(compute_features, axis=1)
results_feat = pd.concat([results_df, features], axis=1)
print("Features added: ['duration_s', 'snr_proxy_db', 'silence_ratio', 'spec_centroid_mean']")
display(results_feat.head())

import json
import numpy as np
import pandas as pd


def extract_top1_conf(row):
    """Pull the top-1 confidence out of the stored top-5 prediction list."""
    preds = row.get('top_5_predictions', [])
    if isinstance(preds, str):
        try:
            preds = json.loads(preds)
        except Exception:
            preds = eval(preds)
    if isinstance(preds, list) and preds:
        return float(preds[0].get('confidence', np.nan))
    return np.nan


def compute_ece(df, n_bins=15):
    """Expected Calibration Error over equal-width confidence bins, plus per-bin stats."""
    df = df.copy()
    df['top1_conf'] = df.apply(extract_top1_conf, axis=1)
    df = df[(df['predicted_language'] != 'error') & df['top1_conf'].notna()]

    conf = df['top1_conf'].to_numpy()
    correct = (df['predicted_language'] == df['ground_truth']).to_numpy().astype(float)

    bins = np.linspace(0.0, 1.0, n_bins + 1)
    ece = 0.0
    bin_stats = []
    for i in range(n_bins):
        m, M = bins[i], bins[i + 1]
        # The last bin is closed on the right so that confidence == 1.0 is counted.
        mask = (conf >= m) & (conf < M) if i < n_bins - 1 else (conf >= m) & (conf <= M)
        if mask.any():
            acc = correct[mask].mean()
            conf_mean = conf[mask].mean()
            wt = mask.mean()
            ece += wt * abs(acc - conf_mean)
            bin_stats.append({'bin_low': m, 'bin_high': M, 'bin_acc': acc,
                              'bin_conf': conf_mean, 'weight': wt})
        else:
            bin_stats.append({'bin_low': m, 'bin_high': M, 'bin_acc': np.nan,
                              'bin_conf': np.nan, 'weight': 0.0})

    return ece, pd.DataFrame(bin_stats)


ece, ece_bins = compute_ece(results_feat)
print(f"Expected Calibration Error (ECE): {ece:.4f}")
display(ece_bins)

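# Optional reliability diagram (not part of the original notebook): plot per-bin accuracy
# against per-bin mean confidence from ece_bins. Points below the diagonal indicate
# over-confident predictions. Minimal matplotlib sketch.
import matplotlib.pyplot as plt

_bins = ece_bins.dropna(subset=['bin_acc', 'bin_conf'])
plt.figure(figsize=(5, 5))
plt.plot([0, 1], [0, 1], linestyle='--', color='gray', label='perfect calibration')
plt.scatter(_bins['bin_conf'], _bins['bin_acc'], s=80 * _bins['weight'] + 10)
plt.xlabel('Mean confidence (bin)')
plt.ylabel('Accuracy (bin)')
plt.title(f'Reliability diagram (ECE = {ece:.3f})')
plt.legend()
plt.show()
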
import numpy as np
import pandas as pd


def slice_acc(df, col, bins):
    """Top-1 accuracy broken down by bins of a numeric column (duration, SNR proxy, ...)."""
    df = df.copy()
    df = df[(df['predicted_language'] != 'error') & df[col].notna()]
    labels = [f"[{bins[i]:.2f},{bins[i+1]:.2f})" for i in range(len(bins) - 1)]
    df['bin'] = pd.cut(df[col], bins=bins, labels=labels, include_lowest=True)
    grp = df.groupby('bin').apply(lambda x: (x['ground_truth'] == x['predicted_language']).mean())
    return grp.reset_index(name=f'accuracy_by_{col}')


dur_bins = [0, 2, 4, 6, 8, 12, np.inf]
snr_bins = [-40, -10, 0, 5, 10, 20, np.inf]
sil_bins = [0, 0.2, 0.4, 0.6, 0.8, 1.01]

acc_dur = slice_acc(results_feat, 'duration_s', dur_bins)
acc_snr = slice_acc(results_feat, 'snr_proxy_db', snr_bins)
acc_sil = slice_acc(results_feat, 'silence_ratio', sil_bins)

print("Accuracy vs Duration:")
display(acc_dur)
print("Accuracy vs SNR proxy:")
display(acc_snr)
print("Accuracy vs Silence ratio:")
display(acc_sil)

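# Optional quick visual (not part of the original notebook): bar-plot the duration slices
# so the accuracy-vs-length trend is easier to read than the raw table.
import matplotlib.pyplot as plt

ax = acc_dur.plot.bar(x='bin', y='accuracy_by_duration_s', legend=False, figsize=(7, 3))
ax.set_xlabel('Duration bin (s)')
ax.set_ylabel('Top-1 accuracy')
ax.set_title('Accuracy vs clip duration')
plt.tight_layout()
plt.show()
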
import pandas as pd

# Count directed (true -> predicted) confusion pairs to expose asymmetric confusions.
valid = results_df[(results_df['ground_truth'] != 'unknown') &
                   (results_df['predicted_language'] != 'error')].copy()
pairs = valid[valid['ground_truth'] != valid['predicted_language']][['ground_truth', 'predicted_language']]
flow = (pairs.groupby(['ground_truth', 'predicted_language']).size()
             .reset_index(name='count')
             .sort_values('count', ascending=False))
print("Top asymmetric confusions:")
display(flow.head(30))

import torch
import numpy as np
import pandas as pd
from sklearn.metrics import silhouette_score


def extract_embeddings(df, batch_size=16):
    """Run each file through the model and collect one pooled feature vector per clip."""
    embs = []
    labs = []
    for _, row in df.iterrows():
        t = load_audio(row['file_path'])
        if t is None:
            continue
        with torch.no_grad():
            try:
                logits, feat = model(t.to(device), return_feature=True)
                if feat is None:
                    continue
                feat_np = feat.detach().cpu().numpy()
                if feat_np.ndim == 3:
                    # (batch, time, dim) -> mean-pool over time.
                    feat_np = feat_np.mean(axis=1)
                elif feat_np.ndim == 2:
                    pass
                else:
                    continue
                embs.append(feat_np.squeeze(0))
                labs.append(row['ground_truth'])
            except Exception:
                continue
    if not embs:
        return None, None
    X = np.vstack(embs)
    y = np.array(labs)
    return X, y


# Sample up to 20 clips per language to keep the embedding pass cheap.
sample = valid.groupby('ground_truth').head(20).reset_index(drop=True) if len(valid) > 0 else pd.DataFrame()
X, y = extract_embeddings(sample) if not sample.empty else (None, None)
if X is not None and len(np.unique(y)) > 1 and len(y) >= 10:
    sil = silhouette_score(X, y, metric='euclidean')
    print(f"Silhouette score (higher = better cluster separation): {sil:.3f}")
else:
    print("Not enough data or embeddings to compute silhouette score.")

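# Optional embedding scatter (not part of the original notebook): project the pooled
# features to 2-D with PCA and color by ground-truth language, as a rough visual
# companion to the silhouette score. Only runs if embeddings were extracted above.
if X is not None and len(np.unique(y)) > 1:
    from sklearn.decomposition import PCA
    import matplotlib.pyplot as plt

    _coords = PCA(n_components=2).fit_transform(X)
    plt.figure(figsize=(8, 6))
    for _lang in np.unique(y):
        _pts = _coords[y == _lang]
        plt.scatter(_pts[:, 0], _pts[:, 1], s=12, label=_lang)
    plt.legend(fontsize=6, ncol=2)
    plt.title('Encoder features (PCA), colored by ground truth')
    plt.show()
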
import json
import pandas as pd


def top5_gap(row):
    """Confidence margin between the top-1 and top-2 predictions (small = ambiguous)."""
    preds = row.get('top_5_predictions', [])
    if isinstance(preds, str):
        try:
            preds = json.loads(preds)
        except Exception:
            preds = eval(preds)
    if not preds or len(preds) < 2:
        return np.nan
    return float(preds[0]['confidence'] - preds[1]['confidence'])


valid = results_feat[(results_feat['ground_truth'] != 'unknown') &
                     (results_feat['predicted_language'] != 'error')].copy()
valid['top5_gap'] = valid.apply(top5_gap, axis=1)

# Misclassified clips with the smallest margin and the worst signal quality.
hard_mis = valid[valid['ground_truth'] != valid['predicted_language']].copy()
hard_mis = hard_mis.sort_values(['top5_gap', 'snr_proxy_db', 'duration_s'],
                                ascending=[True, True, True]).head(30)
print("Hardest misclassifications (low margin, low SNR/duration):")
display(hard_mis[['filename', 'ground_truth', 'predicted_language', 'top5_gap',
                  'snr_proxy_db', 'duration_s', 'silence_ratio']])

# Correct predictions the model was nonetheless unsure about.
ambig_correct = valid[valid['ground_truth'] == valid['predicted_language']].copy()
ambig_correct = ambig_correct.sort_values(['top5_gap', 'snr_proxy_db', 'duration_s'],
                                          ascending=[True, True, True]).head(30)
print("Ambiguous but correct (low margin):")
display(ambig_correct[['filename', 'ground_truth', 'predicted_language', 'top5_gap',
                       'snr_proxy_db', 'duration_s', 'silence_ratio']])

import sys, subprocess, os


def ensure_pkg(pkg):
    """Import pkg, installing it quietly with pip if it is missing."""
    try:
        __import__(pkg)
    except Exception:
        subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", pkg])


ensure_pkg("xlsxwriter")

ts = pd.Timestamp.now().strftime("%Y%m%d_%H%M%S")
xlsx_path = os.path.join(RESULTS_FOLDER, f"voxlect_extended_analysis_{ts}.xlsx")

with pd.ExcelWriter(xlsx_path, engine="xlsxwriter") as w:
    results_feat.to_excel(w, sheet_name="results_with_features", index=False)
    if 'ece' in locals():
        pd.DataFrame([{'ECE': ece}]).to_excel(w, sheet_name="calibration_overview", index=False)
        ece_bins.to_excel(w, sheet_name="calibration_bins", index=False)
    acc_dur.to_excel(w, sheet_name="acc_vs_duration", index=False)
    acc_snr.to_excel(w, sheet_name="acc_vs_snr", index=False)
    acc_sil.to_excel(w, sheet_name="acc_vs_silence", index=False)
    flow.to_excel(w, sheet_name="confusion_asymmetry", index=False)
    if 'hard_mis' in locals():
        hard_mis.to_excel(w, sheet_name="hard_misclassifications", index=False)
    if 'ambig_correct' in locals():
        ambig_correct.to_excel(w, sheet_name="ambiguous_correct", index=False)

print("Extended analysis Excel saved to:", xlsx_path)