In this Python code, the download_perpetual_usdt_trades_daily function iterates over the given trading symbols and downloads the corresponding Binance USDⓈ-M perpetual futures trade archives for the specified date range, at either daily or monthly granularity depending on the daily flag.
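The archives live under a predictable path on data.binance.vision. As a quick sketch of the layout the downloader relies on (the symbol, base asset, and date below are hypothetical values chosen only for illustration), a single daily trades archive is addressed like this:

# Sketch: how one daily trades archive URL is composed (illustrative values, not the script's defaults)
symbol, base_asset, date_str = "BTC", "USDT", "2023-05-01"
url = f"https://data.binance.vision/data/futures/um/daily/trades/{symbol}{base_asset}/{symbol}{base_asset}-trades-{date_str}.zip"
# -> https://data.binance.vision/data/futures/um/daily/trades/BTCUSDT/BTCUSDT-trades-2023-05-01.zip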
import os, requests, hashlib, time
from datetime import datetime, timedelta
from pathlib import Path
from tqdm.auto import tqdm
from rich import print
incomplete_checksum_files = []
def md5sum(file_data: bytes, file_size: int) -> tuple[str, float]:
m = hashlib.md5()
with tqdm(total=file_size, unit="B", unit_scale=True, desc="Checksum validation") as progress_bar:
for i in range(0, file_size, 1024):
chunk = file_data[i:i + 1024]
m.update(chunk)
progress_bar.update(len(chunk))
        progress_percentage = (progress_bar.n / progress_bar.total) * 100 if progress_bar.total else 100.0
return m.hexdigest(), progress_percentage
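# md5sum hashes the in-memory bytes in 1 KiB chunks so the progress bar advances as the digest is computed;
# the returned percentage is measured against the expected size (the HTTP Content-Length), so a short or
# truncated file shows up as < 100% and gets flagged for re-download.
# Example (sketch, hypothetical data): digest, pct = md5sum(b"abc" * 1000, 3000)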
def download_file(url: str, local_folder: str, max_retries: int = 100, retry_delay: int = 60, retry_increment: int = 15) -> None:
retry_count = 0
current_retry_delay = retry_delay
while retry_count < max_retries:
try:
response = requests.get(url, stream=True)
if response.status_code == 404:
print(f"[yellow]File not found:[/yellow] [red1]{url}[/red1]")
return
file_size = int(response.headers.get("Content-Length", 0))
filename = Path(local_folder) / url.split("/")[-1]
checksum_url = url + ".CHECKSUM"
checksum_filename = Path(local_folder) / checksum_url.split("/")[-1]
expected_checksum = None
if checksum_filename.exists():
with open(checksum_filename, "r") as f:
expected_checksum = f.read().strip()
if filename.exists():
with open(filename, "rb") as f:
file_data = f.read()
actual_checksum, progress_percentage = md5sum(file_data, file_size)
if progress_percentage < 100:
print(f"[yellow]Checksum progress not reaching 100% for file:[/yellow] [red1]{filename}[/red1]")
incomplete_checksum_files.append(filename)
if actual_checksum == expected_checksum:
print(f"[green]Local copy found for[/green] {filename}")
return
else:
print(f"Checksum failed for {filename}, redownloading...")
if not filename.parent.exists():
filename.parent.mkdir(parents=True, exist_ok=True)
print(f"[bold green]filename.parent:[/bold green] [red]{filename.parent}[/red] [bold green]has been created![/bold green]")
with open(filename, "wb") as f, tqdm(
total=file_size, unit="B", unit_scale=True, desc=filename.name
) as progress_bar:
for data in response.iter_content(chunk_size=1024):
f.write(data)
progress_bar.update(len(data))
with open(filename, "rb") as f:
file_data = f.read()
actual_checksum, progress_percentage = md5sum(file_data, file_size)
if progress_percentage < 100:
print(f"[yellow]Checksum progress not reaching 100% for file:[/yellow] [red1]{filename}[/red1]")
incomplete_checksum_files.append(filename)
with open(checksum_filename, "w") as f:
f.write(actual_checksum)
if expected_checksum is not None and actual_checksum != expected_checksum:
filename.unlink()
checksum_filename.unlink()
print(f"Checksum failed for {filename}, redownloading...")
                        time.sleep(5)
                        download_file(url, local_folder)
                        return
else:
print(f"{filename.name} downloaded and validated against its checksum.")
return # Download completed successfully, exit the loop
except requests.exceptions.ConnectionError as e:
retry_count += 1
if retry_count < max_retries:
print(f"Connection error: {e}. Retrying in {current_retry_delay} seconds... (attempt {retry_count}/{max_retries})")
time.sleep(current_retry_delay)
current_retry_delay += retry_increment
else:
print(f"Connection error: {e}. Maximum retries ({max_retries}) reached. Aborting download.")
break
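# Usage sketch (hypothetical URL and folder, following the data.binance.vision layout used below):
# download_file(
#     "https://data.binance.vision/data/futures/um/daily/trades/BTCUSDT/BTCUSDT-trades-2023-05-01.zip",
#     "D:/binance_raw_trade_data/data/futures/um/daily/trades/BTCUSDT",
# )
# The call is effectively idempotent: an existing archive that matches its stored checksum is left untouched.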
def download_perpetual_usdt_trades_daily(
    symbols: list[str], start_year: int, start_month: int, start_day: int, end_year: int, end_month: int, end_day: int, local_folder: str, daily: bool, base_asset: str = "USDT",
    specific_files: list[Path] | None = None
) -> None:
start_date = datetime(start_year, start_month, start_day)
end_date = datetime(end_year, end_month, end_day)
if not Path(local_folder).exists():
Path(local_folder).mkdir(parents=True)
time_unit = "daily" if daily else "monthly"
date_format = "%Y-%m-%d" if daily else "%Y-%m"
for symbol in symbols:
while start_date <= end_date:
date_str = start_date.strftime(date_format)
url = f"<https://data.binance.vision/data/futures/um/{time_unit}/trades/{symbol}{base_asset}/{symbol}{base_asset}-trades-{date_str}.zip>"
            # Skip files not in the specific_files list
            filename = Path(local_folder) / f"{symbol}{base_asset}" / url.split("/")[-1]
            if not specific_files or filename in specific_files:
                download_file(url, Path(local_folder) / f"{symbol}{base_asset}")
            start_date += timedelta(days=1 if daily else 32)
            if not daily:
                start_date = start_date.replace(day=1)
start_date = datetime(start_year, start_month, start_day)
def remove_incomplete_files(file_list: list[Path]) -> None:
for file_path in file_list:
md5_file_path = file_path.with_suffix(file_path.suffix + ".CHECKSUM")
if file_path.exists():
file_path.unlink()
print(f"[red]Deleted[/red] {file_path}")
if md5_file_path.exists():
md5_file_path.unlink()
print(f"[red]Deleted[/red] {md5_file_path}")
if __name__ == "__main__":
symbols = 'ETH BNB BTC'.split()
symbols = 'ETH'.split()
start_year = 2019
start_month = 12
start_day = 1
end_year = 2023
end_month = 5
end_day = 5
local_folder_monthly = r"D:/binance_raw_trade_data/data/futures/um/monthly/trades"
local_folder_daily = r"D:/binance_raw_trade_data/data/futures/um/daily/trades"
# Set daily to True for daily data, False for monthly data
daily = True
# daily = False
if daily:
download_perpetual_usdt_trades_daily(symbols, start_year, start_month, start_day, end_year, end_month, end_day, local_folder_daily, daily)
else:
download_perpetual_usdt_trades_daily(symbols, start_year, start_month, start_day, end_year, end_month, end_day, local_folder_monthly, daily)
if incomplete_checksum_files:
print("\\nFiles that never reached 100% checksum progress:")
for file_path in incomplete_checksum_files:
print(file_path)
print("\\nDeleting incomplete files and their corresponding MD5 files...")
remove_incomplete_files(incomplete_checksum_files)
print("\\nRe-downloading deleted files...")
if daily:
download_perpetual_usdt_trades_daily(symbols, start_year, start_month, start_day, end_year, end_month, end_day, local_folder_daily, daily, specific_files=incomplete_checksum_files)
else:
download_perpetual_usdt_trades_daily(symbols, start_year, start_month, start_day, end_year, end_month, end_day, local_folder_monthly, daily, specific_files=incomplete_checksum_files)
from rich.logging import RichHandler; from rich.traceback import install; install(show_locals=True)
import logging; logging.basicConfig(level="INFO", format="%(message)s", datefmt="[%X]", handlers=[RichHandler(rich_tracebacks=True, markup=True)]); logger = logging.getLogger("rich")
from pathlib import Path; import sys; sys.path.insert(0, str(next((p for p in Path.cwd().parents if p.name == "ml_feature_set"), Path.cwd()))); __package__ = Path.cwd().parent.name
import os; import concurrent.futures; import pytz; import pandas as pd; import requests
from datetime import datetime, timedelta; from joblib import Memory
from typing import NamedTuple, List, Optional
from tenacity import retry, stop_after_attempt, wait_exponential; import shutil; import numpy as np
CRYPTO_PAIRS = {'SYNTHETICUSDT': 'SYNTHETICUSDT', 'Bitcoin (BTC)': 'BTCUSDT', 'Ethereum (ETH)': 'ETHUSDT', 'Binance (BNB)': 'BNBUSDT', 'Solana (SOL)': 'SOLUSDT', 'Dogecoin (DOGE)': 'DOGEUSDT', 'Cardano (ADA)': 'ADAUSDT', 'Polygon (MATIC)': 'MATICUSDT', 'Storj (STORJ)': 'STORJUSDT', 'Litecoin (LTC)': 'LTCUSDT', 'dYdX (DYDX)': 'DYDXUSDT', 'Compound (COMP)': 'COMPUSDT', 'Measurable Data Token (MDT)': 'MDTUSDT', 'Avalanche (AVAX)': 'AVAXUSDT', 'Maker (MKR)': 'MKRUSDT', 'The Sandbox (SAND)': 'SANDUSDT', 'Decentraland (MANA)': 'MANAUSDT', 'Fantom (FTM)': 'FTMUSDT', 'Ripple (XRP)': 'XRPUSDT', 'Celo (CELO)': 'CELOUSDT', 'Optimism (OP)': 'OPUSDT', 'ApeCoin (APE)': 'APEUSDT', 'Pepe (PEPE)': 'PEPEUSDT', 'Gala (GALA)': 'GALAUSDT', 'NEAR Protocol (NEAR)': 'NEARUSDT', 'Wild (WLD)': 'WLDUSDT', 'Floki Inu (FLOKI)': 'FLOKIUSDT', 'Pixel (PIXEL)': 'PIXELUSDT', 'Ordi (ORDI)': 'ORDIUSDT', 'Meme (MEME)': 'MEMEUSDT', 'Cosmos (ATOM)': 'ATOMUSDT', 'Tia (TIA)': 'TIAUSDT'}
GRANULARITY_MAP = {'1s': timedelta(seconds=1), '1m': timedelta(minutes=1), '3m': timedelta(minutes=3), '5m': timedelta(minutes=5), '15m': timedelta(minutes=15), '30m': timedelta(minutes=30), '1h': timedelta(hours=1), '2h': timedelta(hours=2), '4h': timedelta(hours=4), '6h': timedelta(hours=6), '8h': timedelta(hours=8), '12h': timedelta(hours=12), '1d': timedelta(days=1)}
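# Example: GRANULARITY_MAP["15m"] -> timedelta(minutes=15); dividing a time span by this value gives the
# number of candles expected in that span (used below for synthetic data sizing and for range splitting).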
# Create a cache directory if it doesn't exist
cache_dir = '.cache' # Shortened cache directory path
if not os.path.exists(cache_dir):
os.makedirs(cache_dir)
# Initialize Joblib's Memory to cache data
memory = Memory(location=cache_dir, verbose=0)
# Ensure OHLCVData is defined before it is used
class OHLCVData(NamedTuple):
open_time: pd.Series
open: pd.Series
high: pd.Series
low: pd.Series
close: pd.Series
volume: pd.Series
# Ensure unique name for cached function
@memory.cache
def cached_fetch_binance_data(symbol: str, interval: str, start_str: str, end_str: str) -> 'OHLCVData':
return fetch_binance_ohlcv(symbol, interval, start_str, end_str)
def cached_fetch_data(symbol: str, interval: str, start_str: str, end_str: str) -> 'OHLCVData':
cache_key = f"{symbol}_{interval}_{start_str}_{end_str}" # noqa # Unused variable
# Check if the function is cached
if cached_fetch_binance_data.check_call_in_cache(symbol, interval, start_str, end_str):
logger.debug(f"Using cached data for {symbol} from {start_str} to {end_str}")
else:
logger.debug(f"Fetching new data for {symbol} from {start_str} to {end_str}")
return cached_fetch_binance_data(symbol, interval, start_str, end_str)
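# Note: joblib's Memory keys the cache on the wrapped function's arguments, so a repeat call with the same
# (symbol, interval, start_str, end_str) is served from the .cache directory instead of hitting the API again.
# Example (sketch, hypothetical window): cached_fetch_data("BTCUSDT", "1m", "2022-10-01 00:00:00", "2022-10-01 00:09:00")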
# Utility function to convert time string to UTC localized datetime object
def to_utc(time_str: str) -> datetime:
utc = pytz.UTC
return utc.localize(datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S"))
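# Example: to_utc("2022-10-01 00:00:00") -> datetime(2022, 10, 1, 0, 0, tzinfo=<UTC>)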
# Define a NamedTuple for synthetic data parameters
class SyntheticDataParams(NamedTuple):
min_rand: int = 1
max_rand: int = 10
clip_min: int = 1
clip_max: int = 10
datum: int = 100
quanta: int = 17
# Global flag to ensure initial value is added only once
datum_added = False
def generate_synthetic_data_bundle(config: SyntheticDataParams, start_time: datetime, end_time: datetime, granularity: str) -> OHLCVData:
global datum_added
logger.debug(f"Generating synthetic data from {start_time} to {end_time} with granularity {granularity}")
if granularity not in GRANULARITY_MAP:
raise ValueError(f"Invalid granularity: {granularity}. Must be one of {list(GRANULARITY_MAP.keys())}")
interval = GRANULARITY_MAP[granularity]
total_duration = (end_time - start_time).total_seconds()
num_points = max(int(total_duration / interval.total_seconds()), 1) # Calculate number of points based on interval
logger.debug(f"Total duration: {total_duration} seconds, Number of points: {num_points}")
# Ensure start_time is naive before converting to np.datetime64
start_time = start_time.replace(tzinfo=None)
price: np.ndarray = np.cumsum(np.clip(np.random.randint(config.min_rand, config.max_rand + 1, size=num_points), config.clip_min, config.clip_max)) + config.datum
open_time: np.ndarray = np.array([np.datetime64(start_time + interval * i) for i in range(num_points)], dtype='datetime64[ns]')
open_time_series: pd.Series = pd.Series(open_time).drop_duplicates().reset_index(drop=True)
logger.debug(f"Generated open_time series: {open_time_series}")
# Include initial value in open prices
open_prices = np.insert(price[:-1], 0, config.datum)
close_prices = price
high_prices = np.maximum(open_prices, close_prices) + np.random.rand(len(open_prices))
low_prices = np.minimum(open_prices, close_prices) - np.random.rand(len(open_prices))
volume = np.random.randint(1, 100, size=len(open_prices))
# Ensure the initial value is included in all columns only once
if not datum_added:
open_time_series = pd.Series(np.insert(open_time, 0, np.datetime64(start_time)))
open_prices = np.insert(open_prices, 0, config.datum)
high_prices = np.insert(high_prices, 0, config.datum)
low_prices = np.insert(low_prices, 0, config.datum)
close_prices = np.insert(close_prices, 0, config.datum)
volume = np.insert(volume, 0, 0) # Assuming initial volume is 0
datum_added = True
# Convert to appropriate data types
open_time_series = open_time_series.dt.tz_localize('UTC')
open_prices = open_prices.astype('float64')
high_prices = high_prices.astype('float64')
low_prices = low_prices.astype('float64')
close_prices = close_prices.astype('float64')
volume = volume.astype('float64')
# Log lengths of arrays
logger.debug(f"Lengths - open_time: {len(open_time_series)}, open: {len(open_prices)}, high: {len(high_prices)}, low: {len(low_prices)}, close: {len(close_prices)}, volume: {len(volume)}")
# Ensure all arrays have the same length
if not (len(open_time_series) == len(open_prices) == len(high_prices) == len(low_prices) == len(close_prices) == len(volume)):
raise ValueError("PARALLEL: All arrays must be of the same length")
return OHLCVData(
open_time=open_time_series,
open=open_prices,
high=high_prices,
low=low_prices,
close=close_prices,
volume=volume
)
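# Usage sketch (hypothetical parameters): a few minutes of 1-minute candles from a clipped random-walk price
# series that starts at the configured datum of 100.
# data = generate_synthetic_data_bundle(
#     SyntheticDataParams(min_rand=1, max_rand=10, clip_min=1, clip_max=10, datum=100),
#     start_time=datetime(2022, 10, 1, 0, 0, 0),
#     end_time=datetime(2022, 10, 1, 0, 10, 0),
#     granularity="1m",
# )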
# Define the fetch_binance_ohlcv function
def fetch_binance_ohlcv(symbol: str = "BTCUSDT", interval: str = "1m", start_str: str = None, end_str: str = None) -> OHLCVData:
logger.debug(f"Entering fetch_binance_ohlcv with symbol={symbol}, interval={interval}, start_str={start_str}, end_str={end_str}")
if symbol == "SYNTHETICUSDT":
start_time = to_utc(start_str) # Convert start_str to UTC datetime
end_time = to_utc(end_str) # Convert end_str to UTC datetime
config = SyntheticDataParams(
min_rand=1,
max_rand=10,
clip_min=1,
clip_max=10,
datum=100
)
synthetic_data = generate_synthetic_data_bundle(config, start_time, end_time, interval) # Pass the interval as granularity
logger.debug(f"Generated synthetic data: {synthetic_data}")
logger.debug(f"Synthetic data lengths: open_time={len(synthetic_data.open_time)}, open={len(synthetic_data.open)}, high={len(synthetic_data.high)}, low={len(synthetic_data.low)}, close={len(synthetic_data.close)}, volume={len(synthetic_data.volume)}")
logger.debug(f"Exiting fetch_binance_ohlcv with result: {synthetic_data}")
return OHLCVData(
open_time=synthetic_data.open_time,
open=synthetic_data.open,
high=synthetic_data.high,
low=synthetic_data.low,
close=synthetic_data.close,
volume=synthetic_data.volume
)
    base_url = "https://api.binance.com/api/v3/klines"
all_data: List[pd.DataFrame] = []
limit = 1000 # Max limit as per Binance API
# Convert start and end strings to datetime objects localized to UTC
start_time = to_utc(start_str).timestamp() * 1000
end_time = to_utc(end_str).timestamp() * 1000
logger.debug(f"Starting fetch: {start_str} ({int(start_time)}) to {end_str} ({int(end_time)})")
@retry(stop=stop_after_attempt(10), wait=wait_exponential(multiplier=1, min=1, max=60))
def fetch_data_with_retry(start_time: int, end_time: int) -> pd.DataFrame:
params = {
"symbol": symbol,
"interval": interval,
"limit": limit,
"startTime": int(start_time),
"endTime": int(end_time)
}
response = requests.get(base_url, params=params)
data = response.json()
# Log the raw response from the API
logger.debug(f"API response: {data}")
if not data: # Check if Binance API returned no data
return pd.DataFrame() # Return an empty DataFrame to indicate no data
if 'code' in data: # Check if Binance API returned an error
            raise ValueError(f"Binance API returned an error: {data}")
df = pd.DataFrame(data, columns=["open_time", "open", "high", "low", "close", "volume", "close_time", "quote_asset_volume", "number_of_trades", "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore"])
df["open_time"] = pd.to_datetime(df["open_time"], unit='ms', utc=True)
df["close_time"] = pd.to_datetime(df["close_time"], unit='ms', utc=True)
numeric_columns = ["open", "high", "low", "close", "volume"]
df[numeric_columns] = df[numeric_columns].apply(pd.to_numeric, axis=1)
logger.debug(f"Fetched data length: {len(data)}") # Log the length of the fetched data
logger.debug(f"Data content: {data}") # Log the content of the fetched data
return df
while start_time < end_time:
try:
df = fetch_data_with_retry(start_time, end_time)
if not df.empty:
all_data.append(df)
logger.debug(f"Batch open_time range: {df.iloc[0]['open_time']} to {df.iloc[-1]['open_time']}")
                last_close_time = df.iloc[-1]["close_time"]
                start_time = last_close_time.timestamp() * 1000 + 1  # Resume just after the last candle's close time so no candle is skipped
else:
logger.warning(f"No data fetched for range {datetime.utcfromtimestamp(start_time / 1000)} to {datetime.utcfromtimestamp(end_time / 1000)}")
break # Exit the loop if no data is returned
except Exception as e:
logger.error(f"Failed to fetch data: {e}")
break
if all_data:
# Concatenate all dataframes
full_df = pd.concat(all_data, ignore_index=True)
# Convert data types
numeric_columns = ["open", "high", "low", "close", "volume"]
full_df[numeric_columns] = full_df[numeric_columns].apply(pd.to_numeric, axis=1)
logger.debug(f"Data types of Binance data columns:\\n{full_df.dtypes}")
logger.debug(f"First 5 rows of Binance data:\\n{full_df.head()}")
logger.debug(f"Open time length: {len(full_df['open_time'])}, Data length: {len(full_df)}") # Log lengths before returning
# Check for length mismatch
if len(full_df['open_time']) != len(full_df):
logger.error(f"Length mismatch: Open time length {len(full_df['open_time'])}, Data length {len(full_df)}")
logger.debug(f"Exiting fetch_binance_ohlcv with result: {OHLCVData(open_time=full_df['open_time'], open=full_df['open'], high=full_df['high'], low=full_df['low'], close=full_df['close'], volume=full_df['volume'])}")
return OHLCVData(
open_time=full_df["open_time"],
open=full_df["open"],
high=full_df["high"],
low=full_df["low"],
close=full_df["close"],
volume=full_df["volume"]
)
else:
logger.debug(f"Exiting fetch_binance_ohlcv with result: {OHLCVData(open_time=pd.Series([], dtype='datetime64[ns, UTC]'), open=pd.Series([], dtype='float64'), high=pd.Series([], dtype='float64'), low=pd.Series([], dtype='float64'), close=pd.Series([], dtype='float64'), volume=pd.Series([], dtype='float64'))}")
return OHLCVData(
open_time=pd.Series([], dtype='datetime64[ns, UTC]'),
open=pd.Series([], dtype='float64'),
high=pd.Series([], dtype='float64'),
low=pd.Series([], dtype='float64'),
close=pd.Series([], dtype='float64'),
volume=pd.Series([], dtype='float64')
)
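# Usage sketch (real Binance endpoint, small hypothetical window):
# ohlcv = fetch_binance_ohlcv("BTCUSDT", "1m", "2022-10-01 00:00:00", "2022-10-01 00:09:00")
# Passing symbol="SYNTHETICUSDT" instead short-circuits to the synthetic generator above.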
# Define a NamedTuple for configuration
class Config(NamedTuple):
symbol: str = "ETHUSDT"
interval: str = "15m"
start_str: str = None
end_str: str = None
threads: int = os.cpu_count()
# Define a NamedTuple for the time range
class TimeRange(NamedTuple):
start: str
end: str
# Define a new NamedTuple for the split time ranges result
class SplitTimeRangeResult(NamedTuple):
ranges: List[TimeRange]
# Define the BinanceOHLCVFetcher class
class BinanceOHLCVFetcher:
def __init__(self, config: Config):
self.config = config
def _split_time_range(self, start_time: datetime, end_time: datetime) -> SplitTimeRangeResult:
"""Split the time range into intervals based on the number of threads and granularity."""
interval = GRANULARITY_MAP[self.config.interval]
total_duration = (end_time - start_time).total_seconds()
num_points = int(total_duration / interval.total_seconds())
# Limit the number of threads to the number of data points
max_threads = min(self.config.threads, num_points)
if max_threads <= 1 or total_duration <= 0:
return SplitTimeRangeResult(ranges=[TimeRange(start=start_time.strftime("%Y-%m-%d %H:%M:%S"), end=end_time.strftime("%Y-%m-%d %H:%M:%S"))])
        # Divide the full duration evenly across the worker threads
        chunk = timedelta(seconds=total_duration / max_threads)
        time_ranges = []
        current_start = start_time
        for i in range(max_threads):
            current_end = end_time if i == max_threads - 1 else min(current_start + chunk, end_time)
            time_ranges.append(TimeRange(
                start=current_start.strftime("%Y-%m-%d %H:%M:%S"),
                end=current_end.strftime("%Y-%m-%d %H:%M:%S")
            ))
            current_start = current_end
logger.debug(f"Time ranges for fetching: {time_ranges}")
return SplitTimeRangeResult(ranges=time_ranges)
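    # Worked example (assuming the even split above): a 1-hour span at "15m" granularity holds 4 candles,
    # so with 8 configured threads max_threads becomes 4 and the span is cut into four 15-minute ranges.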
def _fetch_data_for_range(self, thread_start_str: str, thread_end_str: str) -> pd.DataFrame:
"""Fetch data for a specific time range and return as DataFrame."""
logger.debug(f"Fetching data for range: {thread_start_str} to {thread_end_str}")
data = cached_fetch_data(self.config.symbol, self.config.interval, thread_start_str, thread_end_str)
if not data.open_time.empty:
logger.debug(f"Fetched data length: {len(data.open_time)} for range: {thread_start_str} to {thread_end_str}")
return pd.DataFrame(data._asdict())
logger.warning(f"No data fetched for range {thread_start_str} to {thread_end_str}")
return pd.DataFrame()
def _initialize_futures(self, time_ranges: SplitTimeRangeResult) -> List[concurrent.futures.Future]:
logger.debug(f"Entering _initialize_futures with time_ranges={time_ranges}")
futures = []
with concurrent.futures.ThreadPoolExecutor(max_workers=self.config.threads) as executor:
for i, time_range in enumerate(time_ranges.ranges):
logger.debug(f"Thread {i+1}: Fetching data from {time_range.start} to {time_range.end}")
futures.append(executor.submit(self._fetch_data_for_range, time_range.start, time_range.end))
logger.debug(f"Exiting _initialize_futures with futures={futures}")
return futures
def fetch_parallel(self) -> pd.DataFrame:
logger.debug("Entering fetch_parallel")
logger.info(f"🚀 Launching parallel data retrieval across {self.config.threads} vibrant threads performed by {__file__})")
start_time = to_utc(self.config.start_str)
end_time = to_utc(self.config.end_str)
time_ranges = self._split_time_range(start_time, end_time)
futures = self._initialize_futures(time_ranges)
all_data = []
for future in concurrent.futures.as_completed(futures):
try:
data = future.result()
if not data.empty:
all_data.append(data)
except Exception as e:
logger.error(f"Error fetching data for a time range: {e}", exc_info=True)
if not all_data:
logger.error("No data fetched. Please check the parameters and try again.")
return pd.DataFrame()
try:
full_df = pd.concat(all_data, ignore_index=True).sort_values(by="open_time").drop_duplicates().reset_index(drop=True)
logger.debug(f"Concatenated DataFrame: {full_df}")
except Exception as e:
logger.error(f"Error concatenating data frames: {e}", exc_info=True)
return pd.DataFrame()
logger.debug("Fetched OHLCV data:")
logger.debug(f"Data type of 'open_time' column: {full_df['open_time'].dtype}")
logger.debug(f"First 5 'open_time' values:\\n{full_df['open_time'].head()}")
logger.debug(f"Last 5 'open_time' values:\\n{full_df['open_time'].tail()}")
logger.debug(f"Exiting fetch_parallel with full_df={full_df}")
return full_df
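# Usage sketch (hypothetical window): fetch a short 15m window in parallel.
# fetcher = BinanceOHLCVFetcher(Config(symbol="ETHUSDT", interval="15m",
#                                      start_str="2022-10-01 00:00:00", end_str="2022-10-01 06:00:00"))
# df = fetcher.fetch_parallel()  # columns: open_time, open, high, low, close, volume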
class OHLCVParams(NamedTuple):
symbol: str
interval: str
start_str: str
end_str: str
threads: Optional[int] = None
synthetic_min_rand: int = 1
synthetic_max_rand: int = 10
synthetic_clip_min: int = 1
synthetic_clip_max: int = 10
synthetic_datum: int = 17
def get_binance_or_binance_like_ohlcv(params: OHLCVParams) -> pd.DataFrame:
"""Fetch OHLCV data, either from Binance or generate synthetic data."""
if params.symbol == "SYNTHETICUSDT":
config = SyntheticDataParams(
min_rand=params.synthetic_min_rand,
max_rand=params.synthetic_max_rand,
clip_min=params.synthetic_clip_min,
clip_max=params.synthetic_clip_max,
datum=params.synthetic_datum
)
try:
synthetic_data = fetch_binance_ohlcv(params.symbol, params.interval, params.start_str, params.end_str)
logger.info("Synthetic data fetched. Displaying the fetched data:")
synthetic_df = pd.DataFrame(synthetic_data._asdict())
            logger.info("\n" + synthetic_df.head(3).to_string(index=False))
            logger.info("\n" + synthetic_df.tail(3).to_string(index=False))
logger.info(f"Total number of rows: {len(synthetic_df)}")
logger.debug(f"Synthetic data shape: {synthetic_df.shape}")
logger.debug(f"Synthetic data columns: {synthetic_df.columns}")
logger.debug(f"Synthetic data types: {synthetic_df.dtypes}")
return synthetic_df
except Exception as e:
logger.error(f"Failed to fetch synthetic OHLCV data: {e}")
return pd.DataFrame()
else:
if params.threads is None:
params = params._replace(threads=os.cpu_count()) # Default to the number of CPU cores if not specified
config = Config(
symbol=params.symbol,
interval=params.interval,
start_str=params.start_str,
end_str=params.end_str,
threads=params.threads
)
try:
ohlcv_data = BinanceOHLCVFetcher(config).fetch_parallel()
logger.info("Fetching completed. Displaying the fetched data:")
            logger.info("\n" + ohlcv_data.head(3).to_string(index=False))
            logger.info("\n" + ohlcv_data.tail(3).to_string(index=False))
logger.info(f"Total number of rows: {len(ohlcv_data)}")
logger.debug(f"OHLCV data shape: {ohlcv_data.shape}")
logger.debug(f"OHLCV data columns: {ohlcv_data.columns}")
logger.debug(f"OHLCV data types: {ohlcv_data.dtypes}")
return ohlcv_data
except Exception as e:
logger.error(f"Failed to fetch OHLCV data: {e}")
return pd.DataFrame()
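# Usage sketch: the same entry point serves both real and synthetic symbols (the window below is hypothetical).
# df = get_binance_or_binance_like_ohlcv(OHLCVParams(symbol="BTCUSDT", interval="1m",
#                                                    start_str="2022-10-01 00:00:00",
#                                                    end_str="2022-10-01 00:09:00"))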
# Configure logging
logging.basicConfig(level=logging.DEBUG, format='[%(asctime)s] %(levelname)s %(message)s')
# Global flag to ensure cache is cleared only once
cache_cleared = False
def clear_cache(cache_dir):
global cache_cleared
if not cache_cleared:
if os.path.exists(cache_dir):
logging.info(f"Cache directory {cache_dir} exists. Attempting to clear it.")
try:
shutil.rmtree(cache_dir)
logging.info(f"Cache directory {cache_dir} cleared.")
except Exception as e:
logging.error(f"Failed to clear cache directory {cache_dir}: {e}")
else:
logging.info(f"Cache directory {cache_dir} does not exist.")
cache_cleared = True # Set the flag to True after clearing the cache
else:
logging.info("Cache already cleared, skipping.")
if __name__ == "__main__":
logging.info(f"Checking cache directory: {cache_dir}")
clear_cache(cache_dir)
# Scenario 3: Fetch synthetic data
logger.info("Scenario 3: Fetch synthetic data")
logger.info("This scenario demonstrates fetching synthetic OHLCV data for a specified time range.")
logger.info("OHLCV stands for Open, High, Low, Close, and Volume, which are common data points in financial markets.")
logger.info("Synthetic data is generated to simulate real market data for testing purposes.")
synthetic_data = get_binance_or_binance_like_ohlcv(
OHLCVParams(
symbol="SYNTHETICUSDT",
interval="1m",
start_str="2022-10-01 00:00:00",
end_str="2022-10-01 00:09:00",
synthetic_min_rand=1,
synthetic_max_rand=10,
synthetic_clip_min=1,
synthetic_clip_max=10,
synthetic_datum=100
)
)
logger.info("Synthetic data fetched. Displaying the fetched data:")
    logger.info("\n" + synthetic_data.to_string(index=False))
# Compare synthetic data format with Binance data format
logger.info("Next, we will fetch real OHLCV data from Binance for the same time range and compare the formats.")
logger.info("This ensures that our synthetic data format matches the real data format from Binance.")
binance_data = get_binance_or_binance_like_ohlcv(
OHLCVParams(
symbol="ETHUSDT",
interval="1m",
start_str="2022-10-01 00:00:00",
end_str="2022-10-01 00:09:00"
)
)
logger.info("Binance data fetched. Displaying the fetched data:")
    logger.info("\n" + binance_data.to_string(index=False))
# Assert that the synthetic data format matches the Binance data format
assert synthetic_data.columns.tolist() == binance_data.columns.tolist(), "Synthetic data format does not match Binance data format"
logger.info("Synthetic data format matches Binance data format.")
logger.info("This confirms that our synthetic data generation is consistent with real market data formats.")