ITH stands for “Investment Time Horizon,” a concept that has been gaining traction among investors in recent years. It is a way to quantify and measure risk in a time-varying framework and to help investors plan their future investments. It is not to be confused with the holding period.
Cathie Wood, CEO and founder of Ark Invest, says she invests on a five-year investment time horizon. Wood’s mantra is that ARK invests with a minimum five-year time horizon, and that volatility is to be expected in the industry-disrupting and cutting-edge stocks it targets.
At Eon Labs Ltd., although we expect volatility in your strategy performance, we only accept trading strategies with a short-term ITH, which must satisfy the following criterion: the minimum rolling RRR $(RRR_H)$ over the entire backtesting period is greater than or equal to one (1), where the investment time horizon $(H)$ is no longer than 28 days.
$$ H \leq 28\ \text{days} $$
where $H$ is the investment time horizon or the observed length of every rolling window subsample during the submitted backtesting period.
In other words, ITH is the maximum duration required for an investment to not only recover from a drawdown that occurs after reaching an equity peak but also to surpass the peak by a specific percentage. This predetermined percentage must be equal to or greater than the drawdown percentage. In essence, the ITH measures the time it takes for an investment to bounce back and achieve additional growth following a decline in value.
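For a rough illustration, here is a minimal sketch in plain Python (the helper name and the bar-based duration are ours, not part of the production script below): an ITH epoch closes once the gain beyond the previous equity peak matches or exceeds the drawdown suffered after that peak, and the ITH of a track record is the longest such epoch.

import pandas as pd

def ith_epoch_durations(nav: pd.Series) -> list:
    """Bars from each equity peak until NAV exceeds that peak by at least the interim drawdown."""
    peak_idx, peak = 0, nav.iloc[0]
    trough = peak
    durations = []
    for i in range(1, len(nav)):
        trough = min(trough, nav.iloc[i])
        drawdown = 1 - trough / peak          # loss suffered after the peak
        excess_gain = nav.iloc[i] / peak - 1  # gain beyond the peak
        if excess_gain > 0 and excess_gain >= drawdown:
            durations.append(i - peak_idx)    # one ITH epoch completed
            peak_idx, peak, trough = i, nav.iloc[i], nav.iloc[i]
    return durations

# ITH of the track record = max(ith_epoch_durations(nav)); the production script below
# additionally requires the excess gain to clear a TMAEG hurdle before an epoch closes.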
ITH application is detailed on the Profit Sharing page. It is closely related to two other ideas:
Given that only at Settlement can the SNRF be topped up by the ATSR Fee or debited to ATSR according to the arrears schedule, the projected ITH, or the timing of each Settlement, is dynamically determined by the combination of three (3) criteria:
In short, all three criteria must be met for a Settlement to take place.
Scenario. Feel free to copy this ‣ to your own Google Workspace to find out more about how the SNRF is structured to balance the interests of ATSR and Eon Labs Ltd. Also, check out the Fair Pay page for more info.
Using Time Horizons to Reach Your Investing Goals
ITH is expressed in terms of 1-to-1 risk (drawdown from the peak) vs return (Excess Gain beyond the peak).
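In formula form (our notation, following the 1-to-1 description above), every rolling window of length $H$ must satisfy

$$ RRR_H = \frac{\text{Excess Gain beyond the peak}}{\text{Drawdown from the peak}} \geq 1 $$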
Feel free to substitute the synthetic CSV files in the NAV_data folder with your own data to generate the HTML ITH results.
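For reference, each CSV is expected to look roughly like the synthetic files: a Date column used as the index, a NAV column, and an optional PnL column (the values below are made up).

Date,NAV,PnL
2020-01-30,1.000000,0.000000
2020-01-31,1.004500,0.004500
2020-02-01,0.998200,-0.006300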
November 2, 2024: Users can define a custom CSV folder where their performance data CSVs are located.
June 23, 2024: Refactored for code structure optimization and consistency.
June 4, 2024: Refactored with Numba JIT for faster generation.
November 20, 2023: ITH script updated; the fractional-percentage PnL column can now be generated automatically from the NAV column.
from rich.logging import RichHandler; from rich.traceback import install; install(show_locals=True)
import logging; logging.basicConfig(level="INFO", format="%(message)s", datefmt="[%X]", handlers=[RichHandler(rich_tracebacks=True, markup=True)]); logger = logging.getLogger("rich")
from pathlib import Path; import sys; sys.path.insert(0, str(next((p for p in Path.cwd().parents if p.name == "ml_feature_set"), Path.cwd()))); __package__ = Path.cwd().parent.name
#-------------------------------------------------------------------------------------------------------------------------------------------------------------------------#
import webbrowser, os, glob, subprocess, platform, json, shutil; import numpy as np, pandas as pd, plotly.graph_objs as go; from plotly.subplots import make_subplots
from scipy import stats; from scipy.stats import gmean; from plotly.graph_objs.layout import XAxis, YAxis; from typing import NamedTuple; from numba import njit
from rich.progress import Progress, SpinnerColumn, TimeElapsedColumn, BarColumn, TextColumn
class IthConfig(NamedTuple):
delete_everything: bool = False
output_dir: Path = Path('synth_ithes')
TMAEG_dynamically_determined_by: str = "mdd"
TMAEG: float = 0.05
date_initiate: str = '2020-01-30'
date_conclude: str = '2023-07-25'
ith_epochs_lower_bound: int = 10
ith_epochs_upper_bound: int = 100000
sr_lower_bound: float = 0.5
sr_upper_bound: float = 9.9
aggcv_low_bound: float = 0
aggcv_up_bound: float = 0.70
qualified_results: int = 0
required_qualified_results: int = 15
class SyntheticNavParams(NamedTuple):
start_date: str = '2020-01-30' # Start date for NAV data
end_date: str = '2023-07-25' # End date for NAV data
avg_daily_return: float = 0.00010123 # Average daily return; higher values increase the overall upward trend
daily_return_volatility: float = 0.009 # Daily return volatility; higher values increase the daily fluctuations
df: int = 5 # Degrees of freedom for the t-distribution; lower values increase the likelihood of extreme returns
drawdown_prob: float = 0.05 # Probability of entering a drawdown; higher values increase the frequency of drawdowns
drawdown_magnitude_low: float = 0.001 # Lower bound of drawdown magnitude; higher values increase the minimum drawdown size
drawdown_magnitude_high: float = 0.003 # Upper bound of drawdown magnitude; higher values increase the maximum drawdown size
drawdown_recovery_prob: float = 0.02 # Probability of recovering from a drawdown; higher values increase the likelihood of recovery
class ProcessingParams(NamedTuple):
trading_year_days_crypto: int = 365
trading_year_days_other: int = 252
retries: int = 20
delay: int = 3
class PlotConfig(NamedTuple):
margin: dict = {"l": 50, "r": 50, "b": 50, "t": 50, "pad": 4}
paper_bgcolor: str = "DarkSlateGrey"
plot_bgcolor: str = "Black"
legend_font_family: str = "Courier New"
legend_font_size: int = 12
legend_font_color: str = "White"
legend_bgcolor: str = "DarkSlateGrey"
legend_bordercolor: str = "White"
legend_borderwidth: int = 2
global_font_family: str = "Monospace"
global_font_size: int = 12
global_font_color: str = "White"
annotation_font_size: int = 16
annotation_font_color: str = "White"
gridcolor: str = "dimgray"
xaxis_tickmode: str = 'array'
xaxis_gridwidth: float = 0.5
yaxis_gridwidth: float = 0.5
    auto_open_index_html: bool = True  # Controls auto-opening of the index results HTML
    auto_open_nav_html: bool = False  # Controls auto-opening of each NAV HTML
    auto_open_live_view: bool = False  # Controls auto-opening of the live (fig.show) view
# Use the configuration
config = IthConfig()
synthetic_nav_params = SyntheticNavParams()
processing_params = ProcessingParams()
plot_config = PlotConfig()
# Optionally purge previous outputs before regenerating
if config.delete_everything:
if config.output_dir.exists():
shutil.rmtree(config.output_dir)
logger.info("Sayonara, files! 🚀") # Your files have left the chat
else:
logger.info("Hmm, nothing to delete here. 🤔")
# Create anew after the great purge
config.output_dir.mkdir(parents=True, exist_ok=True)
nav_dir = config.output_dir / "NAV_data"
nav_dir.mkdir(parents=True, exist_ok=True)
# Set constants
date_duration = (pd.to_datetime(config.date_conclude) - pd.to_datetime(config.date_initiate)).days
ith_epochs_lower_bound = int(np.floor(date_duration / 28 / 6)); logger.debug(f'{ith_epochs_lower_bound=}')
ith_epochs_upper_bound = config.ith_epochs_upper_bound
sr_lower_bound = config.sr_lower_bound
sr_upper_bound = config.sr_upper_bound
aggcv_low_bound = config.aggcv_low_bound
aggcv_up_bound = config.aggcv_up_bound
qualified_results = config.qualified_results
def load_config(filename="config.json"):
'''
config.json file contains the following:
{
"api_key": "key_string_here"
}
'''
with open(filename, 'r') as file:
config = json.load(file)
return config
def generate_synthetic_nav(params: SyntheticNavParams):
dates = pd.date_range(params.start_date, params.end_date)
walk = stats.t.rvs(params.df, loc=params.avg_daily_return, scale=params.daily_return_volatility, size=len(dates))
walk = np.cumsum(walk)
drawdown = False
for i in range(len(dates)):
if drawdown:
walk[i] -= np.random.uniform(params.drawdown_magnitude_low, params.drawdown_magnitude_high)
if np.random.rand() < params.drawdown_recovery_prob:
drawdown = False
elif np.random.rand() < params.drawdown_prob:
drawdown = True
walk = walk - walk[0] + 1 # Normalize the series so that it starts with 1
nav = pd.DataFrame(data=walk, index=dates, columns=['NAV'])
nav.index.name = 'Date'
nav['PnL'] = nav['NAV'].diff()
nav['PnL'] = nav['PnL'].fillna(nav['NAV'].iloc[0] - 1) # Adjust the first PnL value accordingly
return nav
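# Illustrative usage (not part of the pipeline; run after the definitions above):
#   demo_nav = generate_synthetic_nav(SyntheticNavParams())
#   logger.info(f"{len(demo_nav)} rows, final NAV {demo_nav['NAV'].iloc[-1]:.4f}")
# The generator draws t-distributed daily increments and overlays randomly triggered
# drawdown regimes, so extreme moves occur more often than under a normal random walk.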
@njit
def _sharpe_ratio_numba_helper(returns: np.ndarray, nperiods: float, rf: float = 0., annualize: bool = True) -> float:
valid_returns = returns[~np.isnan(returns)]
n = len(valid_returns)
if n < 2:
return np.nan
mean_returns = np.mean(valid_returns)
std_dev = np.sqrt(np.sum((valid_returns - mean_returns) ** 2) / (n - 1))
if std_dev == 0:
return np.nan
mean_diff = mean_returns - rf
return np.sqrt(nperiods) * (mean_diff / std_dev) if annualize else mean_diff / std_dev
def sharpe_ratio_numba(returns: np.ndarray, granularity: str, market_type: str = 'crypto', rf: float = 0., annualize: bool = True) -> float:
trading_year_days = processing_params.trading_year_days_crypto if market_type == 'crypto' else processing_params.trading_year_days_other
if 'd' in granularity:
nperiods = trading_year_days / int(granularity.replace('d', ''))
elif 'm' in granularity:
nperiods = (trading_year_days * 24 * 60) / int(granularity.replace('m', ''))
else:
raise ValueError("Invalid granularity format. Use '1d', '2d', ..., '1m', '2m', ...")
return _sharpe_ratio_numba_helper(returns, nperiods, rf, annualize)
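# Illustrative usage (on whatever PnL series is at hand): '1d' granularity annualizes
# with 365 periods for crypto and 252 for other markets.
#   sr = sharpe_ratio_numba(nav_data['PnL'].dropna().values, granularity='1d', market_type='crypto')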
class MaxDrawdownResult(NamedTuple):
max_drawdown: float
def max_drawdown(nav_values) -> MaxDrawdownResult:
max_drawdown = np.max(1 - nav_values / np.maximum.accumulate(nav_values))
return MaxDrawdownResult(max_drawdown=max_drawdown)
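# Illustrative usage: max_drawdown(nav_data['NAV']).max_drawdown is the deepest fractional
# peak-to-trough loss over the whole series, e.g. 0.18 for an 18% drawdown.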
def save_files(fig, filename, output_dir, nav_data, uid, source_file=None):
"""Save HTML and CSV files with optional source file name in output filename"""
try:
output_dir = Path(output_dir).resolve()
if not output_dir.exists():
logger.warning(f"Creating missing output directory: {output_dir}")
output_dir.mkdir(parents=True, exist_ok=True)
# Add source filename to output if provided
if source_file:
source_name = Path(source_file).stem
filename = filename.replace('.html', f'_{source_name}.html')
# Remove '_NAV_data' suffix if present
filename = filename.replace('_NAV_data.html', '.html')
html_path = output_dir / filename
csv_path = output_dir / filename.replace('.html', '.csv')
logger.debug(f"Saving files to: HTML={html_path}, CSV={csv_path}")
# Verify write permissions before attempting to save
if not os.access(str(output_dir), os.W_OK):
logger.critical(f"No write permission for directory: {output_dir}")
return False
# Save files with explicit error handling
try:
fig.write_html(str(html_path), include_plotlyjs='cdn', full_html=False)
except Exception as e:
logger.critical(f"Failed to save HTML file: {str(e)}", exc_info=True)
return False
try:
nav_data.to_csv(str(csv_path))
except Exception as e:
logger.critical(f"Failed to save CSV file: {str(e)}", exc_info=True)
return False
# Verify files were created with correct size
if not html_path.exists() or html_path.stat().st_size == 0:
logger.critical(f"HTML file not created or empty: {html_path}")
return False
if not csv_path.exists() or csv_path.stat().st_size == 0:
logger.critical(f"CSV file not created or empty: {csv_path}")
return False
logger.info(f"Successfully saved files to {output_dir}")
return True
except Exception as e:
logger.critical(f"Error in save_files: {str(e)}", exc_info=True)
return False
class DeepestTroughsResult(NamedTuple):
deepest_troughs_after_new_high: pd.Series
new_high_flag: pd.Series
def deepest_troughs_and_new_highs(nav_values, running_max_nav) -> DeepestTroughsResult:
    deepest_troughs_after_new_high = pd.Series(index=nav_values.index, dtype=float)
    new_high_flag = pd.Series(0, index=nav_values.index, dtype=int)  # initialize flags to 0
    current_max = running_max_nav.iloc[0]
    current_trough = nav_values.iloc[0]
    for i in range(1, len(nav_values)):
        if running_max_nav.iloc[i] > current_max:
            current_max = running_max_nav.iloc[i]
            current_trough = nav_values.iloc[i]
            new_high_flag.iloc[i] = 1
        elif nav_values.iloc[i] < current_trough:
            current_trough = nav_values.iloc[i]
        deepest_troughs_after_new_high.iloc[i] = current_trough
    return DeepestTroughsResult(deepest_troughs_after_new_high, new_high_flag)
class MaxDDPointsResult(NamedTuple):
max_dd_points: pd.Series
def max_dd_points_after_new_high(drawdowns, new_high_flag) -> MaxDDPointsResult:
    max_dd_points = pd.Series(np.zeros(len(drawdowns)), index=drawdowns.index)
    current_max_dd = 0.0
    max_dd_index = -1
    for i in range(1, len(drawdowns)):
        if new_high_flag.iloc[i] == 1:
            if max_dd_index != -1:
                max_dd_points.iloc[max_dd_index] = current_max_dd
            current_max_dd = 0.0
            max_dd_index = -1
        else:
            if drawdowns.iloc[i] > current_max_dd:
                current_max_dd = drawdowns.iloc[i]
                max_dd_index = i
    if max_dd_index != -1:
        max_dd_points.iloc[max_dd_index] = current_max_dd
    return MaxDDPointsResult(max_dd_points)
class GeometricMeanDrawdownResult(NamedTuple):
geometric_mean: float
def geometric_mean_of_drawdown(nav_values) -> GeometricMeanDrawdownResult:
running_max_nav = nav_values.cummax()
deepest_troughs_result = deepest_troughs_and_new_highs(nav_values, running_max_nav)
drawdowns_to_deepest_troughs = running_max_nav - deepest_troughs_result.deepest_troughs_after_new_high
max_dd_points_result = max_dd_points_after_new_high(drawdowns_to_deepest_troughs, deepest_troughs_result.new_high_flag)
max_dd_points_fraction = max_dd_points_result.max_dd_points / running_max_nav
spike_values = max_dd_points_fraction[max_dd_points_fraction > 0]
if spike_values.empty: # Check if spike_values is empty
geometric_mean = np.nan # Return NaN or some other appropriate value
else:
geometric_mean = gmean(spike_values)
return GeometricMeanDrawdownResult(geometric_mean=geometric_mean)
class ExcessGainLossResult(NamedTuple):
excess_gains: np.ndarray
excess_losses: np.ndarray
num_of_ith_epochs: int
ith_epochs: np.ndarray
ith_intervals_cv: float
@njit
def _excess_gain_excess_loss_numba(nav, hurdle):
    excess_gain = excess_loss = 0.0  # float literals keep the Numba lists uniformly typed
    excess_gains = [0.0]
    excess_losses = [0.0]
    excess_gains_at_ith_epoch = [0.0]
last_reset_state = False
ith_epochs = [False] * len(nav)
endorsing_crest = endorsing_nadir = candidate_crest = candidate_nadir = nav[0]
for i, (equity, next_equity) in enumerate(zip(nav[:-1], nav[1:])):
if next_equity > candidate_crest:
excess_gain = next_equity / endorsing_crest - 1 if endorsing_crest != 0 else 0
candidate_crest = next_equity
if next_equity < candidate_nadir:
excess_loss = 1 - next_equity / endorsing_crest
candidate_nadir = next_equity
reset_candidate_nadir_excess_gain_and_excess_loss = excess_gain > abs(excess_loss) and excess_gain > hurdle and candidate_crest >= endorsing_crest
if reset_candidate_nadir_excess_gain_and_excess_loss:
endorsing_crest = candidate_crest
endorsing_nadir = candidate_nadir = equity
excess_gains_at_ith_epoch.append(excess_gain if not last_reset_state else 0)
else:
endorsing_nadir = min(endorsing_nadir, equity)
excess_gains_at_ith_epoch.append(0)
last_reset_state = reset_candidate_nadir_excess_gain_and_excess_loss
excess_gains.append(excess_gain)
excess_losses.append(excess_loss)
if reset_candidate_nadir_excess_gain_and_excess_loss:
            excess_gain = excess_loss = 0.0
ith_epoch_condition = len(excess_gains) > 1 and excess_gains[-1] > excess_losses[-1] and excess_gains[-1] > hurdle
ith_epochs[i + 1] = ith_epoch_condition
num_of_ith_epochs = ith_epochs.count(True)
ith_interval_separators = [i for i, x in enumerate(ith_epochs) if x]
ith_interval_separators.insert(0, 0)
ith_intervals = np.diff(np.array(ith_interval_separators)) # Convert to NumPy array before using np.diff
ith_intervals_cv = np.std(ith_intervals) / np.mean(ith_intervals) if len(ith_intervals) > 0 else np.nan
return ExcessGainLossResult(
excess_gains=np.array(excess_gains),
excess_losses=np.array(excess_losses),
num_of_ith_epochs=num_of_ith_epochs,
ith_epochs=np.array(ith_epochs),
ith_intervals_cv=ith_intervals_cv
)
def excess_gain_excess_loss_numba(hurdle, nav):
original_df = nav.copy() if isinstance(nav, pd.DataFrame) and 'NAV' in nav.columns else None
nav = nav['NAV'].values if original_df is not None else nav.values
result = _excess_gain_excess_loss_numba(nav, hurdle)
if original_df is not None:
original_df['Excess Gains'] = result.excess_gains
original_df['Excess Losses'] = result.excess_losses
original_df['ITHEs'] = result.ith_epochs
return original_df
else:
return result
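# Illustrative usage: when passed a DataFrame with a 'NAV' column (as in process_nav_data
# below), the wrapper returns the same frame annotated with 'Excess Gains', 'Excess Losses'
# and a boolean 'ITHEs' column; otherwise it returns the raw ExcessGainLossResult tuple.
#   annotated = excess_gain_excess_loss_numba(hurdle=0.05, nav=nav_data)
#   ith_epoch_dates = annotated.index[annotated['ITHEs']]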
def get_first_non_zero_digits(num, digit_count):
    # Keep the first non-zero digits (dropping '0', '.', '-'), then trim or zero-pad to digit_count
    non_zero_digits = ''.join([i for i in str(num) if i not in ['0', '.', '-']][:digit_count])
    uid = (non_zero_digits + '0' * digit_count)[:digit_count]
    return uid
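# Example: get_first_non_zero_digits(1.0045, 6) keeps the digits '1', '4', '5' and
# zero-pads to six characters, giving '145000'; two such strings form the 12-character UID.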
class PnLResult(NamedTuple):
nav_data: pd.DataFrame
def pnl_from_nav(nav_data) -> PnLResult:
"""Calculate PnL from NAV as a fractional percentage."""
try:
nav_copy = nav_data.copy() # Create explicit copy to avoid chained assignment
nav_copy.loc[:, 'PnL'] = nav_copy['NAV'].diff() / nav_copy['NAV'].shift(1)
nav_copy.loc[nav_copy.index[0], 'PnL'] = 0 # Set first row PnL
return PnLResult(nav_data=nav_copy)
except Exception as e:
logger.critical(f"Failed to calculate PnL: {str(e)}", exc_info=True)
raise
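# Illustrative example (made-up values): a NAV path 1.00 -> 1.01 -> 0.9999 yields
# fractional PnL of 0.00, 0.01 and -0.01.
#   demo = pd.DataFrame({'NAV': [1.00, 1.01, 0.9999]}, index=pd.date_range('2020-01-30', periods=3, name='Date'))
#   demo_pnl = pnl_from_nav(demo).nav_data['PnL']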
class ProcessNavDataResult(NamedTuple):
qualified_results: int
sharpe_ratio: float
num_of_ith_epochs: int
filename: str
uid: str
fig: go.Figure # Add fig to the NamedTuple
def log_results(results):
headers = ["TMAEG", "Sharpe Ratio", "Num of ITH Epochs", "EL CV", "ITH CV", "AGG CV", "Days to ITHE", "Filename"]
df = pd.DataFrame(results, columns=headers)
# Filter out rows where Filename is None
df = df[df['Filename'].notna()]
# Add a rank column for AGG CV
df['AGG CV Rank'] = df['AGG CV'].rank(method='min')
# Add a rank column for Sharpe Ratio (largest value ranks as #1)
df['Sharpe Ratio Rank'] = df['Sharpe Ratio'].rank(method='min', ascending=False)
# Create a new column with HTML links
df['Link'] = df['Filename'].apply(lambda x: f'<a href="synth_ithes/{x}" target="_blank">Open</a>')
# Drop the Filename column as it's now redundant
df = df.drop(columns=['Filename'])
html_table = df.to_html(classes='table table-striped', index=False, table_id='results_table', escape=False)
# Add DataTables script to enable sorting and set page length
html_output = f"""
<html>
<head>
        <link rel="stylesheet" type="text/css" href="https://cdn.datatables.net/1.10.21/css/jquery.dataTables.css">
        <script type="text/javascript" charset="utf8" src="https://code.jquery.com/jquery-3.5.1.js"></script>
        <script type="text/javascript" charset="utf8" src="https://cdn.datatables.net/1.10.21/js/jquery.dataTables.js"></script>
<script>
$(document).ready(function() {{
$('#results_table').DataTable({{
"pageLength": 200,
"lengthMenu": [[200, 500, 1000, -1], [200, 500, 1000, "All"]]
}});
}});
</script>
</head>
<body>
{html_table}
</body>
</html>
"""
html_file_path = "results.html"
with open(html_file_path, "w") as file:
file.write(html_output)
logger.info("Results have been written to results.html")
# Automatically open the HTML file in the default web browser if enabled
if plot_config.auto_open_index_html:
webbrowser.open(f"file://{os.path.abspath(html_file_path)}")
# Initialize a list to store results
results = []
def process_nav_data(nav_data, output_dir, qualified_results, nav_dir, TMAEG, bypass_thresholds=False) -> ProcessNavDataResult:
# Extract the first six non-zero digits from the first two rows of the NAV column
uid_part1 = get_first_non_zero_digits(nav_data['NAV'].iloc[0], 6)
uid_part2 = get_first_non_zero_digits(nav_data['NAV'].iloc[1], 6)
# Concatenate the two parts to form the UID
uid = uid_part1 + uid_part2
logger.debug(f'{uid=}')
# Initialize filename with a default value
filename = None
ith_durations = None
excess_losses_at_ithes = None
fig = None # Initialize fig to None
# Calculate days_elapsed here
days_elapsed = (nav_data.index[-1] - nav_data.index[0]).days
sharpe_ratio = sharpe_ratio_numba(nav_data['PnL'].dropna().values, '1d')
calculated_nav = excess_gain_excess_loss_numba(TMAEG, nav_data)
if isinstance(calculated_nav, pd.DataFrame):
ith_epochs = calculated_nav[calculated_nav['ITHEs']].index
num_of_ith_epochs = len(ith_epochs)
else:
ith_epochs = nav_data.index[calculated_nav.ith_epochs]
num_of_ith_epochs = calculated_nav.num_of_ith_epochs
# Add detailed logging for threshold checks
logger.debug(f"Threshold check details: bypass_thresholds={bypass_thresholds}")
logger.debug(f"SR check: {sr_lower_bound} < {sharpe_ratio} < {sr_upper_bound}")
logger.debug(f"ITH epochs check: {ith_epochs_lower_bound} < {num_of_ith_epochs} < {ith_epochs_upper_bound}")
if bypass_thresholds or (sr_lower_bound < sharpe_ratio < sr_upper_bound and
ith_epochs_lower_bound < num_of_ith_epochs < ith_epochs_upper_bound):
if bypass_thresholds:
logger.debug("Bypassing thresholds for custom CSV.")
else:
logger.debug("Thresholds met for synthetic data.")
logger.info(f"Found {num_of_ith_epochs=}, {sharpe_ratio=}")
ith_dates = calculated_nav[calculated_nav['ITHEs']].index
ith_dates = ith_dates.insert(0, calculated_nav.index[0])
ith_dates = ith_dates.append(pd.Index([calculated_nav.index[-1]])); logger.debug(f'fixed: {ith_dates=}')
ithe_ct = len(ith_dates) - 2
days_taken_to_ithe = days_elapsed / ithe_ct if ithe_ct > 0 else np.nan
ith_indices = [calculated_nav.index.get_loc(date) for date in ith_dates]
ith_durations = np.diff(ith_indices); logger.debug(f'{ith_durations=}')
ith_cv = np.std(ith_durations) / np.mean(ith_durations)
# Calculate the coefficient of variation for Excess Losses at ITHEs
excess_losses_at_ithes = calculated_nav[calculated_nav['ITHEs']]['Excess Losses']
excess_losses_at_ithes = excess_losses_at_ithes[excess_losses_at_ithes != 0] # Exclude zero values
last_excess_loss = calculated_nav['Excess Losses'].iloc[-1] # Include the last value of Excess Losses (even if it is not flagged with ITHE True), unless it's already included
if not calculated_nav['ITHEs'].iloc[-1]: # Check if the last value of ITHE is False
excess_losses_at_ithes = pd.concat([excess_losses_at_ithes, pd.Series([last_excess_loss], index=[calculated_nav.index[-1]])])
if excess_losses_at_ithes.empty: # Check if excess_losses_at_ithes is empty
el_cv = np.nan # Return NaN or some other appropriate value
else:
el_cv = np.std(excess_losses_at_ithes) / np.mean(excess_losses_at_ithes)
aggcv = max(el_cv, ith_cv)
logger.debug(f'{aggcv=}')
# Add logging for aggcv check
logger.debug(f"AGGCV check: {aggcv_low_bound} < {aggcv} < {aggcv_up_bound}")
if bypass_thresholds or (aggcv_low_bound < aggcv < aggcv_up_bound):
if bypass_thresholds:
source_name = Path(nav_dir).stem if nav_dir else "unknown"
filename = (
f"nav_ming_xu_"
f"EL_{el_cv:.5f}_"
f"ITHC_{ith_cv:.5f}_"
f"TMAEG_{TMAEG:.5f}_"
f"ITHEs_{ithe_ct}_"
f"D2ITHE_{days_taken_to_ithe:.2f}_"
f"SR_{sharpe_ratio:.4f}_"
f"UID_{uid}.html"
)
else:
filename = (
f"EL_{el_cv:.5f}_"
f"ITHC_{ith_cv:.5f}_"
f"TMAEG_{TMAEG:.5f}_"
f"ITHEs_{ithe_ct}_"
f"D2ITHE_{days_taken_to_ithe:.2f}_"
f"SR_{sharpe_ratio:.4f}_"
f"UID_{uid}.html"
)
fig = make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.01, subplot_titles=('NAV', 'Excess Gains & Losses'))
ith_epochs = calculated_nav[calculated_nav['ITHEs']].index
num_of_ith_epochs = len(ith_epochs)
ithes_dir = output_dir / f"ITHEs_{num_of_ith_epochs}"
ithes_dir.mkdir(parents=True, exist_ok=True)
crossover_epochs = calculated_nav.loc[ith_epochs]
fig.add_trace(go.Scatter(x=crossover_epochs.index, y=crossover_epochs['NAV'], mode='markers', name='ITHEs on NAV', marker=dict(color='darkgoldenrod', size=20)), row=1, col=1)
fig.add_trace(go.Scatter(x=crossover_epochs.index, y=crossover_epochs['Excess Gains'], mode='markers', name='ITHEs on Excess Gains', marker=dict(color='blue', size=20)), row=2, col=1)
fig.add_trace(go.Scatter(x=calculated_nav.index, y=calculated_nav['NAV'], mode='lines', name='NAV'), row=1, col=1)
fig.add_trace(go.Scatter(x=calculated_nav.index, y=calculated_nav['Excess Gains'], mode='lines', name='Excess Gains', line=dict(color='green')), row=2, col=1)
fig.add_trace(go.Scatter(x=calculated_nav.index, y=calculated_nav['Excess Losses'], mode='lines', name='Excess Losses', line=dict(color='red')), row=2, col=1)
fig.update_layout(
title=f'{num_of_ith_epochs} ITH Epochs -- {filename}',
autosize=True, # Enable autosize for responsive layout
margin=plot_config.margin,
paper_bgcolor=plot_config.paper_bgcolor,
plot_bgcolor=plot_config.plot_bgcolor,
legend=dict(
x=0.01,
y=0.98,
traceorder="normal",
font=dict(
family=plot_config.legend_font_family,
size=plot_config.legend_font_size,
color=plot_config.legend_font_color
),
bgcolor=plot_config.legend_bgcolor,
bordercolor=plot_config.legend_bordercolor,
borderwidth=plot_config.legend_borderwidth
),
font=dict(
family=plot_config.global_font_family,
size=plot_config.global_font_size,
color=plot_config.global_font_color
),
annotations=[
dict(
x=0.5,
y=0.95,
xref='paper',
yref='paper',
text='NAV<br>',
showarrow=False,
font=dict(
size=plot_config.annotation_font_size,
color=plot_config.annotation_font_color
),
),
dict(
x=0.5,
y=0.45,
xref='paper',
yref='paper',
text='Excess Gains & Losses<br>',
showarrow=False,
font=dict(
size=plot_config.annotation_font_size,
color=plot_config.annotation_font_color
),
),
]
)
fig.update_yaxes(gridcolor=plot_config.gridcolor, type="linear", row=1, col=1)
fig.update_yaxes(gridcolor=plot_config.gridcolor, row=2, col=1)
fig.update_xaxes(gridcolor=plot_config.gridcolor, row=1, col=1)
fig.update_xaxes(gridcolor=plot_config.gridcolor, row=2, col=1)
# Generate monthly ticks between the minimum and maximum dates
monthly_ticks = pd.date_range(nav_data.index.min(), nav_data.index.max(), freq='MS')
monthly_tick_labels = monthly_ticks.strftime('%Y-%m')
# Customize X-axis grid lines
custom_xaxis = XAxis(
tickmode=plot_config.xaxis_tickmode,
tickvals=monthly_ticks, # Set to monthly_ticks
showgrid=True, # Show vertical grid
gridwidth=plot_config.xaxis_gridwidth # Vertical grid width
)
custom_yaxis = YAxis(
showgrid = True, # Show vertical grid
gridwidth = plot_config.yaxis_gridwidth # Vertical grid width
)
fig.update_layout(xaxis=custom_xaxis.to_plotly_json())
fig.update_layout(yaxis=custom_yaxis.to_plotly_json())
# Automatically open the live view version of the HTML if enabled
if plot_config.auto_open_live_view:
fig.show()
save_files(fig, filename, output_dir, nav_data, uid, source_file=None if not bypass_thresholds else nav_dir)
# Increment qualified_results if not bypassing thresholds
if not bypass_thresholds:
qualified_results += 1
logger.info(f"Generated {TMAEG=}, {sharpe_ratio=}, {num_of_ith_epochs=}, {el_cv=},{ith_cv=}, {aggcv=}, {days_taken_to_ithe=}")
# Append the results to the list only if filename is not None
if filename:
results.append([TMAEG, sharpe_ratio, num_of_ith_epochs, el_cv, ith_cv, aggcv, days_taken_to_ithe, filename])
# Automatically open the NAV HTML file in the default web browser if enabled
if plot_config.auto_open_nav_html and filename:
file_path = str(output_dir / filename)
webbrowser.open(f"file://{os.path.abspath(file_path)}")
else:
pass # Removed repetitive warning logs
if ith_durations is not None:
logger.debug(f"ith_durations in process_nav_data: {ith_durations}")
logger.debug(f"excess_losses_at_ithes in process_nav_data: {excess_losses_at_ithes}")
return ProcessNavDataResult(
qualified_results=qualified_results,
sharpe_ratio=sharpe_ratio,
num_of_ith_epochs=num_of_ith_epochs,
filename=filename,
uid=uid,
fig=fig # Return the fig object
)
# Initialize counters
qualified_results = 0
counter = 0 # Add counter initialization here
# Try to load existing data
existing_csv_files = glob.glob(str(nav_dir / '*.csv'))
logger.debug(f'{existing_csv_files=}')
def determine_tmaeg(nav_data, method):
if method == "geomean":
return geometric_mean_of_drawdown(nav_data['NAV']).geometric_mean
elif method == "mdd":
return max_drawdown(nav_data['NAV']).max_drawdown
elif method == "fixed":
        return config.TMAEG
return None
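# Illustrative usage (nav_data as loaded below): the TMAEG hurdle can be derived from the
# whole-period max drawdown, the geometric mean of per-peak max drawdowns, or taken as the
# fixed value from the config.
#   tmaeg_mdd = determine_tmaeg(nav_data, "mdd")
#   tmaeg_geo = determine_tmaeg(nav_data, "geomean")
#   tmaeg_fix = determine_tmaeg(nav_data, "fixed")   # config.TMAEG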
# Process existing CSV files
for i, csv_file in enumerate(existing_csv_files, 1):
if qualified_results >= config.required_qualified_results:
logger.info(f"Required number of qualified results ({config.required_qualified_results}) reached.")
break
nav_data = pd.read_csv(csv_file, index_col='Date', parse_dates=True)
if 'PnL' not in nav_data.columns:
logger.debug(f"PnL column missing in {csv_file}. Calculating from NAV...")
nav_data = pnl_from_nav(nav_data).nav_data
TMAEG = determine_tmaeg(nav_data, config.TMAEG_dynamically_determined_by)
result = process_nav_data(nav_data, config.output_dir, qualified_results, nav_dir, TMAEG)
qualified_results = result.qualified_results
logger.debug(f'Processing file {i:4} of {len(existing_csv_files)}: {result.filename=}, {result.uid=}, {TMAEG=}, {result.sharpe_ratio=}, from CSV generated.')
# Generate new NAV data if necessary
with Progress(
SpinnerColumn(),
TimeElapsedColumn(),
BarColumn(),
TextColumn("[progress.description]{task.description}"),
TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
TextColumn("Failed:{task.fields[failed]} Qualified:{task.fields[qualified]}"),
) as progress:
task = progress.add_task(
"Generating synthetic data...",
total=config.required_qualified_results,
failed=0,
qualified=qualified_results
)
while qualified_results < config.required_qualified_results:
counter += 1
synthetic_nav = generate_synthetic_nav(synthetic_nav_params)
TMAEG = determine_tmaeg(synthetic_nav, config.TMAEG_dynamically_determined_by)
result = process_nav_data(synthetic_nav, config.output_dir, qualified_results, nav_dir, TMAEG)
qualified_results = result.qualified_results
if result.filename is not None:
logger.debug(f'Processing synthetic data {counter:4}: {result.filename=}, {result.uid=}, {TMAEG=}, {result.sharpe_ratio=}, newly generated.')
if result.fig is not None:
save_files(result.fig, result.filename, nav_dir, synthetic_nav, result.uid)
progress.update(task, advance=1, qualified=qualified_results)
else:
# Increment failed count without logging warning
progress.update(task, failed=progress.tasks[0].fields['failed'] + 1)
if qualified_results >= config.required_qualified_results:
logger.info(f"Required number of qualified results ({config.required_qualified_results}) reached.")
break
# Log the results in a tabulated format
log_results(results)
# After the config declarations, add this function to handle custom CSV files
def get_date_range_from_csvs(csv_folder: Path) -> tuple[str, str]:
"""Get the earliest start date and latest end date from all CSVs in folder"""
if not csv_folder.exists():
logger.warning(
f"Custom CSV folder not found: {csv_folder}\\n"
"Please replace the folder path with an existing directory and ensure your custom CSV files follow the format of the synthetic data CSV located at synth_ithes/."
)
return synthetic_nav_params.start_date, synthetic_nav_params.end_date
csv_files = list(csv_folder.glob('*.csv'))
if not csv_files:
logger.warning(f"No CSV files found in {csv_folder}")
return synthetic_nav_params.start_date, synthetic_nav_params.end_date
all_dates = []
processed_files = 0
for csv_file in csv_files:
try:
df = pd.read_csv(csv_file, parse_dates=['Date'])
if df.empty:
logger.warning(f"Empty CSV file: {csv_file}")
continue
if 'Date' not in df.columns:
logger.error(f"No Date column in {csv_file}")
continue
all_dates.extend(df['Date'].tolist())
processed_files += 1
except pd.errors.EmptyDataError:
logger.error(f"Empty or corrupt CSV file: {csv_file}")
continue
except Exception as e:
logger.critical(f"Failed to process {csv_file}: {str(e)}", exc_info=True)
continue
if not all_dates:
logger.warning("No valid dates found in any CSV files")
return synthetic_nav_params.start_date, synthetic_nav_params.end_date
if processed_files < len(csv_files):
logger.warning(f"Only processed {processed_files} out of {len(csv_files)} CSV files")
min_date = min(all_dates).strftime('%Y-%m-%d')
max_date = max(all_dates).strftime('%Y-%m-%d')
logger.info(f"Using date range from custom CSVs: {min_date} to {max_date}")
return min_date, max_date
#* IMPORTANT: For first time users, replace this path with your own performance record folder
#* The folder should contain CSV files with columns: Date, NAV, PnL (optional)
#^ Example path: Path('/Users/your_username/trading_records')
custom_csv_folder = Path('/Users/mock_folder/records')
start_date, end_date = get_date_range_from_csvs(custom_csv_folder)
# Update synthetic params with custom date range
synthetic_nav_params = synthetic_nav_params._replace(
start_date=start_date,
end_date=end_date
)
# Process custom CSV files if they exist
if custom_csv_folder.exists():
custom_csvs = list(custom_csv_folder.glob('*.csv'))
total_files = len(custom_csvs)
processed_files = 0
failed_files = 0
for csv_file in custom_csvs:
try:
logger.info(f"Processing custom CSV: {csv_file.name}")
nav_data = pd.read_csv(csv_file, index_col='Date', parse_dates=True)
if nav_data.empty:
logger.error(f"Empty CSV file: {csv_file}")
failed_files += 1
continue
if 'NAV' not in nav_data.columns:
logger.error(f"No NAV column in {csv_file}")
failed_files += 1
continue
if 'PnL' not in nav_data.columns:
logger.info(f"Calculating PnL for {csv_file}")
nav_data = pnl_from_nav(nav_data).nav_data
TMAEG = determine_tmaeg(nav_data, config.TMAEG_dynamically_determined_by)
if pd.isna(TMAEG):
logger.error(f"Invalid TMAEG calculated for {csv_file}")
failed_files += 1
continue
# Pass bypass_thresholds=True for custom CSVs
result = process_nav_data(nav_data, config.output_dir, qualified_results, nav_dir, TMAEG, bypass_thresholds=True)
qualified_results = result.qualified_results
if result.filename is not None:
if result.fig is not None:
# Use the source_file parameter to append CSV name to HTML
if save_files(result.fig, result.filename, nav_dir, nav_data, result.uid, source_file=csv_file):
processed_files += 1
else:
failed_files += 1
else:
logger.error(f"No figure generated for {csv_file}")
failed_files += 1
else:
logger.warning(f"No output filename generated for {csv_file}")
failed_files += 1
except Exception as e:
logger.critical(f"Failed to process {csv_file}: {str(e)}", exc_info=True)
failed_files += 1
continue
logger.info(f"CSV Processing Summary: Total={total_files}, Processed={processed_files}, Failed={failed_files}")
# The rest of the synthetic data generation code remains the same
# but will now use the updated date range from custom CSVs if they exist