In this Python code, the download_perpetual_usdt_trades_daily function iterates over the given trading symbols and downloads the corresponding Binance USDⓈ-M perpetual futures trade archives for the specified date range, at either daily or monthly granularity depending on the daily flag.
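The archives live under a predictable path on data.binance.vision. As a quick sketch of the layout the downloader relies on (the symbol, base asset, and date below are hypothetical values chosen only for illustration), a single daily trades archive is addressed like this:

# Sketch: how one daily trades archive URL is composed (illustrative values, not the script's defaults)
symbol, base_asset, date_str = "BTC", "USDT", "2023-05-01"
url = f"https://data.binance.vision/data/futures/um/daily/trades/{symbol}{base_asset}/{symbol}{base_asset}-trades-{date_str}.zip"
# -> https://data.binance.vision/data/futures/um/daily/trades/BTCUSDT/BTCUSDT-trades-2023-05-01.zip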
import os, requests, hashlib, time
from datetime import datetime, timedelta
from pathlib import Path
from tqdm.auto import tqdm
from rich import print
incomplete_checksum_files = []
def md5sum(file_data: bytes, file_size: int) -> tuple[str, float]:
m = hashlib.md5()
with tqdm(total=file_size, unit="B", unit_scale=True, desc="Checksum validation") as progress_bar:
for i in range(0, file_size, 1024):
chunk = file_data[i:i + 1024]
m.update(chunk)
progress_bar.update(len(chunk))
        progress_percentage = (progress_bar.n / progress_bar.total) * 100 if progress_bar.total else 100.0
return m.hexdigest(), progress_percentage
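# md5sum hashes the in-memory bytes in 1 KiB chunks so the progress bar advances as the digest is computed;
# the returned percentage is measured against the expected size (the HTTP Content-Length), so a short or
# truncated file shows up as < 100% and gets flagged for re-download.
# Example (sketch, hypothetical data): digest, pct = md5sum(b"abc" * 1000, 3000)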
def download_file(url: str, local_folder: str, max_retries: int = 100, retry_delay: int = 60, retry_increment: int = 15) -> None:
retry_count = 0
current_retry_delay = retry_delay
while retry_count < max_retries:
try:
response = requests.get(url, stream=True)
if response.status_code == 404:
print(f"[yellow]File not found:[/yellow] [red1]{url}[/red1]")
return
file_size = int(response.headers.get("Content-Length", 0))
filename = Path(local_folder) / url.split("/")[-1]
checksum_url = url + ".CHECKSUM"
checksum_filename = Path(local_folder) / checksum_url.split("/")[-1]
expected_checksum = None
if checksum_filename.exists():
with open(checksum_filename, "r") as f:
expected_checksum = f.read().strip()
if filename.exists():
with open(filename, "rb") as f:
file_data = f.read()
actual_checksum, progress_percentage = md5sum(file_data, file_size)
if progress_percentage < 100:
print(f"[yellow]Checksum progress not reaching 100% for file:[/yellow] [red1]{filename}[/red1]")
incomplete_checksum_files.append(filename)
if actual_checksum == expected_checksum:
print(f"[green]Local copy found for[/green] {filename}")
return
else:
print(f"Checksum failed for {filename}, redownloading...")
if not filename.parent.exists():
filename.parent.mkdir(parents=True, exist_ok=True)
print(f"[bold green]filename.parent:[/bold green] [red]{filename.parent}[/red] [bold green]has been created![/bold green]")
with open(filename, "wb") as f, tqdm(
total=file_size, unit="B", unit_scale=True, desc=filename.name
) as progress_bar:
for data in response.iter_content(chunk_size=1024):
f.write(data)
progress_bar.update(len(data))
with open(filename, "rb") as f:
file_data = f.read()
actual_checksum, progress_percentage = md5sum(file_data, file_size)
if progress_percentage < 100:
print(f"[yellow]Checksum progress not reaching 100% for file:[/yellow] [red1]{filename}[/red1]")
incomplete_checksum_files.append(filename)
with open(checksum_filename, "w") as f:
f.write(actual_checksum)
if expected_checksum is not None and actual_checksum != expected_checksum:
filename.unlink()
checksum_filename.unlink()
print(f"Checksum failed for {filename}, redownloading...")
                        time.sleep(5)
                        download_file(url, local_folder)
                        return
else:
print(f"{filename.name} downloaded and validated against its checksum.")
return # Download completed successfully, exit the loop
except requests.exceptions.ConnectionError as e:
retry_count += 1
if retry_count < max_retries:
print(f"Connection error: {e}. Retrying in {current_retry_delay} seconds... (attempt {retry_count}/{max_retries})")
time.sleep(current_retry_delay)
current_retry_delay += retry_increment
else:
print(f"Connection error: {e}. Maximum retries ({max_retries}) reached. Aborting download.")
break
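# Usage sketch (hypothetical URL and folder, following the data.binance.vision layout used below):
# download_file(
#     "https://data.binance.vision/data/futures/um/daily/trades/BTCUSDT/BTCUSDT-trades-2023-05-01.zip",
#     "D:/binance_raw_trade_data/data/futures/um/daily/trades/BTCUSDT",
# )
# The call is effectively idempotent: an existing archive that matches its stored checksum is left untouched.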
def download_perpetual_usdt_trades_daily(
    symbols: list[str], start_year: int, start_month: int, start_day: int, end_year: int, end_month: int, end_day: int, local_folder: str, daily: bool, base_asset: str = "USDT",
    specific_files: list[Path] | None = None
) -> None:
start_date = datetime(start_year, start_month, start_day)
end_date = datetime(end_year, end_month, end_day)
if not Path(local_folder).exists():
Path(local_folder).mkdir(parents=True)
time_unit = "daily" if daily else "monthly"
date_format = "%Y-%m-%d" if daily else "%Y-%m"
for symbol in symbols:
while start_date <= end_date:
date_str = start_date.strftime(date_format)
url = f"<https://data.binance.vision/data/futures/um/{time_unit}/trades/{symbol}{base_asset}/{symbol}{base_asset}-trades-{date_str}.zip>"
            # Skip files not in the specific_files list
            filename = Path(local_folder) / f"{symbol}{base_asset}" / url.split("/")[-1]
            if not specific_files or filename in specific_files:
                download_file(url, Path(local_folder) / f"{symbol}{base_asset}")
            start_date += timedelta(days=1 if daily else 32)
            if not daily:
                start_date = start_date.replace(day=1)
start_date = datetime(start_year, start_month, start_day)
def remove_incomplete_files(file_list: list[Path]) -> None:
for file_path in file_list:
md5_file_path = file_path.with_suffix(file_path.suffix + ".CHECKSUM")
if file_path.exists():
file_path.unlink()
print(f"[red]Deleted[/red] {file_path}")
if md5_file_path.exists():
md5_file_path.unlink()
print(f"[red]Deleted[/red] {md5_file_path}")
if __name__ == "__main__":
symbols = 'ETH BNB BTC'.split()
symbols = 'ETH'.split()
start_year = 2019
start_month = 12
start_day = 1
end_year = 2023
end_month = 5
end_day = 5
local_folder_monthly = r"D:/binance_raw_trade_data/data/futures/um/monthly/trades"
local_folder_daily = r"D:/binance_raw_trade_data/data/futures/um/daily/trades"
# Set daily to True for daily data, False for monthly data
daily = True
# daily = False
if daily:
download_perpetual_usdt_trades_daily(symbols, start_year, start_month, start_day, end_year, end_month, end_day, local_folder_daily, daily)
else:
download_perpetual_usdt_trades_daily(symbols, start_year, start_month, start_day, end_year, end_month, end_day, local_folder_monthly, daily)
if incomplete_checksum_files:
print("\\nFiles that never reached 100% checksum progress:")
for file_path in incomplete_checksum_files:
print(file_path)
print("\\nDeleting incomplete files and their corresponding MD5 files...")
remove_incomplete_files(incomplete_checksum_files)
print("\\nRe-downloading deleted files...")
if daily:
download_perpetual_usdt_trades_daily(symbols, start_year, start_month, start_day, end_year, end_month, end_day, local_folder_daily, daily, specific_files=incomplete_checksum_files)
else:
download_perpetual_usdt_trades_daily(symbols, start_year, start_month, start_day, end_year, end_month, end_day, local_folder_monthly, daily, specific_files=incomplete_checksum_files)
from rich.logging import RichHandler; from rich.traceback import install; install(show_locals=True)
import logging; logging.basicConfig(level="INFO", format="%(message)s", datefmt="[%X]", handlers=[RichHandler(rich_tracebacks=True, markup=True)]); logger = logging.getLogger("rich")
from pathlib import Path; import sys; sys.path.insert(0, str(next((p for p in Path.cwd().parents if p.name == "ml_feature_set"), Path.cwd()))); __package__ = Path.cwd().parent.name
import os; import concurrent.futures; import pytz; import pandas as pd; import requests
from datetime import datetime, timedelta; from joblib import Memory
from typing import NamedTuple, List, Optional
from tenacity import retry, stop_after_attempt, wait_exponential; import shutil; import numpy as np
CRYPTO_PAIRS = {'SYNTHETICUSDT': 'SYNTHETICUSDT', 'Bitcoin (BTC)': 'BTCUSDT', 'Ethereum (ETH)': 'ETHUSDT', 'Binance (BNB)': 'BNBUSDT', 'Solana (SOL)': 'SOLUSDT', 'Dogecoin (DOGE)': 'DOGEUSDT', 'Cardano (ADA)': 'ADAUSDT', 'Polygon (MATIC)': 'MATICUSDT', 'Storj (STORJ)': 'STORJUSDT', 'Litecoin (LTC)': 'LTCUSDT', 'dYdX (DYDX)': 'DYDXUSDT', 'Compound (COMP)': 'COMPUSDT', 'Measurable Data Token (MDT)': 'MDTUSDT', 'Avalanche (AVAX)': 'AVAXUSDT', 'Maker (MKR)': 'MKRUSDT', 'The Sandbox (SAND)': 'SANDUSDT', 'Decentraland (MANA)': 'MANAUSDT', 'Fantom (FTM)': 'FTMUSDT', 'Ripple (XRP)': 'XRPUSDT', 'Celo (CELO)': 'CELOUSDT', 'Optimism (OP)': 'OPUSDT', 'ApeCoin (APE)': 'APEUSDT', 'Pepe (PEPE)': 'PEPEUSDT', 'Gala (GALA)': 'GALAUSDT', 'NEAR Protocol (NEAR)': 'NEARUSDT', 'Wild (WLD)': 'WLDUSDT', 'Floki Inu (FLOKI)': 'FLOKIUSDT', 'Pixel (PIXEL)': 'PIXELUSDT', 'Ordi (ORDI)': 'ORDIUSDT', 'Meme (MEME)': 'MEMEUSDT', 'Cosmos (ATOM)': 'ATOMUSDT', 'Tia (TIA)': 'TIAUSDT'}
GRANULARITY_MAP = {'1s': timedelta(seconds=1), '1m': timedelta(minutes=1), '3m': timedelta(minutes=3), '5m': timedelta(minutes=5), '15m': timedelta(minutes=15), '30m': timedelta(minutes=30), '1h': timedelta(hours=1), '2h': timedelta(hours=2), '4h': timedelta(hours=4), '6h': timedelta(hours=6), '8h': timedelta(hours=8), '12h': timedelta(hours=12), '1d': timedelta(days=1)}
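# Example: GRANULARITY_MAP["15m"] -> timedelta(minutes=15); dividing a time span by this value gives the
# number of candles expected in that span (used below for synthetic data sizing and for range splitting).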
# Create a cache directory if it doesn't exist
cache_dir = '.cache' # Shortened cache directory path
if not os.path.exists(cache_dir):
os.makedirs(cache_dir)
# Initialize Joblib's Memory to cache data
memory = Memory(location=cache_dir, verbose=0)
# Ensure OHLCVData is defined before it is used
class OHLCVData(NamedTuple):
open_time: pd.Series
open: pd.Series
high: pd.Series
low: pd.Series
close: pd.Series
volume: pd.Series
# Ensure unique name for cached function
@memory.cache
def cached_fetch_binance_data(symbol: str, interval: str, start_str: str, end_str: str) -> 'OHLCVData':
return fetch_binance_ohlcv(symbol, interval, start_str, end_str)
def cached_fetch_data(symbol: str, interval: str, start_str: str, end_str: str) -> 'OHLCVData':
cache_key = f"{symbol}_{interval}_{start_str}_{end_str}" # noqa # Unused variable
# Check if the function is cached
if cached_fetch_binance_data.check_call_in_cache(symbol, interval, start_str, end_str):
logger.debug(f"Using cached data for {symbol} from {start_str} to {end_str}")
else:
logger.debug(f"Fetching new data for {symbol} from {start_str} to {end_str}")
return cached_fetch_binance_data(symbol, interval, start_str, end_str)
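# Note: joblib's Memory keys the cache on the wrapped function's arguments, so a repeat call with the same
# (symbol, interval, start_str, end_str) is served from the .cache directory instead of hitting the API again.
# Example (sketch, hypothetical window): cached_fetch_data("BTCUSDT", "1m", "2022-10-01 00:00:00", "2022-10-01 00:09:00")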
# Utility function to convert time string to UTC localized datetime object
def to_utc(time_str: str) -> datetime:
utc = pytz.UTC
return utc.localize(datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S"))
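# Example: to_utc("2022-10-01 00:00:00") -> datetime(2022, 10, 1, 0, 0, tzinfo=<UTC>)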
# Define a NamedTuple for synthetic data parameters
class SyntheticDataParams(NamedTuple):
min_rand: int = 1
max_rand: int = 10
clip_min: int = 1
clip_max: int = 10
datum: int = 100
quanta: int = 17
# Global flag to ensure initial value is added only once
datum_added = False
def generate_synthetic_data_bundle(config: SyntheticDataParams, start_time: datetime, end_time: datetime, granularity: str) -> OHLCVData:
global datum_added
logger.debug(f"Generating synthetic data from {start_time} to {end_time} with granularity {granularity}")
if granularity not in GRANULARITY_MAP:
raise ValueError(f"Invalid granularity: {granularity}. Must be one of {list(GRANULARITY_MAP.keys())}")
interval = GRANULARITY_MAP[granularity]
total_duration = (end_time - start_time).total_seconds()
num_points = max(int(total_duration / interval.total_seconds()), 1) # Calculate number of points based on interval
logger.debug(f"Total duration: {total_duration} seconds, Number of points: {num_points}")
# Ensure start_time is naive before converting to np.datetime64
start_time = start_time.replace(tzinfo=None)
price: np.ndarray = np.cumsum(np.clip(np.random.randint(config.min_rand, config.max_rand + 1, size=num_points), config.clip_min, config.clip_max)) + config.datum
open_time: np.ndarray = np.array([np.datetime64(start_time + interval * i) for i in range(num_points)], dtype='datetime64[ns]')
open_time_series: pd.Series = pd.Series(open_time).drop_duplicates().reset_index(drop=True)
logger.debug(f"Generated open_time series: {open_time_series}")
# Include initial value in open prices
open_prices = np.insert(price[:-1], 0, config.datum)
close_prices = price
high_prices = np.maximum(open_prices, close_prices) + np.random.rand(len(open_prices))
low_prices = np.minimum(open_prices, close_prices) - np.random.rand(len(open_prices))
volume = np.random.randint(1, 100, size=len(open_prices))
# Ensure the initial value is included in all columns only once
if not datum_added:
open_time_series = pd.Series(np.insert(open_time, 0, np.datetime64(start_time)))
open_prices = np.insert(open_prices, 0, config.datum)
high_prices = np.insert(high_prices, 0, config.datum)
low_prices = np.insert(low_prices, 0, config.datum)
close_prices = np.insert(close_prices, 0, config.datum)
volume = np.insert(volume, 0, 0) # Assuming initial volume is 0
datum_added = True
# Convert to appropriate data types
open_time_series = open_time_series.dt.tz_localize('UTC')
open_prices = open_prices.astype('float64')
high_prices = high_prices.astype('float64')
low_prices = low_prices.astype('float64')
close_prices = close_prices.astype('float64')
volume = volume.astype('float64')
# Log lengths of arrays
logger.debug(f"Lengths - open_time: {len(open_time_series)}, open: {len(open_prices)}, high: {len(high_prices)}, low: {len(low_prices)}, close: {len(close_prices)}, volume: {len(volume)}")
# Ensure all arrays have the same length
if not (len(open_time_series) == len(open_prices) == len(high_prices) == len(low_prices) == len(close_prices) == len(volume)):
raise ValueError("PARALLEL: All arrays must be of the same length")
return OHLCVData(
open_time=open_time_series,
open=open_prices,
high=high_prices,
low=low_prices,
close=close_prices,
volume=volume
)
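# Usage sketch (hypothetical parameters): a few minutes of 1-minute candles from a clipped random-walk price
# series that starts at the configured datum of 100.
# data = generate_synthetic_data_bundle(
#     SyntheticDataParams(min_rand=1, max_rand=10, clip_min=1, clip_max=10, datum=100),
#     start_time=datetime(2022, 10, 1, 0, 0, 0),
#     end_time=datetime(2022, 10, 1, 0, 10, 0),
#     granularity="1m",
# )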
# Define the fetch_binance_ohlcv function
def fetch_binance_ohlcv(symbol: str = "BTCUSDT", interval: str = "1m", start_str: str = None, end_str: str = None) -> OHLCVData:
logger.debug(f"Entering fetch_binance_ohlcv with symbol={symbol}, interval={interval}, start_str={start_str}, end_str={end_str}")
if symbol == "SYNTHETICUSDT":
start_time = to_utc(start_str) # Convert start_str to UTC datetime
end_time = to_utc(end_str) # Convert end_str to UTC datetime
config = SyntheticDataParams(
min_rand=1,
max_rand=10,
clip_min=1,
clip_max=10,
datum=100
)
synthetic_data = generate_synthetic_data_bundle(config, start_time, end_time, interval) # Pass the interval as granularity
logger.debug(f"Generated synthetic data: {synthetic_data}")
logger.debug(f"Synthetic data lengths: open_time={len(synthetic_data.open_time)}, open={len(synthetic_data.open)}, high={len(synthetic_data.high)}, low={len(synthetic_data.low)}, close={len(synthetic_data.close)}, volume={len(synthetic_data.volume)}")
logger.debug(f"Exiting fetch_binance_ohlcv with result: {synthetic_data}")
return OHLCVData(
open_time=synthetic_data.open_time,
open=synthetic_data.open,
high=synthetic_data.high,
low=synthetic_data.low,
close=synthetic_data.close,
volume=synthetic_data.volume
)
    base_url = "https://api.binance.com/api/v3/klines"
all_data: List[pd.DataFrame] = []
limit = 1000 # Max limit as per Binance API
# Convert start and end strings to datetime objects localized to UTC
start_time = to_utc(start_str).timestamp() * 1000
end_time = to_utc(end_str).timestamp() * 1000
logger.debug(f"Starting fetch: {start_str} ({int(start_time)}) to {end_str} ({int(end_time)})")
@retry(stop=stop_after_attempt(10), wait=wait_exponential(multiplier=1, min=1, max=60))
def fetch_data_with_retry(start_time: int, end_time: int) -> pd.DataFrame:
params = {
"symbol": symbol,
"interval": interval,
"limit": limit,
"startTime": int(start_time),
"endTime": int(end_time)
}
response = requests.get(base_url, params=params)
data = response.json()
# Log the raw response from the API
logger.debug(f"API response: {data}")
if not data: # Check if Binance API returned no data
return pd.DataFrame() # Return an empty DataFrame to indicate no data
if 'code' in data: # Check if Binance API returned an error
            raise ValueError(f"Binance API returned an error: {data}")
df = pd.DataFrame(data, columns=["open_time", "open", "high", "low", "close", "volume", "close_time", "quote_asset_volume", "number_of_trades", "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore"])
df["open_time"] = pd.to_datetime(df["open_time"], unit='ms', utc=True)
df["close_time"] = pd.to_datetime(df["close_time"], unit='ms', utc=True)
numeric_columns = ["open", "high", "low", "close", "volume"]
df[numeric_columns] = df[numeric_columns].apply(pd.to_numeric, axis=1)
logger.debug(f"Fetched data length: {len(data)}") # Log the length of the fetched data
logger.debug(f"Data content: {data}") # Log the content of the fetched data
return df
while start_time < end_time:
try:
df = fetch_data_with_retry(start_time, end_time)
if not df.empty:
all_data.append(df)
logger.debug(f"Batch open_time range: {df.iloc[0]['open_time']} to {df.iloc[-1]['open_time']}")
                last_close_time = df.iloc[-1]["close_time"]
                start_time = last_close_time.timestamp() * 1000 + 1  # Resume just after the last candle's close time so no candle is skipped
else:
logger.warning(f"No data fetched for range {datetime.utcfromtimestamp(start_time / 1000)} to {datetime.utcfromtimestamp(end_time / 1000)}")
break # Exit the loop if no data is returned
except Exception as e:
logger.error(f"Failed to fetch data: {e}")
break
if all_data:
# Concatenate all dataframes
full_df = pd.concat(all_data, ignore_index=True)
# Convert data types
numeric_columns = ["open", "high", "low", "close", "volume"]
full_df[numeric_columns] = full_df[numeric_columns].apply(pd.to_numeric, axis=1)
logger.debug(f"Data types of Binance data columns:\\n{full_df.dtypes}")
logger.debug(f"First 5 rows of Binance data:\\n{full_df.head()}")
logger.debug(f"Open time length: {len(full_df['open_time'])}, Data length: {len(full_df)}") # Log lengths before returning
# Check for length mismatch
if len(full_df['open_time']) != len(full_df):
logger.error(f"Length mismatch: Open time length {len(full_df['open_time'])}, Data length {len(full_df)}")
logger.debug(f"Exiting fetch_binance_ohlcv with result: {OHLCVData(open_time=full_df['open_time'], open=full_df['open'], high=full_df['high'], low=full_df['low'], close=full_df['close'], volume=full_df['volume'])}")
return OHLCVData(
open_time=full_df["open_time"],
open=full_df["open"],
high=full_df["high"],
low=full_df["low"],
close=full_df["close"],
volume=full_df["volume"]
)
else:
logger.debug(f"Exiting fetch_binance_ohlcv with result: {OHLCVData(open_time=pd.Series([], dtype='datetime64[ns, UTC]'), open=pd.Series([], dtype='float64'), high=pd.Series([], dtype='float64'), low=pd.Series([], dtype='float64'), close=pd.Series([], dtype='float64'), volume=pd.Series([], dtype='float64'))}")
return OHLCVData(
open_time=pd.Series([], dtype='datetime64[ns, UTC]'),
open=pd.Series([], dtype='float64'),
high=pd.Series([], dtype='float64'),
low=pd.Series([], dtype='float64'),
close=pd.Series([], dtype='float64'),
volume=pd.Series([], dtype='float64')
)
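# Usage sketch (real Binance endpoint, small hypothetical window):
# ohlcv = fetch_binance_ohlcv("BTCUSDT", "1m", "2022-10-01 00:00:00", "2022-10-01 00:09:00")
# Passing symbol="SYNTHETICUSDT" instead short-circuits to the synthetic generator above.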
# Define a NamedTuple for configuration
class Config(NamedTuple):
symbol: str = "ETHUSDT"
interval: str = "15m"
start_str: str = None
end_str: str = None
threads: int = os.cpu_count()
# Define a NamedTuple for the time range
class TimeRange(NamedTuple):
start: str
end: str
# Define a new NamedTuple for the split time ranges result
class SplitTimeRangeResult(NamedTuple):
ranges: List[TimeRange]
# Define the BinanceOHLCVFetcher class
class BinanceOHLCVFetcher:
def __init__(self, config: Config):
self.config = config
def _split_time_range(self, start_time: datetime, end_time: datetime) -> SplitTimeRangeResult:
"""Split the time range into intervals based on the number of threads and granularity."""
interval = GRANULARITY_MAP[self.config.interval]
total_duration = (end_time - start_time).total_seconds()
num_points = int(total_duration / interval.total_seconds())
# Limit the number of threads to the number of data points
max_threads = min(self.config.threads, num_points)
if max_threads <= 1 or total_duration <= 0:
return SplitTimeRangeResult(ranges=[TimeRange(start=start_time.strftime("%Y-%m-%d %H:%M:%S"), end=end_time.strftime("%Y-%m-%d %H:%M:%S"))])
        # Divide the full duration evenly across the worker threads
        chunk = timedelta(seconds=total_duration / max_threads)
        time_ranges = []
        current_start = start_time
        for i in range(max_threads):
            current_end = end_time if i == max_threads - 1 else min(current_start + chunk, end_time)
            time_ranges.append(TimeRange(
                start=current_start.strftime("%Y-%m-%d %H:%M:%S"),
                end=current_end.strftime("%Y-%m-%d %H:%M:%S")
            ))
            current_start = current_end
logger.debug(f"Time ranges for fetching: {time_ranges}")
return SplitTimeRangeResult(ranges=time_ranges)
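    # Worked example (assuming the even split above): a 1-hour span at "15m" granularity holds 4 candles,
    # so with 8 configured threads max_threads becomes 4 and the span is cut into four 15-minute ranges.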
def _fetch_data_for_range(self, thread_start_str: str, thread_end_str: str) -> pd.DataFrame:
"""Fetch data for a specific time range and return as DataFrame."""
logger.debug(f"Fetching data for range: {thread_start_str} to {thread_end_str}")
data = cached_fetch_data(self.config.symbol, self.config.interval, thread_start_str, thread_end_str)
if not data.open_time.empty:
logger.debug(f"Fetched data length: {len(data.open_time)} for range: {thread_start_str} to {thread_end_str}")
return pd.DataFrame(data._asdict())
logger.warning(f"No data fetched for range {thread_start_str} to {thread_end_str}")
return pd.DataFrame()
def _initialize_futures(self, time_ranges: SplitTimeRangeResult) -> List[concurrent.futures.Future]:
logger.debug(f"Entering _initialize_futures with time_ranges={time_ranges}")
futures = []
with concurrent.futures.ThreadPoolExecutor(max_workers=self.config.threads) as executor:
for i, time_range in enumerate(time_ranges.ranges):
logger.debug(f"Thread {i+1}: Fetching data from {time_range.start} to {time_range.end}")
futures.append(executor.submit(self._fetch_data_for_range, time_range.start, time_range.end))
logger.debug(f"Exiting _initialize_futures with futures={futures}")
return futures
def fetch_parallel(self) -> pd.DataFrame:
logger.debug("Entering fetch_parallel")
logger.info(f"🚀 Launching parallel data retrieval across {self.config.threads} vibrant threads performed by {__file__})")
start_time = to_utc(self.config.start_str)
end_time = to_utc(self.config.end_str)
time_ranges = self._split_time_range(start_time, end_time)
futures = self._initialize_futures(time_ranges)
all_data = []
for future in concurrent.futures.as_completed(futures):
try:
data = future.result()
if not data.empty:
all_data.append(data)
except Exception as e:
logger.error(f"Error fetching data for a time range: {e}", exc_info=True)
if not all_data:
logger.error("No data fetched. Please check the parameters and try again.")
return pd.DataFrame()
try:
full_df = pd.concat(all_data, ignore_index=True).sort_values(by="open_time").drop_duplicates().reset_index(drop=True)
logger.debug(f"Concatenated DataFrame: {full_df}")
except Exception as e:
logger.error(f"Error concatenating data frames: {e}", exc_info=True)
return pd.DataFrame()
logger.debug("Fetched OHLCV data:")
logger.debug(f"Data type of 'open_time' column: {full_df['open_time'].dtype}")
logger.debug(f"First 5 'open_time' values:\\n{full_df['open_time'].head()}")
logger.debug(f"Last 5 'open_time' values:\\n{full_df['open_time'].tail()}")
logger.debug(f"Exiting fetch_parallel with full_df={full_df}")
return full_df
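# Usage sketch (hypothetical window): fetch a short 15m window in parallel.
# fetcher = BinanceOHLCVFetcher(Config(symbol="ETHUSDT", interval="15m",
#                                      start_str="2022-10-01 00:00:00", end_str="2022-10-01 06:00:00"))
# df = fetcher.fetch_parallel()  # columns: open_time, open, high, low, close, volume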
class OHLCVParams(NamedTuple):
symbol: str
interval: str
start_str: str
end_str: str
threads: Optional[int] = None
synthetic_min_rand: int = 1
synthetic_max_rand: int = 10
synthetic_clip_min: int = 1
synthetic_clip_max: int = 10
synthetic_datum: int = 17
def get_binance_or_binance_like_ohlcv(params: OHLCVParams) -> pd.DataFrame:
"""Fetch OHLCV data, either from Binance or generate synthetic data."""
if params.symbol == "SYNTHETICUSDT":
config = SyntheticDataParams(
min_rand=params.synthetic_min_rand,
max_rand=params.synthetic_max_rand,
clip_min=params.synthetic_clip_min,
clip_max=params.synthetic_clip_max,
datum=params.synthetic_datum
)
try:
synthetic_data = fetch_binance_ohlcv(params.symbol, params.interval, params.start_str, params.end_str)
logger.info("Synthetic data fetched. Displaying the fetched data:")
synthetic_df = pd.DataFrame(synthetic_data._asdict())
            logger.info("\n" + synthetic_df.head(3).to_string(index=False))
            logger.info("\n" + synthetic_df.tail(3).to_string(index=False))
logger.info(f"Total number of rows: {len(synthetic_df)}")
logger.debug(f"Synthetic data shape: {synthetic_df.shape}")
logger.debug(f"Synthetic data columns: {synthetic_df.columns}")
logger.debug(f"Synthetic data types: {synthetic_df.dtypes}")
return synthetic_df
except Exception as e:
logger.error(f"Failed to fetch synthetic OHLCV data: {e}")
return pd.DataFrame()
else:
if params.threads is None:
params = params._replace(threads=os.cpu_count()) # Default to the number of CPU cores if not specified
config = Config(
symbol=params.symbol,
interval=params.interval,
start_str=params.start_str,
end_str=params.end_str,
threads=params.threads
)
try:
ohlcv_data = BinanceOHLCVFetcher(config).fetch_parallel()
logger.info("Fetching completed. Displaying the fetched data:")
            logger.info("\n" + ohlcv_data.head(3).to_string(index=False))
            logger.info("\n" + ohlcv_data.tail(3).to_string(index=False))
logger.info(f"Total number of rows: {len(ohlcv_data)}")
logger.debug(f"OHLCV data shape: {ohlcv_data.shape}")
logger.debug(f"OHLCV data columns: {ohlcv_data.columns}")
logger.debug(f"OHLCV data types: {ohlcv_data.dtypes}")
return ohlcv_data
except Exception as e:
logger.error(f"Failed to fetch OHLCV data: {e}")
return pd.DataFrame()
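# Usage sketch: the same entry point serves both real and synthetic symbols (the window below is hypothetical).
# df = get_binance_or_binance_like_ohlcv(OHLCVParams(symbol="BTCUSDT", interval="1m",
#                                                    start_str="2022-10-01 00:00:00",
#                                                    end_str="2022-10-01 00:09:00"))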
# Configure logging
logging.basicConfig(level=logging.DEBUG, format='[%(asctime)s] %(levelname)s %(message)s')
# Global flag to ensure cache is cleared only once
cache_cleared = False
def clear_cache(cache_dir):
global cache_cleared
if not cache_cleared:
if os.path.exists(cache_dir):
logging.info(f"Cache directory {cache_dir} exists. Attempting to clear it.")
try:
shutil.rmtree(cache_dir)
logging.info(f"Cache directory {cache_dir} cleared.")
except Exception as e:
logging.error(f"Failed to clear cache directory {cache_dir}: {e}")
else:
logging.info(f"Cache directory {cache_dir} does not exist.")
cache_cleared = True # Set the flag to True after clearing the cache
else:
logging.info("Cache already cleared, skipping.")
if __name__ == "__main__":
logging.info(f"Checking cache directory: {cache_dir}")
clear_cache(cache_dir)
# Scenario 3: Fetch synthetic data
logger.info("Scenario 3: Fetch synthetic data")
logger.info("This scenario demonstrates fetching synthetic OHLCV data for a specified time range.")
logger.info("OHLCV stands for Open, High, Low, Close, and Volume, which are common data points in financial markets.")
logger.info("Synthetic data is generated to simulate real market data for testing purposes.")
synthetic_data = get_binance_or_binance_like_ohlcv(
OHLCVParams(
symbol="SYNTHETICUSDT",
interval="1m",
start_str="2022-10-01 00:00:00",
end_str="2022-10-01 00:09:00",
synthetic_min_rand=1,
synthetic_max_rand=10,
synthetic_clip_min=1,
synthetic_clip_max=10,
synthetic_datum=100
)
)
logger.info("Synthetic data fetched. Displaying the fetched data:")
    logger.info("\n" + synthetic_data.to_string(index=False))
# Compare synthetic data format with Binance data format
logger.info("Next, we will fetch real OHLCV data from Binance for the same time range and compare the formats.")
logger.info("This ensures that our synthetic data format matches the real data format from Binance.")
binance_data = get_binance_or_binance_like_ohlcv(
OHLCVParams(
symbol="ETHUSDT",
interval="1m",
start_str="2022-10-01 00:00:00",
end_str="2022-10-01 00:09:00"
)
)
logger.info("Binance data fetched. Displaying the fetched data:")
    logger.info("\n" + binance_data.to_string(index=False))
# Assert that the synthetic data format matches the Binance data format
assert synthetic_data.columns.tolist() == binance_data.columns.tolist(), "Synthetic data format does not match Binance data format"
logger.info("Synthetic data format matches Binance data format.")
logger.info("This confirms that our synthetic data generation is consistent with real market data formats.")