 import os
 import sys
 import argparse
-import requests
 from urllib.parse import urlparse
+import requests

-SPECIAL_FILES_PATH = "test-tree/special-files/"
-LARGE_FILE_URIS = [
-    "https://www.quintic.com/software/sample_videos/Cricket%20Bowling%20150fps%201200.avi"
-]
+SPECIAL_FILES_ROOT = "test-tree/special-files/"
+LARGE_FILES_PATH = SPECIAL_FILES_ROOT + "large/"
+ZIP_FILES_PATH = SPECIAL_FILES_ROOT + "zips/"
+CUSTOM_FILES_PATH = SPECIAL_FILES_ROOT + "custom/"
+CHUNK_SIZE = 1024 * 1024
+LARGE_FILE_URLS = []
 ZIP_FILE_URLS = []


 def parse_cli():
+    """Prepare the CLI argument parser."""
     parser = argparse.ArgumentParser(
         prog="special-files-downloader", description="Download special test files"
     )

-    parser.add_argument("--large", help=f"Download large files for testing")
-    parser.add_argument("--zip", help=f"Download zip files for testing")
     parser.add_argument(
-        "--my-sources", help=f"Download files from links listed in text file path"
+        "--large", help="Download large files for testing", action="store_true"
+    )
+    parser.add_argument(
+        "--zip", help="Download zip files for testing", action="store_true"
+    )
+    parser.add_argument(
+        "--my-source", help="Download files from links listed in a text file path"
     )
     parser.add_argument(
-        "--all",
-        help="Download all earmarked special files.",
+        "--all", help="Download all earmarked special files.", action="store_true"
     )

     return parser


-def download_file_from_url(uri, path):
-    response = requests.get(uri, allow_redirects=False)
-    fname = os.path.basename(urlparse(uri).path)
-    if response.status_code == 200:
-        with open(path + fname, "wb") as file:
-            file.write(response.content)
+def check_paths():
+    """Ensure special-file folders required in test-tree are present."""
+    # Idempotent: exist_ok leaves already-created folders alone.
+    for path in (SPECIAL_FILES_ROOT, LARGE_FILES_PATH, ZIP_FILES_PATH, CUSTOM_FILES_PATH):
+        os.makedirs(path, exist_ok=True)
+
+
+def get_file_urls():
+    """Get links to default special-files required for testing."""
+    global LARGE_FILE_URLS
+    global ZIP_FILE_URLS
+    # Build real lists rather than one-shot map objects so the URLs can be
+    # iterated and concatenated more than once in main().
+    with open("source_large_files.txt", "r", encoding="utf-8") as large_files_handle:
+        LARGE_FILE_URLS = [line.strip() for line in large_files_handle if line.strip()]
+    with open("source_zip_files.txt", "r", encoding="utf-8") as zip_files_handle:
+        ZIP_FILE_URLS = [line.strip() for line in zip_files_handle if line.strip()]
+
+
+def download_file_from_url(url, path):
+    """Download file in url to path, streaming it in chunks."""
+    fname = os.path.basename(urlparse(url).path)
+    print(f"\nDownloading {fname}\n")
+    underline = "=" * (len(fname) + 12)  # pad to the width of "Downloading {fname}"
+    print(underline + "\n")
+    size = 0
+    with requests.get(url, stream=True) as response:
+        if response.status_code == 200:
+            with open(path + fname, "wb") as file:
+                for chunk in response.iter_content(chunk_size=CHUNK_SIZE):
+                    file.write(chunk)
+                    size = size + len(chunk)  # count actual bytes; the final chunk may be short
+                    print(f"Downloaded {size} bytes of {fname} ...")


 def main():
+    """Script entry point"""
+    check_paths()
+    get_file_urls()
     parser = parse_cli()
     if len(sys.argv) == 1:
         parser.print_help()
-        print("No downloads done....")
+        print(
+            "\n========================\n| No downloads done... |\n========================\n"
+        )
     args = parser.parse_args()

     if args.large:
-        for uri in LARGE_FILE_URIS:
-            download_file_from_url(uri, SPECIAL_FILES_PATH + "large/")
+        for url in LARGE_FILE_URLS:
+            download_file_from_url(url, LARGE_FILES_PATH)
     if args.zip:
-        for uri in ZIP_FILE_URLS:
-            download_file_from_url(uri, SPECIAL_FILES_PATH + "zips/")
+        for url in ZIP_FILE_URLS:
+            download_file_from_url(url, ZIP_FILES_PATH)
     if args.all:
-        ALL = ZIP_FILE_URLS + LARGE_FILE_URIS
-        for uri in ALL:
-            download_file_from_url(uri, SPECIAL_FILES_PATH + "mixed/")
-    if args.my_sources:
-        sources = open(args.my_sources, "r")
-        sources = sources.readlines()
-        for source in sources:
-            download_file_from_url(source, SPECIAL_FILES_PATH + "custom/")
+        all_urls = ZIP_FILE_URLS + LARGE_FILE_URLS
+        for url in all_urls:
+            download_file_from_url(url, SPECIAL_FILES_ROOT)
+    if args.my_source:
+        with open(args.my_source, "r", encoding="utf-8") as source:
+            for line in source:
+                url = line.strip()
+                if url:  # skip blank lines in the custom source list
+                    download_file_from_url(url, CUSTOM_FILES_PATH)


 if __name__ == "__main__":
     main()
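For reference, a minimal usage sketch of the reworked CLI; the script filename and the custom list name are assumptions, not taken from the diff, but the flags come straight from parse_cli(), and --my-source expects a plain text file with one URL per line (the same format get_file_urls() reads from source_large_files.txt and source_zip_files.txt):

    python special_files_downloader.py --large --zip
    python special_files_downloader.py --my-source my_urls.txt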