Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions app/resources/custom_error.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ class Error:
'File: %s does not exist in the folder.\n'
'Please remove the resumable upload log and retry uploading the entire folder again.'
),
'INVALID_RESUMABLE_FILE_SIZE': (
'The file size of %s is not the same as the previous upload. '
'Expected size: %s, Actual size: %s. Please verify the file content and try again.'
),
'INVALID_FOLDERNAME': (
'The input folder name is not valid. Please follow the rule:\n'
' - cannot contains special characters.\n'
Expand Down
87 changes: 62 additions & 25 deletions app/services/file_manager/file_upload/file_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,35 +268,24 @@ def simple_upload( # noqa: C901
return [file_object.item_id for file_object in pre_upload_infos]


def resume_upload(
manifest_json: Dict[str, Any],
num_of_thread: int = 1,
):
"""
def resume_get_unfinished_items(
upload_client: UploadClient, all_files: Dict[str, Any], item_ids: List[str]
) -> List[FileObject]:
'''
Summary:
Resume upload from the manifest file
Parameters:
- manifest_json: the manifest json which store the upload information
- num_of_thread: the number of thread to upload the file
"""
upload_start_time = time.time()
Function will loop over `all_files` in batches and check if the file is already uploaded.
During the process, the logic will check if the size registered in the backend matches
the local file size. If not, the function will raise an error.

upload_client = UploadClient(
project_code=manifest_json.get('project_code'),
zone=manifest_json.get('zone'),
job_type='AS_FOLDER',
current_folder_node=manifest_json.get('current_folder_node', ''),
parent_folder_id=manifest_json.get('parent_folder_id', ''),
tags=manifest_json.get('tags'),
)
Parameter:
- upload_client(UploadClient): the upload client object
- all_files(Dict[str, Any]): the file object dictionary
- item_ids(List[str]): the list of item ids that will be checked
Return:
- unfinished_items(List[FileObject]): the list of file object that is not uploaded yet
'''

# check files in manifest if some of them are already uploaded
unfinished_items = []
all_files = manifest_json.get('file_objects')
item_ids = []
for item_id in all_files:
item_ids.append(item_id)

# here add the batch of 500 per loop, the pre upload api cannot
# process very large amount of file at same time. otherwise it will timeout
# here is list of pre upload result. We decided to call pre upload api by batch
Expand All @@ -312,8 +301,26 @@ def resume_upload(
SrvErrorHandler.customized_handle(
ECustomizedError.INVALID_RESUMABLE_UPLOAD, if_exit=True, value=missing_item.get('object_path')
)
# check if the file is already registered
elif x.get('result').get('status') == ItemStatus.REGISTERED:
file_info = all_files.get(file_meta.get('id'))
# check if size is matched during resume vs preupload
logger.info(
f'Check file size: {file_info.get("object_path")}, '
f'expected size: {file_info.get("total_size")}, '
f'actual size: {x.get("result").get("size")}'
)
if file_info.get('total_size') != x.get('result').get('size'):
SrvErrorHandler.customized_handle(
ECustomizedError.INVALID_RESUMABLE_FILE_SIZE,
if_exit=True,
value=(
file_info.get('object_path'),
x.get('result').get('size'),
file_info.get('total_size'),
),
)

unfinished_files.append(
FileObject(
file_info.get('object_path'),
Expand All @@ -329,6 +336,36 @@ def resume_upload(
if len(unfinished_files) > 0:
unfinished_items.extend(upload_client.resume_upload(unfinished_files))

return unfinished_items


def resume_upload(
manifest_json: Dict[str, Any],
num_of_thread: int = 1,
):
"""
Summary:
Resume upload from the manifest file
Parameters:
- manifest_json: the manifest json which store the upload information
- num_of_thread: the number of thread to upload the file
"""
upload_start_time = time.time()

upload_client = UploadClient(
project_code=manifest_json.get('project_code'),
zone=manifest_json.get('zone'),
job_type='AS_FOLDER',
current_folder_node=manifest_json.get('current_folder_node', ''),
parent_folder_id=manifest_json.get('parent_folder_id', ''),
tags=manifest_json.get('tags'),
)

# check files in manifest if some of them are already uploaded
all_files = manifest_json.get('file_objects')
item_ids = list(all_files.keys())
unfinished_items = resume_get_unfinished_items(upload_client, all_files, item_ids)

mhandler.SrvOutPutHandler.resume_warning(len(unfinished_items))
mhandler.SrvOutPutHandler.resume_check_success()

Expand Down
3 changes: 2 additions & 1 deletion app/services/file_manager/file_upload/upload_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,8 @@ def pre_upload(self, file_objects: List[FileObject], output_path: str) -> List[F
'parent_folder_id': self.parent_folder_id,
'folder_tags': self.tags,
'data': [
{'resumable_filename': x.file_name, 'resumable_relative_path': x.parent_path} for x in file_objects
{'resumable_filename': x.file_name, 'resumable_relative_path': x.parent_path, 'size': x.total_size}
for x in file_objects
],
}
if self.source_id:
Expand Down
1 change: 1 addition & 0 deletions app/services/output_manager/error_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class ECustomizedError(enum.Enum):
INVALID_PATHS = 'INVALID_PATHS'
INVALID_RESUMABLE_FILE = 'INVALID_RESUMABLE_FILE'
INVALID_RESUMABLE_UPLOAD = 'INVALID_RESUMABLE_UPLOAD'
INVALID_RESUMABLE_FILE_SIZE = 'INVALID_RESUMABLE_FILE_SIZE'
TOU_CONTENT = 'TOU_CONTENT'
INVALID_TOKEN = 'INVALID_TOKEN'
PERMISSION_DENIED = 'PERMISSION_DENIED'
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "app"
version = "3.14.1"
version = "3.15.0"
description = "This service is designed to support pilot platform"
authors = ["Indoc Systems"]

Expand Down
39 changes: 39 additions & 0 deletions tests/app/services/file_manager/file_upload/test_file_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ def test_folder_merge_skip_with_all_duplication(mocker, mock_upload_client, capf
def test_resume_upload(mocker):
mocker.patch('app.services.file_manager.file_upload.models.FileObject.generate_meta', return_value=(1, 1))
test_obj = FileObject('object/path', 'local_path', 'resumable_id', 'job_id', 'item_id')
test_obj.total_size = 1

manifest_json = {
'project_code': 'project_code',
Expand All @@ -328,11 +329,13 @@ def test_resume_upload(mocker):
'current_folder_node': 'current_folder_node',
'tags': 'tags',
'file_objects': {test_obj.item_id: test_obj.to_dict()},
'total_size': 1,
}

get_return = test_obj.to_dict()
get_return.update({'status': ItemStatus.REGISTERED})
get_return.update({'id': get_return.get('item_id')})
get_return.update({'size': 1})
get_mock = mocker.patch(
'app.services.file_manager.file_upload.file_upload.get_file_info_by_geid', return_value=[{'result': get_return}]
)
Expand Down Expand Up @@ -378,3 +381,39 @@ def test_resume_upload_failed_when_REGISTERED_doesnt_exist(mocker, capfd):

get_mock.assert_called_once()
assert resume_upload_mock.call_count == 0


def test_resume_upload_integrity_check_failed(mocker, capfd):
    """
    Summary:
        Resuming an upload must abort with INVALID_RESUMABLE_FILE_SIZE when the
        size recorded for a file in the manifest differs from the size returned
        by the (mocked) item lookup for that REGISTERED item.
    Parameters:
        - mocker: pytest-mock fixture used to patch project functions
        - capfd: pytest fixture used to capture the printed error message
    """
    mocker.patch('app.services.file_manager.file_upload.models.FileObject.generate_meta', return_value=(1, 1))
    test_obj = FileObject('object/path', 'local_path', 'resumable_id', 'job_id', 'item_id')
    test_obj.total_size = 2  # wrong size

    manifest_json = {
        'project_code': 'project_code',
        'operator': 'operator',
        'zone': AppConfig.Env.green_zone,
        'parent_folder_id': 'parent_folder_id',
        'current_folder_node': 'current_folder_node',
        'tags': 'tags',
        'file_objects': {test_obj.item_id: test_obj.to_dict()},
        'total_size': 1,
    }

    # The mocked lookup reports the item as REGISTERED with size 1, which
    # disagrees with the total_size of 2 set on the manifest file object above
    # (presumably serialised by to_dict() -- verify against FileObject).
    get_return = test_obj.to_dict()
    get_return.update({'status': ItemStatus.REGISTERED})
    get_return.update({'id': get_return.get('item_id')})
    get_return.update({'size': 1})
    get_mock = mocker.patch(
        'app.services.file_manager.file_upload.file_upload.get_file_info_by_geid', return_value=[{'result': get_return}]
    )

    try:
        resume_upload(manifest_json, 1)
    except SystemExit:
        out, _ = capfd.readouterr()
        expect = customized_error_msg(ECustomizedError.INVALID_RESUMABLE_FILE_SIZE) % ('object/path', 1, 2)
        assert expect in out
    else:
        # The exception must actually be raised; merely constructing it would
        # let the test pass even though resume_upload never exited.
        raise AssertionError('SystemExit not raised')

    get_mock.assert_called_once()
24 changes: 12 additions & 12 deletions tests/app/services/project_manager/test_project.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def test_list_project(httpx_mock, mocker, capsys):
mocker.patch('app.services.user_authentication.token_manager.SrvTokenManager.check_valid', return_value=0)
httpx_mock.add_response(
method='GET',
url='http://bff_cli/v1/projects?page=0&page_size=10&order=created_at&order_by=desc',
url='http://bff_cli/v1/projects?page=0&page_size=10&order=desc&order_by=created_at',
json={
'code': 200,
'error_msg': '',
Expand Down Expand Up @@ -38,7 +38,7 @@ def test_list_project(httpx_mock, mocker, capsys):
},
)
project_mgr = SrvProjectManager()
project_mgr.list_projects(page=0, page_size=10, order='created_at', order_by='desc')
project_mgr.list_projects(page=0, page_size=10, order='desc', order_by='created_at')
out, _ = capsys.readouterr()
print_out = out.split('\n')
assert print_out[0] == ' Project Name Project Code '
Expand All @@ -54,11 +54,11 @@ def test_list_project_no_project(httpx_mock, mocker, capsys):
mocker.patch('app.services.user_authentication.token_manager.SrvTokenManager.check_valid', return_value=0)
httpx_mock.add_response(
method='GET',
url='http://bff_cli/v1/projects?page=0&page_size=10&order=created_at&order_by=desc',
url='http://bff_cli/v1/projects?page=0&page_size=10&order=desc&order_by=created_at',
json={'code': 200, 'error_msg': '', 'result': [], 'total': 0, 'page': 0},
)
project_mgr = SrvProjectManager()
project_mgr.list_projects(page=0, page_size=10, order='created_at', order_by='desc')
project_mgr.list_projects(page=0, page_size=10, order='desc', order_by='created_at')
out, _ = capsys.readouterr()
print_out = out.split('\n')
assert print_out[0] == ' Project Name Project Code '
Expand All @@ -71,7 +71,7 @@ def test_list_project_desc_by_code(httpx_mock, mocker, capsys):
mocker.patch('app.services.user_authentication.token_manager.SrvTokenManager.check_valid', return_value=0)
httpx_mock.add_response(
method='GET',
url='http://bff_cli/v1/projects?page=0&page_size=10&order=code&order_by=desc',
url='http://bff_cli/v1/projects?page=0&page_size=10&order=desc&order_by=code',
json={
'code': 200,
'error_msg': '',
Expand All @@ -92,7 +92,7 @@ def test_list_project_desc_by_code(httpx_mock, mocker, capsys):
},
)
project_mgr = SrvProjectManager()
project_mgr.list_projects(page=0, page_size=10, order='code', order_by='desc')
project_mgr.list_projects(page=0, page_size=10, order='desc', order_by='code')
out, _ = capsys.readouterr()
print_out = out.split('\n')
assert print_out[0] == ' Project Name Project Code '
Expand All @@ -115,7 +115,7 @@ def test_list_project_desc_by_name(httpx_mock, mocker, capsys):
mocker.patch('app.services.user_authentication.token_manager.SrvTokenManager.check_valid', return_value=0)
httpx_mock.add_response(
method='GET',
url='http://bff_cli/v1/projects?page=0&page_size=10&order=code&order_by=desc',
url='http://bff_cli/v1/projects?page=0&page_size=10&order=desc&order_by=name',
json={
'code': 200,
'error_msg': '',
Expand All @@ -136,7 +136,7 @@ def test_list_project_desc_by_name(httpx_mock, mocker, capsys):
},
)
project_mgr = SrvProjectManager()
project_mgr.list_projects(page=0, page_size=10, order='code', order_by='desc')
project_mgr.list_projects(page=0, page_size=10, order='desc', order_by='name')
out, _ = capsys.readouterr()
print_out = out.split('\n')
assert print_out[0] == ' Project Name Project Code '
Expand All @@ -159,7 +159,7 @@ def test_list_project_desc_by_name_with_page_size(httpx_mock, mocker, capsys):
mocker.patch('app.services.user_authentication.token_manager.SrvTokenManager.check_valid', return_value=0)
httpx_mock.add_response(
method='GET',
url='http://bff_cli/v1/projects?page=0&page_size=3&order=code&order_by=desc',
url='http://bff_cli/v1/projects?page=0&page_size=3&order=desc&order_by=name',
json={
'code': 200,
'error_msg': '',
Expand All @@ -173,7 +173,7 @@ def test_list_project_desc_by_name_with_page_size(httpx_mock, mocker, capsys):
},
)
project_mgr = SrvProjectManager()
project_mgr.list_projects(page=0, page_size=3, order='code', order_by='desc')
project_mgr.list_projects(page=0, page_size=3, order='desc', order_by='name')
out, _ = capsys.readouterr()
print_out = out.split('\n')
assert print_out[0] == ' Project Name Project Code '
Expand All @@ -189,7 +189,7 @@ def test_list_project_desc_by_name_with_page_size_and_page(httpx_mock, mocker, c
mocker.patch('app.services.user_authentication.token_manager.SrvTokenManager.check_valid', return_value=0)
httpx_mock.add_response(
method='GET',
url='http://bff_cli/v1/projects?page=1&page_size=3&order=code&order_by=desc',
url='http://bff_cli/v1/projects?page=1&page_size=3&order=desc&order_by=name',
json={
'code': 200,
'error_msg': '',
Expand All @@ -203,7 +203,7 @@ def test_list_project_desc_by_name_with_page_size_and_page(httpx_mock, mocker, c
},
)
project_mgr = SrvProjectManager()
project_mgr.list_projects(page=1, page_size=3, order='code', order_by='desc')
project_mgr.list_projects(page=1, page_size=3, order='desc', order_by='name')
out, _ = capsys.readouterr()
print_out = out.split('\n')
assert print_out[0] == ' Project Name Project Code '
Expand Down
Loading