Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# ============LICENSE_START======================================================= 

2# org.onap.dcae 

3# ================================================================================ 

4# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. 

5# Copyright (c) 2020 Pantheon.tech. All rights reserved. 

6# Copyright (c) 2020 Nokia. All rights reserved. 

7# ================================================================================ 

8# Licensed under the Apache License, Version 2.0 (the "License"); 

9# you may not use this file except in compliance with the License. 

10# You may obtain a copy of the License at 

11# 

12# http://www.apache.org/licenses/LICENSE-2.0 

13# 

14# Unless required by applicable law or agreed to in writing, software 

15# distributed under the License is distributed on an "AS IS" BASIS, 

16# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

17# See the License for the specific language governing permissions and 

18# limitations under the License. 

19# ============LICENSE_END========================================================= 

20# 

21import os 

22import re 

23import uuid 

24from kubernetes import config, client, stream 

25 

# Default values for readiness probe
# (probe period and probe timeout, in seconds)
PROBE_DEFAULT_PERIOD = 15
PROBE_DEFAULT_TIMEOUT = 1

# Location of k8s cluster config file ("kubeconfig")
# If this file exists, _configure_api() uses it; otherwise in-cluster config is assumed.
K8S_CONFIG_PATH="/opt/onap/kube/kubeconfig"

# Regular expression for interval/timeout specification:
# a decimal integer optionally followed by a unit suffix (s, m, or h)
INTERVAL_SPEC = re.compile("^([0-9]+)(s|m|h)?$")
# Conversion factors to seconds, keyed by the unit suffix
# (None == no suffix == seconds)
FACTORS = {None: 1, "s": 1, "m": 60, "h": 3600}

# Regular expression for port mapping
# group 1: container port
# group 2: / + protocol
# group 3: protocol
# group 4: host port
PORTS = re.compile("^([0-9]+)(/(udp|UDP|tcp|TCP))?:([0-9]+)$")

# Constants for external_cert: where the CertService client init container
# mounts its keystore/truststore material, and the k8s secret that holds it
MOUNT_PATH = "/etc/onap/aaf/certservice/certs/"
KEYSTORE_PATH = MOUNT_PATH + "certServiceClient-keystore.jks"
TRUSTSTORE_PATH = MOUNT_PATH + "truststore.jks"
CERT_SECRET_NAME = "aaf-cert-service-client-tls-secret"

50 

51def _create_deployment_name(component_name): 

52 return "dep-{0}".format(component_name)[:63] 

53 

54def _create_service_name(component_name): 

55 return "{0}".format(component_name)[:63] 

56 

57def _create_exposed_service_name(component_name): 

58 return ("x{0}".format(component_name))[:63] 

59 

def _configure_api(location=None):
    '''
    Set up the Kubernetes client configuration.
    If a kubeconfig file exists at K8S_CONFIG_PATH, load it (using 'location' as
    the context name); otherwise assume we are running inside a k8s pod and load
    the in-cluster configuration.
    '''
    if not os.path.exists(K8S_CONFIG_PATH):
        # No kubeconfig file -- maybe we're running in a k8s container and we can
        # use info provided by k8s.
        # We would like to use config.load_incluster_config(), but it looks into
        # os.environ for the kubernetes host and port, and from the plugin those
        # aren't visible. So we use the InClusterConfigLoader class, where we can
        # set the environment to what we like.
        # This is probably brittle! Maybe there's a better alternative.
        incluster_env = {
            config.incluster_config.SERVICE_HOST_ENV_NAME: "kubernetes.default.svc.cluster.local",
            config.incluster_config.SERVICE_PORT_ENV_NAME: "443"
        }
        loader = config.incluster_config.InClusterConfigLoader(
            token_filename=config.incluster_config.SERVICE_TOKEN_FILENAME,
            cert_filename=config.incluster_config.SERVICE_CERT_FILENAME,
            environ=incluster_env
        )
        loader.load_and_set()
    else:
        config.load_kube_config(config_file=K8S_CONFIG_PATH, context=location, persist_config=False)

81 

def _parse_interval(t):
    """
    Parse an interval specification into a number of seconds.
    t can be
        - a simple integer quantity, interpreted as seconds
        - a string representation of a decimal integer, interpreted as seconds
        - a string consisting of a representation of a decimal integer followed by a unit,
          with "s" representing seconds, "m" representing minutes,
          and "h" representing hours
    Used for compatibility with the Docker plugin, where time intervals
    for health checks were specified as strings with a number and a unit.
    See INTERVAL_SPEC above for the regular expression that's accepted.
    Raises ValueError if t does not match that pattern.
    """
    match = INTERVAL_SPEC.match(str(t))
    if not match:
        raise ValueError("Bad interval specification: {0}".format(t))
    # FACTORS maps the (optional) unit suffix to a seconds multiplier
    return int(match.group(1)) * FACTORS[match.group(2)]

101 

def _create_probe(hc, port):
    ''' Create a Kubernetes probe based on info in the health check dictionary hc '''
    probe_type = hc['type']
    period_seconds = _parse_interval(hc.get('interval', PROBE_DEFAULT_PERIOD))
    timeout_seconds = _parse_interval(hc.get('timeout', PROBE_DEFAULT_TIMEOUT))

    if probe_type in ('http', 'https'):
        # HTTP(S) check: GET hc['endpoint'] on the given port
        return client.V1Probe(
            failure_threshold=1,
            initial_delay_seconds=5,
            period_seconds=period_seconds,
            timeout_seconds=timeout_seconds,
            http_get=client.V1HTTPGetAction(
                path=hc['endpoint'],
                port=port,
                scheme=probe_type.upper()
            )
        )

    if probe_type in ('script', 'docker'):
        # Script check: exec the command given in hc['script'] inside the container
        return client.V1Probe(
            failure_threshold=1,
            initial_delay_seconds=5,
            period_seconds=period_seconds,
            timeout_seconds=timeout_seconds,
            _exec=client.V1ExecAction(
                command=hc['script'].split()
            )
        )

    # Unrecognized check type: no probe
    return None

131 

def _create_resources(resources=None):
    ''' Convert a dict with optional "limits"/"requests" keys into a V1ResourceRequirements, or None. '''
    if resources is None:
        return None
    return client.V1ResourceRequirements(
        limits=resources.get("limits"),
        requests=resources.get("requests")
    )

141 

def _create_container_object(name, image, always_pull, **kwargs):
    '''
    Build a k8s V1Container for the pod.
    kwargs may include: env (dict of name->value), container_ports (list of
    (port, protocol) pairs), volume_mounts, resources, readiness, liveness.
    '''
    # Environment: copy any passed-in variables, then add POD_IP, which k8s
    # fills in with the IP address of the pod running the container.
    env_spec = kwargs.get('env') or {}
    env_vars = [client.V1EnvVar(name=var_name, value=env_spec[var_name]) for var_name in env_spec]
    pod_ip_source = client.V1EnvVarSource(field_ref=client.V1ObjectFieldSelector(field_path="status.podIP"))
    env_vars.append(client.V1EnvVar(name="POD_IP", value_from=pod_ip_source))

    # If a health check is specified, create a readiness/liveness probe.
    # (For an HTTP-based check, we assume it's at the first container port.)
    readiness_spec = kwargs.get('readiness')
    liveness_spec = kwargs.get('liveness')
    resources_spec = kwargs.get('resources')
    ports = kwargs.get('container_ports') or []

    first_port = ports[0][0] if ports else None
    readiness_probe = _create_probe(readiness_spec, first_port) if readiness_spec else None
    liveness_probe = _create_probe(liveness_spec, first_port) if liveness_spec else None
    resource_reqs = _create_resources(resources_spec) if resources_spec else None
    port_objs = [client.V1ContainerPort(container_port=cport, protocol=proto) for cport, proto in ports]

    return client.V1Container(
        name=name,
        image=image,
        image_pull_policy='Always' if always_pull else 'IfNotPresent',
        env=env_vars,
        ports=port_objs,
        volume_mounts=kwargs.get('volume_mounts') or [],
        resources=resource_reqs,
        readiness_probe=readiness_probe,
        liveness_probe=liveness_probe
    )

177 

def _create_deployment_object(component_name,
                              containers,
                              init_containers,
                              replicas,
                              volumes,
                              labels=None,
                              pull_secrets=None):
    '''
    Build a k8s V1Deployment object for a component.

    component_name: used to derive the deployment name and pod hostname
    containers / init_containers: lists of V1Container objects for the pod
    replicas: number of pod replicas to request
    volumes: list of V1Volume objects for the pod
    labels: dict of labels applied to the pod template and used as the selector.
        If the caller passes a dict, it is updated IN PLACE with the
        "k8sdeployment" label (callers rely on seeing that label afterwards).
    pull_secrets: list of names of k8s secrets containing docker registry credentials.
        See https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod
    '''
    # BUGFIX: the previous defaults were labels={} and pull_secrets=[].
    # Mutable default arguments are shared across calls in Python, and since
    # 'labels' is mutated below, state from one call would leak into the next.
    if labels is None:
        labels = {}
    if pull_secrets is None:
        pull_secrets = []

    deployment_name = _create_deployment_name(component_name)

    # Label the pod with the deployment name, so we can find it easily
    labels.update({"k8sdeployment" : deployment_name})

    # Convert secret names to object references for the pod spec
    ips = [client.V1LocalObjectReference(name=secret) for secret in pull_secrets]

    # Define pod template
    template = client.V1PodTemplateSpec(
        metadata=client.V1ObjectMeta(labels=labels),
        spec=client.V1PodSpec(hostname=component_name,
                              containers=containers,
                              init_containers=init_containers,
                              volumes=volumes,
                              image_pull_secrets=ips)
    )

    # Define deployment spec
    spec = client.V1DeploymentSpec(
        replicas=replicas,
        selector=client.V1LabelSelector(match_labels=labels),
        template=template
    )

    # Create and return the deployment object
    deployment = client.V1Deployment(
        api_version="apps/v1",
        kind="Deployment",
        metadata=client.V1ObjectMeta(name=deployment_name),
        spec=spec
    )

    return deployment

223 

def _create_service_object(service_name, component_name, service_ports, annotations, labels, service_type):
    '''
    Build a k8s V1Service of the given type (e.g. "ClusterIP", "NodePort")
    selecting pods by the "app" label, exposing service_ports.
    Annotations are attached to the service metadata only when provided.
    '''
    metadata_args = {"name": _create_service_name(service_name), "labels": labels}
    if annotations:
        metadata_args["annotations"] = annotations

    return client.V1Service(
        kind="Service",
        api_version="v1",
        metadata=client.V1ObjectMeta(**metadata_args),
        spec=client.V1ServiceSpec(
            ports=service_ports,
            selector={"app" : component_name},
            type=service_type
        )
    )

242 

def parse_ports(port_list):
    '''
    Parse the port list into a list of container ports (needed to create the container)
    and to a set of port mappings to set up k8s services.
    Each entry must match PORTS ("container_port[/protocol]:host_port");
    the protocol defaults to TCP. Raises ValueError on a bad entry.
    '''
    container_ports = []
    port_map = {}
    for port_spec in port_list:
        match = PORTS.match(port_spec.strip())
        if not match:
            raise ValueError("Bad port specification: {0}".format(port_spec))
        container_port = int(match.group(1))
        host_port = int(match.group(4))
        protocol = match.group(3).upper() if match.group(3) else "TCP"
        container_ports.append((container_port, protocol))
        port_map[(container_port, protocol)] = host_port

    return container_ports, port_map

265 

def _parse_volumes(volume_list):
    '''
    Convert Docker-style volume specs
    ({"host": {"path": ...}, "container": {"bind": ..., "mode": ...}})
    into parallel lists of k8s V1Volume and V1VolumeMount objects.
    '''
    volumes = []
    volume_mounts = []
    for spec in volume_list:
        # A random unique name ties each mount to its volume
        vol_name = str(uuid.uuid4())
        host_path = spec['host']['path']
        bind_path = spec['container']['bind']
        read_only = spec['container'].get('mode') == 'ro'
        volumes.append(client.V1Volume(name=vol_name, host_path=client.V1HostPathVolumeSource(path=host_path)))
        volume_mounts.append(client.V1VolumeMount(name=vol_name, mount_path=bind_path, read_only=read_only))

    return volumes, volume_mounts

278 

def _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, log_info, filebeat):
    '''
    Append a filebeat sidecar container (plus the volumes and mounts it needs) so the
    component's logs are shipped to ELK. Mutates containers, volumes, and volume_mounts
    in place. No-op when log_info or filebeat is missing, or when log_info has no
    "log_directory".
    '''
    if not (log_info and filebeat):
        return
    log_dir = log_info.get("log_directory")
    if not log_dir:
        return

    # Shared emptyDir volume: the component writes its logs there and the sidecar reads them.
    # On the sidecar side it's mounted at "alternate_fb_path" if given, else <log_path>/<component_name>.
    volumes.append(client.V1Volume(name="component-log", empty_dir=client.V1EmptyDirVolumeSource()))
    volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=log_dir))
    sidecar_log_path = log_info.get("alternate_fb_path") or "{0}/{1}".format(filebeat["log_path"], component_name)
    sidecar_mounts = [client.V1VolumeMount(name="component-log", mount_path=sidecar_log_path)]

    # emptyDir volume for filebeat's own working data
    volumes.append(client.V1Volume(name="filebeat-data", empty_dir=client.V1EmptyDirVolumeSource()))
    sidecar_mounts.append(client.V1VolumeMount(name="filebeat-data", mount_path=filebeat["data_path"]))

    # Filebeat configuration comes from a k8s ConfigMap that should be created when DCAE is installed.
    volumes.append(
        client.V1Volume(name="filebeat-conf", config_map=client.V1ConfigMapVolumeSource(name=filebeat["config_map"])))
    sidecar_mounts.append(
        client.V1VolumeMount(name="filebeat-conf", mount_path=filebeat["config_path"], sub_path=filebeat["config_subpath"]))

    # Finally add the sidecar container itself
    containers.append(_create_container_object("filebeat", filebeat["image"], False, volume_mounts=sidecar_mounts))

306 

def _add_tls_init_container(init_containers, volumes, volume_mounts, tls_info, tls_config):
    '''
    Append an InitContainer to the pod to set up TLS certificate information.
    For components that act as a server (tls_info["use_tls"] is True), the InitContainer
    populates a directory with server and CA certificate materials in various formats.
    For other components (tls_info["use_tls"] is False, or tls_info is not specified),
    it populates a directory with CA certificate materials in PEM and JKS formats.
    In either case, the certificate directory is mounted onto the component container
    filesystem at tls_info["cert_directory"] if present, otherwise at the configured
    default mount point (tls_config["component_cert_dir"]).
    Mutates init_containers, volumes, and volume_mounts in place.
    '''
    cert_directory = tls_info.get("cert_directory") or tls_config.get("component_cert_dir")
    # The init container reads TLS_SERVER to decide which materials to generate
    tls_env = {"TLS_SERVER": "true" if tls_info.get("use_tls") else "false"}

    # Shared emptyDir volume: written by the init container, read by the component
    volumes.append(client.V1Volume(name="tls-info", empty_dir=client.V1EmptyDirVolumeSource()))
    volume_mounts.append(client.V1VolumeMount(name="tls-info", mount_path=cert_directory))
    init_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_config["cert_path"])]

    # Create the init container
    init_containers.append(_create_container_object("init-tls", tls_config["image"], False, volume_mounts=init_mounts, env=tls_env))

327 

def _add_external_tls_init_container(init_containers, volumes, external_cert, external_tls_config):
    '''
    Append an init container that obtains certificates from the external CertService.
    Certificate request parameters come from 'external_cert' (per-component) and
    'external_tls_config' (installation-wide); results are written under
    external_cert["external_cert_directory"]. Mutates init_containers and volumes
    in place.
    '''
    output_path = external_cert.get("external_cert_directory")
    if not output_path.endswith('/'):
        output_path += '/'

    cert_params = external_cert.get("external_certificate_parameters")
    # Environment consumed by the cert-service-client container
    # (dict insertion order is preserved, matching the original env var ordering)
    env = {
        "REQUEST_URL": external_tls_config.get("request_url"),
        "REQUEST_TIMEOUT": external_tls_config.get("timeout"),
        "OUTPUT_PATH": output_path + "external",
        "OUTPUT_TYPE": external_cert.get("cert_type"),
        "CA_NAME": external_cert.get("ca_name"),
        "COMMON_NAME": cert_params.get("common_name"),
        "ORGANIZATION": external_tls_config.get("organization"),
        "ORGANIZATION_UNIT": external_tls_config.get("organizational_unit"),
        "LOCATION": external_tls_config.get("location"),
        "STATE": external_tls_config.get("state"),
        "COUNTRY": external_tls_config.get("country"),
        "SANS": cert_params.get("sans"),
        "KEYSTORE_PATH": KEYSTORE_PATH,
        "KEYSTORE_PASSWORD": external_tls_config.get("keystore_password"),
        "TRUSTSTORE_PATH": TRUSTSTORE_PATH,
        "TRUSTSTORE_PASSWORD": external_tls_config.get("truststore_password")
    }

    # Volume holding the CertService client's own TLS credentials (from a pre-created secret),
    # plus the shared tls-info volume where the obtained certificates are written
    secret_source = client.V1SecretVolumeSource(secret_name=CERT_SECRET_NAME)
    volumes.append(client.V1Volume(name="tls-volume", secret=secret_source))
    init_mounts = [client.V1VolumeMount(name="tls-info", mount_path=external_cert.get("external_cert_directory")),
                   client.V1VolumeMount(name="tls-volume", mount_path=MOUNT_PATH)]

    # Create the init container
    init_containers.append(_create_container_object("cert-service-client", external_tls_config["image_tag"], False, volume_mounts=init_mounts, env=env))

359 

def _process_port_map(port_map):
    '''
    Split port_map ({(container_port, protocol): host_port}) into:
    - service_ports: ports exposed internally on the k8s network (ClusterIP)
    - exposed_ports: ports also mapped to k8s node ports (host_port != 0) via NodePort
    '''
    service_ports = []
    exposed_ports = []
    for (container_port, protocol), host_port in port_map.items():
        # NodePort entries keep the leading "x"; the ClusterIP name drops it via name[1:]
        name = "xport-{0}-{1}".format(protocol[0].lower(), container_port)
        container_port = int(container_port)
        host_port = int(host_port)
        service_ports.append(client.V1ServicePort(port=container_port, protocol=protocol, name=name[1:]))
        if host_port != 0:
            exposed_ports.append(client.V1ServicePort(port=container_port, protocol=protocol, node_port=host_port, name=name))
    return service_ports, exposed_ports

371 

def _service_exists(location, namespace, component_name):
    ''' Return True if the k8s service for component_name exists in 'namespace' at 'location'. '''
    try:
        _configure_api(location)
        client.CoreV1Api().read_namespaced_service(_create_service_name(component_name), namespace)
        return True
    except client.rest.ApiException:
        # Any API failure (including 404) is treated as "service not found"
        return False

382 

def _patch_deployment(location, namespace, deployment, modify):
    '''
    Gets the current spec for 'deployment' in 'namespace'
    in the k8s cluster at 'location',
    uses the 'modify' function to change the spec,
    then sends the updated spec to k8s.
    '''
    _configure_api(location)
    apps_api = client.AppsV1Api()

    # Read the current spec, apply the caller's modification, and patch it back
    current_spec = apps_api.read_namespaced_deployment(deployment, namespace)
    updated_spec = modify(current_spec)
    apps_api.patch_namespaced_deployment(deployment, namespace, updated_spec)

400 

def _execute_command_in_pod(location, namespace, pod_name, command):
    '''
    Execute the command (specified by an argv-style list in the "command" parameter) in
    the specified pod in the specified namespace at the specified location.
    For now at least, we use this only to
    run a notification script in a pod after a configuration change.

    The straightforward way to do this is with the V1 Core API function "connect_get_namespaced_pod_exec".
    Unfortunately due to a bug/limitation in the Python client library, we can't call it directly.
    We have to make the API call through a Websocket connection using the kubernetes.stream wrapper API.
    I'm following the example code at https://github.com/kubernetes-client/python/blob/master/examples/exec.py.
    There are several issues tracking this, in various states. It isn't clear that there will ever
    be a fix.
        - https://github.com/kubernetes-client/python/issues/58
        - https://github.com/kubernetes-client/python/issues/409
        - https://github.com/kubernetes-client/python/issues/526

    The main consequence of the workaround using "stream" is that the caller does not get an indication
    of the exit code returned by the command when it completes execution. It turns out that the
    original implementation of notification in the Docker plugin did not use this result, so we can
    still match the original notification functionality.

    The "stream" approach returns a string containing any output sent by the command to stdout or stderr.
    We'll return that so it can logged.

    Returns a dict of the form {"pod": pod_name, "output": <command output or "Pod not found">}.
    Re-raises any ApiException other than "pod not found".
    '''
    _configure_api(location)
    try:
        output = stream.stream(client.CoreV1Api().connect_get_namespaced_pod_exec,
                               name=pod_name,
                               namespace=namespace,
                               command=command,
                               stdout=True,
                               stderr=True,
                               stdin=False,
                               tty=False)
    except client.rest.ApiException as e:
        # If the exception indicates the pod wasn't found, it's not a fatal error.
        # It existed when we enumerated the pods for the deployment but no longer exists.
        # Unfortunately, the only way to distinguish a pod not found from any other error
        # is by looking at the reason text.
        # (The ApiException's "status" field should contain the HTTP status code, which would
        # be 404 if the pod isn't found, but empirical testing reveals that "status" is set
        # to zero.)
        if "404 not found" in e.reason.lower():
            output = "Pod not found"
        else:
            raise e

    return {"pod" : pod_name, "output" : output}

450 

def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, **kwargs):
    '''
    This will create a k8s Deployment and, if needed, one or two k8s Services.
    (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
    We're not exposing k8s to the component developer and the blueprint author.
    This is a conscious choice.  We want to use k8s in a controlled, consistent way, and we want to hide
    the details from the component developer and the blueprint author.)

    namespace:  the Kubernetes namespace into which the component is deployed
    component_name:  the component name, used to derive names of Kubernetes entities
    image: the docker image for the component being deployed
    replica: the number of instances of the component to be deployed
    always_pull: boolean flag, indicating that Kubernetes should always pull a new copy of
       the Docker image for the component, even if it is already present on the Kubernetes node.
    k8sconfig contains:
        - image_pull_secrets: a list of names of image pull secrets that can be used for retrieving images.
          (DON'T PANIC: these are just the names of secrets held in the Kubernetes secret store.)
        - filebeat: a dictionary of filebeat sidecar parameters:
            "log_path" : mount point for log volume in filebeat container
            "data_path" : mount point for data volume in filebeat container
            "config_path" : mount point for config volume in filebeat container
            "config_subpath" :  subpath for config data in filebeat container
            "config_map" : ConfigMap holding the filebeat configuration
            "image": Docker image to use for filebeat
        - tls: a dictionary of TLS-related information:
            "cert_path": mount point for certificate volume in init container
            "image": Docker image to use for TLS init container
            "component_cert_dir" : default mount point for certs
    kwargs may have:
        - volumes:  array of volume objects, where a volume object is:
            {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}
        - ports: array of strings in the form "container_port:host_port"
        - env: map of name-value pairs ( {name0: value0, name1: value1...}
        - log_info: an object with info for setting up ELK logging, with the form:
            {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}
        - tls_info: an object with info for setting up TLS (HTTPS), with the form:
            {"use_tls": true, "cert_directory": "/path/to/container/cert/directory" }
        - external_cert: an object with information for setting up the init container for external certificates creation, with the form:
            {"external_cert":
                "external_cert_directory": "/path/to/directory_where_certs_should_be_placed",
                "use_external_tls": true or false,
                "ca_name": "ca-name-value",
                "cert_type": "P12" or "JKS" or "PEM",
                "external_certificate_parameters":
                    "common_name": "common-name-value",
                    "sans": "sans-value"}
        - labels: dict with label-name/label-value pairs, e.g. {"cfydeployment" : "lsdfkladflksdfsjkl", "cfynode":"mycomponent"}
            These label will be set on all the pods deployed as a result of this deploy() invocation.
        - resources: dict with optional "limits" and "requests" resource requirements, each a dict containing:
            - cpu:    number CPU usage, like 0.5
            - memory: string memory requirement, like "2Gi"
        - readiness: dict with health check info; if present, used to create a readiness probe for the main container.  Includes:
            - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
            - interval: period (in seconds) between probes
            - timeout:  time (in seconds) to allow a probe to complete
            - endpoint: the path portion of the URL that points to the readiness endpoint for "http" and "https" types
            - path: the full path to the script to be executed in the container for "script" and "docker" types
        - liveness: dict with health check info; if present, used to create a liveness probe for the main container.  Includes:
            - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
            - interval: period (in seconds) between probes
            - timeout:  time (in seconds) to allow a probe to complete
            - endpoint: the path portion of the URL that points to the liveness endpoint for "http" and "https" types
            - path: the full path to the script to be executed in the container for "script" and "docker" types
        - k8s_location: name of the Kubernetes location (cluster) where the component is to be deployed

    Returns (dep, deployment_description) where dep is the V1Deployment object sent to k8s and
    deployment_description is a dict with the namespace, location, deployment name, and the names
    of any services created -- the information needed later by undeploy()/scale()/upgrade().
    If anything fails, partially-created resources (deployment, ClusterIP service) are cleaned up
    before the exception is re-raised.
    '''

    deployment_ok = False
    cip_service_created = False
    deployment_description = {
        "namespace": namespace,
        "location" : kwargs.get("k8s_location"),
        "deployment": '',
        "services": []
    }

    try:

        # Get API handles
        _configure_api(kwargs.get("k8s_location"))
        core = client.CoreV1Api()
        k8s_apps_v1_api_client = client.AppsV1Api()

        # Parse the port mapping
        container_ports, port_map = parse_ports(kwargs.get("ports", []))

        # Parse the volumes list into volumes and volume_mounts for the deployment
        volumes, volume_mounts = _parse_volumes(kwargs.get("volumes", []))

        # Initialize the list of containers that will be part of the pod
        containers = []
        init_containers = []

        # Set up the ELK logging sidecar container, if needed
        _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, kwargs.get("log_info"), k8sconfig.get("filebeat"))

        # Set up TLS information
        _add_tls_init_container(init_containers, volumes, volume_mounts, kwargs.get("tls_info") or {}, k8sconfig.get("tls"))

        # Set up external TLS information
        external_cert = kwargs.get("external_cert")
        if external_cert and external_cert.get("use_external_tls"):
            _add_external_tls_init_container(init_containers, volumes, external_cert, k8sconfig.get("external_cert"))

        # Create the container for the component
        # Make it the first container in the pod
        container_args = {key: kwargs.get(key) for key in ("env", "readiness", "liveness", "resources")}
        container_args['container_ports'] = container_ports
        container_args['volume_mounts'] = volume_mounts
        containers.insert(0, _create_container_object(component_name, image, always_pull, **container_args))

        # Build the k8s Deployment object
        # (note: _create_deployment_object adds the "k8sdeployment" label to this dict,
        # so the services created below carry it too)
        labels = kwargs.get("labels", {})
        labels["app"] = component_name
        dep = _create_deployment_object(component_name, containers, init_containers, replicas, volumes, labels, pull_secrets=k8sconfig["image_pull_secrets"])

        # Have k8s deploy it
        k8s_apps_v1_api_client.create_namespaced_deployment(namespace, dep)
        deployment_ok = True
        deployment_description["deployment"] = _create_deployment_name(component_name)

        # Create service(s), if a port mapping is specified
        if port_map:
            service_ports, exposed_ports = _process_port_map(port_map)

            # Create a ClusterIP service for access via the k8s network
            service = _create_service_object(_create_service_name(component_name), component_name, service_ports, None, labels, "ClusterIP")
            core.create_namespaced_service(namespace, service)
            cip_service_created = True
            deployment_description["services"].append(_create_service_name(component_name))

            # If there are ports to be exposed on the k8s nodes, create a "NodePort" service
            if exposed_ports:
                exposed_service = \
                    _create_service_object(_create_exposed_service_name(component_name), component_name, exposed_ports, '', labels, "NodePort")
                core.create_namespaced_service(namespace, exposed_service)
                deployment_description["services"].append(_create_exposed_service_name(component_name))

    except Exception as e:
        # Clean up anything that was created before the failure, then re-raise.
        # If the ClusterIP service was created, delete the service:
        if cip_service_created:
            core.delete_namespaced_service(_create_service_name(component_name), namespace)
        # If the deployment was created but not the service, delete the deployment
        if deployment_ok:
            client.AppsV1Api().delete_namespaced_deployment(_create_deployment_name(component_name), namespace, body=client.V1DeleteOptions())
        raise e

    return dep, deployment_description

599 

def undeploy(deployment_description):
    '''
    Remove a deployed component: delete its k8s services and its k8s Deployment.
    deployment_description is the dict returned by deploy().
    '''
    _configure_api(deployment_description["location"])
    namespace = deployment_description["namespace"]

    # Remove any services associated with the component
    core_api = client.CoreV1Api()
    for service_name in deployment_description["services"]:
        core_api.delete_namespaced_service(service_name, namespace)

    # "Foreground" propagation makes k8s delete the underlying pods and replicaset
    # along with the deployment.
    delete_options = client.V1DeleteOptions(propagation_policy="Foreground")
    client.AppsV1Api().delete_namespaced_deployment(deployment_description["deployment"], namespace, body=delete_options)

612 

def is_available(location, namespace, component_name):
    '''
    Return True when the component's deployment has fully rolled out: the number of
    available replicas equals the requested count AND all replicas match the current
    spec. This check can be used to verify completion of an initial deployment, a
    scale operation, or an update operation.
    '''
    _configure_api(location)
    dep = client.AppsV1Api().read_namespaced_deployment_status(_create_deployment_name(component_name), namespace)
    requested = dep.spec.replicas
    return dep.status.available_replicas == requested and dep.status.updated_replicas == requested

619 

def scale(deployment_description, replicas):
    ''' Trigger a scaling operation by updating the replica count for the Deployment '''

    def set_replicas(spec):
        # Modifier passed to _patch_deployment: overwrite the requested replica count
        spec.spec.replicas = replicas
        return spec

    _patch_deployment(deployment_description["location"],
                      deployment_description["namespace"],
                      deployment_description["deployment"],
                      set_replicas)

628 

def upgrade(deployment_description, image, container_index = 0):
    ''' Trigger a rolling upgrade by sending a new image name/tag to k8s '''

    def set_image(spec):
        # Modifier passed to _patch_deployment: point the target container at the new image
        spec.spec.template.spec.containers[container_index].image = image
        return spec

    _patch_deployment(deployment_description["location"],
                      deployment_description["namespace"],
                      deployment_description["deployment"],
                      set_image)

637 

def rollback(deployment_description, rollback_to=0):
    '''
    Undo upgrade by rolling back to a previous revision of the deployment.
    By default, go back one revision.
    rollback_to can be used to supply a specific revision number.
    Returns the image for the app container and the replica count from the rolled-back deployment
    '''
    '''
    2018-07-13
    Currently this does not work due to a bug in the create_namespaced_deployment_rollback() method.
    The k8s python client code throws an exception while processing the response from the API.
    See:
        - https://github.com/kubernetes-client/python/issues/491
        - https://github.com/kubernetes/kubernetes/pull/63837
    The fix has been merged into the master branch but is not in the latest release.
    '''
    _configure_api(deployment_description["location"])
    deployment = deployment_description["deployment"]
    namespace = deployment_description["namespace"]

    # Initiate the rollback
    # NOTE(review): this uses the extensions/v1beta1 rollback API, which was removed
    # in newer k8s versions -- see the 2018-07-13 note above before relying on this.
    client.ExtensionsV1beta1Api().create_namespaced_deployment_rollback(
        deployment,
        namespace,
        client.AppsV1beta1DeploymentRollback(name=deployment, rollback_to=client.AppsV1beta1RollbackConfig(revision=rollback_to)))

    # Read back the spec for the rolled-back deployment
    spec = client.AppsV1Api().read_namespaced_deployment(deployment, namespace)
    return spec.spec.template.spec.containers[0].image, spec.spec.replicas

667 

def execute_command_in_deployment(deployment_description, command):
    '''
    Execute "command" (an argv-style list) in container 0 (the main application
    container) of every running pod belonging to the k8s deployment identified
    by "deployment_description".  Returns the list of per-pod results.

    The set of pods for a deployment changes over time; the enumeration here is
    a snapshot taken at one point in time.  Pods created after the enumeration
    will not run the command, and a pod that disappears before the command is
    executed causes a non-fatal failure for that pod.

    This is acceptable for the current use case -- running a script that tells a
    container its configuration changed after a policy change.  The new
    configuration is stored in the configuration store (Consul) first, so a pod
    that vanishes doesn't need reconfiguring and a pod created later picks up
    the updated configuration from Consul at startup.

    k8s has no API to execute a command across all pods of a deployment (its
    exec call is pod-level only), and no direct way to list a deployment's
    pods.  The deployment code above therefore labels each pod with
    "k8sdeployment" set to the k8s deployment name, and this function selects
    pods by that label.
    '''
    location = deployment_description["location"]
    _configure_api(location)
    namespace = deployment_description["namespace"]
    deployment = deployment_description["deployment"]

    # Snapshot the running pods carrying this deployment's label.
    running_pods = client.CoreV1Api().list_namespaced_pod(
        namespace = namespace,
        label_selector = "k8sdeployment={0}".format(deployment),
        field_selector = "status.phase=Running"
    )

    # Run the command in each pod from the snapshot.
    return [_execute_command_in_pod(location, namespace, pod.metadata.name, command)
            for pod in running_pods.items]