Skip to content
Open
43 changes: 38 additions & 5 deletions ocp_resources/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
TIMEOUT_10SEC,
TIMEOUT_30SEC,
)
from ocp_resources.utils.kubeconfig import save_kubeconfig
from ocp_resources.utils.resource_constants import ResourceConstants
from ocp_resources.utils.schema_validator import SchemaValidator
from ocp_resources.utils.utils import skip_existing_resource_creation_teardown
Expand Down Expand Up @@ -192,6 +193,22 @@ def _exchange_code_for_token(
)


def _resolve_bearer_token(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If none of them passed the function do nothing, please raise error

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_resolve_bearer_token is a local helper to avoid code duplication — it extracts the Bearer token extraction logic that would otherwise be duplicated between the host + token and host + username + password paths.

It intentionally returns None rather than raising because:

  • It's called for all kubeconfig_output_path flows, including config_dict and config_file where no token exists in client_configuration.api_key — and that's perfectly valid
  • A kubeconfig without a token is valid (e.g., cert-based auth)
  • The actual validation ("do we have enough data?") is handled by save_kubeconfig, which now raises ValueError when it has nothing to work with

So the "raise on insufficient data" feedback is addressed in save_kubeconfig where it belongs, not in this helper.

token: str | None,
client_configuration: "kubernetes.client.Configuration",
) -> str | None:
"""Extract bearer token from client configuration if not explicitly provided."""
if token:
return token

if client_configuration.api_key:
_bearer = client_configuration.api_key.get("authorization", "")
if _bearer.startswith("Bearer "):
return _bearer.removeprefix("Bearer ")

return None


def get_client(
config_file: str | None = None,
config_dict: dict[str, Any] | None = None,
Expand All @@ -206,7 +223,8 @@ def get_client(
verify_ssl: bool | None = None,
token: str | None = None,
fake: bool = False,
) -> DynamicClient | FakeDynamicClient:
generate_kubeconfig: bool = False,
) -> DynamicClient | FakeDynamicClient | tuple[DynamicClient, str]:
"""
Get a kubernetes client.

Expand All @@ -230,9 +248,11 @@ def get_client(
host (str): host for the cluster
verify_ssl (bool): whether to verify ssl
token (str): Use token to login
generate_kubeconfig (bool): if True, save the kubeconfig to a temporary file and log the path.

Returns:
DynamicClient: a kubernetes client.
If generate_kubeconfig is True, returns a tuple of (DynamicClient, str) where str is the path to the temporary kubeconfig file.
"""
if fake:
return FakeDynamicClient()
Expand Down Expand Up @@ -286,16 +306,29 @@ def get_client(
kubernetes.client.Configuration.set_default(default=client_configuration)

try:
return kubernetes.dynamic.DynamicClient(client=_client)
_dynamic_client = kubernetes.dynamic.DynamicClient(client=_client)
except MaxRetryError:
# Ref: https://github.com/kubernetes-client/python/blob/v26.1.0/kubernetes/base/config/incluster_config.py
LOGGER.info("Trying to get client via incluster_config")
return kubernetes.dynamic.DynamicClient(
_dynamic_client = kubernetes.dynamic.DynamicClient(
client=kubernetes.config.incluster_config.load_incluster_config(
client_configuration=client_configuration, try_refresh_token=try_refresh_token
),
)

if generate_kubeconfig:
_resolved_token = _resolve_bearer_token(token=token, client_configuration=client_configuration)
kubeconfig_path = save_kubeconfig(
host=host or client_configuration.host,
token=_resolved_token,
config_dict=config_dict,
verify_ssl=verify_ssl,
)

return _dynamic_client, kubeconfig_path

return _dynamic_client


def sub_resource_level(current_class: Any, owner_class: Any, parent_class: Any) -> str | None:
# return the name of the last class in MRO list that is not one of base
Expand Down Expand Up @@ -1452,9 +1485,9 @@ def get_all_cluster_resources(
)
client = get_client(config_file=config_file, config_dict=config_dict, context=context)

for _resource in client.resources.search():
for _resource in client.resources.search(): # type: ignore[union-attr]
try:
_resources = client.get(_resource, *args, **kwargs)
_resources = client.get(_resource, *args, **kwargs) # type: ignore[union-attr]
yield from _resources.items

except (NotFoundError, TypeError, MethodNotAllowedError):
Expand Down
70 changes: 70 additions & 0 deletions ocp_resources/utils/kubeconfig.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import atexit
import os
import tempfile
from typing import Any

import yaml
from simple_logger.logger import get_logger

LOGGER = get_logger(name=__name__)


def save_kubeconfig(
host: str | None = None,
token: str | None = None,
config_dict: dict[str, Any] | None = None,
verify_ssl: bool | None = None,
) -> str:
"""
Save kubeconfig to a temporary file.

Builds a kubeconfig from the provided parameters and writes it to a
temporary file with 0o600 permissions.

Args:
host (str): cluster API server URL.
token (str): bearer token for authentication.
config_dict (dict): existing kubeconfig dict to save as-is.
verify_ssl (bool): if False, sets insecure-skip-tls-verify in the saved config.

Returns:
str: path to the temporary kubeconfig file.
"""
if config_dict is not None:
_config = config_dict
elif host:
cluster_config: dict[str, Any] = {"server": host}
if verify_ssl is False:
cluster_config["insecure-skip-tls-verify"] = True

user_config: dict[str, str] = {}
if token:
user_config["token"] = token

_config = {
"apiVersion": "v1",
"kind": "Config",
"clusters": [{"name": "cluster", "cluster": cluster_config}],
"users": [{"name": "user", "user": user_config}],
"contexts": [{"name": "context", "context": {"cluster": "cluster", "user": "user"}}],
"current-context": "context",
}
else:
raise ValueError("Not enough data to build kubeconfig: provide config_dict or host")

tmp_file = None
try:
tmp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".kubeconfig", mode="w")
os.chmod(tmp_file.name, 0o600)
yaml.safe_dump(_config, tmp_file)
tmp_file.close()
# Ensures the file is cleaned up when the process exits.
atexit.register(lambda p: os.unlink(p) if os.path.exists(p) else None, tmp_file.name)
LOGGER.info(f"kubeconfig saved to {tmp_file.name}")
return tmp_file.name

except (OSError, yaml.YAMLError):
if tmp_file is not None and os.path.exists(tmp_file.name):
os.unlink(tmp_file.name)
LOGGER.error("Failed to save kubeconfig")
raise
118 changes: 117 additions & 1 deletion tests/test_resource.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import os
from unittest.mock import patch

import kubernetes
import pytest
import yaml

from ocp_resources.exceptions import ResourceTeardownError
from ocp_resources.namespace import Namespace
from ocp_resources.pod import Pod
from ocp_resources.resource import NamespacedResourceList, Resource, ResourceList
from ocp_resources.resource import NamespacedResourceList, Resource, ResourceList, _resolve_bearer_token
from ocp_resources.secret import Secret
from ocp_resources.utils.kubeconfig import save_kubeconfig

BASE_NAMESPACE_NAME: str = "test-namespace"
BASE_POD_NAME: str = "test-pod"
Expand Down Expand Up @@ -200,3 +203,116 @@ def test_proxy_precedence(self, fake_client):

# Verify HTTPS_PROXY takes precedence over HTTP_PROXY
assert fake_client.configuration.proxy == https_proxy


class TestSaveKubeconfig:
def test_save_kubeconfig_with_host_and_token(self):
host = "https://api.test-cluster.example.com:6443"
token = "sha256~test-token-value" # noqa: S105

kubeconfig_path = save_kubeconfig(host=host, token=token, verify_ssl=False)
try:
assert os.path.exists(kubeconfig_path)
assert os.stat(kubeconfig_path).st_mode & 0o777 == 0o600

with open(kubeconfig_path) as f:
config = yaml.safe_load(f)

assert config["clusters"][0]["cluster"]["server"] == host
assert config["clusters"][0]["cluster"]["insecure-skip-tls-verify"] is True
assert config["users"][0]["user"]["token"] == token
assert config["current-context"] == "context"
finally:
os.unlink(kubeconfig_path)

def test_save_kubeconfig_with_config_dict(self):
config_dict = {
"apiVersion": "v1",
"kind": "Config",
"clusters": [{"name": "my-cluster", "cluster": {"server": "https://my-server:6443"}}],
"users": [{"name": "my-user", "user": {"token": "my-token"}}],
"contexts": [{"name": "my-context", "context": {"cluster": "my-cluster", "user": "my-user"}}],
"current-context": "my-context",
}

kubeconfig_path = save_kubeconfig(config_dict=config_dict)
try:
with open(kubeconfig_path) as f:
saved_config = yaml.safe_load(f)

assert saved_config == config_dict
finally:
os.unlink(kubeconfig_path)

def test_save_kubeconfig_insufficient_data(self):
with pytest.raises(ValueError, match="Not enough data to build kubeconfig"):
save_kubeconfig()

def test_save_kubeconfig_file_permissions(self):
_test_token = "test-token" # noqa: S105

kubeconfig_path = save_kubeconfig(host="https://api.example.com:6443", token=_test_token)
try:
assert os.stat(kubeconfig_path).st_mode & 0o777 == 0o600
finally:
os.unlink(kubeconfig_path)

def test_save_kubeconfig_verify_ssl_not_false(self):
_test_token = "test-token" # noqa: S105

kubeconfig_path_true = save_kubeconfig(host="https://api.example.com:6443", token=_test_token, verify_ssl=True)
try:
with open(kubeconfig_path_true) as f:
config_true = yaml.safe_load(f)

assert "insecure-skip-tls-verify" not in config_true["clusters"][0]["cluster"]
finally:
os.unlink(kubeconfig_path_true)

kubeconfig_path_none = save_kubeconfig(host="https://api.example.com:6443", token=_test_token, verify_ssl=None)
try:
with open(kubeconfig_path_none) as f:
config_none = yaml.safe_load(f)
finally:
os.unlink(kubeconfig_path_none)

assert "insecure-skip-tls-verify" not in config_none["clusters"][0]["cluster"]

def test_resolve_bearer_token_from_api_key(self):
"""Test that _resolve_bearer_token extracts token from Bearer api_key."""
cfg = kubernetes.client.Configuration()
cfg.api_key = {"authorization": "Bearer sha256~oauth-resolved-token"} # noqa: S105
result = _resolve_bearer_token(token=None, client_configuration=cfg)
assert result == "sha256~oauth-resolved-token"

def test_resolve_bearer_token_explicit_takes_precedence(self):
"""Test that an explicit token takes precedence over Bearer in api_key."""
cfg = kubernetes.client.Configuration()
cfg.api_key = {"authorization": "Bearer sha256~oauth-token"} # noqa: S105
explicit_token = "explicit-token" # noqa: S105
result = _resolve_bearer_token(token=explicit_token, client_configuration=cfg)
assert result == "explicit-token"

def test_resolve_bearer_token_no_bearer_prefix(self):
"""Test that api_key without Bearer prefix does not resolve a token."""
cfg = kubernetes.client.Configuration()
cfg.api_key = {"authorization": "Basic some-basic-auth"}
result = _resolve_bearer_token(token=None, client_configuration=cfg)
assert result is None

def test_resolve_bearer_token_empty_api_key(self):
"""Test that empty api_key does not resolve a token."""
cfg = kubernetes.client.Configuration()
cfg.api_key = {}
result = _resolve_bearer_token(token=None, client_configuration=cfg)
assert result is None

def test_save_kubeconfig_write_failure(self):
_test_token = "test-token" # noqa: S105

with pytest.raises(OSError, match="Permission denied"):
with patch(
"ocp_resources.utils.kubeconfig.tempfile.NamedTemporaryFile",
side_effect=OSError("Permission denied"),
):
save_kubeconfig(host="https://api.example.com:6443", token=_test_token)