Skip to content

Commit

Permalink
Small improvments
Browse files Browse the repository at this point in the history
  • Loading branch information
Marishka17 committed Feb 21, 2025
1 parent 8ca66ff commit 9ce85ac
Show file tree
Hide file tree
Showing 15 changed files with 117 additions and 86 deletions.
2 changes: 1 addition & 1 deletion cvat/apps/dataset_manager/bindings.py
Original file line number Diff line number Diff line change
Expand Up @@ -2452,7 +2452,7 @@ def load_dataset_data(project_annotation, dataset: dm.Dataset, project_data):
raise CvatImportError(f'Target project does not have label with name "{label.name}"')
for subset_id, subset in enumerate(dataset.subsets().values()):
job = rq.get_current_job()
job_meta = ImportRQMeta.from_job(job)
job_meta = ImportRQMeta.for_job(job)
job_meta.status = 'Task from dataset is being created...'
job_meta.progress = (subset_id + (job_meta.task_progress or 0.)) / len(dataset.subsets().keys())
job_meta.save()
Expand Down
2 changes: 1 addition & 1 deletion cvat/apps/dataset_manager/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ def data(self) -> dict:
@transaction.atomic
def import_dataset_as_project(src_file, project_id, format_name, conv_mask_to_poly):
rq_job = rq.get_current_job()
rq_job_meta = ImportRQMeta.from_job(rq_job)
rq_job_meta = ImportRQMeta.for_job(rq_job)
rq_job_meta.status = 'Dataset import has been started...'
rq_job_meta.progress = 0.
rq_job_meta.save()
Expand Down
2 changes: 1 addition & 1 deletion cvat/apps/dataset_manager/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def _patched_retry(*_1, **_2):
settings.CVAT_QUEUES.EXPORT_DATA.value
)

rq_job_meta = ExportRQMeta.from_job(current_rq_job)
rq_job_meta = ExportRQMeta.for_job(current_rq_job)
user_id = rq_job_meta.user.id or -1

with get_rq_lock_by_user(settings.CVAT_QUEUES.EXPORT_DATA.value, user_id):
Expand Down
8 changes: 4 additions & 4 deletions cvat/apps/engine/background.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ def _handle_rq_job_v1(
) -> Optional[Response]:

def is_result_outdated() -> bool:
return ExportRQMeta.from_job(rq_job).request.timestamp < instance_update_time
return ExportRQMeta.for_job(rq_job).request.timestamp < instance_update_time

def handle_local_download() -> Response:
with dm.util.get_export_cache_lock(
Expand Down Expand Up @@ -340,7 +340,7 @@ def handle_local_download() -> Response:
f"Export to {self.export_args.location} location is not implemented yet"
)
elif rq_job_status == RQJobStatus.FAILED:
exc_info = ExportRQMeta.from_job(rq_job).formatted_exception or str(rq_job.exc_info)
exc_info = ExportRQMeta.for_job(rq_job).formatted_exception or str(rq_job.exc_info)
rq_job.delete()
return Response(exc_info, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
elif (
Expand Down Expand Up @@ -549,7 +549,7 @@ def _handle_rq_job_v1(
) -> Optional[Response]:

def is_result_outdated() -> bool:
return ExportRQMeta.from_job(rq_job).request.timestamp < last_instance_update_time
return ExportRQMeta.for_job(rq_job).request.timestamp < last_instance_update_time

last_instance_update_time = timezone.localtime(self.db_instance.updated_date)
timestamp = self.get_timestamp(last_instance_update_time)
Expand Down Expand Up @@ -645,7 +645,7 @@ def is_result_outdated() -> bool:
f"Export to {self.export_args.location} location is not implemented yet"
)
elif rq_job_status == RQJobStatus.FAILED:
exc_info = ExportRQMeta.from_job(rq_job).formatted_exception or str(rq_job.exc_info)
exc_info = ExportRQMeta.for_job(rq_job).formatted_exception or str(rq_job.exc_info)
rq_job.delete()
return Response(exc_info, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
elif (
Expand Down
2 changes: 1 addition & 1 deletion cvat/apps/engine/backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -1250,7 +1250,7 @@ def _import(
failure_ttl=settings.IMPORT_CACHE_FAILED_TTL.total_seconds()
)
else:
rq_job_meta = ImportRQMeta.from_job(rq_job)
rq_job_meta = ImportRQMeta.for_job(rq_job)
if rq_job_meta.user.id != request.user.id:
return Response(status=status.HTTP_403_FORBIDDEN)

Expand Down
2 changes: 1 addition & 1 deletion cvat/apps/engine/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def wait_for_rq_job(rq_job: rq.job.Job):
return
elif job_status in ("failed",):
rq_job.get_meta() # refresh from Redis
job_meta = RQMetaWithFailureInfo.from_job(rq_job)
job_meta = RQMetaWithFailureInfo.for_job(rq_job)
exc_type = job_meta.exc_type or Exception
exc_args = job_meta.exc_args or ("Cannot create chunk",)
raise exc_type(*exc_args)
Expand Down
143 changes: 88 additions & 55 deletions cvat/apps/engine/rq.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from abc import ABCMeta, abstractmethod
from datetime import datetime
from typing import Any, Callable, Optional, Union
from typing import Any, Callable, Optional, Protocol, Union
from uuid import UUID

import attrs
Expand All @@ -24,6 +24,15 @@


class RQJobMetaField:
class UserField:
ID = "id"
USERNAME = "username"
EMAIL = "email"

class RequestField:
UUID = "uuid"
TIMESTAMP = "timestamp"

# common fields
FORMATTED_EXCEPTION = "formatted_exception"
REQUEST = "request"
Expand All @@ -46,7 +55,11 @@ class RQJobMetaField:
TMP_FILE = "tmp_file"


class _AbstractAttribute:
class WithMeta(Protocol):
meta: dict[str, Any]


class _AbstractRQMetaAttribute:
def __init__(
self, key: str, *, optional: bool = False, validator: Callable | None = None
) -> None:
Expand All @@ -56,44 +69,62 @@ def __init__(
self._optional = optional


class _SettableAttribute(_AbstractAttribute):
class _GettableRQMetaAttribute(_AbstractRQMetaAttribute):
def __get__(self, instance: WithMeta, objtype: type[WithMeta] | None = None):
if self._optional:
return instance.meta.get(self._key)

return instance.meta[self._key]


class _SettableRQMetaAttribute(_AbstractRQMetaAttribute):
def validate(self, value):
if value is None and not self._optional:
raise ValueError(f"{self._key} is required")
if value is not None and self._validator and not self._validator(value):
raise ValueError("Wrong type")

def __set__(self, instance: AbstractRQMeta, value):
def __set__(self, instance: WithMeta, value: Any):
self.validate(value)
instance.meta[self._key] = value


class _GettableAttribute(_AbstractAttribute):
def __get__(self, instance: AbstractRQMeta, objtype: type[AbstractRQMeta] | None = None):
if self._optional:
return instance.meta.get(self._key)
class ImmutableRQMetaAttribute(_GettableRQMetaAttribute):
pass

# TODO: refactor
if self._key == RQJobMetaField.USER:
return UserInfo(data=instance.meta[self._key])
elif self._key == RQJobMetaField.REQUEST:
return RequestInfo(data=instance.meta[self._key])

return instance.meta[self._key]
class MutableRQMetaAttribute(_GettableRQMetaAttribute, _SettableRQMetaAttribute):
pass


class ImmutableRQMetaAttribute(_GettableAttribute):
pass
class UserRQMetaAttribute(ImmutableRQMetaAttribute):
def __init__(self, *, optional: bool = False, validator: Callable | None = None) -> None:
super().__init__(RQJobMetaField.USER, optional=optional, validator=validator)

def __get__(self, instance: WithMeta, objtype: type[WithMeta] | None = None):
assert RQJobMetaField.USER == self._key
return UserMeta(data=instance.meta[self._key])

class MutableRQMetaAttribute(_GettableAttribute, _SettableAttribute):
pass

class RequestRQMetaAttribute(ImmutableRQMetaAttribute):
def __init__(self, *, optional: bool = False, validator: Callable | None = None) -> None:
super().__init__(RQJobMetaField.REQUEST, optional=optional, validator=validator)

def __get__(self, instance: WithMeta, objtype: type[WithMeta] | None = None):
assert RQJobMetaField.REQUEST == self._key
return RequestMeta(data=instance.meta[self._key])

class UserInfo:
id: int = ImmutableRQMetaAttribute("id", validator=lambda x: isinstance(x, int))
username: str = ImmutableRQMetaAttribute("username", validator=lambda x: isinstance(x, str))
email: str = ImmutableRQMetaAttribute("email", validator=lambda x: isinstance(x, str))

class UserMeta:
id: int = ImmutableRQMetaAttribute(
RQJobMetaField.UserField.ID, validator=lambda x: isinstance(x, int)
)
username: str = ImmutableRQMetaAttribute(
RQJobMetaField.UserField.USERNAME, validator=lambda x: isinstance(x, str)
)
email: str = ImmutableRQMetaAttribute(
RQJobMetaField.UserField.EMAIL, validator=lambda x: isinstance(x, str)
)

def to_dict(self):
return self.meta
Expand All @@ -106,9 +137,13 @@ def meta(self):
return self._data


class RequestInfo:
uuid = ImmutableRQMetaAttribute("uuid", validator=lambda x: isinstance(x, str))
timestamp = ImmutableRQMetaAttribute("timestamp", validator=lambda x: isinstance(x, datetime))
class RequestMeta:
uuid = ImmutableRQMetaAttribute(
RQJobMetaField.RequestField.UUID, validator=lambda x: isinstance(x, str)
)
timestamp = ImmutableRQMetaAttribute(
RQJobMetaField.RequestField.TIMESTAMP, validator=lambda x: isinstance(x, datetime)
)

def __init__(self, *, data: dict[str, str | int]) -> None:
self._data = data
Expand All @@ -132,10 +167,10 @@ def meta(self) -> dict[RQJobMetaField, Any]:
return self._job.meta if self._job else self._data

def to_dict(self) -> dict:
pass
return self.meta

@classmethod
def from_job(cls, job: RQJob):
def for_job(cls, job: RQJob):
return cls(job=job)

def save(self) -> None:
Expand Down Expand Up @@ -180,12 +215,11 @@ def _get_resettable_fields() -> list[RQJobMetaField]:


class BaseRQMeta(RQMetaWithFailureInfo):
# immutable and required fields
user: UserInfo = ImmutableRQMetaAttribute(RQJobMetaField.USER)
request: RequestInfo = ImmutableRQMetaAttribute(RQJobMetaField.REQUEST)
# immutable && required fields
user: UserMeta = UserRQMetaAttribute()
request: RequestMeta = RequestRQMetaAttribute()

# immutable and optional fields
# TODO: only __get__ should be allowed
# immutable && optional fields
org_id: int | None = ImmutableRQMetaAttribute(
RQJobMetaField.ORG_ID, validator=lambda x: isinstance(x, int), optional=True
)
Expand All @@ -202,11 +236,10 @@ class BaseRQMeta(RQMetaWithFailureInfo):
RQJobMetaField.JOB_ID, validator=lambda x: isinstance(x, int), optional=True
)

# mutable fields
# mutable && optional fields
progress: float | None = MutableRQMetaAttribute(
RQJobMetaField.PROGRESS, validator=lambda x: isinstance(x, float), optional=True
)
# todo: not nullable?
status: str | None = MutableRQMetaAttribute(
RQJobMetaField.STATUS, validator=lambda x: isinstance(x, str), optional=True
)
Expand Down Expand Up @@ -239,30 +272,32 @@ def build(

return cls(
data={
# TODO:
RQJobMetaField.USER: UserInfo(
id=getattr(user, "id", None),
username=getattr(user, "username", None),
email=getattr(user, "email", None),
),
RQJobMetaField.REQUEST: RequestInfo(
uuid=request.uuid,
timestamp=timezone.localtime(),
),
RQJobMetaField.USER: UserMeta(
data={
RQJobMetaField.UserField.ID: user.id,
RQJobMetaField.UserField.USERNAME: user.username,
RQJobMetaField.UserField.EMAIL: getattr(user, "email", ""),
}
).to_dict(),
RQJobMetaField.REQUEST: RequestMeta(
data={
RQJobMetaField.RequestField.UUID: request.uuid,
RQJobMetaField.RequestField.TIMESTAMP: timezone.localtime(),
}
).to_dict(),
RQJobMetaField.ORG_ID: oid,
RQJobMetaField.ORG_SLUG: oslug,
RQJobMetaField.PROJECT_ID: pid,
RQJobMetaField.TASK_ID: tid,
RQJobMetaField.JOB_ID: jid,
}
)
).to_dict()


class ExportRQMeta(BaseRQMeta):
# will be changed to ExportResultInfo in the next PR
result_url: str | None = ImmutableRQMetaAttribute(
RQJobMetaField.RESULT_URL, validator=lambda x: isinstance(x, str), optional=True
)
) # will be changed to ExportResultInfo in the next PR

@staticmethod
def _get_resettable_fields() -> list[RQJobMetaField]:
Expand All @@ -279,21 +314,19 @@ def build_for(
):
base_meta = BaseRQMeta.build(request=request, db_obj=db_obj)

return cls(data={**base_meta, RQJobMetaField.RESULT_URL: result_url})
return cls(data={**base_meta, RQJobMetaField.RESULT_URL: result_url}).to_dict()


class ImportRQMeta(BaseRQMeta):
# immutable && optional fields
# todo: immutable
tmp_file: str | None = ImmutableRQMetaAttribute(
RQJobMetaField.TMP_FILE, validator=lambda x: isinstance(x, str), optional=True
)
) # used only when importing annotations|datasets|backups

# mutable fields
# used when importing project dataset
task_progress: float | None = MutableRQMetaAttribute(
RQJobMetaField.TASK_PROGRESS, validator=lambda x: isinstance(x, float), optional=True
)
) # used when importing project dataset

@staticmethod
def _get_resettable_fields() -> list[RQJobMetaField]:
Expand All @@ -311,11 +344,11 @@ def build_for(
):
base_meta = BaseRQMeta.build(request=request, db_obj=db_obj)

return cls({**base_meta, RQJobMetaField.TMP_FILE: tmp_file})
return cls(data={**base_meta, RQJobMetaField.TMP_FILE: tmp_file}).to_dict()


def is_rq_job_owner(rq_job: RQJob, user_id: int) -> bool:
return BaseRQMeta.from_job(rq_job).user.id == user_id
return BaseRQMeta.for_job(rq_job).user.id == user_id


@attrs.frozen()
Expand Down Expand Up @@ -457,7 +490,7 @@ def define_dependent_job(
job_ids = q.get_job_ids()
jobs = q.job_class.fetch_many(job_ids, q.connection)
jobs = filter(
lambda job: job and BaseRQMeta.from_job(job).user.id == user_id and f(job), jobs
lambda job: job and BaseRQMeta.for_job(job).user.id == user_id and f(job), jobs
)
all_user_jobs.extend(jobs)

Expand Down
Loading

0 comments on commit 9ce85ac

Please sign in to comment.