From 125552ba80f619f5484492cca1f05274f28bb571 Mon Sep 17 00:00:00 2001 From: Gordon Watts Date: Fri, 13 Dec 2024 00:33:03 +0100 Subject: [PATCH] Basic changes ot push minio and sx adaptor through out --- servicex/app/transforms.py | 3 +-- servicex/query_core.py | 7 +++-- servicex/servicex_client.py | 54 ++++++++++++++++++++++++------------- 3 files changed, 41 insertions(+), 23 deletions(-) diff --git a/servicex/app/transforms.py b/servicex/app/transforms.py index c31dc673..5043cdae 100644 --- a/servicex/app/transforms.py +++ b/servicex/app/transforms.py @@ -142,7 +142,6 @@ async def download_with_progress(filename) -> Path: @transforms_app.command(no_args_is_help=True) def delete( - url: Optional[str] = url_cli_option, backend: Optional[str] = backend_cli_option, transform_id_list: List[str] = typer.Argument(help="Transform ID"), ): @@ -150,7 +149,7 @@ def delete( Delete a completed transform along with the result files. """ import servicex.app.cache - sx = ServiceXClient(url=url, backend=backend) + sx = ServiceXClient(backend=backend) for transform_id in transform_id_list: asyncio.run(sx.delete_transform(transform_id)) servicex.app.cache.delete(transform_id) diff --git a/servicex/query_core.py b/servicex/query_core.py index d58841c3..57e45fe2 100644 --- a/servicex/query_core.py +++ b/servicex/query_core.py @@ -32,7 +32,7 @@ from abc import ABC from asyncio import Task, CancelledError import logging -from typing import List, Optional, Union +from typing import List, Optional, Type, Union from servicex.expandable_progress import ExpandableProgress from rich.logging import RichHandler @@ -80,6 +80,7 @@ def __init__( ignore_cache: bool = False, query_string_generator: Optional[QueryStringGenerator] = None, fail_if_incomplete: bool = True, + minio_adaptor_class: Optional[Type] = None, ): r""" This is the main class for constructing transform requests and receiving the @@ -125,6 +126,8 @@ def __init__( self.servicex_polling_interval = servicex_polling_interval self.minio_polling_interval = minio_polling_interval + self.minio_adaptor_class = minio_adaptor_class if minio_adaptor_class is not None else MinioAdapter + def generate_selection_string(self) -> str: if self.query_string_generator is None: raise RuntimeError('query string generator not set') @@ -483,7 +486,7 @@ async def retrieve_current_transform_status(self): # status. This includes the minio host and credentials. We use the # transform id as the bucket. if not self.minio: - self.minio = MinioAdapter.for_transform(self.current_status) + self.minio = self.minio_adaptor_class.for_transform(self.current_status) async def download_files( self, diff --git a/servicex/servicex_client.py b/servicex/servicex_client.py index 9c1f1cb0..2798bc35 100644 --- a/servicex/servicex_client.py +++ b/servicex/servicex_client.py @@ -26,7 +26,7 @@ # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import logging -from typing import Optional, List, TypeVar, Any, Mapping, Union, cast +from typing import Optional, List, Type, TypeVar, Any, Mapping, Union, cast from pathlib import Path from servicex.configuration import Configuration @@ -114,7 +114,7 @@ def _load_ServiceXSpec( file_path = config import sys - from ccorp.ruamel.yaml.include import YAML + from ccorp.ruamel.yaml.include import YAML # type: ignore yaml = YAML() if sys.version_info < (3, 10): @@ -137,7 +137,14 @@ def _load_ServiceXSpec( return config -def _build_datasets(config, config_path, servicex_name, fail_if_incomplete): +def _build_datasets( + config, + config_path, + servicex_name, + fail_if_incomplete, + servicex_adaptor, + minio_adaptor_class, +): def get_codegen(_sample: Sample, _general: General): if _sample.Codegen is not None: return _sample.Codegen @@ -148,7 +155,7 @@ def get_codegen(_sample: Sample, _general: General): elif isinstance(_sample.Query, Query): return _sample.Query.codegen - sx = ServiceXClient(backend=servicex_name, config_path=config_path) + sx = ServiceXClient(backend=servicex_name, config_path=config_path, servicex_adaptor=servicex_adaptor) datasets = [] for sample in config.Sample: query = sx.generic_query( @@ -158,7 +165,8 @@ def get_codegen(_sample: Sample, _general: General): result_format=config.General.OutputFormat.to_ResultFormat(), ignore_cache=sample.IgnoreLocalCache, query=sample.Query, - fail_if_incomplete=fail_if_incomplete + fail_if_incomplete=fail_if_incomplete, + minio_adaptor_class=minio_adaptor_class ) logger.debug(f"Query string: {query.generate_selection_string()}") query.ignore_cache = sample.IgnoreLocalCache @@ -199,11 +207,13 @@ def deliver( config_path: Optional[str] = None, servicex_name: Optional[str] = None, return_exceptions: bool = True, - fail_if_incomplete: bool = True + fail_if_incomplete: bool = True, + servicex_adaptor: Optional[ServiceXAdapter] = None, + minio_adaptor_class: Optional[Type] = None, ): config = _load_ServiceXSpec(config) - datasets = _build_datasets(config, config_path, servicex_name, fail_if_incomplete) + datasets = _build_datasets(config, config_path, servicex_name, fail_if_incomplete, servicex_adaptor, minio_adaptor_class=minio_adaptor_class) group = DatasetGroup(datasets) @@ -223,7 +233,7 @@ class ServiceXClient: Instances of this class are factories for `Datasets`` """ - def __init__(self, backend=None, url=None, config_path=None): + def __init__(self, backend=None, url=None, config_path=None, servicex_adaptor=None): r""" If both `backend` and `url` are unspecified then it will attempt to pick up the default backend from `.servicex` @@ -248,15 +258,18 @@ def __init__(self, backend=None, url=None, config_path=None): if bool(url) == bool(backend): raise ValueError("Only specify backend or url... not both") - if url: - self.servicex = ServiceXAdapter(url) - elif backend: - if backend not in self.endpoints: - raise ValueError(f"Backend {backend} not defined in .servicex file") - self.servicex = ServiceXAdapter( - self.endpoints[backend].endpoint, - refresh_token=self.endpoints[backend].token, - ) + if servicex_adaptor is None: + if url: + self.servicex = ServiceXAdapter(url) + elif backend: + if backend not in self.endpoints: + raise ValueError(f"Backend {backend} not defined in .servicex file") + self.servicex = ServiceXAdapter( + self.endpoints[backend].endpoint, + refresh_token=self.endpoints[backend].token, + ) + else: + self.servicex = servicex_adaptor self.query_cache = QueryCache(self.config) self.code_generators = set(self.get_code_generators(backend).keys()) @@ -308,7 +321,7 @@ def delete_transform(self, transform_id): """ return self.servicex.delete_transform(transform_id) - def get_code_generators(self, backend=None): + def get_code_generators(self, backend: Optional[str]=None): r""" Retrieve the code generators deployed with the serviceX instance :return: The list of code generators as json dictionary @@ -321,6 +334,7 @@ def get_code_generators(self, backend=None): return cached_backends["codegens"] else: code_generators = self.servicex.get_code_generators() + assert backend is not None, "Backend must be specified to cache code generators" self.query_cache.update_codegen_by_backend(backend, code_generators) return code_generators @@ -333,6 +347,7 @@ def generic_query( result_format: ResultFormat = ResultFormat.parquet, ignore_cache: bool = False, fail_if_incomplete: bool = True, + minio_adaptor_class: Optional[Type] = None, ) -> Query: r""" Generate a Query object for a generic codegen specification @@ -377,6 +392,7 @@ def generic_query( result_format=result_format, ignore_cache=ignore_cache, query_string_generator=query, - fail_if_incomplete=fail_if_incomplete + fail_if_incomplete=fail_if_incomplete, + minio_adaptor_class=minio_adaptor_class, ) return qobj