diff --git a/ocp_resources/resource.py b/ocp_resources/resource.py index bc16c2cf53..0c29c19a5e 100644 --- a/ocp_resources/resource.py +++ b/ocp_resources/resource.py @@ -59,6 +59,7 @@ TIMEOUT_10SEC, TIMEOUT_30SEC, ) +from ocp_resources.utils.kubeconfig import save_kubeconfig from ocp_resources.utils.resource_constants import ResourceConstants from ocp_resources.utils.schema_validator import SchemaValidator from ocp_resources.utils.utils import skip_existing_resource_creation_teardown @@ -192,6 +193,22 @@ def _exchange_code_for_token( ) +def _resolve_bearer_token( + token: str | None, + client_configuration: "kubernetes.client.Configuration", +) -> str | None: + """Extract bearer token from client configuration if not explicitly provided.""" + if token: + return token + + if client_configuration.api_key: + _bearer = client_configuration.api_key.get("authorization", "") + if _bearer.startswith("Bearer "): + return _bearer.removeprefix("Bearer ") + + return None + + def get_client( config_file: str | None = None, config_dict: dict[str, Any] | None = None, @@ -206,7 +223,8 @@ def get_client( verify_ssl: bool | None = None, token: str | None = None, fake: bool = False, -) -> DynamicClient | FakeDynamicClient: + generate_kubeconfig: bool = False, +) -> DynamicClient | FakeDynamicClient | tuple[DynamicClient, str]: """ Get a kubernetes client. @@ -230,9 +248,11 @@ def get_client( host (str): host for the cluster verify_ssl (bool): whether to verify ssl token (str): Use token to login + generate_kubeconfig (bool): if True, save the kubeconfig to a temporary file and log the path. Returns: DynamicClient: a kubernetes client. + If generate_kubeconfig is True, returns a tuple of (DynamicClient, str) where str is the path to the temporary kubeconfig file. """ if fake: return FakeDynamicClient() @@ -286,16 +306,29 @@ def get_client( kubernetes.client.Configuration.set_default(default=client_configuration) try: - return kubernetes.dynamic.DynamicClient(client=_client) + _dynamic_client = kubernetes.dynamic.DynamicClient(client=_client) except MaxRetryError: # Ref: https://github.com/kubernetes-client/python/blob/v26.1.0/kubernetes/base/config/incluster_config.py LOGGER.info("Trying to get client via incluster_config") - return kubernetes.dynamic.DynamicClient( + _dynamic_client = kubernetes.dynamic.DynamicClient( client=kubernetes.config.incluster_config.load_incluster_config( client_configuration=client_configuration, try_refresh_token=try_refresh_token ), ) + if generate_kubeconfig: + _resolved_token = _resolve_bearer_token(token=token, client_configuration=client_configuration) + kubeconfig_path = save_kubeconfig( + host=host or client_configuration.host, + token=_resolved_token, + config_dict=config_dict, + verify_ssl=verify_ssl, + ) + + return _dynamic_client, kubeconfig_path + + return _dynamic_client + def sub_resource_level(current_class: Any, owner_class: Any, parent_class: Any) -> str | None: # return the name of the last class in MRO list that is not one of base @@ -1452,9 +1485,9 @@ def get_all_cluster_resources( ) client = get_client(config_file=config_file, config_dict=config_dict, context=context) - for _resource in client.resources.search(): + for _resource in client.resources.search(): # type: ignore[union-attr] try: - _resources = client.get(_resource, *args, **kwargs) + _resources = client.get(_resource, *args, **kwargs) # type: ignore[union-attr] yield from _resources.items except (NotFoundError, TypeError, MethodNotAllowedError): diff --git a/ocp_resources/utils/kubeconfig.py b/ocp_resources/utils/kubeconfig.py new file mode 100644 index 0000000000..6184acefe2 --- /dev/null +++ b/ocp_resources/utils/kubeconfig.py @@ -0,0 +1,70 @@ +import atexit +import os +import tempfile +from typing import Any + +import yaml +from simple_logger.logger import get_logger + +LOGGER = get_logger(name=__name__) + + +def save_kubeconfig( + host: str | None = None, + token: str | None = None, + config_dict: dict[str, Any] | None = None, + verify_ssl: bool | None = None, +) -> str: + """ + Save kubeconfig to a temporary file. + + Builds a kubeconfig from the provided parameters and writes it to a + temporary file with 0o600 permissions. + + Args: + host (str): cluster API server URL. + token (str): bearer token for authentication. + config_dict (dict): existing kubeconfig dict to save as-is. + verify_ssl (bool): if False, sets insecure-skip-tls-verify in the saved config. + + Returns: + str: path to the temporary kubeconfig file. + """ + if config_dict is not None: + _config = config_dict + elif host: + cluster_config: dict[str, Any] = {"server": host} + if verify_ssl is False: + cluster_config["insecure-skip-tls-verify"] = True + + user_config: dict[str, str] = {} + if token: + user_config["token"] = token + + _config = { + "apiVersion": "v1", + "kind": "Config", + "clusters": [{"name": "cluster", "cluster": cluster_config}], + "users": [{"name": "user", "user": user_config}], + "contexts": [{"name": "context", "context": {"cluster": "cluster", "user": "user"}}], + "current-context": "context", + } + else: + raise ValueError("Not enough data to build kubeconfig: provide config_dict or host") + + tmp_file = None + try: + tmp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".kubeconfig", mode="w") + os.chmod(tmp_file.name, 0o600) + yaml.safe_dump(_config, tmp_file) + tmp_file.close() + # Ensures the file is cleaned up when the process exits. + atexit.register(lambda p: os.unlink(p) if os.path.exists(p) else None, tmp_file.name) + LOGGER.info(f"kubeconfig saved to {tmp_file.name}") + return tmp_file.name + + except (OSError, yaml.YAMLError): + if tmp_file is not None and os.path.exists(tmp_file.name): + os.unlink(tmp_file.name) + LOGGER.error("Failed to save kubeconfig") + raise diff --git a/tests/test_resource.py b/tests/test_resource.py index d5db38afcd..33a3c857ff 100644 --- a/tests/test_resource.py +++ b/tests/test_resource.py @@ -1,13 +1,16 @@ import os from unittest.mock import patch +import kubernetes import pytest +import yaml from ocp_resources.exceptions import ResourceTeardownError from ocp_resources.namespace import Namespace from ocp_resources.pod import Pod -from ocp_resources.resource import NamespacedResourceList, Resource, ResourceList +from ocp_resources.resource import NamespacedResourceList, Resource, ResourceList, _resolve_bearer_token from ocp_resources.secret import Secret +from ocp_resources.utils.kubeconfig import save_kubeconfig BASE_NAMESPACE_NAME: str = "test-namespace" BASE_POD_NAME: str = "test-pod" @@ -200,3 +203,116 @@ def test_proxy_precedence(self, fake_client): # Verify HTTPS_PROXY takes precedence over HTTP_PROXY assert fake_client.configuration.proxy == https_proxy + + +class TestSaveKubeconfig: + def test_save_kubeconfig_with_host_and_token(self): + host = "https://api.test-cluster.example.com:6443" + token = "sha256~test-token-value" # noqa: S105 + + kubeconfig_path = save_kubeconfig(host=host, token=token, verify_ssl=False) + try: + assert os.path.exists(kubeconfig_path) + assert os.stat(kubeconfig_path).st_mode & 0o777 == 0o600 + + with open(kubeconfig_path) as f: + config = yaml.safe_load(f) + + assert config["clusters"][0]["cluster"]["server"] == host + assert config["clusters"][0]["cluster"]["insecure-skip-tls-verify"] is True + assert config["users"][0]["user"]["token"] == token + assert config["current-context"] == "context" + finally: + os.unlink(kubeconfig_path) + + def test_save_kubeconfig_with_config_dict(self): + config_dict = { + "apiVersion": "v1", + "kind": "Config", + "clusters": [{"name": "my-cluster", "cluster": {"server": "https://my-server:6443"}}], + "users": [{"name": "my-user", "user": {"token": "my-token"}}], + "contexts": [{"name": "my-context", "context": {"cluster": "my-cluster", "user": "my-user"}}], + "current-context": "my-context", + } + + kubeconfig_path = save_kubeconfig(config_dict=config_dict) + try: + with open(kubeconfig_path) as f: + saved_config = yaml.safe_load(f) + + assert saved_config == config_dict + finally: + os.unlink(kubeconfig_path) + + def test_save_kubeconfig_insufficient_data(self): + with pytest.raises(ValueError, match="Not enough data to build kubeconfig"): + save_kubeconfig() + + def test_save_kubeconfig_file_permissions(self): + _test_token = "test-token" # noqa: S105 + + kubeconfig_path = save_kubeconfig(host="https://api.example.com:6443", token=_test_token) + try: + assert os.stat(kubeconfig_path).st_mode & 0o777 == 0o600 + finally: + os.unlink(kubeconfig_path) + + def test_save_kubeconfig_verify_ssl_not_false(self): + _test_token = "test-token" # noqa: S105 + + kubeconfig_path_true = save_kubeconfig(host="https://api.example.com:6443", token=_test_token, verify_ssl=True) + try: + with open(kubeconfig_path_true) as f: + config_true = yaml.safe_load(f) + + assert "insecure-skip-tls-verify" not in config_true["clusters"][0]["cluster"] + finally: + os.unlink(kubeconfig_path_true) + + kubeconfig_path_none = save_kubeconfig(host="https://api.example.com:6443", token=_test_token, verify_ssl=None) + try: + with open(kubeconfig_path_none) as f: + config_none = yaml.safe_load(f) + finally: + os.unlink(kubeconfig_path_none) + + assert "insecure-skip-tls-verify" not in config_none["clusters"][0]["cluster"] + + def test_resolve_bearer_token_from_api_key(self): + """Test that _resolve_bearer_token extracts token from Bearer api_key.""" + cfg = kubernetes.client.Configuration() + cfg.api_key = {"authorization": "Bearer sha256~oauth-resolved-token"} # noqa: S105 + result = _resolve_bearer_token(token=None, client_configuration=cfg) + assert result == "sha256~oauth-resolved-token" + + def test_resolve_bearer_token_explicit_takes_precedence(self): + """Test that an explicit token takes precedence over Bearer in api_key.""" + cfg = kubernetes.client.Configuration() + cfg.api_key = {"authorization": "Bearer sha256~oauth-token"} # noqa: S105 + explicit_token = "explicit-token" # noqa: S105 + result = _resolve_bearer_token(token=explicit_token, client_configuration=cfg) + assert result == "explicit-token" + + def test_resolve_bearer_token_no_bearer_prefix(self): + """Test that api_key without Bearer prefix does not resolve a token.""" + cfg = kubernetes.client.Configuration() + cfg.api_key = {"authorization": "Basic some-basic-auth"} + result = _resolve_bearer_token(token=None, client_configuration=cfg) + assert result is None + + def test_resolve_bearer_token_empty_api_key(self): + """Test that empty api_key does not resolve a token.""" + cfg = kubernetes.client.Configuration() + cfg.api_key = {} + result = _resolve_bearer_token(token=None, client_configuration=cfg) + assert result is None + + def test_save_kubeconfig_write_failure(self): + _test_token = "test-token" # noqa: S105 + + with pytest.raises(OSError, match="Permission denied"): + with patch( + "ocp_resources.utils.kubeconfig.tempfile.NamedTemporaryFile", + side_effect=OSError("Permission denied"), + ): + save_kubeconfig(host="https://api.example.com:6443", token=_test_token)