Coverage for k8sclient/k8sclient.py : 72%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# ============LICENSE_START=======================================================
2# org.onap.dcae
3# ================================================================================
4# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
5# Copyright (c) 2020 Pantheon.tech. All rights reserved.
6# Copyright (c) 2020 Nokia. All rights reserved.
7# ================================================================================
8# Licensed under the Apache License, Version 2.0 (the "License");
9# you may not use this file except in compliance with the License.
10# You may obtain a copy of the License at
11#
12# http://www.apache.org/licenses/LICENSE-2.0
13#
14# Unless required by applicable law or agreed to in writing, software
15# distributed under the License is distributed on an "AS IS" BASIS,
16# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17# See the License for the specific language governing permissions and
18# limitations under the License.
19# ============LICENSE_END=========================================================
20#
21import os
22import re
23import uuid
24from kubernetes import config, client, stream
26# Default values for readiness probe
27PROBE_DEFAULT_PERIOD = 15
28PROBE_DEFAULT_TIMEOUT = 1
30# Location of k8s cluster config file ("kubeconfig")
31K8S_CONFIG_PATH="/opt/onap/kube/kubeconfig"
33# Regular expression for interval/timeout specification
34INTERVAL_SPEC = re.compile("^([0-9]+)(s|m|h)?$")
35# Conversion factors to seconds
36FACTORS = {None: 1, "s": 1, "m": 60, "h": 3600}
38# Regular expression for port mapping
39# group 1: container port
40# group 2: / + protocol
41# group 3: protocol
42# group 4: host port
43PORTS = re.compile("^([0-9]+)(/(udp|UDP|tcp|TCP))?:([0-9]+)$")
45# Constants for external_cert
46MOUNT_PATH = "/etc/onap/aaf/certservice/certs/"
47KEYSTORE_PATH = MOUNT_PATH + "certServiceClient-keystore.jks"
48TRUSTSTORE_PATH = MOUNT_PATH + "truststore.jks"
49CERT_SECRET_NAME = "aaf-cert-service-client-tls-secret"
51def _create_deployment_name(component_name):
52 return "dep-{0}".format(component_name)[:63]
54def _create_service_name(component_name):
55 return "{0}".format(component_name)[:63]
57def _create_exposed_service_name(component_name):
58 return ("x{0}".format(component_name))[:63]
60def _configure_api(location=None):
61 # Look for a kubernetes config file
62 if os.path.exists(K8S_CONFIG_PATH):
63 config.load_kube_config(config_file=K8S_CONFIG_PATH, context=location, persist_config=False)
64 else:
65 # Maybe we're running in a k8s container and we can use info provided by k8s
66 # We would like to use:
67 # config.load_incluster_config()
68 # but this looks into os.environ for kubernetes host and port, and from
69 # the plugin those aren't visible. So we use the InClusterConfigLoader class,
70 # where we can set the environment to what we like.
71 # This is probably brittle! Maybe there's a better alternative.
72 localenv = {
73 config.incluster_config.SERVICE_HOST_ENV_NAME : "kubernetes.default.svc.cluster.local",
74 config.incluster_config.SERVICE_PORT_ENV_NAME : "443"
75 }
76 config.incluster_config.InClusterConfigLoader(
77 token_filename=config.incluster_config.SERVICE_TOKEN_FILENAME,
78 cert_filename=config.incluster_config.SERVICE_CERT_FILENAME,
79 environ=localenv
80 ).load_and_set()
82def _parse_interval(t):
83 """
84 Parse an interval specification
85 t can be
86 - a simple integer quantity, interpreted as seconds
87 - a string representation of a decimal integer, interpreted as seconds
88 - a string consisting of a represention of an decimal integer followed by a unit,
89 with "s" representing seconds, "m" representing minutes,
90 and "h" representing hours
91 Used for compatibility with the Docker plugin, where time intervals
92 for health checks were specified as strings with a number and a unit.
93 See 'intervalspec' above for the regular expression that's accepted.
94 """
95 m = INTERVAL_SPEC.match(str(t))
96 if m:
97 time = int(m.group(1)) * FACTORS[m.group(2)]
98 else:
99 raise ValueError("Bad interval specification: {0}".format(t))
100 return time
102def _create_probe(hc, port):
103 ''' Create a Kubernetes probe based on info in the health check dictionary hc '''
104 probe_type = hc['type']
105 probe = None
106 period = _parse_interval(hc.get('interval', PROBE_DEFAULT_PERIOD))
107 timeout = _parse_interval(hc.get('timeout', PROBE_DEFAULT_TIMEOUT))
108 if probe_type in ['http', 'https']:
109 probe = client.V1Probe(
110 failure_threshold = 1,
111 initial_delay_seconds = 5,
112 period_seconds = period,
113 timeout_seconds = timeout,
114 http_get = client.V1HTTPGetAction(
115 path = hc['endpoint'],
116 port = port,
117 scheme = probe_type.upper()
118 )
119 )
120 elif probe_type in ['script', 'docker']: 120 ↛ 130line 120 didn't jump to line 130, because the condition on line 120 was never false
121 probe = client.V1Probe(
122 failure_threshold = 1,
123 initial_delay_seconds = 5,
124 period_seconds = period,
125 timeout_seconds = timeout,
126 _exec = client.V1ExecAction(
127 command = hc['script'].split( )
128 )
129 )
130 return probe
132def _create_resources(resources=None):
133 if resources is not None: 133 ↛ 140line 133 didn't jump to line 140, because the condition on line 133 was never false
134 resources_obj = client.V1ResourceRequirements(
135 limits = resources.get("limits"),
136 requests = resources.get("requests")
137 )
138 return resources_obj
139 else:
140 return None
142def _create_container_object(name, image, always_pull, **kwargs):
143 # Set up environment variables
144 # Copy any passed in environment variables
145 env = kwargs.get('env') or {}
146 env_vars = [client.V1EnvVar(name=k, value=env[k]) for k in env]
147 # Add POD_IP with the IP address of the pod running the container
148 pod_ip = client.V1EnvVarSource(field_ref = client.V1ObjectFieldSelector(field_path="status.podIP"))
149 env_vars.append(client.V1EnvVar(name="POD_IP",value_from=pod_ip))
151 # If a health check is specified, create a readiness/liveness probe
152 # (For an HTTP-based check, we assume it's at the first container port)
153 readiness = kwargs.get('readiness')
154 liveness = kwargs.get('liveness')
155 resources = kwargs.get('resources')
156 container_ports = kwargs.get('container_ports') or []
158 hc_port = container_ports[0][0] if container_ports else None
159 probe = _create_probe(readiness, hc_port) if readiness else None
160 live_probe = _create_probe(liveness, hc_port) if liveness else None
161 resources_obj = _create_resources(resources) if resources else None
162 port_objs = [client.V1ContainerPort(container_port=port, protocol=proto)
163 for port, proto in container_ports]
165 # Define container for pod
166 return client.V1Container(
167 name=name,
168 image=image,
169 image_pull_policy='Always' if always_pull else 'IfNotPresent',
170 env=env_vars,
171 ports=port_objs,
172 volume_mounts=kwargs.get('volume_mounts') or [],
173 resources=resources_obj,
174 readiness_probe=probe,
175 liveness_probe=live_probe
176 )
178def _create_deployment_object(component_name,
179 containers,
180 init_containers,
181 replicas,
182 volumes,
183 labels={},
184 pull_secrets=[]):
186 deployment_name = _create_deployment_name(component_name)
188 # Label the pod with the deployment name, so we can find it easily
189 labels.update({"k8sdeployment" : deployment_name})
191 # pull_secrets is a list of the names of the k8s secrets containing docker registry credentials
192 # See https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod
193 ips = []
194 for secret in pull_secrets:
195 ips.append(client.V1LocalObjectReference(name=secret))
197 # Define pod template
198 template = client.V1PodTemplateSpec(
199 metadata=client.V1ObjectMeta(labels=labels),
200 spec=client.V1PodSpec(hostname=component_name,
201 containers=containers,
202 init_containers=init_containers,
203 volumes=volumes,
204 image_pull_secrets=ips)
205 )
207 # Define deployment spec
208 spec = client.ExtensionsV1beta1DeploymentSpec(
209 replicas=replicas,
210 template=template
211 )
213 # Create deployment object
214 deployment = client.ExtensionsV1beta1Deployment(
215 kind="Deployment",
216 metadata=client.V1ObjectMeta(name=deployment_name),
217 spec=spec
218 )
220 return deployment
222def _create_service_object(service_name, component_name, service_ports, annotations, labels, service_type):
223 service_spec = client.V1ServiceSpec(
224 ports=service_ports,
225 selector={"app" : component_name},
226 type=service_type
227 )
228 if annotations: 228 ↛ 229line 228 didn't jump to line 229, because the condition on line 228 was never true
229 metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels, annotations=annotations)
230 else:
231 metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels)
233 service = client.V1Service(
234 kind="Service",
235 api_version="v1",
236 metadata=metadata,
237 spec=service_spec
238 )
239 return service
241def parse_ports(port_list):
242 '''
243 Parse the port list into a list of container ports (needed to create the container)
244 and to a set of port mappings to set up k8s services.
245 '''
246 container_ports = []
247 port_map = {}
248 for p in port_list:
249 m = PORTS.match(p.strip())
250 if m:
251 cport = int(m.group(1))
252 hport = int (m.group(4))
253 if m.group(3):
254 proto = (m.group(3)).upper()
255 else:
256 proto = "TCP"
257 container_ports.append((cport, proto))
258 port_map[(cport, proto)] = hport
259 else:
260 raise ValueError("Bad port specification: {0}".format(p))
262 return container_ports, port_map
264def _parse_volumes(volume_list):
265 volumes = []
266 volume_mounts = []
267 for v in volume_list:
268 vname = str(uuid.uuid4())
269 vhost = v['host']['path']
270 vcontainer = v['container']['bind']
271 vro = (v['container'].get('mode') == 'ro')
272 volumes.append(client.V1Volume(name=vname, host_path=client.V1HostPathVolumeSource(path=vhost)))
273 volume_mounts.append(client.V1VolumeMount(name=vname, mount_path=vcontainer, read_only=vro))
275 return volumes, volume_mounts
277def _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, log_info, filebeat):
278 if not log_info or not filebeat: 278 ↛ 279line 278 didn't jump to line 279, because the condition on line 278 was never true
279 return
280 log_dir = log_info.get("log_directory")
281 if not log_dir: 281 ↛ 282line 281 didn't jump to line 282, because the condition on line 281 was never true
282 return
283 sidecar_volume_mounts = []
285 # Create the volume for component log files and volume mounts for the component and sidecar containers
286 volumes.append(client.V1Volume(name="component-log", empty_dir=client.V1EmptyDirVolumeSource()))
287 volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=log_dir))
288 sc_path = log_info.get("alternate_fb_path") or "{0}/{1}".format(filebeat["log_path"], component_name)
289 sidecar_volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=sc_path))
291 # Create the volume for sidecar data and the volume mount for it
292 volumes.append(client.V1Volume(name="filebeat-data", empty_dir=client.V1EmptyDirVolumeSource()))
293 sidecar_volume_mounts.append(client.V1VolumeMount(name="filebeat-data", mount_path=filebeat["data_path"]))
295 # Create the volume for the sidecar configuration data and the volume mount for it
296 # The configuration data is in a k8s ConfigMap that should be created when DCAE is installed.
297 volumes.append(
298 client.V1Volume(name="filebeat-conf", config_map=client.V1ConfigMapVolumeSource(name=filebeat["config_map"])))
299 sidecar_volume_mounts.append(
300 client.V1VolumeMount(name="filebeat-conf", mount_path=filebeat["config_path"], sub_path=filebeat["config_subpath"]))
302 # Finally create the container for the sidecar
303 containers.append(_create_container_object("filebeat", filebeat["image"], False, volume_mounts=sidecar_volume_mounts))
305def _add_tls_init_container(init_containers, volumes, volume_mounts, tls_info, tls_config):
306 # Adds an InitContainer to the pod to set up TLS certificate information. For components that act as a
307 # server(tls_info["use_tls"] is True), the InitContainer will populate a directory with server and CA certificate
308 # materials in various formats. For other components (tls_info["use_tls"] is False, or tls_info is not specified),
309 # the InitContainer will populate a directory with CA certificate materials in PEM and JKS formats.
310 # In either case, the certificate directory is mounted onto the component container filesystem at the location
311 # specified by tls_info["component_cert_dir"], if present, otherwise at the configured default mount point
312 # (tls_config["component_cert_dir"]).
314 cert_directory = tls_info.get("cert_directory") or tls_config.get("component_cert_dir")
315 env = {}
316 env["TLS_SERVER"] = "true" if tls_info.get("use_tls") else "false"
318 # Create the certificate volume and volume mounts
319 volumes.append(client.V1Volume(name="tls-info", empty_dir=client.V1EmptyDirVolumeSource()))
320 volume_mounts.append(client.V1VolumeMount(name="tls-info", mount_path=cert_directory))
321 init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_config["cert_path"])]
323 # Create the init container
324 init_containers.append(_create_container_object("init-tls", tls_config["image"], False, volume_mounts=init_volume_mounts, env=env))
326def _add_external_tls_init_container(init_containers, volumes, external_cert, external_tls_config):
327 env = {}
328 output_path = external_cert.get("external_cert_directory")
329 if not output_path.endswith('/'): 329 ↛ 330line 329 didn't jump to line 330, because the condition on line 329 was never true
330 output_path += '/'
332 env["REQUEST_URL"] = external_tls_config.get("request_url")
333 env["REQUEST_TIMEOUT"] = external_tls_config.get("timeout")
334 env["OUTPUT_PATH"] = output_path + "external"
335 env["OUTPUT_TYPE"] = external_cert.get("cert_type")
336 env["CA_NAME"] = external_cert.get("ca_name")
337 env["COMMON_NAME"] = external_cert.get("external_certificate_parameters").get("common_name")
338 env["ORGANIZATION"] = external_tls_config.get("organization")
339 env["ORGANIZATION_UNIT"] = external_tls_config.get("organizational_unit")
340 env["LOCATION"] = external_tls_config.get("location")
341 env["STATE"] = external_tls_config.get("state")
342 env["COUNTRY"] = external_tls_config.get("country")
343 env["SANS"] = external_cert.get("external_certificate_parameters").get("sans")
344 env["KEYSTORE_PATH"] = KEYSTORE_PATH
345 env["KEYSTORE_PASSWORD"] = external_tls_config.get("keystore_password")
346 env["TRUSTSTORE_PATH"] = TRUSTSTORE_PATH
347 env["TRUSTSTORE_PASSWORD"] = external_tls_config.get("truststore_password")
349 # Create the volumes and volume mounts
350 sec = client.V1SecretVolumeSource(secret_name=CERT_SECRET_NAME)
351 volumes.append(client.V1Volume(name="tls-volume", secret=sec))
352 init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=external_cert.get("external_cert_directory")),
353 client.V1VolumeMount(name="tls-volume", mount_path=MOUNT_PATH)]
355 # Create the init container
356 init_containers.append(_create_container_object("cert-service-client", external_tls_config["image_tag"], False, volume_mounts=init_volume_mounts, env=env))
358def _process_port_map(port_map):
359 service_ports = [] # Ports exposed internally on the k8s network
360 exposed_ports = [] # Ports to be mapped to ports on the k8s nodes via NodePort
361 for (cport, proto), hport in port_map.items():
362 name = "xport-{0}-{1}".format(proto[0].lower(), cport)
363 cport = int(cport)
364 hport = int(hport)
365 service_ports.append(client.V1ServicePort(port=cport, protocol=proto, name=name[1:]))
366 if hport != 0: 366 ↛ 367line 366 didn't jump to line 367, because the condition on line 366 was never true
367 exposed_ports.append(client.V1ServicePort(port=cport, protocol=proto, node_port=hport, name=name))
368 return service_ports, exposed_ports
370def _service_exists(location, namespace, component_name):
371 exists = False
372 try:
373 _configure_api(location)
374 client.CoreV1Api().read_namespaced_service(_create_service_name(component_name), namespace)
375 exists = True
376 except client.rest.ApiException:
377 pass
379 return exists
381def _patch_deployment(location, namespace, deployment, modify):
382 '''
383 Gets the current spec for 'deployment' in 'namespace'
384 in the k8s cluster at 'location',
385 uses the 'modify' function to change the spec,
386 then sends the updated spec to k8s.
387 '''
388 _configure_api(location)
390 # Get deployment spec
391 spec = client.ExtensionsV1beta1Api().read_namespaced_deployment(deployment, namespace)
393 # Apply changes to spec
394 spec = modify(spec)
396 # Patch the deploy with updated spec
397 client.ExtensionsV1beta1Api().patch_namespaced_deployment(deployment, namespace, spec)
399def _execute_command_in_pod(location, namespace, pod_name, command):
400 '''
401 Execute the command (specified by an argv-style list in the "command" parameter) in
402 the specified pod in the specified namespace at the specified location.
403 For now at least, we use this only to
404 run a notification script in a pod after a configuration change.
406 The straightforward way to do this is with the V1 Core API function "connect_get_namespaced_pod_exec".
407 Unfortunately due to a bug/limitation in the Python client library, we can't call it directly.
408 We have to make the API call through a Websocket connection using the kubernetes.stream wrapper API.
409 I'm following the example code at https://github.com/kubernetes-client/python/blob/master/examples/exec.py.
410 There are several issues tracking this, in various states. It isn't clear that there will ever
411 be a fix.
412 - https://github.com/kubernetes-client/python/issues/58
413 - https://github.com/kubernetes-client/python/issues/409
414 - https://github.com/kubernetes-client/python/issues/526
416 The main consequence of the workaround using "stream" is that the caller does not get an indication
417 of the exit code returned by the command when it completes execution. It turns out that the
418 original implementation of notification in the Docker plugin did not use this result, so we can
419 still match the original notification functionality.
421 The "stream" approach returns a string containing any output sent by the command to stdout or stderr.
422 We'll return that so it can logged.
423 '''
424 _configure_api(location)
425 try:
426 output = stream.stream(client.CoreV1Api().connect_get_namespaced_pod_exec,
427 name=pod_name,
428 namespace=namespace,
429 command=command,
430 stdout=True,
431 stderr=True,
432 stdin=False,
433 tty=False)
434 except client.rest.ApiException as e:
435 # If the exception indicates the pod wasn't found, it's not a fatal error.
436 # It existed when we enumerated the pods for the deployment but no longer exists.
437 # Unfortunately, the only way to distinguish a pod not found from any other error
438 # is by looking at the reason text.
439 # (The ApiException's "status" field should contain the HTTP status code, which would
440 # be 404 if the pod isn't found, but empirical testing reveals that "status" is set
441 # to zero.)
442 if "404 not found" in e.reason.lower():
443 output = "Pod not found"
444 else:
445 raise e
447 return {"pod" : pod_name, "output" : output}
449def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, **kwargs):
450 '''
451 This will create a k8s Deployment and, if needed, one or two k8s Services.
452 (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
453 We're not exposing k8s to the component developer and the blueprint author.
454 This is a conscious choice. We want to use k8s in a controlled, consistent way, and we want to hide
455 the details from the component developer and the blueprint author.)
457 namespace: the Kubernetes namespace into which the component is deployed
458 component_name: the component name, used to derive names of Kubernetes entities
459 image: the docker image for the component being deployed
460 replica: the number of instances of the component to be deployed
461 always_pull: boolean flag, indicating that Kubernetes should always pull a new copy of
462 the Docker image for the component, even if it is already present on the Kubernetes node.
463 k8sconfig contains:
464 - image_pull_secrets: a list of names of image pull secrets that can be used for retrieving images.
465 (DON'T PANIC: these are just the names of secrets held in the Kubernetes secret store.)
466 - filebeat: a dictionary of filebeat sidecar parameters:
467 "log_path" : mount point for log volume in filebeat container
468 "data_path" : mount point for data volume in filebeat container
469 "config_path" : mount point for config volume in filebeat container
470 "config_subpath" : subpath for config data in filebeat container
471 "config_map" : ConfigMap holding the filebeat configuration
472 "image": Docker image to use for filebeat
473 - tls: a dictionary of TLS-related information:
474 "cert_path": mount point for certificate volume in init container
475 "image": Docker image to use for TLS init container
476 "component_cert_dir" : default mount point for certs
477 kwargs may have:
478 - volumes: array of volume objects, where a volume object is:
479 {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}
480 - ports: array of strings in the form "container_port:host_port"
481 - env: map of name-value pairs ( {name0: value0, name1: value1...}
482 - log_info: an object with info for setting up ELK logging, with the form:
483 {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}
484 - tls_info: an object with info for setting up TLS (HTTPS), with the form:
485 {"use_tls": true, "cert_directory": "/path/to/container/cert/directory" }
486 - external_cert: an object with information for setting up the init container for external certificates creation, with the form:
487 {"external_cert":
488 "external_cert_directory": "/path/to/directory_where_certs_should_be_placed",
489 "use_external_tls": true or false,
490 "ca_name": "ca-name-value",
491 "cert_type": "P12" or "JKS" or "PEM",
492 "external_certificate_parameters":
493 "common_name": "common-name-value",
494 "sans": "sans-value"}
495 - labels: dict with label-name/label-value pairs, e.g. {"cfydeployment" : "lsdfkladflksdfsjkl", "cfynode":"mycomponent"}
496 These label will be set on all the pods deployed as a result of this deploy() invocation.
497 - resources: dict with optional "limits" and "requests" resource requirements, each a dict containing:
498 - cpu: number CPU usage, like 0.5
499 - memory: string memory requirement, like "2Gi"
500 - readiness: dict with health check info; if present, used to create a readiness probe for the main container. Includes:
501 - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
502 - interval: period (in seconds) between probes
503 - timeout: time (in seconds) to allow a probe to complete
504 - endpoint: the path portion of the URL that points to the readiness endpoint for "http" and "https" types
505 - path: the full path to the script to be executed in the container for "script" and "docker" types
506 - liveness: dict with health check info; if present, used to create a liveness probe for the main container. Includes:
507 - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
508 - interval: period (in seconds) between probes
509 - timeout: time (in seconds) to allow a probe to complete
510 - endpoint: the path portion of the URL that points to the liveness endpoint for "http" and "https" types
511 - path: the full path to the script to be executed in the container for "script" and "docker" types
512 - k8s_location: name of the Kubernetes location (cluster) where the component is to be deployed
514 '''
516 deployment_ok = False
517 cip_service_created = False
518 deployment_description = {
519 "namespace": namespace,
520 "location" : kwargs.get("k8s_location"),
521 "deployment": '',
522 "services": []
523 }
525 try:
527 # Get API handles
528 _configure_api(kwargs.get("k8s_location"))
529 core = client.CoreV1Api()
530 ext = client.ExtensionsV1beta1Api()
532 # Parse the port mapping
533 container_ports, port_map = parse_ports(kwargs.get("ports", []))
535 # Parse the volumes list into volumes and volume_mounts for the deployment
536 volumes, volume_mounts = _parse_volumes(kwargs.get("volumes", []))
538 # Initialize the list of containers that will be part of the pod
539 containers = []
540 init_containers = []
542 # Set up the ELK logging sidecar container, if needed
543 _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, kwargs.get("log_info"), k8sconfig.get("filebeat"))
545 # Set up TLS information
546 _add_tls_init_container(init_containers, volumes, volume_mounts, kwargs.get("tls_info") or {}, k8sconfig.get("tls"))
548 # Set up external TLS information
549 external_cert = kwargs.get("external_cert")
550 if external_cert and external_cert.get("use_external_tls"):
551 _add_external_tls_init_container(init_containers, volumes, external_cert, k8sconfig.get("external_cert"))
553 # Create the container for the component
554 # Make it the first container in the pod
555 container_args = {key: kwargs.get(key) for key in ("env", "readiness", "liveness", "resources")}
556 container_args['container_ports'] = container_ports
557 container_args['volume_mounts'] = volume_mounts
558 containers.insert(0, _create_container_object(component_name, image, always_pull, **container_args))
560 # Build the k8s Deployment object
561 labels = kwargs.get("labels", {})
562 labels["app"] = component_name
563 dep = _create_deployment_object(component_name, containers, init_containers, replicas, volumes, labels, pull_secrets=k8sconfig["image_pull_secrets"])
565 # Have k8s deploy it
566 ext.create_namespaced_deployment(namespace, dep)
567 deployment_ok = True
568 deployment_description["deployment"] = _create_deployment_name(component_name)
570 # Create service(s), if a port mapping is specified
571 if port_map: 571 ↛ 596line 571 didn't jump to line 596, because the condition on line 571 was never false
572 service_ports, exposed_ports = _process_port_map(port_map)
574 # Create a ClusterIP service for access via the k8s network
575 service = _create_service_object(_create_service_name(component_name), component_name, service_ports, None, labels, "ClusterIP")
576 core.create_namespaced_service(namespace, service)
577 cip_service_created = True
578 deployment_description["services"].append(_create_service_name(component_name))
580 # If there are ports to be exposed on the k8s nodes, create a "NodePort" service
581 if exposed_ports: 581 ↛ 582line 581 didn't jump to line 582
582 exposed_service = \
583 _create_service_object(_create_exposed_service_name(component_name), component_name, exposed_ports, '', labels, "NodePort")
584 core.create_namespaced_service(namespace, exposed_service)
585 deployment_description["services"].append(_create_exposed_service_name(component_name))
587 except Exception as e:
588 # If the ClusterIP service was created, delete the service:
589 if cip_service_created:
590 core.delete_namespaced_service(_create_service_name(component_name), namespace)
591 # If the deployment was created but not the service, delete the deployment
592 if deployment_ok:
593 client.ExtensionsV1beta1Api().delete_namespaced_deployment(_create_deployment_name(component_name), namespace, body=client.V1DeleteOptions())
594 raise e
596 return dep, deployment_description
598def undeploy(deployment_description):
599 _configure_api(deployment_description["location"])
601 namespace = deployment_description["namespace"]
603 # remove any services associated with the component
604 for service in deployment_description["services"]:
605 client.CoreV1Api().delete_namespaced_service(service, namespace)
607 # Have k8s delete the underlying pods and replicaset when deleting the deployment.
608 options = client.V1DeleteOptions(propagation_policy="Foreground")
609 client.ExtensionsV1beta1Api().delete_namespaced_deployment(deployment_description["deployment"], namespace, body=options)
611def is_available(location, namespace, component_name):
612 _configure_api(location)
613 dep_status = client.AppsV1beta1Api().read_namespaced_deployment_status(_create_deployment_name(component_name), namespace)
614 # Check if the number of available replicas is equal to the number requested and that the replicas match the current spec
615 # This check can be used to verify completion of an initial deployment, a scale operation, or an update operation
616 return dep_status.status.available_replicas == dep_status.spec.replicas and dep_status.status.updated_replicas == dep_status.spec.replicas
618def scale(deployment_description, replicas):
619 ''' Trigger a scaling operation by updating the replica count for the Deployment '''
621 def update_replica_count(spec):
622 spec.spec.replicas = replicas
623 return spec
625 _patch_deployment(deployment_description["location"], deployment_description["namespace"], deployment_description["deployment"], update_replica_count)
627def upgrade(deployment_description, image, container_index = 0):
628 ''' Trigger a rolling upgrade by sending a new image name/tag to k8s '''
630 def update_image(spec):
631 spec.spec.template.spec.containers[container_index].image = image
632 return spec
634 _patch_deployment(deployment_description["location"], deployment_description["namespace"], deployment_description["deployment"], update_image)
636def rollback(deployment_description, rollback_to=0):
637 '''
638 Undo upgrade by rolling back to a previous revision of the deployment.
639 By default, go back one revision.
640 rollback_to can be used to supply a specific revision number.
641 Returns the image for the app container and the replica count from the rolled-back deployment
642 '''
643 '''
644 2018-07-13
645 Currently this does not work due to a bug in the create_namespaced_deployment_rollback() method.
646 The k8s python client code throws an exception while processing the response from the API.
647 See:
648 - https://github.com/kubernetes-client/python/issues/491
649 - https://github.com/kubernetes/kubernetes/pull/63837
650 The fix has been merged into the master branch but is not in the latest release.
651 '''
652 _configure_api(deployment_description["location"])
653 deployment = deployment_description["deployment"]
654 namespace = deployment_description["namespace"]
656 # Initiate the rollback
657 client.ExtensionsV1beta1Api().create_namespaced_deployment_rollback(
658 deployment,
659 namespace,
660 client.AppsV1beta1DeploymentRollback(name=deployment, rollback_to=client.AppsV1beta1RollbackConfig(revision=rollback_to)))
662 # Read back the spec for the rolled-back deployment
663 spec = client.ExtensionsV1beta1Api().read_namespaced_deployment(deployment, namespace)
664 return spec.spec.template.spec.containers[0].image, spec.spec.replicas
666def execute_command_in_deployment(deployment_description, command):
667 '''
668 Enumerates the pods in the k8s deployment identified by "deployment_description",
669 then executes the command (represented as an argv-style list) in "command" in
670 container 0 (the main application container) each of those pods.
672 Note that the sets of pods associated with a deployment can change over time. The
673 enumeration is a snapshot at one point in time. The command will not be executed in
674 pods that are created after the initial enumeration. If a pod disappears after the
675 initial enumeration and before the command is executed, the attempt to execute the
676 command will fail. This is not treated as a fatal error.
678 This approach is reasonable for the one current use case for "execute_command": running a
679 script to notify a container that its configuration has changed as a result of a
680 policy change. In this use case, the new configuration information is stored into
681 the configuration store (Consul), the pods are enumerated, and the command is executed.
682 If a pod disappears after the enumeration, the fact that the command cannot be run
683 doesn't matter--a nonexistent pod doesn't need to be reconfigured. Similarly, a pod that
684 comes up after the enumeration will get its initial configuration from the updated version
685 in Consul.
687 The optimal solution here would be for k8s to provide an API call to execute a command in
688 all of the pods for a deployment. Unfortunately, k8s does not provide such a call--the
689 only call provided by k8s operates at the pod level, not the deployment level.
691 Another interesting k8s factoid: there's no direct way to list the pods belong to a
692 particular k8s deployment. The deployment code above sets a label ("k8sdeployment") on
693 the pod that has the k8s deployment name. To list the pods, the code below queries for
694 pods with the label carrying the deployment name.
695 '''
696 location = deployment_description["location"]
697 _configure_api(location)
698 deployment = deployment_description["deployment"]
699 namespace = deployment_description["namespace"]
701 # Get names of all the running pods belonging to the deployment
702 pod_names = [pod.metadata.name for pod in client.CoreV1Api().list_namespaced_pod(
703 namespace = namespace,
704 label_selector = "k8sdeployment={0}".format(deployment),
705 field_selector = "status.phase=Running"
706 ).items]
708 # Execute command in the running pods
709 return [_execute_command_in_pod(location, namespace, pod_name, command)
710 for pod_name in pod_names]