Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions app/configs/user_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def __init__(

This adjustment is made to prevent complications with mounted NFS volumes where all files have root ownership.
"""

if config_path is None:
config_path = ConfigClass.config_path
if config_filename is None:
Expand Down
3 changes: 1 addition & 2 deletions app/services/crypto/crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,4 @@ def decryption(encrypted_message, secret, interactive=True):
ehandler.SrvErrorHandler.default_handle(str(ex) + ', please try login as a valid user.')
else:
raise ex
else:
ehandler.SrvErrorHandler.customized_handle(ehandler.ECustomizedError.LOGIN_SESSION_INVALID, True)
return ''
25 changes: 17 additions & 8 deletions app/services/file_manager/file_upload/file_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,13 +254,13 @@ def simple_upload( # noqa: C901
pool.close()
pool.join()

if attribute:
continue_loop = True
while continue_loop:
# the last uploaded file
succeed = upload_client.check_status(file_object)
continue_loop = not succeed
time.sleep(0.5)
unfinished_files = pre_upload_infos
while len(unfinished_files) > 0:
temp = []
mhandler.SrvOutPutHandler.finalize_upload()
for file_batchs in batch_generator(pre_upload_infos, batch_size=AppConfig.Env.upload_batch_size):
temp.extend(upload_client.check_status(file_batchs))
unfinished_files = temp

num_of_file = len(pre_upload_infos)
logger.info(f'Upload Time: {time.time() - upload_start_time:.2f}s for {num_of_file:d} files')
Expand Down Expand Up @@ -341,7 +341,7 @@ def resume_upload(
# out of thread pool.
res = pool.apply_async(
upload_client.on_succeed,
args=(file_object),
args=(file_object,),
)
on_success_res.append(res)

Expand All @@ -353,5 +353,14 @@ def resume_upload(
pool.close()
pool.join()

unfinished_files = unfinished_items
while len(unfinished_files) > 0:
temp = []
mhandler.SrvOutPutHandler.finalize_upload()
for file_batchs in batch_generator(unfinished_items, batch_size=AppConfig.Env.upload_batch_size):
temp.extend(upload_client.check_status(file_batchs))
unfinished_files = temp
time.sleep(1)

num_of_file = len(unfinished_items)
logger.info(f'Upload Time: {time.time() - upload_start_time:.2f}s for {num_of_file:d} files')
25 changes: 14 additions & 11 deletions app/services/file_manager/file_upload/upload_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from app.services.output_manager.error_handler import ECustomizedError
from app.services.output_manager.error_handler import SrvErrorHandler
from app.services.user_authentication.decorator import require_valid_token
from app.utils.aggregated import ItemStatus
from app.utils.aggregated import get_file_info_by_geid

from .exception import INVALID_CHUNK_ETAG
Expand Down Expand Up @@ -354,10 +355,11 @@ def on_complete(result):

# for resumable check ONLY if user resume the upload at 100%
# just check if there is any active job, if not, set the event
while not chunk_upload_done.wait(timeout=60):
while not chunk_upload_done.wait(timeout=5):
if self.active_jobs == 0:
chunk_upload_done.set()
logger.warning('Waiting for all the chunks to be uploaded, remaining jobs: %s', file_object.progress)
else:
logger.warning('Waiting for all the chunks to be uploaded, remaining jobs: %s', self.active_jobs)

f.close()

Expand Down Expand Up @@ -447,7 +449,7 @@ def on_succeed(self, file_object: FileObject) -> None:
result = response.json().get('result')
return result

def check_status(self, file_object: FileObject) -> bool:
def check_status(self, file_objects: list[FileObject]) -> list[FileObject]:
"""
Summary:
The function is to check the status of upload process.
Expand All @@ -458,14 +460,15 @@ def check_status(self, file_object: FileObject) -> bool:
- bool: if job success or not
"""

# with pre-register upload, we can check if the file entity is already exist
# if exist, we can continue with manifest process
file_entity = get_file_info_by_geid([file_object.item_id])[0].get('result', {})
mhandler.SrvOutPutHandler.finalize_upload()
if file_entity.get('status') == 'ACTIVE':
return True
else:
return False
file_ids = [file_object.item_id for file_object in file_objects]
results = get_file_info_by_geid(file_ids)
unfinished_files = []
for r in results:
status = r.get('status')
if status != ItemStatus.ACTIVE:
unfinished_files.append(r.get('result'))

return unfinished_files

def set_finish_upload(self):
self.finish_upload = True
3 changes: 2 additions & 1 deletion app/services/output_manager/message_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,8 @@ def newer_version_available(version, download_url, print_message=True):
clickable_text = f'\033]8;;{download_url}\033\\latest cli version\033]8;;\033\\'
message = (
f'\nNewer version available! Pilotcli v{version} is available. Please vist \n{clickable_text}. '
'This link will expire in 10 minutes.\n'
'This link will expire in 10 minutes. If the link doesn\'t show up, Please visit the \n'
'support page on portal to download the latest version.'
)
if print_message:
logger.warning(message)
Expand Down
9 changes: 0 additions & 9 deletions app/utils/aggregated.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,24 +234,15 @@ def remove_the_output_file(filepath: str) -> None:


def get_latest_cli_version() -> Tuple[Version, str]:
import logging
import time

try:
start_time = time.time()
httpx_client = BaseClient(AppConfig.Connections.url_fileops_greenroom)
logging.critical(f'http client init time: {time.time() - start_time}')
user_config = UserConfig()
logging.critical(f'user config init time: {time.time() - start_time}')
t1 = time.time()
if not user_config.is_access_token_exists():
return Version('0.0.0')
logging.critical(f'Check token time: {time.time() - t1}')
t2 = time.time()

headers = {'Authorization': 'Bearer'}
response = httpx_client._get('v1/download/cli/presigned', headers=headers)
logging.critical(f'Get latest version time: {time.time() - t2}')
result = response.json().get('result', {})
latest_version = result.get('linux', {}).get('version', '0.0.0')
download_url = result.get('linux', {}).get('download_url', '')
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "app"
version = "3.14.0"
version = "3.14.1"
description = "This service is designed to support pilot platform"
authors = ["Indoc Systems"]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import pytest

from app.configs.app_config import AppConfig
from app.models.item import ItemStatus
from app.services.file_manager.file_upload.exception import INVALID_CHUNK_ETAG
from app.services.file_manager.file_upload.models import FileObject
from app.services.file_manager.file_upload.upload_client import UploadClient
Expand All @@ -39,14 +40,14 @@ def test_check_status_success(httpx_mock, mocker):
httpx_mock.add_response(
method='POST',
url=AppConfig.Connections.url_bff + '/v1/query/geid',
json={'result': [{'result': {'filename': 'test', 'status': 'ACTIVE'}}]},
json={'result': [{'status': ItemStatus.ACTIVE, 'result': {'name': 'test', 'status': ItemStatus.ACTIVE}}]},
status_code=200,
)

test_obj = FileObject('test', 'test', 'test', 'test', 'test')
result = upload_client.check_status(test_obj)
result = upload_client.check_status([test_obj])

assert result is True
assert len(result) == 0


def test_check_status_fail(httpx_mock, mocker):
Expand All @@ -58,14 +59,16 @@ def test_check_status_fail(httpx_mock, mocker):
httpx_mock.add_response(
method='POST',
url=AppConfig.Connections.url_bff + '/v1/query/geid',
json={'result': [{'result': {'filename': 'test', 'status': 'REGISTERED'}}]},
json={
'result': [{'status': ItemStatus.REGISTERED, 'result': {'name': 'test', 'status': ItemStatus.REGISTERED}}]
},
status_code=200,
)

test_obj = FileObject('test', 'test', 'test', 'test', 'test')
result = upload_client.check_status(test_obj)
result = upload_client.check_status([test_obj])

assert result is False
assert len(result) == 1


def test_chunk_upload(httpx_mock, mocker):
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def mock_upload_client(monkeypatch):
monkeypatch.setattr(UploadClient, 'stream_upload', lambda *args, **kwargs: [])
monkeypatch.setattr(UploadClient, 'on_succeed', lambda *args, **kwargs: None)
monkeypatch.setattr(UploadClient, 'output_manifest', lambda *args, **kwargs: {})
monkeypatch.setattr(UploadClient, 'check_status', lambda *args, **kwargs: True)
monkeypatch.setattr(UploadClient, 'check_status', lambda *args, **kwargs: [])


@pytest.fixture
Expand Down
Loading