Source code for kubetest.manager

"""Kubetest manager for test client instances and namespace management."""

import logging
from typing import Generator, List, Union

import kubernetes

from kubetest import client, objects, utils

# Module-level logger; every message logged from this module is emitted under
# the "kubetest" logger name.
log = logging.getLogger("kubetest")


class ObjectManager:
    """Manage the Kubernetes API objects that are registered with a test case.

    The core usage of the ObjectManager is to sort each of the registered
    objects into different buckets by type. An "apply order" is also defined
    here so we can get the bucketed objects in the order that they should be
    applied onto the cluster.

    This manager will only be used for API objects loaded from manifests that
    are specified by the `pytest.mark.applymanifests` marker on a test case
    by default.
    """

    # The ApiObject buckets in the order that they should be
    # applied when creating them on the cluster.
    ordered_buckets = [
        "namespace",
        "rolebinding",
        "clusterrolebinding",
        "secret",
        "networkpolicy",
        "service",
        "configmap",
        "persistentvolumeclaim",
        "ingress",
        "daemonset",
        "statefulset",
        "replicaset",
        "deployment",
        "pod",
    ]

    def __init__(self) -> None:
        """Create one empty list attribute per entry in `ordered_buckets`."""
        # The bucket attributes are created from the ordered_buckets list
        # rather than being written out explicitly (e.g. self.pod = []).
        # Since the other instance methods also use ordered_buckets, adding
        # or removing a bucket only means editing that one list instead of
        # updating numerous locations.
        for bucket in self.ordered_buckets:
            setattr(self, bucket, [])

    def add(self, *args: objects.ApiObject) -> None:
        """Add API objects to the object manager.

        This method will take in any number of ApiObjects and sort them into
        the correct buckets. It will not check for duplicates. Arguments are
        processed in order, so objects preceding an invalid argument have
        already been added by the time the error is raised.

        Args:
            args: Any subclass of the kubetest ApiObject wrapping a
                Kubernetes API object.

        Raises:
            ValueError: One or more arguments passed to the function are
                not ApiObject subclasses, or no bucket exists for the type
                of one of the arguments.
        """
        for arg in args:
            if not isinstance(arg, objects.ApiObject):
                raise ValueError(
                    "Only ApiObject instances can be added to the ObjectManager, "
                    f"but was given: {arg}"
                )

            # The bucket name is the lower-cased type name of the ApiObject
            # wrapper, e.g. ClusterRoleBinding -> clusterrolebinding.
            name = type(arg).__name__.lower()
            if name not in self.ordered_buckets:
                raise ValueError(f"Unable to determine bucket for ApiObject: {arg}")

            getattr(self, name).append(arg)

    def get_objects_in_apply_order(self) -> Generator[objects.ApiObject, None, None]:
        """Get all of the managed objects in the order that they should be
        applied onto the cluster.

        Within the buckets themselves, API objects are not sorted; they are
        yielded in the order they were added. This function only yields the
        buckets in the correct order.

        Each of the buckets corresponds to an ApiObject wrapper that is
        supported by kubetest. As more ApiObject wrappers are added, the
        buckets here should be updated to reflect that.

        The bucket order in which objects are yielded is:

        - Namespace
        - RoleBinding
        - ClusterRoleBinding
        - Secret
        - NetworkPolicy
        - Service
        - ConfigMap
        - PersistentVolumeClaim
        - Ingress
        - DaemonSet
        - StatefulSet
        - ReplicaSet
        - Deployment
        - Pod

        Yields:
            The kubetest ApiObject wrapper to be created on the cluster.
        """
        for bucket in self.ordered_buckets:
            yield from getattr(self, bucket)
class TestMeta:
    """TestMeta holds information associated with a single test node.

    Args:
        name: The name of the test.
        node_id: The id of the test node.
        namespace_create: Option to create a new namespace. This can be
            toggled off in the event that a user is using an existing
            namespace on their test cluster.
        namespace_name: The name of the namespace to use. If the namespace
            is to be autogenerated, this can be left as None.
    """

    def __init__(
        self,
        name: str,
        node_id: str,
        namespace_create: bool = True,
        namespace_name: Union[str, None] = None,
    ) -> None:
        self.name = name
        self.node_id = node_id

        # Autogenerate a namespace name from the test name unless the caller
        # supplied one explicitly.
        if namespace_name is None:
            self.ns = utils.new_namespace(name)
        else:
            self.ns = namespace_name

        # Flag to designate whether pytest setup (not TestMeta.setup) failed.
        # If pytest setup failed for the case, we can skip teardown, as there
        # will be nothing on the cluster itself.
        self._pt_setup_failed = False

        # Lazily loaded; see the `client` and `namespace` properties.
        self._client = None
        self._namespace = None

        self.namespace_create = namespace_create
        self.rolebindings = []
        self.clusterrolebindings = []
        self.test_objects = ObjectManager()

    @property
    def client(self) -> client.TestClient:
        """Get the TestClient for the test case.

        The client is created for the test namespace on first access and
        the same instance is returned afterwards.
        """
        if self._client is None:
            self._client = client.TestClient(self.ns)
        return self._client

    @property
    def namespace(self) -> objects.Namespace:
        """Get the Namespace API Object associated with the test case.

        The object is built from the test namespace name on first access
        and the same instance is returned afterwards.
        """
        if self._namespace is None:
            self._namespace = objects.Namespace.new(self.ns)
        return self._namespace

    def setup(self) -> None:
        """Setup the cluster state for the test case.

        This performs all actions needed in order for the test client to
        be ready to use by a test case.
        """
        # Create the test case namespace.
        if self.namespace_create:
            self.namespace.create()

        # If there are any role bindings, create them.
        for rb in self.rolebindings:
            self.client.create(rb)

        # If there are any cluster role bindings, create them.
        for crb in self.clusterrolebindings:
            self.client.create(crb)

        # If any objects were registered with the test case via the
        # `applymanifests` marker, register them to the test client
        # and add them to the cluster now.
        for obj in self.test_objects.get_objects_in_apply_order():
            self.client.create(obj)
            self.client.wait_until_created(obj, timeout=10, interval=0.5)
            self.client.pre_registered.append(obj)

    def teardown(self) -> None:
        """Clean up the cluster state for the given test case.

        This performs all actions needed in order to clean up the state that
        was previously set up for the test client in `setup`.
        """
        # Only perform teardown if pytest setup was successful and there is a
        # possibility of things existing on the cluster.
        if self._pt_setup_failed:
            log.info(
                f"pytest setup failed for {self.name}: not running test case teardown"
            )
            return

        # Delete the test case namespace if we've created it. This will also
        # delete anything in the namespace, which includes RoleBindings.
        # `_namespace` is checked against None explicitly so the decision does
        # not depend on the truthiness of the Namespace wrapper.
        if self._namespace is not None and self.namespace_create:
            self.namespace.delete()

        # ClusterRoleBindings are not bound to a namespace, so we will need
        # to delete them ourselves.
        for crb in self.clusterrolebindings:
            self.client.delete(crb)

    def yield_container_logs(
        self, tail_lines: Union[int, None] = None
    ) -> Generator[str, None, None]:
        """Yield the container logs for the test case.

        These logs will be printed out if the test was in error to provide
        more context and make it easier to debug the issue. Failures to list
        pods or to read a container's logs are logged as warnings and are
        not raised.

        Args:
            tail_lines: The number of container log lines to print. If None
                or not positive, no line limit is passed to the API.

        Yields:
            str: Logs for the running containers on the cluster, one entry
            per container with non-empty logs, prefixed by a banner naming
            the test node, pod and container.
        """
        try:
            # A single API client is built here and reused for every request
            # made by this method.
            core_api = kubernetes.client.CoreV1Api()

            # Prior to tearing down the namespace and cleaning up all of the
            # objects in the namespace, get the logs for the containers in
            # the namespace.
            pods_list = core_api.list_namespaced_pod(namespace=self.ns)
        except Exception as e:
            log.warning(
                f'Unable to get pods for namespace "{self.ns}" to cache logs ({e})',
            )
            return

        log_kwargs = {}
        if tail_lines is not None and tail_lines > 0:
            log_kwargs["tail_lines"] = tail_lines

        for pod in pods_list.items:
            pod_name = pod.metadata.name
            pod_ns = pod.metadata.namespace

            for container in pod.spec.containers:
                container_name = container.name

                try:
                    logs = core_api.read_namespaced_pod_log(
                        name=pod_name,
                        namespace=pod_ns,
                        container=container_name,
                        **log_kwargs,
                    )
                except Exception as e:
                    # Best effort: skip this container and keep going.
                    log.warning(
                        f"Unable to cache logs for {pod_name}::{container_name} ({e})",
                    )
                    continue

                if logs != "":
                    _id = f"=== {self.node_id} -> {pod_name}::{container_name} ==="
                    border = "=" * len(_id)
                    yield "\n".join([border, _id, border, logs, "\n"])

    def register_rolebindings(self, *rolebindings: objects.RoleBinding) -> None:
        """Register a RoleBinding requirement with the test case.

        Args:
            rolebindings: The RoleBindings that are needed for the test case.
        """
        self.rolebindings.extend(rolebindings)

    def register_clusterrolebindings(
        self,
        *clusterrolebindings: objects.ClusterRoleBinding,
    ) -> None:
        """Register a ClusterRoleBinding requirement with the test case.

        Args:
            clusterrolebindings: The ClusterRoleBindings that are needed for
                the test case.
        """
        self.clusterrolebindings.extend(clusterrolebindings)

    def register_objects(self, api_objects: List[objects.ApiObject]) -> None:
        """Register the provided objects with the test case.

        These objects will be registered to the test client and applied to
        the namespace on test setup.

        Args:
            api_objects: The wrapped Kubernetes API objects to create on
                the cluster.
        """
        self.test_objects.add(*api_objects)
class KubetestManager:
    """The manager for kubetest state.

    The KubetestManager is in charge of providing test clients for the tests
    that request them and mediating the namespace management corresponding
    to those test clients.
    """

    def __init__(self) -> None:
        # A dictionary mapping test node IDs to their corresponding TestMeta
        # object. Each test node will have a TestMeta created when the client
        # is created for the test node.
        self.nodes = {}

    def new_test(
        self,
        node_id: str,
        test_name: str,
        namespace_create: bool = True,
        namespace_name: Union[str, None] = None,
    ) -> TestMeta:
        """Create a new TestMeta for a test case.

        This will be called by the test setup hook in order to create a new
        TestMeta for the manager to keep track of. Any TestMeta already
        stored under the same node id is replaced.

        Args:
            node_id: The id of the test node.
            test_name: The name of the test.
            namespace_create: Option to create a new namespace. This can be
                toggled off in the event that a user is using an existing
                namespace on their test cluster.
            namespace_name: The name of the namespace to use. If the
                namespace is to be autogenerated, this can be left as None.

        Returns:
            The newly created TestMeta for the test case.
        """
        log.info(f"creating test meta for {node_id}")
        meta = TestMeta(
            node_id=node_id,
            name=test_name,
            namespace_create=namespace_create,
            namespace_name=namespace_name,
        )
        self.nodes[node_id] = meta
        return meta

    def get_test(self, node_id: str) -> Union[TestMeta, None]:
        """Get the test metadata for the specified test node.

        Args:
            node_id: The id of the test node.

        Returns:
            The test metadata for the given node. If no test metadata is
            found for the given node, None is returned.
        """
        return self.nodes.get(node_id)

    def teardown(self, node_id: str) -> None:
        """Tear down the test case.

        This is effectively a wrapper around the `teardown` method of the
        test client. It will also remove the test client from the manager,
        even when the test client's teardown raises; in that case the
        exception is still propagated to the caller. Nothing happens if the
        node id is not known to the manager.

        Test client teardown will delete the test client's namespace from
        the cluster. Deleting a namespace will delete all the things in the
        namespace (e.g. API objects bound to the namespace).

        Args:
            node_id: The id of the test node.
        """
        test_case = self.nodes.get(node_id)
        if test_case is None:
            return

        try:
            test_case.teardown()
        finally:
            # Always forget the node so a failed teardown does not leave a
            # stale TestMeta behind in the manager.
            self.nodes.pop(node_id, None)