Source code for kubetest.objects.pod

"""Kubetest wrapper for the Kubernetes `Pod` API Object."""

import logging
from typing import Dict, List, Union

from kubernetes import client
from kubernetes.client.rest import ApiException

from kubetest import condition, response, utils

from .api_object import ApiObject
from .container import Container

log = logging.getLogger("kubetest")


[docs]class Pod(ApiObject): """Kubetest wrapper around a Kubernetes `Pod`_ API Object. The actual ``kubernetes.client.V1Pod`` instance that this wraps can be accessed via the ``obj`` instance member. This wrapper provides some convenient functionality around the API Object and provides some state management for the `Pod`_. .. _Pod: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.18/#pod-v1-core """ obj_type = client.V1Pod api_clients = { "preferred": client.CoreV1Api, "v1": client.CoreV1Api, }
[docs] def create(self, namespace: str = None) -> None: """Create the Pod under the given namespace. Args: namespace: The namespace to create the Pod under. If the Pod was loaded via the kubetest client, the namespace will already be set, so it is not needed here. Otherwise, the namespace will need to be provided. """ if namespace is None: namespace = self.namespace log.info(f'creating pod "{self.name}" in namespace "{self.namespace}"') log.debug(f"pod: {self.obj}") self.obj = self.api_client.create_namespaced_pod( namespace=namespace, body=self.obj, )
[docs] def delete(self, options: client.V1DeleteOptions = None) -> client.V1Status: """Delete the Pod. This method expects the Pod to have been loaded or otherwise assigned a namespace already. If it has not, the namespace will need to be set manually. Args: options: Options for Pod deletion. Return: The status of the delete operation. """ if options is None: options = client.V1DeleteOptions() log.info(f'deleting pod "{self.name}"') log.debug(f"delete options: {options}") log.debug(f"pod: {self.obj}") return self.api_client.delete_namespaced_pod( name=self.name, namespace=self.namespace, body=options, )
[docs] def refresh(self) -> None: """Refresh the underlying Kubernetes Pod resource.""" self.obj = self.api_client.read_namespaced_pod_status( name=self.name, namespace=self.namespace, )
[docs] def is_ready(self) -> bool: """Check if the Pod is in the ready state. Returns: True if in the ready state; False otherwise. """ self.refresh() # if there is no status, the pod is definitely not ready status = self.obj.status if status is None: return False # check the pod phase to make sure it is running. a pod in # the 'failed' or 'success' state will no longer be running, # so we only care if the pod is in the 'running' state. phase = status.phase if phase.lower() != "running": return False for cond in status.conditions: # we only care about the condition type 'ready' if cond.type.lower() != "ready": continue # check that the readiness condition is True return cond.status.lower() == "true" # Catchall return False
[docs] def status(self) -> client.V1PodStatus: """Get the status of the Pod. Returns: The status of the Pod. """ # first, refresh the pod state to ensure latest status self.refresh() # return the status of the pod return self.obj.status
[docs] def get_containers(self) -> List[Container]: """Get the Pod's containers. Returns: A list of containers that belong to the Pod. """ log.info(f'getting containers for pod "{self.name}"') self.refresh() return [Container(c, self) for c in self.obj.spec.containers]
[docs] def get_container(self, name: str) -> Union[Container, None]: """Get a container in the Pod by name. Args: name (str): The name of the Container. Returns: Container: The Pod's Container with the matching name. If no container with the given name is found, ``None`` is returned. """ for c in self.get_containers(): if c.obj.name == name: return c return None
[docs] def get_restart_count(self) -> int: """Get the total number of Container restarts for the Pod. Returns: The total number of Container restarts. """ container_statuses = self.status().container_statuses if container_statuses is None: return 0 total = 0 for container_status in container_statuses: total += container_status.restart_count return total
[docs] def http_proxy_get( self, path: str, query_params: Dict[str, str] = None ) -> response.Response: """Issue a GET request to a proxy for the Pod. Notes: This function does not use the kubernetes ``connect_get_namespaced_pod_proxy_with_path`` function because there appears to be lack of support for custom query parameters (as of the ``kubernetes==9.0.0`` package version). To bypass this, parts of the core functionality from the aforementioned function are used here with the modification of allowing user-defined query parameters to be passed along. Args: path: The URI path for the request. query_params: Any query parameters for the request. Returns: The response data. """ c = client.CoreV1Api() if query_params is None: query_params = {} path_params = {"name": self.name, "namespace": self.namespace} header_params = { "Accept": c.api_client.select_header_accept(["*/*"]), "Content-Type": c.api_client.select_header_content_type(["*/*"]), } auth_settings = ["BearerToken"] try: resp = response.Response( *c.api_client.call_api( "/api/v1/namespaces/{namespace}/pods/{name}/proxy/" + path, "GET", path_params=path_params, query_params=query_params, header_params=header_params, body=None, post_params=[], files={}, response_type="str", auth_settings=auth_settings, _return_http_data_only=False, # we want all info, not just data _preload_content=True, _request_timeout=None, collection_formats={}, ) ) except ApiException as e: # if the ApiException does not have a body or headers, that # means the raised exception did not get a response (even if # it were 404, 500, etc), so we want to continue to raise in # that case. if there is a body and headers, we will not raise # and just take the data out that we need from the exception. if e.body is None and e.headers is None: raise resp = response.Response( data=e.body, status=e.status, headers=e.headers, ) return resp
[docs] def http_proxy_post( self, path: str, query_params: Dict[str, str] = None, data=None, ) -> response.Response: """Issue a POST request to a proxy for the Pod. Notes: This function does not use the kubernetes ``connect_post_namespaced_pod_proxy_with_path`` function because there appears to be lack of support for custom query parameters (as of the ``kubernetes==9.0.0`` package version). To bypass this, parts of the core functionality from the aforementioned function are used here with the modification of allowing user-defined query parameters to be passed along. Args: path: The URI path for the request. query_params: Any query parameters for the request. data: The data to POST. Returns: The response data. """ c = client.CoreV1Api() if query_params is None: query_params = {} path_params = {"name": self.name, "namespace": self.namespace} header_params = { "Accept": c.api_client.select_header_accept(["*/*"]), "Content-Type": c.api_client.select_header_content_type(["*/*"]), } auth_settings = ["BearerToken"] try: resp = response.Response( *c.api_client.call_api( "/api/v1/namespaces/{namespace}/pods/{name}/proxy/" + path, "POST", path_params=path_params, query_params=query_params, header_params=header_params, body=data, post_params=[], files={}, response_type="str", auth_settings=auth_settings, _return_http_data_only=False, # we want all info, not just data _preload_content=True, _request_timeout=None, collection_formats={}, ) ) except ApiException as e: # if the ApiException does not have a body or headers, that # means the raised exception did not get a response (even if # it were 404, 500, etc), so we want to continue to raise in # that case. if there is a body and headers, we will not raise # and just take the data out that we need from the exception. if e.body is None and e.headers is None: raise resp = response.Response( data=e.body, status=e.status, headers=e.headers, ) return resp
[docs] def containers_started(self) -> bool: """Check if the Pod's Containers have all started. Returns: True if all Containers have started; False otherwise. """ # start the flag as true - we will check the state and set # this to False if any container is not yet running. containers_started = True status = self.status() if status.container_statuses is not None: for container_status in status.container_statuses: if container_status.state is not None: if container_status.state.running is not None: if container_status.state.running.started_at is not None: # The container is started, so move on to check the # next container continue # If we get here, then the container has not started. containers_started = containers_started and False break return containers_started
[docs] def wait_until_containers_start(self, timeout: int = None) -> None: """Wait until all containers in the Pod have started. This will wait for the images to be pulled and for the containers to be created and started. This will unblock once all Pod containers have been started. This is different than waiting until ready, since a container may not be ready immediately after it has been started. Args: timeout: The maximum time to wait, in seconds, for the Pod's containers to be started. If unspecified, this will wait indefinitely. If specified and the timeout is met or exceeded, a TimeoutError will be raised. Raises: TimeoutError: The specified timeout was exceeded. """ wait_condition = condition.Condition( "all pod containers started", self.containers_started, ) utils.wait_for_condition( condition=wait_condition, timeout=timeout, interval=1, )