elismasilva's picture
adjusted structure
f7824b9
import os
import json
import psycopg2
import pandas as pd
from sqlalchemy import create_engine
RAW_DB_URL = os.getenv("DATABASE_URL")

# Newer SQLAlchemy rejects the legacy "postgres://" scheme some hosts still
# hand out, so normalize it to "postgresql://" before building the engine.
SQLALCHEMY_DB_URL = RAW_DB_URL.replace("postgres://", "postgresql://") if RAW_DB_URL else None

# --- SQLALCHEMY SETUP (For Pandas Read Operations) ---
try:
    if not SQLALCHEMY_DB_URL:
        raise ValueError("DATABASE_URL is not set in environment.")
    # Single module-level engine so all read paths share one connection pool.
    db_engine = create_engine(SQLALCHEMY_DB_URL)
    print("✅ SQLAlchemy Engine initialized.")
except Exception as engine_error:
    # Leave db_engine defined (as None) so readers fail gracefully later.
    print(f"❌ Error creating engine: {engine_error}")
    db_engine = None
# --- PSYCOPG2 SETUP (For Write Operations) ---
def get_raw_connection():
    """
    Open and return a raw psycopg2 connection.

    Used for UPDATE/INSERT operations where the Pandas/SQLAlchemy overhead
    is not needed. Raises ValueError when DATABASE_URL is missing.
    """
    if RAW_DB_URL:
        return psycopg2.connect(RAW_DB_URL)
    raise ValueError("DATABASE_URL is not set.")
# ==============================================================================
# READ OPERATIONS (Using SQLAlchemy + Pandas)
# Note: Pandas manages the connection opening/closing automatically via the engine.
# ==============================================================================
def fetch_distinct_repos():
    """
    Return the unique repository URLs present in the 'items' table.

    Populates the repository selection dropdown in the Dashboard. Slugs
    stored as "owner/repo" are expanded into full GitHub URLs.
    Returns an empty list on any database error.
    """
    try:
        # 'items' holds the raw synced data, so it covers every known repo.
        frame = pd.read_sql("SELECT DISTINCT repo FROM items ORDER BY repo", db_engine)
        # Expand each slug (owner/repo) into a full URL.
        return [f"https://github.com/{slug}" for slug in frame['repo']]
    except Exception as err:
        print(f"Error fetching repos: {err}")
        return []
def fetch_dashboard_stats(repo_url=None):
    """
    Compute aggregate verdict counts, optionally scoped to one repository.

    Returns a DataFrame with 'verdict' and 'count' columns for the Donut
    Chart; on failure an empty DataFrame with those same columns is returned.
    """
    try:
        sql = "SELECT verdict, count(*) as count FROM issue_reports"
        bind_values = []
        if repo_url:
            # Stored repo_url may or may not carry a trailing slash —
            # match both spellings for an exact filter.
            trimmed = repo_url.rstrip('/')
            sql += " WHERE (repo_url = %s OR repo_url = %s)"
            bind_values += [trimmed, trimmed + '/']
        sql += " GROUP BY verdict"
        return pd.read_sql(sql, db_engine, params=tuple(bind_values))
    except Exception as err:
        print(f"Stats Error: {err}")
        return pd.DataFrame(columns=['verdict', 'count'])
def fetch_issues_dataframe(view_filter="pending", repo_url=None):
    """
    Fetches the main list of issues for the table.

    Performs a LEFT JOIN between 'items' (GitHub data) and 'issue_reports'
    (AI analysis), so issues appear even before any analysis exists —
    unmatched report columns fall back to defaults via COALESCE.

    Args:
        view_filter: if it contains "Action" or "pending", only rows with
            status 'pending_approval' and a proposed action are returned,
            sorted by analysis date; anything else returns all rows
            (newest first, capped at 1000).
        repo_url: optional full GitHub URL; reduced to its "owner/repo"
            slug and used to filter the 'items' table.

    Returns:
        pandas.DataFrame, or an empty DataFrame on any database error.
    """
    try:
        # Base Query: Retrieves everything from 'items' and attaches report data if available
        base_query = """
            SELECT
                i.number as issue_number,
                i.title,
                i.state as github_state,
                -- URL: Use report URL if exists, otherwise construct from slug
                COALESCE(r.repo_url, 'https://github.com/' || i.repo) as repo_url,
                -- Verdict: Default to 'Pending Analysis' if null
                COALESCE(r.verdict, 'Pending Analysis') as verdict,
                r.llm_model,
                r.confidence,
                r.priority,
                -- Status: Default to 'new' if null
                COALESCE(r.status, 'new') as status,
                r.proposed_action,
                -- Date: Use analysis date if available, else GitHub update date
                COALESCE(r.updated_at, i.updated_at) as updated_at
            FROM items i
            -- Soft Join: Match issue number AND ensure repo matches (via slug check)
            LEFT JOIN issue_reports r
                ON i.number = r.issue_number
                AND r.repo_url ILIKE '%%' || i.repo || '%%'
            WHERE (i.state = 'open' OR r.status = 'executed') AND i.is_pr = FALSE
        """
        params = []
        # Apply Repo Filter (Filtering the 'items' table via slug)
        if repo_url:
            # Convert full URL to slug (e.g. "https://github.com/user/repo" -> "user/repo")
            slug = repo_url.replace("https://github.com/", "").replace("http://github.com/", "").strip("/")
            base_query += " AND i.repo = %s"
            params.append(slug)
        # Apply View Filter: pending-approval queue vs. full listing
        if "Action" in view_filter or "pending" in view_filter:
            # Show only issues waiting for approval
            base_query += " AND r.status = 'pending_approval' AND r.proposed_action IS NOT NULL ORDER BY r.updated_at DESC"
        else:
            # Show all issues (limit for performance)
            base_query += " ORDER BY i.number DESC LIMIT 1000"
        # NOTE(review): the '%%' in the ILIKE pattern assumes the driver runs
        # its %-style parameter interpolation (collapsing '%%' to '%');
        # presumably that also holds when params is empty — confirm the
        # unfiltered case against the pandas/driver versions in use.
        return pd.read_sql(base_query, db_engine, params=tuple(params))
    except Exception as e:
        print(f"Dataframe Error: {e}")
        return pd.DataFrame()
def get_total_open_issues_count(repo_url=None) -> int:
    """
    Count the open issues in the raw 'items' table.

    Used for the top of the Efficiency Funnel chart.

    Args:
        repo_url: optional full GitHub URL; reduced to its "owner/repo"
            slug and used to filter the count.

    Returns:
        The open-issue count, or 0 on any database error.
    """
    try:
        query = "SELECT count(*) as total FROM items WHERE state = 'open'"
        params = []
        if repo_url:
            # Strip both https:// and http:// GitHub prefixes — consistent
            # with the slug handling in fetch_issues_dataframe (the original
            # only handled https://, so http:// URLs silently matched nothing).
            slug = repo_url.replace("https://github.com/", "").replace("http://github.com/", "").strip("/")
            query += " AND repo = %s"
            params.append(slug)
        df = pd.read_sql(query, db_engine, params=tuple(params))
        if not df.empty:
            return int(df.iloc[0]['total'])
        return 0
    except Exception as e:
        print(f"Error counting open issues: {e}")
        return 0
def fetch_agent_logs(limit=20):
    """
    Fetch the latest activity logs from the agent_traces table.

    Args:
        limit: maximum number of rows to return (most recent first).

    Returns:
        pandas.DataFrame of (created_at, event_type, message, issue_number);
        an empty DataFrame on any database error.
    """
    try:
        # Bind LIMIT as a query parameter instead of f-string interpolation,
        # matching the parameterized style of the other queries in this
        # module and preventing non-numeric input from reaching the SQL text.
        query = (
            "SELECT created_at, event_type, message, issue_number "
            "FROM agent_traces ORDER BY created_at DESC LIMIT %s"
        )
        return pd.read_sql(query, db_engine, params=(int(limit),))
    except Exception as e:
        print(f"Logs Error: {e}")
        return pd.DataFrame()
# ==============================================================================
# WRITE OPERATIONS (Using Psycopg2 Raw Connection)
# Requires manual cursor management and connection closing.
# ==============================================================================
def fetch_issue_details_by_id(issue_number, repo_url):
    """
    Fetch the full report for one issue, identified by number and repo URL.

    Used when clicking a row in the table. Returns a 4-tuple of
    (analysis_body, proposed_action, status, thought_process); all four
    are None when no report exists or the query fails.
    """
    conn = get_raw_connection()
    cursor = conn.cursor()
    try:
        cursor.execute(
            "SELECT analysis_body, proposed_action, status, thought_process FROM issue_reports WHERE issue_number = %s AND repo_url = %s",
            (issue_number, repo_url)
        )
        record = cursor.fetchone()
        if record is None:
            return (None, None, None, None)
        return record
    except Exception as exc:
        print(f"Error fetching details: {exc}")
        return (None, None, None, None)
    finally:
        # Raw connections are opened per call, so always release both handles.
        cursor.close()
        conn.close()
def get_proposed_action_payload(issue_number, repo_url):
    """
    Retrieve the JSON action payload stored for an issue, for execution.

    Returns None when no report row exists or the query fails.
    """
    conn = get_raw_connection()
    cursor = conn.cursor()
    try:
        cursor.execute(
            "SELECT proposed_action FROM issue_reports WHERE issue_number = %s AND repo_url = %s",
            (issue_number, repo_url)
        )
        record = cursor.fetchone()
        return None if record is None else record[0]
    except Exception as exc:
        print(f"Error fetching action: {exc}")
        return None
    finally:
        # Per-call connection: release both handles unconditionally.
        cursor.close()
        conn.close()
def update_issue_status(issue_number, repo_url, new_status, final_comment=None):
    """
    Update an issue report's workflow status.

    When final_comment is provided, it is also stored inside the
    proposed_action JSONB under the 'comment' key. Errors are logged
    and swallowed (best-effort update).
    """
    conn = get_raw_connection()
    cursor = conn.cursor()
    try:
        if final_comment is None:
            # Status-only update.
            cursor.execute(
                "UPDATE issue_reports SET status = %s WHERE issue_number = %s AND repo_url = %s",
                (new_status, issue_number, repo_url)
            )
        else:
            # Also persist the comment that was actually used, merging it
            # into the existing JSON payload (created empty if missing).
            cursor.execute(
                """
                UPDATE issue_reports
                SET status = %s,
                    proposed_action = jsonb_set(COALESCE(proposed_action, '{}'), '{comment}', to_jsonb(%s::text))
                WHERE issue_number = %s AND repo_url = %s
                """,
                (new_status, final_comment, issue_number, repo_url)
            )
        conn.commit()
    except Exception as exc:
        print(f"Error updating status: {exc}")
    finally:
        cursor.close()
        conn.close()
def save_analysis_report(
    repo_url: str,
    issue_number: int,
    provider: str,
    model: str,
    verdict: str,
    body: str,
    thought: str = None,
    action: dict = None,
    priority: str = None,
    duplicate_of: int = None
):
    """
    Insert or update an analysis report (upsert keyed on repo_url + issue_number).

    Every save resets the row's status to 'pending_approval' so the newest
    analysis always goes back through human review. Used by the Dashboard
    Chatbot when performing manual re-analysis. Errors are logged and
    swallowed (best-effort write).
    """
    # Serialize the action payload once, up front; NULL when none proposed.
    action_json = json.dumps(action) if action else None
    row_values = (
        repo_url, issue_number, provider, model, verdict,
        body, thought, action_json, priority, duplicate_of,
    )
    conn = get_raw_connection()
    cur = conn.cursor()
    try:
        cur.execute(
            """
            INSERT INTO issue_reports (repo_url, issue_number, llm_provider, llm_model, verdict, analysis_body, thought_process, proposed_action, priority, duplicate_of, updated_at, status)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), 'pending_approval')
            ON CONFLICT (repo_url, issue_number)
            DO UPDATE SET
                llm_provider = EXCLUDED.llm_provider,
                llm_model = EXCLUDED.llm_model,
                verdict = EXCLUDED.verdict,
                analysis_body = EXCLUDED.analysis_body,
                thought_process = EXCLUDED.thought_process,
                proposed_action = EXCLUDED.proposed_action,
                priority = EXCLUDED.priority,
                duplicate_of = EXCLUDED.duplicate_of,
                status = 'pending_approval',
                updated_at = NOW();
            """,
            row_values,
        )
        conn.commit()
    except Exception as exc:
        print(f"Error saving report: {exc}")
    finally:
        cur.close()
        conn.close()