1 change: 1 addition & 0 deletions bzl/mypy/BUILD
@@ -56,6 +56,7 @@ mypy_cli(
type_requirement("types-six"),
type_requirement("types-tqdm"),
type_requirement("types-urllib3"),
type_requirement("types-aiofiles"),
],
python_version = "3.10.18",
tags = ["no-mypy"],
4 changes: 4 additions & 0 deletions bzl/mypy/locked_requirements.txt
@@ -199,6 +199,10 @@ tomli==2.0.1 \
--hash=sha256:939de3e7a6161af0c887ef91b7d41a53e7c5a1ca976325f429cb46ea9bc30ecc \
--hash=sha256:de526c12914f0c550d15924c62d72abc48d6fe7364aa87328337a31007fe8a4f
# via mypy
types-aiofiles==25.1.0.20251011 \
--hash=sha256:1c2b8ab260cb3cd40c15f9d10efdc05a6e1e6b02899304d80dfa0410e028d3ff \
--hash=sha256:8ff8de7f9d42739d8f0dadcceeb781ce27cd8d8c4152d4a7c52f6b20edb8149c
# via -r ./requirements.txt
types-awscrt==0.26.1 \
--hash=sha256:176d320a26990efc057d4bf71396e05be027c142252ac48cc0d87aaea0704280 \
--hash=sha256:aca96f889b3745c0e74f42f08f277fed3bf6e9baa2cf9b06a36f78d77720e504
1 change: 1 addition & 0 deletions bzl/mypy/requirements.txt
@@ -35,3 +35,4 @@ types-redis==4.4.*
types-six==1.17.*
types-tqdm==4.66.*
types-urllib3==1.26.*
types-aiofiles==25.1.0.20251011
2 changes: 1 addition & 1 deletion src/cli/config.py
@@ -1041,7 +1041,7 @@ def setup_parser(parser: argparse._SubParsersAction):
View history for a specific configuration type::

osmo config history SERVICE

View history for a specific time range::

osmo config history --created-after "2025-05-18" --created-before "2025-05-25"
8 changes: 5 additions & 3 deletions src/locked_requirements.txt
@@ -4,6 +4,10 @@
#
# pip-compile --allow-unsafe --generate-hashes --output-file=locked_requirements.txt ./requirements.txt
#
aiofiles==25.1.0 \
--hash=sha256:a8d728f0a29de45dc521f18f07297428d56992a742f0cd2701ba86e44d23d5b2 \
--hash=sha256:abe311e527c862958650f9438e859c1fa7568a141b22abcd015e120e86a85695
# via -r ./requirements.txt
altgraph==0.17.3 \
--hash=sha256:ad33358114df7c9416cdb8fa1eaa5852166c505118717021c6a8c7c7abbd03dd \
--hash=sha256:c8ac1ca6772207179ed8003ce7687757c04b0b71536f81e2ac5755c6226458fe
@@ -526,9 +530,7 @@ lark==1.1.5 \
macholib==1.16.3 \
--hash=sha256:07ae9e15e8e4cd9a788013d81f5908b3609aa76f9b1421bae9c4d7606ec86a30 \
--hash=sha256:0e315d7583d38b8c77e815b1ecbdbf504a8258d8b3e17b61165c6feb60d18f2c
# via
# -r ./requirements.txt
# pyinstaller
# via -r ./requirements.txt
markupsafe==2.1.3 \
--hash=sha256:05fb21170423db021895e1ea1e1f3ab3adb85d1c2333cbc2310f2a26bc77272e \
--hash=sha256:0a4e4a1aff6c7ac4cd55792abf96c915634c2b97e3cc1c7129578aa68ebd754e \
3 changes: 3 additions & 0 deletions src/requirements.txt
@@ -105,3 +105,6 @@ watchdog==6.0.0
# Azure Blob Storage
azure-storage-blob==12.26.0
azure-identity==1.23.1

# Async
aiofiles==25.1.0
1 change: 1 addition & 0 deletions src/utils/connectors/BUILD
@@ -11,6 +11,7 @@ osmo_py_library(
requirement("psycopg2-binary"),
requirement("pydantic"),
requirement("redis"),
requirement("aiofiles"),
requirement("pyyaml"),
"//src/lib/data/constants",
"//src/lib/data/storage",
5 changes: 3 additions & 2 deletions src/utils/connectors/redis.py
@@ -22,6 +22,7 @@
import logging
from typing import AsyncGenerator, Dict, Optional

import aiofiles # type: ignore
import kombu # type: ignore
import pydantic
import redis.asyncio # type: ignore
@@ -238,9 +239,9 @@ async def write_redis_log_to_disk(url: str, name: str, file_path: str):
file_path (str): The path to write the Redis logs to.

"""
with open(file_path, 'a', encoding='utf-8') as f:
async with aiofiles.open(file_path, 'a', encoding='utf-8') as f:
async for line in redis_log_formatter(url, name):
f.write(line)
await f.write(line)


def get_backend_option_name(backend: str) -> str:
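The change to write_redis_log_to_disk swaps the blocking open()/write() calls for aiofiles so the event loop is not stalled while the log stream is flushed to disk. A minimal sketch of the same pattern, with line_source() standing in for redis_log_formatter(url, name) and a made-up file path:

import asyncio
import aiofiles  # type: ignore

async def line_source():
    # Stand-in for redis_log_formatter(url, name): yields log lines asynchronously.
    for line in ('first line\n', 'second line\n'):
        yield line

async def append_lines(file_path: str):
    # Each write is awaited, so other coroutines can run while the file is flushed.
    async with aiofiles.open(file_path, 'a', encoding='utf-8') as f:
        async for line in line_source():
            await f.write(line)

asyncio.run(append_lines('/tmp/example.log'))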
151 changes: 78 additions & 73 deletions src/utils/job/jobs.py
@@ -31,6 +31,7 @@
from typing import List, Dict, Tuple, Type
import urllib.parse

import aiofiles
import redis # type: ignore
import redis.asyncio # type: ignore
import pydantic
@@ -1305,6 +1306,8 @@ def execute(self, context: JobExecutionContext,

redis_client = redis.from_url(workflow_obj.logs)

redis_batch_pipeline = redis_client.pipeline()

if workflow_obj.status.failed():
start_delimiter = '\n' + '-' * 100 + '\n'
end_delimiter = '-' * 100 + '\n'
@@ -1320,121 +1323,124 @@
logs = connectors.redis.LogStreamBody(
time=common.current_time(), io_type=connectors.redis.IOType.DUMP,
source='OSMO', retry_id=0, text=log_message)
redis_client.xadd(f'{self.workflow_id}-logs', json.loads(logs.json()))
redis_batch_pipeline.xadd(f'{self.workflow_id}-logs', json.loads(logs.json()))

logs = connectors.redis.LogStreamBody(
time=common.current_time(), io_type=connectors.redis.IOType.END_FLAG,
source='', retry_id=0, text='')
redis_client.xadd(f'{self.workflow_id}-logs', json.loads(logs.json()))
redis_client.expire(f'{self.workflow_id}-logs', connectors.MAX_LOG_TTL, nx=True)
redis_client.xadd(common.get_workflow_events_redis_name(self.workflow_uuid),
json.loads(logs.json()))
redis_client.expire(common.get_workflow_events_redis_name(self.workflow_uuid),
connectors.MAX_LOG_TTL, nx=True)
redis_batch_pipeline.xadd(f'{self.workflow_id}-logs', json.loads(logs.json()))
redis_batch_pipeline.expire(f'{self.workflow_id}-logs', connectors.MAX_LOG_TTL, nx=True)
redis_batch_pipeline.xadd(common.get_workflow_events_redis_name(self.workflow_uuid),
json.loads(logs.json()))
redis_batch_pipeline.expire(common.get_workflow_events_redis_name(self.workflow_uuid),
connectors.MAX_LOG_TTL, nx=True)
for group in workflow_obj.groups:
for task_obj in group.tasks:
for retry_idx in range(task_obj.retry_id + 1):
redis_client.xadd(
redis_batch_pipeline.xadd(
common.get_redis_task_log_name(
self.workflow_id, task_obj.name, retry_idx),
json.loads(logs.json()))
redis_client.expire(
redis_batch_pipeline.expire(
common.get_redis_task_log_name(
self.workflow_id, task_obj.name, retry_idx),
connectors.MAX_LOG_TTL, nx=True)
redis_client.xadd(
redis_batch_pipeline.xadd(
f'{self.workflow_id}-{task_obj.task_uuid}-{task_obj.retry_id}-error-logs',
json.loads(logs.json()))
redis_client.expire(
redis_batch_pipeline.expire(
f'{self.workflow_id}-{task_obj.task_uuid}-{task_obj.retry_id}-error-logs',
connectors.MAX_LOG_TTL, nx=True)

redis_batch_pipeline.execute()

last_timestamp = update_progress_writer(
progress_writer,
last_timestamp,
progress_iter_freq)

# Upload logs
# Create a storage client to upload logs to S3
workflow_config = context.postgres.get_workflow_configs()

if workflow_config.workflow_log.credential is None:
return JobResult(
success=False,
error='Workflow log credential is not set',
)

storage_client = storage.Client.create(
data_credential=workflow_config.workflow_log.credential,
executor_params=storage.ExecutorParameters(
num_processes=1,
num_threads=15,
),
)

def migrate_logs(redis_url: str, redis_key: str, file_name: str):
async def migrate_logs(redis_url: str, redis_key: str, file_name: str):
''' Uploads logs to S3 and deletes them from Redis. Returns the S3 file path. '''
nonlocal last_timestamp

with tempfile.NamedTemporaryFile(mode='w+') as temp_file:
loop = asyncio.get_event_loop()
loop.run_until_complete(
connectors.write_redis_log_to_disk(
redis_url,
redis_key,
temp_file.name,
),
)

last_timestamp = update_progress_writer(
progress_writer,
last_timestamp,
progress_iter_freq)

temp_file.flush()
storage_client.upload_objects(
source=temp_file.name,
destination_prefix=self.workflow_id,
destination_name=file_name,
async with aiofiles.tempfile.NamedTemporaryFile(mode='w+') as temp_file:
await connectors.write_redis_log_to_disk(
redis_url,
redis_key,
str(temp_file.name),
)

workflow_logs_redis_key = f'{self.workflow_id}-logs'
workflow_events_redis_key = common.get_workflow_events_redis_name(self.workflow_uuid)
await progress_writer.report_progress_async()

# Upload workflow logs
migrate_logs(
workflow_obj.logs,
workflow_logs_redis_key,
common.WORKFLOW_LOGS_FILE_NAME
)
await temp_file.flush()

# Upload workflow events
migrate_logs(
workflow_obj.logs,
workflow_events_redis_key,
common.WORKFLOW_EVENTS_FILE_NAME
)
# Wrap the call in a concrete no-arg function to avoid overload issues during lint.
def _upload_logs() -> storage.UploadSummary:
return storage_client.upload_objects(
source=str(temp_file.name),
destination_prefix=self.workflow_id,
destination_name=file_name,
)

for group in workflow_obj.groups:
for task_obj in group.tasks:
# Upload task logs
task_file_name = common.get_task_log_file_name(
task_obj.name,
task_obj.retry_id
)
await asyncio.to_thread(_upload_logs)

task_redis_path = common.get_redis_task_log_name(
self.workflow_id,
task_obj.name,
task_obj.retry_id,
)
await progress_writer.report_progress_async()

semaphore = asyncio.Semaphore(10)

async def migrate_logs_concurrently(redis_url: str, redis_key: str, file_name: str):
async with semaphore:
await migrate_logs(redis_url, redis_key, file_name)

workflow_logs_redis_key = f'{self.workflow_id}-logs'
workflow_events_redis_key = common.get_workflow_events_redis_name(self.workflow_uuid)

migrate_logs(workflow_obj.logs, task_redis_path, task_file_name)
# Create a list of task parameters
task_parameters : List[Tuple[str, str, str]] = [
(workflow_obj.logs, workflow_logs_redis_key, common.WORKFLOW_LOGS_FILE_NAME),
(workflow_obj.logs, workflow_events_redis_key, common.WORKFLOW_EVENTS_FILE_NAME)
]

# Upload error logs
for group in workflow_obj.groups:
for task_obj in group.tasks:
task_parameters.append(
(workflow_obj.logs,
common.get_redis_task_log_name(
self.workflow_id, task_obj.name, task_obj.retry_id),
common.get_task_log_file_name(
task_obj.name, task_obj.retry_id)))
if task_obj.status.has_error_logs():
prefix = f'{self.workflow_id}-{task_obj.task_uuid}-{task_obj.retry_id}'
task_error_log_name = task_obj.name
if task_obj.retry_id > 0:
task_error_log_name += f'_{task_obj.retry_id}'
task_error_log_name += common.ERROR_LOGS_SUFFIX_FILE_NAME
task_parameters.append(
(workflow_obj.logs, f'{prefix}-error-logs', task_error_log_name))

async def run_log_migrations():
await asyncio.gather(
*(
migrate_logs_concurrently(redis_url, redis_key, file_name)
for redis_url, redis_key, file_name in task_parameters
)
)

migrate_logs(workflow_obj.logs, f'{prefix}-error-logs', task_error_log_name)
asyncio.run(run_log_migrations())

wf_logs_ss_file_path = task_common.get_workflow_logs_path(
workflow_id=self.workflow_id,
@@ -1451,20 +1457,19 @@ def migrate_logs(redis_url: str, redis_key: str, file_name: str):
workflow_obj.update_events_to_db(wf_events_ss_file_path)

# Remove logs from Redis
redis_client.delete(workflow_logs_redis_key)
redis_client.delete(workflow_events_redis_key)

redis_keys_to_delete : List[str] = [workflow_logs_redis_key, workflow_events_redis_key]
for group in workflow_obj.groups:
for task_obj in group.tasks:
task_redis_path = common.get_redis_task_log_name(
self.workflow_id, task_obj.name, task_obj.retry_id)
redis_client.delete(task_redis_path)

# Upload error logs
redis_keys_to_delete.append(task_redis_path)
if task_obj.status.has_error_logs():
prefix = f'{self.workflow_id}-{task_obj.task_uuid}-{task_obj.retry_id}'
# Remove logs from Redis
redis_client.delete(f'{prefix}-error-logs')
redis_keys_to_delete.append(f'{prefix}-error-logs')

# Delete in batches to avoid an excessively large single DEL command.
for idx in range(0, len(redis_keys_to_delete), 1000):
redis_client.delete(*redis_keys_to_delete[idx:idx + 1000])

return JobResult()

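Two of the jobs.py changes above are plain batching: the end-flag XADD/EXPIRE commands are queued on a single redis-py pipeline and sent with one execute(), and the cleanup DELs are issued in chunks of 1000 keys instead of one call per key. A minimal sketch of both patterns, assuming a local Redis at redis://localhost:6379 and invented key names:

import redis  # type: ignore

redis_client = redis.from_url('redis://localhost:6379')

# Batch writes: commands are buffered client-side and shipped in one round trip.
pipe = redis_client.pipeline()
for key in ('example-logs', 'example-events'):
    pipe.xadd(key, {'text': 'end'})
    pipe.expire(key, 3600, nx=True)  # only set a TTL if one is not already present
pipe.execute()

# Batch deletes: chunk the key list so no single DEL command grows unbounded.
keys_to_delete = [f'example-task-{i}-logs' for i in range(2500)]
for idx in range(0, len(keys_to_delete), 1000):
    redis_client.delete(*keys_to_delete[idx:idx + 1000])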
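The log migration itself now runs concurrently: each migrate_logs call writes into an aiofiles temporary file, hands the blocking storage upload to a worker thread via asyncio.to_thread, and an asyncio.Semaphore(10) caps how many migrations are in flight at once. A simplified sketch of that shape, with upload_blocking() standing in for storage_client.upload_objects and invented task names:

import asyncio
import aiofiles  # type: ignore

def upload_blocking(path: str, name: str) -> None:
    # Stand-in for the synchronous storage upload; runs in a thread so it
    # does not block the event loop.
    print(f'uploading {path} as {name}')

async def migrate_one(name: str, semaphore: asyncio.Semaphore) -> None:
    async with semaphore:
        async with aiofiles.tempfile.NamedTemporaryFile(mode='w+') as temp_file:
            await temp_file.write(f'logs for {name}\n')
            await temp_file.flush()
            await asyncio.to_thread(upload_blocking, str(temp_file.name), name)

async def main() -> None:
    semaphore = asyncio.Semaphore(10)  # at most 10 migrations in flight
    await asyncio.gather(*(migrate_one(f'task-{i}', semaphore) for i in range(25)))

asyncio.run(main())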
1 change: 1 addition & 0 deletions src/utils/progress_check/BUILD
@@ -26,6 +26,7 @@ osmo_py_library(
],
deps = [
requirement("pydantic"),
requirement("aiofiles"),
"//src/utils:static_config",
],
visibility = ["//visibility:public"],
10 changes: 10 additions & 0 deletions src/utils/progress_check/progress.py
@@ -20,6 +20,9 @@
import time
import uuid

import aiofiles # type: ignore
import aiofiles.os # type: ignore


class ProgressWriter:
'''Reports progress by writing the current time to a given progress file'''
@@ -36,6 +39,13 @@ def report_progress(self):
# Atomically replace the current file with the temp file
os.replace(src=temp_file, dst=self._filename)

async def report_progress_async(self):
temp_file = f'{self._filename}-{uuid.uuid4()}.tmp'
# Write the current time to a temporary file
async with aiofiles.open(temp_file, mode='w', encoding='utf-8') as file:
await file.write(str(time.time()))
# Atomically replace the current file with the temp file
await aiofiles.os.replace(temp_file, self._filename)

class ProgressReader:
'''
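report_progress_async mirrors the synchronous writer: the timestamp is written to a uniquely named temporary file and then moved over the real progress file with aiofiles.os.replace, so a reader never observes a half-written heartbeat. A standalone sketch of that write-then-replace pattern (the progress file path is made up):

import asyncio
import time
import uuid
import aiofiles  # type: ignore
import aiofiles.os  # type: ignore

async def write_progress_atomically(filename: str) -> None:
    # Write to a unique temp file, then atomically swap it into place so
    # readers never see a partially written timestamp.
    temp_file = f'{filename}-{uuid.uuid4()}.tmp'
    async with aiofiles.open(temp_file, mode='w', encoding='utf-8') as file:
        await file.write(str(time.time()))
    await aiofiles.os.replace(temp_file, filename)

asyncio.run(write_progress_atomically('/tmp/progress-heartbeat'))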