diff --git a/ads/__init__.py b/ads/__init__.py index 8a2faf209..5b5f3424f 100644 --- a/ads/__init__.py +++ b/ads/__init__.py @@ -1,15 +1,13 @@ #!/usr/bin/env python -# -*- coding: utf-8 -*-- -# Copyright (c) 2020, 2023 Oracle and/or its affiliates. +# Copyright (c) 2020, 2025 Oracle and/or its affiliates. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ -from __future__ import print_function, division, absolute_import + +import logging import os import sys -import logging -import json -from typing import Callable, Dict, Optional, Union +from pkgutil import extend_path # https://packaging.python.org/en/latest/guides/single-sourcing-package-version/#single-sourcing-the-package-version if sys.version_info >= (3, 8): @@ -17,31 +15,42 @@ else: import importlib_metadata as metadata +__path__ = extend_path(__path__, __name__) __version__ = metadata.version("oracle_ads") -import oci -import matplotlib.font_manager # causes matplotlib to regenerate its fonts +try: + import ads.auth # noqa: F401 +except ImportError as ex: + raise ImportError( + "The `ads.auth` module is not available because the optional `oracle-ads-auth` package is missing.\n" + "To use authentication features in ADS, please install it using:\n" + " pip install oracle-ads-auth" + ) from ex + +import matplotlib.font_manager # causes matplotlib to regenerate its fonts +import oci import ocifs + +import ads.telemetry +from ads.auth import set_auth +from ads.common.config import Config from ads.common.decorator.deprecate import deprecated -from ads.common.ipython import configure_plotting, _log_traceback -from ads.feature_engineering.accessor.series_accessor import ADSSeriesAccessor +from ads.common.ipython import _log_traceback, configure_plotting from ads.feature_engineering.accessor.dataframe_accessor import ADSDataFrameAccessor -from ads.common import auth -from ads.common.auth import set_auth -from ads.common.config import Config +from ads.feature_engineering.accessor.series_accessor import ADSSeriesAccessor os.environ["GIT_PYTHON_REFRESH"] = "quiet" - debug_mode = os.environ.get("DEBUG_MODE", False) documentation_mode = os.environ.get("DOCUMENTATION_MODE", "False") == "True" test_mode = os.environ.get("TEST_MODE", False) -resource_principal_mode = bool( - os.environ.get("RESOURCE_PRINCIPAL_MODE", False) -) # deprecated with is_resource_principal_mode() from ads.common.utils +resource_principal_mode = bool(os.environ.get("RESOURCE_PRINCIPAL_MODE", False)) orig_ipython_traceback = None +# Registers a global hook for enriching OCI client configuration with user agent metadata. +ads.auth.register_user_agent_hook(hook=ads.telemetry.update_oci_client_config) + def getLogger(name="ads"): logger = logging.getLogger(name) @@ -125,9 +134,6 @@ def _set_test_mode(mode=False): def hello(): - import oci - import ocifs - print( f""" diff --git a/ads/ads_auth/.gitignore b/ads/ads_auth/.gitignore new file mode 100644 index 000000000..2d4afdb05 --- /dev/null +++ b/ads/ads_auth/.gitignore @@ -0,0 +1,165 @@ +./storage/ +.DS_Store +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +bin/ +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +etc/ +include/ +lib/ +lib64/ +parts/ +sdist/ +share/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +.ruff_cache + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints +notebooks/ + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ +pyvenv.cfg + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# Jetbrains +.idea +modules/ +*.swp + +# VsCode +.vscode + +# pipenv +Pipfile +Pipfile.lock + +# pyright +pyrightconfig.json + +# persist dir for chromadb test +/data/ + +# vim +*.swp + +# Python Wheel +*.whl + +# The demo folder +.demo diff --git a/ads/ads_auth/CONTRIBUTING.md b/ads/ads_auth/CONTRIBUTING.md new file mode 100644 index 000000000..24b923860 --- /dev/null +++ b/ads/ads_auth/CONTRIBUTING.md @@ -0,0 +1,62 @@ +# Contributing to Oracle Accelerated Data Science (ADS) + +We welcome your contributions! There are multiple ways to contribute. + +## Opening issues + +For bugs or enhancement requests, file a GitHub issue unless it's +security related. When filing a bug, remember that the better written the bug is, +the more likely it is to be fixed. If you think you've found a security +vulnerability, don't raise a GitHub issue and follow the instructions in our +[security policy](./SECURITY.md). + +## Contributing code + +We welcome your code contributions. Before submitting code using a pull request, +you must sign the [Oracle Contributor Agreement](https://oca.opensource.oracle.com) (OCA) and +your commits need to include the following line using the name and e-mail +address you used to sign the OCA: + +```text +Signed-off-by: Your Name +``` + +This can be automatically added to pull requests by committing with `--sign-off` +or `-s`, for example: + +```text +git commit --signoff +``` + +Only pull requests from committers that can be verified as having signed the OCA +are accepted. + +## Branching strategy + +Use GitHub flow branching strategy. With the GitHub flow, there are only 2 branches: main and feature. + +1. Every change that is worked on is branched directly off of main into a feature branch. +2. Once a feature is ready it is tested on the feature branch and the code is reviewed before being merged to main. + +Hotfixes, bugfixes are treated the same as feature branches. + +## Pull request process + +1. Ensure there is an issue created to track and discuss the fix or enhancement + you intend to submit. +2. Fork this repository. +3. Create a branch in your fork to implement the changes. We recommend using + the issue number as part of your branch name, for example `1234-fixes`. +4. Ensure that any documentation is updated with the changes that are required + by your change. +5. Ensure that any samples are updated if the base image has been changed. +6. Submit the pull request. *Don't leave the pull request blank*. Explain exactly + what your changes are meant to do and provide simple steps about how to validate + your changes. Ensure that you reference the issue you created as well. +7. We assign the pull request to 2-3 people for review before it is merged. + +## Code of conduct + +Follow the [Golden Rule](https://en.wikipedia.org/wiki/Golden_Rule). If you'd +like more specific guidelines, see the +[Contributor Covenant Code of Conduct](https://www.contributor-covenant.org/version/1/4/code-of-conduct/). diff --git a/ads/ads_auth/LICENSE b/ads/ads_auth/LICENSE new file mode 100644 index 000000000..1441cd473 --- /dev/null +++ b/ads/ads_auth/LICENSE @@ -0,0 +1,35 @@ +Copyright (c) 2020, 2022 Oracle and/or its affiliates. + +The Universal Permissive License (UPL), Version 1.0 + +Subject to the condition set forth below, permission is hereby granted to any +person obtaining a copy of this software, associated documentation and/or data +(collectively the "Software"), free of charge and under any and all copyright +rights in the Software, and any and all patent rights owned or freely +licensable by each licensor hereunder covering either (i) the unmodified +Software as contributed to or provided by such licensor, or (ii) the Larger +Works (as defined below), to deal in both + +(a) the Software, and +(b) any piece of software and/or hardware listed in the lrgrwrks.txt file if +one is included with the Software (each a "Larger Work" to which the Software +is contributed by such licensors), + +without restriction, including without limitation the rights to copy, create +derivative works of, display, perform, and distribute the Software and make, +use, sell, offer for sale, import, export, have made, and have sold the +Software and the Larger Work(s), and to sublicense the foregoing rights on +either these or other terms. + +This license is subject to the following condition: +The above copyright notice and either this complete permission notice or at +a minimum a reference to the UPL must be included in all copies or +substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/ads/ads_auth/Makefile b/ads/ads_auth/Makefile new file mode 100644 index 000000000..f822a4ed5 --- /dev/null +++ b/ads/ads_auth/Makefile @@ -0,0 +1,31 @@ +GIT_ROOT ?= $(shell git rev-parse --show-toplevel) + +dist: clean + @python3 -m build + +publish: dist + @twine upload dist/* + +clean: + @echo "Cleaning - removing dist, *.pyc, Thumbs.db and other files" + @rm -rf dist build oracle_ads.egg-info + @find ./ -name '*.pyc' -exec rm -f {} \; + @find ./ -name 'Thumbs.db' -exec rm -f {} \; + @find ./ -name '*~' -exec rm -f {} \; + @find ./ -name '.DS_Store' -exec rm -f {} \; + +help: ## Show all Makefile targets. + @grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[33m%-30s\033[0m %s\n", $$1, $$2}' + +format: ## Run code autoformatters (black). + pre-commit install + git ls-files | xargs pre-commit run black --files + +lint: ## Run linters: pre-commit (black, ruff, codespell) and mypy + pre-commit install && git ls-files | xargs pre-commit run --show-diff-on-failure --files + +test: ## Run tests via pytest. + pytest tests + +watch-docs: ## Build and watch documentation. + sphinx-autobuild docs/ docs/_build/html --open-browser --watch $(GIT_ROOT)/llama_index/ diff --git a/ads/ads_auth/README.md b/ads/ads_auth/README.md new file mode 100644 index 000000000..86c130cd2 --- /dev/null +++ b/ads/ads_auth/README.md @@ -0,0 +1,125 @@ +# Oracle Accelerated Data Science Auth (ADS Auth) + +A standalone Python package providing authentication management for Oracle Cloud Infrastructure (OCI) clients within the Oracle Accelerated Data Science ecosystem. ADS Auth can be installed and used independently from the broader `oracle-ads` package. + +--- + +## Features + +* **Multiple Auth Methods**: Supports API Key, Resource Principal, Instance Principal, and Security Token signers. +* **Easy Integration**: Simple functions to set and retrieve signer configs for OCI SDK clients. + +--- + +## Installation + +```bash +pip install oracle-ads-auth +``` + +> **Note**: ADS Auth requires Python 3.9+ and depends on `pydantic` and `oci`. + +--- + +## Quickstart + +```python +import ads.auth as auth +from oci.object_storage import ObjectStorageClient + +# 1. Set API Key authentication (default) +auth.set_auth(auth.AuthType.API_KEY) +client = ObjectStorageClient(**auth.default_signer()) + +# 2. Use Resource Principal (in OCI Functions / Data Science jobs) +auth.set_auth(auth.AuthType.RESOURCE_PRINCIPAL) +client = ObjectStorageClient(**auth.default_signer()) + +# 3. Override temporarily with Instance Principal +with auth.AuthContext(auth=auth.AuthType.INSTANCE_PRINCIPAL): + client = ObjectStorageClient(**auth.default_signer()) + +# 4. Convenience helper +client = ObjectStorageClient(**auth.api_keys("~/.oci/config", profile="DEV")) +``` + +--- + +## API Reference + +
+auth.set_auth + +```python +def set_auth( + auth: AuthType = AuthType.API_KEY, + oci_config_location: str = DEFAULT_LOCATION, + profile: str = DEFAULT_PROFILE, + config: Optional[Dict[str, Any]] = None, + signer: Optional[Any] = None, + signer_callable: Optional[Callable[..., Any]] = None, + signer_kwargs: Optional[Dict[str, Any]] = None, + client_kwargs: Optional[Dict[str, Any]] = None, +) -> None: +``` + +Configure the global authentication method and parameters for subsequent client creation. + +
+ +
+auth.create_signer + +```python +def create_signer(...) +``` + +Builds an authentication dictionary (`config`, `signer`, `client_kwargs`) for any OCI SDK client. + +
+ +
+auth.default_signer + +```python +def default_signer(client_kwargs: Optional[Dict] = None) -> Dict[str, Any]: +``` + +Retrieves the current session’s signer configuration. + +
+ +
+Convenience Wrappers + +* `auth.api_keys(oci_config: str|dict, profile: str, client_kwargs: dict)` +* `auth.resource_principal(client_kwargs: dict)` +* `auth.security_token(...)` + +
+ +
+AuthContext + +Context manager to temporarily override global auth state: + +```python +with auth.AuthContext(auth=AuthType.RESOURCE_PRINCIPAL): + # operations use resource principal +``` + +
+ + +## Contributing + +This project welcomes contributions from the community. Before submitting a pull request, please [review our contribution guide](./CONTRIBUTING.md) + + +## Security + +Consult the security guide [SECURITY.md](https://github.com/oracle/accelerated-data-science/blob/main/SECURITY.md) for our responsible security vulnerability disclosure process. + +## License + +Copyright (c) 2020, 2025 Oracle and/or its affiliates. Licensed under the [Universal Permissive License v1.0](https://oss.oracle.com/licenses/upl/) diff --git a/ads/ads_auth/SECURITY.md b/ads/ads_auth/SECURITY.md new file mode 100644 index 000000000..2ca81027f --- /dev/null +++ b/ads/ads_auth/SECURITY.md @@ -0,0 +1,38 @@ +# Reporting security vulnerabilities + +Oracle values the independent security research community and believes that +responsible disclosure of security vulnerabilities helps us ensure the security +and privacy of all our users. + +Please do NOT raise a GitHub Issue to report a security vulnerability. If you +believe you have found a security vulnerability, please submit a report to +[secalert_us@oracle.com][1] preferably with a proof of concept. Please review +some additional information on [how to report security vulnerabilities to Oracle][2]. +We encourage people who contact Oracle Security to use email encryption using +[our encryption key][3]. + +We ask that you do not use other channels or contact the project maintainers +directly. + +Non-vulnerability related security issues including ideas for new or improved +security features are welcome on GitHub Issues. + +## Security updates, alerts and bulletins + +Security updates will be released on a regular cadence. Many of our projects +will typically release security fixes in conjunction with the +Oracle Critical Patch Update program. Additional +information, including past advisories, is available on our [security alerts][4] +page. + +## Security-related information + +We will provide security related information such as a threat model, considerations +for secure use, or any known security issues in our documentation. Please note +that labs and sample code are intended to demonstrate a concept and may not be +sufficiently hardened for production use. + +[1]: mailto:secalert_us@oracle.com +[2]: https://www.oracle.com/corporate/security-practices/assurance/vulnerability/reporting.html +[3]: https://www.oracle.com/security-alerts/encryptionkey.html +[4]: https://www.oracle.com/security-alerts/ diff --git a/ads/ads_auth/THIRD_PARTY_LICENSES.txt b/ads/ads_auth/THIRD_PARTY_LICENSES.txt new file mode 100644 index 000000000..ac1fca8be --- /dev/null +++ b/ads/ads_auth/THIRD_PARTY_LICENSES.txt @@ -0,0 +1,22 @@ +Advanced Data Science and Data Services toolkit for Python Third Party License File + +------------------------ Third Party Components ------------------------ + +------------------------------- Licenses ------------------------------- +- Apache License 2.0 +- MIT License +- Universal Permissive License (UPL) + +======================== Third Party Components ======================== + +oci +* Copyright (c) 2016, 2020, Oracle and/or its affiliates. [see notices section above] +* License: Apache License 2.0, Universal Permissive License (UPL) +* Source code: https://github.com/oracle/oci-python-sdk +* Project home: https://docs.oracle.com/en-us/iaas/tools/python/2.44.0/index.html + +pydantic +* Copyright (c) 2017 to present Pydantic Services Inc. and individual contributors. +* License: The MIT License (MIT) +* Source code: https://github.com/pydantic/pydantic +* Project home: https://docs.pydantic.dev/latest/ diff --git a/ads/ads_auth/ads/__init__.py b/ads/ads_auth/ads/__init__.py new file mode 100644 index 000000000..84f0ee8eb --- /dev/null +++ b/ads/ads_auth/ads/__init__.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python + +# Copyright (c) 2025 Oracle and/or its affiliates. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ +from pkgutil import extend_path + +__path__ = extend_path(__path__, __name__) diff --git a/ads/ads_auth/ads/auth/__init__.py b/ads/ads_auth/ads/auth/__init__.py new file mode 100644 index 000000000..23acd001d --- /dev/null +++ b/ads/ads_auth/ads/auth/__init__.py @@ -0,0 +1,36 @@ +#!/usr/bin/env python + +# Copyright (c) 2025 Oracle and/or its affiliates. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ + +from .auth import ( + AuthContext, + AuthFactory, + AuthState, + AuthType, + SecurityToken, + SecurityTokenError, + api_keys, + create_signer, + default_signer, + register_user_agent_hook, + resource_principal, + security_token, + set_auth, +) + +__all__ = [ + "register_user_agent_hook", + "default_signer", + "create_signer", + "set_auth", + "AuthType", + "AuthContext", + "api_keys", + "resource_principal", + "security_token", + "SecurityToken", + "SecurityTokenError", + "AuthState", + "AuthFactory", +] diff --git a/ads/ads_auth/ads/auth/auth.py b/ads/ads_auth/ads/auth/auth.py new file mode 100644 index 000000000..b3e27ba3f --- /dev/null +++ b/ads/ads_auth/ads/auth/auth.py @@ -0,0 +1,1091 @@ +#!/usr/bin/env python + +# Copyright (c) 2021, 2025 Oracle and/or its affiliates. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ + +""" +Module: ads.auth + +Provides authentication management for OCI clients within the ADS framework. + +Supported authentication types: + - API Key + - Resource Principal + - Instance Principal + - Security Token +""" + +import copy +import logging +import os +import time +from abc import ABC, abstractmethod +from dataclasses import dataclass +from datetime import datetime +from enum import Enum +from typing import Any, Callable, Dict, Optional, Type, Union + +import oci +from oci.config import DEFAULT_LOCATION, DEFAULT_PROFILE + +logger = logging.getLogger(__name__) + +# Buffer before expiry (in seconds) to refresh security token +SECURITY_TOKEN_LEFT_TIME = 600 + +# Global hook for enriching OCI config (e.g., for telemetry) +_user_agent_hook: Optional[Callable[[Dict[str, Any]], Dict[str, Any]]] = None + + +def register_user_agent_hook(hook: Callable[[Dict[str, Any]], Dict[str, Any]]) -> None: + """ + Registers a global hook for enriching OCI client configuration with user agent metadata. + + This hook allows consumers of the ADS Auth package (e.g., Oracle ADS) to inject + custom telemetry tags or headers into the OCI config dictionary used by all signers. + + The hook should be a pure function that accepts a config dict and returns an updated version. + + Parameters + ---------- + hook : Callable[[dict], dict] + A function that takes a config dict and returns a new or modified config dict. + + Examples + -------- + >>> from ads.auth import register_user_agent_hook + >>> def enrich_user_agent(cfg): + ... cfg["additional_user_agent"] = "MyLib/version=1.0#surface=NOTEBOOK" + ... return cfg + >>> register_user_agent_hook(enrich_user_agent) + + Notes + ----- + - The hook is global and affects all future signer creation calls. + - Only a single hook can be registered at a time. + - The hook should not raise; exceptions are caught and logged. + """ + global _user_agent_hook + _user_agent_hook = hook + + +def _apply_user_agent_hook(config: Optional[Dict[str, Any]]) -> Dict[str, Any]: + """ + Applies the registered user agent enrichment hook to the OCI config, if any. + + Parameters + ---------- + config : dict or None + The original OCI config. + + Returns + ------- + dict + The potentially enriched OCI config. + """ + cfg = config.copy() if config else {} + if _user_agent_hook: + try: + cfg = _user_agent_hook(cfg) + except Exception as ex: + logger.debug("User agent hook failed: %s", ex) + return cfg + + +class SecurityTokenError(Exception): # pragma: no cover + pass + + +class AuthType(str, Enum): + """Enumerates supported authentication methods.""" + + API_KEY = "api_key" + RESOURCE_PRINCIPAL = "resource_principal" + INSTANCE_PRINCIPAL = "instance_principal" + SECURITY_TOKEN = "security_token" + + @classmethod + def values(cls) -> tuple: + """Returns a tuple of all enum values.""" + return tuple(member.value for member in cls) + + +class SingletonMeta(type): + """Metaclass enforcing singleton behavior.""" + + _instances: Dict[Type[Any], Any] = {} + + def __call__(cls, *args: Any, **kwargs: Any) -> Any: + if cls not in cls._instances: + cls._instances[cls] = super().__call__(*args, **kwargs) + return cls._instances[cls] + + +@dataclass() +class AuthState(metaclass=SingletonMeta): + """ + Class stores state of variables specified for auth method, configuration, + configuration file location, profile name, signer or signer_callable, which + set by use at any given time and can be provided by this class in any ADS module. + """ + + oci_iam_type: str = None + oci_cli_auth: str = None + oci_config_path: str = None + oci_key_profile: str = None + oci_config: Dict = None + oci_signer: Any = None + oci_signer_callable: Callable = None + oci_signer_kwargs: Dict = None + oci_client_kwargs: Dict = None + + def __post_init__(self): + self.oci_iam_type = self.oci_iam_type or os.environ.get( + "OCI_IAM_TYPE", AuthType.API_KEY + ) + self.oci_cli_auth = self.oci_cli_auth or os.environ.get( + "OCI_CLI_AUTH", AuthType.RESOURCE_PRINCIPAL + ) + self.oci_config_path = self.oci_config_path or os.environ.get( + "OCI_CONFIG_LOCATION", DEFAULT_LOCATION + ) + self.oci_key_profile = self.oci_key_profile or os.environ.get( + "OCI_CONFIG_PROFILE", DEFAULT_PROFILE + ) + self.oci_config = ( + self.oci_config or {"region": os.environ["OCI_RESOURCE_REGION"]} + if os.environ.get("OCI_RESOURCE_REGION") + else {} + ) + self.oci_signer_kwargs = self.oci_signer_kwargs or {} + self.oci_client_kwargs = self.oci_client_kwargs or {} + + +def set_auth( + auth: Optional[str] = AuthType.API_KEY, + oci_config_location: Optional[str] = DEFAULT_LOCATION, + profile: Optional[str] = DEFAULT_PROFILE, + config: Optional[Dict] = ( + {"region": os.environ["OCI_RESOURCE_REGION"]} + if os.environ.get("OCI_RESOURCE_REGION") + else {} + ), + signer: Optional[Any] = None, + signer_callable: Optional[Callable] = None, + signer_kwargs: Optional[Dict] = None, + client_kwargs: Optional[Dict] = None, +) -> None: + """ + Sets the default authentication type. + + Parameters + ---------- + auth: Optional[str], default 'api_key' + 'api_key', 'resource_principal' or 'instance_principal'. Enable/disable resource principal identity, + instance principal or keypair identity in a notebook session + oci_config_location: Optional[str], default '~/.oci/config' + config file location + profile: Optional[str], default is DEFAULT_PROFILE, which is 'DEFAULT' + profile name for api keys config file + config: Optional[Dict], default {} + created config dictionary + signer: Optional[Any], default None + created signer, can be resource principals signer, instance principal signer or other. + Check documentation for more signers: https://docs.oracle.com/en-us/iaas/tools/python/latest/api/signing.html + signer_callable: Optional[Callable], default None + a callable object that returns signer + signer_kwargs: Optional[Dict], default None + parameters accepted by the signer. + Check documentation: https://docs.oracle.com/en-us/iaas/tools/python/latest/api/signing.html + client_kwargs: Optional[Dict], default None + Additional keyword arguments for initializing the OCI client. + Example: client_kwargs = {"timeout": 60} + + Examples + -------- + >>> ads.auth.set_auth("api_key") # default signer is set to api keys + + >>> ads.auth.set_auth("api_key", profile = "TEST") # default signer is set to api keys and to use TEST profile + + >>> ads.auth.set_auth("api_key", oci_config_location = "other_config_location") # use non-default oci_config_location + + >>> ads.auth.set_auth("api_key", client_kwargs={"timeout": 60}) # default signer with connection and read timeouts set to 60 seconds for the client. + + >>> other_config = oci.config.from_file("other_config_location", "OTHER_PROFILE") # Create non-default config + >>> ads.auth.set_auth(config=other_config) # Set api keys type of authentication based on provided config + + >>> config={ + ... user=ocid1.user.oc1.., + ... fingerprint=, + ... tenancy=ocid1.tenancy.oc1.., + ... region=us-ashburn-1, + ... key_content=, + ... } + >>> ads.auth.set_auth(config=config) # Set api key authentication using private key content based on provided config + + >>> config={ + ... user=ocid1.user.oc1.., + ... fingerprint=, + ... tenancy=ocid1.tenancy.oc1.., + ... region=us-ashburn-1, + ... key_file=~/.oci/oci_api_key.pem, + ... } + >>> ads.auth.set_auth(config=config) # Set api key authentication using private key file location based on provided config + + >>> ads.auth.set_auth("resource_principal") # Set resource principal authentication + + >>> ads.auth.set_auth("instance_principal") # Set instance principal authentication + + >>> ads.auth.set_auth("security_token") # Set security token authentication + + >>> config = dict( + ... region=us-ashburn-1, + ... key_file=~/.oci/sessions/DEFAULT/oci_api_key.pem, + ... security_token_file=~/.oci/sessions/DEFAULT/token + ... ) + >>> ads.auth.set_auth("security_token", config=config) # Set security token authentication from provided config + + >>> signer = oci.signer.Signer( + ... user=ocid1.user.oc1.., + ... fingerprint=, + ... tenancy=ocid1.tenancy.oc1.., + ... region=us-ashburn-1, + ... private_key_content=, + ... ) + >>> ads.auth.set_auth(signer=signer) # Set api keys authentication with private key content based on provided signer + + >>> signer = oci.signer.Signer( + ... user=ocid1.user.oc1.., + ... fingerprint=, + ... tenancy=ocid1.tenancy.oc1.., + ... region=us-ashburn-1, + ... private_key_file_location=, + ... ) + >>> ads.auth.set_auth(signer=signer) # Set api keys authentication with private key file location based on provided signer + + >>> signer = oci.auth.signers.get_resource_principals_signer() + >>> ads.auth.create_signer(config={}, signer=signer) # resource principals authentication dictionary created + + >>> signer_callable = oci.auth.signers.ResourcePrincipalsFederationSigner + >>> ads.auth.set_auth(signer_callable=signer_callable) # Set resource principal federation signer callable + + >>> signer_callable = oci.auth.signers.InstancePrincipalsSecurityTokenSigner + >>> signer_kwargs = dict(log_requests=True) # will log the request url and response data when retrieving + >>> # instance principals authentication dictionary created based on callable with kwargs parameters: + >>> ads.auth.set_auth(signer_callable=signer_callable, signer_kwargs=signer_kwargs) + """ + signer_kwargs = signer_kwargs or {} + client_kwargs = client_kwargs or {} + + auth_state = AuthState() + + valid_auth_keys = AuthFactory.classes.keys() + if auth in valid_auth_keys: + auth_state.oci_iam_type = auth + else: + raise ValueError( + f"Allowed values are: {valid_auth_keys}. If you wish to use other authentication form, " + f"pass a valid signer or use signer_callable and signer_kwargs" + ) + + if config and ( + oci_config_location != DEFAULT_LOCATION or profile != DEFAULT_PROFILE + ): + raise ValueError( + "'config' and 'oci_config_location', 'profile' pair are mutually exclusive." + "Please specify 'config' OR 'oci_config_location', 'profile' pair." + ) + + auth_state.oci_config = config + auth_state.oci_key_profile = profile + auth_state.oci_config_path = oci_config_location + auth_state.oci_signer = signer + auth_state.oci_signer_callable = signer_callable + auth_state.oci_signer_kwargs = signer_kwargs + auth_state.oci_client_kwargs = client_kwargs + + +def api_keys( + oci_config: Union[str, Dict] = os.path.expanduser(DEFAULT_LOCATION), + profile: str = DEFAULT_PROFILE, + client_kwargs: Dict = None, +) -> Dict: + """ + Prepares authentication and extra arguments necessary for creating clients for different OCI services using API + Keys. + + Parameters + ---------- + oci_config: Optional[Union[str, Dict]], default is ~/.oci/config + OCI authentication config file location or a dictionary with config attributes. + profile: Optional[str], is DEFAULT_PROFILE, which is 'DEFAULT' + Profile name to select from the config file. + client_kwargs: Optional[Dict], default None + kwargs that are required to instantiate the Client if we need to override the defaults. + + Returns + ------- + dict + Contains keys - config, signer and client_kwargs. + + - The config contains the config loaded from the configuration loaded from `oci_config`. + - The signer contains the signer object created from the api keys. + - client_kwargs contains the `client_kwargs` that was passed in as input parameter. + + Examples + -------- + >>> auth = ads.auth.api_keys(oci_config="/home/datascience/.oci/config", profile="TEST", client_kwargs={"timeout": 6000}) + """ + signer_args = dict( + oci_config=oci_config if isinstance(oci_config, Dict) else {}, + oci_config_location=( + oci_config + if isinstance(oci_config, str) + else os.path.expanduser(DEFAULT_LOCATION) + ), + oci_key_profile=profile, + client_kwargs=client_kwargs, + ) + signer_generator = AuthFactory().signerGenerator(AuthType.API_KEY) + return signer_generator(signer_args).create_signer() + + +def resource_principal( + client_kwargs: Optional[Dict] = None, +) -> Dict: + """ + Prepares authentication and extra arguments necessary for creating clients for different OCI services using + Resource Principals. + + Parameters + ---------- + client_kwargs: Dict, default None + kwargs that are required to instantiate the Client if we need to override the defaults. + + Returns + ------- + dict + Contains keys - config, signer and client_kwargs. + + - The config contains and empty dictionary. + - The signer contains the signer object created from the resource principal. + - client_kwargs contains the `client_kwargs` that was passed in as input parameter. + + Examples + -------- + >>> auth = ads.auth.resource_principal({"timeout": 6000}) + """ + signer_args = dict(client_kwargs=client_kwargs) + signer_generator = AuthFactory().signerGenerator(AuthType.RESOURCE_PRINCIPAL) + return signer_generator(signer_args).create_signer() + + +def security_token( + oci_config: Union[str, Dict] = os.path.expanduser(DEFAULT_LOCATION), + profile: str = DEFAULT_PROFILE, + client_kwargs: Dict = None, +) -> Dict: + """ + Prepares authentication and extra arguments necessary for creating clients for different OCI services using Security Token. + + Parameters + ---------- + oci_config: Optional[Union[str, Dict]], default is ~/.oci/config + OCI authentication config file location or a dictionary with config attributes. + profile: Optional[str], is DEFAULT_PROFILE, which is 'DEFAULT' + Profile name to select from the config file. + client_kwargs: Optional[Dict], default None + kwargs that are required to instantiate the Client if we need to override the defaults. + + Returns + ------- + dict + Contains keys - config, signer and client_kwargs. + + - The config contains the config loaded from the configuration loaded from `oci_config`. + - The signer contains the signer object created from the security token. + - client_kwargs contains the `client_kwargs` that was passed in as input parameter. + + Examples + -------- + >>> auth = ads.auth.security_token(oci_config="/home/datascience/.oci/config", profile="TEST", client_kwargs={"timeout": 6000}) + """ + signer_args = dict( + oci_config=oci_config if isinstance(oci_config, Dict) else {}, + oci_config_location=( + oci_config + if isinstance(oci_config, str) + else os.path.expanduser(DEFAULT_LOCATION) + ), + oci_key_profile=profile, + client_kwargs=client_kwargs, + ) + signer_generator = AuthFactory().signerGenerator(AuthType.SECURITY_TOKEN) + return signer_generator(signer_args).create_signer() + + +def create_signer( + auth_type: Optional[str] = AuthType.API_KEY, + oci_config_location: Optional[str] = DEFAULT_LOCATION, + profile: Optional[str] = DEFAULT_PROFILE, + config: Optional[Dict] = {}, + signer: Optional[Any] = None, + signer_callable: Optional[Callable] = None, + signer_kwargs: Optional[Dict] = {}, + client_kwargs: Optional[Dict] = None, +) -> Dict: + """ + Prepares authentication and extra arguments necessary for creating clients for different OCI services based on + provided parameters. If `signer` or `signer`_callable` provided, authentication with that signer will be created. + If `config` provided, api_key type of authentication will be created. Accepted values for `auth_type`: + `api_key` (default), 'instance_principal', 'resource_principal'. + + Parameters + ---------- + auth_type: Optional[str], default 'api_key' + 'api_key', 'resource_principal' or 'instance_principal'. Enable/disable resource principal identity, + instance principal or keypair identity in a notebook session + oci_config_location: Optional[str], default oci.config.DEFAULT_LOCATION, which is '~/.oci/config' + config file location + profile: Optional[str], default is DEFAULT_PROFILE, which is 'DEFAULT' + profile name for api keys config file + config: Optional[Dict], default {} + created config dictionary + signer: Optional[Any], default None + created signer, can be resource principals signer, instance principal signer or other. + Check documentation for more signers: https://docs.oracle.com/en-us/iaas/tools/python/latest/api/signing.html + signer_callable: Optional[Callable], default None + a callable object that returns signer + signer_kwargs: Optional[Dict], default None + parameters accepted by the signer. + Check documentation: https://docs.oracle.com/en-us/iaas/tools/python/latest/api/signing.html + client_kwargs : dict + kwargs that are required to instantiate the Client if we need to override the defaults + + Examples + -------- + >>> import ads.auth + >>> auth = ads.auth.create_signer() # api_key type of authentication dictionary created with default config location and default profile + + >>> config = oci.config.from_file("other_config_location", "OTHER_PROFILE") + >>> auth = ads.auth.create_signer(config=config) # api_key type of authentication dictionary created based on provided config + + >>> config={ + ... user=ocid1.user.oc1.., + ... fingerprint=, + ... tenancy=ocid1.tenancy.oc1.., + ... region=us-ashburn-1, + ... key_content=, + ... } + >>> auth = ads.auth.create_signer(config=config) # api_key type of authentication dictionary with private key content created based on provided config + + >>> config={ + ... user=ocid1.user.oc1.., + ... fingerprint=, + ... tenancy=ocid1.tenancy.oc1.., + ... region=us-ashburn-1, + ... key_file=~/.oci/oci_api_key.pem, + ... } + >>> auth = ads.auth.create_signer(config=config) # api_key type of authentication dictionary with private key file location created based on provided config + + >>> signer = oci.auth.signers.get_resource_principals_signer() + >>> auth = ads.auth.create_signer(config={}, signer=signer) # resource principals authentication dictionary created + + >>> auth = ads.auth.create_signer(auth_type='instance_principal') # instance principals authentication dictionary created + + >>> signer_callable = oci.auth.signers.InstancePrincipalsSecurityTokenSigner + >>> signer_kwargs = dict(log_requests=True) # will log the request url and response data when retrieving + >>> auth = ads.auth.create_signer(signer_callable=signer_callable, signer_kwargs=signer_kwargs) # instance principals authentication dictionary created based on callable with kwargs parameters + >>> config = dict( + ... region=us-ashburn-1, + ... key_file=~/.oci/sessions/DEFAULT/oci_api_key.pem, + ... security_token_file=~/.oci/sessions/DEFAULT/token + ... ) + >>> auth = ads.auth.create_signer(auth_type="security_token", config=config) # security token authentication created based on provided config + """ + if signer or signer_callable: + configuration = _apply_user_agent_hook(config) + if signer_callable: + signer = signer_callable(**signer_kwargs) + signer_dict = { + "config": configuration, + "signer": signer, + "client_kwargs": client_kwargs, + } + logger.debug(f"Using authentication signer type {type(signer)}.") + return signer_dict + else: + signer_args = dict( + oci_config_location=oci_config_location, + oci_key_profile=profile, + oci_config=config, + client_kwargs=client_kwargs, + ) + + signer_generator = AuthFactory().signerGenerator(auth_type) + + return signer_generator(signer_args).create_signer() + + +def default_signer(client_kwargs: Optional[Dict] = None) -> Dict: + """ + Prepares authentication and extra arguments necessary for creating clients for different OCI services based on + the default authentication setting for the session. Refer ads.auth.set_auth API for further reference. + + Parameters + ---------- + client_kwargs : dict + kwargs that are required to instantiate the Client if we need to override the defaults. + Example: client_kwargs = {"timeout": 60} + + Returns + ------- + dict + Contains keys - config, signer and client_kwargs. + + - The config contains the config loaded from the configuration loaded from the default location if the default + auth mode is API keys, otherwise it is empty dictionary. + - The signer contains the signer object created from default auth mode. + - client_kwargs contains the `client_kwargs` that was passed in as input parameter. + + Examples + -------- + >>> import ads.auth + + >>> auth = ads.auth.default_signer() + + >>> ads.auth.set_auth("resource_principal") + >>> auth = ads.auth.default_signer() + + >>> signer_callable = oci.auth.signers.InstancePrincipalsSecurityTokenSigner + >>> ads.auth.set_auth(signer_callable=signer_callable) # Set instance principal callable + >>> auth = ads.auth.default_signer() # signer_callable instantiated + """ + auth_state = AuthState() + if auth_state.oci_signer or auth_state.oci_signer_callable: + configuration = _apply_user_agent_hook(auth_state.oci_config) + signer = auth_state.oci_signer + if auth_state.oci_signer_callable: + signer_kwargs = auth_state.oci_signer_kwargs or {} + signer = auth_state.oci_signer_callable(**signer_kwargs) + signer_dict = { + "config": configuration, + "signer": signer, + "client_kwargs": { + **(auth_state.oci_client_kwargs or {}), + **(client_kwargs or {}), + }, + } + logger.debug(f"Using authentication signer type {type(signer)}.") + return signer_dict + else: + signer_args = dict( + oci_config_location=auth_state.oci_config_path, + oci_key_profile=auth_state.oci_key_profile, + oci_config=auth_state.oci_config, + client_kwargs={ + **(auth_state.oci_client_kwargs or {}), + **(client_kwargs or {}), + }, + ) + signer_generator = AuthFactory().signerGenerator(auth_state.oci_iam_type) + return signer_generator(signer_args).create_signer() + + +class AuthSignerGenerator(ABC): + """Abstract base class for signer generators.""" + + @abstractmethod + def create_signer(self) -> Dict[str, Any]: + """Produce {'config','signer','client_kwargs'} dictionary.""" + pass + + +class APIKey(AuthSignerGenerator): + """ + Creates api keys auth instance. This signer is intended to be used when signing requests for + a given user - it requires that user’s ID, their private key and certificate fingerprint. + It prepares extra arguments necessary for creating clients for variety of OCI services. + """ + + def __init__(self, args: Optional[Dict] = None): + """ + Signer created based on args provided. If not provided current values of according arguments + will be used from current global state from AuthState class. + + Parameters + ---------- + args: dict + args that are required to create api key config and signer. Contains keys: oci_config, + oci_config_location, oci_key_profile, client_kwargs. + + - oci_config is a configuration dict that can be used to create clients + - oci_config_location - path to config file + - oci_key_profile - the profile to load from config file + - client_kwargs - optional parameters for OCI client creation in next steps + """ + self.oci_config = args.get("oci_config") + self.oci_config_location = args.get("oci_config_location") + self.oci_key_profile = args.get("oci_key_profile") + self.client_kwargs = args.get("client_kwargs") + + def create_signer(self) -> Dict: + """ + Creates api keys configuration and signer with extra arguments necessary for creating clients. + Signer constructed from the `oci_config` provided. If not 'oci_config', configuration will be + constructed from 'oci_config_location' and 'oci_key_profile' in place. + + Returns + ------- + dict + Contains keys - config, signer and client_kwargs. + + - config contains the configuration information + - signer contains the signer object created. It is instantiated from signer_callable, or + signer provided in args used, or instantiated in place + - client_kwargs contains the `client_kwargs` that was passed in as input parameter + + Examples + -------- + >>> signer_args = dict( + >>> client_kwargs=client_kwargs + >>> ) + >>> signer_generator = AuthFactory().signerGenerator(AuthType.API_KEY) + >>> signer_generator(signer_args).create_signer() + """ + if self.oci_config: + configuration = _apply_user_agent_hook(self.oci_config) + else: + configuration = _apply_user_agent_hook( + oci.config.from_file(self.oci_config_location, self.oci_key_profile) + ) + + oci.config.validate_config(configuration) + logger.debug("Using 'api_key' authentication.") + return { + "config": configuration, + "signer": oci.signer.Signer( + tenancy=configuration["tenancy"], + user=configuration["user"], + fingerprint=configuration["fingerprint"], + private_key_file_location=configuration.get("key_file"), + pass_phrase=configuration.get("pass_phrase"), + private_key_content=configuration.get("key_content"), + ), + "client_kwargs": self.client_kwargs, + } + + +class ResourcePrincipal(AuthSignerGenerator): + """ + Creates Resource Principal signer - a security token for a resource principal. + It prepares extra arguments necessary for creating clients for variety of OCI services. + """ + + def __init__(self, args: Optional[Dict] = None): + """ + Signer created based on args provided. If not provided current values of according arguments + will be used from current global state from AuthState class. + + Parameters + ---------- + args: dict + args that are required to create Resource Principal signer. Contains keys: client_kwargs. + + - client_kwargs - optional parameters for OCI client creation in next steps + """ + self.client_kwargs = args.get("client_kwargs") + + def create_signer(self) -> Dict: + """ + Creates Resource Principal signer with extra arguments necessary for creating clients. + + Returns + ------- + dict + Contains keys - config, signer and client_kwargs. + + - config contains the configuration information + - signer contains the signer object created. It is instantiated from signer_callable, or + signer provided in args used, or instantiated in place + - client_kwargs contains the `client_kwargs` that was passed in as input parameter + + Examples + -------- + >>> signer_args = dict( + >>> signer=oci.auth.signers.get_resource_principals_signer() + >>> ) + >>> signer_generator = AuthFactory().signerGenerator(AuthType.RESOURCE_PRINCIPAL) + >>> signer_generator(signer_args).create_signer() + """ + configuration = _apply_user_agent_hook(AuthState().oci_config) + signer_dict = { + "config": configuration, + "signer": oci.auth.signers.get_resource_principals_signer(), + "client_kwargs": self.client_kwargs, + } + logger.debug("Using 'resource_principal' authentication.") + return signer_dict + + @staticmethod + def supported(): + return any( + os.environ.get(var) + for var in [ + "JOB_RUN_OCID", + "NB_SESSION_OCID", + "DATAFLOW_RUN_ID", + "PIPELINE_RUN_OCID", + ] + ) + + +class InstancePrincipal(AuthSignerGenerator): + """ + Creates Instance Principal signer - a SecurityTokenSigner which uses a security token for an instance + principal. It prepares extra arguments necessary for creating clients for variety of OCI services. + """ + + def __init__(self, args: Optional[Dict] = None): + """ + Signer created based on args provided. If not provided current values of according arguments + will be used from current global state from AuthState class. + + Parameters + ---------- + args: dict + args that are required to create Instance Principal signer. Contains keys: signer_kwargs, client_kwargs. + + - signer_kwargs - optional parameters required to instantiate instance principal signer + - client_kwargs - optional parameters for OCI client creation in next steps + """ + self.signer_kwargs = args.get("signer_kwargs", dict()) + self.client_kwargs = args.get("client_kwargs") + + def create_signer(self) -> Dict: + """ + Creates Instance Principal signer with extra arguments necessary for creating clients. + Signer instantiated from the `signer_callable` or if the `signer` provided is will be return by this method. + If `signer_callable` or `signer` not provided new signer will be created in place. + + Returns + ------- + dict + Contains keys - config, signer and client_kwargs. + + - config contains the configuration information + - signer contains the signer object created. It is instantiated from signer_callable, or + signer provided in args used, or instantiated in place + - client_kwargs contains the `client_kwargs` that was passed in as input parameter + + Examples + -------- + >>> signer_args = dict(signer_kwargs={"log_requests": True}) + >>> signer_generator = AuthFactory().signerGenerator(AuthType.INSTANCE_PRINCIPAL) + >>> signer_generator(signer_args).create_signer() + """ + configuration = _apply_user_agent_hook(AuthState().oci_config) + signer_dict = { + "config": configuration, + "signer": oci.auth.signers.InstancePrincipalsSecurityTokenSigner( + **self.signer_kwargs + ), + "client_kwargs": self.client_kwargs, + } + logger.debug("Using 'instance_principal' authentication.") + return signer_dict + + +class SecurityToken(AuthSignerGenerator): + """ + Creates security token auth instance. This signer is intended to be used when signing requests for + a given user - it requires that user's private key and security token. + It prepares extra arguments necessary for creating clients for variety of OCI services. + """ + + SECURITY_TOKEN_GENERIC_HEADERS = ["date", "(request-target)", "host"] + SECURITY_TOKEN_BODY_HEADERS = ["content-length", "content-type", "x-content-sha256"] + SECURITY_TOKEN_REQUIRED = ["security_token_file", "key_file", "region"] + + def __init__(self, args: Optional[Dict] = None): + """ + Signer created based on args provided. If not provided current values of according arguments + will be used from current global state from AuthState class. + + Parameters + ---------- + args: dict + args that are required to create Security Token signer. Contains keys: oci_config, + oci_config_location, oci_key_profile, client_kwargs. + + - oci_config is a configuration dict that can be used to create clients + - oci_config_location - path to config file + - oci_key_profile - the profile to load from config file + - client_kwargs - optional parameters for OCI client creation in next steps + """ + self.oci_config = args.get("oci_config") + self.oci_config_location = args.get("oci_config_location") + self.oci_key_profile = args.get("oci_key_profile") + self.client_kwargs = args.get("client_kwargs") + + def create_signer(self) -> Dict: + """ + Creates security token configuration and signer with extra arguments necessary for creating clients. + Signer constructed from the `oci_config` provided. If not 'oci_config', configuration will be + constructed from 'oci_config_location' and 'oci_key_profile' in place. + + Returns + ------- + dict + Contains keys - config, signer and client_kwargs. + + - config contains the configuration information + - signer contains the signer object created. It is instantiated from signer_callable, or + signer provided in args used, or instantiated in place + - client_kwargs contains the `client_kwargs` that was passed in as input parameter + + Examples + -------- + >>> signer_args = dict( + ... client_kwargs=client_kwargs + ... ) + >>> signer_generator = AuthFactory().signerGenerator(AuthType.SECURITY_TOKEN) + >>> signer_generator(signer_args).create_signer() + """ + if self.oci_config: + configuration = _apply_user_agent_hook(self.oci_config) + else: + configuration = _apply_user_agent_hook( + oci.config.from_file(self.oci_config_location, self.oci_key_profile) + ) + + logger.debug("Using 'security_token' authentication.") + + for parameter in self.SECURITY_TOKEN_REQUIRED: + if parameter not in configuration: + raise ValueError( + f"Parameter `{parameter}` must be provided for using `security_token` authentication." + ) + + self._validate_and_refresh_token(configuration) + + return { + "config": configuration, + "signer": oci.auth.signers.SecurityTokenSigner( + token=self._read_security_token_file( + configuration.get("security_token_file") + ), + private_key=oci.signer.load_private_key_from_file( + configuration.get("key_file"), configuration.get("pass_phrase") + ), + generic_headers=configuration.get( + "generic_headers", self.SECURITY_TOKEN_GENERIC_HEADERS + ), + body_headers=configuration.get( + "body_headers", self.SECURITY_TOKEN_BODY_HEADERS + ), + ), + "client_kwargs": self.client_kwargs, + } + + def _validate_and_refresh_token(self, configuration: Dict[str, Any]): + """Validates and refreshes security token. + + Parameters + ---------- + configuration: Dict + Security token configuration. + """ + security_token = self._read_security_token_file( + configuration.get("security_token_file") + ) + security_token_container = ( + oci.auth.security_token_container.SecurityTokenContainer( + session_key_supplier=None, security_token=security_token + ) + ) + + if not security_token_container.valid(): + raise SecurityTokenError( + "Security token has expired. Call `oci session authenticate` to generate new session." + ) + + time_now = int(time.time()) + time_expired = security_token_container.get_jwt()["exp"] + if time_expired - time_now < SECURITY_TOKEN_LEFT_TIME: + if not self.oci_config_location: + logger.warning( + "Can not auto-refresh token. Specify parameter `oci_config_location` through ads.auth.set_auth() or ads.auth.create_signer()." + ) + else: + result = os.system( + f"oci session refresh --config-file {self.oci_config_location} --profile {self.oci_key_profile}" + ) + if result == 1: + logger.warning( + "Some error happened during auto-refreshing the token. Continue using the current one that's expiring in less than {SECURITY_TOKEN_LEFT_TIME} seconds." + "Please follow steps in https://docs.oracle.com/en-us/iaas/Content/API/SDKDocs/clitoken.htm to renew token." + ) + + date_time = datetime.fromtimestamp(time_expired).strftime("%Y-%m-%d %H:%M:%S") + logger.debug(f"Session is valid until {date_time}.") + + def _read_security_token_file(self, security_token_file: str) -> str: + """Reads security token from file. + + Parameters + ---------- + security_token_file: str + The path to security token file. + + Returns + ------- + str: + Security token string. + """ + expanded_path = os.path.expanduser(security_token_file) + if not os.path.isfile(expanded_path): + raise ValueError("Invalid `security_token_file`. Specify a valid path.") + try: + token = None + with open(expanded_path) as f: + token = f.read() + return token + except: + raise + + +class AuthFactory: + """ + AuthFactory class which contains list of registered signers and allows to register new signers. + Check documentation for more signers: https://docs.oracle.com/en-us/iaas/tools/python/latest/api/signing.html. + + Current signers: + * APIKey + * ResourcePrincipal + * InstancePrincipal + * SecurityToken + """ + + classes = { + AuthType.API_KEY: APIKey, + AuthType.RESOURCE_PRINCIPAL: ResourcePrincipal, + AuthType.INSTANCE_PRINCIPAL: InstancePrincipal, + AuthType.SECURITY_TOKEN: SecurityToken, + } + + @classmethod + def register(cls, signer_type: str, signer: Any) -> None: + """Registers a new signer. + + Parameters + ---------- + signer_type: str + signer type to be registers + signer: RecordParser + A new signer class to be registered. + + Returns + ------- + None + Nothing. + """ + cls.classes[signer_type] = signer + + def signerGenerator(self, iam_type: Optional[str] = "api_key"): + """ + Generates signer classes based of iam_type, which specify one of auth methods: + 'api_key', 'resource_principal' or 'instance_principal'. + + Parameters + ---------- + iam_type: str, default 'api_key' + type of auth provided in IAM_TYPE environment variable or set in parameters in + ads.auth.set_auth() method. + + Returns + ------- + :class:`APIKey` or :class:`ResourcePrincipal` or :class:`InstancePrincipal` or :class:`SecurityToken` + returns one of classes, which implements creation of signer of specified type + + Raises + ------ + ValueError + If iam_type is not supported. + """ + + valid_auth_keys = AuthFactory.classes.keys() + if iam_type in valid_auth_keys: + return AuthFactory.classes[iam_type] + else: + raise ValueError( + f"Allowed values are: {valid_auth_keys}. If you wish to use other authentication form, " + f"pass a valid signer or use signer_callable and signer_kwargs" + ) + + +class AuthContext: + """ + AuthContext used in 'with' statement for properly managing global authentication type, signer, config + and global configuration parameters. + + Examples + -------- + >>> from ads import set_auth + >>> from ads.jobs import DataFlowRun + >>> with AuthContext(auth='resource_principal'): + >>> df_run = DataFlowRun.from_ocid(run_id) + + >>> from ads.model.framework.sklearn_model import SklearnModel + >>> model = SklearnModel.from_model_artifact(uri="model_artifact_path", artifact_dir="model_artifact_path") + >>> set_auth(auth='api_key', oci_config_location="~/.oci/config") + >>> with AuthContext(auth='api_key', oci_config_location="~/another_config_location/config"): + >>> # upload model to Object Storage using config from another_config_location/config + >>> model.upload_artifact(uri="oci://bucket@namespace/prefix/") + >>> # upload model to Object Storage using config from ~/.oci/config, which was set before 'with AuthContext():' + >>> model.upload_artifact(uri="oci://bucket@namespace/prefix/") + """ + + def __init__(self, **kwargs): + """ + Initialize class AuthContext and saves global state of authentication type, signer, config + and global configuration parameters. + + Parameters + ---------- + **kwargs: optional, list of parameters passed to ads.auth.set_auth() method, which can be: + auth: Optional[str], default 'api_key' + 'api_key', 'resource_principal' or 'instance_principal'. Enable/disable resource principal + identity, instance principal or keypair identity + oci_config_location: Optional[str], default oci.config.DEFAULT_LOCATION, which is '~/.oci/config' + config file location + profile: Optional[str], default is DEFAULT_PROFILE, which is 'DEFAULT' + profile name for api keys config file + config: Optional[Dict], default {} + created config dictionary + signer: Optional[Any], default None + created signer, can be resource principals signer, instance principal signer or other + signer_callable: Optional[Callable], default None + a callable object that returns signer + signer_kwargs: Optional[Dict], default None + parameters accepted by the signer + client_kwargs: Optional[Dict], default None + Additional keyword arguments for initializing the OCI client. + Example: client_kwargs = {"timeout": 60} + """ + self.kwargs = kwargs + + def __enter__(self): + """ + When called by the 'with' statement current state of authentication type, signer, config + and configuration parameters saved. + """ + self.previous_state = copy.deepcopy(AuthState()) + set_auth(**self.kwargs) + + def __exit__(self, exc_type, exc_val, exc_tb): + """ + When called by the 'with' statement initial state of authentication type, signer, config + and configuration parameters restored. + """ + AuthState().__dict__.update(self.previous_state.__dict__) diff --git a/ads/ads_auth/pyproject.toml b/ads/ads_auth/pyproject.toml new file mode 100644 index 000000000..be25e2a0f --- /dev/null +++ b/ads/ads_auth/pyproject.toml @@ -0,0 +1,53 @@ +[build-system] +requires = ["setuptools>=61"] +build-backend = "setuptools.build_meta" + + +[project] +name = "oracle-ads-auth" +version = "0.0.1" +description = "Authentication interface for Oracle Cloud Infrastructure (OCI) clients within the Oracle Accelerated Data Science ecosystem." +readme = { file = "README.md", content-type = "text/markdown" } +requires-python = ">=3.9,<4.0" +license = "UPL-1.0" + +authors = [ + { name = "Dmitrii Cherkasov", email = "dmitrii.cherkasov@oracle.com" }, +] + +maintainers = [ + { name = "Dmitrii Cherkasov", email = "dmitrii.cherkasov@oracle.com" }, +] + +keywords = [ + "Oracle Cloud Infrastructure", + "OCI", + "AI Quick Actions", + "Accelerated Data Science", + "ADS", + "LLM", +] + +classifiers = [ + "Development Status :: 5 - Production/Stable", + "Intended Audience :: Developers", + "Operating System :: OS Independent", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", +] + +dependencies = ["oci", "pydantic>=2.8.0"] + +[project.urls] +Homepage = "https://github.com/oracle/accelerated-data-science" +Source = "https://github.com/oracle/accelerated-data-science" +Documentation = "https://accelerated-data-science.readthedocs.io/en/latest/" + + +[tool.setuptools.packages.find] +where = ["."] +include = ["ads.auth"] +namespaces = false diff --git a/ads/ads_auth/tests/__init__.py b/ads/ads_auth/tests/__init__.py new file mode 100644 index 000000000..0fb4f1549 --- /dev/null +++ b/ads/ads_auth/tests/__init__.py @@ -0,0 +1,4 @@ +#!/usr/bin/env python + +# Copyright (c) 2025 Oracle and/or its affiliates. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ diff --git a/ads/common/auth.py b/ads/common/auth.py index b42dc89f4..5a3ef3cad 100644 --- a/ads/common/auth.py +++ b/ads/common/auth.py @@ -3,1120 +3,55 @@ # Copyright (c) 2021, 2025 Oracle and/or its affiliates. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ -import copy -import os -import time -from dataclasses import dataclass -from datetime import datetime -from typing import Any, Callable, Dict, Optional, Union - -import oci -from oci.config import ( - DEFAULT_LOCATION, # "~/.oci/config" - DEFAULT_PROFILE, # "DEFAULT" -) - -import ads.telemetry -from ads.common import logger -from ads.common.decorator.deprecate import deprecated -from ads.common.extended_enum import ExtendedEnum - -SECURITY_TOKEN_LEFT_TIME = 600 - - -class SecurityTokenError(Exception): # pragma: no cover - pass - - -class AuthType(ExtendedEnum): - API_KEY = "api_key" - RESOURCE_PRINCIPAL = "resource_principal" - INSTANCE_PRINCIPAL = "instance_principal" - SECURITY_TOKEN = "security_token" - - -class SingletonMeta(type): - _instances = {} - - def __call__(cls, *args, **kwargs): - if cls not in cls._instances: - cls._instances[cls] = super().__call__(*args, **kwargs) - return cls._instances[cls] - - -@dataclass() -class AuthState(metaclass=SingletonMeta): - """ - Class stores state of variables specified for auth method, configuration, - configuration file location, profile name, signer or signer_callable, which - set by use at any given time and can be provided by this class in any ADS module. - """ - - oci_iam_type: str = None - oci_cli_auth: str = None - oci_config_path: str = None - oci_key_profile: str = None - oci_config: Dict = None - oci_signer: Any = None - oci_signer_callable: Callable = None - oci_signer_kwargs: Dict = None - oci_client_kwargs: Dict = None - - def __post_init__(self): - self.oci_iam_type = self.oci_iam_type or os.environ.get( - "OCI_IAM_TYPE", AuthType.API_KEY - ) - self.oci_cli_auth = self.oci_cli_auth or os.environ.get( - "OCI_CLI_AUTH", AuthType.RESOURCE_PRINCIPAL - ) - self.oci_config_path = self.oci_config_path or os.environ.get( - "OCI_CONFIG_LOCATION", DEFAULT_LOCATION - ) - self.oci_key_profile = self.oci_key_profile or os.environ.get( - "OCI_CONFIG_PROFILE", DEFAULT_PROFILE - ) - self.oci_config = ( - self.oci_config or {"region": os.environ["OCI_RESOURCE_REGION"]} - if os.environ.get("OCI_RESOURCE_REGION") - else {} - ) - self.oci_signer_kwargs = self.oci_signer_kwargs or {} - self.oci_client_kwargs = self.oci_client_kwargs or {} - - -def set_auth( - auth: Optional[str] = AuthType.API_KEY, - oci_config_location: Optional[str] = DEFAULT_LOCATION, - profile: Optional[str] = DEFAULT_PROFILE, - config: Optional[Dict] = ( - {"region": os.environ["OCI_RESOURCE_REGION"]} - if os.environ.get("OCI_RESOURCE_REGION") - else {} - ), - signer: Optional[Any] = None, - signer_callable: Optional[Callable] = None, - signer_kwargs: Optional[Dict] = None, - client_kwargs: Optional[Dict] = None, -) -> None: - """ - Sets the default authentication type. - - Parameters - ---------- - auth: Optional[str], default 'api_key' - 'api_key', 'resource_principal' or 'instance_principal'. Enable/disable resource principal identity, - instance principal or keypair identity in a notebook session - oci_config_location: Optional[str], default '~/.oci/config' - config file location - profile: Optional[str], default is DEFAULT_PROFILE, which is 'DEFAULT' - profile name for api keys config file - config: Optional[Dict], default {} - created config dictionary - signer: Optional[Any], default None - created signer, can be resource principals signer, instance principal signer or other. - Check documentation for more signers: https://docs.oracle.com/en-us/iaas/tools/python/latest/api/signing.html - signer_callable: Optional[Callable], default None - a callable object that returns signer - signer_kwargs: Optional[Dict], default None - parameters accepted by the signer. - Check documentation: https://docs.oracle.com/en-us/iaas/tools/python/latest/api/signing.html - client_kwargs: Optional[Dict], default None - Additional keyword arguments for initializing the OCI client. - Example: client_kwargs = {"timeout": 60} - - Examples - -------- - >>> ads.set_auth("api_key") # default signer is set to api keys - - >>> ads.set_auth("api_key", profile = "TEST") # default signer is set to api keys and to use TEST profile - - >>> ads.set_auth("api_key", oci_config_location = "other_config_location") # use non-default oci_config_location - - >>> ads.set_auth("api_key", client_kwargs={"timeout": 60}) # default signer with connection and read timeouts set to 60 seconds for the client. - - >>> other_config = oci.config.from_file("other_config_location", "OTHER_PROFILE") # Create non-default config - >>> ads.set_auth(config=other_config) # Set api keys type of authentication based on provided config - - >>> config={ - ... user=ocid1.user.oc1.., - ... fingerprint=, - ... tenancy=ocid1.tenancy.oc1.., - ... region=us-ashburn-1, - ... key_content=, - ... } - >>> ads.set_auth(config=config) # Set api key authentication using private key content based on provided config - - >>> config={ - ... user=ocid1.user.oc1.., - ... fingerprint=, - ... tenancy=ocid1.tenancy.oc1.., - ... region=us-ashburn-1, - ... key_file=~/.oci/oci_api_key.pem, - ... } - >>> ads.set_auth(config=config) # Set api key authentication using private key file location based on provided config - - >>> ads.set_auth("resource_principal") # Set resource principal authentication - - >>> ads.set_auth("instance_principal") # Set instance principal authentication - - >>> ads.set_auth("security_token") # Set security token authentication - - >>> config = dict( - ... region=us-ashburn-1, - ... key_file=~/.oci/sessions/DEFAULT/oci_api_key.pem, - ... security_token_file=~/.oci/sessions/DEFAULT/token - ... ) - >>> ads.set_auth("security_token", config=config) # Set security token authentication from provided config - - >>> signer = oci.signer.Signer( - ... user=ocid1.user.oc1.., - ... fingerprint=, - ... tenancy=ocid1.tenancy.oc1.., - ... region=us-ashburn-1, - ... private_key_content=, - ... ) - >>> ads.set_auth(signer=signer) # Set api keys authentication with private key content based on provided signer - - >>> signer = oci.signer.Signer( - ... user=ocid1.user.oc1.., - ... fingerprint=, - ... tenancy=ocid1.tenancy.oc1.., - ... region=us-ashburn-1, - ... private_key_file_location=, - ... ) - >>> ads.set_auth(signer=signer) # Set api keys authentication with private key file location based on provided signer - - >>> signer = oci.auth.signers.get_resource_principals_signer() - >>> ads.auth.create_signer(config={}, signer=signer) # resource principals authentication dictionary created - - >>> signer_callable = oci.auth.signers.ResourcePrincipalsFederationSigner - >>> ads.set_auth(signer_callable=signer_callable) # Set resource principal federation signer callable - - >>> signer_callable = oci.auth.signers.InstancePrincipalsSecurityTokenSigner - >>> signer_kwargs = dict(log_requests=True) # will log the request url and response data when retrieving - >>> # instance principals authentication dictionary created based on callable with kwargs parameters: - >>> ads.set_auth(signer_callable=signer_callable, signer_kwargs=signer_kwargs) - """ - signer_kwargs = signer_kwargs or {} - client_kwargs = client_kwargs or {} - - auth_state = AuthState() - - valid_auth_keys = AuthFactory.classes.keys() - if auth in valid_auth_keys: - auth_state.oci_iam_type = auth - else: - raise ValueError( - f"Allowed values are: {valid_auth_keys}. If you wish to use other authentication form, " - f"pass a valid signer or use signer_callable and signer_kwargs" - ) - - if config and ( - oci_config_location != DEFAULT_LOCATION or profile != DEFAULT_PROFILE - ): - raise ValueError( - "'config' and 'oci_config_location', 'profile' pair are mutually exclusive." - "Please specify 'config' OR 'oci_config_location', 'profile' pair." - ) - - auth_state.oci_config = config - auth_state.oci_key_profile = profile - auth_state.oci_config_path = oci_config_location - auth_state.oci_signer = signer - auth_state.oci_signer_callable = signer_callable - auth_state.oci_signer_kwargs = signer_kwargs - auth_state.oci_client_kwargs = client_kwargs - - -def api_keys( - oci_config: Union[str, Dict] = os.path.expanduser(DEFAULT_LOCATION), - profile: str = DEFAULT_PROFILE, - client_kwargs: Dict = None, -) -> Dict: - """ - Prepares authentication and extra arguments necessary for creating clients for different OCI services using API - Keys. - - Parameters - ---------- - oci_config: Optional[Union[str, Dict]], default is ~/.oci/config - OCI authentication config file location or a dictionary with config attributes. - profile: Optional[str], is DEFAULT_PROFILE, which is 'DEFAULT' - Profile name to select from the config file. - client_kwargs: Optional[Dict], default None - kwargs that are required to instantiate the Client if we need to override the defaults. - - Returns - ------- - dict - Contains keys - config, signer and client_kwargs. - - - The config contains the config loaded from the configuration loaded from `oci_config`. - - The signer contains the signer object created from the api keys. - - client_kwargs contains the `client_kwargs` that was passed in as input parameter. - - Examples - -------- - >>> from ads.common import oci_client as oc - >>> auth = ads.auth.api_keys(oci_config="/home/datascience/.oci/config", profile="TEST", client_kwargs={"timeout": 6000}) - >>> oc.OCIClientFactory(**auth).object_storage # Creates Object storage client with timeout set to 6000 using API Key authentication - """ - signer_args = dict( - oci_config=oci_config if isinstance(oci_config, Dict) else {}, - oci_config_location=( - oci_config - if isinstance(oci_config, str) - else os.path.expanduser(DEFAULT_LOCATION) - ), - oci_key_profile=profile, - client_kwargs=client_kwargs, - ) - signer_generator = AuthFactory().signerGenerator(AuthType.API_KEY) - return signer_generator(signer_args).create_signer() - - -def resource_principal( - client_kwargs: Optional[Dict] = None, -) -> Dict: - """ - Prepares authentication and extra arguments necessary for creating clients for different OCI services using - Resource Principals. - - Parameters - ---------- - client_kwargs: Dict, default None - kwargs that are required to instantiate the Client if we need to override the defaults. - - Returns - ------- - dict - Contains keys - config, signer and client_kwargs. - - - The config contains and empty dictionary. - - The signer contains the signer object created from the resource principal. - - client_kwargs contains the `client_kwargs` that was passed in as input parameter. - - Examples - -------- - >>> from ads.common import oci_client as oc - >>> auth = ads.auth.resource_principal({"timeout": 6000}) - >>> oc.OCIClientFactory(**auth).object_storage # Creates Object Storage client with timeout set to 6000 seconds using resource principal authentication - """ - signer_args = dict(client_kwargs=client_kwargs) - signer_generator = AuthFactory().signerGenerator(AuthType.RESOURCE_PRINCIPAL) - return signer_generator(signer_args).create_signer() - - -def security_token( - oci_config: Union[str, Dict] = os.path.expanduser(DEFAULT_LOCATION), - profile: str = DEFAULT_PROFILE, - client_kwargs: Dict = None, -) -> Dict: - """ - Prepares authentication and extra arguments necessary for creating clients for different OCI services using Security Token. - - Parameters - ---------- - oci_config: Optional[Union[str, Dict]], default is ~/.oci/config - OCI authentication config file location or a dictionary with config attributes. - profile: Optional[str], is DEFAULT_PROFILE, which is 'DEFAULT' - Profile name to select from the config file. - client_kwargs: Optional[Dict], default None - kwargs that are required to instantiate the Client if we need to override the defaults. - - Returns - ------- - dict - Contains keys - config, signer and client_kwargs. - - - The config contains the config loaded from the configuration loaded from `oci_config`. - - The signer contains the signer object created from the security token. - - client_kwargs contains the `client_kwargs` that was passed in as input parameter. - - Examples - -------- - >>> from ads.common import oci_client as oc - >>> auth = ads.auth.security_token(oci_config="/home/datascience/.oci/config", profile="TEST", client_kwargs={"timeout": 6000}) - >>> oc.OCIClientFactory(**auth).object_storage # Creates Object storage client with timeout set to 6000 using Security Token authentication - """ - signer_args = dict( - oci_config=oci_config if isinstance(oci_config, Dict) else {}, - oci_config_location=( - oci_config - if isinstance(oci_config, str) - else os.path.expanduser(DEFAULT_LOCATION) - ), - oci_key_profile=profile, - client_kwargs=client_kwargs, +""" +This module forwards authentication utilities to the new 'oracle-ads-auth' package +to preserve backward compatibility. + +All functionality is now implemented in the `ads.auth` namespace from the +`oracle-ads-auth` package. +""" + +import inspect +import warnings + +# Inspect the stack to check if the import comes from outside `ads` +_stack = inspect.stack() +if all("ads/" not in frame.filename.replace("\\", "/") for frame in _stack[1:]): + warnings.warn( + "'ads.common.auth' is deprecated and will be removed in a future release.\n" + "Please update your imports to use 'ads.auth' from the new 'oracle-ads-auth' package.", + DeprecationWarning, + stacklevel=2, ) - signer_generator = AuthFactory().signerGenerator(AuthType.SECURITY_TOKEN) - return signer_generator(signer_args).create_signer() - - -def create_signer( - auth_type: Optional[str] = AuthType.API_KEY, - oci_config_location: Optional[str] = DEFAULT_LOCATION, - profile: Optional[str] = DEFAULT_PROFILE, - config: Optional[Dict] = {}, - signer: Optional[Any] = None, - signer_callable: Optional[Callable] = None, - signer_kwargs: Optional[Dict] = {}, - client_kwargs: Optional[Dict] = None, -) -> Dict: - """ - Prepares authentication and extra arguments necessary for creating clients for different OCI services based on - provided parameters. If `signer` or `signer`_callable` provided, authentication with that signer will be created. - If `config` provided, api_key type of authentication will be created. Accepted values for `auth_type`: - `api_key` (default), 'instance_principal', 'resource_principal'. - - Parameters - ---------- - auth_type: Optional[str], default 'api_key' - 'api_key', 'resource_principal' or 'instance_principal'. Enable/disable resource principal identity, - instance principal or keypair identity in a notebook session - oci_config_location: Optional[str], default oci.config.DEFAULT_LOCATION, which is '~/.oci/config' - config file location - profile: Optional[str], default is DEFAULT_PROFILE, which is 'DEFAULT' - profile name for api keys config file - config: Optional[Dict], default {} - created config dictionary - signer: Optional[Any], default None - created signer, can be resource principals signer, instance principal signer or other. - Check documentation for more signers: https://docs.oracle.com/en-us/iaas/tools/python/latest/api/signing.html - signer_callable: Optional[Callable], default None - a callable object that returns signer - signer_kwargs: Optional[Dict], default None - parameters accepted by the signer. - Check documentation: https://docs.oracle.com/en-us/iaas/tools/python/latest/api/signing.html - client_kwargs : dict - kwargs that are required to instantiate the Client if we need to override the defaults - - Examples - -------- - >>> import ads - >>> auth = ads.auth.create_signer() # api_key type of authentication dictionary created with default config location and default profile - >>> config = oci.config.from_file("other_config_location", "OTHER_PROFILE") - >>> auth = ads.auth.create_signer(config=config) # api_key type of authentication dictionary created based on provided config - - >>> config={ - ... user=ocid1.user.oc1.., - ... fingerprint=, - ... tenancy=ocid1.tenancy.oc1.., - ... region=us-ashburn-1, - ... key_content=, - ... } - >>> auth = ads.auth.create_signer(config=config) # api_key type of authentication dictionary with private key content created based on provided config - - >>> config={ - ... user=ocid1.user.oc1.., - ... fingerprint=, - ... tenancy=ocid1.tenancy.oc1.., - ... region=us-ashburn-1, - ... key_file=~/.oci/oci_api_key.pem, - ... } - >>> auth = ads.auth.create_signer(config=config) # api_key type of authentication dictionary with private key file location created based on provided config - - >>> signer = oci.auth.signers.get_resource_principals_signer() - >>> auth = ads.auth.create_signer(config={}, signer=signer) # resource principals authentication dictionary created - - >>> auth = ads.auth.create_signer(auth_type='instance_principal') # instance principals authentication dictionary created - - >>> signer_callable = oci.auth.signers.InstancePrincipalsSecurityTokenSigner - >>> signer_kwargs = dict(log_requests=True) # will log the request url and response data when retrieving - >>> auth = ads.auth.create_signer(signer_callable=signer_callable, signer_kwargs=signer_kwargs) # instance principals authentication dictionary created based on callable with kwargs parameters - >>> config = dict( - ... region=us-ashburn-1, - ... key_file=~/.oci/sessions/DEFAULT/oci_api_key.pem, - ... security_token_file=~/.oci/sessions/DEFAULT/token - ... ) - >>> auth = ads.auth.create_signer(auth_type="security_token", config=config) # security token authentication created based on provided config - """ - if signer or signer_callable: - configuration = ads.telemetry.update_oci_client_config(config) - if signer_callable: - signer = signer_callable(**signer_kwargs) - signer_dict = { - "config": configuration, - "signer": signer, - "client_kwargs": client_kwargs, - } - logger.debug(f"Using authentication signer type {type(signer)}.") - return signer_dict - else: - signer_args = dict( - oci_config_location=oci_config_location, - oci_key_profile=profile, - oci_config=config, - client_kwargs=client_kwargs, - ) - - signer_generator = AuthFactory().signerGenerator(auth_type) - - return signer_generator(signer_args).create_signer() - - -def default_signer(client_kwargs: Optional[Dict] = None) -> Dict: - """ - Prepares authentication and extra arguments necessary for creating clients for different OCI services based on - the default authentication setting for the session. Refer ads.set_auth API for further reference. - - Parameters - ---------- - client_kwargs : dict - kwargs that are required to instantiate the Client if we need to override the defaults. - Example: client_kwargs = {"timeout": 60} - - Returns - ------- - dict - Contains keys - config, signer and client_kwargs. - - - The config contains the config loaded from the configuration loaded from the default location if the default - auth mode is API keys, otherwise it is empty dictionary. - - The signer contains the signer object created from default auth mode. - - client_kwargs contains the `client_kwargs` that was passed in as input parameter. - - Examples - -------- - >>> import ads - >>> from ads.common import oci_client as oc - - >>> auth = ads.auth.default_signer() - >>> oc.OCIClientFactory(**auth).object_storage # Creates Object storage client - - >>> ads.set_auth("resource_principal") - >>> auth = ads.auth.default_signer() - >>> oc.OCIClientFactory(**auth).object_storage # Creates Object storage client using resource principal authentication - - >>> signer_callable = oci.auth.signers.InstancePrincipalsSecurityTokenSigner - >>> ads.set_auth(signer_callable=signer_callable) # Set instance principal callable - >>> auth = ads.auth.default_signer() # signer_callable instantiated - >>> oc.OCIClientFactory(**auth).object_storage # Creates Object storage client using instance principal authentication - """ - auth_state = AuthState() - if auth_state.oci_signer or auth_state.oci_signer_callable: - configuration = ads.telemetry.update_oci_client_config(auth_state.oci_config) - signer = auth_state.oci_signer - if auth_state.oci_signer_callable: - signer_kwargs = auth_state.oci_signer_kwargs or {} - signer = auth_state.oci_signer_callable(**signer_kwargs) - signer_dict = { - "config": configuration, - "signer": signer, - "client_kwargs": { - **(auth_state.oci_client_kwargs or {}), - **(client_kwargs or {}), - }, - } - logger.debug(f"Using authentication signer type {type(signer)}.") - return signer_dict - else: - signer_args = dict( - oci_config_location=auth_state.oci_config_path, - oci_key_profile=auth_state.oci_key_profile, - oci_config=auth_state.oci_config, - client_kwargs={ - **(auth_state.oci_client_kwargs or {}), - **(client_kwargs or {}), - }, - ) - signer_generator = AuthFactory().signerGenerator(auth_state.oci_iam_type) - return signer_generator(signer_args).create_signer() - - -@deprecated( - "2.7.3", - details="Deprecated, use: from ads.common.auth import create_signer. https://accelerated-data-science.readthedocs.io/en/latest/user_guide/cli/authentication.html#overriding-defaults.", +from ads.auth import ( + AuthContext, + AuthFactory, + AuthState, + AuthType, + SecurityToken, + SecurityTokenError, + api_keys, + create_signer, + default_signer, + register_user_agent_hook, + resource_principal, + security_token, + set_auth, ) -def get_signer( - oci_config: Optional[str] = None, oci_profile: Optional[str] = None, **client_kwargs -) -> Dict: - """ - Provides config and signer based given parameters. If oci_config (api key config file location) and - oci_profile specified new signer will ge generated. Else signer of a type specified in OCI_CLI_AUTH - environment variable will be used to generate signer and return. If OCI_CLI_AUTH not set, - resource principal signer will be provided. Accepted values for OCI_CLI_AUTH: 'api_key', - 'instance_principal', 'resource_principal'. - - Parameters - ---------- - oci_config: Optional[str], default None - Path to the config file - oci_profile: Optional[str], default None - the profile to load from the config file - client_kwargs: - kwargs that are required to instantiate the Client if we need to override the defaults - """ - if oci_config and oci_profile: - signer_args = dict( - oci_config_location=oci_config, - oci_key_profile=oci_profile, - client_kwargs=client_kwargs, - ) - signer_generator = AuthFactory().signerGenerator(AuthType.API_KEY) - else: - oci_cli_auth = ( - AuthState().oci_cli_auth - ) # "resource_principal", if env variable OCI_CLI_AUTH not set - valid_auth_keys = AuthFactory.classes.keys() - if oci_cli_auth not in valid_auth_keys: - oci_cli_auth = AuthType.RESOURCE_PRINCIPAL - signer_args = dict( - client_kwargs=client_kwargs, - ) - signer_generator = AuthFactory().signerGenerator(oci_cli_auth) - return signer_generator(signer_args).create_signer() - - -class AuthSignerGenerator: # pragma: no cover - """ - Abstract class for auth configuration and signer creation. - """ - - def create_signer(self): - pass - - -class APIKey(AuthSignerGenerator): - """ - Creates api keys auth instance. This signer is intended to be used when signing requests for - a given user - it requires that user’s ID, their private key and certificate fingerprint. - It prepares extra arguments necessary for creating clients for variety of OCI services. - """ - - def __init__(self, args: Optional[Dict] = None): - """ - Signer created based on args provided. If not provided current values of according arguments - will be used from current global state from AuthState class. - - Parameters - ---------- - args: dict - args that are required to create api key config and signer. Contains keys: oci_config, - oci_config_location, oci_key_profile, client_kwargs. - - - oci_config is a configuration dict that can be used to create clients - - oci_config_location - path to config file - - oci_key_profile - the profile to load from config file - - client_kwargs - optional parameters for OCI client creation in next steps - """ - self.oci_config = args.get("oci_config") - self.oci_config_location = args.get("oci_config_location") - self.oci_key_profile = args.get("oci_key_profile") - self.client_kwargs = args.get("client_kwargs") - - def create_signer(self) -> Dict: - """ - Creates api keys configuration and signer with extra arguments necessary for creating clients. - Signer constructed from the `oci_config` provided. If not 'oci_config', configuration will be - constructed from 'oci_config_location' and 'oci_key_profile' in place. - - Returns - ------- - dict - Contains keys - config, signer and client_kwargs. - - - config contains the configuration information - - signer contains the signer object created. It is instantiated from signer_callable, or - signer provided in args used, or instantiated in place - - client_kwargs contains the `client_kwargs` that was passed in as input parameter - - Examples - -------- - >>> signer_args = dict( - >>> client_kwargs=client_kwargs - >>> ) - >>> signer_generator = AuthFactory().signerGenerator(AuthType.API_KEY) - >>> signer_generator(signer_args).create_signer() - """ - if self.oci_config: - configuration = ads.telemetry.update_oci_client_config(self.oci_config) - else: - configuration = ads.telemetry.update_oci_client_config( - oci.config.from_file(self.oci_config_location, self.oci_key_profile) - ) - - oci.config.validate_config(configuration) - logger.debug("Using 'api_key' authentication.") - return { - "config": configuration, - "signer": oci.signer.Signer( - tenancy=configuration["tenancy"], - user=configuration["user"], - fingerprint=configuration["fingerprint"], - private_key_file_location=configuration.get("key_file"), - pass_phrase=configuration.get("pass_phrase"), - private_key_content=configuration.get("key_content"), - ), - "client_kwargs": self.client_kwargs, - } - - -class ResourcePrincipal(AuthSignerGenerator): - """ - Creates Resource Principal signer - a security token for a resource principal. - It prepares extra arguments necessary for creating clients for variety of OCI services. - """ - - def __init__(self, args: Optional[Dict] = None): - """ - Signer created based on args provided. If not provided current values of according arguments - will be used from current global state from AuthState class. - - Parameters - ---------- - args: dict - args that are required to create Resource Principal signer. Contains keys: client_kwargs. - - - client_kwargs - optional parameters for OCI client creation in next steps - """ - self.client_kwargs = args.get("client_kwargs") - - def create_signer(self) -> Dict: - """ - Creates Resource Principal signer with extra arguments necessary for creating clients. - - Returns - ------- - dict - Contains keys - config, signer and client_kwargs. - - - config contains the configuration information - - signer contains the signer object created. It is instantiated from signer_callable, or - signer provided in args used, or instantiated in place - - client_kwargs contains the `client_kwargs` that was passed in as input parameter - - Examples - -------- - >>> signer_args = dict( - >>> signer=oci.auth.signers.get_resource_principals_signer() - >>> ) - >>> signer_generator = AuthFactory().signerGenerator(AuthType.RESOURCE_PRINCIPAL) - >>> signer_generator(signer_args).create_signer() - """ - configuration = ads.telemetry.update_oci_client_config(AuthState().oci_config) - signer_dict = { - "config": configuration, - "signer": oci.auth.signers.get_resource_principals_signer(), - "client_kwargs": self.client_kwargs, - } - logger.debug("Using 'resource_principal' authentication.") - return signer_dict - - @staticmethod - def supported(): - return any( - os.environ.get(var) - for var in [ - "JOB_RUN_OCID", - "NB_SESSION_OCID", - "DATAFLOW_RUN_ID", - "PIPELINE_RUN_OCID", - ] - ) - - -class InstancePrincipal(AuthSignerGenerator): - """ - Creates Instance Principal signer - a SecurityTokenSigner which uses a security token for an instance - principal. It prepares extra arguments necessary for creating clients for variety of OCI services. - """ - - def __init__(self, args: Optional[Dict] = None): - """ - Signer created based on args provided. If not provided current values of according arguments - will be used from current global state from AuthState class. - - Parameters - ---------- - args: dict - args that are required to create Instance Principal signer. Contains keys: signer_kwargs, client_kwargs. - - - signer_kwargs - optional parameters required to instantiate instance principal signer - - client_kwargs - optional parameters for OCI client creation in next steps - """ - self.signer_kwargs = args.get("signer_kwargs", dict()) - self.client_kwargs = args.get("client_kwargs") - - def create_signer(self) -> Dict: - """ - Creates Instance Principal signer with extra arguments necessary for creating clients. - Signer instantiated from the `signer_callable` or if the `signer` provided is will be return by this method. - If `signer_callable` or `signer` not provided new signer will be created in place. - - Returns - ------- - dict - Contains keys - config, signer and client_kwargs. - - - config contains the configuration information - - signer contains the signer object created. It is instantiated from signer_callable, or - signer provided in args used, or instantiated in place - - client_kwargs contains the `client_kwargs` that was passed in as input parameter - - Examples - -------- - >>> signer_args = dict(signer_kwargs={"log_requests": True}) - >>> signer_generator = AuthFactory().signerGenerator(AuthType.INSTANCE_PRINCIPAL) - >>> signer_generator(signer_args).create_signer() - """ - configuration = ads.telemetry.update_oci_client_config(AuthState().oci_config) - signer_dict = { - "config": configuration, - "signer": oci.auth.signers.InstancePrincipalsSecurityTokenSigner( - **self.signer_kwargs - ), - "client_kwargs": self.client_kwargs, - } - logger.debug("Using 'instance_principal' authentication.") - return signer_dict - - -class SecurityToken(AuthSignerGenerator): - """ - Creates security token auth instance. This signer is intended to be used when signing requests for - a given user - it requires that user's private key and security token. - It prepares extra arguments necessary for creating clients for variety of OCI services. - """ - - SECURITY_TOKEN_GENERIC_HEADERS = ["date", "(request-target)", "host"] - SECURITY_TOKEN_BODY_HEADERS = ["content-length", "content-type", "x-content-sha256"] - SECURITY_TOKEN_REQUIRED = ["security_token_file", "key_file", "region"] - - def __init__(self, args: Optional[Dict] = None): - """ - Signer created based on args provided. If not provided current values of according arguments - will be used from current global state from AuthState class. - - Parameters - ---------- - args: dict - args that are required to create Security Token signer. Contains keys: oci_config, - oci_config_location, oci_key_profile, client_kwargs. - - - oci_config is a configuration dict that can be used to create clients - - oci_config_location - path to config file - - oci_key_profile - the profile to load from config file - - client_kwargs - optional parameters for OCI client creation in next steps - """ - self.oci_config = args.get("oci_config") - self.oci_config_location = args.get("oci_config_location") - self.oci_key_profile = args.get("oci_key_profile") - self.client_kwargs = args.get("client_kwargs") - - def create_signer(self) -> Dict: - """ - Creates security token configuration and signer with extra arguments necessary for creating clients. - Signer constructed from the `oci_config` provided. If not 'oci_config', configuration will be - constructed from 'oci_config_location' and 'oci_key_profile' in place. - - Returns - ------- - dict - Contains keys - config, signer and client_kwargs. - - - config contains the configuration information - - signer contains the signer object created. It is instantiated from signer_callable, or - signer provided in args used, or instantiated in place - - client_kwargs contains the `client_kwargs` that was passed in as input parameter - - Examples - -------- - >>> signer_args = dict( - ... client_kwargs=client_kwargs - ... ) - >>> signer_generator = AuthFactory().signerGenerator(AuthType.SECURITY_TOKEN) - >>> signer_generator(signer_args).create_signer() - """ - if self.oci_config: - configuration = ads.telemetry.update_oci_client_config(self.oci_config) - else: - configuration = ads.telemetry.update_oci_client_config( - oci.config.from_file(self.oci_config_location, self.oci_key_profile) - ) - - logger.debug("Using 'security_token' authentication.") - - for parameter in self.SECURITY_TOKEN_REQUIRED: - if parameter not in configuration: - raise ValueError( - f"Parameter `{parameter}` must be provided for using `security_token` authentication." - ) - - self._validate_and_refresh_token(configuration) - - return { - "config": configuration, - "signer": oci.auth.signers.SecurityTokenSigner( - token=self._read_security_token_file( - configuration.get("security_token_file") - ), - private_key=oci.signer.load_private_key_from_file( - configuration.get("key_file"), configuration.get("pass_phrase") - ), - generic_headers=configuration.get( - "generic_headers", self.SECURITY_TOKEN_GENERIC_HEADERS - ), - body_headers=configuration.get( - "body_headers", self.SECURITY_TOKEN_BODY_HEADERS - ), - ), - "client_kwargs": self.client_kwargs, - } - - def _validate_and_refresh_token(self, configuration: Dict[str, Any]): - """Validates and refreshes security token. - - Parameters - ---------- - configuration: Dict - Security token configuration. - """ - security_token = self._read_security_token_file( - configuration.get("security_token_file") - ) - security_token_container = ( - oci.auth.security_token_container.SecurityTokenContainer( - session_key_supplier=None, security_token=security_token - ) - ) - - if not security_token_container.valid(): - raise SecurityTokenError( - "Security token has expired. Call `oci session authenticate` to generate new session." - ) - - time_now = int(time.time()) - time_expired = security_token_container.get_jwt()["exp"] - if time_expired - time_now < SECURITY_TOKEN_LEFT_TIME: - if not self.oci_config_location: - logger.warning( - "Can not auto-refresh token. Specify parameter `oci_config_location` through ads.set_auth() or ads.auth.create_signer()." - ) - else: - result = os.system( - f"oci session refresh --config-file {self.oci_config_location} --profile {self.oci_key_profile}" - ) - if result == 1: - logger.warning( - "Some error happened during auto-refreshing the token. Continue using the current one that's expiring in less than {SECURITY_TOKEN_LEFT_TIME} seconds." - "Please follow steps in https://docs.oracle.com/en-us/iaas/Content/API/SDKDocs/clitoken.htm to renew token." - ) - - date_time = datetime.fromtimestamp(time_expired).strftime("%Y-%m-%d %H:%M:%S") - logger.debug(f"Session is valid until {date_time}.") - - def _read_security_token_file(self, security_token_file: str) -> str: - """Reads security token from file. - - Parameters - ---------- - security_token_file: str - The path to security token file. - - Returns - ------- - str: - Security token string. - """ - expanded_path = os.path.expanduser(security_token_file) - if not os.path.isfile(expanded_path): - raise ValueError("Invalid `security_token_file`. Specify a valid path.") - try: - token = None - with open(expanded_path) as f: - token = f.read() - return token - except: - raise - - -class AuthFactory: - """ - AuthFactory class which contains list of registered signers and allows to register new signers. - Check documentation for more signers: https://docs.oracle.com/en-us/iaas/tools/python/latest/api/signing.html. - - Current signers: - * APIKey - * ResourcePrincipal - * InstancePrincipal - * SecurityToken - """ - - classes = { - AuthType.API_KEY: APIKey, - AuthType.RESOURCE_PRINCIPAL: ResourcePrincipal, - AuthType.INSTANCE_PRINCIPAL: InstancePrincipal, - AuthType.SECURITY_TOKEN: SecurityToken, - } - - @classmethod - def register(cls, signer_type: str, signer: Any) -> None: - """Registers a new signer. - - Parameters - ---------- - signer_type: str - signer type to be registers - signer: RecordParser - A new signer class to be registered. - - Returns - ------- - None - Nothing. - """ - cls.classes[signer_type] = signer - - def signerGenerator(self, iam_type: Optional[str] = "api_key"): - """ - Generates signer classes based of iam_type, which specify one of auth methods: - 'api_key', 'resource_principal' or 'instance_principal'. - - Parameters - ---------- - iam_type: str, default 'api_key' - type of auth provided in IAM_TYPE environment variable or set in parameters in - ads.set_auth() method. - - Returns - ------- - :class:`APIKey` or :class:`ResourcePrincipal` or :class:`InstancePrincipal` or :class:`SecurityToken` - returns one of classes, which implements creation of signer of specified type - - Raises - ------ - ValueError - If iam_type is not supported. - """ - - valid_auth_keys = AuthFactory.classes.keys() - if iam_type in valid_auth_keys: - return AuthFactory.classes[iam_type] - else: - raise ValueError( - f"Allowed values are: {valid_auth_keys}. If you wish to use other authentication form, " - f"pass a valid signer or use signer_callable and signer_kwargs" - ) - - -class OCIAuthContext: - """ - OCIAuthContext used in 'with' statement for properly managing global authentication type - and global configuration profile parameters. - - Examples - -------- - >>> from ads.jobs import DataFlowRun - >>> with OCIAuthContext(profile='TEST'): - >>> df_run = DataFlowRun.from_ocid(run_id) - """ - - @deprecated( - "2.7.3", - details="Deprecated, use: from ads.common.auth import AuthContext", - ) - def __init__(self, profile: str = None): - """ - Initialize class OCIAuthContext and saves global state of authentication type and configuration profile. - - Parameters - ---------- - profile: str, default is None - profile name for api keys config file - """ - self.profile = profile - self.prev_mode = AuthState().oci_iam_type - self.prev_profile = AuthState().oci_key_profile - self.oci_cli_auth = AuthState().oci_cli_auth - - @deprecated( - "2.7.3", - details="Deprecated, use: from ads.common.auth import AuthContext", - ) - def __enter__(self): - """ - When called by the 'with' statement and if 'profile' provided - 'api_key' authentication with 'profile' used. - If 'profile' not provided, authentication method will be 'resource_principal'. - """ - if self.profile: - ads.set_auth(auth=AuthType.API_KEY, profile=self.profile) - logger.debug(f"OCI profile set to {self.profile}") - else: - ads.set_auth(auth=AuthType.RESOURCE_PRINCIPAL) - logger.debug("OCI auth set to resource principal") - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - """ - When called by the 'with' statement restores initial state of authentication type and profile value. - """ - ads.set_auth(auth=self.prev_mode, profile=self.prev_profile) - - -class AuthContext: - """ - AuthContext used in 'with' statement for properly managing global authentication type, signer, config - and global configuration parameters. - - Examples - -------- - >>> from ads import set_auth - >>> from ads.jobs import DataFlowRun - >>> with AuthContext(auth='resource_principal'): - >>> df_run = DataFlowRun.from_ocid(run_id) - - >>> from ads.model.framework.sklearn_model import SklearnModel - >>> model = SklearnModel.from_model_artifact(uri="model_artifact_path", artifact_dir="model_artifact_path") - >>> set_auth(auth='api_key', oci_config_location="~/.oci/config") - >>> with AuthContext(auth='api_key', oci_config_location="~/another_config_location/config"): - >>> # upload model to Object Storage using config from another_config_location/config - >>> model.upload_artifact(uri="oci://bucket@namespace/prefix/") - >>> # upload model to Object Storage using config from ~/.oci/config, which was set before 'with AuthContext():' - >>> model.upload_artifact(uri="oci://bucket@namespace/prefix/") - """ - - def __init__(self, **kwargs): - """ - Initialize class AuthContext and saves global state of authentication type, signer, config - and global configuration parameters. - - Parameters - ---------- - **kwargs: optional, list of parameters passed to ads.set_auth() method, which can be: - auth: Optional[str], default 'api_key' - 'api_key', 'resource_principal' or 'instance_principal'. Enable/disable resource principal - identity, instance principal or keypair identity - oci_config_location: Optional[str], default oci.config.DEFAULT_LOCATION, which is '~/.oci/config' - config file location - profile: Optional[str], default is DEFAULT_PROFILE, which is 'DEFAULT' - profile name for api keys config file - config: Optional[Dict], default {} - created config dictionary - signer: Optional[Any], default None - created signer, can be resource principals signer, instance principal signer or other - signer_callable: Optional[Callable], default None - a callable object that returns signer - signer_kwargs: Optional[Dict], default None - parameters accepted by the signer - client_kwargs: Optional[Dict], default None - Additional keyword arguments for initializing the OCI client. - Example: client_kwargs = {"timeout": 60} - """ - self.kwargs = kwargs - - def __enter__(self): - """ - When called by the 'with' statement current state of authentication type, signer, config - and configuration parameters saved. - """ - self.previous_state = copy.deepcopy(AuthState()) - set_auth(**self.kwargs) - def __exit__(self, exc_type, exc_val, exc_tb): - """ - When called by the 'with' statement initial state of authentication type, signer, config - and configuration parameters restored. - """ - AuthState().__dict__.update(self.previous_state.__dict__) +__all__ = [ + "set_auth", + "default_signer", + "create_signer", + "api_keys", + "resource_principal", + "security_token", + "AuthType", + "AuthContext", + "register_user_agent_hook", + "SecurityToken", + "SecurityTokenError", + "AuthState", + "AuthFactory", +] diff --git a/pyproject.toml b/pyproject.toml index 9a2dcf6c9..a85fd5ba6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -56,6 +56,7 @@ classifiers = [ # Copied from install_requires list in setup.py, setup.py got removed in favor of this config file dependencies = [ "PyYAML>=6", # pyyaml 5.4 is broken with cython 3 + "oracle_ads_auth", "asteval>=0.9.25", "cerberus>=1.3.4", "cloudpickle>=1.6.0", diff --git a/tests/unitary/default_setup/auth/test_auth.py b/tests/unitary/default_setup/auth/test_auth.py index 2c00b48c7..3139deaf0 100644 --- a/tests/unitary/default_setup/auth/test_auth.py +++ b/tests/unitary/default_setup/auth/test_auth.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -# Copyright (c) 2021, 2023 Oracle and/or its affiliates. +# Copyright (c) 2021, 2025 Oracle and/or its affiliates. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ import os @@ -27,11 +27,9 @@ security_token, create_signer, default_signer, - get_signer, AuthType, AuthState, AuthFactory, - OCIAuthContext, AuthContext, ) from ads.common.oci_logging import OCILog @@ -99,18 +97,6 @@ def test_api_keys_using_default_profile( api_keys("test_path") mock_config_from_file.assert_called_with("test_path", "DEFAULT") - @mock.patch("oci.config.validate_config") - @mock.patch("oci.auth.signers.get_resource_principals_signer") - @mock.patch("oci.config.from_file") - @mock.patch("oci.signer.Signer") - def test_get_signer_with_api_keys( - self, mock_signer, mock_config_from_file, mock_rp_signer, mock_validate_config - ): - get_signer("test_path", "TEST_PROFILE") - mock_config_from_file.assert_called_with("test_path", "TEST_PROFILE") - get_signer() - mock_rp_signer.assert_called_once() - @mock.patch("oci.auth.signers.get_resource_principals_signer") @mock.patch.dict(os.environ, {"OCI_RESOURCE_PRINCIPAL_VERSION": "2.2"}) def test_resource_principal(self, mock_rp_signer): @@ -497,28 +483,6 @@ def test_set_auth_multiple_times_with( mock_ip_signer.assert_called_with(**test_signer_kwargs) -class TestOCIAuthContext(TestCase): - def tearDown(self) -> None: - with mock.patch("os.path.exists"): - ads.set_auth(AuthType.API_KEY) - return super().tearDown() - - @mock.patch("os.path.exists") - def test_oci_auth_context(self, mock_path_exists): - profile = AuthState().oci_key_profile - mode = AuthState().oci_iam_type - with OCIAuthContext(profile="TEST"): - assert AuthState().oci_key_profile == "TEST" - assert AuthState().oci_iam_type == AuthType.API_KEY - assert AuthState().oci_key_profile == profile - assert AuthState().oci_iam_type == mode - - with OCIAuthContext(): - assert AuthState().oci_iam_type == AuthType.RESOURCE_PRINCIPAL - assert AuthState().oci_key_profile == profile - assert AuthState().oci_iam_type == mode - - class TestAuthContext: def tearDown(self) -> None: ads.set_auth(AuthType.API_KEY)