Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions src/cdmtaskserviceclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@

# note - keeping the docs to < 80 chars per line for easy display in ipython terms

import datetime
import json
import logging
import requests
import sys
import time
from typing import Any, BinaryIO
from urllib.parse import urlencode


# TODO TEST logging
Expand Down Expand Up @@ -266,6 +268,61 @@ def get_sites(self, fmt="text"
sites.append(s)
return "\n".join(sites)

def list_jobs(
self,
cluster: str | None = None,
state: str | None = None,
after: datetime.datetime | None = None,
before: datetime.datetime | None = None,
limit: int | None = 1000
) -> dict[str, list[dict[str, Any]]]:
"""
List jobs.

cluster - filter the jobs by cluster. One of kbase, perlmutter-jaws,
or lawrencium-jaws.
state - filter the jobs by job state:
created
download_submitted
job_submittting
job_submitted
upload_submitting
upload_submitted
error_processing_submitting
error_processing_submitted
complete
error
after - only return jobs after this time (inclusive).
Must be a timezone aware datetime.
before - only return jobs before this time (exclusive).
Must be a timezone aware datetime.
limit - limit the number of jobs returned.
If provided must be between 1 and 1000.

Returns the same job structure as the service, except that
each job will have a `_job` key added with a Job class instance
as the value, allowing further querying of individual jobs.
"""
# TODO TEST add tests. Pretty simple method so just manually testing for now
limit = 1000 if limit is None else limit
if limit < 1 or limit > 1000:
raise ValueError("Limit must be between 1 and 1000")
query = {
"limit": limit,
}
if cluster:
query["cluster"] = cluster
if state:
query["state"] = state
if after:
query["after"] = _process_datetime(after, "after")
if before:
query["before"] = _process_datetime(before, "before")
res = self._cts_request(f"jobs?{urlencode(query)}")
for j in res["jobs"]:
j["_job"] = Job(j["id"], self)
return res

def get_job_by_id(self, job_id: str) -> "Job": # yuck, but this is the least bad sol'n
"""
Get a Job instance given a job ID. The instance is lazily created -
Expand Down Expand Up @@ -596,6 +653,14 @@ def _get_next_backoff(self, backoff_index: int):
return bi, self._BACKOFF[bi]


def _process_datetime(dt: datetime.datetime | None, name: str):
if not dt:
return None
if dt.tzinfo is None or dt.tzinfo.utcoffset(dt) is None:
raise ValueError(f"{name} datetime is naive; a timezone must be included")
return dt.isoformat()


def _not_falsy(putative: Any, name: str) -> Any:
if not putative:
raise ValueError(f"{name} is required")
Expand Down
Loading