Coverage for k8sclient/k8sclient.py : 75%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# ============LICENSE_START=======================================================
2# org.onap.dcae
3# ================================================================================
4# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
5# Copyright (c) 2020 Pantheon.tech. All rights reserved.
6# Copyright (c) 2020 Nokia. All rights reserved.
7# ================================================================================
8# Licensed under the Apache License, Version 2.0 (the "License");
9# you may not use this file except in compliance with the License.
10# You may obtain a copy of the License at
11#
12# http://www.apache.org/licenses/LICENSE-2.0
13#
14# Unless required by applicable law or agreed to in writing, software
15# distributed under the License is distributed on an "AS IS" BASIS,
16# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17# See the License for the specific language governing permissions and
18# limitations under the License.
19# ============LICENSE_END=========================================================
20#
21import os
22import re
23import uuid
25from kubernetes import config, client, stream
27# Default values for readiness probe
28PROBE_DEFAULT_PERIOD = 15
29PROBE_DEFAULT_TIMEOUT = 1
31# Location of k8s cluster config file ("kubeconfig")
32K8S_CONFIG_PATH="/opt/onap/kube/kubeconfig"
34# Regular expression for interval/timeout specification
35INTERVAL_SPEC = re.compile("^([0-9]+)(s|m|h)?$")
36# Conversion factors to seconds
37FACTORS = {None: 1, "s": 1, "m": 60, "h": 3600}
39# Regular expression for port mapping
40# group 1: container port
41# group 2: / + protocol
42# group 3: protocol
43# group 4: host port
44PORTS = re.compile("^([0-9]+)(/(udp|UDP|tcp|TCP))?:([0-9]+)$")
46# Constants for external_cert
47MOUNT_PATH = "/etc/onap/aaf/certservice/certs/"
48KEYSTORE_PATH = MOUNT_PATH + "certServiceClient-keystore.jks"
49TRUSTSTORE_PATH = MOUNT_PATH + "truststore.jks"
50DEFAULT_CERT_TYPE = "p12"
52def _create_deployment_name(component_name):
53 return "dep-{0}".format(component_name)[:63]
55def _create_service_name(component_name):
56 return "{0}".format(component_name)[:63]
58def _create_exposed_service_name(component_name):
59 return ("x{0}".format(component_name))[:63]
def _configure_api(location=None):
    """
    Set up the kubernetes client configuration.
    If a kubeconfig file exists at K8S_CONFIG_PATH, load it, selecting the
    context named by 'location'.  Otherwise fall back to in-cluster
    configuration using the pod's service-account token and CA cert.
    NOTE(review): in the in-cluster fallback path 'location' is not used —
    presumably only the local cluster is reachable there; confirm.
    """
    # Look for a kubernetes config file
    if os.path.exists(K8S_CONFIG_PATH):
        # persist_config=False keeps the client library from writing changes back to the file
        config.load_kube_config(config_file=K8S_CONFIG_PATH, context=location, persist_config=False)
    else:
        # Maybe we're running in a k8s container and we can use info provided by k8s
        # We would like to use:
        #    config.load_incluster_config()
        # but this looks into os.environ for kubernetes host and port, and from
        # the plugin those aren't visible. So we use the InClusterConfigLoader class,
        # where we can set the environment to what we like.
        # This is probably brittle! Maybe there's a better alternative.
        localenv = {
            config.incluster_config.SERVICE_HOST_ENV_NAME : "kubernetes.default.svc.cluster.local",
            config.incluster_config.SERVICE_PORT_ENV_NAME : "443"
        }
        config.incluster_config.InClusterConfigLoader(
            token_filename=config.incluster_config.SERVICE_TOKEN_FILENAME,
            cert_filename=config.incluster_config.SERVICE_CERT_FILENAME,
            environ=localenv
        ).load_and_set()
83def _parse_interval(t):
84 """
85 Parse an interval specification
86 t can be
87 - a simple integer quantity, interpreted as seconds
88 - a string representation of a decimal integer, interpreted as seconds
89 - a string consisting of a represention of an decimal integer followed by a unit,
90 with "s" representing seconds, "m" representing minutes,
91 and "h" representing hours
92 Used for compatibility with the Docker plugin, where time intervals
93 for health checks were specified as strings with a number and a unit.
94 See 'intervalspec' above for the regular expression that's accepted.
95 """
96 m = INTERVAL_SPEC.match(str(t))
97 if m:
98 time = int(m.group(1)) * FACTORS[m.group(2)]
99 else:
100 raise ValueError("Bad interval specification: {0}".format(t))
101 return time
def _create_probe(hc, port):
    """
    Build a Kubernetes V1Probe from the health check dictionary hc.
    hc['type'] selects the probe mechanism: "http"/"https" issue a GET to
    hc['endpoint'] on the given port; "script"/"docker" exec hc['script']
    inside the container.  Returns None for any other type.
    """
    check_type = hc['type']
    # Timing parameters are shared by both probe mechanisms
    common_args = dict(
        failure_threshold=1,
        initial_delay_seconds=5,
        period_seconds=_parse_interval(hc.get('interval', PROBE_DEFAULT_PERIOD)),
        timeout_seconds=_parse_interval(hc.get('timeout', PROBE_DEFAULT_TIMEOUT))
    )
    if check_type in ('http', 'https'):
        return client.V1Probe(
            http_get=client.V1HTTPGetAction(
                path=hc['endpoint'],
                port=port,
                scheme=check_type.upper()
            ),
            **common_args
        )
    if check_type in ('script', 'docker'):
        return client.V1Probe(
            _exec=client.V1ExecAction(command=hc['script'].split()),
            **common_args
        )
    return None
def _create_resources(resources=None):
    """
    Build a V1ResourceRequirements from a dict with optional "limits" and
    "requests" keys; returns None when no resources dict is supplied.
    """
    if resources is None:
        return None
    return client.V1ResourceRequirements(
        limits=resources.get("limits"),
        requests=resources.get("requests")
    )
def _create_container_object(name, image, always_pull, **kwargs):
    """
    Build the V1Container spec for a pod member.
    kwargs may include: env (dict of name/value pairs), container_ports
    (list of (port, protocol) tuples), volume_mounts, resources, and
    readiness/liveness health-check dicts (see _create_probe).
    """
    # Environment: caller-supplied variables plus POD_IP, which k8s fills in
    # with the pod's IP address via the downward API (status.podIP).
    env_dict = kwargs.get('env') or {}
    env_vars = [client.V1EnvVar(name=key, value=value) for key, value in env_dict.items()]
    pod_ip_source = client.V1EnvVarSource(
        field_ref=client.V1ObjectFieldSelector(field_path="status.podIP"))
    env_vars.append(client.V1EnvVar(name="POD_IP", value_from=pod_ip_source))

    ports = kwargs.get('container_ports') or []
    # HTTP(S) health checks are assumed to target the first container port
    first_port = ports[0][0] if ports else None

    readiness_spec = kwargs.get('readiness')
    liveness_spec = kwargs.get('liveness')
    resource_spec = kwargs.get('resources')

    return client.V1Container(
        name=name,
        image=image,
        image_pull_policy='Always' if always_pull else 'IfNotPresent',
        env=env_vars,
        ports=[client.V1ContainerPort(container_port=p, protocol=proto) for p, proto in ports],
        volume_mounts=kwargs.get('volume_mounts') or [],
        resources=_create_resources(resource_spec) if resource_spec else None,
        readiness_probe=_create_probe(readiness_spec, first_port) if readiness_spec else None,
        liveness_probe=_create_probe(liveness_spec, first_port) if liveness_spec else None
    )
def _create_deployment_object(component_name,
                              containers,
                              init_containers,
                              replicas,
                              volumes,
                              labels={},
                              pull_secrets=[]):
    # Builds the k8s V1Deployment for a component: a pod template holding the
    # given containers, init containers, and volumes, wrapped in a Deployment
    # spec with the requested replica count.
    #
    # NOTE(review): 'labels' and 'pull_secrets' are mutable default arguments,
    # and 'labels' is mutated in place below, so the "k8sdeployment" key also
    # becomes visible in the caller's dict.  deploy() passes the same dict to
    # the service objects afterward, so the mutation may be relied on —
    # confirm before changing this to a copy.

    deployment_name = _create_deployment_name(component_name)

    # Label the pod with the deployment name, so we can find it easily
    labels.update({"k8sdeployment" : deployment_name})

    # pull_secrets is a list of the names of the k8s secrets containing docker registry credentials
    # See https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod
    ips = []
    for secret in pull_secrets:
        ips.append(client.V1LocalObjectReference(name=secret))

    # Define pod template
    template = client.V1PodTemplateSpec(
        metadata=client.V1ObjectMeta(labels=labels),
        spec=client.V1PodSpec(hostname=component_name,
                              containers=containers,
                              init_containers=init_containers,
                              volumes=volumes,
                              image_pull_secrets=ips)
    )

    # Define deployment spec; the selector matches the labels set on the pod template
    spec = client.V1DeploymentSpec(
        replicas=replicas,
        selector=client.V1LabelSelector(match_labels=labels),
        template=template
    )

    # Create deployment object
    deployment = client.V1Deployment(
        api_version="apps/v1",
        kind="Deployment",
        metadata=client.V1ObjectMeta(name=deployment_name),
        spec=spec
    )

    return deployment
def _create_service_object(service_name, component_name, service_ports, annotations, labels, service_type):
    """
    Build a V1Service of the given type ("ClusterIP" or "NodePort") exposing
    service_ports, selecting pods by their "app" label.  Annotations are
    attached to the metadata only when supplied (truthy).
    """
    metadata_args = {"name": _create_service_name(service_name), "labels": labels}
    if annotations:
        metadata_args["annotations"] = annotations
    return client.V1Service(
        kind="Service",
        api_version="v1",
        metadata=client.V1ObjectMeta(**metadata_args),
        spec=client.V1ServiceSpec(
            ports=service_ports,
            selector={"app" : component_name},
            type=service_type
        )
    )
def parse_ports(port_list):
    """
    Parse a list of "container_port[/protocol]:host_port" strings.
    Returns (container_ports, port_map): container_ports is a list of
    (port, PROTOCOL) tuples used to create the container; port_map maps
    each (port, PROTOCOL) to its host port, used to set up k8s services.
    The protocol defaults to "TCP" and is upper-cased.
    Raises ValueError for a spec that does not match the expected form.
    """
    # group 1: container port, group 3: optional protocol, group 4: host port
    pattern = re.compile(r"^([0-9]+)(/(udp|UDP|tcp|TCP))?:([0-9]+)$")
    container_ports = []
    port_map = {}
    for spec in port_list:
        match = pattern.match(spec.strip())
        if not match:
            raise ValueError("Bad port specification: {0}".format(spec))
        cport = int(match.group(1))
        hport = int(match.group(4))
        proto = match.group(3).upper() if match.group(3) else "TCP"
        container_ports.append((cport, proto))
        port_map[(cport, proto)] = hport
    return container_ports, port_map
def _parse_volumes(volume_list):
    """
    Convert Docker-style volume specs
    ({"host": {"path": ...}, "container": {"bind": ..., "mode": ...}})
    into parallel lists of k8s V1Volume and V1VolumeMount objects.
    """
    volumes = []
    volume_mounts = []
    for spec in volume_list:
        # A random UUID ties each hostPath volume to its mount
        vol_name = str(uuid.uuid4())
        host_path = spec['host']['path']
        container_path = spec['container']['bind']
        read_only = spec['container'].get('mode') == 'ro'
        volumes.append(client.V1Volume(
            name=vol_name,
            host_path=client.V1HostPathVolumeSource(path=host_path)))
        volume_mounts.append(client.V1VolumeMount(
            name=vol_name,
            mount_path=container_path,
            read_only=read_only))
    return volumes, volume_mounts
def _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, log_info, filebeat):
    """
    Add a filebeat logging sidecar to the pod, mutating the containers,
    volumes, and volume_mounts lists in place.  No-op when log_info or the
    filebeat configuration is missing, or when no log_directory is given.
    """
    if not log_info or not filebeat:
        return
    log_dir = log_info.get("log_directory")
    if not log_dir:
        return

    sidecar_mounts = []

    # Shared emptyDir where the component writes logs and the sidecar reads them;
    # the sidecar sees it at alternate_fb_path, or <log_path>/<component_name>
    volumes.append(client.V1Volume(name="component-log", empty_dir=client.V1EmptyDirVolumeSource()))
    volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=log_dir))
    sidecar_log_path = log_info.get("alternate_fb_path") or "{0}/{1}".format(filebeat["log_path"], component_name)
    sidecar_mounts.append(client.V1VolumeMount(name="component-log", mount_path=sidecar_log_path))

    # Scratch emptyDir for filebeat's own data
    volumes.append(client.V1Volume(name="filebeat-data", empty_dir=client.V1EmptyDirVolumeSource()))
    sidecar_mounts.append(client.V1VolumeMount(name="filebeat-data", mount_path=filebeat["data_path"]))

    # Filebeat configuration comes from a ConfigMap that should be created when DCAE is installed
    volumes.append(client.V1Volume(
        name="filebeat-conf",
        config_map=client.V1ConfigMapVolumeSource(name=filebeat["config_map"])))
    sidecar_mounts.append(client.V1VolumeMount(
        name="filebeat-conf",
        mount_path=filebeat["config_path"],
        sub_path=filebeat["config_subpath"]))

    # Finally create the sidecar container itself
    containers.append(_create_container_object("filebeat", filebeat["image"], False, volume_mounts=sidecar_mounts))
def _add_tls_init_container(ctx, init_containers, volumes, volume_mounts, tls_info, tls_config):
    """
    Add an InitContainer that sets up TLS certificate material, mutating the
    init_containers, volumes, and volume_mounts lists in place.
    For components acting as a server (tls_info["use_tls"] is True) the init
    container populates a directory with server and CA certificate materials
    in various formats; otherwise it provides CA materials in PEM and JKS
    formats only.  The certificate directory is mounted on the component
    container at tls_info["cert_directory"] if present, else at the
    configured default (tls_config["component_cert_dir"]).
    """
    image = tls_config["image"]
    ctx.logger.info("Creating init container: TLS \n * [" + image + "]")

    # Mount point on the component container: per-component override or configured default
    cert_directory = tls_info.get("cert_directory") or tls_config.get("component_cert_dir")

    # TLS_SERVER tells the init container whether to produce server certs or only CA certs
    env = {"TLS_SERVER": "true" if tls_info.get("use_tls") else "false"}

    # Shared emptyDir: written by the init container, read by the component container
    volumes.append(client.V1Volume(name="tls-info", empty_dir=client.V1EmptyDirVolumeSource()))
    volume_mounts.append(client.V1VolumeMount(name="tls-info", mount_path=cert_directory))
    init_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_config["cert_path"])]

    init_containers.append(
        _create_container_object("init-tls", image, False, volume_mounts=init_mounts, env=env))
def _add_external_tls_init_container(ctx, init_containers, volumes, external_cert, external_tls_config):
    # Adds an InitContainer to the pod which will generate external TLS certificates.
    # The container is driven entirely by the environment variables assembled below
    # and writes its output under <external_cert_directory>/external.
    docker_image = external_tls_config["image_tag"]
    ctx.logger.info("Creating init container: external TLS \n * [" + docker_image + "]")

    env = {}
    output_path = external_cert.get("external_cert_directory")
    # Normalize to a trailing slash so "external" can be appended as a subdirectory
    if not output_path.endswith('/'):
        output_path += '/'

    # Certificate request parameters: per-component values come from external_cert,
    # plugin-wide defaults from external_tls_config
    env["REQUEST_URL"] = external_tls_config.get("request_url")
    env["REQUEST_TIMEOUT"] = external_tls_config.get("timeout")
    env["OUTPUT_PATH"] = output_path + "external"
    env["OUTPUT_TYPE"] = external_cert.get("cert_type")
    env["CA_NAME"] = external_cert.get("ca_name")
    env["COMMON_NAME"] = external_cert.get("external_certificate_parameters").get("common_name")
    env["ORGANIZATION"] = external_tls_config.get("organization")
    env["ORGANIZATION_UNIT"] = external_tls_config.get("organizational_unit")
    env["LOCATION"] = external_tls_config.get("location")
    env["STATE"] = external_tls_config.get("state")
    env["COUNTRY"] = external_tls_config.get("country")
    env["SANS"] = external_cert.get("external_certificate_parameters").get("sans")
    # Client credentials for talking to the cert service, mounted from a k8s secret below
    env["KEYSTORE_PATH"] = KEYSTORE_PATH
    env["KEYSTORE_PASSWORD"] = external_tls_config.get("keystore_password")
    env["TRUSTSTORE_PATH"] = TRUSTSTORE_PATH
    env["TRUSTSTORE_PASSWORD"] = external_tls_config.get("truststore_password")

    # Create the volumes and volume mounts:
    # - "tls-volume" holds the client keystore/truststore from the named k8s secret
    # - "tls-info" is the shared cert directory also mounted on the component container
    #   (presumably created by _add_tls_init_container, which deploy() calls first — TODO confirm)
    sec = client.V1SecretVolumeSource(secret_name=external_tls_config.get("cert_secret_name"))
    volumes.append(client.V1Volume(name="tls-volume", secret=sec))
    init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=external_cert.get("external_cert_directory")),
                          client.V1VolumeMount(name="tls-volume", mount_path=MOUNT_PATH)]

    # Create the init container
    init_containers.append(_create_container_object("cert-service-client", docker_image, False, volume_mounts=init_volume_mounts, env=env))
def _add_truststore_merger_init_container(ctx, init_containers, tls_info, tls_config, external_cert, truststore_merger_config):
    # Adds an InitContainer to the pod to merge TLS and external TLS truststore into single file.
    # The merger is driven by the environment variables assembled below; it reads the
    # materials produced by the TLS and external-TLS init containers on the shared
    # "tls-info" volume.
    docker_image = truststore_merger_config["image_tag"]
    ctx.logger.info("Creating init container: truststore merger \n * [" + docker_image + "]")

    # Directory holding the internal TLS materials (per-component override or configured default)
    tls_cert_dir = tls_info.get("cert_directory") or tls_config.get("component_cert_dir")
    if not tls_cert_dir.endswith('/'):
        tls_cert_dir += '/'

    tls_cert_file_path = tls_cert_dir + "trust.jks"
    tls_cert_file_pass = tls_cert_dir + "trust.pass"

    # External TLS materials live in the "external" subdirectory
    # (see _add_external_tls_init_container's OUTPUT_PATH)
    ext_cert_dir = tls_cert_dir + "external/"

    output_type = (external_cert.get("cert_type") or DEFAULT_CERT_TYPE).lower()
    ext_truststore_path = ext_cert_dir + "truststore." + _get_file_extension(output_type)
    # PEM truststores have no password file, so pass an empty entry in that case
    ext_truststore_pass = ''
    if output_type != 'pem':
        ext_truststore_pass = ext_cert_dir + "truststore.pass"

    env = {}
    # Colon-separated path lists consumed by the merger container
    env["TRUSTSTORES_PATHS"] = tls_cert_file_path + ":" + ext_truststore_path
    env["TRUSTSTORES_PASSWORDS_PATHS"] = tls_cert_file_pass + ":" + ext_truststore_pass
    env["KEYSTORE_SOURCE_PATHS"] = _get_keystore_source_paths(output_type, ext_cert_dir)
    env["KEYSTORE_DESTINATION_PATHS"] = _get_keystore_destination_paths(output_type, tls_cert_dir)

    ctx.logger.info("TRUSTSTORES_PATHS: " + env["TRUSTSTORES_PATHS"])
    ctx.logger.info("TRUSTSTORES_PASSWORDS_PATHS: " + env["TRUSTSTORES_PASSWORDS_PATHS"])
    ctx.logger.info("KEYSTORE_SOURCE_PATHS: " + env["KEYSTORE_SOURCE_PATHS"])
    ctx.logger.info("KEYSTORE_DESTINATION_PATHS: " + env["KEYSTORE_DESTINATION_PATHS"])

    # Create the volumes and volume mounts
    # ("tls-info" is the shared emptyDir created by _add_tls_init_container — TODO confirm ordering)
    init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_cert_dir)]

    # Create the init container
    init_containers.append(_create_container_object("truststore-merger", docker_image, False, volume_mounts=init_volume_mounts, env=env))
406def _get_file_extension(output_type):
407 return {
408 'p12': 'p12',
409 'pem': 'pem',
410 'jks': 'jks',
411 }[output_type]
413def _get_keystore_source_paths(output_type, ext_cert_dir):
414 source_paths_template = {
415 'p12': "{0}keystore.p12:{0}keystore.pass",
416 'jks': "{0}keystore.jks:{0}keystore.pass",
417 'pem': "{0}keystore.pem:{0}key.pem",
418 }[output_type]
419 return source_paths_template.format(ext_cert_dir)
421def _get_keystore_destination_paths(output_type, tls_cert_dir):
422 destination_paths_template = {
423 'p12': "{0}cert.p12:{0}p12.pass",
424 'jks': "{0}cert.jks:{0}jks.pass",
425 'pem': "{0}cert.pem:{0}key.pem",
426 }[output_type]
427 return destination_paths_template.format(tls_cert_dir)
def _process_port_map(port_map):
    """
    Split the parsed port map into two lists of V1ServicePort:
    - internal_ports: every mapped port, exposed on the k8s network (ClusterIP)
    - node_ports: ports with a nonzero host port, to be exposed on k8s nodes
      via a NodePort service
    """
    internal_ports = []
    node_ports = []
    for (container_port, protocol), host_port in port_map.items():
        # "xport-..." names the NodePort entry; the ClusterIP entry drops the leading "x"
        base_name = "xport-{0}-{1}".format(protocol[0].lower(), container_port)
        container_port = int(container_port)
        host_port = int(host_port)
        internal_ports.append(client.V1ServicePort(
            port=container_port, protocol=protocol, name=base_name[1:]))
        if host_port != 0:
            node_ports.append(client.V1ServicePort(
                port=container_port, protocol=protocol, node_port=host_port, name=base_name))
    return internal_ports, node_ports
def _service_exists(location, namespace, component_name):
    """
    Return True if the component's ClusterIP service can be read from the
    given namespace in the cluster at 'location', False otherwise.
    """
    try:
        _configure_api(location)
        client.CoreV1Api().read_namespaced_service(_create_service_name(component_name), namespace)
        return True
    except client.rest.ApiException:
        # Any API failure (typically 404) is treated as "does not exist"
        return False
def _patch_deployment(location, namespace, deployment, modify):
    """
    Read-modify-write the spec of 'deployment' in 'namespace' in the k8s
    cluster at 'location': fetch the current spec, transform it with the
    modify() callback, and patch the result back to k8s.
    """
    _configure_api(location)
    apps_api = client.AppsV1Api()

    # Fetch current spec, apply the caller's change, send it back
    current_spec = apps_api.read_namespaced_deployment(deployment, namespace)
    apps_api.patch_namespaced_deployment(deployment, namespace, modify(current_spec))
def _execute_command_in_pod(location, namespace, pod_name, command):
    '''
    Execute the command (specified by an argv-style list in the "command" parameter) in
    the specified pod in the specified namespace at the specified location.
    For now at least, we use this only to
    run a notification script in a pod after a configuration change.

    The straightforward way to do this is with the V1 Core API function "connect_get_namespaced_pod_exec".
    Unfortunately due to a bug/limitation in the Python client library, we can't call it directly.
    We have to make the API call through a Websocket connection using the kubernetes.stream wrapper API.
    I'm following the example code at https://github.com/kubernetes-client/python/blob/master/examples/exec.py.
    There are several issues tracking this, in various states. It isn't clear that there will ever
    be a fix.
        - https://github.com/kubernetes-client/python/issues/58
        - https://github.com/kubernetes-client/python/issues/409
        - https://github.com/kubernetes-client/python/issues/526

    The main consequence of the workaround using "stream" is that the caller does not get an indication
    of the exit code returned by the command when it completes execution. It turns out that the
    original implementation of notification in the Docker plugin did not use this result, so we can
    still match the original notification functionality.

    The "stream" approach returns a string containing any output sent by the command to stdout or stderr.
    We'll return that so it can logged.

    Returns a dict of the form {"pod": pod_name, "output": <combined stdout/stderr,
    or "Pod not found">}.  Re-raises any ApiException other than pod-not-found.
    '''
    _configure_api(location)
    try:
        output = stream.stream(client.CoreV1Api().connect_get_namespaced_pod_exec,
                               name=pod_name,
                               namespace=namespace,
                               command=command,
                               stdout=True,
                               stderr=True,
                               stdin=False,
                               tty=False)
    except client.rest.ApiException as e:
        # If the exception indicates the pod wasn't found, it's not a fatal error.
        # It existed when we enumerated the pods for the deployment but no longer exists.
        # Unfortunately, the only way to distinguish a pod not found from any other error
        # is by looking at the reason text.
        # (The ApiException's "status" field should contain the HTTP status code, which would
        # be 404 if the pod isn't found, but empirical testing reveals that "status" is set
        # to zero.)
        if "404 not found" in e.reason.lower():
            output = "Pod not found"
        else:
            raise e

    return {"pod" : pod_name, "output" : output}
def deploy(ctx, namespace, component_name, image, replicas, always_pull, k8sconfig, **kwargs):
    '''
    This will create a k8s Deployment and, if needed, one or two k8s Services.
    (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
    We're not exposing k8s to the component developer and the blueprint author.
    This is a conscious choice.  We want to use k8s in a controlled, consistent way, and we want to hide
    the details from the component developer and the blueprint author.)

    namespace:  the Kubernetes namespace into which the component is deployed
    component_name:  the component name, used to derive names of Kubernetes entities
    image: the docker image for the component being deployed
    replica: the number of instances of the component to be deployed
    always_pull:  boolean flag, indicating that Kubernetes should always pull a new copy of
       the Docker image for the component, even if it is already present on the Kubernetes node.
    k8sconfig contains:
        - image_pull_secrets: a list of names of image pull secrets that can be used for retrieving images.
          (DON'T PANIC:  these are just the names of secrets held in the Kubernetes secret store.)
        - filebeat: a dictionary of filebeat sidecar parameters:
            "log_path" : mount point for log volume in filebeat container
            "data_path" : mount point for data volume in filebeat container
            "config_path" : mount point for config volume in filebeat container
            "config_subpath" :  subpath for config data in filebeat container
            "config_map" : ConfigMap holding the filebeat configuration
            "image": Docker image to use for filebeat
        - tls: a dictionary of TLS-related information:
            "cert_path": mount point for certificate volume in init container
            "image": Docker image to use for TLS init container
            "component_cert_dir" : default mount point for certs
        - truststore-merger: a dictionary of trustore-merger information:
            "image_tag": docker image to use for truststore-merger init container
    kwargs may have:
        - volumes:  array of volume objects, where a volume object is:
            {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}
        - ports: array of strings in the form "container_port:host_port"
        - env: map of name-value pairs ( {name0: value0, name1: value1...}
        - log_info: an object with info for setting up ELK logging, with the form:
            {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}
        - tls_info: an object with info for setting up TLS (HTTPS), with the form:
            {"use_tls": true, "cert_directory": "/path/to/container/cert/directory" }
        - external_cert: an object with information for setting up the init container for external certificates creation, with the form:
            {"external_cert":
                "external_cert_directory": "/path/to/directory_where_certs_should_be_placed",
                "use_external_tls": true or false,
                "ca_name": "ca-name-value",
                "cert_type": "P12" or "JKS" or "PEM",
                "external_certificate_parameters":
                    "common_name": "common-name-value",
                    "sans": "sans-value"}
        - labels: dict with label-name/label-value pairs, e.g. {"cfydeployment" : "lsdfkladflksdfsjkl", "cfynode":"mycomponent"}
            These label will be set on all the pods deployed as a result of this deploy() invocation.
        - resources: dict with optional "limits" and "requests" resource requirements, each a dict containing:
            - cpu:    number CPU usage, like 0.5
            - memory: string memory requirement, like "2Gi"
        - readiness: dict with health check info; if present, used to create a readiness probe for the main container.  Includes:
            - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
            - interval: period (in seconds) between probes
            - timeout:  time (in seconds) to allow a probe to complete
            - endpoint: the path portion of the URL that points to the readiness endpoint for "http" and "https" types
            - path: the full path to the script to be executed in the container for "script" and "docker" types
        - liveness: dict with health check info; if present, used to create a liveness probe for the main container.  Includes:
            - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
            - interval: period (in seconds) between probes
            - timeout:  time (in seconds) to allow a probe to complete
            - endpoint: the path portion of the URL that points to the liveness endpoint for "http" and "https" types
            - path: the full path to the script to be executed in the container for "script" and "docker" types
        - k8s_location: name of the Kubernetes location (cluster) where the component is to be deployed

    Returns (dep, deployment_description): the V1Deployment object sent to k8s
    and a dict ({"namespace", "location", "deployment", "services"}) that
    undeploy()/scale()/upgrade() accept.  On failure, any partially created
    k8s objects are deleted and the exception is re-raised.
    '''

    deployment_ok = False
    cip_service_created = False
    deployment_description = {
        "namespace": namespace,
        "location" : kwargs.get("k8s_location"),
        "deployment": '',
        "services": []
    }

    try:

        # Get API handles
        _configure_api(kwargs.get("k8s_location"))
        core = client.CoreV1Api()
        k8s_apps_v1_api_client = client.AppsV1Api()

        # Parse the port mapping
        container_ports, port_map = parse_ports(kwargs.get("ports", []))

        # Parse the volumes list into volumes and volume_mounts for the deployment
        volumes, volume_mounts = _parse_volumes(kwargs.get("volumes", []))

        # Initialize the list of containers that will be part of the pod
        containers = []
        init_containers = []

        # Set up the ELK logging sidecar container, if needed
        _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, kwargs.get("log_info"), k8sconfig.get("filebeat"))

        # Set up TLS information
        _add_tls_init_container(ctx, init_containers, volumes, volume_mounts, kwargs.get("tls_info") or {}, k8sconfig.get("tls"))

        # Set up external TLS information
        external_cert = kwargs.get("external_cert")
        if external_cert and external_cert.get("use_external_tls"):
            _add_external_tls_init_container(ctx, init_containers, volumes, external_cert, k8sconfig.get("external_cert"))
            _add_truststore_merger_init_container(ctx, init_containers, kwargs.get("tls_info") or {}, k8sconfig.get("tls"), external_cert, k8sconfig.get("truststore_merger"))

        # Create the container for the component
        # Make it the first container in the pod
        container_args = {key: kwargs.get(key) for key in ("env", "readiness", "liveness", "resources")}
        container_args['container_ports'] = container_ports
        container_args['volume_mounts'] = volume_mounts
        containers.insert(0, _create_container_object(component_name, image, always_pull, **container_args))

        # Build the k8s Deployment object
        # (note: _create_deployment_object adds a "k8sdeployment" key to this
        # labels dict in place, so the services created below carry it too)
        labels = kwargs.get("labels", {})
        labels["app"] = component_name
        dep = _create_deployment_object(component_name, containers, init_containers, replicas, volumes, labels, pull_secrets=k8sconfig["image_pull_secrets"])

        # Have k8s deploy it
        k8s_apps_v1_api_client.create_namespaced_deployment(namespace, dep)
        deployment_ok = True
        deployment_description["deployment"] = _create_deployment_name(component_name)

        # Create service(s), if a port mapping is specified
        if port_map:
            service_ports, exposed_ports = _process_port_map(port_map)

            # Create a ClusterIP service for access via the k8s network
            service = _create_service_object(_create_service_name(component_name), component_name, service_ports, None, labels, "ClusterIP")
            core.create_namespaced_service(namespace, service)
            cip_service_created = True
            deployment_description["services"].append(_create_service_name(component_name))

            # If there are ports to be exposed on the k8s nodes, create a "NodePort" service
            if exposed_ports:
                exposed_service = \
                    _create_service_object(_create_exposed_service_name(component_name), component_name, exposed_ports, '', labels, "NodePort")
                core.create_namespaced_service(namespace, exposed_service)
                deployment_description["services"].append(_create_exposed_service_name(component_name))

    except Exception as e:
        # Roll back whatever was created before the failure, then re-raise
        # If the ClusterIP service was created, delete the service:
        if cip_service_created:
            core.delete_namespaced_service(_create_service_name(component_name), namespace)
        # If the deployment was created but not the service, delete the deployment
        if deployment_ok:
            client.AppsV1Api().delete_namespaced_deployment(_create_deployment_name(component_name), namespace, body=client.V1DeleteOptions())
        raise e

    return dep, deployment_description
def undeploy(deployment_description):
    """
    Tear down what deploy() created, using the deployment_description it
    returned: delete the component's services, then the deployment itself.
    """
    _configure_api(deployment_description["location"])
    namespace = deployment_description["namespace"]

    # Remove any services associated with the component
    for service_name in deployment_description["services"]:
        client.CoreV1Api().delete_namespaced_service(service_name, namespace)

    # "Foreground" propagation has k8s delete the underlying pods and
    # replicaset along with the deployment
    delete_options = client.V1DeleteOptions(propagation_policy="Foreground")
    client.AppsV1Api().delete_namespaced_deployment(
        deployment_description["deployment"], namespace, body=delete_options)
def is_available(location, namespace, component_name):
    ''' Check whether a deployed component has reached its requested state.

    Reads the deployment status from k8s and reports True when the number of
    available replicas and the number of updated replicas both equal the
    requested replica count.  This verifies completion of an initial
    deployment, a scale operation, or an update operation.
    '''
    _configure_api(location)
    deployment = client.AppsV1Api().read_namespaced_deployment_status(_create_deployment_name(component_name), namespace)
    requested = deployment.spec.replicas
    # All requested replicas must be available AND running the current spec
    return deployment.status.available_replicas == requested and deployment.status.updated_replicas == requested
def scale(deployment_description, replicas):
    ''' Trigger a scaling operation by updating the replica count for the Deployment '''

    def set_replica_count(spec):
        # Mutate the deployment spec in place with the new replica count
        spec.spec.replicas = replicas
        return spec

    _patch_deployment(deployment_description["location"],
                      deployment_description["namespace"],
                      deployment_description["deployment"],
                      set_replica_count)
def upgrade(deployment_description, image, container_index = 0):
    ''' Trigger a rolling upgrade by sending a new image name/tag to k8s '''

    def set_image(spec):
        # Point the selected container at the new image; k8s performs the rollout
        spec.spec.template.spec.containers[container_index].image = image
        return spec

    _patch_deployment(deployment_description["location"],
                      deployment_description["namespace"],
                      deployment_description["deployment"],
                      set_image)
def rollback(deployment_description, rollback_to=0):
    '''
    Undo upgrade by rolling back to a previous revision of the deployment.
    By default, go back one revision.
    rollback_to can be used to supply a specific revision number.
    Returns the image for the app container and the replica count from the rolled-back deployment
    '''
    # NOTE (2018-07-13): Currently this does not work due to a bug in the
    # create_namespaced_deployment_rollback() method -- the k8s python client
    # code throws an exception while processing the response from the API.
    # See:
    #   - https://github.com/kubernetes-client/python/issues/491
    #   - https://github.com/kubernetes/kubernetes/pull/63837
    # The fix has been merged into the master branch but is not in the latest release.
    _configure_api(deployment_description["location"])
    dep_name = deployment_description["deployment"]
    ns = deployment_description["namespace"]

    # Initiate the rollback
    rollback_request = client.AppsV1beta1DeploymentRollback(
        name=dep_name,
        rollback_to=client.AppsV1beta1RollbackConfig(revision=rollback_to))
    client.ExtensionsV1beta1Api().create_namespaced_deployment_rollback(dep_name, ns, rollback_request)

    # Read back the spec for the rolled-back deployment
    rolled_back = client.AppsV1Api().read_namespaced_deployment(dep_name, ns)
    return rolled_back.spec.template.spec.containers[0].image, rolled_back.spec.replicas
def execute_command_in_deployment(deployment_description, command):
    '''
    Run a command in every pod of a k8s deployment.

    Enumerates the pods currently in the k8s deployment identified by
    "deployment_description", then executes "command" (an argv-style list) in
    container 0 (the main application container) of each of those pods.
    Returns a list with the result of the execution in each pod.

    The set of pods for a deployment can change over time; the enumeration is
    a snapshot taken at one point in time.  Pods created after the enumeration
    will not receive the command, and a pod that disappears before the command
    reaches it causes a failed execution attempt, which is not treated as a
    fatal error.

    This is acceptable for the current use case: running a script that tells a
    container its configuration has changed after a policy change.  The new
    configuration is stored in the configuration store (Consul) first, so a
    pod that vanishes needs no notification and a pod that appears later picks
    up the updated configuration at startup.

    k8s offers no API to execute a command across a whole deployment, and no
    direct way to list the pods that belong to a deployment.  The deployment
    code in this module therefore labels each pod with "k8sdeployment" set to
    the deployment name, and the query below selects pods by that label.
    '''
    location = deployment_description["location"]
    _configure_api(location)
    namespace = deployment_description["namespace"]
    deployment = deployment_description["deployment"]

    # Snapshot the names of the running pods carrying the deployment's label
    pod_list = client.CoreV1Api().list_namespaced_pod(
        namespace = namespace,
        label_selector = "k8sdeployment={0}".format(deployment),
        field_selector = "status.phase=Running"
    )

    # Execute the command in each pod and collect the outcomes
    results = []
    for pod in pod_list.items:
        results.append(_execute_command_in_pod(location, namespace, pod.metadata.name, command))
    return results