import os
import json

import psycopg2
import pandas as pd
from sqlalchemy import create_engine

RAW_DB_URL = os.getenv("DATABASE_URL")

# SQLAlchemy configuration (protocol fix for newer versions: postgres:// -> postgresql://)
SQLALCHEMY_DB_URL = RAW_DB_URL.replace("postgres://", "postgresql://") if RAW_DB_URL else None

# --- SQLALCHEMY SETUP (For Pandas Read Operations) ---
try:
    if not SQLALCHEMY_DB_URL:
        raise ValueError("DATABASE_URL is not set in environment.")
    # Create global engine for connection pooling
    db_engine = create_engine(SQLALCHEMY_DB_URL)
    print("✅ SQLAlchemy Engine initialized.")
except Exception as e:
    print(f"❌ Error creating engine: {e}")
    db_engine = None


# --- PSYCOPG2 SETUP (For Write Operations) ---
def get_raw_connection():
    """
    Creates a raw psycopg2 connection.
    Used for UPDATE/INSERT operations where Pandas overhead is not needed.
    """
    if not RAW_DB_URL:
        raise ValueError("DATABASE_URL is not set.")
    return psycopg2.connect(RAW_DB_URL)


# ==============================================================================
# READ OPERATIONS (Using SQLAlchemy + Pandas)
# Note: Pandas manages the connection opening/closing automatically via the engine.
# ==============================================================================

def fetch_distinct_repos():
    """
    Fetches a list of unique repository URLs present in the 'items' table.
    Used to populate the repository selection dropdown in the Dashboard.
    """
    try:
        # We query 'items' because it contains the raw synced data
        query = "SELECT DISTINCT repo FROM items ORDER BY repo"
        df = pd.read_sql(query, db_engine)
        slugs = df['repo'].tolist()
        # Transform slugs (owner/repo) into full URLs
        urls = [f"https://github.com/{slug}" for slug in slugs]
        return urls
    except Exception as e:
        print(f"Error fetching repos: {e}")
        return []


def fetch_dashboard_stats(repo_url=None):
    """
    Calculates aggregate statistics (Total, Resolved, etc.), optionally
    filtered by repository. Returns a DataFrame for the Donut Chart.
    """
    try:
        query = "SELECT verdict, count(*) as count FROM issue_reports"
        params = []
        if repo_url:
            # Handle potential trailing slashes for exact matching
            clean_url = repo_url.rstrip('/')
            query += " WHERE (repo_url = %s OR repo_url = %s)"
            params.extend([clean_url, clean_url + '/'])
        query += " GROUP BY verdict"
        return pd.read_sql(query, db_engine, params=tuple(params))
    except Exception as e:
        print(f"Stats Error: {e}")
        return pd.DataFrame(columns=['verdict', 'count'])
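
# --- Illustrative helper (sketch, not used elsewhere in this module) ---------
# Collapses the stats frame into a plain {verdict: count} mapping, which is
# typically the shape a donut-chart widget consumes. The function name and
# return shape are assumptions, not part of the original API.
def verdict_counts(repo_url=None) -> dict:
    df = fetch_dashboard_stats(repo_url)
    return dict(zip(df['verdict'], df['count'].astype(int)))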
""" try: # Base Query: Retrieves everything from 'items' and attaches report data if available base_query = """ SELECT i.number as issue_number, i.title, i.state as github_state, -- URL: Use report URL if exists, otherwise construct from slug COALESCE(r.repo_url, 'https://github.com/' || i.repo) as repo_url, -- Verdict: Default to 'Pending Analysis' if null COALESCE(r.verdict, 'Pending Analysis') as verdict, r.llm_model, r.confidence, r.priority, -- Status: Default to 'new' if null COALESCE(r.status, 'new') as status, r.proposed_action, -- Date: Use analysis date if available, else GitHub update date COALESCE(r.updated_at, i.updated_at) as updated_at FROM items i -- Soft Join: Match issue number AND ensure repo matches (via slug check) LEFT JOIN issue_reports r ON i.number = r.issue_number AND r.repo_url ILIKE '%%' || i.repo || '%%' WHERE (i.state = 'open' OR r.status = 'executed') AND i.is_pr = FALSE """ params = [] # Apply Repo Filter (Filtering the 'items' table via slug) if repo_url: # Convert full URL to slug (e.g. "https://github.com/user/repo" -> "user/repo") slug = repo_url.replace("https://github.com/", "").replace("http://github.com/", "").strip("/") base_query += " AND i.repo = %s" params.append(slug) # Apply View Filter if "Action" in view_filter or "pending" in view_filter: # Show only issues waiting for approval base_query += " AND r.status = 'pending_approval' AND r.proposed_action IS NOT NULL ORDER BY r.updated_at DESC" else: # Show all issues (limit for performance) base_query += " ORDER BY i.number DESC LIMIT 1000" return pd.read_sql(base_query, db_engine, params=tuple(params)) except Exception as e: print(f"Dataframe Error: {e}") return pd.DataFrame() def get_total_open_issues_count(repo_url=None) -> int: """ Counts the total number of open issues in the raw items table. Used for the top of the Efficiency Funnel chart. """ try: query = "SELECT count(*) as total FROM items WHERE state = 'open'" params = [] if repo_url: slug = repo_url.replace("https://github.com/", "").strip("/") query += " AND repo = %s" params.append(slug) df = pd.read_sql(query, db_engine, params=tuple(params)) if not df.empty: return int(df.iloc[0]['total']) return 0 except Exception as e: print(f"Error counting open issues: {e}") return 0 def fetch_agent_logs(limit=20): """ Fetches the latest activity logs from the agent_traces table. """ try: query = f"SELECT created_at, event_type, message, issue_number FROM agent_traces ORDER BY created_at DESC LIMIT {limit}" return pd.read_sql(query, db_engine) except Exception as e: print(f"Logs Error: {e}") return pd.DataFrame() # ============================================================================== # WRITE OPERATIONS (Using Psycopg2 Raw Connection) # Requires manual cursor management and connection closing. # ============================================================================== def fetch_issue_details_by_id(issue_number, repo_url): """ Fetches the full markdown report, proposed action, and status for a specific issue. Used when clicking a row in the table. 
""" conn = get_raw_connection() cursor = conn.cursor() try: cursor.execute( "SELECT analysis_body, proposed_action, status, thought_process FROM issue_reports WHERE issue_number = %s AND repo_url = %s", (issue_number, repo_url) ) row = cursor.fetchone() return row if row else (None, None, None, None) except Exception as e: print(f"Error fetching details: {e}") return (None, None, None, None) finally: cursor.close() conn.close() def get_proposed_action_payload(issue_number, repo_url): """ Retrieves the JSON action payload for execution. """ conn = get_raw_connection() cursor = conn.cursor() try: cursor.execute( "SELECT proposed_action FROM issue_reports WHERE issue_number = %s AND repo_url = %s", (issue_number, repo_url) ) row = cursor.fetchone() return row[0] if row else None except Exception as e: print(f"Error fetching action: {e}") return None finally: cursor.close() conn.close() def update_issue_status(issue_number, repo_url, new_status, final_comment=None): """ Updates status and optionally saves the final comment used. """ conn = get_raw_connection() cursor = conn.cursor() try: if final_comment is not None: cursor.execute( """ UPDATE issue_reports SET status = %s, proposed_action = jsonb_set(COALESCE(proposed_action, '{}'), '{comment}', to_jsonb(%s::text)) WHERE issue_number = %s AND repo_url = %s """, (new_status, final_comment, issue_number, repo_url) ) else: cursor.execute( "UPDATE issue_reports SET status = %s WHERE issue_number = %s AND repo_url = %s", (new_status, issue_number, repo_url) ) conn.commit() except Exception as e: print(f"Error updating status: {e}") finally: cursor.close() conn.close() def save_analysis_report( repo_url: str, issue_number: int, provider: str, model: str, verdict: str, body: str, thought: str = None, action: dict = None, priority: str = None, duplicate_of: int = None ): """ Saves or updates an analysis report in the database. Used by the Dashboard Chatbot when performing manual re-analysis. """ conn = get_raw_connection() cursor = conn.cursor() try: cursor.execute( """ INSERT INTO issue_reports (repo_url, issue_number, llm_provider, llm_model, verdict, analysis_body, thought_process, proposed_action, priority, duplicate_of, updated_at, status) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), 'pending_approval') ON CONFLICT (repo_url, issue_number) DO UPDATE SET llm_provider = EXCLUDED.llm_provider, llm_model = EXCLUDED.llm_model, verdict = EXCLUDED.verdict, analysis_body = EXCLUDED.analysis_body, thought_process = EXCLUDED.thought_process, proposed_action = EXCLUDED.proposed_action, priority = EXCLUDED.priority, duplicate_of = EXCLUDED.duplicate_of, status = 'pending_approval', updated_at = NOW(); """, (repo_url, issue_number, provider, model, verdict, body, thought, json.dumps(action) if action else None, priority, duplicate_of), ) conn.commit() except Exception as e: print(f"Error saving report: {e}") finally: cursor.close() conn.close()