"""The test client for managing Kubernetes resources within test cases.
An instance of the ``TestClient`` defined in this module is automatically
created for each test case that uses the ``kube`` fixture. The ``kube``
fixture provides the ``TestClient`` instance to the test case.
"""
import logging
from typing import Dict, Optional, Union
from kubernetes import client
from kubetest import objects, utils
from kubetest.condition import Condition, Policy, check_and_sort
log = logging.getLogger("kubetest")
[docs]class TestClient:
"""Test client for managing Kubernetes resources for a test case.
The ``namespace`` for the TestClient will be automatically generated and
provided to the TestClient during the test setup process.
Args:
namespace: The namespace associated with the test client. Each test
case will have its own namespace assigned.
"""
def __init__(self, namespace: str) -> None:
self.namespace = namespace
self.pre_registered = []
# ****** Generic Helpers on ApiObjects ******
[docs] def create(self, obj: objects.ApiObject) -> None:
"""Create the provided ApiObject on the Kubernetes cluster.
If the object does not already have a namespace assigned to it, the client's
generated test case namespace will be used.
Args:
obj: A kubetest API Object wrapper.
"""
if obj.namespace is None:
obj.namespace = self.namespace
obj.create()
[docs] def delete(
self, obj: objects.ApiObject, options: client.V1DeleteOptions = None
) -> None:
"""Delete the provided ApiObject from the Kubernetes cluster.
If the object does not already have a namespace assigned to it, the client's
generated test case namespace will be used.
Args:
obj: A kubetest API Object wrapper.
options: Additional options for deleting the resource from the cluster.
"""
if obj.namespace is None:
obj.namespace = self.namespace
obj.delete(options=options)
[docs] @staticmethod
def refresh(obj: objects.ApiObject) -> None:
"""Refresh the underlying Kubernetes resource status and state.
Args:
obj: A kubetest API Object wrapper.
"""
obj.refresh()
# ****** Manifest Loaders ******
[docs] @staticmethod
def load_clusterrolebinding(
path: str,
name: Optional[str] = None,
) -> objects.ClusterRoleBinding:
"""Load a manifest YAML into a ClusterRoleBinding object.
Args:
path: The path to the ClusterRoleBinding manifest.
name: The name of the resource to load. If the manifest file
contains a single object definition for the type being
loaded, it is not necessary to specify the name. If the
manifest has multiple definitions containing the same
type, a name is required to differentiate between them.
If no name is specified in such case, an error is raised.
Returns:
The ClusterRoleBinding for the specified manifest.
"""
log.info(f"loading clusterrolebinding from path: {path}")
return objects.ClusterRoleBinding.load(path, name=name)
[docs] def load_configmap(
self,
path: str,
set_namespace: bool = True,
name: Optional[str] = None,
) -> objects.ConfigMap:
"""Load a manifest YAML into a ConfigMap object.
By default, this will augment the ConfigMap object with the generated
test case namespace. This behavior can be disabled with the
``set_namespace`` flag.
Args:
path: The path to the ConfigMap manifest.
set_namespace: Enable/disable the automatic augmentation of the
ConfigMap namespace.
name: The name of the resource to load. If the manifest file
contains a single object definition for the type being
loaded, it is not necessary to specify the name. If the
manifest has multiple definitions containing the same
type, a name is required to differentiate between them.
If no name is specified in such case, an error is raised.
Returns:
The ConfigMap for the specified manifest.
"""
log.info(f"loading configmap from path: {path}")
configmap = objects.ConfigMap.load(path, name=name)
if set_namespace:
configmap.namespace = self.namespace
return configmap
[docs] def load_daemonset(
self,
path: str,
set_namespace: bool = True,
name: Optional[str] = None,
) -> objects.DaemonSet:
"""Load a manifest YAML into a DaemonSet object.
By default, this will augment the DaemonSet object with the generated test
case namespace. This behavior can be disabled with the ``set_namespace`` flag.
Args:
path: The path to the DaemonSet manifest.
set_namespace: Enable/disable the automatic augmentation of the
DaemonSet namespace.
name: The name of the resource to load. If the manifest file
contains a single object definition for the type being
loaded, it is not necessary to specify the name. If the
manifest has multiple definitions containing the same
type, a name is required to differentiate between them.
If no name is specified in such case, an error is raised.
Returns:
The DaemonSet for the specified manifest.
"""
log.info(f"loading daemonset from path: {path}")
daemonset = objects.DaemonSet.load(path, name=name)
if set_namespace:
daemonset.namespace = self.namespace
return daemonset
[docs] def load_deployment(
self,
path: str,
set_namespace: bool = True,
name: Optional[str] = None,
) -> objects.Deployment:
"""Load a manifest YAML into a Deployment object.
By default, this will augment the Deployment object with the generated
test case namespace. This behavior can be disabled with the
``set_namespace`` flag.
Args:
path: The path to the Deployment manifest.
set_namespace: Enable/disable the automatic augmentation of the
Deployment namespace.
name: The name of the resource to load. If the manifest file
contains a single object definition for the type being
loaded, it is not necessary to specify the name. If the
manifest has multiple definitions containing the same
type, a name is required to differentiate between them.
If no name is specified in such case, an error is raised.
Returns:
The Deployment for the specified manifest.
"""
log.info(f"loading deployment from path: {path}")
deployment = objects.Deployment.load(path, name=name)
if set_namespace:
deployment.namespace = self.namespace
return deployment
[docs] def load_pod(
self,
path: str,
set_namespace: bool = True,
name: Optional[str] = None,
) -> objects.Pod:
"""Load a manifest YAML into a Pod object.
By default, this will augment the Pod object with the generated test case
namespace. This behavior can be disabled with the ``set_namespace`` flag.
Args:
path: The path to the Pod manifest.
set_namespace: Enable/disable the automatic augmentation of the
Pod namespace.
name: The name of the resource to load. If the manifest file
contains a single object definition for the type being
loaded, it is not necessary to specify the name. If the
manifest has multiple definitions containing the same
type, a name is required to differentiate between them.
If no name is specified in such case, an error is raised.
Returns:
The Pod for the specified manifest.
"""
log.info(f"loading pod from path: {path}")
pod = objects.Pod.load(path, name=name)
if set_namespace:
pod.namespace = self.namespace
return pod
[docs] def load_rolebinding(
self,
path: str,
set_namespace: bool = True,
name: Optional[str] = None,
) -> objects.RoleBinding:
"""Load a manifest YAML into a RoleBinding object.
By default, this will augment the RoleBinding object with the generated
test case namespace. This behavior can be disabled with the
``set_namespace`` flag.
Args:
path: The path to the RoleBinding manifest.
set_namespace: Enable/disable the automatic augmentation of the
RoleBinding namespace.
name: The name of the resource to load. If the manifest file
contains a single object definition for the type being
loaded, it is not necessary to specify the name. If the
manifest has multiple definitions containing the same
type, a name is required to differentiate between them.
If no name is specified in such case, an error is raised.
Returns:
The RoleBinding for the specified manifest.
"""
log.info(f"loading rolebinding from path: {path}")
rolebinding = objects.RoleBinding.load(path, name=name)
if set_namespace:
rolebinding.namespace = self.namespace
return rolebinding
[docs] def load_secret(
self,
path: str,
set_namespace: bool = True,
name: Optional[str] = None,
) -> objects.Secret:
"""Load a manifest YAML into a Secret object.
By default, this will augment the Secret object with the generated
test case namespace. This behavior can be disabled with the
``set_namespace`` flag.
Args:
path: The path to the Secret manifest.
set_namespace: Enable/disable the automatic augmentation of the
Secret namespace.
name: The name of the resource to load. If the manifest file
contains a single object definition for the type being
loaded, it is not necessary to specify the name. If the
manifest has multiple definitions containing the same
type, a name is required to differentiate between them.
If no name is specified in such case, an error is raised.
Returns:
The Secret for the specified manifest.
"""
log.info(f"loading secret from path: {path}")
secret = objects.Secret.load(path, name=name)
if set_namespace:
secret.namespace = self.namespace
return secret
[docs] def load_service(
self,
path: str,
set_namespace: bool = True,
name: Optional[str] = None,
) -> objects.Service:
"""Load a manifest YAML into a Service object.
By default, this will augment the Service object with the generated
test case namespace. This behavior can be disabled with the
``set_namespace`` flag.
Args:
path: The path to the Service manifest.
set_namespace: Enable/disable the automatic augmentation of the
Service namespace.
name: The name of the resource to load. If the manifest file
contains a single object definition for the type being
loaded, it is not necessary to specify the name. If the
manifest has multiple definitions containing the same
type, a name is required to differentiate between them.
If no name is specified in such case, an error is raised.
Returns:
The Service for the specified manifest.
"""
log.info(f"loading service from path: {path}")
service = objects.Service.load(path, name=name)
if set_namespace:
service.namespace = self.namespace
return service
[docs] def load_persistentvolumeclaim(
self,
path: str,
set_namespace: bool = True,
name: Optional[str] = None,
) -> objects.PersistentVolumeClaim:
"""Load a manifest YAML into a PersistentVolumeClaim object.
By default, this will augment the PersistentVolumeClaim object with
the generated test case namespace. This behavior can be
disabled with the ``set_namespace`` flag.
Args:
path (str): The path to the PersistentVolumeClaim manifest.
set_namespace (bool): Enable/disable the automatic
augmentation of the PersistentVolumeClaim namespace.
name: The name of the resource to load. If the manifest file
contains a single object definition for the type being
loaded, it is not necessary to specify the name. If the
manifest has multiple definitions containing the same
type, a name is required to differentiate between them.
If no name is specified in such case, an error is raised.
Returns:
objects.PersistentVolumeClaim: The PersistentVolumeClaim for the specified
manifest.
"""
log.info("loading persistentvolumeclaim from path: %s", path)
persistentvolumeclaim = objects.PersistentVolumeClaim.load(path, name=name)
if set_namespace:
persistentvolumeclaim.namespace = self.namespace
return persistentvolumeclaim
[docs] def load_ingress(
self,
path: str,
set_namespace: bool = True,
name: Optional[str] = None,
) -> objects.Ingress:
"""Load a manifest YAML into a Ingress object.
By default, this will augment the Ingress object with
the generated test case namespace. This behavior can be
disabled with the ``set_namespace`` flag.
Args:
path (str): The path to the Ingress manifest.
set_namespace (bool): Enable/disable the automatic
augmentation of the Ingress namespace.
name: The name of the resource to load. If the manifest file
contains a single object definition for the type being
loaded, it is not necessary to specify the name. If the
manifest has multiple definitions containing the same
type, a name is required to differentiate between them.
If no name is specified in such case, an error is raised.
Returns:
objects.Ingress: The ingress for the specified manifest.
"""
log.info("loading ingress from path: %s", path)
ingress = objects.Ingress.load(path, name=name)
if set_namespace:
ingress.namespace = self.namespace
return ingress
[docs] def load_replicaset(
self,
path: str,
set_namespace: bool = True,
name: Optional[str] = None,
) -> objects.ReplicaSet:
"""Load a manifest YAML into a ReplicaSet object.
By default, this will augment the ReplicaSet object with the generated
test case namespace. This behavior can be disabled with the
``set_namespace`` flag.
Args:
path: The path to the ReplicaSet manifest.
set_namespace: Enable/disable the automatic augmentation of the
ReplicaSet namespace.
name: The name of the resource to load. If the manifest file
contains a single object definition for the type being
loaded, it is not necessary to specify the name. If the
manifest has multiple definitions containing the same
type, a name is required to differentiate between them.
If no name is specified in such case, an error is raised.
Returns:
The ReplicaSet for the specified manifest.
"""
log.info(f"loading replicaset from path: {path}")
replicaset = objects.ReplicaSet.load(path, name=name)
if set_namespace:
replicaset.namespace = self.namespace
return replicaset
[docs] def load_statefulset(
self,
path: str,
set_namespace: bool = True,
name: Optional[str] = None,
) -> objects.StatefulSet:
"""Load a manifest YAML into a StatefulSet object.
By default, this will augment the StatefulSet object with the generated
test case namespace. This behavior can be disabled with the
``set_namespace`` flag.
Args:
path: The path to the StatefulSet manifest.
set_namespace: Enable/disable the automatic augmentation of the
StatefulSet namespace.
name: The name of the resource to load. If the manifest file
contains a single object definition for the type being
loaded, it is not necessary to specify the name. If the
manifest has multiple definitions containing the same
type, a name is required to differentiate between them.
If no name is specified in such case, an error is raised.
Returns:
The StatefulSet for the specified manifest.
"""
log.info(f"loading statefulset from path: {path}")
statefulset = objects.StatefulSet.load(path, name=name)
if set_namespace:
statefulset.namespace = self.namespace
return statefulset
[docs] def load_serviceaccount(
self,
path: str,
set_namespace: bool = True,
name: Optional[str] = None,
) -> objects.ServiceAccount:
"""Load a manifest YAML into a ServiceAccount object.
By default, this will augment the ServiceAccount object with the generated
test case namespace. This behavior can be disabled with the
``set_namespace`` flag.
Args:
path: The path to the ServiceAccount manifest.
set_namespace: Enable/disable the automatic augmentation of the
ServiceAccount namespace.
name: The name of the resource to load. If the manifest file
contains a single object definition for the type being
loaded, it is not necessary to specify the name. If the
manifest has multiple definitions containing the same
type, a name is required to differentiate between them.
If no name is specified in such case, an error is raised.
Returns:
The ServiceAccount for the specified manifest.
"""
log.info(f"loading serviceaccount from path: {path}")
serviceaccount = objects.ServiceAccount.load(path, name=name)
if set_namespace:
serviceaccount.namespace = self.namespace
return serviceaccount
# ****** General Helpers ******
[docs] def get_configmaps(
self,
namespace: str = None,
fields: Dict[str, str] = None,
labels: Dict[str, str] = None,
) -> Dict[str, objects.ConfigMap]:
"""Get ConfigMaps from the cluster.
Args:
namespace: The namespace to get the ConfigMaps from. If not specified,
it will use the auto-generated test case namespace by default.
fields: A dictionary of fields used to restrict the returned collection
of ConfigMaps to only those which match these field selectors. By
default, no restricting is done.
labels: A dictionary of labels used to restrict the returned collection
of ConfigMaps to only those which match these label selectors. By
default, no restricting is done.
Returns:
A dictionary where the key is the ConfigMap name and the value is the
ConfigMap itself.
"""
if namespace is None:
namespace = self.namespace
selectors = utils.selector_kwargs(fields, labels)
results = objects.ConfigMap.preferred_client().list_namespaced_config_map(
namespace=namespace,
**selectors,
)
configmaps = {}
for obj in results.items:
cm = objects.ConfigMap(obj)
configmaps[cm.name] = cm
return configmaps
[docs] def get_daemonsets(
self,
namespace: str = None,
fields: Dict[str, str] = None,
labels: Dict[str, str] = None,
) -> Dict[str, objects.DaemonSet]:
"""Get DaemonSets from the cluster.
Args:
namespace: The namespace to get the DaemonSets from. If not specified,
it will use the auto-generated test case namespace by default.
fields: A dictionary of fields used to restrict the returned collection
of DaemonSets to only those which match these field selectors. By
default, no restricting is done.
labels: A dictionary of labels used to restrict the returned collection
of DaemonSets to only those which match these label selectors. By
default, no restricting is done.
Returns:
A dictionary where the key is the DaemonSet name and the value is the
DaemonSet itself.
"""
if namespace is None:
namespace = self.namespace
selectors = utils.selector_kwargs(fields, labels)
results = objects.DaemonSet.preferred_client().list_namespaced_daemon_set(
namespace=namespace,
**selectors,
)
daemonsets = {}
for obj in results.items:
daemonset = objects.DaemonSet(obj)
daemonsets[daemonset.name] = daemonset
return daemonsets
[docs] def get_deployments(
self,
namespace: str = None,
fields: Dict[str, str] = None,
labels: Dict[str, str] = None,
) -> Dict[str, objects.Deployment]:
"""Get Deployments from the cluster.
Args:
namespace: The namespace to get the Deployments from. If not specified,
it will use the auto-generated test case namespace by default.
fields: A dictionary of fields used to restrict the returned collection
of Deployments to only those which match these field selectors. By
default, no restricting is done.
labels: A dictionary of labels used to restrict the returned collection
of Deployments to only those which match these label selectors. By
default, no restricting is done.
Returns:
A dictionary where the key is the Deployment name and the value is the
Deployment itself.
"""
if namespace is None:
namespace = self.namespace
selectors = utils.selector_kwargs(fields, labels)
results = objects.Deployment.preferred_client().list_namespaced_deployment(
namespace=namespace,
**selectors,
)
deployments = {}
for obj in results.items:
deployment = objects.Deployment(obj)
deployments[deployment.name] = deployment
return deployments
[docs] def get_endpoints(
self,
namespace: str = None,
fields: Dict[str, str] = None,
labels: Dict[str, str] = None,
) -> Dict[str, objects.Endpoints]:
"""Get Endpoints from the cluster.
Args:
namespace: The namespace to get the Endpoints from. If not specified,
it will use the auto-generated test case namespace by default.
fields: A dictionary of fields used to restrict the returned collection
of Endpoints to only those which match these field selectors. By
default, no restricting is done.
labels: A dictionary of labels used to restrict the returned collection
of Endpoints to only those which match these label selectors. By
default, no restricting is done.
Returns:
A dictionary where the key is the Endpoint name and the value is the
Endpoint itself.
"""
if namespace is None:
namespace = self.namespace
selectors = utils.selector_kwargs(fields, labels)
results = objects.Endpoints.preferred_client().list_namespaced_endpoints(
namespace=namespace,
**selectors,
)
endpoints = {}
for obj in results.items:
endpoint = objects.Endpoints(obj)
endpoints[endpoint.name] = endpoint
return endpoints
[docs] def get_events(
self,
fields: Dict[str, str] = None,
labels: Dict[str, str] = None,
all_namespaces: bool = False,
) -> Dict[str, objects.Event]:
"""Get the latest Events that occurred in the cluster.
Args:
fields: A dictionary of fields used to restrict the returned collection
of Events to only those which match these field selectors. By
default, no restricting is done.
labels: A dictionary of labels used to restrict the returned collection
of Events to only those which match these label selectors. By
default, no restricting is done.
all_namespaces: If True, get the events across all namespaces.
Returns:
A dictionary where the key is the Event name and the value is the
Event itself.
"""
selectors = utils.selector_kwargs(fields, labels)
if all_namespaces:
results = client.CoreV1Api().list_event_for_all_namespaces(**selectors)
else:
results = client.CoreV1Api().list_namespaced_event(
namespace=self.namespace, **selectors
)
events = {}
for obj in results.items:
event = objects.Event(obj)
events[event.name] = event
return events
[docs] def get_namespaces(
self,
fields: Dict[str, str] = None,
labels: Dict[str, str] = None,
) -> Dict[str, objects.Namespace]:
"""Get Namespaces from the cluster.
Args:
fields: A dictionary of fields used to restrict the returned collection
of Namespaces to only those which match these field selectors. By
default, no restricting is done.
labels: A dictionary of labels used to restrict the returned collection
of Namespaces to only those which match these label selectors. By
default, no restricting is done.
Returns:
A dictionary where the key is the Namespace name and the value is the
Namespace itself.
"""
selectors = utils.selector_kwargs(fields, labels)
results = objects.Namespace.preferred_client().list_namespace(
**selectors,
)
namespaces = {}
for obj in results.items:
namespace = objects.Namespace(obj)
namespaces[namespace.name] = namespace
return namespaces
[docs] @staticmethod
def get_nodes(
fields: Dict[str, str] = None,
labels: Dict[str, str] = None,
) -> Dict[str, objects.Node]:
"""Get the Nodes that make up the cluster.
Args:
fields: A dictionary of fields used to restrict the returned collection
of Nodes to only those which match these field selectors. By
default, no restricting is done.
labels: A dictionary of labels used to restrict the returned collection
of Nodes to only those which match these label selectors. By
default, no restricting is done.
Returns:
A dictionary where the key is the Node name and the value is the
Node itself.
"""
selectors = utils.selector_kwargs(fields, labels)
results = client.CoreV1Api().list_node(
**selectors,
)
nodes = {}
for obj in results.items:
node = objects.Node(obj)
nodes[node.name] = node
return nodes
[docs] def get_pods(
self,
namespace: str = None,
fields: Dict[str, str] = None,
labels: Dict[str, str] = None,
) -> Dict[str, objects.Pod]:
"""Get Pods from the cluster.
Args:
namespace: The namespace to get the Pods from. If not specified,
it will use the auto-generated test case namespace by default.
fields: A dictionary of fields used to restrict the returned collection
of Pods to only those which match these field selectors. By
default, no restricting is done.
labels: A dictionary of labels used to restrict the returned collection
of Pods to only those which match these label selectors. By default,
no restricting is done.
Returns:
A dictionary where the key is the Pod name and the value is the
Pod itself.
"""
if namespace is None:
namespace = self.namespace
selectors = utils.selector_kwargs(fields, labels)
results = objects.Pod.preferred_client().list_namespaced_pod(
namespace=namespace,
**selectors,
)
pods = {}
for obj in results.items:
pod = objects.Pod(obj)
pods[pod.name] = pod
return pods
[docs] def get_secrets(
self,
namespace: str = None,
fields: Dict[str, str] = None,
labels: Dict[str, str] = None,
) -> Dict[str, objects.Secret]:
"""Get Secrets from the cluster.
Args:
namespace: The namespace to get the Secrets from. If not specified,
it will use the auto-generated test case namespace by default.
fields: A dictionary of fields used to restrict the returned collection
of Secrets to only those which match these field selectors. By
default, no restricting is done.
labels: A dictionary of labels used to restrict the returned collection
of Secrets to only those which match these label selectors. By
default, no restricting is done.
Returns:
A dictionary where the key is the Secret name and the value is the
Secret itself.
"""
if namespace is None:
namespace = self.namespace
selectors = utils.selector_kwargs(fields, labels)
results = objects.Secret.preferred_client().list_namespaced_secret(
namespace=namespace,
**selectors,
)
secrets = {}
for obj in results.items:
secret = objects.Secret(obj)
secrets[secret.name] = secret
return secrets
[docs] def get_services(
self,
namespace: str = None,
fields: Dict[str, str] = None,
labels: Dict[str, str] = None,
) -> Dict[str, objects.Service]:
"""Get Services under the test case namespace.
Args:
namespace: The namespace to get the Services from. If not specified,
it will use the auto-generated test case namespace by default.
fields: A dictionary of fields used to restrict the returned collection
of Services to only those which match these field selectors. By
default, no restricting is done.
labels: A dictionary of labels used to restrict the returned collection
of Services to only those which match these label selectors. By
default, no restricting is done.
Returns:
A dictionary where the key is the Service name and the value is the
Service itself.
"""
if namespace is None:
namespace = self.namespace
selectors = utils.selector_kwargs(fields, labels)
results = objects.Service.preferred_client().list_namespaced_service(
namespace=namespace,
**selectors,
)
services = {}
for obj in results.items:
service = objects.Service(obj)
services[service.name] = service
return services
[docs] def get_persistentvolumeclaims(
self,
namespace: str = None,
fields: Dict[str, str] = None,
labels: Dict[str, str] = None,
) -> Dict[str, objects.PersistentVolumeClaim]:
"""Get PersistentVolumeClaims from the cluster.
Args:
namespace: The namespace to get the PersistentVolumeClaim from. If not
specified, it will use the auto-generated test case namespace
by default.
fields: A dictionary of fields used to restrict the returned collection
of PersistentVolumeClaim to only those which match these field
selectors. By default, no restricting is done.
labels: A dictionary of labels used to restrict the returned collection
of PersistentVolumeClaim to only those which match these label
selectors. By default, no restricting is done.
Returns:
A dictionary where the key is the PersistentVolumeClaim name and the
value is the PersistentVolumeClaim itself.
"""
if namespace is None:
namespace = self.namespace
selectors = utils.selector_kwargs(fields, labels)
c = objects.PersistentVolumeClaim.preferred_client()
results = c.list_namespaced_persistent_volume_claim(
namespace=namespace,
**selectors,
)
persistentvolumeclaims = {}
for obj in results.items:
persistentvolumeclaim = objects.PersistentVolumeClaim(obj)
persistentvolumeclaims[persistentvolumeclaim.name] = persistentvolumeclaim
return persistentvolumeclaims
[docs] def get_ingresses(
self,
namespace: str = None,
fields: Dict[str, str] = None,
labels: Dict[str, str] = None,
) -> Dict[str, objects.Ingress]:
"""Get Ingresses from the cluster.
Args:
namespace: The namespace to get the Ingress from. If not
specified, it will use the auto-generated test case namespace
by default.
fields: A dictionary of fields used to restrict the returned collection
of Ingress to only those which match these field selectors. By
default, no restricting is done.
labels: A dictionary of labels used to restrict the returned collection
of Ingress to only those which match these label selectors. By
default, no restricting is done.
Returns:
A dictionary where the key is the Ingress name and the value
is the Ingress itself.
"""
if namespace is None:
namespace = self.namespace
selectors = utils.selector_kwargs(fields, labels)
results = objects.Ingress.preferred_client().list_namespaced_ingress(
namespace=namespace,
**selectors,
)
ingresses = {}
for obj in results.items:
ingress = objects.Ingress(obj)
ingresses[ingress.name] = ingress
return ingresses
[docs] def get_replicasets(
self,
namespace: str = None,
fields: Dict[str, str] = None,
labels: Dict[str, str] = None,
) -> Dict[str, objects.ReplicaSet]:
"""Get ReplicaSets from the cluster.
Args:
namespace: The namespace to get the ReplicaSets from. If not specified,
it will use the auto-generated test case namespace by default.
fields: A dictionary of fields used to restrict the returned collection
of ReplicaSets to only those which match these field selectors. By
default, no restricting is done.
labels: A dictionary of labels used to restrict the returned collection
of ReplicaSets to only those which match these label selectors. By
default, no restricting is done.
Returns:
A dictionary where the key is the ReplicaSet name and the value is the
ReplicaSet itself.
"""
if namespace is None:
namespace = self.namespace
selectors = utils.selector_kwargs(fields, labels)
results = objects.ReplicaSet.preferred_client().list_namespaced_replica_set(
namespace=namespace,
**selectors,
)
replicasets = {}
for obj in results.items:
rs = objects.ReplicaSet(obj)
replicasets[replicasets.name] = rs
return replicasets
[docs] def get_statefulsets(
self,
namespace: str = None,
fields: Dict[str, str] = None,
labels: Dict[str, str] = None,
) -> Dict[str, objects.StatefulSet]:
"""Get StatefulSets from the cluster.
Args:
namespace: The namespace to get the StatefulSets from. If not specified,
it will use the auto-generated test case namespace by default.
fields: A dictionary of fields used to restrict the returned collection
of StatefulSets to only those which match these field selectors. By
default, no restricting is done.
labels: A dictionary of labels used to restrict the returned collection
of StatefulSets to only those which match these label selectors. By
default, no restricting is done.
Returns:
A dictionary where the key is the StatefulSet name and the value is the
StatefulSet itself.
"""
if namespace is None:
namespace = self.namespace
selectors = utils.selector_kwargs(fields, labels)
results = objects.StatefulSet.preferred_client().list_namespaced_stateful_set(
namespace=namespace,
**selectors,
)
statefulsets = {}
for obj in results.items:
statefulset = objects.StatefulSet(obj)
statefulsets[statefulset.name] = statefulset
return statefulsets
[docs] def get_serviceaccounts(
self,
namespace: str = None,
fields: Dict[str, str] = None,
labels: Dict[str, str] = None,
) -> Dict[str, objects.ServiceAccount]:
"""Get ServiceAccounts from the cluster.
Args:
namespace: The namespace to get the ServiceAccount from. If not specified,
it will use the auto-generated test case namespace by default.
fields: A dictionary of fields used to restrict the returned collection
of ServiceAccount to only those which match these field selectors. By
default, no restricting is done.
labels: A dictionary of labels used to restrict the returned collection
of ServiceAccount to only those which match these label selectors. By
default, no restricting is done.
Returns:
A dictionary where the key is the ServiceAccount name and the value is the
ServiceAccount itself.
"""
if namespace is None:
namespace = self.namespace
selectors = utils.selector_kwargs(fields, labels)
results = (
objects.ServiceAccount.preferred_client().list_namespaced_service_account(
namespace=namespace,
**selectors,
)
)
serviceaccount = {}
for obj in results.items:
cm = objects.ServiceAccount(obj)
serviceaccount[cm.name] = cm
return serviceaccount
# ****** Test Helpers ******
[docs] @staticmethod
def wait_for_conditions(
*args: Condition,
timeout: int = None,
interval: Union[float, int] = 1,
policy: Policy = Policy.ONCE,
fail_on_api_error: bool = True,
) -> None:
"""Wait for all of the provided Conditions to be met.
All Conditions must be met for this to unblock. If no Conditions are
provided, this method will do nothing.
Args:
*args: Conditions to check.
timeout: The maximum time to wait, in seconds, for the provided
Conditions to be met. If all of the Conditions are not met within
the given timeout, this will raise a TimeoutError. By default,
there is no timeout so this will wait indefinitely.
interval: The time, in seconds, to sleep before re-evaluating the
conditions. Default: 1s
policy: The condition checking policy that defines the checking
behavior. Default: ONCE
fail_on_api_error: Fail the condition checks if a Kubernetes API error
is incurred. An API error can be raised for a number of reasons,
including a Pod being restarted and temporarily unavailable.
Disabling this will cause those errors to be ignored, allowing
the check to continue until timeout or resolution. (default: True).
Raises:
TimeoutError: The Conditions were not met within the specified
timeout period.
ValueError: Not all arguments are a Condition.
"""
# If no Conditions were given, there is nothing to do.
if not args:
return
# If something was given, make sure they are all Conditions
if not all(map(lambda c: isinstance(c, Condition), args)):
raise ValueError("All arguments must be a Condition")
# make a copy of the conditions
to_check = list(args)
def condition_checker(conditions):
# check that the conditions were met according to the
# condition checking policy
met, unmet = check_and_sort(*conditions)
if policy == Policy.ONCE:
log.info(f"check met: {met}")
conditions[:] = unmet
return len(unmet) == 0
elif policy == Policy.SIMULTANEOUS:
return len(unmet) == 0 and len(met) == len(args)
else:
raise ValueError(
f"Invalid condition policy specified: {policy}",
)
wait_condition = Condition(
"wait for conditions",
condition_checker,
to_check,
)
try:
utils.wait_for_condition(
condition=wait_condition,
timeout=timeout,
interval=interval,
fail_on_api_error=fail_on_api_error,
)
except TimeoutError:
# If we time out here, we want to show all the conditions
# that we weren't able to resolve in the error message, not
# the 'wait for conditions' wrapper.
raise TimeoutError(
f"timed out wile waiting for conditions to be met: {to_check}",
)
[docs] def wait_for_ready_nodes(
self,
count: int,
timeout: int = None,
interval: Union[int, float] = 1,
) -> None:
"""Wait until there are at least ``count`` number of nodes available
in the cluster.
Notes:
This should only be used for clusters that auto-scale the
nodes. This will not create/delete nodes on its own.
Args:
count: The number of nodes to wait for.
timeout: The maximum time to wait, in seconds.
interval: The time, in seconds, to sleep before re-checking the
number of nodes.
"""
def node_count_match(node_count):
nodes = self.get_nodes()
return [n.is_ready() for n in nodes.values()].count(True) >= node_count
wait_condition = Condition(
f"wait for {count} nodes",
node_count_match,
count,
)
utils.wait_for_condition(
condition=wait_condition,
timeout=timeout,
interval=interval,
)
[docs] def wait_for_registered(
self, timeout: int = None, interval: Union[int, float] = 1
) -> None:
"""Wait for all of the pre-registered objects to be ready on the cluster.
An object is pre-registered with the test client if it is specified
to the test via the ``applymanifests`` pytest marker. The marker will load
the manifest and add the object to the cluster, and register it with
the test client. This method waits until all such loaded manifest objects
are in the ready state simultaneously.
Args:
timeout: The maximum time to wait, in seconds.
interval: The time, in seconds, to sleep before re-checking the ready
state for pre-registered objects.
"""
def check_registered():
for obj in self.pre_registered:
if not obj.is_ready():
return False
return True
wait_condition = Condition(
"wait for pre-registered objects to be ready",
check_registered,
)
utils.wait_for_condition(
condition=wait_condition,
timeout=timeout,
interval=interval,
)
[docs] @staticmethod
def wait_until_created(
obj: objects.ApiObject,
timeout: int = None,
interval: Union[int, float] = 1,
) -> None:
"""Wait until the specified object has been created.
Here, creation is judged on whether or not refreshing the object (e.g.
getting it) returns an object (created) or an error (not yet created).
Args:
obj: The ApiObject to wait on.
timeout: The maximum time to wait, in seconds.
interval: The time, in seconds, to sleep before re-checking the
created state of the object.
"""
def check_ready(api_obj):
try:
api_obj.refresh()
except: # noqa
return False
return True
wait_condition = Condition(
f"wait for {type(obj).__name__}:{obj.name} to be created",
check_ready,
obj,
)
utils.wait_for_condition(
condition=wait_condition, timeout=timeout, interval=interval
)