Mashrafi2827's picture
Create app.py
05dcf61 verified
import os
import json
import random
import datetime as dt
from typing import Dict, List, Tuple, Optional
import gradio as gr
# ------------------------------
# Paths (override with env vars)
# ------------------------------
# Inputs: the generated QA pairs and the reports they are checked against.
QA_PATH = os.environ.get("QA_PATH", "./spatial_qa_output.json")
VALIDATION_PATH = os.environ.get("VALIDATION_PATH", "./validation_reports_output.json")
# State files written by the app (assignments per user, per-question progress).
ASSIGNMENTS_PATH = os.environ.get("ASSIGNMENTS_PATH", "/data/assignments.json")
PROGRESS_PATH = os.environ.get("PROGRESS_PATH", "/data/progress.json")
# Roster file read for the default user list and the default seed.
USERS_PATH = os.environ.get("USERS_PATH", "./users.json")
# Directory that receives the per-user result exports.
EXPORT_DIR = os.environ.get("EXPORT_DIR", "/data")
# ------------------------------
# Utilities
# ------------------------------
def _safe_read_json(path: str, default):
    """Return the parsed JSON content of *path*, or *default* if it cannot be read.

    Args:
        path: File to read as UTF-8 encoded JSON.
        default: Value returned unchanged when the file is missing,
            unreadable, not valid UTF-8, or not valid JSON.

    Returns:
        The decoded JSON value, or *default*.
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    # OSError covers missing/unreadable files; ValueError covers both
    # json.JSONDecodeError and UnicodeDecodeError. Anything else is a
    # programming error and should surface instead of being swallowed.
    except (OSError, ValueError):
        return default
def _safe_write_json(path: str, obj):
    """Write *obj* to *path* as JSON via a temporary file and rename.

    The data is first written to ``path + ".tmp"`` and then moved over the
    target with ``os.replace``, so readers never see a half-written file.

    Args:
        path: Destination file. May be a bare filename in the current directory.
        obj: JSON-serialisable value.

    Returns:
        True when the file was written, False when the filesystem refused the
        write (a warning is printed). Callers decide how to fall back.

    Raises:
        TypeError: if *obj* is not JSON-serialisable (not treated as an I/O
            failure). The temporary file is removed before propagating.
    """
    tmp = path + ".tmp"
    try:
        parent = os.path.dirname(path)
        # dirname is "" for a bare filename and os.makedirs("") raises, which
        # used to make every such write fail; only create a real parent.
        if parent:
            os.makedirs(parent, exist_ok=True)
        with open(tmp, "w", encoding="utf-8") as f:
            json.dump(obj, f, indent=2, ensure_ascii=False)
        os.replace(tmp, path)
        return True
    except OSError as e:  # PermissionError and IOError are OSError
        print(f"Warning: Could not write to {path}: {e}")
        print("Running in read-only mode - data will not persist between sessions")
        return False
    finally:
        # After a successful replace the temp file is gone; otherwise drop the
        # partial file so failed writes do not accumulate.
        if os.path.exists(tmp):
            try:
                os.remove(tmp)
            except OSError:
                pass  # best-effort cleanup only
# ------------------------------
# In-memory storage for read-only environments
# ------------------------------
# Session-local copies of the assignments and progress mappings. Each stays
# None until its _set_* function is called for the first time; from then on the
# matching _get_* function returns the copy instead of re-reading the JSON file.
_in_memory_assignments = None
_in_memory_progress = None
# Cleared by _set_assignments/_set_progress after the first failed write, so
# later saves skip the disk and only update the in-memory copies.
_file_write_enabled = True
def _get_assignments():
    """Return the current assignments mapping.

    The in-memory copy is used once it has been set; until then the mapping is
    read from ASSIGNMENTS_PATH, yielding an empty dict if that cannot be loaded.
    """
    cached = _in_memory_assignments
    if cached is None:
        return _safe_read_json(ASSIGNMENTS_PATH, {})
    return cached
def _set_assignments(assignments):
    """Store *assignments* in memory and, while writes still work, on disk."""
    global _in_memory_assignments, _file_write_enabled
    _in_memory_assignments = assignments
    if not _file_write_enabled:
        return
    if not _safe_write_json(ASSIGNMENTS_PATH, assignments):
        # First failed write: stop touching the disk for the rest of the session.
        _file_write_enabled = False
def _get_progress():
    """Return the current progress mapping.

    The in-memory copy is used once it has been set; until then the mapping is
    read from PROGRESS_PATH, yielding an empty dict if that cannot be loaded.
    """
    cached = _in_memory_progress
    if cached is None:
        return _safe_read_json(PROGRESS_PATH, {})
    return cached
def _set_progress(progress):
    """Store *progress* in memory and, while writes still work, on disk."""
    global _in_memory_progress, _file_write_enabled
    _in_memory_progress = progress
    if not _file_write_enabled:
        return
    if not _safe_write_json(PROGRESS_PATH, progress):
        # First failed write: stop touching the disk for the rest of the session.
        _file_write_enabled = False
# ------------------------------
# Load data
# ------------------------------
def load_data() -> Dict[str, Dict]:
    """Join the QA file with the validation file into one table per instance.

    Only instance ids present in both files are kept. QA instances without a
    validation entry are skipped and reported in a single warning line.

    Returns:
        Dict keyed by instance id, each value holding:
          - "findings" (str): "Findings_EN", else "Findings", else "".
          - "impressions" (str): "Impressions_EN", else "Impressions", else "".
          - "qa_pairs" (list of {"question", "answer"}), values stripped strings.

    Raises:
        RuntimeError: if the two files share no instance id.
        OSError / ValueError: if either input file cannot be read or parsed.
    """
    with open(QA_PATH, "r", encoding="utf-8") as f:
        qa_data = json.load(f)
    with open(VALIDATION_PATH, "r", encoding="utf-8") as f:
        val_data = json.load(f)

    data = {}
    missing_in_val = []
    for inst_id, payload in qa_data.items():
        if inst_id not in val_data:
            missing_in_val.append(inst_id)
            continue
        report = val_data[inst_id]
        findings = report.get("Findings_EN") or report.get("Findings") or ""
        impressions = report.get("Impressions_EN") or report.get("Impressions") or ""
        # `or []` also covers an explicit "qa_pairs": null in the JSON.
        pairs = payload.get("qa_pairs") or []
        data[inst_id] = {
            # str() keeps a non-string value from crashing on .strip(),
            # mirroring how question/answer are normalised below.
            "findings": str(findings).strip(),
            "impressions": str(impressions).strip(),
            "qa_pairs": [
                {
                    "question": str(p.get("question", "")).strip(),
                    "answer": str(p.get("answer", "")).strip(),
                }
                for p in pairs
            ],
        }

    if missing_in_val:
        print(f"Warning: {len(missing_in_val)} QA instance(s) have no validation report and were skipped.")
    if not data:
        raise RuntimeError("No overlapping instances between QA and Validation files. "
                           "Check the JSON files and their keys.")
    return data
# Instance table, loaded once at import time.
DATA = load_data()
# Instance ids in sorted order (iterating a dict yields its keys).
INSTANCE_IDS = sorted(DATA)
def load_users() -> List[str]:
    """Return the usernames listed under "users" in the USERS_PATH JSON file.

    Entries are converted to stripped strings; falsy or blank entries are
    dropped and duplicates are removed while keeping first-seen order.

    Returns:
        The cleaned user list, or the 20 default names ``user_01`` ..
        ``user_20`` when the file is missing, is not a JSON object, has no
        "users" list, or that list contains no usable name.
    """
    users_data = _safe_read_json(USERS_PATH, {})
    raw = users_data.get("users") if isinstance(users_data, dict) else None
    if isinstance(raw, list):
        cleaned = (str(user).strip() for user in raw if user)
        # dict.fromkeys de-duplicates while preserving order.
        names = list(dict.fromkeys(name for name in cleaned if name))
        # An empty roster would make assignment creation fail later, so it is
        # treated like a missing one.
        if names:
            return names
    # Fallback to default users
    return [f"user_{i+1:02d}" for i in range(20)]
def get_default_seed() -> int:
    """Return "default_seed" from the USERS_PATH JSON file as an int.

    Returns:
        The configured seed converted with ``int()``, or 42 when the file is
        missing, is not a JSON object, has no "default_seed", or the value
        cannot be converted to an integer.
    """
    users_data = _safe_read_json(USERS_PATH, {})
    if not isinstance(users_data, dict):
        return 42
    try:
        return int(users_data.get("default_seed", 42))
    # TypeError: null/list/dict; ValueError: non-numeric string or NaN;
    # OverflowError: Infinity.
    except (TypeError, ValueError, OverflowError):
        return 42
# Roster and seed read once at import time. They seed the initial assignments
# and pre-fill the admin form in the UI.
DEFAULT_USERS = load_users()
DEFAULT_SEED = get_default_seed()
# ----------------------------------
# Assignment & Progress persistence
# ----------------------------------
def init_or_load_assignments(default_users: List[str], seed: int = 42) -> Dict[str, List[str]]:
    """Return the existing assignments, or deal new ones if there are none.

    Existing assignments are pruned of instance ids that are no longer in
    INSTANCE_IDS (order preserved) and stored again before being returned.

    Args:
        default_users: Usernames used only when no assignments exist yet.
        seed: Shuffle seed used only when no assignments exist yet.

    Returns:
        Mapping of username to the list of assigned instance ids.
    """
    assignments = _get_assignments()
    if not assignments:
        return create_assignments(default_users, seed)

    # Build the lookup once; testing each id against the list is O(n) per id.
    known = set(INSTANCE_IDS)
    for user, assigned in list(assignments.items()):
        # The isinstance guard keeps unhashable junk from a hand-edited file
        # from raising in the set lookup; such values are dropped.
        assignments[user] = [x for x in assigned if isinstance(x, str) and x in known]
    _set_assignments(assignments)
    return assignments
def create_assignments(usernames: List[str], seed: int) -> Dict[str, List[str]]:
    """Deal a fresh, seeded random split of instances and store it.

    Every user receives exactly 10 distinct instances; instances beyond
    ``10 * number_of_users`` stay unassigned. Usernames are stripped, blank
    ones dropped, and duplicates collapsed (first occurrence wins) so that a
    repeated name neither consumes extra instances nor loses a bucket.

    Args:
        usernames: Names to deal to.
        seed: Seed for the shuffle; the same seed and names give the same split.

    Returns:
        Mapping of username to its list of assigned instance ids.

    Raises:
        gr.Error: if no usable username is given or there are too few instances.
    """
    usernames = list(dict.fromkeys(u.strip() for u in usernames if u and u.strip()))
    if not usernames:
        raise gr.Error("Please provide at least one username.")

    # Each user gets exactly 10 instances
    instances_per_user = 10
    total_instances_needed = len(usernames) * instances_per_user
    if total_instances_needed > len(INSTANCE_IDS):
        raise gr.Error(f"Not enough instances available. Need {total_instances_needed} but only have {len(INSTANCE_IDS)}.")

    insts = INSTANCE_IDS.copy()
    random.Random(int(seed)).shuffle(insts)
    selected = insts[:total_instances_needed]

    # Round-robin deal: user i receives items i, i+k, i+2k, ... of the shuffle.
    n_users = len(usernames)
    assignments = {name: selected[i::n_users] for i, name in enumerate(usernames)}
    _set_assignments(assignments)
    return assignments
# Assignments as loaded or created at import time. create_assignments() stores
# later re-deals through _set_assignments() and does not rebind this name, so
# _get_assignments() is the way to read the live mapping.
ASSIGNMENTS = init_or_load_assignments(DEFAULT_USERS, seed=DEFAULT_SEED)
def available_users() -> List[str]:
    """Return the usernames that currently have assignments, sorted."""
    current = _get_assignments()
    return sorted(current)
def load_progress() -> Dict:
    """Return the progress mapping (in-memory copy if set, else from disk)."""
    progress = _get_progress()
    return progress
# Progress mapping loaded at import time. _ensure_user_progress_struct() and
# save_eval() mutate it in place and hand it to _set_progress().
PROGRESS = load_progress()
def _ensure_user_progress_struct(user: str):
    """Make sure PROGRESS has a correctly sized answer list per assigned instance.

    For every instance currently assigned to *user*, the "answers" list is
    created, padded with None, or trimmed so its length equals the number of
    QA pairs of that instance. The result is stored via _set_progress().

    Args:
        user: Username whose progress skeleton is initialised.
    """
    user_progress = PROGRESS.setdefault(user, {})
    # Read the live assignments rather than the import-time ASSIGNMENTS
    # snapshot: after a re-deal the snapshot is stale, and the functions that
    # index progress[user][inst] iterate the live mapping.
    for inst in _get_assignments().get(user, []):
        n = len(DATA[inst]["qa_pairs"])
        entry = user_progress.get(inst)
        if not isinstance(entry, dict):
            # Missing (or unusable) entry: start with a clean slate.
            user_progress[inst] = {"answers": [None] * n}
            continue
        # pad or trim if needed
        ans = entry.get("answers") or []
        if len(ans) < n:
            ans = ans + [None] * (n - len(ans))
        elif len(ans) > n:
            ans = ans[:n]
        entry["answers"] = ans
    _set_progress(PROGRESS)
def save_eval(user: str, inst: str, q_idx: int,
              relevant_choice: Optional[str], correct_choice: Optional[str], note: str = "") -> str:
    """Persist the evaluation of a single QA pair and return a status line.

    Args:
        user: Username the evaluation belongs to.
        inst: Instance id of the QA pair.
        q_idx: Zero-based index of the QA pair within the instance.
        relevant_choice: Radio label for relevance, or None if not chosen.
        correct_choice: Radio label for correctness; only honoured when the
            question was marked relevant.
        note: Optional free-text note; stored stripped.

    Returns:
        A human-readable confirmation, or a hint when user/instance is missing
        or the index is negative (nothing is saved in those cases).
    """
    if not user or not inst:
        return "Select a user to begin."
    if q_idx < 0:
        # A negative index would silently overwrite an answer counted from the end.
        return f"Invalid question index: {q_idx}"
    _ensure_user_progress_struct(user)
    entry = PROGRESS[user].setdefault(
        inst, {"answers": [None] * len(DATA[inst]["qa_pairs"])}
    )

    # map choices to booleans (unknown / missing label -> None)
    rel = {"βœ“ Relevant": True, "βœ— Not relevant": False}.get(relevant_choice)
    corr = None
    if rel is True:
        corr = {"βœ“ Correct": True, "βœ— Incorrect": False}.get(correct_choice)

    record = {
        "relevant": rel,
        "correct": corr,
        "note": (note or "").strip(),
        # Same "...Z" text as the deprecated utcnow().isoformat() + "Z".
        "saved_at": dt.datetime.now(dt.timezone.utc).replace(tzinfo=None).isoformat() + "Z",
    }

    answers = entry["answers"]
    # safety: grow the list if the index lies past its end
    answers.extend([None] * (q_idx + 1 - len(answers)))
    answers[q_idx] = record
    _set_progress(PROGRESS)

    label = f"Saved: {user} β€’ {inst} β€’ Q{q_idx+1} β†’ relevant={rel}"
    if rel is True:
        label += f", correct={corr}"
    return label
def summarize_user(user: str) -> Tuple[str, List[List]]:
    """Build the progress summary for *user*.

    Returns:
        A (text, rows) pair: the text states answered / total QA pairs and the
        number of assigned instances; each row is [instance_id, answered, total].
        Without a user the placeholder text and an empty table are returned.
    """
    if not user:
        return ("β€”", [])
    _ensure_user_progress_struct(user)
    assignments = _get_assignments()
    progress = _get_progress()

    rows = []
    for inst in assignments.get(user, []):
        pair_count = len(DATA[inst]["qa_pairs"])
        answered = sum(1 for a in progress[user][inst]["answers"] if a is not None)
        rows.append([inst, answered, pair_count])

    done = sum(row[1] for row in rows)
    total = sum(row[2] for row in rows)
    txt = f"Progress: {done} / {total} QA pairs completed across {len(assignments.get(user, []))} assigned instances."
    return txt, rows
def next_unfinished(user: str) -> Tuple[Optional[str], Optional[int]]:
    """Locate the user's first QA pair without a saved evaluation.

    Instances are scanned in assignment order. Returns (instance_id, q_index),
    or (None, None) when every assigned pair has been evaluated.
    """
    _ensure_user_progress_struct(user)
    assignments = _get_assignments()
    progress = _get_progress()
    for inst in assignments.get(user, []):
        answers = progress[user][inst]["answers"]
        pending = next((i for i, a in enumerate(answers) if a is None), None)
        if pending is not None:
            return inst, pending
    return None, None
def first_unfinished_in_instance(user: str, inst: str) -> int:
    """Return the index of the first unanswered pair in *inst*, or 0 if none is left."""
    _ensure_user_progress_struct(user)
    answers = _get_progress()[user][inst]["answers"]
    return next((i for i, a in enumerate(answers) if a is None), 0)
def get_payload(inst: str, q_idx: int) -> Tuple[str, str, str, str, str]:
    """Return (question, answer, findings, impressions, header) for display.

    *q_idx* is clamped into the valid range. An instance without QA pairs
    yields empty question/answer strings and a "No questions" header.
    """
    entry = DATA[inst]
    pairs = entry["qa_pairs"]
    n = len(pairs)
    findings = entry["findings"]
    impressions = entry["impressions"]

    if n == 0:
        return "", "", findings, impressions, f"{inst} β€” No questions (0/0)"

    q_idx = max(0, min(q_idx, n - 1))
    pair = pairs[q_idx]
    header = f"{inst} β€” Question {q_idx+1} / {n}"
    return pair["question"], pair["answer"], findings, impressions, header
def export_user_results(user: str) -> str:
    """Export the user's evaluations as JSON and CSV files in EXPORT_DIR.

    One row is produced per assigned QA pair; pairs without a saved evaluation
    get None in the evaluation columns.

    Args:
        user: Username whose results are exported.

    Returns:
        A status string only: the row count and the two file paths on success;
        a hint when no user is selected or nothing is assigned; or, when the
        files cannot be written, the first 1000 characters of the JSON export.
    """
    import csv

    if not user:
        return "Select a user to export results."
    _ensure_user_progress_struct(user)
    assignments = _get_assignments()
    progress = _get_progress()

    # Build flat rows
    rows = []
    for inst in assignments.get(user, []):
        pairs = DATA[inst]["qa_pairs"]
        answers = progress[user][inst]["answers"]
        for i, pair in enumerate(pairs):
            # `or {}` turns "not evaluated yet" into lookups that yield None.
            saved = (answers[i] if i < len(answers) else None) or {}
            rows.append({
                "user": user,
                "instance_id": inst,
                "q_index": i + 1,
                "question": pair["question"],
                "answer": pair["answer"],
                "relevant": saved.get("relevant"),
                "correct": saved.get("correct"),
                "note": saved.get("note"),
                "saved_at": saved.get("saved_at"),
            })

    # Without rows there is no CSV header to derive (rows[0] would raise).
    if not rows:
        return f"Nothing to export: no QA pairs are assigned to {user}."

    ts = dt.datetime.now(dt.timezone.utc).strftime("%Y%m%d-%H%M%S")
    json_path = os.path.join(EXPORT_DIR, f"results_{user}_{ts}.json")
    csv_path = os.path.join(EXPORT_DIR, f"results_{user}_{ts}.csv")
    try:
        # _safe_write_json reports failure via its return value; treat that as
        # an I/O error so the success message never names an unwritten file.
        if not _safe_write_json(json_path, rows):
            raise OSError(f"could not write {json_path}")
        with open(csv_path, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=list(rows[0]))
            writer.writeheader()
            writer.writerows(rows)
    except OSError:
        # Fallback: return data as JSON string
        json_str = json.dumps(rows, indent=2, ensure_ascii=False)
        return f"Export data (read-only mode):\n\n{json_str[:1000]}{'...' if len(json_str) > 1000 else ''}"
    return f"Exported {len(rows)} rows.\nJSON: {json_path}\nCSV: {csv_path}"
# ------------------------------
# Gradio UI
# ------------------------------
with gr.Blocks(title="Spatial QA Validator", theme=gr.themes.Glass()) as demo:
    # Show a banner when a write already failed during import and the app has
    # switched to in-memory storage.
    read_only_status = ""
    if not _file_write_enabled:
        read_only_status = "\n\n⚠️ **Running in read-only mode** - Progress will not persist between sessions"
    gr.Markdown("## Spatial QA Validation Tool\n"
                "Left: Findings & Impression (Ground Truth)\n\n"
                "Right: Spatial QA pairs (Q/A) to validate.\n\n"
                "For each Q/A:\n"
                "1) Mark if the **Question is relevant** to the Findings/Impression.\n"
                "2) If **relevant**, mark whether the **Answer is correct**.\n" + read_only_status)

    # NOTE(review): the layout nesting below was reconstructed from the
    # component arguments - confirm against the deployed app.
    with gr.Row():
        with gr.Column(scale=1, min_width=260):
            user_dd = gr.Dropdown(choices=available_users(), label="Select user", interactive=True)
            load_btn = gr.Button("Load my queue", variant="primary")
            progress_text = gr.Markdown("")
            progress_table = gr.Dataframe(headers=["Instance", "Done", "Total"], row_count=0, interactive=False)
            inst_dd = gr.Dropdown(choices=[], label="Assigned instance", interactive=True, visible=False)
            q_slider = gr.Slider(1, 10, value=1, step=1, label="Question #", interactive=True, visible=False)
            export_btn = gr.Button("Export my results")
        with gr.Column(scale=2, min_width=600):
            # Left panel: Findings/Impressions
            with gr.Row():
                with gr.Column(scale=1, elem_classes=["left-panel"]):
                    findings_tb = gr.Textbox(label="Findings (Ground Truth)", lines=16, interactive=False)
                    impressions_tb = gr.Textbox(label="Impression (Ground Truth)", lines=6, interactive=False)
                with gr.Column(scale=1, elem_classes=["left-panel"]):
                    header_md = gr.Markdown("")
                    question_md = gr.Markdown("")
                    answer_md = gr.Markdown("")
                    relevant_radio = gr.Radio(choices=["βœ“ Relevant", "βœ— Not relevant"],
                                              label="1) Is the QUESTION relevant to Findings/Impression?",
                                              interactive=True)
                    correct_radio = gr.Radio(choices=["βœ“ Correct", "βœ— Incorrect"],
                                             label="2) If relevant, is the ANSWER correct?",
                                             interactive=False)
                    note_tb = gr.Textbox(label="Optional note", lines=2, placeholder="Any comments...")
                    with gr.Row():
                        save_btn = gr.Button("Save", variant="secondary")
                        save_next_btn = gr.Button("Save & Next", variant="primary")
                        skip_btn = gr.Button("Skip to next unfinished")
                    nav_info = gr.Markdown("")

    # Move Admin accordion below so user_dd exists before wiring its updates
    with gr.Accordion("Setup (admin) β€” define users and (re)deal assignments", open=False):
        with gr.Row():
            users_csv = gr.Textbox(value=",".join(DEFAULT_USERS),
                                   label="Usernames (comma-separated)",
                                   lines=2)
            seed_num = gr.Number(value=DEFAULT_SEED, precision=0,
                                 label="Assignment Seed (change & Apply)")
        apply_btn = gr.Button("Apply / (Re)create Assignments", variant="secondary")
        assign_info = gr.Markdown()

    def apply_users(u_csv, seed):
        """Re-deal assignments for the entered usernames and report the counts."""
        usernames = [x.strip() for x in (u_csv or "").split(",") if x.strip()]
        new_assign = create_assignments(usernames, int(seed or 0))
        # summarize counts
        counts = {u: len(v) for u, v in new_assign.items()}
        table = "\n".join([f"- **{u}**: {counts[u]} instances" for u in sorted(counts)])
        total_assigned = sum(counts.values())
        total_available = len(INSTANCE_IDS)
        unassigned = total_available - total_assigned
        return gr.update(choices=available_users(), value=None), f"Assignments updated for {len(new_assign)} users (10 instances each):\n{table}\n\n**Summary:** {total_assigned} instances assigned, {unassigned} instances left unassigned out of {total_available} total."

    # now that user_dd exists, reference it directly in outputs
    apply_btn.click(apply_users, inputs=[users_csv, seed_num], outputs=[user_dd, assign_info])

    # ---- Shared helpers for the handlers below ----
    def _slider_update(n: int, idx: int):
        """Slider update for an instance with n questions, positioned at zero-based idx."""
        has_questions = n > 0
        return gr.update(visible=has_questions, minimum=1, maximum=n if has_questions else 1,
                         step=1, value=(idx + 1 if has_questions else 1))

    def _position_label(user: str, inst: str, idx: int, n: int) -> str:
        """Text such as '2/10 ... Q3/5': instance position in the queue and question position."""
        queue = _get_assignments().get(user, [])
        return f"{queue.index(inst)+1}/{len(queue)} β€’ Q{(idx+1) if n>0 else 0}/{n}"

    # ---- Wiring functions ----
    def _load_user(user: str):
        """Load the user's queue and jump to the first unfinished QA pair."""
        if not user:
            raise gr.Error("Please select a user.")
        # Re-validate the current assignments (drops instance ids that no
        # longer exist in the loaded data).
        init_or_load_assignments(DEFAULT_USERS, seed=DEFAULT_SEED)
        _ensure_user_progress_struct(user)
        # summary
        txt, rows = summarize_user(user)
        # initial pointer = next unfinished overall
        inst, q_idx = next_unfinished(user)
        queue = _get_assignments().get(user, [])
        if inst is None:
            # user is done
            return (
                gr.update(choices=queue, visible=True, value=None),
                gr.update(visible=False),
                "", "", "", "", "",
                gr.update(value=None, interactive=True),
                gr.update(value=None, interactive=False),
                "",  # note
                txt, rows, "All assigned QA pairs are completed. πŸŽ‰"
            )
        # populate content
        q, a, f, im, header = get_payload(inst, q_idx)
        n = len(DATA[inst]["qa_pairs"])
        return (
            gr.update(choices=queue, visible=True, value=inst),
            _slider_update(n, q_idx),
            f, im, f"**{header}**", f"**Q:** {q}", f"**A:** {a}",
            gr.update(value=None, interactive=True),
            gr.update(value=None, interactive=False),
            "",
            txt, rows, _position_label(user, inst, q_idx, n)
        )

    load_btn.click(
        _load_user,
        inputs=[user_dd],
        outputs=[inst_dd, q_slider, findings_tb, impressions_tb, header_md, question_md, answer_md,
                 relevant_radio, correct_radio, note_tb, progress_text, progress_table, nav_info]
    )

    def _inst_changed(user: str, inst: str):
        """Show the first unfinished question of the newly selected instance."""
        queue = _get_assignments().get(user, []) if user else []
        # Also covers a stale dropdown value that is not in this user's queue,
        # which would otherwise raise while looking up progress.
        if not inst or inst not in queue:
            return gr.update(visible=False), "", "", "", "", "", gr.update(interactive=True), gr.update(interactive=False), "", ""
        idx = first_unfinished_in_instance(user, inst)
        q, a, f, im, header = get_payload(inst, idx)
        n = len(DATA[inst]["qa_pairs"])
        return (
            _slider_update(n, idx),
            f, im, f"**{header}**", f"**Q:** {q}", f"**A:** {a}",
            gr.update(value=None, interactive=True),
            gr.update(value=None, interactive=False),
            "",
            _position_label(user, inst, idx, n)
        )

    inst_dd.change(
        _inst_changed,
        inputs=[user_dd, inst_dd],
        outputs=[q_slider, findings_tb, impressions_tb, header_md, question_md, answer_md,
                 relevant_radio, correct_radio, note_tb, nav_info]
    )

    def _q_changed(inst: str, q_no: int):
        """Show the question selected on the slider and clear the note box."""
        if not inst:
            return "", "", "", ""
        idx = int(q_no) - 1
        q, a, _findings, _impressions, header = get_payload(inst, idx)
        return f"**{header}**", f"**Q:** {q}", f"**A:** {a}", ""

    q_slider.change(_q_changed, inputs=[inst_dd, q_slider],
                    outputs=[header_md, question_md, answer_md, note_tb])

    def _relevant_changed(rel_choice: Optional[str]):
        """Enable the correctness radio only for a relevant question."""
        if rel_choice == "βœ“ Relevant":
            return gr.update(interactive=True)
        # reset and disable
        return gr.update(value=None, interactive=False)

    relevant_radio.change(_relevant_changed, inputs=[relevant_radio], outputs=[correct_radio])

    def _save(user: str, inst: str, q_no: int, rel_choice: Optional[str], corr_choice: Optional[str], note: str):
        """Save the current evaluation and refresh the progress summary."""
        if not (user and inst and q_no):
            raise gr.Error("Missing user/instance/question selection.")
        idx = int(q_no) - 1
        msg = save_eval(user, inst, idx, rel_choice, corr_choice, note)
        txt, rows = summarize_user(user)
        return msg, txt, rows

    save_btn.click(
        _save,
        inputs=[user_dd, inst_dd, q_slider, relevant_radio, correct_radio, note_tb],
        outputs=[nav_info, progress_text, progress_table]
    )

    def _save_and_next(user: str, inst: str, q_no: int, rel_choice: Optional[str], corr_choice: Optional[str], note: str):
        """Save the current evaluation, then move to the next unfinished QA pair."""
        if not (user and inst and q_no):
            raise gr.Error("Missing user/instance/question selection.")
        idx = int(q_no) - 1
        msg = save_eval(user, inst, idx, rel_choice, corr_choice, note)
        # jump to next unfinished (global)
        nxt_inst, nxt_idx = next_unfinished(user)
        if nxt_inst is None:
            txt, rows = summarize_user(user)
            return (
                # gr.update, as in every other handler; the per-component
                # Dropdown.update classmethod no longer exists in Gradio 4.
                gr.update(value=None),
                gr.update(visible=False),
                "", "", "",
                "", "",  # q/a
                gr.update(value=None, interactive=True),
                gr.update(value=None, interactive=False),
                "",
                f"{msg}\n\nAll assigned QA pairs are completed. πŸŽ‰",
                txt, rows
            )
        # else load that payload
        q, a, f, im, header = get_payload(nxt_inst, nxt_idx)
        n = len(DATA[nxt_inst]["qa_pairs"])
        txt, rows = summarize_user(user)
        return (
            gr.update(value=nxt_inst),
            _slider_update(n, nxt_idx),
            f, im, f"**{header}**",
            f"**Q:** {q}", f"**A:** {a}",
            gr.update(value=None, interactive=True),
            gr.update(value=None, interactive=False),
            "",
            msg,
            txt, rows
        )

    save_next_btn.click(
        _save_and_next,
        inputs=[user_dd, inst_dd, q_slider, relevant_radio, correct_radio, note_tb],
        outputs=[inst_dd, q_slider, findings_tb, impressions_tb, header_md, question_md, answer_md,
                 relevant_radio, correct_radio, note_tb, nav_info, progress_text, progress_table]
    )

    def _skip_to_next(user: str):
        """Jump to the next unfinished QA pair without saving anything."""
        if not user:
            raise gr.Error("Please select a user.")
        inst, idx = next_unfinished(user)
        if inst is None:
            txt, rows = summarize_user(user)
            return (
                gr.update(value=None),
                gr.update(visible=False),
                "", "", "",
                "", "",
                gr.update(value=None, interactive=True),
                gr.update(value=None, interactive=False),
                "",
                "All assigned QA pairs are completed. πŸŽ‰",
                txt, rows
            )
        q, a, f, im, header = get_payload(inst, idx)
        n = len(DATA[inst]["qa_pairs"])
        txt, rows = summarize_user(user)
        return (
            gr.update(value=inst),
            _slider_update(n, idx),
            f, im, f"**{header}**",
            f"**Q:** {q}", f"**A:** {a}",
            gr.update(value=None, interactive=True),
            gr.update(value=None, interactive=False),
            "",
            "Jumped to next unfinished.",
            txt, rows
        )

    skip_btn.click(
        _skip_to_next,
        inputs=[user_dd],
        outputs=[inst_dd, q_slider, findings_tb, impressions_tb, header_md, question_md, answer_md,
                 relevant_radio, correct_radio, note_tb, nav_info, progress_text, progress_table]
    )

    def _export(user: str):
        """Export the selected user's results and show the status message."""
        msg = export_user_results(user)
        return msg

    export_btn.click(_export, inputs=[user_dd], outputs=[nav_info])
if __name__ == "__main__":
    # share=True asks Gradio for a public share link in addition to the local
    # server; host and port are left at Gradio's defaults (no server_name is
    # passed here).
    # NOTE(review): the admin panel has no authentication - confirm that a
    # public link is intended.
    demo.launch(share=True)