diff --git a/bzl/mypy/BUILD b/bzl/mypy/BUILD
index ad17fa14..2a4fd51c 100644
--- a/bzl/mypy/BUILD
+++ b/bzl/mypy/BUILD
@@ -56,6 +56,7 @@ mypy_cli(
         type_requirement("types-six"),
         type_requirement("types-tqdm"),
         type_requirement("types-urllib3"),
+        type_requirement("types-aiofiles"),
     ],
     python_version = "3.10.18",
     tags = ["no-mypy"],
diff --git a/bzl/mypy/locked_requirements.txt b/bzl/mypy/locked_requirements.txt
index 9913612b..fd235d09 100644
--- a/bzl/mypy/locked_requirements.txt
+++ b/bzl/mypy/locked_requirements.txt
@@ -199,6 +199,10 @@ tomli==2.0.1 \
     --hash=sha256:939de3e7a6161af0c887ef91b7d41a53e7c5a1ca976325f429cb46ea9bc30ecc \
     --hash=sha256:de526c12914f0c550d15924c62d72abc48d6fe7364aa87328337a31007fe8a4f
     # via mypy
+types-aiofiles==25.1.0.20251011 \
+    --hash=sha256:1c2b8ab260cb3cd40c15f9d10efdc05a6e1e6b02899304d80dfa0410e028d3ff \
+    --hash=sha256:8ff8de7f9d42739d8f0dadcceeb781ce27cd8d8c4152d4a7c52f6b20edb8149c
+    # via -r ./requirements.txt
 types-awscrt==0.26.1 \
     --hash=sha256:176d320a26990efc057d4bf71396e05be027c142252ac48cc0d87aaea0704280 \
     --hash=sha256:aca96f889b3745c0e74f42f08f277fed3bf6e9baa2cf9b06a36f78d77720e504
diff --git a/bzl/mypy/requirements.txt b/bzl/mypy/requirements.txt
index 579cc421..3bee14c0 100755
--- a/bzl/mypy/requirements.txt
+++ b/bzl/mypy/requirements.txt
@@ -35,3 +35,4 @@ types-redis==4.4.*
 types-six==1.17.*
 types-tqdm==4.66.*
 types-urllib3==1.26.*
+types-aiofiles==25.1.0.20251011
diff --git a/src/cli/config.py b/src/cli/config.py
index f779f206..1e9ed238 100644
--- a/src/cli/config.py
+++ b/src/cli/config.py
@@ -1041,7 +1041,7 @@ def setup_parser(parser: argparse._SubParsersAction):
         View history for a specific configuration type::
 
             osmo config history SERVICE
-
+
         View history for a specific time range::
 
             osmo config history --created-after "2025-05-18" --created-before "2025-05-25"
diff --git a/src/locked_requirements.txt b/src/locked_requirements.txt
index 1a4b83a6..7ac9565d 100644
--- a/src/locked_requirements.txt
+++ b/src/locked_requirements.txt
@@ -4,6 +4,10 @@
 #
 #    pip-compile --allow-unsafe --generate-hashes --output-file=locked_requirements.txt ./requirements.txt
 #
+aiofiles==25.1.0 \
+    --hash=sha256:a8d728f0a29de45dc521f18f07297428d56992a742f0cd2701ba86e44d23d5b2 \
+    --hash=sha256:abe311e527c862958650f9438e859c1fa7568a141b22abcd015e120e86a85695
+    # via -r ./requirements.txt
 altgraph==0.17.3 \
     --hash=sha256:ad33358114df7c9416cdb8fa1eaa5852166c505118717021c6a8c7c7abbd03dd \
     --hash=sha256:c8ac1ca6772207179ed8003ce7687757c04b0b71536f81e2ac5755c6226458fe
@@ -526,9 +530,7 @@ lark==1.1.5 \
 macholib==1.16.3 \
     --hash=sha256:07ae9e15e8e4cd9a788013d81f5908b3609aa76f9b1421bae9c4d7606ec86a30 \
     --hash=sha256:0e315d7583d38b8c77e815b1ecbdbf504a8258d8b3e17b61165c6feb60d18f2c
-    # via
-    #   -r ./requirements.txt
-    #   pyinstaller
+    # via -r ./requirements.txt
 markupsafe==2.1.3 \
     --hash=sha256:05fb21170423db021895e1ea1e1f3ab3adb85d1c2333cbc2310f2a26bc77272e \
     --hash=sha256:0a4e4a1aff6c7ac4cd55792abf96c915634c2b97e3cc1c7129578aa68ebd754e \
diff --git a/src/requirements.txt b/src/requirements.txt
index fadccfcc..4546b0ac 100644
--- a/src/requirements.txt
+++ b/src/requirements.txt
@@ -105,3 +105,6 @@ watchdog==6.0.0
 # Azure Blob Storage
 azure-storage-blob==12.26.0
 azure-identity==1.23.1
+
+# Async
+aiofiles==25.1.0
diff --git a/src/utils/connectors/BUILD b/src/utils/connectors/BUILD
index 041cb4b3..43bc15a4 100644
--- a/src/utils/connectors/BUILD
+++ b/src/utils/connectors/BUILD
@@ -11,6 +11,7 @@ osmo_py_library(
         requirement("psycopg2-binary"),
         requirement("pydantic"),
requirement("redis"), + requirement("aiofiles"), requirement("pyyaml"), "//src/lib/data/constants", "//src/lib/data/storage", diff --git a/src/utils/connectors/redis.py b/src/utils/connectors/redis.py index d2be6b05..9697af77 100644 --- a/src/utils/connectors/redis.py +++ b/src/utils/connectors/redis.py @@ -22,6 +22,7 @@ import logging from typing import AsyncGenerator, Dict, Optional +import aiofiles # type: ignore import kombu # type: ignore import pydantic import redis.asyncio # type: ignore @@ -238,9 +239,9 @@ async def write_redis_log_to_disk(url: str, name: str, file_path: str): file_path (str): The path to write the Redis logs to. """ - with open(file_path, 'a', encoding='utf-8') as f: + async with aiofiles.open(file_path, 'a', encoding='utf-8') as f: async for line in redis_log_formatter(url, name): - f.write(line) + await f.write(line) def get_backend_option_name(backend: str) -> str: diff --git a/src/utils/job/jobs.py b/src/utils/job/jobs.py index 7e532b2e..f6934354 100644 --- a/src/utils/job/jobs.py +++ b/src/utils/job/jobs.py @@ -31,6 +31,7 @@ from typing import List, Dict, Tuple, Type import urllib.parse +import aiofiles import redis # type: ignore import redis.asyncio # type: ignore import pydantic @@ -1305,6 +1306,8 @@ def execute(self, context: JobExecutionContext, redis_client = redis.from_url(workflow_obj.logs) + redis_batch_pipeline = redis_client.pipeline() + if workflow_obj.status.failed(): start_delimiter = '\n' + '-' * 100 + '\n' end_delimiter = '-' * 100 + '\n' @@ -1320,121 +1323,124 @@ def execute(self, context: JobExecutionContext, logs = connectors.redis.LogStreamBody( time=common.current_time(), io_type=connectors.redis.IOType.DUMP, source='OSMO', retry_id=0, text=log_message) - redis_client.xadd(f'{self.workflow_id}-logs', json.loads(logs.json())) + redis_batch_pipeline.xadd(f'{self.workflow_id}-logs', json.loads(logs.json())) logs = connectors.redis.LogStreamBody( time=common.current_time(), io_type=connectors.redis.IOType.END_FLAG, source='', retry_id=0, text='') - redis_client.xadd(f'{self.workflow_id}-logs', json.loads(logs.json())) - redis_client.expire(f'{self.workflow_id}-logs', connectors.MAX_LOG_TTL, nx=True) - redis_client.xadd(common.get_workflow_events_redis_name(self.workflow_uuid), - json.loads(logs.json())) - redis_client.expire(common.get_workflow_events_redis_name(self.workflow_uuid), - connectors.MAX_LOG_TTL, nx=True) + redis_batch_pipeline.xadd(f'{self.workflow_id}-logs', json.loads(logs.json())) + redis_batch_pipeline.expire(f'{self.workflow_id}-logs', connectors.MAX_LOG_TTL, nx=True) + redis_batch_pipeline.xadd(common.get_workflow_events_redis_name(self.workflow_uuid), + json.loads(logs.json())) + redis_batch_pipeline.expire(common.get_workflow_events_redis_name(self.workflow_uuid), + connectors.MAX_LOG_TTL, nx=True) for group in workflow_obj.groups: for task_obj in group.tasks: for retry_idx in range(task_obj.retry_id + 1): - redis_client.xadd( + redis_batch_pipeline.xadd( common.get_redis_task_log_name( self.workflow_id, task_obj.name, retry_idx), json.loads(logs.json())) - redis_client.expire( + redis_batch_pipeline.expire( common.get_redis_task_log_name( self.workflow_id, task_obj.name, retry_idx), connectors.MAX_LOG_TTL, nx=True) - redis_client.xadd( + redis_batch_pipeline.xadd( f'{self.workflow_id}-{task_obj.task_uuid}-{task_obj.retry_id}-error-logs', json.loads(logs.json())) - redis_client.expire( + redis_batch_pipeline.expire( f'{self.workflow_id}-{task_obj.task_uuid}-{task_obj.retry_id}-error-logs', connectors.MAX_LOG_TTL, nx=True) 
+        redis_batch_pipeline.execute()
+
         last_timestamp = update_progress_writer(
             progress_writer, last_timestamp, progress_iter_freq)
 
-        # Upload logs
+        # Create a storage client to upload logs to S3
         workflow_config = context.postgres.get_workflow_configs()
-
         if workflow_config.workflow_log.credential is None:
             return JobResult(
                 success=False,
                 error='Workflow log credential is not set',
             )
-
         storage_client = storage.Client.create(
             data_credential=workflow_config.workflow_log.credential,
+            executor_params=storage.ExecutorParameters(
+                num_processes=1,
+                num_threads=15,
+            ),
         )
 
-        def migrate_logs(redis_url: str, redis_key: str, file_name: str):
+        async def migrate_logs(redis_url: str, redis_key: str, file_name: str):
             '''
             Uploads logs to S3 and deletes them from Redis. Returns the S3 file path.
             '''
-            nonlocal last_timestamp
-
-            with tempfile.NamedTemporaryFile(mode='w+') as temp_file:
-                loop = asyncio.get_event_loop()
-                loop.run_until_complete(
-                    connectors.write_redis_log_to_disk(
-                        redis_url,
-                        redis_key,
-                        temp_file.name,
-                    ),
-                )
-                last_timestamp = update_progress_writer(
-                    progress_writer,
-                    last_timestamp,
-                    progress_iter_freq)
-
-                temp_file.flush()
-                storage_client.upload_objects(
-                    source=temp_file.name,
-                    destination_prefix=self.workflow_id,
-                    destination_name=file_name,
+            async with aiofiles.tempfile.NamedTemporaryFile(mode='w+') as temp_file:
+                await connectors.write_redis_log_to_disk(
+                    redis_url,
+                    redis_key,
+                    str(temp_file.name),
                 )
 
-        workflow_logs_redis_key = f'{self.workflow_id}-logs'
-        workflow_events_redis_key = common.get_workflow_events_redis_name(self.workflow_uuid)
+                await progress_writer.report_progress_async()
 
-        # Upload workflow logs
-        migrate_logs(
-            workflow_obj.logs,
-            workflow_logs_redis_key,
-            common.WORKFLOW_LOGS_FILE_NAME
-        )
+                await temp_file.flush()
 
-        # Upload workflow events
-        migrate_logs(
-            workflow_obj.logs,
-            workflow_events_redis_key,
-            common.WORKFLOW_EVENTS_FILE_NAME
-        )
+                # Wrap the call in a concrete no-arg function to avoid overload issues during lint.
+                def _upload_logs() -> storage.UploadSummary:
+                    return storage_client.upload_objects(
+                        source=str(temp_file.name),
+                        destination_prefix=self.workflow_id,
+                        destination_name=file_name,
+                    )
 
-        for group in workflow_obj.groups:
-            for task_obj in group.tasks:
-                # Upload task logs
-                task_file_name = common.get_task_log_file_name(
-                    task_obj.name,
-                    task_obj.retry_id
-                )
+                await asyncio.to_thread(_upload_logs)
 
-                task_redis_path = common.get_redis_task_log_name(
-                    self.workflow_id,
-                    task_obj.name,
-                    task_obj.retry_id,
-                )
+                await progress_writer.report_progress_async()
+
+        semaphore = asyncio.Semaphore(10)
+
+        async def migrate_logs_concurrently(redis_url: str, redis_key: str, file_name: str):
+            async with semaphore:
+                await migrate_logs(redis_url, redis_key, file_name)
+
+        workflow_logs_redis_key = f'{self.workflow_id}-logs'
+        workflow_events_redis_key = common.get_workflow_events_redis_name(self.workflow_uuid)
 
-                migrate_logs(workflow_obj.logs, task_redis_path, task_file_name)
+        # Create a list of task parameters
+        task_parameters : List[Tuple[str, str, str]] = [
+            (workflow_obj.logs, workflow_logs_redis_key, common.WORKFLOW_LOGS_FILE_NAME),
+            (workflow_obj.logs, workflow_events_redis_key, common.WORKFLOW_EVENTS_FILE_NAME)
+        ]
 
-                # Upload error logs
+        for group in workflow_obj.groups:
+            for task_obj in group.tasks:
+                task_parameters.append(
+                    (workflow_obj.logs,
+                     common.get_redis_task_log_name(
+                         self.workflow_id, task_obj.name, task_obj.retry_id),
+                     common.get_task_log_file_name(
+                         task_obj.name, task_obj.retry_id)))
                 if task_obj.status.has_error_logs():
                     prefix = f'{self.workflow_id}-{task_obj.task_uuid}-{task_obj.retry_id}'
                     task_error_log_name = task_obj.name
                     if task_obj.retry_id > 0:
                         task_error_log_name += f'_{task_obj.retry_id}'
                     task_error_log_name += common.ERROR_LOGS_SUFFIX_FILE_NAME
+                    task_parameters.append(
+                        (workflow_obj.logs, f'{prefix}-error-logs', task_error_log_name))
+
+        async def run_log_migrations():
+            await asyncio.gather(
+                *(
+                    migrate_logs_concurrently(redis_url, redis_key, file_name)
+                    for redis_url, redis_key, file_name in task_parameters
+                )
+            )
 
-                    migrate_logs(workflow_obj.logs, f'{prefix}-error-logs', task_error_log_name)
+        asyncio.run(run_log_migrations())
 
         wf_logs_ss_file_path = task_common.get_workflow_logs_path(
             workflow_id=self.workflow_id,
@@ -1451,20 +1457,19 @@ def migrate_logs(redis_url: str, redis_key: str, file_name: str):
 
         workflow_obj.update_events_to_db(wf_events_ss_file_path)
 
         # Remove logs from Redis
-        redis_client.delete(workflow_logs_redis_key)
-        redis_client.delete(workflow_events_redis_key)
-
+        redis_keys_to_delete : List[str] = [workflow_logs_redis_key, workflow_events_redis_key]
         for group in workflow_obj.groups:
             for task_obj in group.tasks:
                 task_redis_path = common.get_redis_task_log_name(
                     self.workflow_id, task_obj.name, task_obj.retry_id)
-                redis_client.delete(task_redis_path)
-
-                # Upload error logs
+                redis_keys_to_delete.append(task_redis_path)
                 if task_obj.status.has_error_logs():
                     prefix = f'{self.workflow_id}-{task_obj.task_uuid}-{task_obj.retry_id}'
-                    # Remove logs from Redis
-                    redis_client.delete(f'{prefix}-error-logs')
+                    redis_keys_to_delete.append(f'{prefix}-error-logs')
+
+        # Delete in batches to avoid an excessively large single DEL command.
+        for idx in range(0, len(redis_keys_to_delete), 1000):
+            redis_client.delete(*redis_keys_to_delete[idx:idx + 1000])
 
         return JobResult()
diff --git a/src/utils/progress_check/BUILD b/src/utils/progress_check/BUILD
index 916a5b31..2f616a22 100644
--- a/src/utils/progress_check/BUILD
+++ b/src/utils/progress_check/BUILD
@@ -26,6 +26,7 @@ osmo_py_library(
     ],
     deps = [
         requirement("pydantic"),
+        requirement("aiofiles"),
         "//src/utils:static_config",
     ],
     visibility = ["//visibility:public"],
diff --git a/src/utils/progress_check/progress.py b/src/utils/progress_check/progress.py
index a5e7904c..0d7f58b3 100644
--- a/src/utils/progress_check/progress.py
+++ b/src/utils/progress_check/progress.py
@@ -20,6 +20,9 @@ import time
 import uuid
 
+import aiofiles  # type: ignore
+import aiofiles.os  # type: ignore
+
 
 class ProgressWriter:
     '''Reports progress by writing the current time to a given progress file'''
@@ -36,6 +39,13 @@ def report_progress(self):
         # Atomically replace the current file with the temp file
         os.replace(src=temp_file, dst=self._filename)
 
+    async def report_progress_async(self):
+        temp_file = f'{self._filename}-{uuid.uuid4()}.tmp'
+        # Write the current time to a temporary file
+        async with aiofiles.open(temp_file, mode='w', encoding='utf-8') as file:
+            await file.write(str(time.time()))
+        # Atomically replace the current file with the temp file
+        await aiofiles.os.replace(temp_file, self._filename)
 
 class ProgressReader:
     '''
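
For reference, a minimal standalone sketch of the bounded-concurrency pattern the jobs.py change relies on (an asyncio.Semaphore gating asyncio.gather over aiofiles writes). The stream contents, key names, and /tmp output paths below are hypothetical and are not part of the patch.

# Illustrative only: stand-in names, not the repository's actual helpers.
import asyncio

import aiofiles


async def fake_log_stream(name: str):
    # Stand-in for the Redis log formatter; yields a few text lines.
    for idx in range(3):
        yield f'{name}: line {idx}\n'


async def write_log_to_disk(name: str, file_path: str):
    # Same shape as write_redis_log_to_disk: append lines without blocking the event loop.
    async with aiofiles.open(file_path, 'a', encoding='utf-8') as f:
        async for line in fake_log_stream(name):
            await f.write(line)


async def main():
    semaphore = asyncio.Semaphore(10)  # cap concurrent migrations, as in the patch

    async def migrate(name: str, file_path: str):
        async with semaphore:
            await write_log_to_disk(name, file_path)

    targets = [('workflow-logs', '/tmp/workflow-logs.txt'),
               ('workflow-events', '/tmp/workflow-events.txt')]
    await asyncio.gather(*(migrate(name, path) for name, path in targets))


if __name__ == '__main__':
    asyncio.run(main())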