Skip to content

Allow user-specified Adaptor and MinIO Interfaces #537

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions servicex/app/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,15 +142,14 @@ async def download_with_progress(filename) -> Path:

@transforms_app.command(no_args_is_help=True)
def delete(
url: Optional[str] = url_cli_option,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel this change is a bit independent of the rest of the PR; should it be included here?

backend: Optional[str] = backend_cli_option,
transform_id_list: List[str] = typer.Argument(help="Transform ID"),
):
"""
Delete a completed transform along with the result files.
"""
import servicex.app.cache
sx = ServiceXClient(url=url, backend=backend)
sx = ServiceXClient(backend=backend)
for transform_id in transform_id_list:
asyncio.run(sx.delete_transform(transform_id))
servicex.app.cache.delete(transform_id)
Expand Down
7 changes: 5 additions & 2 deletions servicex/query_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from abc import ABC
from asyncio import Task, CancelledError
import logging
from typing import List, Optional, Union
from typing import List, Optional, Type, Union
from servicex.expandable_progress import ExpandableProgress
from rich.logging import RichHandler

Expand Down Expand Up @@ -80,6 +80,7 @@ def __init__(
ignore_cache: bool = False,
query_string_generator: Optional[QueryStringGenerator] = None,
fail_if_incomplete: bool = True,
minio_adaptor_class: Optional[Type] = None,
):
r"""
This is the main class for constructing transform requests and receiving the
Expand Down Expand Up @@ -125,6 +126,8 @@ def __init__(
self.servicex_polling_interval = servicex_polling_interval
self.minio_polling_interval = minio_polling_interval

self.minio_adaptor_class = minio_adaptor_class if minio_adaptor_class is not None else MinioAdapter

def generate_selection_string(self) -> str:
if self.query_string_generator is None:
raise RuntimeError('query string generator not set')
Expand Down Expand Up @@ -483,7 +486,7 @@ async def retrieve_current_transform_status(self):
# status. This includes the minio host and credentials. We use the
# transform id as the bucket.
if not self.minio:
self.minio = MinioAdapter.for_transform(self.current_status)
self.minio = self.minio_adaptor_class.for_transform(self.current_status)

async def download_files(
self,
Expand Down
54 changes: 35 additions & 19 deletions servicex/servicex_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import logging
from typing import Optional, List, TypeVar, Any, Mapping, Union, cast
from typing import Optional, List, Type, TypeVar, Any, Mapping, Union, cast
from pathlib import Path

from servicex.configuration import Configuration
Expand Down Expand Up @@ -114,7 +114,7 @@ def _load_ServiceXSpec(
file_path = config

import sys
from ccorp.ruamel.yaml.include import YAML
from ccorp.ruamel.yaml.include import YAML # type: ignore
yaml = YAML()

if sys.version_info < (3, 10):
Expand All @@ -137,7 +137,14 @@ def _load_ServiceXSpec(
return config


def _build_datasets(config, config_path, servicex_name, fail_if_incomplete):
def _build_datasets(
config,
config_path,
servicex_name,
fail_if_incomplete,
servicex_adaptor,
minio_adaptor_class,
):
def get_codegen(_sample: Sample, _general: General):
if _sample.Codegen is not None:
return _sample.Codegen
Expand All @@ -148,7 +155,7 @@ def get_codegen(_sample: Sample, _general: General):
elif isinstance(_sample.Query, Query):
return _sample.Query.codegen

sx = ServiceXClient(backend=servicex_name, config_path=config_path)
sx = ServiceXClient(backend=servicex_name, config_path=config_path, servicex_adaptor=servicex_adaptor)
datasets = []
for sample in config.Sample:
query = sx.generic_query(
Expand All @@ -158,7 +165,8 @@ def get_codegen(_sample: Sample, _general: General):
result_format=config.General.OutputFormat.to_ResultFormat(),
ignore_cache=sample.IgnoreLocalCache,
query=sample.Query,
fail_if_incomplete=fail_if_incomplete
fail_if_incomplete=fail_if_incomplete,
minio_adaptor_class=minio_adaptor_class
)
logger.debug(f"Query string: {query.generate_selection_string()}")
query.ignore_cache = sample.IgnoreLocalCache
Expand Down Expand Up @@ -199,11 +207,13 @@ def deliver(
config_path: Optional[str] = None,
servicex_name: Optional[str] = None,
return_exceptions: bool = True,
fail_if_incomplete: bool = True
fail_if_incomplete: bool = True,
servicex_adaptor: Optional[ServiceXAdapter] = None,
minio_adaptor_class: Optional[Type] = None,
):
config = _load_ServiceXSpec(config)

datasets = _build_datasets(config, config_path, servicex_name, fail_if_incomplete)
datasets = _build_datasets(config, config_path, servicex_name, fail_if_incomplete, servicex_adaptor, minio_adaptor_class=minio_adaptor_class)

group = DatasetGroup(datasets)

Expand All @@ -223,7 +233,7 @@ class ServiceXClient:
Instances of this class are factories for `Datasets``
"""

def __init__(self, backend=None, url=None, config_path=None):
def __init__(self, backend=None, url=None, config_path=None, servicex_adaptor=None):
r"""
If both `backend` and `url` are unspecified then it will attempt to pick up
the default backend from `.servicex`
Expand All @@ -248,15 +258,18 @@ def __init__(self, backend=None, url=None, config_path=None):
if bool(url) == bool(backend):
raise ValueError("Only specify backend or url... not both")

if url:
self.servicex = ServiceXAdapter(url)
elif backend:
if backend not in self.endpoints:
raise ValueError(f"Backend {backend} not defined in .servicex file")
self.servicex = ServiceXAdapter(
self.endpoints[backend].endpoint,
refresh_token=self.endpoints[backend].token,
)
if servicex_adaptor is None:
if url:
self.servicex = ServiceXAdapter(url)
elif backend:
if backend not in self.endpoints:
raise ValueError(f"Backend {backend} not defined in .servicex file")
self.servicex = ServiceXAdapter(
self.endpoints[backend].endpoint,
refresh_token=self.endpoints[backend].token,
)
else:
self.servicex = servicex_adaptor

self.query_cache = QueryCache(self.config)
self.code_generators = set(self.get_code_generators(backend).keys())
Expand Down Expand Up @@ -308,7 +321,7 @@ def delete_transform(self, transform_id):
"""
return self.servicex.delete_transform(transform_id)

def get_code_generators(self, backend=None):
def get_code_generators(self, backend: Optional[str]=None):
r"""
Retrieve the code generators deployed with the serviceX instance
:return: The list of code generators as json dictionary
Expand All @@ -321,6 +334,7 @@ def get_code_generators(self, backend=None):
return cached_backends["codegens"]
else:
code_generators = self.servicex.get_code_generators()
assert backend is not None, "Backend must be specified to cache code generators"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this code that we can ever hit?

self.query_cache.update_codegen_by_backend(backend, code_generators)
return code_generators

Expand All @@ -333,6 +347,7 @@ def generic_query(
result_format: ResultFormat = ResultFormat.parquet,
ignore_cache: bool = False,
fail_if_incomplete: bool = True,
minio_adaptor_class: Optional[Type] = None,
) -> Query:
r"""
Generate a Query object for a generic codegen specification
Expand Down Expand Up @@ -377,6 +392,7 @@ def generic_query(
result_format=result_format,
ignore_cache=ignore_cache,
query_string_generator=query,
fail_if_incomplete=fail_if_incomplete
fail_if_incomplete=fail_if_incomplete,
minio_adaptor_class=minio_adaptor_class,
)
return qobj
Loading