"""Gradio app for human validation of spatial QA pairs.

Each annotator is dealt a fixed number of instances. For every QA pair they
mark whether the question is relevant to the report's findings/impression and,
if so, whether the answer is correct. Assignments and progress are persisted
as JSON; when the target paths are not writable the app keeps the state in
memory only.
"""
import csv
import datetime as dt
import json
import os
import random
from typing import Dict, List, Optional, Tuple

import gradio as gr

# ------------------------------
# Paths (override with env vars)
# ------------------------------
QA_PATH = os.getenv("QA_PATH", "./spatial_qa_output.json")
VALIDATION_PATH = os.getenv("VALIDATION_PATH", "./validation_reports_output.json")
ASSIGNMENTS_PATH = os.getenv("ASSIGNMENTS_PATH", "/data/assignments.json")
PROGRESS_PATH = os.getenv("PROGRESS_PATH", "/data/progress.json")
USERS_PATH = os.getenv("USERS_PATH", "./users.json")
EXPORT_DIR = os.getenv("EXPORT_DIR", "/data")

# Number of instances dealt to every user by create_assignments().
INSTANCES_PER_USER = 10

# Column order of the per-user CSV export.
_EXPORT_FIELDS = (
    "user",
    "instance_id",
    "q_index",
    "question",
    "answer",
    "relevant",
    "correct",
    "note",
    "saved_at",
)


# ------------------------------
# Utilities
# ------------------------------
def _safe_read_json(path: str, default):
    """Return the parsed JSON at *path*, or *default* if it cannot be read or parsed."""
    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError):
        # OSError: missing/unreadable file; ValueError: invalid JSON or encoding.
        return default


def _safe_write_json(path: str, obj) -> bool:
    """Write *obj* as JSON to *path* via a temp file and ``os.replace``.

    Returns True on success. On an OS-level failure a warning is printed and
    False is returned; the caller decides how to fall back.
    """
    try:
        parent = os.path.dirname(path)
        # A bare file name has no parent; os.makedirs("") would raise.
        if parent:
            os.makedirs(parent, exist_ok=True)
        tmp = path + ".tmp"
        with open(tmp, "w", encoding="utf-8") as f:
            json.dump(obj, f, indent=2, ensure_ascii=False)
        os.replace(tmp, path)
        return True
    except OSError as e:
        print(f"Warning: Could not write to {path}: {e}")
        print("Running in read-only mode - data will not persist between sessions")
        return False


# ------------------------------
# In-memory storage for read-only environments
# ------------------------------
_in_memory_assignments = None
_in_memory_progress = None
_file_write_enabled = True


def _get_assignments():
    """Return the in-memory assignments if set, otherwise read them from file."""
    if _in_memory_assignments is not None:
        return _in_memory_assignments
    return _safe_read_json(ASSIGNMENTS_PATH, {})


def _set_assignments(assignments):
    """Store assignments in memory and, while writing still works, on disk."""
    global _in_memory_assignments, _file_write_enabled
    _in_memory_assignments = assignments
    if _file_write_enabled and not _safe_write_json(ASSIGNMENTS_PATH, assignments):
        # Stop retrying (and re-warning) after the first failed write.
        _file_write_enabled = False


def _get_progress():
    """Return the in-memory progress if set, otherwise read it from file."""
    if _in_memory_progress is not None:
        return _in_memory_progress
    return _safe_read_json(PROGRESS_PATH, {})


def _set_progress(progress):
    """Store progress in memory and, while writing still works, on disk."""
    global _in_memory_progress, _file_write_enabled
    _in_memory_progress = progress
    if _file_write_enabled and not _safe_write_json(PROGRESS_PATH, progress):
        _file_write_enabled = False


def _utc_now() -> dt.datetime:
    """Return the current UTC time as a naive datetime (non-deprecated ``utcnow``)."""
    return dt.datetime.now(dt.timezone.utc).replace(tzinfo=None)


# ------------------------------
# Load data
# ------------------------------
def load_data() -> Dict[str, Dict]:
    """Load and join the QA and validation files.

    Returns a dict keyed by instance_id with:
      - findings (str)
      - impressions (str)
      - qa_pairs (list of {'question', 'answer'})

    Instances present in the QA file but absent from the validation file are
    skipped (a warning is printed).

    Raises:
        RuntimeError: if the two files share no instance id.
    """
    with open(QA_PATH, "r", encoding="utf-8") as f:
        qa_data = json.load(f)
    with open(VALIDATION_PATH, "r", encoding="utf-8") as f:
        val_data = json.load(f)

    data = {}
    missing_in_val = []
    for inst_id, payload in qa_data.items():
        if inst_id not in val_data:
            missing_in_val.append(inst_id)
            continue
        report = val_data[inst_id]
        find_str = report.get("Findings_EN") or report.get("Findings") or ""
        impr_str = report.get("Impressions_EN") or report.get("Impressions") or ""
        # normalize
        normalized_pairs = [
            {
                "question": str(p.get("question", "")).strip(),
                "answer": str(p.get("answer", "")).strip(),
            }
            for p in payload.get("qa_pairs", [])
        ]
        data[inst_id] = {
            "findings": find_str.strip(),
            "impressions": impr_str.strip(),
            "qa_pairs": normalized_pairs,
        }

    if missing_in_val:
        print(
            f"Warning: {len(missing_in_val)} QA instance(s) have no validation "
            "report and were skipped"
        )
    if not data:
        raise RuntimeError(
            "No overlapping instances between QA and Validation files. "
            "Check the JSON files and their keys."
        )
    return data


DATA = load_data()
INSTANCE_IDS = sorted(DATA)


def load_users() -> List[str]:
    """Load users from the users JSON file, falling back to 20 default names."""
    users_data = _safe_read_json(USERS_PATH, {})
    if isinstance(users_data, dict) and isinstance(users_data.get("users"), list):
        return [str(user).strip() for user in users_data["users"] if user and str(user).strip()]
    # Fallback to default users
    return [f"user_{i+1:02d}" for i in range(20)]


def get_default_seed() -> int:
    """Load ``default_seed`` from the users JSON file (42 when unavailable)."""
    users_data = _safe_read_json(USERS_PATH, {})
    if not isinstance(users_data, dict):
        return 42
    return users_data.get("default_seed", 42)


DEFAULT_USERS = load_users()
DEFAULT_SEED = get_default_seed()


# ----------------------------------
# Assignment & Progress persistence
# ----------------------------------
def init_or_load_assignments(default_users: List[str], seed: int = 42) -> Dict[str, List[str]]:
    """Load existing assignments or create a balanced random split.

    Existing assignments are filtered down to instances still present in
    DATA (order preserved) and stored back.
    """
    assignments = _get_assignments()
    if assignments:
        # DATA is a dict, so membership is O(1) (INSTANCE_IDS is a list).
        assignments = {u: [x for x in lst if x in DATA] for u, lst in assignments.items()}
        _set_assignments(assignments)
        return assignments
    return create_assignments(default_users, seed)


def create_assignments(
    usernames: List[str],
    seed: int,
    instances_per_user: int = INSTANCES_PER_USER,
) -> Dict[str, List[str]]:
    """Deal ``instances_per_user`` shuffled instances to each user and store the result.

    Args:
        usernames: user names; blanks are dropped and duplicates collapsed
            (first occurrence wins) so no dealt bucket is silently lost.
        seed: seed for the shuffle, making the deal reproducible.
        instances_per_user: how many instances every user receives.

    Raises:
        gr.Error: if no user name is given or there are too few instances.
    """
    usernames = list(dict.fromkeys(u.strip() for u in usernames if u and u.strip()))
    if not usernames:
        raise gr.Error("Please provide at least one username.")

    total_instances_needed = len(usernames) * instances_per_user
    if total_instances_needed > len(INSTANCE_IDS):
        raise gr.Error(
            f"Not enough instances available. Need {total_instances_needed} "
            f"but only have {len(INSTANCE_IDS)}."
        )

    insts = INSTANCE_IDS.copy()
    random.Random(int(seed)).shuffle(insts)
    # Take only the number of instances we need, then deal them round-robin.
    selected_insts = insts[:total_instances_needed]
    n_users = len(usernames)
    assignments = {name: selected_insts[i::n_users] for i, name in enumerate(usernames)}
    _set_assignments(assignments)
    return assignments


# Snapshot taken at import time. Runtime code calls _get_assignments() so that
# assignments re-dealt from the admin panel are picked up.
ASSIGNMENTS = init_or_load_assignments(DEFAULT_USERS, seed=DEFAULT_SEED)


def available_users() -> List[str]:
    """Return the sorted user names that currently have assignments."""
    return sorted(_get_assignments())


def load_progress() -> Dict:
    """Return the stored progress mapping."""
    return _get_progress()


PROGRESS = load_progress()


def _ensure_user_progress_struct(user: str):
    """Make sure PROGRESS has a correctly sized answers list for every assigned instance."""
    user_progress = PROGRESS.setdefault(user, {})
    # Use the live assignments, not the import-time ASSIGNMENTS snapshot:
    # the admin panel can re-deal instances while the app is running.
    for inst in _get_assignments().get(user, []):
        n = len(DATA[inst]["qa_pairs"])
        entry = user_progress.get(inst)
        if entry is None:
            user_progress[inst] = {"answers": [None] * n}
            continue
        # pad or trim if needed
        ans = entry.get("answers", [])
        if len(ans) < n:
            ans = ans + [None] * (n - len(ans))
        elif len(ans) > n:
            ans = ans[:n]
        entry["answers"] = ans
    _set_progress(PROGRESS)


def save_eval(user: str, inst: str, q_idx: int,
              relevant_choice: Optional[str],
              correct_choice: Optional[str],
              note: str = "") -> str:
    """Persist the evaluation of a single QA pair and return a status message.

    ``correct`` is only recorded when the question was marked relevant.
    """
    if not user or not inst:
        return "Select a user to begin."
    _ensure_user_progress_struct(user)
    if inst not in PROGRESS[user]:
        PROGRESS[user][inst] = {"answers": [None] * len(DATA[inst]["qa_pairs"])}

    # map choices to booleans (None = no selection)
    rel = {"✓ Relevant": True, "✗ Not relevant": False}.get(relevant_choice)
    corr = None
    if rel is True:
        corr = {"✓ Correct": True, "✗ Incorrect": False}.get(correct_choice)

    record = {
        "relevant": rel,
        "correct": corr,
        "note": (note or "").strip(),
        "saved_at": _utc_now().isoformat() + "Z",
    }

    answers = PROGRESS[user][inst]["answers"]
    while q_idx >= len(answers):  # safety
        answers.append(None)
    answers[q_idx] = record
    _set_progress(PROGRESS)

    label = f"Saved: {user} • {inst} • Q{q_idx+1} → relevant={rel}"
    if rel is True:
        label += f", correct={corr}"
    return label


def summarize_user(user: str) -> Tuple[str, List[List]]:
    """Return summary text and [instance, done, total] rows for a user's progress."""
    if not user:
        return ("—", [])
    _ensure_user_progress_struct(user)
    assigned = _get_assignments().get(user, [])
    progress = _get_progress()
    total = 0
    done = 0
    rows = []
    for inst in assigned:
        qa_n = len(DATA[inst]["qa_pairs"])
        c_done = sum(1 for a in progress[user][inst]["answers"] if a is not None)
        total += qa_n
        done += c_done
        rows.append([inst, c_done, qa_n])
    txt = f"Progress: {done} / {total} QA pairs completed across {len(assigned)} assigned instances."
    return txt, rows


def next_unfinished(user: str) -> Tuple[Optional[str], Optional[int]]:
    """Return (instance_id, q_index) of the user's next unanswered QA pair, or (None, None)."""
    _ensure_user_progress_struct(user)
    progress = _get_progress()
    for inst in _get_assignments().get(user, []):
        for i, a in enumerate(progress[user][inst]["answers"]):
            if a is None:
                return inst, i
    return None, None


def first_unfinished_in_instance(user: str, inst: str) -> int:
    """Return the index of the first unanswered pair in *inst* (0 if none or unknown)."""
    _ensure_user_progress_struct(user)
    # .get chain: the instance may not belong to this user (stale dropdown).
    answers = _get_progress().get(user, {}).get(inst, {}).get("answers", [])
    for i, a in enumerate(answers):
        if a is None:
            return i
    return 0


def get_payload(inst: str, q_idx: int) -> Tuple[str, str, str, str, str]:
    """Return (question, answer, findings, impressions, header text).

    ``q_idx`` is clamped into the valid range; an instance without questions
    yields empty question/answer strings.
    """
    entry = DATA[inst]
    pairs = entry["qa_pairs"]
    f = entry["findings"]
    im = entry["impressions"]
    n = len(pairs)
    if n == 0:
        return "", "", f, im, f"{inst} — No questions (0/0)"
    q_idx = max(0, min(q_idx, n - 1))
    pair = pairs[q_idx]
    header = f"{inst} — Question {q_idx+1} / {n}"
    return pair["question"], pair["answer"], f, im, header


def export_user_results(user: str) -> str:
    """Write a JSON + CSV export for *user* and return a status string.

    When the files cannot be written, the status string carries the first
    1000 characters of the JSON export instead of file paths.
    """
    if not user:
        return "Select a user to export results."
    _ensure_user_progress_struct(user)
    assignments = _get_assignments()
    progress = _get_progress()

    # Build flat rows
    rows = []
    for inst in assignments.get(user, []):
        pairs = DATA[inst]["qa_pairs"]
        answers = progress[user][inst]["answers"]
        for i, (pair, ans) in enumerate(zip(pairs, answers), start=1):
            saved = ans or {}
            rows.append({
                "user": user,
                "instance_id": inst,
                "q_index": i,
                "question": pair["question"],
                "answer": pair["answer"],
                "relevant": saved.get("relevant"),
                "correct": saved.get("correct"),
                "note": saved.get("note"),
                "saved_at": saved.get("saved_at"),
            })

    ts = _utc_now().strftime("%Y%m%d-%H%M%S")
    # Keep path separators and other odd characters out of the file name.
    safe_user = "".join(c if c.isalnum() or c in "-_." else "_" for c in user)
    json_path = os.path.join(EXPORT_DIR, f"results_{safe_user}_{ts}.json")
    csv_path = os.path.join(EXPORT_DIR, f"results_{safe_user}_{ts}.csv")
    try:
        if not _safe_write_json(json_path, rows):
            raise OSError(f"could not write {json_path}")
        with open(csv_path, "w", newline="", encoding="utf-8") as f:
            # Explicit field names: rows may be empty, so rows[0] is not safe.
            writer = csv.DictWriter(f, fieldnames=_EXPORT_FIELDS)
            writer.writeheader()
            writer.writerows(rows)
    except OSError:
        # Fallback: return data as JSON string
        json_str = json.dumps(rows, indent=2, ensure_ascii=False)
        return f"Export data (read-only mode):\n\n{json_str[:1000]}{'...' if len(json_str) > 1000 else ''}"
    return f"Exported {len(rows)} rows.\nJSON: {json_path}\nCSV: {csv_path}"


# ------------------------------
# UI helpers
# ------------------------------
def _slider_update(n: int, q_idx: int):
    """Return the question-slider update for an instance with *n* questions."""
    has_questions = n > 0
    return gr.update(
        visible=has_questions,
        minimum=1,
        maximum=n if has_questions else 1,
        step=1,
        value=q_idx + 1 if has_questions else 1,
    )


def _nav_text(user: str, inst: str, q_idx: int, n: int) -> str:
    """Return the "instance i/N • Qj/n" position label."""
    assigned = _get_assignments().get(user, [])
    position = assigned.index(inst) + 1 if inst in assigned else 0
    return f"{position}/{len(assigned)} • Q{(q_idx+1) if n>0 else 0}/{n}"


def _question_panel(inst: Optional[str], q_idx: int) -> Tuple:
    """Return updates for (q_slider, findings, impressions, header, question,
    answer, relevant_radio, correct_radio, note).

    ``inst=None`` blanks the panel and hides the slider.
    """
    reset_relevant = gr.update(value=None, interactive=True)
    reset_correct = gr.update(value=None, interactive=False)
    if inst is None:
        return (gr.update(visible=False), "", "", "", "", "",
                reset_relevant, reset_correct, "")
    q, a, f, im, header = get_payload(inst, q_idx)
    n = len(DATA[inst]["qa_pairs"])
    return (_slider_update(n, q_idx), f, im, f"**{header}**", f"**Q:** {q}",
            f"**A:** {a}", reset_relevant, reset_correct, "")


# ------------------------------
# Gradio UI
# ------------------------------
with gr.Blocks(title="Spatial QA Validator", theme=gr.themes.Glass()) as demo:
    # Check if running in read-only mode
    read_only_status = ""
    if not _file_write_enabled:
        read_only_status = "\n\n⚠️ **Running in read-only mode** - Progress will not persist between sessions"

    gr.Markdown("## Spatial QA Validation Tool\n"
                "Left: Findings & Impression (Ground Truth)\n\n"
                "Right: Spatial QA pairs (Q/A) to validate.\n\n"
                "For each Q/A:\n"
                "1) Mark if the **Question is relevant** to the Findings/Impression.\n"
                "2) If **relevant**, mark whether the **Answer is correct**.\n" + read_only_status)

    with gr.Row():
        with gr.Column(scale=1, min_width=260):
            user_dd = gr.Dropdown(choices=available_users(), label="Select user", interactive=True)
            load_btn = gr.Button("Load my queue", variant="primary")
            progress_text = gr.Markdown("")
            progress_table = gr.Dataframe(headers=["Instance", "Done", "Total"], row_count=0, interactive=False)
            inst_dd = gr.Dropdown(choices=[], label="Assigned instance", interactive=True, visible=False)
            q_slider = gr.Slider(1, 10, value=1, step=1, label="Question #", interactive=True, visible=False)
            export_btn = gr.Button("Export my results")

        with gr.Column(scale=2, min_width=600):
            # Left panel: Findings/Impressions
            with gr.Row():
                with gr.Column(scale=1, elem_classes=["left-panel"]):
                    findings_tb = gr.Textbox(label="Findings (Ground Truth)", lines=16, interactive=False)
                    impressions_tb = gr.Textbox(label="Impression (Ground Truth)", lines=6, interactive=False)
                with gr.Column(scale=1, elem_classes=["left-panel"]):
                    header_md = gr.Markdown("")
                    question_md = gr.Markdown("")
                    answer_md = gr.Markdown("")
                    relevant_radio = gr.Radio(choices=["✓ Relevant", "✗ Not relevant"],
                                              label="1) Is the QUESTION relevant to Findings/Impression?",
                                              interactive=True)
                    correct_radio = gr.Radio(choices=["✓ Correct", "✗ Incorrect"],
                                             label="2) If relevant, is the ANSWER correct?",
                                             interactive=False)
                    note_tb = gr.Textbox(label="Optional note", lines=2, placeholder="Any comments...")
                    with gr.Row():
                        save_btn = gr.Button("Save", variant="secondary")
                        save_next_btn = gr.Button("Save & Next", variant="primary")
                        skip_btn = gr.Button("Skip to next unfinished")
                    nav_info = gr.Markdown("")

    # Move Admin accordion below so user_dd exists before wiring its updates
    with gr.Accordion("Setup (admin) — define users and (re)deal assignments", open=False):
        with gr.Row():
            users_csv = gr.Textbox(value=",".join(DEFAULT_USERS), label="Usernames (comma-separated)", lines=2)
            seed_num = gr.Number(value=DEFAULT_SEED, precision=0, label="Assignment Seed (change & Apply)")
        apply_btn = gr.Button("Apply / (Re)create Assignments", variant="secondary")
        assign_info = gr.Markdown()

    def apply_users(u_csv, seed):
        """Re-deal assignments for the given comma-separated users and report the counts."""
        usernames = [x.strip() for x in (u_csv or "").split(",") if x.strip()]
        new_assign = create_assignments(usernames, int(seed or 0))
        # summarize counts
        counts = {u: len(v) for u, v in new_assign.items()}
        table = "\n".join(f"- **{u}**: {counts[u]} instances" for u in sorted(counts))
        total_assigned = sum(counts.values())
        total_available = len(INSTANCE_IDS)
        unassigned = total_available - total_assigned
        return gr.update(choices=available_users(), value=None), f"Assignments updated for {len(new_assign)} users ({INSTANCES_PER_USER} instances each):\n{table}\n\n**Summary:** {total_assigned} instances assigned, {unassigned} instances left unassigned out of {total_available} total."

    # now that user_dd exists, reference it directly in outputs
    apply_btn.click(apply_users, inputs=[users_csv, seed_num], outputs=[user_dd, assign_info])

    # ---- Wiring functions ----
    def _load_user(user: str):
        """Populate the whole UI for *user*, pointing at their next unfinished pair."""
        if not user:
            raise gr.Error("Please select a user.")
        # Refresh assignments if changed by admin
        init_or_load_assignments(DEFAULT_USERS, seed=DEFAULT_SEED)
        txt, rows = summarize_user(user)
        # initial pointer = next unfinished overall
        inst, q_idx = next_unfinished(user)
        assigned = _get_assignments().get(user, [])
        if inst is None:
            # user is done
            nav = "All assigned QA pairs are completed. 🎉"
        else:
            nav = _nav_text(user, inst, q_idx, len(DATA[inst]["qa_pairs"]))
        return (
            gr.update(choices=assigned, visible=True, value=inst),
            *_question_panel(inst, q_idx or 0),
            txt, rows, nav,
        )

    load_btn.click(
        _load_user,
        inputs=[user_dd],
        outputs=[inst_dd, q_slider, findings_tb, impressions_tb, header_md, question_md, answer_md,
                 relevant_radio, correct_radio, note_tb, progress_text, progress_table, nav_info]
    )

    def _inst_changed(user: str, inst: str):
        """Show the first unfinished question of the newly selected instance."""
        if not user or not inst:
            return (*_question_panel(None, 0), "")
        if inst not in _get_assignments().get(user, []):
            # Stale dropdown: the user was switched without reloading the queue.
            return (*_question_panel(None, 0),
                    "This instance is not assigned to the selected user. Click \"Load my queue\".")
        idx = first_unfinished_in_instance(user, inst)
        n = len(DATA[inst]["qa_pairs"])
        return (*_question_panel(inst, idx), _nav_text(user, inst, idx, n))

    inst_dd.change(
        _inst_changed,
        inputs=[user_dd, inst_dd],
        outputs=[q_slider, findings_tb, impressions_tb, header_md, question_md, answer_md,
                 relevant_radio, correct_radio, note_tb, nav_info]
    )

    def _q_changed(inst: str, q_no: int):
        """Show the question selected on the slider and clear the note."""
        if not inst:
            return "", "", "", ""
        q, a, _f, _im, header = get_payload(inst, int(q_no) - 1)
        return f"**{header}**", f"**Q:** {q}", f"**A:** {a}", ""

    q_slider.change(_q_changed, inputs=[inst_dd, q_slider], outputs=[header_md, question_md, answer_md, note_tb])

    def _relevant_changed(rel_choice: Optional[str]):
        """Enable the correctness radio only for relevant questions."""
        if rel_choice == "✓ Relevant":
            return gr.update(interactive=True)
        # reset and disable
        return gr.update(value=None, interactive=False)

    relevant_radio.change(_relevant_changed, inputs=[relevant_radio], outputs=[correct_radio])

    def _save(user: str, inst: str, q_no: int, rel_choice: Optional[str], corr_choice: Optional[str], note: str):
        """Save the current evaluation and refresh the progress summary."""
        if not (user and inst and q_no):
            raise gr.Error("Missing user/instance/question selection.")
        msg = save_eval(user, inst, int(q_no) - 1, rel_choice, corr_choice, note)
        txt, rows = summarize_user(user)
        return msg, txt, rows

    save_btn.click(
        _save,
        inputs=[user_dd, inst_dd, q_slider, relevant_radio, correct_radio, note_tb],
        outputs=[nav_info, progress_text, progress_table]
    )

    def _save_and_next(user: str, inst: str, q_no: int, rel_choice: Optional[str], corr_choice: Optional[str], note: str):
        """Save the current evaluation, then jump to the next unfinished pair."""
        if not (user and inst and q_no):
            raise gr.Error("Missing user/instance/question selection.")
        msg = save_eval(user, inst, int(q_no) - 1, rel_choice, corr_choice, note)
        # jump to next unfinished (global)
        nxt_inst, nxt_idx = next_unfinished(user)
        txt, rows = summarize_user(user)
        if nxt_inst is None:
            msg = f"{msg}\n\nAll assigned QA pairs are completed. 🎉"
        return (
            gr.update(value=nxt_inst),
            *_question_panel(nxt_inst, nxt_idx or 0),
            msg, txt, rows,
        )

    save_next_btn.click(
        _save_and_next,
        inputs=[user_dd, inst_dd, q_slider, relevant_radio, correct_radio, note_tb],
        outputs=[inst_dd, q_slider, findings_tb, impressions_tb, header_md, question_md, answer_md,
                 relevant_radio, correct_radio, note_tb, nav_info, progress_text, progress_table]
    )

    def _skip_to_next(user: str):
        """Jump to the user's next unfinished pair without saving anything."""
        if not user:
            raise gr.Error("Please select a user.")
        inst, idx = next_unfinished(user)
        txt, rows = summarize_user(user)
        if inst is None:
            msg = "All assigned QA pairs are completed. 🎉"
        else:
            msg = "Jumped to next unfinished."
        return (
            gr.update(value=inst),
            *_question_panel(inst, idx or 0),
            msg, txt, rows,
        )

    skip_btn.click(
        _skip_to_next,
        inputs=[user_dd],
        outputs=[inst_dd, q_slider, findings_tb, impressions_tb, header_md, question_md, answer_md,
                 relevant_radio, correct_radio, note_tb, nav_info, progress_text, progress_table]
    )

    def _export(user: str):
        """Export the selected user's results and show the status message."""
        return export_user_results(user)

    export_btn.click(_export, inputs=[user_dd], outputs=[nav_info])


if __name__ == "__main__":
    # share=True requests a public share link; host and port keep Gradio's defaults.
    demo.launch(share=True)