Source code for kubetest.utils

"""Utility functions for kubetest."""

import logging
import time
from typing import Dict, Mapping, Union

from kubernetes.client.rest import ApiException

from kubetest.condition import Condition

log = logging.getLogger("kubetest")


[docs]def new_namespace(test_name: str) -> str: """Create a new namespace for the given test name. Kubernetes namespace names follow a DNS-1123 label that consists of lower case alphanumeric characters or '-' and must start with an alphanumeric. The test name and current timestamp are formatted to comply to this spec and appended to the 'kubetest' prefix. Args: test_name: The name of the test case for the namespace. Returns: The namespace name. """ prefix = "kubetest" timestamp = str(int(time.time())) test_name = test_name.replace("_", "-").lower() test_name = test_name.replace("[", "-") test_name = test_name.replace("]", "-") # The length of a resource name in Kubernetes may not exceed 63 # characters. Check the length of all components (+2 for the dashes # joining the components). If the total length exceeds 63, truncate # the test name. name_len = len(prefix) + len(timestamp) + len(test_name) + 2 if name_len > 63: test_name = test_name[: -(name_len - 63)] return "-".join((prefix, test_name, timestamp))
[docs]def selector_string(selectors: Mapping[str, str]) -> str: """Create a selector string from the given dictionary of selectors. Args: selectors: The selectors to stringify. Returns: The selector string for the given dictionary. """ return ",".join([f"{k}={v}" for k, v in selectors.items()])
[docs]def selector_kwargs( fields: Mapping[str, str] = None, labels: Mapping[str, str] = None, ) -> Dict[str, str]: """Create a dictionary of kwargs for Kubernetes object selectors. Args: fields: A mapping of fields used to restrict the returned collection of Objects to only those which match these field selectors. By default, no restricting is done. labels: A mapping of labels used to restrict the returned collection of Objects to only those which match these label selectors. By default, no restricting is done. Returns: A dictionary that can be used as kwargs for many Kubernetes API calls for label and field selectors. """ kwargs = {} if fields is not None: kwargs["field_selector"] = selector_string(fields) if labels is not None: kwargs["label_selector"] = selector_string(labels) return kwargs
[docs]def wait_for_condition( condition: Condition, timeout: int = None, interval: Union[int, float] = 1, fail_on_api_error: bool = True, ) -> None: """Wait for a condition to be met. Args: condition: The Condition to wait for. timeout: The maximum time to wait, in seconds, for the condition to be met. If unspecified, this function will wait indefinitely. If specified and the timeout is met or exceeded, a TimeoutError will be raised. interval: The time, in seconds, to wait before re-checking the condition. fail_on_api_error: Fail the condition checks if a Kubernetes API error is incurred. An API error can be raised for a number of reasons, including a Pod being restarted and temporarily unavailable. Disabling this will cause those errors to be ignored, allowing the check to continue until timeout or resolution. (default: True). Raises: TimeoutError: The specified timeout was exceeded. """ log.info(f"waiting for condition: {condition}") # define the maximum time to wait. once this is met, we should # stop waiting. max_time = None if timeout is not None: max_time = time.time() + timeout # start the wait block start = time.time() while True: if max_time and time.time() >= max_time: raise TimeoutError( f"timed out ({timeout}s) while waiting for condition {condition}" ) # check if the condition is met and break out if it is try: if condition.check(): break except ApiException as e: log.warning(f"got api exception while waiting: {e}") if fail_on_api_error: raise # if the condition is not met, sleep for the interval # to re-check later time.sleep(interval) end = time.time() log.info(f"wait completed (total={end-start}s) {condition}")