# ============LICENSE_START=======================================================
# org.onap.dcae
# ================================================================================
# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2020 Pantheon.tech. All rights reserved.
# Copyright (c) 2020-2021 Nokia. All rights reserved.
# Copyright (c) 2020 J. F. Lucas. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
#
from distutils import util
import os
import re
import uuid
import base64

from binascii import hexlify
from kubernetes import config, client, stream
from .sans_parser import SansParser

# Default values for readiness probe
PROBE_DEFAULT_PERIOD = 15
PROBE_DEFAULT_TIMEOUT = 1

# Location of k8s cluster config file ("kubeconfig")
K8S_CONFIG_PATH = "/opt/onap/kube/kubeconfig"

# Regular expression for interval/timeout specification
INTERVAL_SPEC = re.compile("^([0-9]+)(s|m|h)?$")
# Conversion factors to seconds
FACTORS = {None: 1, "s": 1, "m": 60, "h": 3600}

# Regular expression for port mapping
# group 1: container port
# group 2: / + protocol
# group 3: protocol
# group 4: host port
PORTS = re.compile("^([0-9]+)(/(udp|UDP|tcp|TCP))?:([0-9]+)$")
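
# Illustration (not executed) of how PORTS decomposes a port mapping string.
# The sample values below are hypothetical:
#   PORTS.match("8080/udp:30800").groups() -> ("8080", "/udp", "udp", "30800")
#   PORTS.match("8080:30800").groups()     -> ("8080", None, None, "30800")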

# Constants for external_cert
MOUNT_PATH = "/etc/onap/oom/certservice/certs/"
KEYSTORE_PATH = MOUNT_PATH + "certServiceClient-keystore.jks"
TRUSTSTORE_PATH = MOUNT_PATH + "truststore.jks"
DEFAULT_CERT_TYPE = "p12"


def _create_deployment_name(component_name):
    return "dep-{0}".format(component_name)[:63]


def _create_service_name(component_name):
    return "{0}".format(component_name)[:63]


def _create_exposed_service_name(component_name):
    return ("x{0}".format(component_name))[:63]


def _create_exposed_v6_service_name(component_name):
    return ("x{0}-ipv6".format(component_name))[:63]


def _configure_api(location=None):
    # Look for a kubernetes config file
    if os.path.exists(K8S_CONFIG_PATH):
        config.load_kube_config(config_file=K8S_CONFIG_PATH, context=location, persist_config=False)
    else:
        # Maybe we're running in a k8s container and we can use info provided by k8s
        # We would like to use:
        #   config.load_incluster_config()
        # but this looks into os.environ for the kubernetes host and port, and from
        # the plugin those aren't visible. So we use the InClusterConfigLoader class,
        # where we can set the environment to what we like.
        # This is probably brittle! Maybe there's a better alternative.
        localenv = {
            config.incluster_config.SERVICE_HOST_ENV_NAME: "kubernetes.default.svc.cluster.local",
            config.incluster_config.SERVICE_PORT_ENV_NAME: "443"
        }
        config.incluster_config.InClusterConfigLoader(
            token_filename=config.incluster_config.SERVICE_TOKEN_FILENAME,
            cert_filename=config.incluster_config.SERVICE_CERT_FILENAME,
            environ=localenv
        ).load_and_set()


def _parse_interval(t):
    """
    Parse an interval specification
    t can be
      - a simple integer quantity, interpreted as seconds
      - a string representation of a decimal integer, interpreted as seconds
      - a string consisting of a representation of a decimal integer followed by a unit,
        with "s" representing seconds, "m" representing minutes,
        and "h" representing hours
    Used for compatibility with the Docker plugin, where time intervals
    for health checks were specified as strings with a number and a unit.
    See INTERVAL_SPEC above for the regular expression that's accepted.
    """
    m = INTERVAL_SPEC.match(str(t))
    if m:
        time = int(m.group(1)) * FACTORS[m.group(2)]
    else:
        raise ValueError("Bad interval specification: {0}".format(t))
    return time
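
# A few illustrative conversions (not executed):
#   _parse_interval(30)    -> 30
#   _parse_interval("30")  -> 30
#   _parse_interval("5m")  -> 300
#   _parse_interval("2h")  -> 7200
#   _parse_interval("10x") -> raises ValueError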


def _create_probe(hc, port):
    """ Create a Kubernetes probe based on info in the health check dictionary hc """
    probe_type = hc['type']
    probe = None
    period = _parse_interval(hc.get('interval', PROBE_DEFAULT_PERIOD))
    timeout = _parse_interval(hc.get('timeout', PROBE_DEFAULT_TIMEOUT))
    if probe_type in ['http', 'https']:
        probe = client.V1Probe(
            failure_threshold=1,
            initial_delay_seconds=5,
            period_seconds=period,
            timeout_seconds=timeout,
            http_get=client.V1HTTPGetAction(
                path=hc['endpoint'],
                port=port,
                scheme=probe_type.upper()
            )
        )
    elif probe_type in ['script', 'docker']:
        probe = client.V1Probe(
            failure_threshold=1,
            initial_delay_seconds=5,
            period_seconds=period,
            timeout_seconds=timeout,
            _exec=client.V1ExecAction(
                command=hc['script'].split()
            )
        )
    return probe
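
# Sketch of the two health-check forms _create_probe accepts; the endpoint and
# script values below are hypothetical (not executed):
#   _create_probe({'type': 'https', 'endpoint': '/healthcheck'}, 8443)
#       -> V1Probe with an http_get action (path="/healthcheck", port=8443, scheme="HTTPS")
#   _create_probe({'type': 'script', 'script': '/opt/app/bin/check.sh --quick'}, None)
#       -> V1Probe with an exec action (command=["/opt/app/bin/check.sh", "--quick"])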


def _create_resources(resources=None):
    if resources is not None:
        resources_obj = client.V1ResourceRequirements(
            limits=resources.get("limits"),
            requests=resources.get("requests")
        )
        return resources_obj
    else:
        return None


def _create_container_object(name, image, always_pull, **kwargs):
    # Set up environment variables
    # Copy any passed in environment variables
    env = kwargs.get('env') or {}
    env_vars = [client.V1EnvVar(name=k, value=env[k]) for k in env]
    # Add POD_IP with the IP address of the pod running the container
    pod_ip = client.V1EnvVarSource(field_ref=client.V1ObjectFieldSelector(field_path="status.podIP"))
    env_vars.append(client.V1EnvVar(name="POD_IP", value_from=pod_ip))

    # If a health check is specified, create a readiness/liveness probe
    # (For an HTTP-based check, we assume it's at the first container port)
    readiness = kwargs.get('readiness')
    liveness = kwargs.get('liveness')
    resources = kwargs.get('resources')
    container_ports = kwargs.get('container_ports') or []

    hc_port = container_ports[0][0] if container_ports else None
    probe = _create_probe(readiness, hc_port) if readiness else None
    live_probe = _create_probe(liveness, hc_port) if liveness else None
    resources_obj = _create_resources(resources) if resources else None
    port_objs = [client.V1ContainerPort(container_port=port, protocol=proto)
                 for port, proto in container_ports]

    # Define container for pod
    return client.V1Container(
        name=name,
        image=image,
        image_pull_policy='Always' if always_pull else 'IfNotPresent',
        env=env_vars,
        ports=port_objs,
        volume_mounts=kwargs.get('volume_mounts') or [],
        resources=resources_obj,
        readiness_probe=probe,
        liveness_probe=live_probe
    )


def _create_deployment_object(component_name,
                              containers,
                              init_containers,
                              replicas,
                              volumes,
                              labels=None,
                              pull_secrets=None):
    if labels is None:
        labels = {}
    if pull_secrets is None:
        pull_secrets = []
    deployment_name = _create_deployment_name(component_name)

    # Label the pod with the deployment name, so we can find it easily
    labels.update({"k8sdeployment": deployment_name})

    # pull_secrets is a list of the names of the k8s secrets containing docker registry credentials
    # See https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod
    ips = []
    for secret in pull_secrets:
        ips.append(client.V1LocalObjectReference(name=secret))

    # Define pod template
    template = client.V1PodTemplateSpec(
        metadata=client.V1ObjectMeta(labels=labels),
        spec=client.V1PodSpec(hostname=component_name,
                              containers=containers,
                              init_containers=init_containers,
                              volumes=volumes,
                              image_pull_secrets=ips)
    )

    # Define deployment spec
    spec = client.V1DeploymentSpec(
        replicas=replicas,
        selector=client.V1LabelSelector(match_labels=labels),
        template=template
    )

    # Create deployment object
    deployment = client.V1Deployment(
        api_version="apps/v1",
        kind="Deployment",
        metadata=client.V1ObjectMeta(name=deployment_name, labels=labels),
        spec=spec
    )

    return deployment


def _create_service_object(service_name, component_name, service_ports, annotations, labels, service_type, ip_family):
    service_spec = client.V1ServiceSpec(
        ports=service_ports,
        selector={"app": component_name},
        type=service_type,
        ip_family=ip_family
    )
    if annotations:
        metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels, annotations=annotations)
    else:
        metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels)

    service = client.V1Service(
        kind="Service",
        api_version="v1",
        metadata=metadata,
        spec=service_spec
    )
    return service


def create_secret_with_password(namespace, secret_prefix, password_key, password_length):
    """
    Creates a K8s secret object with a generated password.
    Returns: secret name and data key.

    Example usage:
      create_secret_with_password('onap', 'dcae-keystore-password-', 'password', 128)
    """
    password = _generate_password(password_length)
    password_base64 = _encode_base64(password)

    metadata = {'generateName': secret_prefix, 'namespace': namespace}
    key = password_key
    data = {key: password_base64}

    response = _create_k8s_secret(namespace, metadata, data, 'Opaque')
    secret_name = response.metadata.name
    return secret_name, key


def _generate_password(length):
    rand = os.urandom(length)
    password = hexlify(rand)
    return password.decode("ascii")
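
# Note (illustrative, not executed): hexlify encodes each random byte as two hex
# digits, so _generate_password(16) returns a 32-character string.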


def _encode_base64(value):
    value_bytes = value.encode("ascii")
    base64_encoded_bytes = base64.b64encode(value_bytes)
    encoded_value = base64_encoded_bytes.decode("ascii")
    return encoded_value


def _create_k8s_secret(namespace, metadata, data, secret_type):
    api_version = 'v1'
    kind = 'Secret'
    body = client.V1Secret(api_version, data, kind, metadata, type=secret_type)

    response = client.CoreV1Api().create_namespaced_secret(namespace, body)
    return response


def parse_ports(port_list):
    """
    Parse the port list into a list of container ports (needed to create the container)
    and into a set of port mappings used to set up k8s services.
    """
    container_ports = []
    port_map = {}
    for p in port_list:
        ipv6 = False
        if type(p) is dict:
            ipv6 = "ipv6" in p and p['ipv6']
            p = "".join(str(v) for v in p['concat'])
        m = PORTS.match(p.strip())
        if m:
            cport = int(m.group(1))
            hport = int(m.group(4))
            if m.group(3):
                proto = (m.group(3)).upper()
            else:
                proto = "TCP"
            port = (cport, proto)
            if port not in container_ports:
                container_ports.append(port)
            port_map[(cport, proto, ipv6)] = hport
        else:
            raise ValueError("Bad port specification: {0}".format(p))

    return container_ports, port_map
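
# Worked example (hypothetical ports, not executed):
#   parse_ports(["8080:30800", "5353/udp:0", {"concat": [9443, ":", 30943], "ipv6": True}])
# returns
#   container_ports = [(8080, "TCP"), (5353, "UDP"), (9443, "TCP")]
#   port_map = {(8080, "TCP", False): 30800, (5353, "UDP", False): 0, (9443, "TCP", True): 30943}
# A host port of 0 means the port is exposed on the k8s network only (see _process_port_map).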


def _parse_volumes(volume_list):
    volumes = []
    volume_mounts = []
    for v in volume_list:
        vname = str(uuid.uuid4())
        vcontainer = v['container']['bind']
        vro = (v['container'].get('mode') == 'ro')
        if ('host' in v) and ('path' in v['host']):
            vhost = v['host']['path']
            volumes.append(client.V1Volume(name=vname, host_path=client.V1HostPathVolumeSource(path=vhost)))
        if ('config_volume' in v) and ('name' in v['config_volume']):
            vconfig_volume = v['config_volume']['name']
            volumes.append(client.V1Volume(name=vname, config_map=client.V1ConfigMapVolumeSource(default_mode=0o0644,
                                                                                                 name=vconfig_volume,
                                                                                                 optional=True)))
        volume_mounts.append(client.V1VolumeMount(name=vname, mount_path=vcontainer, read_only=vro))

    return volumes, volume_mounts
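
# Example volume specification (hypothetical paths, not executed):
#   {"host": {"path": "/var/log/myapp"}, "container": {"bind": "/opt/app/logs", "mode": "rw"}}
# produces a hostPath V1Volume plus a V1VolumeMount at /opt/app/logs with read_only=False;
# a {"config_volume": {"name": "my-config-map"}, ...} entry produces a ConfigMap-backed volume instead.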


def _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, log_info, filebeat):
    if not log_info or not filebeat:
        return
    log_dir = log_info.get("log_directory")
    if not log_dir:
        return
    sidecar_volume_mounts = []

    # Create the volume for component log files and volume mounts for the component and sidecar containers
    volumes.append(client.V1Volume(name="component-log", empty_dir=client.V1EmptyDirVolumeSource()))
    volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=log_dir))
    sc_path = log_info.get("alternate_fb_path") or "{0}/{1}".format(filebeat["log_path"], component_name)
    sidecar_volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=sc_path))

    # Create the volume for sidecar data and the volume mount for it
    volumes.append(client.V1Volume(name="filebeat-data", empty_dir=client.V1EmptyDirVolumeSource()))
    sidecar_volume_mounts.append(client.V1VolumeMount(name="filebeat-data", mount_path=filebeat["data_path"]))

    # Create the volume for the sidecar configuration data and the volume mount for it
    # The configuration data is in a k8s ConfigMap that should be created when DCAE is installed.
    volumes.append(
        client.V1Volume(name="filebeat-conf", config_map=client.V1ConfigMapVolumeSource(name=filebeat["config_map"])))
    sidecar_volume_mounts.append(
        client.V1VolumeMount(name="filebeat-conf", mount_path=filebeat["config_path"],
                             sub_path=filebeat["config_subpath"]))

    # Finally create the container for the sidecar
    containers.append(
        _create_container_object("filebeat", filebeat["image"], False, volume_mounts=sidecar_volume_mounts))


def _add_tls_init_container(ctx, init_containers, volumes, volume_mounts, tls_info, tls_config):
    # Adds an InitContainer to the pod to set up TLS certificate information. For components that act as a server
    # (tls_info["use_tls"] is True), the InitContainer will populate a directory with server and CA certificate
    # materials in various formats. For other components (tls_info["use_tls"] is False, or tls_info is not
    # specified), the InitContainer will populate a directory with CA certificate materials in PEM and JKS formats.
    # In either case, the certificate directory is mounted onto the component container filesystem at the location
    # specified by tls_info["cert_directory"], if present, otherwise at the configured default mount point
    # (tls_config["component_cert_dir"]).
    docker_image = tls_config["image"]
    ctx.logger.info("Creating init container: TLS \n * [" + docker_image + "]")

    cert_directory = tls_info.get("cert_directory") or tls_config.get("component_cert_dir")
    env = {}
    env["TLS_SERVER"] = "true" if tls_info.get("use_tls") else "false"

    # Create the certificate volume and volume mounts
    volumes.append(client.V1Volume(name="tls-info", empty_dir=client.V1EmptyDirVolumeSource()))
    volume_mounts.append(client.V1VolumeMount(name="tls-info", mount_path=cert_directory))
    init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_config["cert_path"])]

    # Create the init container
    init_containers.append(
        _create_container_object("init-tls", docker_image, False, volume_mounts=init_volume_mounts, env=env))


def _add_external_tls_init_container(ctx, init_containers, volumes, external_cert, external_tls_config):
    # Adds an InitContainer to the pod which will generate external TLS certificates.
    docker_image = external_tls_config["image_tag"]
    ctx.logger.info("Creating init container: external TLS \n * [" + docker_image + "]")

    env = {}
    output_path = external_cert.get("external_cert_directory")
    if not output_path.endswith('/'):
        output_path += '/'

    env["REQUEST_URL"] = external_tls_config.get("request_url")
    env["REQUEST_TIMEOUT"] = external_tls_config.get("timeout")
    env["OUTPUT_PATH"] = output_path + "external"
    env["OUTPUT_TYPE"] = external_cert.get("cert_type")
    env["CA_NAME"] = external_cert.get("ca_name")
    env["COMMON_NAME"] = external_cert.get("external_certificate_parameters").get("common_name")
    env["ORGANIZATION"] = external_tls_config.get("organization")
    env["ORGANIZATION_UNIT"] = external_tls_config.get("organizational_unit")
    env["LOCATION"] = external_tls_config.get("location")
    env["STATE"] = external_tls_config.get("state")
    env["COUNTRY"] = external_tls_config.get("country")
    env["SANS"] = external_cert.get("external_certificate_parameters").get("sans")
    env["KEYSTORE_PATH"] = KEYSTORE_PATH
    env["KEYSTORE_PASSWORD"] = external_tls_config.get("keystore_password")
    env["TRUSTSTORE_PATH"] = TRUSTSTORE_PATH
    env["TRUSTSTORE_PASSWORD"] = external_tls_config.get("truststore_password")

    # Create the volumes and volume mounts
    sec = client.V1SecretVolumeSource(secret_name=external_tls_config.get("cert_secret_name"))
    volumes.append(client.V1Volume(name="tls-volume", secret=sec))
    init_volume_mounts = [
        client.V1VolumeMount(name="tls-info", mount_path=external_cert.get("external_cert_directory")),
        client.V1VolumeMount(name="tls-volume", mount_path=MOUNT_PATH)]

    # Create the init container
    init_containers.append(
        _create_container_object("cert-service-client", docker_image, False, volume_mounts=init_volume_mounts, env=env))


def _add_cert_post_processor_init_container(ctx, init_containers, tls_info, tls_config, external_cert,
                                            cert_post_processor_config, isCertManagerIntegration):
    # Adds an InitContainer to the pod to merge the TLS and external TLS truststores into a single file.
    docker_image = cert_post_processor_config["image_tag"]
    ctx.logger.info("Creating init container: cert post processor \n * [" + docker_image + "]")

    tls_cert_dir = tls_info.get("cert_directory") or tls_config.get("component_cert_dir")
    if not tls_cert_dir.endswith('/'):
        tls_cert_dir += '/'

    tls_cert_file_path = tls_cert_dir + "trust.jks"
    tls_cert_file_pass = tls_cert_dir + "trust.pass"

    ext_cert_dir = tls_cert_dir + "external/"

    output_type = (external_cert.get("cert_type") or DEFAULT_CERT_TYPE).lower()
    ext_truststore_path = ext_cert_dir + "truststore." + _get_file_extension(output_type)
    ext_truststore_pass = ''
    if output_type != 'pem':
        ext_truststore_pass = ext_cert_dir + "truststore.pass"

    env = {"TRUSTSTORES_PATHS": tls_cert_file_path + ":" + ext_truststore_path,
           "TRUSTSTORES_PASSWORDS_PATHS": tls_cert_file_pass + ":" + ext_truststore_pass,
           "KEYSTORE_SOURCE_PATHS": _get_keystore_source_paths(output_type, ext_cert_dir),
           "KEYSTORE_DESTINATION_PATHS": _get_keystore_destination_paths(output_type, tls_cert_dir)}

    ctx.logger.info("TRUSTSTORES_PATHS: " + env["TRUSTSTORES_PATHS"])
    ctx.logger.info("TRUSTSTORES_PASSWORDS_PATHS: " + env["TRUSTSTORES_PASSWORDS_PATHS"])
    ctx.logger.info("KEYSTORE_SOURCE_PATHS: " + env["KEYSTORE_SOURCE_PATHS"])
    ctx.logger.info("KEYSTORE_DESTINATION_PATHS: " + env["KEYSTORE_DESTINATION_PATHS"])

    # Create the volumes and volume mounts
    init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_cert_dir)]
    if isCertManagerIntegration:
        init_volume_mounts.append(client.V1VolumeMount(
            name="certmanager-certs-volume", mount_path=ext_cert_dir))
    # Create the init container
    init_containers.append(
        _create_container_object("cert-post-processor", docker_image, False, volume_mounts=init_volume_mounts, env=env))


def _get_file_extension(output_type):
    return {
        'p12': 'p12',
        'pem': 'pem',
        'jks': 'jks',
    }[output_type]


def _get_keystore_source_paths(output_type, ext_cert_dir):
    source_paths_template = {
        'p12': "{0}keystore.p12:{0}keystore.pass",
        'jks': "{0}keystore.jks:{0}keystore.pass",
        'pem': "{0}keystore.pem:{0}key.pem",
    }[output_type]
    return source_paths_template.format(ext_cert_dir)


def _get_keystore_destination_paths(output_type, tls_cert_dir):
    destination_paths_template = {
        'p12': "{0}cert.p12:{0}p12.pass",
        'jks': "{0}cert.jks:{0}jks.pass",
        'pem': "{0}cert.pem:{0}key.pem",
    }[output_type]
    return destination_paths_template.format(tls_cert_dir)
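
# Illustration (hypothetical directories, not executed), for output_type "p12":
#   _get_keystore_source_paths("p12", "/opt/app/certs/external/")
#       -> "/opt/app/certs/external/keystore.p12:/opt/app/certs/external/keystore.pass"
#   _get_keystore_destination_paths("p12", "/opt/app/certs/")
#       -> "/opt/app/certs/cert.p12:/opt/app/certs/p12.pass"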


def _process_port_map(port_map):
    service_ports = []  # Ports exposed internally on the k8s network
    exposed_ports = []  # Ports to be mapped to ports on the k8s nodes via NodePort
    exposed_ports_ipv6 = []
    for (cport, proto, ipv6), hport in port_map.items():
        name = "xport-{0}-{1}".format(proto[0].lower(), cport)
        cport = int(cport)
        hport = int(hport)
        port = client.V1ServicePort(port=cport, protocol=proto, name=name[1:])
        if port not in service_ports:
            service_ports.append(port)
        if hport != 0:
            if ipv6:
                exposed_ports_ipv6.append(client.V1ServicePort(port=cport, protocol=proto, node_port=hport, name=name))
            else:
                exposed_ports.append(client.V1ServicePort(port=cport, protocol=proto, node_port=hport, name=name))
    return service_ports, exposed_ports, exposed_ports_ipv6
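
# Worked example (hypothetical mapping, not executed):
#   _process_port_map({(8080, "TCP", False): 30800, (9090, "TCP", False): 0})
# yields service_ports entries named "port-t-8080" and "port-t-9090", and a single
# NodePort entry "xport-t-8080" (node_port=30800); a host port of 0 suppresses node exposure.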


def _service_exists(location, namespace, component_name):
    exists = False
    try:
        _configure_api(location)
        client.CoreV1Api().read_namespaced_service(_create_service_name(component_name), namespace)
        exists = True
    except client.rest.ApiException:
        pass

    return exists


def _patch_deployment(location, namespace, deployment, modify):
    '''
    Gets the current spec for 'deployment' in 'namespace'
    in the k8s cluster at 'location',
    uses the 'modify' function to change the spec,
    then sends the updated spec to k8s.
    '''
    _configure_api(location)

    # Get deployment spec
    spec = client.AppsV1Api().read_namespaced_deployment(deployment, namespace)

    # Apply changes to spec
    spec = modify(spec)

    # Patch the deployment with the updated spec
    client.AppsV1Api().patch_namespaced_deployment(deployment, namespace, spec)


def _execute_command_in_pod(location, namespace, pod_name, command):
    '''
    Execute the command (specified by an argv-style list in the "command" parameter) in
    the specified pod in the specified namespace at the specified location.
    For now at least, we use this only to
    run a notification script in a pod after a configuration change.

    The straightforward way to do this is with the V1 Core API function "connect_get_namespaced_pod_exec".
    Unfortunately, due to a bug/limitation in the Python client library, we can't call it directly.
    We have to make the API call through a Websocket connection using the kubernetes.stream wrapper API.
    I'm following the example code at https://github.com/kubernetes-client/python/blob/master/examples/exec.py.
    There are several issues tracking this, in various states. It isn't clear that there will ever
    be a fix.
      - https://github.com/kubernetes-client/python/issues/58
      - https://github.com/kubernetes-client/python/issues/409
      - https://github.com/kubernetes-client/python/issues/526

    The main consequence of the workaround using "stream" is that the caller does not get an indication
    of the exit code returned by the command when it completes execution. It turns out that the
    original implementation of notification in the Docker plugin did not use this result, so we can
    still match the original notification functionality.

    The "stream" approach returns a string containing any output sent by the command to stdout or stderr.
    We'll return that so it can be logged.
    '''
    _configure_api(location)
    try:
        output = stream.stream(client.CoreV1Api().connect_get_namespaced_pod_exec,
                               name=pod_name,
                               namespace=namespace,
                               command=command,
                               stdout=True,
                               stderr=True,
                               stdin=False,
                               tty=False)
    except client.rest.ApiException as e:
        # If the exception indicates the pod wasn't found, it's not a fatal error.
        # It existed when we enumerated the pods for the deployment but no longer exists.
        # Unfortunately, the only way to distinguish a pod not found from any other error
        # is by looking at the reason text.
        # (The ApiException's "status" field should contain the HTTP status code, which would
        # be 404 if the pod isn't found, but empirical testing reveals that "status" is set
        # to zero.)
        if "404 not found" in e.reason.lower():
            output = "Pod not found"
        else:
            raise e

    return {"pod": pod_name, "output": output}


def _create_certificate_subject(external_tls_config):
    """
    Map parameters to custom resource subject
    """
    organization = external_tls_config.get("organization")
    organization_unit = external_tls_config.get("organizational_unit")
    country = external_tls_config.get("country")
    location = external_tls_config.get("location")
    state = external_tls_config.get("state")
    subject = {
        "organizations": [organization],
        "countries": [country],
        "localities": [location],
        "provinces": [state],
        "organizationalUnits": [organization_unit]
    }
    return subject


def _create_keystores_object(type, password_secret):
    """
    Create keystores property (JKS and PKCS12 certificates) for the custom resource
    """
    return {type: {
        "create": True,
        "passwordSecretRef": {
            "name": password_secret,
            "key": "password"
        }}}
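
# Illustration (hypothetical secret name, not executed):
#   _create_keystores_object("jks", "my-cert-password-secret")
#       -> {"jks": {"create": True,
#                   "passwordSecretRef": {"name": "my-cert-password-secret", "key": "password"}}}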


def _get_keystores_object_type(output_type):
    """
    Map config type to custom resource cert type
    """
    return {
        'p12': 'pkcs12',
        'jks': 'jks',
    }[output_type]


def _create_projected_volume_with_password(cert_type, cert_secret_name, password_secret_name, password_secret_key):
    """
    Create a volume for password-protected certificates.
    A secret containing the passwords must be provided.
    """
    extension = _get_file_extension(cert_type)
    keystore_file_name = "keystore." + extension
    truststore_file_name = "truststore." + extension
    items = [client.V1KeyToPath(key=keystore_file_name, path=keystore_file_name),
             client.V1KeyToPath(key=truststore_file_name, path=truststore_file_name)]
    passwords = [client.V1KeyToPath(key=password_secret_key, path="keystore.pass"),
                 client.V1KeyToPath(key=password_secret_key, path="truststore.pass")]

    sec_projection = client.V1SecretProjection(name=cert_secret_name, items=items)
    sec_passwords_projection = client.V1SecretProjection(name=password_secret_name, items=passwords)
    sec_volume_projection = client.V1VolumeProjection(secret=sec_projection)
    sec_passwords_volume_projection = client.V1VolumeProjection(secret=sec_passwords_projection)

    return [sec_volume_projection, sec_passwords_volume_projection]


def _create_pem_projected_volume(cert_secret_name):
    """
    Create a volume for a PEM certificate
    """
    items = [client.V1KeyToPath(key="tls.crt", path="keystore.pem"),
             client.V1KeyToPath(key="ca.crt", path="truststore.pem"),
             client.V1KeyToPath(key="tls.key", path="key.pem")]
    sec_projection = client.V1SecretProjection(name=cert_secret_name, items=items)
    return [client.V1VolumeProjection(secret=sec_projection)]


def create_certificate_object(ctx, cert_secret_name, external_cert_data, external_tls_config, cert_name, issuer):
    """
    Create a cert-manager Certificate custom resource object
    """
    common_name = external_cert_data.get("external_certificate_parameters").get("common_name")
    subject = _create_certificate_subject(external_tls_config)

    custom_resource = {
        "apiVersion": "cert-manager.io/v1",
        "kind": "Certificate",
        "metadata": {"name": cert_name},
        "spec": {
            "secretName": cert_secret_name,
            "commonName": common_name,
            "issuerRef": {
                "group": "certmanager.onap.org",
                "kind": "CMPv2Issuer",
                "name": issuer
            }
        }
    }
    custom_resource.get("spec")["subject"] = subject

    raw_sans = external_cert_data.get("external_certificate_parameters").get("sans")
    ctx.logger.info("Read SANS property: " + str(raw_sans))
    sans = SansParser().parse_sans(raw_sans)
    ctx.logger.info("Parsed SANS: " + str(sans))

    if len(sans["ips"]) > 0:
        custom_resource.get("spec")["ipAddresses"] = sans["ips"]
    if len(sans["dnss"]) > 0:
        custom_resource.get("spec")["dnsNames"] = sans["dnss"]
    if len(sans["emails"]) > 0:
        custom_resource.get("spec")["emailAddresses"] = sans["emails"]
    if len(sans["uris"]) > 0:
        custom_resource.get("spec")["uris"] = sans["uris"]

    return custom_resource
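
# Sketch of the resulting custom resource for hypothetical inputs (common name
# "myapp.onap.org", one DNS SAN, issuer "cmpv2-issuer-onap"); not executed:
#   {"apiVersion": "cert-manager.io/v1", "kind": "Certificate",
#    "metadata": {"name": "myapp-cert"},
#    "spec": {"secretName": "myapp-secret", "commonName": "myapp.onap.org",
#             "issuerRef": {"group": "certmanager.onap.org", "kind": "CMPv2Issuer",
#                           "name": "cmpv2-issuer-onap"},
#             "subject": {...}, "dnsNames": ["myapp.onap.org"]}}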


def _create_certificate_custom_resource(ctx, external_cert_data, external_tls_config, issuer, namespace,
                                        component_name, volumes, volume_mounts, deployment_description):
    """
    Create a certificate custom resource for the provided configuration
    :param ctx: context
    :param external_cert_data: object containing the certificate common name and
        the SANs list
    :param external_tls_config: object containing information about the certificate subject
    :param issuer: issuer name
    :param namespace: namespace
    :param component_name: component name
    :param volumes: list of deployment volumes
    :param volume_mounts: list of deployment volume mounts
    :param deployment_description: dict containing deployment information;
        this method appends the created certificate and secrets to it
    """
    ctx.logger.info("Creating certificate custom resource")
    ctx.logger.info("External cert data: " + str(external_cert_data))

    cert_type = (external_cert_data.get("cert_type") or DEFAULT_CERT_TYPE).lower()

    api = client.CustomObjectsApi()
    cert_secret_name = component_name + "-secret"
    cert_name = component_name + "-cert"
    cert_dir = external_cert_data.get("external_cert_directory") + "external/"
    custom_resource = create_certificate_object(ctx, cert_secret_name,
                                                external_cert_data,
                                                external_tls_config,
                                                cert_name, issuer)
    # Create the volumes
    if cert_type != 'pem':
        ctx.logger.info("Creating volume with passwords")
        password_secret_name, password_secret_key = create_secret_with_password(namespace,
                                                                                component_name + "-cert-password",
                                                                                "password", 30)
        deployment_description["secrets"].append(password_secret_name)
        custom_resource.get("spec")["keystores"] = _create_keystores_object(_get_keystores_object_type(cert_type),
                                                                            password_secret_name)
        projected_volume_sources = _create_projected_volume_with_password(
            cert_type, cert_secret_name, password_secret_name, password_secret_key)
    else:
        ctx.logger.info("Creating PEM volume")
        projected_volume_sources = _create_pem_projected_volume(cert_secret_name)

    # Create the volume mounts
    projected_volume = client.V1ProjectedVolumeSource(sources=projected_volume_sources)
    volumes.append(client.V1Volume(name="certmanager-certs-volume", projected=projected_volume))
    volume_mounts.append(client.V1VolumeMount(name="certmanager-certs-volume", mount_path=cert_dir))

    # Create the certificate custom resource
    ctx.logger.info("Certificate CRD: " + str(custom_resource))
    api.create_namespaced_custom_object(
        group="cert-manager.io",
        version="v1",
        namespace=namespace,
        plural="certificates",
        body=custom_resource
    )
    deployment_description["certificates"].append(cert_name)
    deployment_description["secrets"].append(cert_secret_name)
    ctx.logger.info("CRD certificate created")


def deploy(ctx, namespace, component_name, image, replicas, always_pull, k8sconfig, **kwargs):
    """
    This will create a k8s Deployment and, if needed, one or two k8s Services.
    (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
    We're not exposing k8s to the component developer and the blueprint author.
    This is a conscious choice. We want to use k8s in a controlled, consistent way, and we want to hide
    the details from the component developer and the blueprint author.)

    namespace: the Kubernetes namespace into which the component is deployed
    component_name: the component name, used to derive names of Kubernetes entities
    image: the docker image for the component being deployed
    replicas: the number of instances of the component to be deployed
    always_pull: boolean flag, indicating that Kubernetes should always pull a new copy of
      the Docker image for the component, even if it is already present on the Kubernetes node.
    k8sconfig contains:
      - image_pull_secrets: a list of names of image pull secrets that can be used for retrieving images.
        (DON'T PANIC: these are just the names of secrets held in the Kubernetes secret store.)
      - filebeat: a dictionary of filebeat sidecar parameters:
        "log_path": mount point for log volume in filebeat container
        "data_path": mount point for data volume in filebeat container
        "config_path": mount point for config volume in filebeat container
        "config_subpath": subpath for config data in filebeat container
        "config_map": ConfigMap holding the filebeat configuration
        "image": Docker image to use for filebeat
      - tls: a dictionary of TLS-related information:
        "cert_path": mount point for certificate volume in init container
        "image": Docker image to use for TLS init container
        "component_cert_dir": default mount point for certs
      - cert_post_processor: a dictionary of cert_post_processor information:
        "image_tag": docker image to use for cert-post-processor init container
    kwargs may have:
      - volumes: array of volume objects, where a volume object is:
        {"host": {"path": "/path/on/host"}, "container": {"bind": "/path/on/container", "mode": "rw_or_ro"}}
      - ports: array of strings in the form "container_port:host_port"
      - env: map of name-value pairs, e.g. {name0: value0, name1: value1, ...}
      - log_info: an object with info for setting up ELK logging, with the form:
        {"log_directory": "/path/to/container/log/directory", "alternate_fb_path": "/alternate/sidecar/log/path"}
      - tls_info: an object with info for setting up TLS (HTTPS), with the form:
        {"use_tls": true, "cert_directory": "/path/to/container/cert/directory"}
      - external_cert: an object with information for setting up the init container for external certificate
        creation, with the form:
        {"external_cert":
            "external_cert_directory": "/path/to/directory_where_certs_should_be_placed",
            "use_external_tls": true or false,
            "ca_name": "ca-name-value",
            "cert_type": "P12" or "JKS" or "PEM",
            "external_certificate_parameters":
                "common_name": "common-name-value",
                "sans": "sans-value"}
      - labels: dict with label-name/label-value pairs, e.g. {"cfydeployment": "lsdfkladflksdfsjkl", "cfynode": "mycomponent"}
        These labels will be set on all the pods deployed as a result of this deploy() invocation.
      - resources: dict with optional "limits" and "requests" resource requirements, each a dict containing:
        - cpu: number, CPU usage, like 0.5
        - memory: string, memory requirement, like "2Gi"
      - readiness: dict with health check info; if present, used to create a readiness probe for the main container. Includes:
        - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
        - interval: period (in seconds) between probes
        - timeout: time (in seconds) to allow a probe to complete
        - endpoint: the path portion of the URL that points to the readiness endpoint for "http" and "https" types
        - path: the full path to the script to be executed in the container for "script" and "docker" types
      - liveness: dict with health check info; if present, used to create a liveness probe for the main container. Includes:
        - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
        - interval: period (in seconds) between probes
        - timeout: time (in seconds) to allow a probe to complete
        - endpoint: the path portion of the URL that points to the liveness endpoint for "http" and "https" types
        - path: the full path to the script to be executed in the container for "script" and "docker" types
      - k8s_location: name of the Kubernetes location (cluster) where the component is to be deployed
    """
    deployment_ok = False
    cip_service_created = False
    deployment_description = {
        "namespace": namespace,
        "location": kwargs.get("k8s_location"),
        "deployment": '',
        "services": [],
        "certificates": [],
        "secrets": []
    }

    try:
        # Get API handles
        _configure_api(kwargs.get("k8s_location"))
        core = client.CoreV1Api()
        k8s_apps_v1_api_client = client.AppsV1Api()

        # Parse the port mapping
        container_ports, port_map = parse_ports(kwargs.get("ports", []))

        # Parse the volumes list into volumes and volume_mounts for the deployment
        volumes, volume_mounts = _parse_volumes(kwargs.get("volumes", []))

        # Initialize the list of containers that will be part of the pod
        containers = []
        init_containers = []

        # Set up the ELK logging sidecar container, if needed
        _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, kwargs.get("log_info"),
                                 k8sconfig.get("filebeat"))

        # Set up TLS information
        _add_tls_init_container(ctx, init_containers, volumes, volume_mounts, kwargs.get("tls_info") or {},
                                k8sconfig.get("tls"))

        # Set up external TLS information
        external_cert = kwargs.get("external_cert")
        cmpv2_issuer_config = k8sconfig.get("cmpv2_issuer")
        ctx.logger.info("CMPv2 Issuer properties: " + str(cmpv2_issuer_config))

        cmpv2_integration_enabled = bool(util.strtobool(cmpv2_issuer_config.get("enabled")))
        ctx.logger.info("CMPv2 integration enabled: " + str(cmpv2_integration_enabled))

        if external_cert and external_cert.get("use_external_tls"):
            if cmpv2_integration_enabled:
                _create_certificate_custom_resource(ctx, external_cert,
                                                    k8sconfig.get("external_cert"),
                                                    cmpv2_issuer_config.get("name"),
                                                    namespace,
                                                    component_name, volumes,
                                                    volume_mounts, deployment_description)
            else:
                _add_external_tls_init_container(ctx, init_containers, volumes, external_cert,
                                                 k8sconfig.get("external_cert"))
            _add_cert_post_processor_init_container(ctx, init_containers, kwargs.get("tls_info") or {},
                                                    k8sconfig.get("tls"), external_cert,
                                                    k8sconfig.get("cert_post_processor"),
                                                    cmpv2_integration_enabled)

        # Create the container for the component
        # Make it the first container in the pod
        container_args = {key: kwargs.get(key) for key in ("env", "readiness", "liveness", "resources")}
        container_args['container_ports'] = container_ports
        container_args['volume_mounts'] = volume_mounts
        containers.insert(0, _create_container_object(component_name, image, always_pull, **container_args))

        # Build the k8s Deployment object
        labels = kwargs.get("labels", {})
        labels["app"] = component_name
        dep = _create_deployment_object(component_name, containers, init_containers, replicas, volumes, labels,
                                        pull_secrets=k8sconfig["image_pull_secrets"])

        # Have k8s deploy it
        k8s_apps_v1_api_client.create_namespaced_deployment(namespace, dep)
        deployment_ok = True
        deployment_description["deployment"] = _create_deployment_name(component_name)

        # Create service(s), if a port mapping is specified
        if port_map:
            service_ports, exposed_ports, exposed_ports_ipv6 = _process_port_map(port_map)

            # Create a ClusterIP service for access via the k8s network
            service = _create_service_object(_create_service_name(component_name), component_name, service_ports,
                                             None, labels, "ClusterIP", "IPv4")
            core.create_namespaced_service(namespace, service)
            cip_service_created = True
            deployment_description["services"].append(_create_service_name(component_name))

            # If there are ports to be exposed on the k8s nodes, create a "NodePort" service
            if exposed_ports:
                exposed_service = \
                    _create_service_object(_create_exposed_service_name(component_name), component_name,
                                           exposed_ports, '', labels, "NodePort", "IPv4")
                core.create_namespaced_service(namespace, exposed_service)
                deployment_description["services"].append(_create_exposed_service_name(component_name))

            if exposed_ports_ipv6:
                exposed_service_ipv6 = \
                    _create_service_object(_create_exposed_v6_service_name(component_name), component_name,
                                           exposed_ports_ipv6, '', labels, "NodePort", "IPv6")
                core.create_namespaced_service(namespace, exposed_service_ipv6)
                deployment_description["services"].append(_create_exposed_v6_service_name(component_name))

    except Exception as e:
        # If the ClusterIP service was created, delete the service:
        if cip_service_created:
            core.delete_namespaced_service(_create_service_name(component_name), namespace)
        # If the deployment was created but not the service, delete the deployment
        if deployment_ok:
            client.AppsV1Api().delete_namespaced_deployment(_create_deployment_name(component_name), namespace,
                                                            body=client.V1DeleteOptions())
        raise e

    return dep, deployment_description


def undeploy(deployment_description):
    _configure_api(deployment_description["location"])

    namespace = deployment_description["namespace"]

    # Remove any services associated with the component
    for service in deployment_description["services"]:
        client.CoreV1Api().delete_namespaced_service(service, namespace)

    # Remove any secrets created for the component
    for secret in deployment_description["secrets"]:
        client.CoreV1Api().delete_namespaced_secret(secret, namespace)

    # Remove any cert-manager certificate custom resources created for the component
    for cert in deployment_description["certificates"]:
        client.CustomObjectsApi().delete_namespaced_custom_object(
            group="cert-manager.io",
            version="v1",
            name=cert,
            namespace=namespace,
            plural="certificates"
        )

    # Have k8s delete the underlying pods and replicaset when deleting the deployment.
    options = client.V1DeleteOptions(propagation_policy="Foreground")
    client.AppsV1Api().delete_namespaced_deployment(deployment_description["deployment"], namespace, body=options)


def is_available(location, namespace, component_name):
    _configure_api(location)
    dep_status = client.AppsV1Api().read_namespaced_deployment_status(_create_deployment_name(component_name),
                                                                      namespace)
    # Check if the number of available replicas is equal to the number requested and that the replicas match the
    # current spec. This check can be used to verify completion of an initial deployment, a scale operation,
    # or an update operation.
    return dep_status.status.available_replicas == dep_status.spec.replicas and dep_status.status.updated_replicas == dep_status.spec.replicas


def scale(deployment_description, replicas):
    """ Trigger a scaling operation by updating the replica count for the Deployment """

    def update_replica_count(spec):
        spec.spec.replicas = replicas
        return spec

    _patch_deployment(deployment_description["location"], deployment_description["namespace"],
                      deployment_description["deployment"], update_replica_count)


def upgrade(deployment_description, image, container_index=0):
    """ Trigger a rolling upgrade by sending a new image name/tag to k8s """

    def update_image(spec):
        spec.spec.template.spec.containers[container_index].image = image
        return spec

    _patch_deployment(deployment_description["location"], deployment_description["namespace"],
                      deployment_description["deployment"], update_image)
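
# Usage sketch (hypothetical values, not executed), assuming deployment_description
# is the dict returned by deploy():
#   scale(deployment_description, 3)                        # run 3 replicas
#   upgrade(deployment_description, "myrepo/myimage:2.0")   # roll the main container to a new image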


def rollback(deployment_description, rollback_to=0):
    """
    Undo an upgrade by rolling back to a previous revision of the deployment.
    By default, go back one revision.
    rollback_to can be used to supply a specific revision number.
    Returns the image for the app container and the replica count from the rolled-back deployment
    """
    '''
    2018-07-13
    Currently this does not work due to a bug in the create_namespaced_deployment_rollback() method.
    The k8s python client code throws an exception while processing the response from the API.
    See:
      - https://github.com/kubernetes-client/python/issues/491
      - https://github.com/kubernetes/kubernetes/pull/63837
    The fix has been merged into the master branch but is not in the latest release.
    '''
    _configure_api(deployment_description["location"])
    deployment = deployment_description["deployment"]
    namespace = deployment_description["namespace"]

    # Initiate the rollback
    client.ExtensionsV1beta1Api().create_namespaced_deployment_rollback(
        deployment,
        namespace,
        client.AppsV1beta1DeploymentRollback(name=deployment,
                                             rollback_to=client.AppsV1beta1RollbackConfig(revision=rollback_to)))

    # Read back the spec for the rolled-back deployment
    spec = client.AppsV1Api().read_namespaced_deployment(deployment, namespace)
    return spec.spec.template.spec.containers[0].image, spec.spec.replicas


def execute_command_in_deployment(deployment_description, command):
    """
    Enumerates the pods in the k8s deployment identified by "deployment_description",
    then executes the command (represented as an argv-style list) in "command" in
    container 0 (the main application container) of each of those pods.

    Note that the set of pods associated with a deployment can change over time. The
    enumeration is a snapshot at one point in time. The command will not be executed in
    pods that are created after the initial enumeration. If a pod disappears after the
    initial enumeration and before the command is executed, the attempt to execute the
    command will fail. This is not treated as a fatal error.

    This approach is reasonable for the one current use case for "execute_command": running a
    script to notify a container that its configuration has changed as a result of a
    policy change. In this use case, the new configuration information is stored into
    the configuration store (Consul), the pods are enumerated, and the command is executed.
    If a pod disappears after the enumeration, the fact that the command cannot be run
    doesn't matter--a nonexistent pod doesn't need to be reconfigured. Similarly, a pod that
    comes up after the enumeration will get its initial configuration from the updated version
    in Consul.

    The optimal solution here would be for k8s to provide an API call to execute a command in
    all of the pods for a deployment. Unfortunately, k8s does not provide such a call--the
    only call provided by k8s operates at the pod level, not the deployment level.

    Another interesting k8s factoid: there's no direct way to list the pods belonging to a
    particular k8s deployment. The deployment code above sets a label ("k8sdeployment") on
    the pod that has the k8s deployment name. To list the pods, the code below queries for
    pods with the label carrying the deployment name.
    """
    location = deployment_description["location"]
    _configure_api(location)
    deployment = deployment_description["deployment"]
    namespace = deployment_description["namespace"]

    # Get names of all the running pods belonging to the deployment
    pod_names = [pod.metadata.name for pod in client.CoreV1Api().list_namespaced_pod(
        namespace=namespace,
        label_selector="k8sdeployment={0}".format(deployment),
        field_selector="status.phase=Running"
    ).items]

    # Execute command in the running pods
    return [_execute_command_in_pod(location, namespace, pod_name, command)
            for pod_name in pod_names]