import os
import warnings
from datetime import datetime, timedelta

import matplotlib.pyplot as plt
import numpy as np
import openai
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import requests
import shap
import streamlit as st
import ta
import yfinance as yf
from catboost import CatBoostRegressor
from plotly.subplots import make_subplots
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

warnings.filterwarnings('ignore')

# Initialize the OpenAI client
# SECURITY: API credentials must never be committed to source control. They are
# read from the environment instead; any key that was previously hard-coded in
# this file should be treated as leaked and revoked.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
openai.api_key = OPENAI_API_KEY

# Alpha Vantage API key
ALPHA_VANTAGE_API_KEY = os.environ.get("ALPHA_VANTAGE_API_KEY", "")

# GPT Assistant ID
ASSISTANT_ID = "asst_Fl3rRrRijb8FJDpqjBexfUBp"

# Custom CSS
st.markdown(""" """, unsafe_allow_html=True)

_ALPHA_VANTAGE_URL = "https://www.alphavantage.co/query"
_REQUEST_TIMEOUT_SECONDS = 30
_STATEMENT_FUNCTIONS = ('INCOME_STATEMENT', 'BALANCE_SHEET', 'CASH_FLOW')
_REPORT_KEYS = ('quarterlyReports', 'annualReports')
_EARNINGS_DATE_COLUMNS = ['fiscalDateEnding', 'reportedDate']
_EARNINGS_NUMERIC_COLUMNS = ['reportedEPS', 'estimatedEPS', 'surprise', 'surprisePercentage']
_MONTH_LABELS = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
                 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']


def _alpha_vantage_query(function, ticker):
    """Call one Alpha Vantage endpoint and return the decoded JSON payload.

    Raises:
        RuntimeError: if the HTTP status is not 200.
    """
    params = {
        "function": function,
        "symbol": ticker,
        "apikey": ALPHA_VANTAGE_API_KEY,
    }
    # A timeout keeps a stalled connection from hanging the Streamlit session.
    response = requests.get(_ALPHA_VANTAGE_URL, params=params,
                            timeout=_REQUEST_TIMEOUT_SECONDS)
    if response.status_code != 200:
        raise RuntimeError(f"Failed to fetch {function} data: {response.status_code}")
    return response.json()


def _as_date(value):
    """Return *value* as a plain ``date`` (``datetime`` inputs are truncated)."""
    return value.date() if isinstance(value, datetime) else value


def _flatten_yf_columns(df):
    """Collapse (field, ticker) MultiIndex columns to plain field names.

    Some yfinance releases return two-level columns from ``yf.download`` even
    for a single ticker; the rest of this module expects flat names such as
    ``'Close'``.
    """
    if isinstance(df.columns, pd.MultiIndex):
        df = df.copy()
        df.columns = df.columns.get_level_values(0)
    return df


def get_financial_data(ticker, end_date):
    """Fetch income statement, balance sheet and cash flow from Alpha Vantage.

    Args:
        ticker: Stock symbol.
        end_date: Reports whose ``fiscalDateEnding`` is after this date are dropped.

    Returns:
        Dict keyed by Alpha Vantage function name, each value being the JSON
        payload with ``quarterlyReports`` / ``annualReports`` filtered.

    Raises:
        RuntimeError: if any request returns a non-200 status.
    """
    cutoff = _as_date(end_date)
    data = {fn: _alpha_vantage_query(fn, ticker) for fn in _STATEMENT_FUNCTIONS}

    for content in data.values():
        for key in _REPORT_KEYS:
            if key in content:
                content[key] = [
                    report for report in content[key]
                    if datetime.strptime(report['fiscalDateEnding'], '%Y-%m-%d').date() <= cutoff
                ]
    return data


def get_earnings_dates(ticker):
    """Return a mapping of fiscal quarter end date -> earnings report date (strings)."""
    payload = _alpha_vantage_query('EARNINGS', ticker)
    return {
        report['fiscalDateEnding']: report['reportedDate']
        for report in payload.get('quarterlyEarnings', [])
    }


def get_earnings_data(ticker):
    """Return quarterly earnings as a DataFrame indexed by ``reportedDate``.

    EPS columns are coerced to numeric. When the API returns no quarterly
    earnings, an empty frame with the expected columns is returned instead of
    raising ``KeyError``.
    """
    payload = _alpha_vantage_query('EARNINGS', ticker)
    df = pd.DataFrame(payload.get('quarterlyEarnings', []))

    # Guarantee the columns used below (and by prepare_data) exist.
    for col in _EARNINGS_DATE_COLUMNS + _EARNINGS_NUMERIC_COLUMNS:
        if col not in df.columns:
            df[col] = np.nan

    df['fiscalDateEnding'] = pd.to_datetime(df['fiscalDateEnding'])
    df['reportedDate'] = pd.to_datetime(df['reportedDate'])
    df = df.set_index('reportedDate')

    for col in _EARNINGS_NUMERIC_COLUMNS:
        df[col] = pd.to_numeric(df[col], errors='coerce')
    return df


def process_financial_data(data, earnings_dates, earnings_data):
    """Merge quarterly statements into one numeric frame indexed by release date.

    Each statement field becomes a column named ``<STATEMENT>_<field>``. A
    quarter is keyed by its earnings release date when known, otherwise by its
    fiscal end date. Earnings data is left-joined on the index, and every
    column is coerced to numeric (non-numeric values become NaN).
    """
    quarterly_data = {}
    for statement_type, statement_data in data.items():
        for report in statement_data.get('quarterlyReports', []):
            fiscal_date = report['fiscalDateEnding']
            release_date = earnings_dates.get(fiscal_date, fiscal_date)
            row = quarterly_data.setdefault(release_date, {})
            row.update({f"{statement_type}_{k}": v for k, v in report.items()})

    df = pd.DataFrame.from_dict(quarterly_data, orient='index')
    df.index = pd.to_datetime(df.index)
    df = df.sort_index()
    df = df.join(earnings_data, how='left')

    for col in df.columns:
        df[col] = pd.to_numeric(df[col], errors='coerce')
    return df


def get_stock_data(ticker, start_date, end_date):
    """Download daily prices and append technical indicators plus their rates of change.

    Raises:
        ValueError: if no price data is returned for the ticker/date range.
    """
    df = yf.download(ticker, start=start_date, end=end_date)
    if df is None or df.empty:
        raise ValueError(f"No price data returned for {ticker}.")
    df = _flatten_yf_columns(df)

    close, high, low, volume = df['Close'], df['High'], df['Low'], df['Volume']

    df['Price_Pct_Change'] = close.pct_change()
    df['RSI'] = ta.momentum.RSIIndicator(close).rsi()
    df['WILLR'] = ta.momentum.WilliamsRIndicator(high, low, close).williams_r()

    bb = ta.volatility.BollingerBands(close)
    df['BB_upper'] = bb.bollinger_hband()
    df['BB_middle'] = bb.bollinger_mavg()
    df['BB_lower'] = bb.bollinger_lband()

    df['OBV'] = ta.volume.OnBalanceVolumeIndicator(close, volume).on_balance_volume()
    df['ATR'] = ta.volatility.AverageTrueRange(high, low, close).average_true_range()
    df['MACD'] = ta.trend.MACD(close).macd()
    df['ADX'] = ta.trend.ADXIndicator(high, low, close).adx()
    df['CCI'] = ta.trend.CCIIndicator(high, low, close).cci()

    indicator_columns = ['RSI', 'WILLR', 'BB_upper', 'BB_middle', 'BB_lower',
                         'OBV', 'ATR', 'MACD', 'ADX', 'CCI']
    for column in indicator_columns:
        df[f'{column}_ROC'] = df[column].pct_change()

    return df


def add_financial_ratios(X):
    """Add balance-sheet ratio columns to *X* in place and return it.

    NOTE(review): the column names are kept for compatibility, but as computed
    ``PE_Ratio`` is equity / net income and ``PB_Ratio`` is assets / equity;
    neither uses the share price. Confirm whether that is intended.
    """
    def safe_divide(a, b):
        # Zero denominators yield NaN rather than +/-inf.
        return np.where(b != 0, a / b, np.nan)

    equity = X['BALANCE_SHEET_totalShareholderEquity']
    net_income = X['INCOME_STATEMENT_netIncome']
    assets = X['BALANCE_SHEET_totalAssets']
    liabilities = X['BALANCE_SHEET_totalLiabilities']

    X['PE_Ratio'] = safe_divide(equity, net_income)
    X['PB_Ratio'] = safe_divide(assets, equity)
    X['Debt_to_Equity'] = safe_divide(liabilities, equity)
    X['ROE'] = safe_divide(net_income, equity)
    X['ROA'] = safe_divide(net_income, assets)
    return X


def prepare_data(quarterly_df, stock_df, end_date):
    """Align quarterly fundamentals with daily prices and standardize both.

    Both frames are forward-filled onto a daily calendar ending at *end_date*.
    Rows without a close price are dropped, remaining gaps are filled with
    column means, and derived features are added. The inputs are not modified.

    Returns:
        ``(X_scaled, y_scaled, dates, scaler_X, scaler_y)``.

    Raises:
        ValueError: if either input has no rows on or before *end_date*, or the
            merged frame is empty.
    """
    end_date = _as_date(end_date)

    # Work on copies so the caller's indexes are left untouched.
    quarterly_df = quarterly_df.copy()
    stock_df = stock_df.copy()
    quarterly_df.index = pd.to_datetime(quarterly_df.index).date
    stock_df.index = pd.to_datetime(stock_df.index).date

    quarterly_df = quarterly_df[quarterly_df.index <= end_date]
    stock_df = stock_df[stock_df.index <= end_date]
    if quarterly_df.empty or stock_df.empty:
        raise ValueError("No overlapping data between stock prices and financial statements.")

    start_date = min(quarterly_df.index.min(), stock_df.index.min())
    all_dates = pd.date_range(start=start_date, end=end_date, freq='D').date

    quarterly_daily = quarterly_df.reindex(all_dates).ffill()
    stock_daily = stock_df.reindex(all_dates).ffill()

    merged_df = pd.concat([stock_daily['Close'], quarterly_daily], axis=1)
    merged_df = merged_df.dropna(subset=['Close'])
    if merged_df.empty:
        raise ValueError("No overlapping data between stock prices and financial statements.")

    X = merged_df.drop('Close', axis=1)
    y = merged_df['Close']

    X = X.fillna(X.mean())
    X['EPS_Surprise'] = X['reportedEPS'] - X['estimatedEPS']
    X['EPS_Surprise_Percentage'] = X['surprisePercentage']
    X = add_financial_ratios(X)

    scaler_X = StandardScaler()
    scaler_y = StandardScaler()
    X_scaled = pd.DataFrame(scaler_X.fit_transform(X), columns=X.columns, index=X.index)
    y_scaled = pd.Series(scaler_y.fit_transform(y.values.reshape(-1, 1)).flatten(), index=y.index)

    return X_scaled, y_scaled, merged_df.index, scaler_X, scaler_y


def train_catboost_model(X_train, X_test, y_train, y_test):
    """Fit a CatBoost regressor, early-stopping on the supplied test split."""
    model = CatBoostRegressor(
        iterations=1000,
        learning_rate=0.1,
        depth=6,
        loss_function='RMSE',
        random_state=42,
        verbose=100,
    )
    model.fit(X_train, y_train, eval_set=(X_test, y_test), early_stopping_rounds=50)
    return model


def evaluate_model(model, X_test, y_test, scaler_y):
    """Return the R-squared of *model* on the test split, measured in price units."""
    y_pred_scaled = model.predict(X_test)
    y_pred = scaler_y.inverse_transform(y_pred_scaled.reshape(-1, 1)).flatten()
    y_true = scaler_y.inverse_transform(y_test.values.reshape(-1, 1)).flatten()
    return r2_score(y_true, y_pred)


def conformal_prediction(model, X_train, y_train, X_test, scaler_y, alpha=0.1):
    """Predict prices for *X_test* with a symmetric relative-error band.

    The band half-width is the ``(1 - alpha)`` percentile of the absolute
    relative errors on the training set.

    The model is fitted here only if it is not already fitted. Refitting an
    already-trained model would discard its early-stopped state, so the
    predictions would come from a different model than the one evaluated.

    NOTE(review): residuals are taken on the data the model was trained on, so
    the band is likely optimistic; a held-out calibration split would be needed
    for a genuine coverage guarantee.

    Returns:
        ``(predictions, lower_bound, upper_bound)`` as arrays in price units.

    Raises:
        ValueError: if no finite training residual is available.
    """
    is_fitted = getattr(model, "is_fitted", None)
    if not (callable(is_fitted) and is_fitted()):
        model.fit(X_train, y_train)

    def unscale(values):
        return scaler_y.inverse_transform(np.asarray(values).reshape(-1, 1)).flatten()

    train_pred = unscale(model.predict(X_train))
    train_true = unscale(y_train.values)

    # A zero prediction would give inf/NaN and poison the percentile.
    with np.errstate(divide='ignore', invalid='ignore'):
        relative_errors = np.abs((train_true - train_pred) / train_pred)
    relative_errors = relative_errors[np.isfinite(relative_errors)]
    if relative_errors.size == 0:
        raise ValueError("Cannot calibrate prediction interval: no finite training residuals.")
    error_threshold = np.percentile(relative_errors, (1 - alpha) * 100)

    y_pred_test_unscaled = unscale(model.predict(X_test))
    lower_bound_unscaled = y_pred_test_unscaled * (1 - error_threshold)
    upper_bound_unscaled = y_pred_test_unscaled * (1 + error_threshold)

    return y_pred_test_unscaled, lower_bound_unscaled, upper_bound_unscaled


def plot_results(dates, y, fair_values, lower_bound, upper_bound, scaler_y):
    """Plot actual price vs. fair value with its band, and percent error beneath."""
    y_unscaled = scaler_y.inverse_transform(y.values.reshape(-1, 1)).flatten()

    fig = make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.02,
                        row_heights=[0.7, 0.3])

    fig.add_trace(go.Scatter(x=dates, y=y_unscaled, mode='lines', name='Actual Price',
                             line=dict(color='blue')), row=1, col=1)
    fig.add_trace(go.Scatter(x=dates, y=fair_values, mode='lines', name='Fair Value',
                             line=dict(color='red')), row=1, col=1)
    # Upper bound is drawn first so the lower bound can fill up to it.
    fig.add_trace(go.Scatter(x=dates, y=upper_bound, mode='lines', name='Upper Bound',
                             line=dict(color='gray', width=0)), row=1, col=1)
    fig.add_trace(go.Scatter(x=dates, y=lower_bound, mode='lines', name='Lower Bound',
                             line=dict(color='gray', width=0), fill='tonexty'), row=1, col=1)

    percent_error = ((fair_values - y_unscaled) / y_unscaled) * 100
    fig.add_trace(go.Scatter(x=dates, y=percent_error, mode='lines', name='Percent Error',
                             line=dict(color='purple')), row=2, col=1)

    fig.update_layout(height=800, title_text="Stock Price, Fair Value, and Percent Error")
    fig.update_xaxes(title_text="Date", row=2, col=1)
    fig.update_yaxes(title_text="Price", row=1, col=1)
    fig.update_yaxes(title_text="Percent Error", row=2, col=1)

    return fig


def _share_positive(returns):
    """Fraction of the given returns that are strictly positive."""
    return (returns > 0).mean()


def get_monthly_seasonality(ticker, start_date, end_date):
    """Summarize month-over-month returns by calendar month.

    Returns:
        DataFrame indexed by month number 1..12 with columns ``Mean Change%``,
        ``Median Change%``, ``Count`` and ``Positive Periods``. Values are
        fractions, not percentages. Months absent from the data are NaN.

    Raises:
        ValueError: if there is not enough history to compute a monthly return.
    """
    data = yf.download(ticker, start=start_date, end=end_date)
    if data is None or data.empty:
        raise ValueError(f"No price data returned for {ticker}.")
    data = _flatten_yf_columns(data)

    # Newer yfinance releases auto-adjust prices and omit 'Adj Close'.
    price_col = 'Adj Close' if 'Adj Close' in data.columns else 'Close'
    monthly_prices = data[price_col].resample('M').last()

    # Drop the leading NaN: it would count as a non-positive period and bias
    # the win rate of the first calendar month.
    monthly_returns = monthly_prices.pct_change().dropna().to_frame(name='Return')
    if monthly_returns.empty:
        raise ValueError("Not enough price history to compute monthly seasonality.")
    monthly_returns['Month'] = monthly_returns.index.month

    seasonality = monthly_returns.groupby('Month')['Return'].agg(
        ['mean', 'median', 'count', _share_positive])
    seasonality.columns = ['Mean Change%', 'Median Change%', 'Count', 'Positive Periods']
    # Always expose all twelve months so lookups by month number cannot fail.
    return seasonality.reindex(pd.Index(range(1, 13), name='Month'))


def plot_monthly_seasonality(seasonality, ticker, start_date, end_date):
    """Bar chart of monthly win rate with the mean monthly change overlaid."""
    # Align to twelve months so the values line up with the month labels.
    seasonality = seasonality.reindex(range(1, 13))
    positive = seasonality['Positive Periods']
    mean_change = seasonality['Mean Change%']

    fig = go.Figure()

    fig.add_trace(go.Bar(
        x=_MONTH_LABELS,
        y=positive * 100,
        name='Positive Periods',
        marker_color=['green' if share > 0.5 else 'red' for share in positive],
        # Plotly renders <br> (not "\n") as a line break in text labels.
        text=[f"{share * 100:.1f}%<br>{change * 100:.2f}%"
              for share, change in zip(positive, mean_change)],
        textposition='auto',
    ))

    fig.add_trace(go.Scatter(
        x=_MONTH_LABELS,
        y=mean_change * 100,
        name='Mean Change%',
        mode='lines+markers',
        line=dict(color='yellow', width=2),
    ))

    fig.update_layout(
        title=f'Monthly Seasonality for {ticker}<br>{start_date} to {end_date}',
        xaxis_title='Month',
        yaxis_title='Percentage',
        template='plotly_dark',
        showlegend=True,
        legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),
        height=600,
        margin=dict(l=50, r=50, t=100, b=50),
    )

    fig.add_hline(y=50, line_dash="dash", line_color="gray")
    fig.add_hline(y=0, line_dash="dash", line_color="gray")

    fig.update_yaxes(ticksuffix="%", range=[0, 100])

    return fig


def prepare_financial_data_for_gpt(financial_data):
    """Render up to five annual reports per statement as plain text for the prompt."""
    def format_financial_data(data, report_type):
        parts = [f"{report_type} (Last 5 Years):\n"]
        if report_type in data:
            for report in data[report_type].get('annualReports', [])[:5]:
                parts.append(f"Fiscal Date Ending: {report.get('fiscalDateEnding', 'N/A')}\n")
                parts.extend(
                    f"{key}: {value}\n"
                    for key, value in report.items()
                    if key != 'fiscalDateEnding'
                )
                parts.append("\n")
        return "".join(parts)

    income_statement = format_financial_data(financial_data, 'INCOME_STATEMENT')
    balance_sheet = format_financial_data(financial_data, 'BALANCE_SHEET')
    cash_flow = format_financial_data(financial_data, 'CASH_FLOW')

    return f"{income_statement}\n{balance_sheet}\n{cash_flow}"


def get_gpt_analysis(ticker, financial_data):
    """Ask the chat model for an analysis of the formatted financial statements.

    Supports both the openai>=1.0 client interface and the legacy
    ``openai.ChatCompletion`` interface. Any failure is reported through
    ``st.error`` and a fixed fallback message is returned.
    """
    formatted_data = prepare_financial_data_for_gpt(financial_data)
    prompt = f"Analyze the following financial data for {ticker} and provide insights:\n\n{formatted_data}"
    request = dict(
        model="gpt-4",
        messages=[
            {"role": "system", "content": "You are a financial analyst."},
            {"role": "user", "content": prompt},
        ],
        max_tokens=500,
        n=1,
        stop=None,
        temperature=0.5,
    )

    try:
        if hasattr(openai, "OpenAI"):
            # openai>=1.0: module-level ChatCompletion was removed.
            client = openai.OpenAI(api_key=OPENAI_API_KEY or None)
            response = client.chat.completions.create(**request)
        else:
            response = openai.ChatCompletion.create(**request)
        # Attribute access works on both SDK generations.
        content = response.choices[0].message.content
        return (content or "").strip()
    except Exception as e:
        st.error(f"OpenAI API error: {e}")
        return "GPT Assistant analysis failed. Please check the API integration."
def plot_interactive_logarithmic_stock_chart(ticker, start_date, end_date):
    """Plot the close price on a log axis with a fitted exponential trend and bands.

    A straight line is fitted to ``log(Close)`` against calendar days since the
    first observation. The trend, with x2 / /2 and x4 / /4 bands, is drawn from
    the first observation to ten years past the last one.

    Args:
        ticker: Stock symbol passed to ``yf.Ticker``.
        start_date: Start of the price history.
        end_date: End of the price history.

    Returns:
        A Plotly figure.

    Raises:
        ValueError: if fewer than two positive close prices are available.
    """
    stock = yf.Ticker(ticker)
    data = stock.history(start=start_date, end=end_date)

    # log() is undefined for missing or non-positive prices.
    data = data[data['Close'] > 0]
    if len(data) < 2:
        raise ValueError(f"Not enough price history for {ticker} to fit a trend.")

    x = np.asarray((data.index - data.index[0]).days, dtype=float)
    y = np.log(data['Close'].to_numpy())

    slope, intercept = np.polyfit(x, y, 1)

    future_days = 365 * 10
    # x is in calendar days, so the horizon must extend from the last calendar
    # offset. Using the number of rows (trading days) would end the projection
    # well short of ten years.
    all_days = np.arange(int(x.max()) + 1 + future_days)
    log_trend = np.exp(intercept + slope * all_days)

    inner_upper_band = log_trend * 2
    inner_lower_band = log_trend / 2
    outer_upper_band = log_trend * 4
    outer_lower_band = log_trend / 4

    extended_dates = pd.date_range(start=data.index[0], periods=len(all_days), freq='D')

    fig = go.Figure()

    fig.add_trace(go.Scatter(x=data.index, y=data['Close'], mode='lines',
                             name='Close Price', line=dict(color='blue')))

    trend_traces = [
        (log_trend, 'Log Trend', 'red'),
        (inner_upper_band, 'Inner Upper Band', 'green'),
        (inner_lower_band, 'Inner Lower Band', 'green'),
        (outer_upper_band, 'Outer Upper Band', 'orange'),
        (outer_lower_band, 'Outer Lower Band', 'orange'),
    ]
    for values, label, color in trend_traces:
        fig.add_trace(go.Scatter(x=extended_dates, y=values, mode='lines',
                                 name=label, line=dict(color=color)))

    fig.update_layout(
        title=f'{ticker} Stock Price (Logarithmic Scale) with Extended Trend Lines and Outer Bands',
        xaxis_title='Date',
        yaxis_title='Price (Log Scale)',
        yaxis_type="log",
        legend=dict(x=0.01, y=0.99, bgcolor='rgba(255, 255, 255, 0.8)'),
        hovermode='x unified',
        height=800,
    )

    fig.update_xaxes(
        rangeslider_visible=True,
        rangeselector=dict(
            buttons=[
                dict(count=1, label="1m", step="month", stepmode="backward"),
                dict(count=6, label="6m", step="month", stepmode="backward"),
                dict(count=1, label="YTD", step="year", stepmode="todate"),
                dict(count=1, label="1y", step="year", stepmode="backward"),
                dict(step="all"),
            ]
        ),
    )

    return fig
# Sentinel stored in the results when the user did not ask for the AI analysis.
_AI_NOT_REQUESTED = "AI assistant analysis not requested."


def _fair_price_summary(latest_close, latest_fair_value, latest_lower_bound,
                        latest_upper_bound, r2, feature_importance_df):
    """Build the text block summarising the latest fair-value estimate."""
    return (
        "\n\nFair Price Analysis\n\n"
        f"Current Price: ${latest_close:.2f}\n\n"
        f"Estimated Fair Value: ${latest_fair_value:.2f}\n\n"
        f"Price Prediction Range: ${latest_lower_bound:.2f} to ${latest_upper_bound:.2f}\n\n"
        f"R-squared Score: {r2:.4f}\n\n"
        "Top 10 most important features for fair value prediction:\n\n"
        f"{feature_importance_df.head(10).to_string(index=False)}\n"
    )


def _seasonality_summary(seasonality, start_date, end_date):
    """Build the text block describing the current and next month's seasonality."""
    # Read the clock once so the month number and its label cannot disagree.
    today = datetime.now()
    current_month = today.month
    next_month = (current_month % 12) + 1
    # Derive the label from the month number itself. Adding 31 days to today
    # skips February when run in late January.
    next_month_name = datetime(2000, next_month, 1).strftime('%B')

    current_month_return = seasonality.loc[current_month, 'Mean Change%'] * 100
    next_month_return = seasonality.loc[next_month, 'Mean Change%'] * 100
    current_month_win_rate = seasonality.loc[current_month, 'Positive Periods'] * 100
    next_month_win_rate = seasonality.loc[next_month, 'Positive Periods'] * 100

    return (
        f"\n\nSeasonality Analysis ({start_date} to {end_date})\n\n"
        f"Current month ({today.strftime('%B')}):\n\n"
        f"Average return: {current_month_return:.2f}%\n\n"
        f"Probability of positive return: {current_month_win_rate:.1f}%\n\n"
        f"Next month ({next_month_name}):\n\n"
        f"Average return: {next_month_return:.2f}%\n\n"
        f"Probability of positive return: {next_month_win_rate:.1f}%\n\n"
    )


def analyze_stock(ticker, start_date, end_date, use_ai_assistant):
    """Run the full analysis pipeline for one ticker.

    Fetches fundamentals and prices, trains the fair-value model, and builds
    the charts and text summaries shown by ``main``.

    NOTE(review): the train/test split is a random shuffle of forward-filled
    daily rows, so neighbouring days land on both sides of the split; the
    reported R-squared is likely optimistic.

    Returns:
        Dict of figures, summary strings and the top-10 feature table, or
        ``None`` if any step fails (the error is shown via ``st.error``).
    """
    try:
        financial_data = get_financial_data(ticker, end_date)
        earnings_dates = get_earnings_dates(ticker)
        earnings_data = get_earnings_data(ticker)
        quarterly_df = process_financial_data(financial_data, earnings_dates, earnings_data)
        stock_df = get_stock_data(ticker, start_date, end_date)

        if quarterly_df.empty:
            st.error("No financial data available for processing.")
            return None

        X_scaled, y_scaled, dates, scaler_X, scaler_y = prepare_data(quarterly_df, stock_df, end_date)
        X_train, X_test, y_train, y_test = train_test_split(
            X_scaled, y_scaled, test_size=0.2, random_state=42)

        model = train_catboost_model(X_train, X_test, y_train, y_test)
        r2 = evaluate_model(model, X_test, y_test, scaler_y)

        if r2 < 0.5:
            st.warning("Model performance is poor. Results may not be reliable.")

        fair_values, lower_bound, upper_bound = conformal_prediction(
            model, X_train, y_train, X_scaled, scaler_y)

        fig = plot_results(dates, y_scaled, fair_values, lower_bound, upper_bound, scaler_y)

        feature_importance_df = pd.DataFrame({
            'feature': X_scaled.columns,
            'importance': model.feature_importances_,
        }).sort_values('importance', ascending=False)

        explainer = shap.TreeExplainer(model)
        shap_values = explainer.shap_values(X_scaled)

        # summary_plot draws on the current matplotlib figure.
        shap_fig, _ = plt.subplots(figsize=(10, 6))
        shap.summary_plot(shap_values, X_scaled, plot_type="bar", show=False)
        plt.title("SHAP Feature Importance")
        plt.tight_layout()

        seasonality = get_monthly_seasonality(ticker, start_date, end_date)
        # Short date ranges may not contain every calendar month; expose all
        # twelve (missing ones as NaN) so lookups by month number cannot fail.
        seasonality = seasonality.reindex(range(1, 13))
        seasonality_fig = plot_monthly_seasonality(seasonality, ticker, start_date, end_date)

        log_chart = plot_interactive_logarithmic_stock_chart(ticker, start_date, end_date)

        gpt_analysis = get_gpt_analysis(ticker, financial_data) if use_ai_assistant else _AI_NOT_REQUESTED

        latest_close = stock_df['Close'].iloc[-1]
        latest_fair_value = fair_values[-1]
        latest_lower_bound = lower_bound[-1]
        latest_upper_bound = upper_bound[-1]

        percentage_change = ((latest_fair_value - latest_close) / latest_close) * 100

        fair_price_html = _fair_price_summary(
            latest_close, latest_fair_value, latest_lower_bound,
            latest_upper_bound, r2, feature_importance_df)
        seasonality_html = _seasonality_summary(seasonality, start_date, end_date)

        return {
            'fair_price_html': fair_price_html,
            'fig': fig,
            'shap_fig': shap_fig,
            'seasonality_fig': seasonality_fig,
            'seasonality_html': seasonality_html,
            'gpt_analysis': gpt_analysis,
            'log_chart': log_chart,
            'feature_importance_df': feature_importance_df.head(10),
            'percentage_change': percentage_change,
        }
    except Exception as e:
        # Top-level boundary for the UI: surface the failure instead of crashing.
        st.error(f"An error occurred: {str(e)}")
        return None


def main():
    """Render the Streamlit page: inputs, the analysis trigger and the results."""
    st.title("Advanced Stock Analysis App")
    st.markdown("Enter a stock ticker and date range to perform comprehensive stock analysis.")

    col1, col2, col3, col4 = st.columns([2, 2, 2, 1])

    with col1:
        ticker = st.text_input("Stock Ticker", value="MSFT")
    with col2:
        start_date = st.date_input("Start Date", value=datetime(2015, 1, 1))
    with col3:
        end_date = st.date_input("End Date", value=datetime.now())
    with col4:
        use_ai_assistant = st.checkbox("Use AI Assistant")

    if not st.button("Analyze Stock", key="analyze_button"):
        return

    # Reject obviously invalid input before spending API calls on it.
    ticker = ticker.strip().upper()
    if not ticker:
        st.error("Please enter a stock ticker.")
        return
    if start_date >= end_date:
        st.error("Start date must be before end date.")
        return

    with st.spinner('Analyzing stock data...'):
        results = analyze_stock(ticker, start_date, end_date, use_ai_assistant)

    if not results:
        return

    st.header("Fair Price Analysis")
    st.markdown(results['fair_price_html'], unsafe_allow_html=True)

    st.subheader("Fair Price Prediction")
    st.plotly_chart(results['fig'], use_container_width=True)

    col1, col2 = st.columns(2)
    with col1:
        st.subheader("SHAP Feature Importance")
        st.pyplot(results['shap_fig'])
        # Release the figure; otherwise one accumulates on every rerun.
        plt.close(results['shap_fig'])
    with col2:
        st.subheader("Top 10 Important Features")
        st.dataframe(results['feature_importance_df'], height=400)

    st.subheader("Monthly Seasonality")
    st.plotly_chart(results['seasonality_fig'], use_container_width=True)
    st.markdown(results['seasonality_html'], unsafe_allow_html=True)

    if results['gpt_analysis'] != _AI_NOT_REQUESTED:
        st.subheader("AI Assistant Analysis")
        st.text_area("Analysis", value=results['gpt_analysis'], height=300)

    st.subheader("Logarithmic Stock Chart")
    st.plotly_chart(results['log_chart'], use_container_width=True)


if __name__ == "__main__":
    main()