diff --git a/python/README.md b/python/README.md index 77671f0..cd452a7 100644 --- a/python/README.md +++ b/python/README.md @@ -34,6 +34,7 @@ Some arguments come with a default value if not declared. | -folder | **Directory** to store the downloaded data | Current directory | No | | -c | 1 to download **checksum file** | 0 | No | | -h | show help messages| - | No | +| -w | use parallel workers from `download-kline-parallel.py`|-|No| #### Example @@ -43,6 +44,9 @@ e.g download ETHUSDT BTCUSDT BNBBUSD spot kline of 1 week interval from year 202 e.g download all symbols' daily USD-M futures kline of 1 minute interval from 2021-01-01 to 2021-02-02: `python3 download-kline.py -t um -i 1m -skip-monthly 1 -startDate 2021-01-01 -endDate 2021-02-02` +e.g. download all symbols' monthly frequency daily spot kline data but faster, meaning using parallel workers (and also in a specific folder): +`python3 download-kline-parallel.py -t spot -i 1d -skip-daily 1 -folder "C:\Users\user\data" -w 15` - this will run 15 workers parallelly; the more the better/faster. + ### Download trades `python3 download-trade.py -t `
diff --git a/python/download-kline-parallel.py b/python/download-kline-parallel.py new file mode 100644 index 0000000..8aaa3cc --- /dev/null +++ b/python/download-kline-parallel.py @@ -0,0 +1,170 @@ +#!/usr/bin/env python + +""" + script to download klines with parallel processing. + set the absolute path destination folder for STORE_DIRECTORY, and run + + e.g. STORE_DIRECTORY=/data/ ./download-kline-parallel.py + +""" +import sys +from datetime import * +import pandas as pd +from enums import * +from utility import download_file, get_all_symbols, get_parser, get_start_end_date_objects, convert_to_date_object, \ + get_path +from concurrent.futures import ThreadPoolExecutor, as_completed +from functools import partial + + +def download_symbol_monthly(symbol, symbol_index, num_symbols, trading_type, intervals, years, months, start_date, end_date, folder, checksum, date_range): + print("[{}/{}] - start download monthly {} klines ".format(symbol_index+1, num_symbols, symbol)) + for interval in intervals: + for year in years: + for month in months: + current_date = convert_to_date_object('{}-{}-01'.format(year, month)) + if current_date >= start_date and current_date <= end_date: + path = get_path(trading_type, "klines", "monthly", symbol, interval) + file_name = "{}-{}-{}-{}.zip".format(symbol. upper(), interval, year, '{:02d}'.format(month)) + download_file(path, file_name, date_range, folder) + + if checksum == 1: + checksum_path = get_path(trading_type, "klines", "monthly", symbol, interval) + checksum_file_name = "{}-{}-{}-{}.zip.CHECKSUM".format(symbol.upper(), interval, year, '{:02d}'. format(month)) + download_file(checksum_path, checksum_file_name, date_range, folder) + + +def download_monthly_klines(trading_type, symbols, num_symbols, intervals, years, months, start_date, end_date, folder, checksum, max_workers=10): + date_range = None + + if start_date and end_date: + date_range = start_date + " " + end_date + + if not start_date: + start_date = START_DATE + else: + start_date = convert_to_date_object(start_date) + + if not end_date: + end_date = END_DATE + else: + end_date = convert_to_date_object(end_date) + + print("Found {} symbols". format(num_symbols)) + print("Using {} parallel workers".format(max_workers)) + + download_func = partial( + download_symbol_monthly, + trading_type=trading_type, + intervals=intervals, + years=years, + months=months, + start_date=start_date, + end_date=end_date, + folder=folder, + checksum=checksum, + date_range=date_range, + num_symbols=num_symbols + ) + + with ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = {executor.submit(download_func, symbol, idx): symbol for idx, symbol in enumerate(symbols)} + + for future in as_completed(futures): + symbol = futures[future] + try: + future.result() + except Exception as exc: + print('{} generated an exception: {}'.format(symbol, exc)) + + +def download_symbol_daily(symbol, symbol_index, num_symbols, trading_type, intervals, dates, start_date, end_date, folder, checksum, date_range): + print("[{}/{}] - start download daily {} klines ".format(symbol_index+1, num_symbols, symbol)) + for interval in intervals: + for date in dates: + current_date = convert_to_date_object(date) + if current_date >= start_date and current_date <= end_date: + path = get_path(trading_type, "klines", "daily", symbol, interval) + file_name = "{}-{}-{}.zip".format(symbol.upper(), interval, date) + download_file(path, file_name, date_range, folder) + + if checksum == 1: + checksum_path = get_path(trading_type, "klines", "daily", symbol, interval) + checksum_file_name = "{}-{}-{}.zip.CHECKSUM".format(symbol.upper(), interval, date) + download_file(checksum_path, checksum_file_name, date_range, folder) + + +def download_daily_klines(trading_type, symbols, num_symbols, intervals, dates, start_date, end_date, folder, checksum, max_workers=10): + date_range = None + + if start_date and end_date: + date_range = start_date + " " + end_date + + if not start_date: + start_date = START_DATE + else: + start_date = convert_to_date_object(start_date) + + if not end_date: + end_date = END_DATE + else: + end_date = convert_to_date_object(end_date) + + intervals = list(set(intervals) & set(DAILY_INTERVALS)) + print("Found {} symbols".format(num_symbols)) + print("Using {} parallel workers".format(max_workers)) + + download_func = partial( + download_symbol_daily, + trading_type=trading_type, + intervals=intervals, + dates=dates, + start_date=start_date, + end_date=end_date, + folder=folder, + checksum=checksum, + date_range=date_range, + num_symbols=num_symbols + ) + + with ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = {executor. submit(download_func, symbol, idx): symbol for idx, symbol in enumerate(symbols)} + + for future in as_completed(futures): + symbol = futures[future] + try: + future.result() + except Exception as exc: + print('{} generated an exception: {}'.format(symbol, exc)) + + +if __name__ == "__main__": + parser = get_parser('klines') + parser.add_argument( + '-w', dest='workers', default=10, type=int, + help='Number of parallel workers (default: 10, recommended: 5-20)') + + args = parser.parse_args(sys.argv[1:]) + + if not args.symbols: + print("fetching all symbols from exchange") + symbols = get_all_symbols(args.type) + num_symbols = len(symbols) + else: + symbols = args.symbols + num_symbols = len(symbols) + + if args.dates: + dates = args.dates + else: + period = convert_to_date_object(datetime.today().strftime('%Y-%m-%d')) - convert_to_date_object( + PERIOD_START_DATE) + dates = pd.date_range(end=datetime.today(), periods=period.days + 1).to_pydatetime().tolist() + dates = [date.strftime("%Y-%m-%d") for date in dates] + + if args.skip_monthly == 0: + download_monthly_klines(args.type, symbols, num_symbols, args. intervals, args.years, args. months, + args.startDate, args.endDate, args.folder, args.checksum, args.workers) + if args.skip_daily == 0: + download_daily_klines(args.type, symbols, num_symbols, args.intervals, dates, args.startDate, + args.endDate, args.folder, args.checksum, args. workers)