"""Kubetest wrapper for the Kubernetes ``Service`` API Object."""
import logging
from typing import List
from kubernetes import client
from .api_object import ApiObject
# Module-level logger, registered under the fixed name "kubetest" (not __name__).
log = logging.getLogger("kubetest")
class Service(ApiObject):
    """Kubetest wrapper around a Kubernetes `Service`_ API Object.

    The actual ``kubernetes.client.V1Service`` instance that this
    wraps can be accessed via the ``obj`` instance member.

    This wrapper provides some convenient functionality around the
    API Object and provides some state management for the `Service`_.

    .. _Service:
        https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.18/#service-v1-core
    """

    # The kubernetes client model type wrapped by this class.
    obj_type = client.V1Service

    # Mapping of API version name to the kubernetes client class used to
    # talk to the API for that version.
    api_clients = {
        "preferred": client.CoreV1Api,
        "v1": client.CoreV1Api,
    }

    def create(self, namespace: str = None) -> None:
        """Create the Service under the given namespace.

        On success, ``self.obj`` is replaced with the object returned by
        the API call.

        Args:
            namespace: The namespace to create the Service under.
                If the Service was loaded via the kubetest client, the
                namespace will already be set, so it is not needed here.
                Otherwise, the namespace will need to be provided.
        """
        if namespace is None:
            namespace = self.namespace

        # Log the namespace actually used for the request; it differs from
        # ``self.namespace`` when the caller passes one explicitly.
        log.info(f'creating service "{self.name}" in namespace "{namespace}"')
        log.debug(f"service: {self.obj}")

        self.obj = self.api_client.create_namespaced_service(
            namespace=namespace,
            body=self.obj,
        )

    def delete(self, options: client.V1DeleteOptions = None) -> client.V1Status:
        """Delete the Service.

        This method expects the Service to have been loaded or otherwise
        assigned a namespace already. If it has not, the namespace will need
        to be set manually.

        Args:
            options: Options for Service deletion. When omitted, a default
                ``client.V1DeleteOptions()`` is used.

        Returns:
            The status of the delete operation.
        """
        if options is None:
            options = client.V1DeleteOptions()

        log.info(f'deleting service "{self.name}"')
        log.debug(f"delete options: {options}")
        log.debug(f"service: {self.obj}")

        return self.api_client.delete_namespaced_service(
            name=self.name,
            namespace=self.namespace,
            body=options,
        )

    def refresh(self) -> None:
        """Refresh the underlying Kubernetes Service resource.

        Re-reads the Service by name and namespace and stores the result
        in ``self.obj``.
        """
        self.obj = self.api_client.read_namespaced_service(
            name=self.name,
            namespace=self.namespace,
        )

    def is_ready(self) -> bool:
        """Check if the Service is in the ready state.

        The readiness state is not clearly available from the Service
        status, so to see whether or not the Service is ready this
        will check whether the endpoints of the Service are ready.

        This comes with the caveat that in order for a Service to
        have endpoints, there needs to be some backend hooked up to it.
        If there is no backend, the Service will never have endpoints,
        so this will never resolve to True.

        Returns:
            True if in the ready state; False otherwise.
        """
        self.refresh()

        # If there is no status, the service is definitely not ready.
        if self.obj.status is None:
            return False

        endpoints = self.get_endpoints()

        # If the Service has no endpoints, it is not ready.
        if not endpoints:
            return False

        for endpoint in endpoints:
            # An endpoint without subsets (missing *or* empty) has no
            # backing addresses, so it is considered not ready.
            if not endpoint.subsets:
                return False

            for subset in endpoint.subsets:
                # No addresses set up yet: not ready.
                if not subset.addresses:
                    return False

                # Any address still reported as not ready means the
                # service as a whole is not ready.
                if subset.not_ready_addresses:
                    return False

        # Every subset of every endpoint has addresses and none are pending,
        # so the service is ready.
        return True

    def status(self) -> client.V1ServiceStatus:
        """Get the status of the Service.

        The Service is refreshed first so the latest status is returned.

        Returns:
            The status of the Service.
        """
        log.info(f'checking status of service "{self.name}"')
        self.refresh()
        return self.obj.status

    def get_endpoints(self) -> List[client.V1Endpoints]:
        """Get the endpoints for the Service.

        This can be useful for checking internal IP addresses used
        in containers, e.g. for container auto-discovery.

        All endpoints in the Service's namespace are listed and then
        filtered down to those whose name matches the Service's name.

        Returns:
            A list of endpoints associated with the Service.
        """
        log.info(f'getting endpoints for service "{self.name}"')

        endpoints = self.api_client.list_namespaced_endpoints(
            namespace=self.namespace,
        )

        # Keep only the endpoints with the same name as the service.
        svc_endpoints = [
            endpoint
            for endpoint in endpoints.items
            if endpoint.metadata.name == self.name
        ]

        log.debug(f"endpoints: {svc_endpoints}")
        return svc_endpoints

    def _proxy_http_request(self, method, path, **kwargs) -> tuple:
        """Template request to proxy of a Service.

        The request targets the first port listed in the Service spec
        (``spec.ports[0]``) and is issued through a newly constructed
        ``client.CoreV1Api()``.

        Args:
            method: The http request method e.g. 'GET', 'POST' etc.
            path: The URI path for the request.
            kwargs: Additional keyword arguments forwarded to the
                underlying ``call_api`` invocation.

        Returns:
            The response data, as returned by ``call_api``.
        """
        path_params = {
            "name": f"{self.name}:{self.obj.spec.ports[0].port}",
            "namespace": self.namespace,
            "path": path,
        }

        return client.CoreV1Api().api_client.call_api(
            "/api/v1/namespaces/{namespace}/services/{name}/proxy/{path}",
            method,
            path_params=path_params,
            **kwargs,
        )

    def proxy_http_get(self, path: str, **kwargs) -> tuple:
        """Issue a GET request to proxy of a Service.

        Args:
            path: The URI path for the request.
            kwargs: Additional keyword arguments forwarded to the
                underlying ``call_api`` invocation.

        Returns:
            The response data
        """
        return self._proxy_http_request("GET", path, **kwargs)

    def proxy_http_post(self, path: str, **kwargs) -> tuple:
        """Issue a POST request to proxy of a Service.

        Args:
            path: The URI path for the request.
            kwargs: Additional keyword arguments forwarded to the
                underlying ``call_api`` invocation.

        Returns:
            The response data
        """
        return self._proxy_http_request("POST", path, **kwargs)