From 455ffdc556a0c3d01ed4f719b91169ad6563828e Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Mon, 9 Feb 2026 03:59:57 +0000 Subject: [PATCH 1/3] Add helper tooltips to research config inputs Co-authored-by: aarjava <218419324+aarjava@users.noreply.github.com> --- .Jules/palette.md | 3 + src/dashboard.py | 387 ++++++++++++++++++++++++++++++---------------- 2 files changed, 255 insertions(+), 135 deletions(-) create mode 100644 .Jules/palette.md diff --git a/.Jules/palette.md b/.Jules/palette.md new file mode 100644 index 0000000..7af09f3 --- /dev/null +++ b/.Jules/palette.md @@ -0,0 +1,3 @@ +## 2025-02-18 - Verifying Streamlit Tooltips +**Learning:** Streamlit tooltips are rendered as `stTooltipIcon` divs that create a separate `stTooltipContent` overlay on hover. Verification requires locating the icon, hovering (often with force or careful mouse movement), and waiting for the content to appear in the overlay. +**Action:** Use `page.locator("[data-testid='stTooltipIcon']").hover()` and verify text in `page.locator("[data-testid='stTooltipContent']")`. Ensure viewport is large enough. diff --git a/src/dashboard.py b/src/dashboard.py index 1f0ecb3..d8ef944 100644 --- a/src/dashboard.py +++ b/src/dashboard.py @@ -20,7 +20,8 @@ except ImportError: import sys import os - sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + + sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) from src.modules import data_model, signals, backtester from src.modules.config import ( PRESET_UNIVERSE, @@ -39,9 +40,9 @@ def get_cache_key(*args) -> str: # Initialize session state for caching expensive computations -if 'computed_signals' not in st.session_state: +if "computed_signals" not in st.session_state: st.session_state.computed_signals = {} -if 'backtest_results' not in st.session_state: +if "backtest_results" not in st.session_state: st.session_state.backtest_results = {} @@ -50,11 +51,12 @@ def get_cache_key(*args) -> str: page_title="Quantitative Research Dashboard", page_icon="โ™Ÿ๏ธ", layout="wide", - initial_sidebar_state="expanded" + initial_sidebar_state="expanded", ) # --- CSS Styling --- -st.markdown(""" +st.markdown( + """ -""", unsafe_allow_html=True) +""", + unsafe_allow_html=True, +) # --- Sidebar Inputs --- with st.sidebar: st.title("๐ŸŽ›๏ธ Research Config") - + st.subheader("1. Asset Selection") t_mode = st.radio("Selection Mode", ["Preset Universe", "Custom Ticker"], horizontal=True) - + if t_mode == "Preset Universe": ticker = st.selectbox("Symbol", PRESET_UNIVERSE, index=0) else: ticker = st.text_input("Enter Symbol (Yahoo Finance)", value="NVDA").upper() - + st.subheader("2. Time Horizon") date_mode = st.selectbox("Date Range", ["Last 5 Years", "Last 10 Years", "Max", "Custom"]) - + if date_mode == "Custom": d_col1, d_col2 = st.columns(2) - start_date = d_col1.date_input("Start", value=datetime.today() - timedelta(days=365*2)) + start_date = d_col1.date_input("Start", value=datetime.today() - timedelta(days=365 * 2)) end_date = d_col2.date_input("End", value=datetime.today()) period_arg = "max" else: @@ -97,33 +101,58 @@ def get_cache_key(*args) -> str: st.subheader("3. Signal Parameters") sma_window = st.slider( - "Trend SMA Window", 10, 200, DEFAULT_SMA_WINDOW, 10, - help="Lookback days for Simple Moving Average trend signal." + "Trend SMA Window", + 10, + 200, + DEFAULT_SMA_WINDOW, + 10, + help="Lookback days for Simple Moving Average trend signal.", ) mom_window = st.slider( - "Momentum Lookback (Months)", 1, 24, DEFAULT_MOMENTUM_WINDOW, 1, - help="Lookback months for Momentum signal." + "Momentum Lookback (Months)", + 1, + 24, + DEFAULT_MOMENTUM_WINDOW, + 1, + help="Lookback months for Momentum signal.", ) - + st.markdown("---") st.subheader("4. Research Rigor") use_oos = st.toggle( - "Out-of-Sample Mode", + "Out-of-Sample Mode", value=False, - help="Uses expanding-window quantiles for regime classification to avoid look-ahead bias. Enable for rigorous backtesting." + help="Uses expanding-window quantiles for regime classification to avoid look-ahead bias. Enable for rigorous backtesting.", ) if use_oos: st.success("โœ“ Look-ahead bias removed") else: st.info("Using full-sample quantiles (exploratory mode)") - + vol_q_high = st.slider( - "High Volatility Quantile", 0.5, 0.95, DEFAULT_VOL_QUANTILE_HIGH, 0.05 + "High Volatility Quantile", + 0.5, + 0.95, + DEFAULT_VOL_QUANTILE_HIGH, + 0.05, + help="Threshold for identifying High Volatility regimes. e.g., 0.80 means the top 20% most volatile days.", ) - + st.subheader("5. Backtest Settings") - bt_cost = st.number_input("Transaction Cost (bps)", value=DEFAULT_COST_BPS, step=1) / 10000 - allow_short = st.checkbox("Allow Short Selling?", value=False) + bt_cost = ( + st.number_input( + "Transaction Cost (bps)", + value=DEFAULT_COST_BPS, + step=1, + help="Trading friction per trade. 10 bps = 0.10%.", + ) + / 10000 + ) + allow_short = st.checkbox( + "Allow Short Selling?", + value=False, + help="If checked, strategy will short sell when trend is negative.", + ) # --- Data Ingestion --- @@ -142,7 +171,9 @@ def get_cache_key(*args) -> str: df = raw_df.copy() if len(df) < MIN_DATA_POINTS: - st.warning(f"Not enough data points for selected range/period (need at least {MIN_DATA_POINTS}).") + st.warning( + f"Not enough data points for selected range/period (need at least {MIN_DATA_POINTS})." + ) st.stop() # --- Signal Calculation (with session state caching) --- @@ -150,7 +181,9 @@ def get_cache_key(*args) -> str: if signal_cache_key not in st.session_state.computed_signals: with st.spinner("Computing technical indicators..."): - computed_df = signals.add_technical_indicators(df, sma_window=sma_window, mom_window=mom_window) + computed_df = signals.add_technical_indicators( + df, sma_window=sma_window, mom_window=mom_window + ) st.session_state.computed_signals[signal_cache_key] = computed_df df = st.session_state.computed_signals[signal_cache_key].copy() @@ -158,51 +191,72 @@ def get_cache_key(*args) -> str: # --- Regime Detection --- # Using 21-day annualized vol with option for out-of-sample analysis df = signals.detect_volatility_regime( - df, - vol_col='Vol_21d', - quantile_high=vol_q_high, + df, + vol_col="Vol_21d", + quantile_high=vol_q_high, quantile_low=0.25, - use_expanding=use_oos # Toggle between in-sample and out-of-sample + use_expanding=use_oos, # Toggle between in-sample and out-of-sample ) # --- Dashboard Header --- st.markdown("## ๐Ÿ” Research Question") -st.markdown("> **How sensitive is trend-following performance to volatility regimes in US equities?**") +st.markdown( + "> **How sensitive is trend-following performance to volatility regimes in US equities?**" +) latest = df.iloc[-1] prev = df.iloc[-2] -chg_pct = latest['Daily_Return'] +chg_pct = latest["Daily_Return"] h1, h2, h3, h4 = st.columns(4) h1.metric("Asset", f"{ticker} (${latest['Close']:.2f})", f"{chg_pct:.2%}") -h2.metric("Current Regime", latest['Vol_Regime']) +h2.metric("Current Regime", latest["Vol_Regime"]) h3.metric(f"Volatility ({vol_q_high:.0%}-tile)", f"{latest['Vol_21d']:.2%}") -h4.metric("Trend Status", "BULLISH" if latest['Close'] > latest[f'SMA_{sma_window}'] else "BEARISH") +h4.metric("Trend Status", "BULLISH" if latest["Close"] > latest[f"SMA_{sma_window}"] else "BEARISH") # --- Tabs --- -tab_ov, tab_regime, tab_bt, tab_rep = st.tabs(["๐Ÿ“ˆ Overview", "๐ŸŒช๏ธ Regime Analysis", "๐Ÿงช Backtest Engine", "๐Ÿ“„ Report"]) +tab_ov, tab_regime, tab_bt, tab_rep = st.tabs( + ["๐Ÿ“ˆ Overview", "๐ŸŒช๏ธ Regime Analysis", "๐Ÿงช Backtest Engine", "๐Ÿ“„ Report"] +) # --- TAB 1: OVERVIEW --- with tab_ov: # Interactive Price Chart fig = go.Figure() - fig.add_trace(go.Scatter(x=df.index, y=df['Close'], name='Close Price', line=dict(color='white', width=1))) - fig.add_trace(go.Scatter(x=df.index, y=df[f'SMA_{sma_window}'], name=f'{sma_window}-Day SMA', line=dict(color='#ff9f43', width=1))) - + fig.add_trace( + go.Scatter(x=df.index, y=df["Close"], name="Close Price", line=dict(color="white", width=1)) + ) + fig.add_trace( + go.Scatter( + x=df.index, + y=df[f"SMA_{sma_window}"], + name=f"{sma_window}-Day SMA", + line=dict(color="#ff9f43", width=1), + ) + ) + # Highlight High Volatility Regimes # Filter high vol periods - high_vol_mask = df['Vol_Regime'] == 'High' + high_vol_mask = df["Vol_Regime"] == "High" # We can plot markers or shade areas. Shading is valid but tricky in Plotly without shapes list. # Let's plot points high_vol_pts = df[high_vol_mask] - fig.add_trace(go.Scatter(x=high_vol_pts.index, y=high_vol_pts['Close'], mode='markers', name='High Volatility', marker=dict(color='red', size=2))) - + fig.add_trace( + go.Scatter( + x=high_vol_pts.index, + y=high_vol_pts["Close"], + mode="markers", + name="High Volatility", + marker=dict(color="red", size=2), + ) + ) + fig.update_layout( title=f"{ticker} Price History & Regime Context", yaxis_title="Price ($)", template="plotly_dark", height=500, - hovermode="x unified" + hovermode="x unified", ) st.plotly_chart(fig, use_container_width=True) st.caption("Red dots indicate days classified as 'High Volatility' regime.") @@ -210,200 +264,263 @@ def get_cache_key(*args) -> str: # --- TAB 2: REGIME ANALYSIS --- with tab_regime: st.subheader("Volatility Regime Classification") - + c1, c2 = st.columns(2) with c1: # Scatter: Vol vs Returns needed? Maybe just distribution - fig_hist = px.histogram(df, x="Vol_21d", color="Vol_Regime", nbins=50, title="Volatility Distribution", template="plotly_dark", - color_discrete_map={"High": "#ff4b4b", "Low": "#00ff00", "Normal": "#888888"}) + fig_hist = px.histogram( + df, + x="Vol_21d", + color="Vol_Regime", + nbins=50, + title="Volatility Distribution", + template="plotly_dark", + color_discrete_map={"High": "#ff4b4b", "Low": "#00ff00", "Normal": "#888888"}, + ) st.plotly_chart(fig_hist, use_container_width=True) - + with c2: # Pie chart of time spent in regimes - regime_counts = df['Vol_Regime'].value_counts() - fig_pie = px.pie(values=regime_counts, names=regime_counts.index, title="Time Spent in Regimes", template="plotly_dark", - color=regime_counts.index, color_discrete_map={"High": "#ff4b4b", "Low": "#00ff00", "Normal": "#888888"}) + regime_counts = df["Vol_Regime"].value_counts() + fig_pie = px.pie( + values=regime_counts, + names=regime_counts.index, + title="Time Spent in Regimes", + template="plotly_dark", + color=regime_counts.index, + color_discrete_map={"High": "#ff4b4b", "Low": "#00ff00", "Normal": "#888888"}, + ) st.plotly_chart(fig_pie, use_container_width=True) - + st.markdown("### Regime Characteristics") - stats = df.groupby('Vol_Regime')[['Daily_Return', 'Vol_21d']].mean() + stats = df.groupby("Vol_Regime")[["Daily_Return", "Vol_21d"]].mean() # Annualize return - stats['Ann_Return'] = stats['Daily_Return'] * 252 + stats["Ann_Return"] = stats["Daily_Return"] * 252 st.dataframe(stats.style.format("{:.2%}")) # --- TAB 3: BACKTEST --- with tab_bt: st.subheader("Strategy Simulation") - + # Out-of-sample mode indicator if use_oos: - st.success("๐Ÿ”ฌ **Out-of-Sample Mode Active** - Regime classification uses only past data at each point") - + st.success( + "๐Ÿ”ฌ **Out-of-Sample Mode Active** - Regime classification uses only past data at each point" + ) + # Define Strategy # Trend Following - df['Signal_Trend'] = np.where(df['Close'] > df[f'SMA_{sma_window}'], 1, -1 if allow_short else 0) - - # Run Backtest (with session state caching) - bt_cache_key = get_cache_key( - signal_cache_key, bt_cost, allow_short, use_oos, vol_q_high + df["Signal_Trend"] = np.where( + df["Close"] > df[f"SMA_{sma_window}"], 1, -1 if allow_short else 0 ) - + + # Run Backtest (with session state caching) + bt_cache_key = get_cache_key(signal_cache_key, bt_cost, allow_short, use_oos, vol_q_high) + if bt_cache_key not in st.session_state.backtest_results: with st.spinner("Running backtest simulation..."): - res_df = backtester.run_backtest(df, 'Signal_Trend', cost_bps=bt_cost, rebalance_freq='M') + res_df = backtester.run_backtest( + df, "Signal_Trend", cost_bps=bt_cost, rebalance_freq="M" + ) st.session_state.backtest_results[bt_cache_key] = res_df - + res_df = st.session_state.backtest_results[bt_cache_key] - + if not res_df.empty: # Add Regime to Backtest Results (forward fill valid for analysis) - res_df['Vol_Regime'] = df['Vol_Regime'] - + res_df["Vol_Regime"] = df["Vol_Regime"] + # 1. Global Metrics with Bootstrap CI strat_metrics = backtester.calculate_perf_metrics( - res_df['Equity_Strategy'], - include_bootstrap_ci=True, - n_bootstrap=500 + res_df["Equity_Strategy"], include_bootstrap_ci=True, n_bootstrap=500 ) - bench_metrics = backtester.calculate_perf_metrics(res_df['Equity_Benchmark']) - + bench_metrics = backtester.calculate_perf_metrics(res_df["Equity_Benchmark"]) + col_m1, col_m2, col_m3, col_m4 = st.columns(4) col_m1.metric("Global CAGR", f"{strat_metrics['CAGR']:.2%}") - + # Show Sharpe with CI if available sharpe_display = f"{strat_metrics['Sharpe']:.2f}" - if strat_metrics.get('Sharpe_CI_Lower') is not None: - sharpe_display += f" [{strat_metrics['Sharpe_CI_Lower']:.2f}, {strat_metrics['Sharpe_CI_Upper']:.2f}]" + if strat_metrics.get("Sharpe_CI_Lower") is not None: + sharpe_display += ( + f" [{strat_metrics['Sharpe_CI_Lower']:.2f}, {strat_metrics['Sharpe_CI_Upper']:.2f}]" + ) col_m2.metric("Sharpe (95% CI)", sharpe_display) - + col_m3.metric("Max Drawdown", f"{strat_metrics['MaxDD']:.2%}") col_m4.metric("Max DD Duration", f"{strat_metrics.get('MaxDD_Duration', 0)} days") - + # Additional metrics row col_a1, col_a2, col_a3, col_a4 = st.columns(4) col_a1.metric("Sortino", f"{strat_metrics.get('Sortino', 0):.2f}") col_a2.metric("Calmar", f"{strat_metrics.get('Calmar', 0):.2f}") col_a3.metric("Win Rate", f"{strat_metrics.get('WinRate', 0):.1%}") col_a4.metric("Avg DD Duration", f"{strat_metrics.get('AvgDD_Duration', 0):.0f} days") - + # 2. Equity Curve fig_eq = go.Figure() - fig_eq.add_trace(go.Scatter(x=res_df.index, y=res_df['Equity_Strategy'], name='Trend Strategy', line=dict(color='#00ff00'))) - fig_eq.add_trace(go.Scatter(x=res_df.index, y=res_df['Equity_Benchmark'], name='Buy & Hold', line=dict(color='gray', dash='dot'))) + fig_eq.add_trace( + go.Scatter( + x=res_df.index, + y=res_df["Equity_Strategy"], + name="Trend Strategy", + line=dict(color="#00ff00"), + ) + ) + fig_eq.add_trace( + go.Scatter( + x=res_df.index, + y=res_df["Equity_Benchmark"], + name="Buy & Hold", + line=dict(color="gray", dash="dot"), + ) + ) fig_eq.update_layout(title="Equity Curve", template="plotly_dark", height=400) st.plotly_chart(fig_eq, use_container_width=True) - + # 3. Drawdown Chart with st.expander("๐Ÿ“‰ Drawdown Analysis", expanded=False): fig_dd = go.Figure() - fig_dd.add_trace(go.Scatter( - x=res_df.index, y=res_df['DD_Strategy'] * 100, - name='Strategy Drawdown', fill='tozeroy', - line=dict(color='#ff4b4b') - )) - fig_dd.add_trace(go.Scatter( - x=res_df.index, y=res_df['DD_Benchmark'] * 100, - name='Benchmark Drawdown', - line=dict(color='gray', dash='dot') - )) + fig_dd.add_trace( + go.Scatter( + x=res_df.index, + y=res_df["DD_Strategy"] * 100, + name="Strategy Drawdown", + fill="tozeroy", + line=dict(color="#ff4b4b"), + ) + ) + fig_dd.add_trace( + go.Scatter( + x=res_df.index, + y=res_df["DD_Benchmark"] * 100, + name="Benchmark Drawdown", + line=dict(color="gray", dash="dot"), + ) + ) fig_dd.update_layout( title="Underwater Equity (Drawdown %)", yaxis_title="Drawdown (%)", template="plotly_dark", - height=300 + height=300, ) st.plotly_chart(fig_dd, use_container_width=True) - + # 4. Conditional Analysis st.markdown("### ๐Ÿ”ฌ Conditional Performance by Regime") st.info("Does the strategy outperform during High Volatility?") - - cond_stats = backtester.calculate_conditional_stats(res_df, 'Strategy_Net_Return', 'Vol_Regime') - + + cond_stats = backtester.calculate_conditional_stats( + res_df, "Strategy_Net_Return", "Vol_Regime" + ) + # Add Benchmark Conditional Stats for comparison - bench_cond = backtester.calculate_conditional_stats(res_df, 'Daily_Return', 'Vol_Regime') - + bench_cond = backtester.calculate_conditional_stats(res_df, "Daily_Return", "Vol_Regime") + # Merge - comparison = pd.concat([cond_stats.add_suffix('_Strat'), bench_cond.add_suffix('_Bench')], axis=1) - + comparison = pd.concat( + [cond_stats.add_suffix("_Strat"), bench_cond.add_suffix("_Bench")], axis=1 + ) + # Reorder columns - handle missing columns gracefully available_cols = [] - for col in ['Ann_Return_Strat', 'Ann_Return_Bench', 'Sharpe_Strat', 'Sharpe_Bench', 'WinRate_Strat']: + for col in [ + "Ann_Return_Strat", + "Ann_Return_Bench", + "Sharpe_Strat", + "Sharpe_Bench", + "WinRate_Strat", + ]: if col in comparison.columns: available_cols.append(col) comparison = comparison[available_cols] - - st.dataframe(comparison.style.background_gradient(cmap='RdYlGn', subset=['Ann_Return_Strat', 'Sharpe_Strat']).format("{:.2f}")) - - st.markdown("**Key Insight:** Compare 'Sharpe_Strat' vs 'Sharpe_Bench' in the **High** volatility row.") - + + st.dataframe( + comparison.style.background_gradient( + cmap="RdYlGn", subset=["Ann_Return_Strat", "Sharpe_Strat"] + ).format("{:.2f}") + ) + + st.markdown( + "**Key Insight:** Compare 'Sharpe_Strat' vs 'Sharpe_Bench' in the **High** volatility row." + ) + # 5. Walk-Forward Validation (Advanced) with st.expander("๐Ÿš€ Walk-Forward Validation (Advanced)", expanded=False): st.markdown(""" Walk-forward validation splits data into rolling train/test windows to evaluate out-of-sample performance. This is more rigorous than a single full-sample backtest. """) - + wf_col1, wf_col2 = st.columns(2) - wf_train = wf_col1.number_input("Training Window (months)", value=24, min_value=6, max_value=60) - wf_test = wf_col2.number_input("Test Window (months)", value=6, min_value=1, max_value=12) - + wf_train = wf_col1.number_input( + "Training Window (months)", value=24, min_value=6, max_value=60 + ) + wf_test = wf_col2.number_input( + "Test Window (months)", value=6, min_value=1, max_value=12 + ) + if st.button("Run Walk-Forward Analysis"): with st.spinner("Running walk-forward validation..."): wf_results = backtester.walk_forward_backtest( - df, 'Signal_Trend', + df, + "Signal_Trend", train_months=wf_train, test_months=wf_test, cost_bps=bt_cost, - rebalance_freq='M' + rebalance_freq="M", ) - + if wf_results: st.success(f"โœ… Completed {wf_results['n_periods']} walk-forward periods") - - wf_summary = wf_results['summary'] + + wf_summary = wf_results["summary"] wf_c1, wf_c2, wf_c3 = st.columns(3) wf_c1.metric("OOS CAGR", f"{wf_summary.get('CAGR', 0):.2%}") wf_c2.metric("OOS Sharpe", f"{wf_summary.get('Sharpe', 0):.2f}") wf_c3.metric("OOS Max DD", f"{wf_summary.get('MaxDD', 0):.2%}") - + # Show per-period results st.markdown("#### Per-Period Results") period_data = [] - for p in wf_results['periods']: - period_data.append({ - 'Test Period': f"{p['test_start']} to {p['test_end']}", - 'CAGR': p['metrics'].get('CAGR', 0), - 'Sharpe': p['metrics'].get('Sharpe', 0), - 'MaxDD': p['metrics'].get('MaxDD', 0) - }) - st.dataframe(pd.DataFrame(period_data).style.format({ - 'CAGR': '{:.2%}', - 'Sharpe': '{:.2f}', - 'MaxDD': '{:.2%}' - })) + for p in wf_results["periods"]: + period_data.append( + { + "Test Period": f"{p['test_start']} to {p['test_end']}", + "CAGR": p["metrics"].get("CAGR", 0), + "Sharpe": p["metrics"].get("Sharpe", 0), + "MaxDD": p["metrics"].get("MaxDD", 0), + } + ) + st.dataframe( + pd.DataFrame(period_data).style.format( + {"CAGR": "{:.2%}", "Sharpe": "{:.2f}", "MaxDD": "{:.2%}"} + ) + ) else: - st.warning("Insufficient data for walk-forward validation with current settings.") + st.warning( + "Insufficient data for walk-forward validation with current settings." + ) # --- TAB 4: REPORT --- with tab_rep: st.subheader("Research Note Generation") - + st.markdown("### Findings Summary") st.write(f"**Asset**: {ticker}") st.write(f"**Trend Model**: {sma_window}-Day SMA") - + if not res_df.empty: # Create text summary - high_vol_perf = cond_stats.loc['High', 'Sharpe'] if 'High' in cond_stats.index else 0 - normal_vol_perf = cond_stats.loc['Normal', 'Sharpe'] if 'Normal' in cond_stats.index else 0 - + high_vol_perf = cond_stats.loc["High", "Sharpe"] if "High" in cond_stats.index else 0 + normal_vol_perf = cond_stats.loc["Normal", "Sharpe"] if "Normal" in cond_stats.index else 0 + st.success(f"Strategy Sharpe in High Vol: **{high_vol_perf:.2f}**") st.info(f"Strategy Sharpe in Normal Vol: **{normal_vol_perf:.2f}**") - + st.download_button( label="Download Full Research Data (CSV)", - data=res_df.to_csv().encode('utf-8'), + data=res_df.to_csv().encode("utf-8"), file_name=f"{ticker}_research_data.csv", - mime="text/csv" + mime="text/csv", ) From 92b06f95a6607d2c10cc0a68a63b9ea49e4867c5 Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Mon, 9 Feb 2026 04:09:31 +0000 Subject: [PATCH 2/3] Add helper tooltips to research config inputs and fix repo-wide formatting Co-authored-by: aarjava <218419324+aarjava@users.noreply.github.com> --- src/dashboard.py | 47 ++--- src/import_prices.py | 20 ++- src/modules/__init__.py | 3 +- src/modules/backtester.py | 295 ++++++++++++++++---------------- src/modules/config.py | 7 +- src/modules/data_model.py | 40 ++--- src/modules/signals.py | 120 ++++++------- src/modules/signals_advanced.py | 174 +++++++++---------- tests/test_backtester.py | 195 ++++++++++----------- tests/test_data_model.py | 158 ++++++++--------- tests/test_signals.py | 100 ++++++----- tests/test_signals_advanced.py | 205 +++++++++++----------- verify_logic.py | 53 +++--- 13 files changed, 698 insertions(+), 719 deletions(-) diff --git a/src/dashboard.py b/src/dashboard.py index d8ef944..1ce5eef 100644 --- a/src/dashboard.py +++ b/src/dashboard.py @@ -1,35 +1,36 @@ -import streamlit as st -import pandas as pd +import hashlib +from datetime import datetime, timedelta + import numpy as np -import plotly.graph_objects as go +import pandas as pd import plotly.express as px -from datetime import datetime, timedelta -import hashlib +import plotly.graph_objects as go +import streamlit as st # Import custom modules try: - from modules import data_model, signals, backtester + from modules import backtester, data_model, signals from modules.config import ( - PRESET_UNIVERSE, - DEFAULT_SMA_WINDOW, + DEFAULT_COST_BPS, DEFAULT_MOMENTUM_WINDOW, + DEFAULT_SMA_WINDOW, DEFAULT_VOL_QUANTILE_HIGH, - DEFAULT_COST_BPS, MIN_DATA_POINTS, + PRESET_UNIVERSE, ) except ImportError: - import sys import os + import sys sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) - from src.modules import data_model, signals, backtester + from src.modules import backtester, data_model, signals from src.modules.config import ( - PRESET_UNIVERSE, - DEFAULT_SMA_WINDOW, + DEFAULT_COST_BPS, DEFAULT_MOMENTUM_WINDOW, + DEFAULT_SMA_WINDOW, DEFAULT_VOL_QUANTILE_HIGH, - DEFAULT_COST_BPS, MIN_DATA_POINTS, + PRESET_UNIVERSE, ) @@ -224,14 +225,16 @@ def get_cache_key(*args) -> str: # Interactive Price Chart fig = go.Figure() fig.add_trace( - go.Scatter(x=df.index, y=df["Close"], name="Close Price", line=dict(color="white", width=1)) + go.Scatter( + x=df.index, y=df["Close"], name="Close Price", line={"color": "white", "width": 1} + ) ) fig.add_trace( go.Scatter( x=df.index, y=df[f"SMA_{sma_window}"], name=f"{sma_window}-Day SMA", - line=dict(color="#ff9f43", width=1), + line={"color": "#ff9f43", "width": 1}, ) ) @@ -247,7 +250,7 @@ def get_cache_key(*args) -> str: y=high_vol_pts["Close"], mode="markers", name="High Volatility", - marker=dict(color="red", size=2), + marker={"color": "red", "size": 2}, ) ) @@ -364,7 +367,7 @@ def get_cache_key(*args) -> str: x=res_df.index, y=res_df["Equity_Strategy"], name="Trend Strategy", - line=dict(color="#00ff00"), + line={"color": "#00ff00"}, ) ) fig_eq.add_trace( @@ -372,7 +375,7 @@ def get_cache_key(*args) -> str: x=res_df.index, y=res_df["Equity_Benchmark"], name="Buy & Hold", - line=dict(color="gray", dash="dot"), + line={"color": "gray", "dash": "dot"}, ) ) fig_eq.update_layout(title="Equity Curve", template="plotly_dark", height=400) @@ -387,7 +390,7 @@ def get_cache_key(*args) -> str: y=res_df["DD_Strategy"] * 100, name="Strategy Drawdown", fill="tozeroy", - line=dict(color="#ff4b4b"), + line={"color": "#ff4b4b"}, ) ) fig_dd.add_trace( @@ -395,7 +398,7 @@ def get_cache_key(*args) -> str: x=res_df.index, y=res_df["DD_Benchmark"] * 100, name="Benchmark Drawdown", - line=dict(color="gray", dash="dot"), + line={"color": "gray", "dash": "dot"}, ) ) fig_dd.update_layout( @@ -448,7 +451,7 @@ def get_cache_key(*args) -> str: # 5. Walk-Forward Validation (Advanced) with st.expander("๐Ÿš€ Walk-Forward Validation (Advanced)", expanded=False): st.markdown(""" - Walk-forward validation splits data into rolling train/test windows to evaluate + Walk-forward validation splits data into rolling train/test windows to evaluate out-of-sample performance. This is more rigorous than a single full-sample backtest. """) diff --git a/src/import_prices.py b/src/import_prices.py index 9bfcab9..51e1c89 100644 --- a/src/import_prices.py +++ b/src/import_prices.py @@ -1,38 +1,40 @@ from modules import data_model, signals + def analyze_stock(ticker_symbol, period="1y"): """ Demonstrates using the new modular architecture. """ print(f"\n--- Analyzing {ticker_symbol} ---") - + # 1. Use the data model df = data_model.fetch_stock_data(ticker_symbol, period=period) - + if df.empty: print(f"No data found for {ticker_symbol}") return None # 2. Use the signals module df = signals.add_technical_indicators(df) - + latest = df.iloc[-1] - + print(f"Current Price: ${latest['Close']:.2f}") print(f"50-Day SMA: ${latest['SMA_50']:.2f}") print(f"Momentum: {latest['Momentum_12M_1M']:.2%}") print(f"RSI (14): {latest['RSI_14']:.2f}") - - if latest['Close'] > latest['SMA_50']: + + if latest["Close"] > latest["SMA_50"]: print("Trend: BULLISH ๐Ÿ‚") else: print("Trend: BEARISH ๐Ÿป") - + return df + if __name__ == "__main__": # Add your favorite stocks here! portfolio = ["AAPL", "MSFT", "GOOGL", "TSLA"] - + for ticker in portfolio: - analyze_stock(ticker) \ No newline at end of file + analyze_stock(ticker) diff --git a/src/modules/__init__.py b/src/modules/__init__.py index a3b58e6..02149cd 100644 --- a/src/modules/__init__.py +++ b/src/modules/__init__.py @@ -1,3 +1,2 @@ # HedgeFund Dashboard - Core Modules -from . import data_model, signals, signals_advanced, backtester, config - +from . import backtester, config, data_model, signals, signals_advanced diff --git a/src/modules/backtester.py b/src/modules/backtester.py index 3a989c0..4a21b11 100644 --- a/src/modules/backtester.py +++ b/src/modules/backtester.py @@ -10,11 +10,12 @@ - Walk-forward validation """ -import pandas as pd -import numpy as np import logging -from typing import Literal, Optional, Dict, Any, Tuple, List from dataclasses import dataclass +from typing import Any, Dict, List, Literal, Optional, Tuple + +import numpy as np +import pandas as pd try: from .config import TRADING_DAYS_PER_YEAR @@ -28,6 +29,7 @@ @dataclass class PerformanceMetrics: """Container for performance metrics with confidence intervals.""" + cagr: float volatility: float sharpe: float @@ -39,7 +41,7 @@ class PerformanceMetrics: avg_dd_duration: float = 0.0 # Days calmar: float = 0.0 win_rate: float = 0.0 - + def to_dict(self) -> Dict[str, Any]: """Convert to dictionary for backward compatibility.""" return { @@ -53,52 +55,52 @@ def to_dict(self) -> Dict[str, Any]: "MaxDD_Duration": self.max_dd_duration, "AvgDD_Duration": self.avg_dd_duration, "Calmar": self.calmar, - "WinRate": self.win_rate + "WinRate": self.win_rate, } def bootstrap_sharpe_ci( - returns: pd.Series, - n_bootstrap: int = 1000, + returns: pd.Series, + n_bootstrap: int = 1000, confidence_level: float = 0.95, - random_state: Optional[int] = None + random_state: Optional[int] = None, ) -> Tuple[float, float]: """ Calculate bootstrap confidence interval for Sharpe ratio. - + Args: returns: Series of daily returns. n_bootstrap: Number of bootstrap samples. confidence_level: Confidence level (e.g., 0.95 for 95% CI). random_state: Random seed for reproducibility. - + Returns: Tuple of (lower_bound, upper_bound) for the CI. """ if len(returns) < 30: logger.warning("Insufficient data for reliable bootstrap CI (n < 30)") return (np.nan, np.nan) - + rng = np.random.default_rng(random_state) sharpes = [] - + returns_arr = returns.dropna().values n = len(returns_arr) - + for _ in range(n_bootstrap): sample = rng.choice(returns_arr, size=n, replace=True) sample_std = sample.std() if sample_std > 0: sample_sharpe = sample.mean() / sample_std * np.sqrt(TRADING_DAYS_PER_YEAR) sharpes.append(sample_sharpe) - + if not sharpes: return (np.nan, np.nan) - + alpha = 1 - confidence_level lower = np.percentile(sharpes, alpha / 2 * 100) upper = np.percentile(sharpes, (1 - alpha / 2) * 100) - + logger.debug(f"Bootstrap Sharpe CI ({confidence_level:.0%}): [{lower:.3f}, {upper:.3f}]") return (lower, upper) @@ -106,23 +108,23 @@ def bootstrap_sharpe_ci( def calculate_drawdown_duration(equity_curve: pd.Series) -> Tuple[int, float]: """ Calculate maximum and average drawdown duration. - + Args: equity_curve: Series of equity values (cumulative returns). - + Returns: Tuple of (max_duration_days, avg_duration_days). """ if equity_curve.empty: return (0, 0.0) - + rolling_max = equity_curve.cummax() underwater = equity_curve < rolling_max - + # Find contiguous underwater periods underwater_periods = [] current_duration = 0 - + for is_underwater in underwater: if is_underwater: current_duration += 1 @@ -130,36 +132,36 @@ def calculate_drawdown_duration(equity_curve: pd.Series) -> Tuple[int, float]: if current_duration > 0: underwater_periods.append(current_duration) current_duration = 0 - + # Don't forget the last period if still underwater if current_duration > 0: underwater_periods.append(current_duration) - + if not underwater_periods: return (0, 0.0) - + max_duration = max(underwater_periods) avg_duration = sum(underwater_periods) / len(underwater_periods) - + logger.debug(f"Drawdown durations: max={max_duration} days, avg={avg_duration:.1f} days") return (max_duration, avg_duration) def calculate_perf_metrics( - equity_curve: pd.Series, + equity_curve: pd.Series, freq: int = TRADING_DAYS_PER_YEAR, include_bootstrap_ci: bool = False, - n_bootstrap: int = 1000 + n_bootstrap: int = 1000, ) -> Dict[str, Any]: """ Calculate comprehensive performance metrics. - + Args: equity_curve: Series of equity values (starting from 1.0). freq: Trading days per year for annualization. include_bootstrap_ci: Whether to compute bootstrap CI for Sharpe. n_bootstrap: Number of bootstrap samples. - + Returns: Dictionary of performance metrics. """ @@ -169,7 +171,7 @@ def calculate_perf_metrics( # Returns daily_rets = equity_curve.pct_change().dropna() - + # Total Time try: years = (equity_curve.index[-1] - equity_curve.index[0]).days / 365.25 @@ -180,11 +182,11 @@ def calculate_perf_metrics( logger.warning(f"Could not calculate period: {e}") years = 1 - total_return = equity_curve.iloc[-1] - 1 + equity_curve.iloc[-1] - 1 cagr = (equity_curve.iloc[-1]) ** (1 / years) - 1 # Volatility - ann_vol = daily_rets.std() * (freq ** 0.5) + ann_vol = daily_rets.std() * (freq**0.5) # Sharpe (Assume 0% risk free for simplicity) sharpe = cagr / ann_vol if ann_vol != 0 else 0 @@ -192,32 +194,28 @@ def calculate_perf_metrics( # Bootstrap CI for Sharpe sharpe_ci_lower, sharpe_ci_upper = None, None if include_bootstrap_ci: - sharpe_ci_lower, sharpe_ci_upper = bootstrap_sharpe_ci( - daily_rets, n_bootstrap=n_bootstrap - ) + sharpe_ci_lower, sharpe_ci_upper = bootstrap_sharpe_ci(daily_rets, n_bootstrap=n_bootstrap) # Sortino (Downside deviation) downside_rets = daily_rets[daily_rets < 0] - downside_dev = downside_rets.std() * (freq ** 0.5) if len(downside_rets) > 0 else 0 + downside_dev = downside_rets.std() * (freq**0.5) if len(downside_rets) > 0 else 0 sortino = cagr / downside_dev if downside_dev != 0 else 0 # Max Drawdown rolling_max = equity_curve.cummax() drawdown = (equity_curve - rolling_max) / rolling_max max_dd = drawdown.min() - + # Drawdown Duration max_dd_duration, avg_dd_duration = calculate_drawdown_duration(equity_curve) - + # Calmar Ratio calmar = cagr / abs(max_dd) if max_dd != 0 else 0 - + # Win Rate (Daily) win_rate = (daily_rets > 0).mean() - logger.info( - f"Performance: CAGR={cagr:.2%}, Sharpe={sharpe:.2f}, MaxDD={max_dd:.2%}" - ) + logger.info(f"Performance: CAGR={cagr:.2%}, Sharpe={sharpe:.2f}, MaxDD={max_dd:.2%}") return { "CAGR": cagr, @@ -230,25 +228,25 @@ def calculate_perf_metrics( "MaxDD_Duration": max_dd_duration, "AvgDD_Duration": avg_dd_duration, "Calmar": calmar, - "WinRate": win_rate + "WinRate": win_rate, } def run_backtest( - df: pd.DataFrame, - signal_col: str, - cost_bps: float = 0.0010, - rebalance_freq: Literal['D', 'W', 'M'] = 'M' + df: pd.DataFrame, + signal_col: str, + cost_bps: float = 0.0010, + rebalance_freq: Literal["D", "W", "M"] = "M", ) -> pd.DataFrame: """ Run a vectorized backtest based on a signal column. - + Args: df: DataFrame with date index, 'Close', 'Daily_Return' and the signal column. signal_col: Name of column with 1 (Long), 0 (Cash), -1 (Short). cost_bps: Cost per trade (e.g., 0.0010 for 10bps). rebalance_freq: 'D' for daily, 'W' for weekly, 'M' for monthly. - + Returns: DataFrame with backtest results including equity curves and drawdowns. """ @@ -256,101 +254,103 @@ def run_backtest( logger.error(f"Invalid input: empty df or missing signal column '{signal_col}'") return pd.DataFrame() - logger.info(f"Running backtest: signal={signal_col}, cost={cost_bps*10000:.0f}bps, freq={rebalance_freq}") - + logger.info( + f"Running backtest: signal={signal_col}, cost={cost_bps*10000:.0f}bps, freq={rebalance_freq}" + ) + bt_df = df.copy() - + # 1. Signal Processing - if rebalance_freq == 'D': + if rebalance_freq == "D": # Daily Rebalance: Position today is determined by Signal yesterday - bt_df['Position'] = bt_df[signal_col].shift(1).fillna(0) - - elif rebalance_freq == 'W': + bt_df["Position"] = bt_df[signal_col].shift(1).fillna(0) + + elif rebalance_freq == "W": # Weekly Rebalance - bt_df['Period'] = bt_df.index.to_period('W') - weekly_signals = bt_df.groupby('Period')[signal_col].last() + bt_df["Period"] = bt_df.index.to_period("W") + weekly_signals = bt_df.groupby("Period")[signal_col].last() weekly_positions = weekly_signals.shift(1) - bt_df['Position'] = bt_df['Period'].map(weekly_positions) - bt_df['Position'] = bt_df['Position'].fillna(0) - - elif rebalance_freq == 'M': + bt_df["Position"] = bt_df["Period"].map(weekly_positions) + bt_df["Position"] = bt_df["Position"].fillna(0) + + elif rebalance_freq == "M": # Monthly Rebalance - bt_df['Period'] = bt_df.index.to_period('M') - monthly_signals = bt_df.groupby('Period')[signal_col].last() + bt_df["Period"] = bt_df.index.to_period("M") + monthly_signals = bt_df.groupby("Period")[signal_col].last() monthly_positions = monthly_signals.shift(1) - bt_df['Position'] = bt_df['Period'].map(monthly_positions) - bt_df['Position'] = bt_df['Position'].fillna(0) + bt_df["Position"] = bt_df["Period"].map(monthly_positions) + bt_df["Position"] = bt_df["Position"].fillna(0) else: logger.error(f"Invalid rebalance frequency: {rebalance_freq}") return pd.DataFrame() - + # 2. Strategy Returns - bt_df['Strategy_Return'] = bt_df['Position'] * bt_df['Daily_Return'] - + bt_df["Strategy_Return"] = bt_df["Position"] * bt_df["Daily_Return"] + # 3. Transaction Costs - bt_df['Position_Change'] = bt_df['Position'].diff().abs().fillna(0) - bt_df['Cost'] = bt_df['Position_Change'] * cost_bps - bt_df['Strategy_Net_Return'] = bt_df['Strategy_Return'] - bt_df['Cost'] - + bt_df["Position_Change"] = bt_df["Position"].diff().abs().fillna(0) + bt_df["Cost"] = bt_df["Position_Change"] * cost_bps + bt_df["Strategy_Net_Return"] = bt_df["Strategy_Return"] - bt_df["Cost"] + # 4. Equity Curves - bt_df['Equity_Benchmark'] = (1 + bt_df['Daily_Return']).cumprod() - bt_df['Equity_Strategy'] = (1 + bt_df['Strategy_Net_Return']).cumprod() - + bt_df["Equity_Benchmark"] = (1 + bt_df["Daily_Return"]).cumprod() + bt_df["Equity_Strategy"] = (1 + bt_df["Strategy_Net_Return"]).cumprod() + # 5. Drawdown Curves - bt_df['DD_Benchmark'] = (bt_df['Equity_Benchmark'] / bt_df['Equity_Benchmark'].cummax()) - 1 - bt_df['DD_Strategy'] = (bt_df['Equity_Strategy'] / bt_df['Equity_Strategy'].cummax()) - 1 - + bt_df["DD_Benchmark"] = (bt_df["Equity_Benchmark"] / bt_df["Equity_Benchmark"].cummax()) - 1 + bt_df["DD_Strategy"] = (bt_df["Equity_Strategy"] / bt_df["Equity_Strategy"].cummax()) - 1 + logger.info( f"Backtest complete: {len(bt_df)} days, " f"Final equity: {bt_df['Equity_Strategy'].iloc[-1]:.2f}" ) - + return bt_df def calculate_conditional_stats( - df: pd.DataFrame, - strategy_col: str, - regime_col: str + df: pd.DataFrame, strategy_col: str, regime_col: str ) -> pd.DataFrame: """ Calculate performance stats conditioned on a regime column. - + Args: df: DataFrame with strategy returns and regime column. strategy_col: Column name of strategy returns. regime_col: Column name of regime classification. - + Returns: DataFrame with metrics per regime. """ if df.empty or regime_col not in df.columns: logger.warning(f"Invalid input for conditional stats: missing '{regime_col}'") return pd.DataFrame() - + regimes = df[regime_col].unique() results = [] - + for reg in regimes: subset = df[df[regime_col] == reg][strategy_col] - + if subset.empty: continue - + avg_ret = subset.mean() * TRADING_DAYS_PER_YEAR - vol = subset.std() * (TRADING_DAYS_PER_YEAR ** 0.5) + vol = subset.std() * (TRADING_DAYS_PER_YEAR**0.5) sharpe = avg_ret / vol if vol != 0 else 0 win_rate = (subset > 0).mean() - - results.append({ - "Regime": reg, - "Ann_Return": avg_ret, - "Volatility": vol, - "Sharpe": sharpe, - "WinRate": win_rate, - "Count": len(subset) - }) - + + results.append( + { + "Regime": reg, + "Ann_Return": avg_ret, + "Volatility": vol, + "Sharpe": sharpe, + "WinRate": win_rate, + "Count": len(subset), + } + ) + logger.debug(f"Conditional stats calculated for {len(results)} regimes") return pd.DataFrame(results).set_index("Regime") @@ -361,15 +361,15 @@ def walk_forward_backtest( train_months: int = 24, test_months: int = 6, cost_bps: float = 0.0010, - rebalance_freq: Literal['D', 'W', 'M'] = 'M' + rebalance_freq: Literal["D", "W", "M"] = "M", ) -> Dict[str, Any]: """ Perform walk-forward validation with rolling training windows. - + This method splits the data into overlapping train/test periods, evaluates the strategy on each out-of-sample test period, and aggregates the results. - + Args: df: DataFrame with date index, 'Close', 'Daily_Return', and signal column. signal_col: Name of column with 1 (Long), 0 (Cash), -1 (Short). @@ -377,7 +377,7 @@ def walk_forward_backtest( test_months: Number of months for test window. cost_bps: Transaction cost in basis points. rebalance_freq: Rebalancing frequency. - + Returns: Dictionary containing: - 'summary': Aggregated performance metrics @@ -387,95 +387,90 @@ def walk_forward_backtest( if df.empty or signal_col not in df.columns: logger.error("Invalid input for walk-forward backtest") return {} - - logger.info( - f"Walk-forward validation: train={train_months}m, test={test_months}m" - ) - + + logger.info(f"Walk-forward validation: train={train_months}m, test={test_months}m") + # Convert to monthly periods for slicing df = df.copy() - df['YearMonth'] = df.index.to_period('M') - unique_months = df['YearMonth'].unique() - + df["YearMonth"] = df.index.to_period("M") + unique_months = df["YearMonth"].unique() + total_months = len(unique_months) min_required = train_months + test_months - + if total_months < min_required: - logger.warning( - f"Insufficient data: {total_months} months < {min_required} required" - ) + logger.warning(f"Insufficient data: {total_months} months < {min_required} required") return {} - + periods_results: List[Dict[str, Any]] = [] all_oos_returns: List[pd.Series] = [] - + # Walk forward through the data start_idx = 0 while start_idx + min_required <= total_months: # Define train and test periods train_end_idx = start_idx + train_months test_end_idx = train_end_idx + test_months - + train_months_range = unique_months[start_idx:train_end_idx] test_months_range = unique_months[train_end_idx:test_end_idx] - + # Filter data - train_mask = df['YearMonth'].isin(train_months_range) - test_mask = df['YearMonth'].isin(test_months_range) - - train_df = df[train_mask].copy() + train_mask = df["YearMonth"].isin(train_months_range) + test_mask = df["YearMonth"].isin(test_months_range) + + df[train_mask].copy() test_df = df[test_mask].copy() - + if len(test_df) == 0: break - + # Run backtest on test period only (signal already generated) bt_results = run_backtest( test_df, signal_col, cost_bps=cost_bps, rebalance_freq=rebalance_freq ) - + if bt_results.empty: start_idx += test_months continue - + # Calculate metrics for this period - period_metrics = calculate_perf_metrics(bt_results['Equity_Strategy']) - - periods_results.append({ - "train_start": str(train_months_range[0]), - "train_end": str(train_months_range[-1]), - "test_start": str(test_months_range[0]), - "test_end": str(test_months_range[-1]), - "metrics": period_metrics - }) - - all_oos_returns.append(bt_results['Strategy_Net_Return']) - + period_metrics = calculate_perf_metrics(bt_results["Equity_Strategy"]) + + periods_results.append( + { + "train_start": str(train_months_range[0]), + "train_end": str(train_months_range[-1]), + "test_start": str(test_months_range[0]), + "test_end": str(test_months_range[-1]), + "metrics": period_metrics, + } + ) + + all_oos_returns.append(bt_results["Strategy_Net_Return"]) + # Slide forward by test_months start_idx += test_months - + if not periods_results: logger.warning("No valid walk-forward periods") return {} - + # Aggregate out-of-sample returns oos_returns = pd.concat(all_oos_returns) oos_equity = (1 + oos_returns).cumprod() - + # Calculate aggregate metrics - aggregate_metrics = calculate_perf_metrics( - oos_equity, include_bootstrap_ci=True - ) - + aggregate_metrics = calculate_perf_metrics(oos_equity, include_bootstrap_ci=True) + logger.info( f"Walk-forward complete: {len(periods_results)} periods, " f"OOS Sharpe={aggregate_metrics.get('Sharpe', 0):.2f}" ) - + return { "summary": aggregate_metrics, "periods": periods_results, "oos_returns": oos_returns, - "n_periods": len(periods_results) + "n_periods": len(periods_results), } - diff --git a/src/modules/config.py b/src/modules/config.py index 14425d8..9d26e43 100644 --- a/src/modules/config.py +++ b/src/modules/config.py @@ -19,16 +19,13 @@ # === Backtest Defaults === DEFAULT_COST_BPS = 10 # In basis points (10 bps = 0.10%) -DEFAULT_REBALANCE_FREQ = 'M' # Monthly +DEFAULT_REBALANCE_FREQ = "M" # Monthly # === Caching === CACHE_TTL_SECONDS = 3600 * 24 # 24 hours # === Asset Universe === -PRESET_UNIVERSE = [ - "SPY", "QQQ", "IWM", "GLD", "TLT", - "XLK", "XLE", "BTC-USD", "ETH-USD" -] +PRESET_UNIVERSE = ["SPY", "QQQ", "IWM", "GLD", "TLT", "XLK", "XLE", "BTC-USD", "ETH-USD"] # === Minimum Data Requirements === MIN_DATA_POINTS = 50 diff --git a/src/modules/data_model.py b/src/modules/data_model.py index 91132a7..5750518 100644 --- a/src/modules/data_model.py +++ b/src/modules/data_model.py @@ -4,12 +4,13 @@ Supports fetching from Yahoo Finance with caching for performance. """ -import yfinance as yf -import pandas as pd -import streamlit as st import logging from typing import Optional +import pandas as pd +import streamlit as st +import yfinance as yf + try: from .config import CACHE_TTL_SECONDS except ImportError: @@ -20,44 +21,40 @@ @st.cache_data(ttl=CACHE_TTL_SECONDS) -def fetch_stock_data( - ticker: str, - period: str = "10y", - interval: str = "1d" -) -> pd.DataFrame: +def fetch_stock_data(ticker: str, period: str = "10y", interval: str = "1d") -> pd.DataFrame: """ Fetch historical OHLCV data from Yahoo Finance with caching. - + Args: ticker: The asset symbol (e.g., 'SPY', 'BTC-USD'). period: Time period string - '1y', '5y', '10y', 'max', etc. interval: Data interval - '1d', '1wk', '1mo'. - + Returns: DataFrame with Date index and columns: Open, High, Low, Close, Volume. Returns empty DataFrame on error. """ logger.info(f"Fetching data for {ticker}, period={period}, interval={interval}") - + try: stock = yf.Ticker(ticker) df = stock.history(period=period, interval=interval) - + if df.empty: logger.warning(f"No data returned for {ticker}") return df - + # Ensure index is datetime if not isinstance(df.index, pd.DatetimeIndex): df.index = pd.to_datetime(df.index) - + # Drop timezone if present for consistency if df.index.tz is not None: df.index = df.index.tz_localize(None) - + logger.info(f"Fetched {len(df)} rows for {ticker}") return df - + except Exception as e: logger.error(f"Error fetching data for {ticker}: {e}") st.error(f"Error fetching data for {ticker}: {e}") @@ -67,10 +64,10 @@ def fetch_stock_data( def validate_ticker(ticker: str) -> bool: """ Validate if a ticker symbol exists and has data. - + Args: ticker: The ticker symbol to validate. - + Returns: True if ticker is valid and has data, False otherwise. """ @@ -78,7 +75,7 @@ def validate_ticker(ticker: str) -> bool: stock = yf.Ticker(ticker) info = stock.info # Check if we got valid info back - return info.get('regularMarketPrice') is not None + return info.get("regularMarketPrice") is not None except Exception as e: logger.debug(f"Ticker validation failed for {ticker}: {e}") return False @@ -87,10 +84,10 @@ def validate_ticker(ticker: str) -> bool: def get_ticker_info(ticker: str) -> Optional[dict]: """ Get basic info about a ticker. - + Args: ticker: The ticker symbol. - + Returns: Dictionary with ticker info or None on error. """ @@ -107,4 +104,3 @@ def get_ticker_info(ticker: str) -> Optional[dict]: except Exception as e: logger.debug(f"Could not get info for {ticker}: {e}") return None - diff --git a/src/modules/signals.py b/src/modules/signals.py index 73dd7a9..8e1d534 100644 --- a/src/modules/signals.py +++ b/src/modules/signals.py @@ -1,13 +1,14 @@ -import pandas as pd +from typing import Optional + import numpy as np -from typing import Literal, Optional +import pandas as pd try: from .config import ( - TRADING_DAYS_PER_YEAR, - TRADING_DAYS_PER_MONTH, DEFAULT_RSI_WINDOW, MIN_PERIODS_FOR_EXPANDING, + TRADING_DAYS_PER_MONTH, + TRADING_DAYS_PER_YEAR, ) except ImportError: # Fallback for direct execution @@ -18,75 +19,69 @@ def add_technical_indicators( - df: pd.DataFrame, - sma_window: int = 50, - mom_window: int = 12, - vol_window: int = 21 + df: pd.DataFrame, sma_window: int = 50, mom_window: int = 12, vol_window: int = 21 ) -> pd.DataFrame: """ Adds technical indicators to the dataframe. - + Args: df: OHLCV data with 'Close' column. sma_window: Trend lookback in days. mom_window: Momentum lookback in months. vol_window: Volatility lookback in days. - + Returns: DataFrame with added indicator columns. """ if df.empty: return df - + df = df.copy() - + # 1. Trend: Moving Averages - df[f'SMA_{sma_window}'] = df['Close'].rolling(window=sma_window).mean() - df['SMA_200'] = df['Close'].rolling(window=200).mean() # Standard long-term benchmark - + df[f"SMA_{sma_window}"] = df["Close"].rolling(window=sma_window).mean() + df["SMA_200"] = df["Close"].rolling(window=200).mean() # Standard long-term benchmark + # 2. Momentum (12-1 Month equivalent) # We approximate months as 21 trading days. # Momentum 12-1 = Return from 12 months ago to 1 month ago. lag_start = TRADING_DAYS_PER_MONTH # Skip most recent month lag_end_custom = mom_window * TRADING_DAYS_PER_MONTH - - df[f'Momentum_{mom_window}M_1M'] = ( - df['Close'].shift(lag_start) / df['Close'].shift(lag_end_custom) - 1 + + df[f"Momentum_{mom_window}M_1M"] = ( + df["Close"].shift(lag_start) / df["Close"].shift(lag_end_custom) - 1 ) - + # 3. Volatility (Annualized) - df['Daily_Return'] = df['Close'].pct_change() - df[f'Vol_{vol_window}d'] = ( - df['Daily_Return'].rolling(window=vol_window).std() - * (TRADING_DAYS_PER_YEAR ** 0.5) + df["Daily_Return"] = df["Close"].pct_change() + df[f"Vol_{vol_window}d"] = df["Daily_Return"].rolling(window=vol_window).std() * ( + TRADING_DAYS_PER_YEAR**0.5 ) - + # 4. Relative Strength Index (RSI) - Vectorized calculation - delta = df['Close'].diff() + delta = df["Close"].diff() gain = delta.where(delta > 0, 0.0).rolling(window=DEFAULT_RSI_WINDOW).mean() loss = (-delta.where(delta < 0, 0.0)).rolling(window=DEFAULT_RSI_WINDOW).mean() rs = gain / loss.replace(0, np.nan) # Avoid division by zero - df['RSI_14'] = 100 - (100 / (1 + rs)) - + df["RSI_14"] = 100 - (100 / (1 + rs)) + # 5. Distance from SMA (Trend Strength) - df['Trend_Strength_Pct'] = ( - (df['Close'] - df[f'SMA_{sma_window}']) / df[f'SMA_{sma_window}'] - ) - + df["Trend_Strength_Pct"] = (df["Close"] - df[f"SMA_{sma_window}"]) / df[f"SMA_{sma_window}"] + return df def detect_volatility_regime( - df: pd.DataFrame, - vol_col: str = 'Vol_21d', - quantile_high: float = 0.75, + df: pd.DataFrame, + vol_col: str = "Vol_21d", + quantile_high: float = 0.75, quantile_low: float = 0.25, use_expanding: bool = False, - min_periods: Optional[int] = None + min_periods: Optional[int] = None, ) -> pd.DataFrame: """ Classifies periods into Volatility Regimes (Low, Normal, High). - + Args: df: DataFrame containing the volatility column. vol_col: Name of the volatility column. @@ -97,13 +92,13 @@ def detect_volatility_regime( If False, uses full-sample quantiles (faster, for exploratory analysis). min_periods: Minimum periods required for expanding window calculation. Only used if use_expanding=True. Defaults to MIN_PERIODS_FOR_EXPANDING. - + Returns: DataFrame with 'Vol_Regime' column: 'High' if vol > quantile_high threshold 'Low' if vol < quantile_low threshold 'Normal' otherwise - + Note: When use_expanding=False (default), regime classification uses full-sample quantiles which introduces look-ahead bias. This is acceptable for exploratory @@ -111,61 +106,58 @@ def detect_volatility_regime( """ if df.empty or vol_col not in df.columns: return df - + df = df.copy() - + if min_periods is None: min_periods = MIN_PERIODS_FOR_EXPANDING - + if use_expanding: # OUT-OF-SAMPLE: Expanding window quantiles (no look-ahead bias) # At each point in time, we only use data available up to that point thresh_high = df[vol_col].expanding(min_periods=min_periods).quantile(quantile_high) thresh_low = df[vol_col].expanding(min_periods=min_periods).quantile(quantile_low) - + # Vectorized regime classification with expanding thresholds - df['Vol_Regime'] = 'Normal' - df.loc[df[vol_col] > thresh_high, 'Vol_Regime'] = 'High' - df.loc[df[vol_col] < thresh_low, 'Vol_Regime'] = 'Low' - + df["Vol_Regime"] = "Normal" + df.loc[df[vol_col] > thresh_high, "Vol_Regime"] = "High" + df.loc[df[vol_col] < thresh_low, "Vol_Regime"] = "Low" + # Mark early periods as 'Unknown' where we don't have enough data - df.loc[thresh_high.isna(), 'Vol_Regime'] = 'Unknown' + df.loc[thresh_high.isna(), "Vol_Regime"] = "Unknown" else: # IN-SAMPLE: Full-sample quantiles (look-ahead bias, but standard for regime analysis) # Use this for exploratory analysis and visualization thresh_high = df[vol_col].quantile(quantile_high) thresh_low = df[vol_col].quantile(quantile_low) - - conditions = [ - (df[vol_col] > thresh_high), - (df[vol_col] < thresh_low) - ] - choices = ['High', 'Low'] - - df['Vol_Regime'] = np.select(conditions, choices, default='Normal') - + + conditions = [(df[vol_col] > thresh_high), (df[vol_col] < thresh_low)] + choices = ["High", "Low"] + + df["Vol_Regime"] = np.select(conditions, choices, default="Normal") + return df def detect_volatility_regime_oos( - df: pd.DataFrame, - vol_col: str = 'Vol_21d', - quantile_high: float = 0.75, + df: pd.DataFrame, + vol_col: str = "Vol_21d", + quantile_high: float = 0.75, quantile_low: float = 0.25, - min_periods: int = MIN_PERIODS_FOR_EXPANDING + min_periods: int = MIN_PERIODS_FOR_EXPANDING, ) -> pd.DataFrame: """ Convenience wrapper for out-of-sample regime detection. - + This function should be used for backtesting to ensure no look-ahead bias. - + Args: df: DataFrame containing the volatility column. vol_col: Name of the volatility column. quantile_high: Percentile threshold for High Volatility. quantile_low: Percentile threshold for Low Volatility. min_periods: Minimum periods required before classification starts. - + Returns: DataFrame with 'Vol_Regime' column using expanding-window quantiles. """ @@ -175,5 +167,5 @@ def detect_volatility_regime_oos( quantile_high=quantile_high, quantile_low=quantile_low, use_expanding=True, - min_periods=min_periods + min_periods=min_periods, ) diff --git a/src/modules/signals_advanced.py b/src/modules/signals_advanced.py index d49db0b..8ed8903 100644 --- a/src/modules/signals_advanced.py +++ b/src/modules/signals_advanced.py @@ -5,16 +5,17 @@ including mean reversion, volatility breakout, and composite signals. """ -import pandas as pd -import numpy as np -from typing import Literal, Optional, Tuple import logging +from typing import Optional, Tuple + +import numpy as np +import pandas as pd try: from .config import ( - TRADING_DAYS_PER_YEAR, - TRADING_DAYS_PER_MONTH, DEFAULT_RSI_WINDOW, + TRADING_DAYS_PER_MONTH, + TRADING_DAYS_PER_YEAR, ) except ImportError: TRADING_DAYS_PER_YEAR = 252 @@ -25,61 +26,58 @@ def calculate_bollinger_bands( - df: pd.DataFrame, - window: int = 20, - num_std: float = 2.0, - price_col: str = 'Close' + df: pd.DataFrame, window: int = 20, num_std: float = 2.0, price_col: str = "Close" ) -> pd.DataFrame: """ Calculate Bollinger Bands for mean reversion signals. - + Args: df: DataFrame with price data. window: Lookback window for moving average. num_std: Number of standard deviations for bands. price_col: Column name for price data. - + Returns: DataFrame with added columns: BB_Middle, BB_Upper, BB_Lower, BB_Width, BB_Position. """ if df.empty or price_col not in df.columns: logger.warning(f"Invalid input for Bollinger Bands: missing '{price_col}'") return df - + df = df.copy() - + # Calculate bands - df['BB_Middle'] = df[price_col].rolling(window=window).mean() + df["BB_Middle"] = df[price_col].rolling(window=window).mean() rolling_std = df[price_col].rolling(window=window).std() - df['BB_Upper'] = df['BB_Middle'] + (rolling_std * num_std) - df['BB_Lower'] = df['BB_Middle'] - (rolling_std * num_std) - + df["BB_Upper"] = df["BB_Middle"] + (rolling_std * num_std) + df["BB_Lower"] = df["BB_Middle"] - (rolling_std * num_std) + # Band width (volatility measure) - df['BB_Width'] = (df['BB_Upper'] - df['BB_Lower']) / df['BB_Middle'] - + df["BB_Width"] = (df["BB_Upper"] - df["BB_Lower"]) / df["BB_Middle"] + # Price position within bands (-1 at lower, 0 at middle, +1 at upper) - df['BB_Position'] = (df[price_col] - df['BB_Middle']) / (df['BB_Upper'] - df['BB_Middle']) - + df["BB_Position"] = (df[price_col] - df["BB_Middle"]) / (df["BB_Upper"] - df["BB_Middle"]) + logger.debug(f"Bollinger Bands calculated: window={window}, std={num_std}") return df def generate_mean_reversion_signal( df: pd.DataFrame, - rsi_col: str = 'RSI_14', + rsi_col: str = "RSI_14", oversold: int = 30, overbought: int = 70, use_bollinger: bool = True, - bb_position_col: str = 'BB_Position' + bb_position_col: str = "BB_Position", ) -> pd.Series: """ Generate mean reversion signal based on RSI and optionally Bollinger Bands. - + Signal Logic: - Buy (1): RSI oversold AND (optionally) price near lower BB - Sell (-1): RSI overbought AND (optionally) price near upper BB - Hold (0): Otherwise - + Args: df: DataFrame with RSI and optionally BB columns. rsi_col: Column name for RSI values. @@ -87,119 +85,119 @@ def generate_mean_reversion_signal( overbought: RSI threshold for overbought condition. use_bollinger: Whether to incorporate Bollinger Band position. bb_position_col: Column name for BB position. - + Returns: Series with signal values: 1 (buy), -1 (sell), 0 (hold). """ if df.empty or rsi_col not in df.columns: logger.error(f"Missing required column: {rsi_col}") return pd.Series(0, index=df.index) - + signal = pd.Series(0, index=df.index) - + # RSI conditions rsi_oversold = df[rsi_col] < oversold rsi_overbought = df[rsi_col] > overbought - + if use_bollinger and bb_position_col in df.columns: # Combine with Bollinger Band position bb_low = df[bb_position_col] < -0.8 # Near lower band - bb_high = df[bb_position_col] > 0.8 # Near upper band - - signal[rsi_oversold & bb_low] = 1 # Strong buy + bb_high = df[bb_position_col] > 0.8 # Near upper band + + signal[rsi_oversold & bb_low] = 1 # Strong buy signal[rsi_overbought & bb_high] = -1 # Strong sell else: # RSI only signal[rsi_oversold] = 1 signal[rsi_overbought] = -1 - + logger.info(f"Mean reversion signal: {(signal == 1).sum()} buy, {(signal == -1).sum()} sell") return signal def generate_volatility_breakout_signal( df: pd.DataFrame, - vol_col: str = 'Vol_21d', + vol_col: str = "Vol_21d", vol_threshold_percentile: float = 0.80, - trend_col: Optional[str] = None + trend_col: Optional[str] = None, ) -> pd.Series: """ Generate volatility breakout signal. - + Signal Logic: - When volatility spikes above threshold, follow the trend direction - This aims to capture momentum during volatility expansion - + Args: df: DataFrame with volatility data. vol_col: Column name for volatility. vol_threshold_percentile: Percentile threshold for "high" vol. trend_col: Optional column indicating trend (1=up, -1=down). - + Returns: Series with signal values. """ if df.empty or vol_col not in df.columns: logger.error(f"Missing required column: {vol_col}") return pd.Series(0, index=df.index) - + df = df.copy() - + # Expanding threshold (no look-ahead bias) vol_threshold = df[vol_col].expanding(min_periods=60).quantile(vol_threshold_percentile) high_vol = df[vol_col] > vol_threshold - + # Determine trend direction from recent returns if not provided if trend_col is None or trend_col not in df.columns: # Use 5-day return direction - df['_temp_trend'] = np.sign(df['Close'].pct_change(5)) - trend = df['_temp_trend'] + df["_temp_trend"] = np.sign(df["Close"].pct_change(5)) + trend = df["_temp_trend"] else: trend = df[trend_col] - + # Signal: follow trend when vol is high signal = pd.Series(0, index=df.index) signal[high_vol] = trend[high_vol] - + # Clean up - if '_temp_trend' in df.columns: - df.drop('_temp_trend', axis=1, inplace=True) - + if "_temp_trend" in df.columns: + df.drop("_temp_trend", axis=1, inplace=True) + logger.info(f"Volatility breakout signal: {(signal != 0).sum()} active days") return signal def generate_dual_momentum_signal( df: pd.DataFrame, - abs_mom_col: str = 'Momentum_12M_1M', + abs_mom_col: str = "Momentum_12M_1M", rel_benchmark_return: Optional[pd.Series] = None, - abs_threshold: float = 0.0 + abs_threshold: float = 0.0, ) -> pd.Series: """ Generate dual momentum signal (absolute + relative momentum). - + Signal Logic: - Long (1): Positive absolute momentum AND better than benchmark - Cash (0): Otherwise - + This is based on Gary Antonacci's dual momentum research. - + Args: df: DataFrame with momentum column. abs_mom_col: Column for absolute momentum. rel_benchmark_return: Optional benchmark return series for relative momentum. abs_threshold: Threshold for considering momentum "positive". - + Returns: Series with signal values: 1 (long) or 0 (cash). """ if df.empty or abs_mom_col not in df.columns: logger.error(f"Missing required column: {abs_mom_col}") return pd.Series(0, index=df.index) - + # Absolute momentum: is the asset trending up? abs_mom_positive = df[abs_mom_col] > abs_threshold - + if rel_benchmark_return is not None: # Relative momentum: is the asset beating the benchmark? rel_mom_positive = df[abs_mom_col] > rel_benchmark_return @@ -209,52 +207,51 @@ def generate_dual_momentum_signal( # Just absolute momentum signal = pd.Series(0, index=df.index) signal[abs_mom_positive] = 1 - - logger.info(f"Dual momentum signal: {(signal == 1).sum()} long days, {(signal == 0).sum()} cash days") + + logger.info( + f"Dual momentum signal: {(signal == 1).sum()} long days, {(signal == 0).sum()} cash days" + ) return signal def generate_composite_signal( - df: pd.DataFrame, - signals: dict, - weights: Optional[dict] = None, - threshold: float = 0.5 + df: pd.DataFrame, signals: dict, weights: Optional[dict] = None, threshold: float = 0.5 ) -> pd.Series: """ Combine multiple signals into a composite signal. - + Args: df: DataFrame (used for index). signals: Dictionary of {name: signal_series}. weights: Optional dictionary of {name: weight}. Defaults to equal weights. threshold: Threshold for composite signal to trigger position. - + Returns: Series with signal values: 1 (long), -1 (short), 0 (neutral). """ if not signals: logger.error("No signals provided") return pd.Series(0, index=df.index) - + # Default to equal weights if weights is None: weights = {name: 1.0 / len(signals) for name in signals} - + # Normalize weights total_weight = sum(weights.values()) weights = {k: v / total_weight for k, v in weights.items()} - + # Calculate weighted average composite = pd.Series(0.0, index=df.index) for name, signal in signals.items(): weight = weights.get(name, 0) composite += signal * weight - + # Convert to discrete signal final_signal = pd.Series(0, index=df.index) final_signal[composite >= threshold] = 1 final_signal[composite <= -threshold] = -1 - + logger.info( f"Composite signal: {(final_signal == 1).sum()} long, " f"{(final_signal == -1).sum()} short, {(final_signal == 0).sum()} neutral" @@ -262,38 +259,35 @@ def generate_composite_signal( return final_signal -def calculate_atr( - df: pd.DataFrame, - window: int = 14 -) -> pd.Series: +def calculate_atr(df: pd.DataFrame, window: int = 14) -> pd.Series: """ Calculate Average True Range (ATR) for position sizing and stops. - + Args: df: DataFrame with High, Low, Close columns. window: ATR lookback period. - + Returns: Series with ATR values. """ - if df.empty or not all(col in df.columns for col in ['High', 'Low', 'Close']): + if df.empty or not all(col in df.columns for col in ["High", "Low", "Close"]): logger.warning("Missing required columns for ATR calculation") return pd.Series(dtype=float, index=df.index if not df.empty else None) - - high = df['High'] - low = df['Low'] - close = df['Close'] - + + high = df["High"] + low = df["Low"] + close = df["Close"] + # True Range tr1 = high - low tr2 = abs(high - close.shift(1)) tr3 = abs(low - close.shift(1)) - + true_range = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1) - + # Average True Range atr = true_range.rolling(window=window).mean() - + logger.debug(f"ATR calculated: window={window}, current={atr.iloc[-1]:.4f}") return atr @@ -303,32 +297,32 @@ def calculate_position_size( risk_per_trade: float, atr: float, atr_multiplier: float = 2.0, - price: float = 1.0 + price: float = 1.0, ) -> Tuple[int, float]: """ Calculate position size based on ATR volatility. - + Args: account_value: Total account value. risk_per_trade: Fraction of account to risk (e.g., 0.01 for 1%). atr: Current ATR value. atr_multiplier: Multiplier for stop distance (e.g., 2 ATR). price: Current asset price. - + Returns: Tuple of (shares, stop_distance). """ if atr <= 0 or price <= 0: logger.warning("Invalid ATR or price for position sizing") return (0, 0.0) - + risk_amount = account_value * risk_per_trade stop_distance = atr * atr_multiplier - + # Position size = Risk Amount / Risk per Share position_value = risk_amount / (stop_distance / price) shares = int(position_value / price) - + logger.debug( f"Position size: {shares} shares, " f"stop_distance={stop_distance:.2f}, risk=${risk_amount:.2f}" diff --git a/tests/test_backtester.py b/tests/test_backtester.py index ea3c256..78b10d4 100644 --- a/tests/test_backtester.py +++ b/tests/test_backtester.py @@ -1,178 +1,181 @@ +import os +import sys import unittest -import pandas as pd + import numpy as np -import sys -import os +import pandas as pd # Add parent directory to path for imports -sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) from src.modules import backtester class TestBacktester(unittest.TestCase): - + def setUp(self): """Create test dataframes with known properties.""" - dates = pd.date_range(start='2020-01-01', periods=100) - self.df = pd.DataFrame({ - 'Close': np.linspace(100, 200, 100), - 'Daily_Return': np.full(100, 0.01), # Constant 1% return - 'Signal': np.full(100, 1) # Always Long - }, index=dates) - + dates = pd.date_range(start="2020-01-01", periods=100) + self.df = pd.DataFrame( + { + "Close": np.linspace(100, 200, 100), + "Daily_Return": np.full(100, 0.01), # Constant 1% return + "Signal": np.full(100, 1), # Always Long + }, + index=dates, + ) + # Create larger dataset for walk-forward tests - dates_long = pd.date_range(start='2018-01-01', periods=1000) - self.df_long = pd.DataFrame({ - 'Close': np.cumprod(1 + np.random.randn(1000) * 0.01) * 100, - 'Daily_Return': np.random.randn(1000) * 0.01, - 'Signal': np.where(np.random.rand(1000) > 0.5, 1, 0) - }, index=dates_long) - + dates_long = pd.date_range(start="2018-01-01", periods=1000) + self.df_long = pd.DataFrame( + { + "Close": np.cumprod(1 + np.random.randn(1000) * 0.01) * 100, + "Daily_Return": np.random.randn(1000) * 0.01, + "Signal": np.where(np.random.rand(1000) > 0.5, 1, 0), + }, + index=dates_long, + ) + def test_run_backtest_daily(self): """Test basic daily rebalancing backtest.""" data = self.df.copy() - res = backtester.run_backtest(data, 'Signal', cost_bps=0.0, rebalance_freq='D') - - self.assertIn('Strategy_Return', res.columns) - self.assertIn('Equity_Strategy', res.columns) - self.assertIn('DD_Strategy', res.columns) - + res = backtester.run_backtest(data, "Signal", cost_bps=0.0, rebalance_freq="D") + + self.assertIn("Strategy_Return", res.columns) + self.assertIn("Equity_Strategy", res.columns) + self.assertIn("DD_Strategy", res.columns) + # Since return is positive and we are long, equity should grow - self.assertTrue(res['Equity_Strategy'].iloc[-1] > 1.0) - + self.assertTrue(res["Equity_Strategy"].iloc[-1] > 1.0) + def test_run_backtest_weekly(self): """Test weekly rebalancing backtest.""" data = self.df.copy() - res = backtester.run_backtest(data, 'Signal', cost_bps=0.0, rebalance_freq='W') - - self.assertIn('Equity_Strategy', res.columns) - self.assertTrue(res['Equity_Strategy'].iloc[-1] > 1.0) - + res = backtester.run_backtest(data, "Signal", cost_bps=0.0, rebalance_freq="W") + + self.assertIn("Equity_Strategy", res.columns) + self.assertTrue(res["Equity_Strategy"].iloc[-1] > 1.0) + def test_run_backtest_monthly(self): """Test monthly rebalancing backtest.""" data = self.df.copy() - res = backtester.run_backtest(data, 'Signal', cost_bps=0.0, rebalance_freq='M') - - self.assertIn('Equity_Strategy', res.columns) - + res = backtester.run_backtest(data, "Signal", cost_bps=0.0, rebalance_freq="M") + + self.assertIn("Equity_Strategy", res.columns) + def test_metrics_basic(self): """Test basic performance metrics calculation.""" - equity = pd.Series([1.0, 1.1, 1.21], index=pd.date_range('2020-01-01', periods=3)) + equity = pd.Series([1.0, 1.1, 1.21], index=pd.date_range("2020-01-01", periods=3)) metrics = backtester.calculate_perf_metrics(equity) - - self.assertIn('CAGR', metrics) - self.assertIn('Sharpe', metrics) - self.assertIn('MaxDD', metrics) - self.assertIn('WinRate', metrics) - self.assertTrue(metrics['CAGR'] > 0) - + + self.assertIn("CAGR", metrics) + self.assertIn("Sharpe", metrics) + self.assertIn("MaxDD", metrics) + self.assertIn("WinRate", metrics) + self.assertTrue(metrics["CAGR"] > 0) + def test_metrics_with_bootstrap_ci(self): """Test performance metrics with bootstrap CI.""" # Need more data for reliable CI - dates = pd.date_range('2020-01-01', periods=100) + dates = pd.date_range("2020-01-01", periods=100) returns = np.random.randn(100) * 0.01 + 0.001 # Slight positive drift equity = pd.Series((1 + returns).cumprod(), index=dates) - + metrics = backtester.calculate_perf_metrics( equity, include_bootstrap_ci=True, n_bootstrap=100 ) - - self.assertIn('Sharpe_CI_Lower', metrics) - self.assertIn('Sharpe_CI_Upper', metrics) - + + self.assertIn("Sharpe_CI_Lower", metrics) + self.assertIn("Sharpe_CI_Upper", metrics) + # CI bounds should exist and be ordered - if not np.isnan(metrics['Sharpe_CI_Lower']): - self.assertLessEqual(metrics['Sharpe_CI_Lower'], metrics['Sharpe_CI_Upper']) - + if not np.isnan(metrics["Sharpe_CI_Lower"]): + self.assertLessEqual(metrics["Sharpe_CI_Lower"], metrics["Sharpe_CI_Upper"]) + def test_drawdown_duration(self): """Test drawdown duration calculation.""" # Create equity curve with known drawdown equity = pd.Series([1.0, 1.1, 1.0, 0.9, 0.85, 0.9, 1.0, 1.1]) - + max_dd, avg_dd = backtester.calculate_drawdown_duration(equity) - + self.assertGreater(max_dd, 0) self.assertGreater(avg_dd, 0) - + def test_bootstrap_sharpe_ci(self): """Test bootstrap Sharpe CI directly.""" np.random.seed(42) returns = pd.Series(np.random.randn(100) * 0.01) - + lower, upper = backtester.bootstrap_sharpe_ci( returns, n_bootstrap=500, confidence_level=0.95, random_state=42 ) - + self.assertFalse(np.isnan(lower)) self.assertFalse(np.isnan(upper)) self.assertLess(lower, upper) - + def test_bootstrap_sharpe_ci_insufficient_data(self): """Test bootstrap CI with insufficient data.""" returns = pd.Series([0.01, 0.02, 0.01]) # Only 3 points - + lower, upper = backtester.bootstrap_sharpe_ci(returns) - + # Should return NaN due to insufficient data self.assertTrue(np.isnan(lower)) self.assertTrue(np.isnan(upper)) - + def test_conditional_stats(self): """Test conditional statistics calculation.""" - df = pd.DataFrame({ - 'Strategy_Net_Return': np.random.randn(100) * 0.01, - 'Vol_Regime': ['High'] * 30 + ['Normal'] * 40 + ['Low'] * 30 - }) - - stats = backtester.calculate_conditional_stats( - df, 'Strategy_Net_Return', 'Vol_Regime' + df = pd.DataFrame( + { + "Strategy_Net_Return": np.random.randn(100) * 0.01, + "Vol_Regime": ["High"] * 30 + ["Normal"] * 40 + ["Low"] * 30, + } ) - - self.assertIn('High', stats.index) - self.assertIn('Normal', stats.index) - self.assertIn('Low', stats.index) - self.assertIn('Sharpe', stats.columns) - + + stats = backtester.calculate_conditional_stats(df, "Strategy_Net_Return", "Vol_Regime") + + self.assertIn("High", stats.index) + self.assertIn("Normal", stats.index) + self.assertIn("Low", stats.index) + self.assertIn("Sharpe", stats.columns) + def test_walk_forward_backtest(self): """Test walk-forward validation.""" # Use the longer dataset result = backtester.walk_forward_backtest( - self.df_long, - 'Signal', + self.df_long, + "Signal", train_months=12, test_months=3, cost_bps=0.001, - rebalance_freq='M' + rebalance_freq="M", ) - - self.assertIn('summary', result) - self.assertIn('periods', result) - self.assertIn('n_periods', result) - self.assertGreater(result['n_periods'], 0) - + + self.assertIn("summary", result) + self.assertIn("periods", result) + self.assertIn("n_periods", result) + self.assertGreater(result["n_periods"], 0) + def test_walk_forward_insufficient_data(self): """Test walk-forward with insufficient data.""" result = backtester.walk_forward_backtest( - self.df, # Only 100 days - 'Signal', - train_months=24, - test_months=6 + self.df, "Signal", train_months=24, test_months=6 # Only 100 days ) - + # Should return empty dict due to insufficient data self.assertEqual(result, {}) - + def test_empty_dataframe_handling(self): """Test that functions handle empty dataframes gracefully.""" empty_df = pd.DataFrame() - - result = backtester.run_backtest(empty_df, 'Signal') + + result = backtester.run_backtest(empty_df, "Signal") self.assertTrue(result.empty) - + metrics = backtester.calculate_perf_metrics(pd.Series(dtype=float)) self.assertEqual(metrics, {}) - -if __name__ == '__main__': - unittest.main() +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_data_model.py b/tests/test_data_model.py index 0fb4573..2562996 100644 --- a/tests/test_data_model.py +++ b/tests/test_data_model.py @@ -1,159 +1,161 @@ """Tests for data model module.""" +import os +import sys import unittest +from unittest.mock import MagicMock, patch + import pandas as pd -import sys -import os -from unittest.mock import patch, MagicMock # Add parent directory to path for imports -sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) from src.modules import data_model class TestFetchStockData(unittest.TestCase): """Tests for fetch_stock_data function.""" - - @patch('src.modules.data_model.yf.Ticker') + + @patch("src.modules.data_model.yf.Ticker") def test_successful_fetch(self, mock_ticker): """Test successful data fetch.""" # Create mock data - dates = pd.date_range('2020-01-01', periods=10) - mock_df = pd.DataFrame({ - 'Open': [100] * 10, - 'High': [105] * 10, - 'Low': [95] * 10, - 'Close': [102] * 10, - 'Volume': [1000000] * 10 - }, index=dates) - + dates = pd.date_range("2020-01-01", periods=10) + mock_df = pd.DataFrame( + { + "Open": [100] * 10, + "High": [105] * 10, + "Low": [95] * 10, + "Close": [102] * 10, + "Volume": [1000000] * 10, + }, + index=dates, + ) + mock_ticker_instance = MagicMock() mock_ticker_instance.history.return_value = mock_df mock_ticker.return_value = mock_ticker_instance - + # Clear cache to ensure fresh call data_model.fetch_stock_data.clear() - - result = data_model.fetch_stock_data('TEST', period='1y') - + + result = data_model.fetch_stock_data("TEST", period="1y") + self.assertFalse(result.empty) self.assertEqual(len(result), 10) - self.assertIn('Close', result.columns) - - @patch('src.modules.data_model.yf.Ticker') + self.assertIn("Close", result.columns) + + @patch("src.modules.data_model.yf.Ticker") def test_empty_data_handling(self, mock_ticker): """Test handling of empty data response.""" mock_ticker_instance = MagicMock() mock_ticker_instance.history.return_value = pd.DataFrame() mock_ticker.return_value = mock_ticker_instance - + data_model.fetch_stock_data.clear() - result = data_model.fetch_stock_data('INVALID', period='1y') - + result = data_model.fetch_stock_data("INVALID", period="1y") + self.assertTrue(result.empty) - - @patch('src.modules.data_model.yf.Ticker') + + @patch("src.modules.data_model.yf.Ticker") def test_timezone_handling(self, mock_ticker): """Test that timezone is removed from index.""" - dates = pd.date_range('2020-01-01', periods=5, tz='America/New_York') - mock_df = pd.DataFrame({ - 'Close': [100, 101, 102, 103, 104] - }, index=dates) - + dates = pd.date_range("2020-01-01", periods=5, tz="America/New_York") + mock_df = pd.DataFrame({"Close": [100, 101, 102, 103, 104]}, index=dates) + mock_ticker_instance = MagicMock() mock_ticker_instance.history.return_value = mock_df mock_ticker.return_value = mock_ticker_instance - + data_model.fetch_stock_data.clear() - result = data_model.fetch_stock_data('SPY', period='1y') - + result = data_model.fetch_stock_data("SPY", period="1y") + # Timezone should be removed self.assertIsNone(result.index.tz) class TestValidateTicker(unittest.TestCase): """Tests for validate_ticker function.""" - - @patch('src.modules.data_model.yf.Ticker') + + @patch("src.modules.data_model.yf.Ticker") def test_valid_ticker(self, mock_ticker): """Test validation of a valid ticker.""" - mock_info = {'regularMarketPrice': 150.0, 'longName': 'Apple Inc.'} + mock_info = {"regularMarketPrice": 150.0, "longName": "Apple Inc."} mock_ticker_instance = MagicMock() mock_ticker_instance.info = mock_info mock_ticker.return_value = mock_ticker_instance - - result = data_model.validate_ticker('AAPL') - + + result = data_model.validate_ticker("AAPL") + self.assertTrue(result) - - @patch('src.modules.data_model.yf.Ticker') + + @patch("src.modules.data_model.yf.Ticker") def test_invalid_ticker(self, mock_ticker): """Test validation of an invalid ticker.""" - mock_info = {'regularMarketPrice': None} + mock_info = {"regularMarketPrice": None} mock_ticker_instance = MagicMock() mock_ticker_instance.info = mock_info mock_ticker.return_value = mock_ticker_instance - - result = data_model.validate_ticker('INVALIDTICKER123') - + + result = data_model.validate_ticker("INVALIDTICKER123") + self.assertFalse(result) - - @patch('src.modules.data_model.yf.Ticker') + + @patch("src.modules.data_model.yf.Ticker") def test_api_error_handling(self, mock_ticker): """Test handling of API errors.""" mock_ticker.side_effect = Exception("API Error") - - result = data_model.validate_ticker('ERROR') - + + result = data_model.validate_ticker("ERROR") + self.assertFalse(result) class TestGetTickerInfo(unittest.TestCase): """Tests for get_ticker_info function.""" - - @patch('src.modules.data_model.yf.Ticker') + + @patch("src.modules.data_model.yf.Ticker") def test_successful_info_fetch(self, mock_ticker): """Test successful ticker info fetch.""" mock_info = { - 'longName': 'Apple Inc.', - 'sector': 'Technology', - 'industry': 'Consumer Electronics', - 'currency': 'USD', - 'exchange': 'NASDAQ' + "longName": "Apple Inc.", + "sector": "Technology", + "industry": "Consumer Electronics", + "currency": "USD", + "exchange": "NASDAQ", } mock_ticker_instance = MagicMock() mock_ticker_instance.info = mock_info mock_ticker.return_value = mock_ticker_instance - - result = data_model.get_ticker_info('AAPL') - + + result = data_model.get_ticker_info("AAPL") + self.assertIsNotNone(result) - self.assertEqual(result['name'], 'Apple Inc.') - self.assertEqual(result['sector'], 'Technology') - - @patch('src.modules.data_model.yf.Ticker') + self.assertEqual(result["name"], "Apple Inc.") + self.assertEqual(result["sector"], "Technology") + + @patch("src.modules.data_model.yf.Ticker") def test_missing_info_fields(self, mock_ticker): """Test handling of missing info fields.""" - mock_info = {'longName': 'Test Company'} # Missing other fields + mock_info = {"longName": "Test Company"} # Missing other fields mock_ticker_instance = MagicMock() mock_ticker_instance.info = mock_info mock_ticker.return_value = mock_ticker_instance - - result = data_model.get_ticker_info('TEST') - + + result = data_model.get_ticker_info("TEST") + self.assertIsNotNone(result) - self.assertEqual(result['name'], 'Test Company') - self.assertEqual(result['sector'], 'N/A') # Default value - - @patch('src.modules.data_model.yf.Ticker') + self.assertEqual(result["name"], "Test Company") + self.assertEqual(result["sector"], "N/A") # Default value + + @patch("src.modules.data_model.yf.Ticker") def test_api_error_returns_none(self, mock_ticker): """Test that API errors return None.""" mock_ticker.side_effect = Exception("API Error") - - result = data_model.get_ticker_info('ERROR') - + + result = data_model.get_ticker_info("ERROR") + self.assertIsNone(result) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/tests/test_signals.py b/tests/test_signals.py index e629f4d..f7f3d04 100644 --- a/tests/test_signals.py +++ b/tests/test_signals.py @@ -1,118 +1,114 @@ +import os +import sys import unittest -import pandas as pd + import numpy as np -import sys -import os +import pandas as pd # Add parent directory to path for imports -sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) from src.modules import signals class TestSignals(unittest.TestCase): - + def setUp(self): """Create test dataframes with known properties.""" # Create a dummy dataframe with 300 days of upward trending data - dates = pd.date_range(start='2020-01-01', periods=300) + dates = pd.date_range(start="2020-01-01", periods=300) prices = np.linspace(100, 200, 300) - self.df = pd.DataFrame({'Close': prices}, index=dates) - + self.df = pd.DataFrame({"Close": prices}, index=dates) + def test_sma_calculation(self): """Test that SMA is calculated correctly.""" result = signals.add_technical_indicators(self.df, sma_window=50, mom_window=12) - - self.assertIn('SMA_50', result.columns) + + self.assertIn("SMA_50", result.columns) # SMA should not be nan at the end - self.assertFalse(np.isnan(result['SMA_50'].iloc[-1])) - + self.assertFalse(np.isnan(result["SMA_50"].iloc[-1])) + # Check logic: In a perfect linear uptrend, Price > SMA - self.assertTrue(result['Close'].iloc[-1] > result['SMA_50'].iloc[-1]) + self.assertTrue(result["Close"].iloc[-1] > result["SMA_50"].iloc[-1]) def test_sma_200_always_calculated(self): """Test that 200-day SMA is always calculated as benchmark.""" result = signals.add_technical_indicators(self.df, sma_window=50) - self.assertIn('SMA_200', result.columns) + self.assertIn("SMA_200", result.columns) def test_momentum_calculation(self): """Test momentum calculation for various lookback windows.""" result = signals.add_technical_indicators(self.df, sma_window=50, mom_window=12) - col_name = 'Momentum_12M_1M' + col_name = "Momentum_12M_1M" self.assertIn(col_name, result.columns) - + # Momentum should be positive for uptrend self.assertTrue(result[col_name].iloc[-1] > 0) def test_rsi_bounds(self): """Test that RSI stays within 0-100 bounds.""" result = signals.add_technical_indicators(self.df, sma_window=50, mom_window=12) - self.assertIn('RSI_14', result.columns) - - valid_rsi = result['RSI_14'].dropna() + self.assertIn("RSI_14", result.columns) + + valid_rsi = result["RSI_14"].dropna() self.assertTrue((valid_rsi >= 0).all()) self.assertTrue((valid_rsi <= 100).all()) def test_volatility_regime_in_sample(self): """Test in-sample regime detection (full-sample quantiles).""" # Create a df with varying volatility - dates = pd.date_range('2020-01-01', periods=100) - df = pd.DataFrame({'Vol_21d': np.random.rand(100)}, index=dates) - + dates = pd.date_range("2020-01-01", periods=100) + df = pd.DataFrame({"Vol_21d": np.random.rand(100)}, index=dates) + # Force some high and low values df.iloc[0:10, 0] = 0.01 # Low df.iloc[90:100, 0] = 1.0 # High - - res = signals.detect_volatility_regime( - df, 'Vol_21d', 0.8, 0.2, use_expanding=False - ) - - self.assertIn('Vol_Regime', res.columns) + + res = signals.detect_volatility_regime(df, "Vol_21d", 0.8, 0.2, use_expanding=False) + + self.assertIn("Vol_Regime", res.columns) # Check that we have High, Low, and Normal labels - unique_regimes = res['Vol_Regime'].unique() - self.assertIn('High', unique_regimes) - self.assertIn('Low', unique_regimes) + unique_regimes = res["Vol_Regime"].unique() + self.assertIn("High", unique_regimes) + self.assertIn("Low", unique_regimes) def test_volatility_regime_out_of_sample(self): """Test out-of-sample regime detection (expanding-window quantiles).""" - dates = pd.date_range('2020-01-01', periods=100) - df = pd.DataFrame({'Vol_21d': np.random.rand(100)}, index=dates) - + dates = pd.date_range("2020-01-01", periods=100) + df = pd.DataFrame({"Vol_21d": np.random.rand(100)}, index=dates) + # Force some high and low values df.iloc[0:10, 0] = 0.01 # Low df.iloc[90:100, 0] = 1.0 # High - + res = signals.detect_volatility_regime( - df, 'Vol_21d', 0.8, 0.2, use_expanding=True, min_periods=20 + df, "Vol_21d", 0.8, 0.2, use_expanding=True, min_periods=20 ) - - self.assertIn('Vol_Regime', res.columns) + + self.assertIn("Vol_Regime", res.columns) # Early periods should be 'Unknown' due to insufficient data - self.assertIn('Unknown', res['Vol_Regime'].iloc[:20].values) + self.assertIn("Unknown", res["Vol_Regime"].iloc[:20].values) def test_volatility_regime_oos_wrapper(self): """Test the convenience wrapper for out-of-sample regime detection.""" - dates = pd.date_range('2020-01-01', periods=100) - df = pd.DataFrame({'Vol_21d': np.random.rand(100)}, index=dates) - - res = signals.detect_volatility_regime_oos( - df, 'Vol_21d', min_periods=20 - ) - - self.assertIn('Vol_Regime', res.columns) + dates = pd.date_range("2020-01-01", periods=100) + df = pd.DataFrame({"Vol_21d": np.random.rand(100)}, index=dates) + + res = signals.detect_volatility_regime_oos(df, "Vol_21d", min_periods=20) + + self.assertIn("Vol_Regime", res.columns) # Verify it uses expanding window (early periods should be 'Unknown') - self.assertIn('Unknown', res['Vol_Regime'].iloc[:20].values) + self.assertIn("Unknown", res["Vol_Regime"].iloc[:20].values) def test_empty_dataframe_handling(self): """Test that functions handle empty dataframes gracefully.""" empty_df = pd.DataFrame() - + result = signals.add_technical_indicators(empty_df) self.assertTrue(result.empty) - + result = signals.detect_volatility_regime(empty_df) self.assertTrue(result.empty) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() - diff --git a/tests/test_signals_advanced.py b/tests/test_signals_advanced.py index 1fb03e3..30ce23b 100644 --- a/tests/test_signals_advanced.py +++ b/tests/test_signals_advanced.py @@ -1,218 +1,216 @@ """Tests for advanced signal generation module.""" +import os +import sys import unittest -import pandas as pd + import numpy as np -import sys -import os +import pandas as pd # Add parent directory to path for imports -sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) from src.modules import signals_advanced class TestBollingerBands(unittest.TestCase): """Tests for Bollinger Bands calculation.""" - + def setUp(self): """Create test data.""" np.random.seed(42) - dates = pd.date_range(start='2020-01-01', periods=100) + dates = pd.date_range(start="2020-01-01", periods=100) # Random walk price series returns = np.random.randn(100) * 0.02 prices = 100 * np.cumprod(1 + returns) - self.df = pd.DataFrame({'Close': prices}, index=dates) - + self.df = pd.DataFrame({"Close": prices}, index=dates) + def test_bollinger_bands_columns(self): """Test that all BB columns are created.""" result = signals_advanced.calculate_bollinger_bands(self.df) - - self.assertIn('BB_Middle', result.columns) - self.assertIn('BB_Upper', result.columns) - self.assertIn('BB_Lower', result.columns) - self.assertIn('BB_Width', result.columns) - self.assertIn('BB_Position', result.columns) - + + self.assertIn("BB_Middle", result.columns) + self.assertIn("BB_Upper", result.columns) + self.assertIn("BB_Lower", result.columns) + self.assertIn("BB_Width", result.columns) + self.assertIn("BB_Position", result.columns) + def test_bollinger_bands_order(self): """Test that upper > middle > lower.""" result = signals_advanced.calculate_bollinger_bands(self.df) - + # After warmup period valid_data = result.iloc[25:] - self.assertTrue((valid_data['BB_Upper'] > valid_data['BB_Middle']).all()) - self.assertTrue((valid_data['BB_Middle'] > valid_data['BB_Lower']).all()) - + self.assertTrue((valid_data["BB_Upper"] > valid_data["BB_Middle"]).all()) + self.assertTrue((valid_data["BB_Middle"] > valid_data["BB_Lower"]).all()) + def test_bollinger_bands_width_positive(self): """Test that band width is always positive.""" result = signals_advanced.calculate_bollinger_bands(self.df) - valid_width = result['BB_Width'].dropna() + valid_width = result["BB_Width"].dropna() self.assertTrue((valid_width > 0).all()) class TestMeanReversionSignal(unittest.TestCase): """Tests for mean reversion signal generation.""" - + def setUp(self): """Create test data with RSI and BB columns.""" - dates = pd.date_range(start='2020-01-01', periods=100) - self.df = pd.DataFrame({ - 'RSI_14': np.linspace(20, 80, 100), # RSI from oversold to overbought - 'BB_Position': np.linspace(-1.5, 1.5, 100) # BB from below to above - }, index=dates) - + dates = pd.date_range(start="2020-01-01", periods=100) + self.df = pd.DataFrame( + { + "RSI_14": np.linspace(20, 80, 100), # RSI from oversold to overbought + "BB_Position": np.linspace(-1.5, 1.5, 100), # BB from below to above + }, + index=dates, + ) + def test_signal_values(self): """Test that signals are in valid range.""" signal = signals_advanced.generate_mean_reversion_signal(self.df) - + self.assertTrue(signal.isin([-1, 0, 1]).all()) - + def test_oversold_buy_signal(self): """Test that oversold conditions generate buy signals.""" # Create oversold data - df = pd.DataFrame({ - 'RSI_14': [25, 28, 29], - 'BB_Position': [-0.9, -0.85, -0.95] - }) - - signal = signals_advanced.generate_mean_reversion_signal( - df, oversold=30, overbought=70 - ) - + df = pd.DataFrame({"RSI_14": [25, 28, 29], "BB_Position": [-0.9, -0.85, -0.95]}) + + signal = signals_advanced.generate_mean_reversion_signal(df, oversold=30, overbought=70) + # Should have buy signals self.assertTrue((signal == 1).any()) - + def test_overbought_sell_signal(self): """Test that overbought conditions generate sell signals.""" - df = pd.DataFrame({ - 'RSI_14': [75, 78, 80], - 'BB_Position': [0.9, 0.85, 0.95] - }) - - signal = signals_advanced.generate_mean_reversion_signal( - df, oversold=30, overbought=70 - ) - + df = pd.DataFrame({"RSI_14": [75, 78, 80], "BB_Position": [0.9, 0.85, 0.95]}) + + signal = signals_advanced.generate_mean_reversion_signal(df, oversold=30, overbought=70) + # Should have sell signals self.assertTrue((signal == -1).any()) class TestVolatilityBreakoutSignal(unittest.TestCase): """Tests for volatility breakout signal.""" - + def setUp(self): """Create test data with volatility.""" np.random.seed(42) - dates = pd.date_range(start='2020-01-01', periods=200) - self.df = pd.DataFrame({ - 'Close': np.cumprod(1 + np.random.randn(200) * 0.01) * 100, - 'Vol_21d': np.abs(np.random.randn(200) * 0.1) + 0.1 - }, index=dates) - + dates = pd.date_range(start="2020-01-01", periods=200) + self.df = pd.DataFrame( + { + "Close": np.cumprod(1 + np.random.randn(200) * 0.01) * 100, + "Vol_21d": np.abs(np.random.randn(200) * 0.1) + 0.1, + }, + index=dates, + ) + # Spike volatility at end - self.df.loc[self.df.index[-20:], 'Vol_21d'] = 0.5 - + self.df.loc[self.df.index[-20:], "Vol_21d"] = 0.5 + def test_signal_generation(self): """Test that signal is generated.""" signal = signals_advanced.generate_volatility_breakout_signal(self.df) - + self.assertEqual(len(signal), len(self.df)) self.assertTrue(signal.isin([-1, 0, 1]).all()) class TestDualMomentumSignal(unittest.TestCase): """Tests for dual momentum signal.""" - + def setUp(self): """Create test data with momentum.""" - dates = pd.date_range(start='2020-01-01', periods=100) - self.df = pd.DataFrame({ - 'Momentum_12M_1M': np.linspace(-0.2, 0.3, 100) # -20% to +30% - }, index=dates) - + dates = pd.date_range(start="2020-01-01", periods=100) + self.df = pd.DataFrame( + {"Momentum_12M_1M": np.linspace(-0.2, 0.3, 100)}, index=dates # -20% to +30% + ) + def test_positive_momentum_long(self): """Test that positive momentum generates long signal.""" signal = signals_advanced.generate_dual_momentum_signal(self.df) - + # Positive momentum should have some long signals - positive_mom_mask = self.df['Momentum_12M_1M'] > 0 + positive_mom_mask = self.df["Momentum_12M_1M"] > 0 self.assertTrue((signal[positive_mom_mask] == 1).any()) - + def test_negative_momentum_cash(self): """Test that negative momentum is cash.""" signal = signals_advanced.generate_dual_momentum_signal(self.df) - + # Negative momentum should be cash (0) - negative_mom_mask = self.df['Momentum_12M_1M'] < 0 + negative_mom_mask = self.df["Momentum_12M_1M"] < 0 self.assertTrue((signal[negative_mom_mask] == 0).all()) class TestCompositeSignal(unittest.TestCase): """Tests for composite signal generation.""" - + def setUp(self): """Create test signals.""" - dates = pd.date_range(start='2020-01-01', periods=10) + dates = pd.date_range(start="2020-01-01", periods=10) self.df = pd.DataFrame(index=dates) - + self.signals = { - 'trend': pd.Series([1, 1, 1, 0, -1, -1, 1, 1, 0, 0], index=dates), - 'momentum': pd.Series([1, 1, 0, 0, 0, -1, 1, 0, 0, 1], index=dates), + "trend": pd.Series([1, 1, 1, 0, -1, -1, 1, 1, 0, 0], index=dates), + "momentum": pd.Series([1, 1, 0, 0, 0, -1, 1, 0, 0, 1], index=dates), } - + def test_equal_weight_combination(self): """Test equal weight signal combination.""" - signal = signals_advanced.generate_composite_signal( - self.df, self.signals, threshold=0.5 - ) - + signal = signals_advanced.generate_composite_signal(self.df, self.signals, threshold=0.5) + self.assertEqual(len(signal), len(self.df)) self.assertTrue(signal.isin([-1, 0, 1]).all()) - + def test_weighted_combination(self): """Test weighted signal combination.""" - weights = {'trend': 0.7, 'momentum': 0.3} + weights = {"trend": 0.7, "momentum": 0.3} signal = signals_advanced.generate_composite_signal( self.df, self.signals, weights=weights, threshold=0.5 ) - + self.assertTrue(signal.isin([-1, 0, 1]).all()) class TestATR(unittest.TestCase): """Tests for ATR calculation.""" - + def setUp(self): """Create OHLC test data.""" np.random.seed(42) - dates = pd.date_range(start='2020-01-01', periods=50) + dates = pd.date_range(start="2020-01-01", periods=50) close = np.cumprod(1 + np.random.randn(50) * 0.01) * 100 - - self.df = pd.DataFrame({ - 'Open': close * (1 + np.random.randn(50) * 0.005), - 'High': close * (1 + np.abs(np.random.randn(50) * 0.01)), - 'Low': close * (1 - np.abs(np.random.randn(50) * 0.01)), - 'Close': close - }, index=dates) - + + self.df = pd.DataFrame( + { + "Open": close * (1 + np.random.randn(50) * 0.005), + "High": close * (1 + np.abs(np.random.randn(50) * 0.01)), + "Low": close * (1 - np.abs(np.random.randn(50) * 0.01)), + "Close": close, + }, + index=dates, + ) + def test_atr_positive(self): """Test that ATR is always positive.""" atr = signals_advanced.calculate_atr(self.df) - + valid_atr = atr.dropna() self.assertTrue((valid_atr > 0).all()) - + def test_atr_window(self): """Test custom ATR window.""" atr_14 = signals_advanced.calculate_atr(self.df, window=14) atr_7 = signals_advanced.calculate_atr(self.df, window=7) - + # Shorter window should have values earlier self.assertTrue(atr_7.first_valid_index() <= atr_14.first_valid_index()) class TestPositionSizing(unittest.TestCase): """Tests for position sizing.""" - + def test_basic_position_size(self): """Test basic position sizing calculation.""" shares, stop = signals_advanced.calculate_position_size( @@ -220,24 +218,21 @@ def test_basic_position_size(self): risk_per_trade=0.01, # 1% risk atr=2.0, atr_multiplier=2.0, - price=50.0 + price=50.0, ) - + self.assertGreater(shares, 0) self.assertEqual(stop, 4.0) # 2 * 2 ATR - + def test_zero_atr_handling(self): """Test handling of zero ATR.""" shares, stop = signals_advanced.calculate_position_size( - account_value=100000, - risk_per_trade=0.01, - atr=0, - price=50.0 + account_value=100000, risk_per_trade=0.01, atr=0, price=50.0 ) - + self.assertEqual(shares, 0) self.assertEqual(stop, 0.0) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/verify_logic.py b/verify_logic.py index b54bfe6..f71a495 100644 --- a/verify_logic.py +++ b/verify_logic.py @@ -1,45 +1,46 @@ - -import pandas as pd import numpy as np import yfinance as yf + # Copying the logic from dashboard.py for verification def calculate_metrics(df): """Calculates SMAs, Volatility, Returns, and Momentum.""" if df.empty: return df df = df.copy() - df['SMA_50'] = df['Close'].rolling(window=50).mean() - df['Daily_Return'] = df['Close'].pct_change() - df['Momentum_12_1'] = df['Close'].shift(21) / df['Close'].shift(252) - 1 + df["SMA_50"] = df["Close"].rolling(window=50).mean() + df["Daily_Return"] = df["Close"].pct_change() + df["Momentum_12_1"] = df["Close"].shift(21) / df["Close"].shift(252) - 1 return df + def run_backtest(df): - if df.empty or 'SMA_50' not in df.columns: + if df.empty or "SMA_50" not in df.columns: return None bt_df = df.copy() # 1. Trend Signal - bt_df['Signal_Trend'] = np.where(bt_df['Close'] > bt_df['SMA_50'], 1, 0) - + bt_df["Signal_Trend"] = np.where(bt_df["Close"] > bt_df["SMA_50"], 1, 0) + # 2. Monthly Rebalance Logic - bt_df['Month'] = bt_df.index.to_period('M') - monthly_signals = bt_df.groupby('Month')['Signal_Trend'].last() + bt_df["Month"] = bt_df.index.to_period("M") + monthly_signals = bt_df.groupby("Month")["Signal_Trend"].last() monthly_positions = monthly_signals.shift(1) - bt_df['Position'] = bt_df['Month'].map(monthly_positions) - bt_df['Position'] = bt_df['Position'].fillna(0) - + bt_df["Position"] = bt_df["Month"].map(monthly_positions) + bt_df["Position"] = bt_df["Position"].fillna(0) + # 3. Returns and Costs - bt_df['Strategy_Return'] = bt_df['Position'] * bt_df['Daily_Return'] - bt_df['Trade_Size'] = bt_df['Position'].diff().abs().fillna(0) + bt_df["Strategy_Return"] = bt_df["Position"] * bt_df["Daily_Return"] + bt_df["Trade_Size"] = bt_df["Position"].diff().abs().fillna(0) cost_bps = 0.0010 - bt_df['Cost'] = bt_df['Trade_Size'] * cost_bps - bt_df['Strategy_Net_Return'] = bt_df['Strategy_Return'] - bt_df['Cost'] - - bt_df['Equity_Strategy'] = (1 + bt_df['Strategy_Net_Return']).cumprod() - + bt_df["Cost"] = bt_df["Trade_Size"] * cost_bps + bt_df["Strategy_Net_Return"] = bt_df["Strategy_Return"] - bt_df["Cost"] + + bt_df["Equity_Strategy"] = (1 + bt_df["Strategy_Net_Return"]).cumprod() + return bt_df + # Test print("Fetching data...") try: @@ -50,13 +51,17 @@ def run_backtest(df): print(f"Data fetched: {len(df)} rows") df = calculate_metrics(df) print("Metrics calculated.") - + bt_results = run_backtest(df) if bt_results is not None: print("Backtest run successfully.") - print(bt_results[['Close', 'SMA_50', 'Signal_Trend', 'Position', 'Strategy_Net_Return']].tail()) - - final_return = bt_results['Equity_Strategy'].iloc[-1] - 1 + print( + bt_results[ + ["Close", "SMA_50", "Signal_Trend", "Position", "Strategy_Net_Return"] + ].tail() + ) + + final_return = bt_results["Equity_Strategy"].iloc[-1] - 1 print(f"Final Strategy Return: {final_return:.2%}") else: print("Backtest returned None") From 87d61724ecb4d0370bbabed894c449d2a207d4dd Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Mon, 9 Feb 2026 04:17:07 +0000 Subject: [PATCH 3/3] Add helper tooltips to research config inputs and fix repo-wide formatting Co-authored-by: aarjava <218419324+aarjava@users.noreply.github.com> --- src/modules/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/modules/__init__.py b/src/modules/__init__.py index 02149cd..f6c36cb 100644 --- a/src/modules/__init__.py +++ b/src/modules/__init__.py @@ -1,2 +1,2 @@ # HedgeFund Dashboard - Core Modules -from . import backtester, config, data_model, signals, signals_advanced +from . import backtester, config, data_model, signals, signals_advanced # noqa: F401