Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# ============LICENSE_START======================================================= 

2# org.onap.dcae 

3# ================================================================================ 

4# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. 

5# Copyright (c) 2020 Pantheon.tech. All rights reserved. 

6# Copyright (c) 2020 Nokia. All rights reserved. 

7# ================================================================================ 

8# Licensed under the Apache License, Version 2.0 (the "License"); 

9# you may not use this file except in compliance with the License. 

10# You may obtain a copy of the License at 

11# 

12# http://www.apache.org/licenses/LICENSE-2.0 

13# 

14# Unless required by applicable law or agreed to in writing, software 

15# distributed under the License is distributed on an "AS IS" BASIS, 

16# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

17# See the License for the specific language governing permissions and 

18# limitations under the License. 

19# ============LICENSE_END========================================================= 

20# 

21import os 

22import re 

23import uuid 

24 

25from kubernetes import config, client, stream 

26 

# Default values (in seconds) for readiness/liveness probes
PROBE_DEFAULT_PERIOD = 15
PROBE_DEFAULT_TIMEOUT = 1

# Location of k8s cluster config file ("kubeconfig")
K8S_CONFIG_PATH="/opt/onap/kube/kubeconfig"

# Regular expression for interval/timeout specification:
# a decimal integer with an optional unit suffix (s/m/h)
INTERVAL_SPEC = re.compile("^([0-9]+)(s|m|h)?$")
# Conversion factors to seconds, keyed by the unit suffix (None = no suffix = seconds)
FACTORS = {None: 1, "s": 1, "m": 60, "h": 3600}

# Regular expression for port mapping
# group 1: container port
# group 2: / + protocol
# group 3: protocol
# group 4: host port
PORTS = re.compile("^([0-9]+)(/(udp|UDP|tcp|TCP))?:([0-9]+)$")

# Constants for external_cert (paths/names used by the cert-service client init container)
MOUNT_PATH = "/etc/onap/aaf/certservice/certs/"
KEYSTORE_PATH = MOUNT_PATH + "certServiceClient-keystore.jks"
TRUSTSTORE_PATH = MOUNT_PATH + "truststore.jks"
CERT_SECRET_NAME = "aaf-cert-service-client-tls-secret"

51 

52def _create_deployment_name(component_name): 

53 return "dep-{0}".format(component_name)[:63] 

54 

55def _create_service_name(component_name): 

56 return "{0}".format(component_name)[:63] 

57 

58def _create_exposed_service_name(component_name): 

59 return ("x{0}".format(component_name))[:63] 

60 

def _configure_api(location=None):
    """
    Set up the kubernetes client library's global configuration.

    location: name of a kubeconfig context to use when a kubeconfig file
    is present; ignored for in-cluster configuration.
    """
    # Look for a kubernetes config file
    if os.path.exists(K8S_CONFIG_PATH):
        config.load_kube_config(config_file=K8S_CONFIG_PATH, context=location, persist_config=False)
    else:
        # Maybe we're running in a k8s container and we can use info provided by k8s
        # We would like to use:
        # config.load_incluster_config()
        # but this looks into os.environ for kubernetes host and port, and from
        # the plugin those aren't visible. So we use the InClusterConfigLoader class,
        # where we can set the environment to what we like.
        # This is probably brittle! Maybe there's a better alternative.
        localenv = {
            config.incluster_config.SERVICE_HOST_ENV_NAME : "kubernetes.default.svc.cluster.local",
            config.incluster_config.SERVICE_PORT_ENV_NAME : "443"
        }
        config.incluster_config.InClusterConfigLoader(
            token_filename=config.incluster_config.SERVICE_TOKEN_FILENAME,
            cert_filename=config.incluster_config.SERVICE_CERT_FILENAME,
            environ=localenv
        ).load_and_set()

82 

def _parse_interval(t):
    """
    Parse an interval specification into a number of seconds.
    t can be
    - a simple integer quantity, interpreted as seconds
    - a string representation of a decimal integer, interpreted as seconds
    - a string consisting of a representation of a decimal integer followed by a unit,
      with "s" representing seconds, "m" representing minutes,
      and "h" representing hours
    Used for compatibility with the Docker plugin, where time intervals
    for health checks were specified as strings with a number and a unit.
    See INTERVAL_SPEC above for the regular expression that's accepted.
    Raises ValueError if t does not match the accepted forms.
    """
    match = INTERVAL_SPEC.match(str(t))
    if not match:
        raise ValueError("Bad interval specification: {0}".format(t))
    return int(match.group(1)) * FACTORS[match.group(2)]

102 

def _create_probe(hc, port):
    ''' Create a Kubernetes probe based on info in the health check dictionary hc.

    hc keys used: 'type' (http/https/script/docker), optional 'interval' and
    'timeout' (parsed by _parse_interval), 'endpoint' for http(s) checks,
    'script' for exec-style checks.
    port: container port probed for http(s) checks.
    Returns a V1Probe, or None if hc['type'] is not a recognized check type.
    '''
    probe_type = hc['type']
    probe = None
    period = _parse_interval(hc.get('interval', PROBE_DEFAULT_PERIOD))
    timeout = _parse_interval(hc.get('timeout', PROBE_DEFAULT_TIMEOUT))
    if probe_type in ['http', 'https']:
        probe = client.V1Probe(
          failure_threshold = 1,
          initial_delay_seconds = 5,
          period_seconds = period,
          timeout_seconds = timeout,
          http_get = client.V1HTTPGetAction(
              path = hc['endpoint'],
              # hc['type'] doubles as the URL scheme (HTTP/HTTPS)
              port = port,
              scheme = probe_type.upper()
          )
        )
    elif probe_type in ['script', 'docker']:
        # 'script' and 'docker' checks both exec a command inside the container
        probe = client.V1Probe(
          failure_threshold = 1,
          initial_delay_seconds = 5,
          period_seconds = period,
          timeout_seconds = timeout,
          _exec = client.V1ExecAction(
              # split on whitespace to build an argv-style command list
              command = hc['script'].split( )
          )
        )
    return probe

132 

def _create_resources(resources=None):
    """
    Build a V1ResourceRequirements from a dict with optional 'limits' and
    'requests' keys. Returns None when no resources dict is supplied.
    """
    if resources is None:
        return None
    return client.V1ResourceRequirements(
        limits=resources.get("limits"),
        requests=resources.get("requests")
    )

142 

def _create_container_object(name, image, always_pull, **kwargs):
    """
    Build a V1Container for a pod.

    kwargs may include:
    - env: dict of environment variable name -> value
    - readiness / liveness: health-check dicts (see _create_probe)
    - resources: dict with optional 'limits'/'requests' (see _create_resources)
    - container_ports: list of (port, protocol) pairs
    - volume_mounts: list of V1VolumeMount
    """
    # Set up environment variables
    # Copy any passed in environment variables
    env = kwargs.get('env') or {}
    env_vars = [client.V1EnvVar(name=k, value=env[k]) for k in env]
    # Add POD_IP with the IP address of the pod running the container
    pod_ip = client.V1EnvVarSource(field_ref = client.V1ObjectFieldSelector(field_path="status.podIP"))
    env_vars.append(client.V1EnvVar(name="POD_IP",value_from=pod_ip))

    # If a health check is specified, create a readiness/liveness probe
    # (For an HTTP-based check, we assume it's at the first container port)
    readiness = kwargs.get('readiness')
    liveness = kwargs.get('liveness')
    resources = kwargs.get('resources')
    container_ports = kwargs.get('container_ports') or []

    hc_port = container_ports[0][0] if container_ports else None
    probe = _create_probe(readiness, hc_port) if readiness else None
    live_probe = _create_probe(liveness, hc_port) if liveness else None
    resources_obj = _create_resources(resources) if resources else None
    port_objs = [client.V1ContainerPort(container_port=port, protocol=proto)
                 for port, proto in container_ports]

    # Define container for pod
    return client.V1Container(
        name=name,
        image=image,
        image_pull_policy='Always' if always_pull else 'IfNotPresent',
        env=env_vars,
        ports=port_objs,
        volume_mounts=kwargs.get('volume_mounts') or [],
        resources=resources_obj,
        readiness_probe=probe,
        liveness_probe=live_probe
    )

178 

def _create_deployment_object(component_name,
                              containers,
                              init_containers,
                              replicas,
                              volumes,
                              labels=None,
                              pull_secrets=None):
    """
    Build (but do not create) a V1Deployment for the component.

    component_name: used to derive the deployment name and pod hostname.
    containers / init_containers: lists of V1Container for the pod.
    replicas: desired replica count.
    volumes: list of V1Volume for the pod.
    labels: dict of labels applied to the pod template and used as the selector.
    pull_secrets: list of names of k8s secrets holding docker registry credentials.
    """
    deployment_name = _create_deployment_name(component_name)

    # Fix: the previous signature used mutable default arguments ({} and [])
    # and mutated 'labels' in place, leaking the "k8sdeployment" label across
    # calls and into the caller's dict. Copy defensively instead.
    labels = dict(labels) if labels else {}

    # Label the pod with the deployment name, so we can find it easily
    labels.update({"k8sdeployment" : deployment_name})

    # pull_secrets is a list of the names of the k8s secrets containing docker registry credentials
    # See https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod
    ips = [client.V1LocalObjectReference(name=secret) for secret in (pull_secrets or [])]

    # Define pod template
    template = client.V1PodTemplateSpec(
        metadata=client.V1ObjectMeta(labels=labels),
        spec=client.V1PodSpec(hostname=component_name,
                              containers=containers,
                              init_containers=init_containers,
                              volumes=volumes,
                              image_pull_secrets=ips)
    )

    # Define deployment spec; the selector must match the pod template labels
    spec = client.V1DeploymentSpec(
        replicas=replicas,
        selector=client.V1LabelSelector(match_labels=labels),
        template=template
    )

    # Create deployment object
    deployment = client.V1Deployment(
        api_version="apps/v1",
        kind="Deployment",
        metadata=client.V1ObjectMeta(name=deployment_name),
        spec=spec
    )

    return deployment

224 

def _create_service_object(service_name, component_name, service_ports, annotations, labels, service_type):
    """
    Build (but do not create) a V1Service that selects the component's pods
    via the 'app' label. 'annotations' are attached to the metadata only
    when truthy; 'service_type' is e.g. "ClusterIP" or "NodePort".
    """
    metadata_args = {
        "name": _create_service_name(service_name),
        "labels": labels
    }
    if annotations:
        metadata_args["annotations"] = annotations

    spec = client.V1ServiceSpec(
        ports=service_ports,
        selector={"app" : component_name},
        type=service_type
    )

    return client.V1Service(
        kind="Service",
        api_version="v1",
        metadata=client.V1ObjectMeta(**metadata_args),
        spec=spec
    )

243 

def parse_ports(port_list):
    '''
    Parse the port list into a list of container ports (needed to create the container)
    and to a set of port mappings to set up k8s services.

    Each entry has the form "container_port[/protocol]:host_port" (see PORTS).
    Returns (container_ports, port_map) where container_ports is a list of
    (port, protocol) pairs and port_map maps (port, protocol) -> host port.
    Raises ValueError for any entry that does not match PORTS.
    '''
    container_ports = []
    port_map = {}
    for entry in port_list:
        match = PORTS.match(entry.strip())
        if not match:
            raise ValueError("Bad port specification: {0}".format(entry))
        cport = int(match.group(1))
        hport = int(match.group(4))
        # Protocol defaults to TCP when not specified
        proto = match.group(3).upper() if match.group(3) else "TCP"
        container_ports.append((cport, proto))
        port_map[(cport, proto)] = hport

    return container_ports, port_map

266 

def _parse_volumes(volume_list):
    """
    Convert Docker-style volume specs
    ({"host": {"path": ...}, "container": {"bind": ..., "mode": ...}})
    into k8s hostPath volumes and the matching container volume mounts.
    Each volume gets a random UUID as its name. Returns (volumes, volume_mounts).
    """
    volumes = []
    volume_mounts = []
    for spec in volume_list:
        vol_name = str(uuid.uuid4())
        host_path = spec['host']['path']
        bind_path = spec['container']['bind']
        # Only explicit mode 'ro' makes the mount read-only
        read_only = spec['container'].get('mode') == 'ro'
        volumes.append(client.V1Volume(
            name=vol_name,
            host_path=client.V1HostPathVolumeSource(path=host_path)
        ))
        volume_mounts.append(client.V1VolumeMount(
            name=vol_name,
            mount_path=bind_path,
            read_only=read_only
        ))

    return volumes, volume_mounts

279 

def _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, log_info, filebeat):
    """
    Append a filebeat sidecar container, plus the volumes and mounts it shares
    with the component, so component logs can be shipped to ELK.
    Mutates 'containers', 'volumes', and 'volume_mounts' in place.
    No-op when log_info or filebeat config is missing or no log directory is set.
    """
    if not log_info or not filebeat:
        return
    log_dir = log_info.get("log_directory")
    if not log_dir:
        return
    sidecar_volume_mounts = []

    # Create the volume for component log files and volume mounts for the component and sidecar containers
    volumes.append(client.V1Volume(name="component-log", empty_dir=client.V1EmptyDirVolumeSource()))
    volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=log_dir))
    sc_path = log_info.get("alternate_fb_path") or "{0}/{1}".format(filebeat["log_path"], component_name)
    sidecar_volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=sc_path))

    # Create the volume for sidecar data and the volume mount for it
    volumes.append(client.V1Volume(name="filebeat-data", empty_dir=client.V1EmptyDirVolumeSource()))
    sidecar_volume_mounts.append(client.V1VolumeMount(name="filebeat-data", mount_path=filebeat["data_path"]))

    # Create the volume for the sidecar configuration data and the volume mount for it
    # The configuration data is in a k8s ConfigMap that should be created when DCAE is installed.
    volumes.append(
        client.V1Volume(name="filebeat-conf", config_map=client.V1ConfigMapVolumeSource(name=filebeat["config_map"])))
    sidecar_volume_mounts.append(
        client.V1VolumeMount(name="filebeat-conf", mount_path=filebeat["config_path"], sub_path=filebeat["config_subpath"]))

    # Finally create the container for the sidecar
    containers.append(_create_container_object("filebeat", filebeat["image"], False, volume_mounts=sidecar_volume_mounts))

307 

def _add_tls_init_container(ctx, init_containers, volumes, volume_mounts, tls_info, tls_config):
    # Adds an InitContainer to the pod to set up TLS certificate information. For components that act as a
    # server(tls_info["use_tls"] is True), the InitContainer will populate a directory with server and CA certificate
    # materials in various formats. For other components (tls_info["use_tls"] is False, or tls_info is not specified),
    # the InitContainer will populate a directory with CA certificate materials in PEM and JKS formats.
    # In either case, the certificate directory is mounted onto the component container filesystem at the location
    # specified by tls_info["component_cert_dir"], if present, otherwise at the configured default mount point
    # (tls_config["component_cert_dir"]).
    # Mutates 'init_containers', 'volumes', and 'volume_mounts' in place.
    docker_image = tls_config["image"]
    ctx.logger.info("Creating init container: TLS \n * [" + docker_image + "]")

    cert_directory = tls_info.get("cert_directory") or tls_config.get("component_cert_dir")
    env = {}
    # The init container reads TLS_SERVER to decide whether to emit server certs
    env["TLS_SERVER"] = "true" if tls_info.get("use_tls") else "false"

    # Create the certificate volume and volume mounts:
    # one emptyDir shared between the init container (at tls_config["cert_path"])
    # and the component container (at cert_directory)
    volumes.append(client.V1Volume(name="tls-info", empty_dir=client.V1EmptyDirVolumeSource()))
    volume_mounts.append(client.V1VolumeMount(name="tls-info", mount_path=cert_directory))
    init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_config["cert_path"])]

    # Create the init container
    init_containers.append(_create_container_object("init-tls", docker_image, False, volume_mounts=init_volume_mounts, env=env))

330 

def _add_external_tls_init_container(ctx, init_containers, volumes, external_cert, external_tls_config):
    # Adds an InitContainer to the pod which will generate external TLS certificates.
    # The container's behavior is driven entirely by environment variables built below
    # from the per-component 'external_cert' settings and the plugin-level
    # 'external_tls_config'. Mutates 'init_containers' and 'volumes' in place.
    docker_image = external_tls_config["image_tag"]
    ctx.logger.info("Creating init container: external TLS \n * [" + docker_image + "]")

    env = {}
    output_path = external_cert.get("external_cert_directory")
    if not output_path.endswith('/'):
        output_path += '/'

    env["REQUEST_URL"] = external_tls_config.get("request_url")
    # NOTE(review): "timeout" may be numeric in config; V1EnvVar values are
    # normally strings — confirm the configured value is a string.
    env["REQUEST_TIMEOUT"] = external_tls_config.get("timeout")
    # Certificates are written into the "external" subdirectory of the cert directory
    env["OUTPUT_PATH"] = output_path + "external"
    env["OUTPUT_TYPE"] = external_cert.get("cert_type")
    env["CA_NAME"] = external_cert.get("ca_name")
    env["COMMON_NAME"] = external_cert.get("external_certificate_parameters").get("common_name")
    env["ORGANIZATION"] = external_tls_config.get("organization")
    env["ORGANIZATION_UNIT"] = external_tls_config.get("organizational_unit")
    env["LOCATION"] = external_tls_config.get("location")
    env["STATE"] = external_tls_config.get("state")
    env["COUNTRY"] = external_tls_config.get("country")
    env["SANS"] = external_cert.get("external_certificate_parameters").get("sans")
    env["KEYSTORE_PATH"] = KEYSTORE_PATH
    env["KEYSTORE_PASSWORD"] = external_tls_config.get("keystore_password")
    env["TRUSTSTORE_PATH"] = TRUSTSTORE_PATH
    env["TRUSTSTORE_PASSWORD"] = external_tls_config.get("truststore_password")

    # Create the volumes and volume mounts:
    # - the client TLS secret (k8s Secret) mounted at MOUNT_PATH
    # - the shared "tls-info" emptyDir (created by _add_tls_init_container)
    #   mounted at the external cert directory
    sec = client.V1SecretVolumeSource(secret_name=CERT_SECRET_NAME)
    volumes.append(client.V1Volume(name="tls-volume", secret=sec))
    init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=external_cert.get("external_cert_directory")),
                          client.V1VolumeMount(name="tls-volume", mount_path=MOUNT_PATH)]

    # Create the init container
    init_containers.append(_create_container_object("cert-service-client", docker_image, False, volume_mounts=init_volume_mounts, env=env))

366 

367 

def _add_truststore_merger_init_container(ctx, init_containers, tls_info, tls_config, external_cert, truststore_merger_config):
    # Adds an InitContainer to the pod to merge TLS and external TLS truststore into single file.
    # The merger container reads colon-separated truststore and password file paths
    # from environment variables. Mutates 'init_containers' in place.
    docker_image = truststore_merger_config["image_tag"]
    ctx.logger.info("Creating init container: truststore merger \n * [" + docker_image + "]")

    tls_cert_dir = tls_info.get("cert_directory") or tls_config.get("component_cert_dir")
    if not tls_cert_dir.endswith('/'):
        tls_cert_dir += '/'

    # Internal (AAF) truststore produced by the TLS init container
    tls_cert_file_path = tls_cert_dir + "trust.jks"
    tls_cert_file_pass = tls_cert_dir + "trust.pass"

    # External truststore produced by the cert-service-client init container
    ext_cert_dir = tls_cert_dir + "external/"

    output_type = (external_cert.get("cert_type") or 'p12').lower()
    ext_truststore_path = ext_cert_dir + "truststore." + _get_file_extension(output_type)
    # PEM truststores have no password file; pass an empty path in that case
    ext_truststore_pass = ''
    if output_type != 'pem':
        ext_truststore_pass = ext_cert_dir + "truststore.pass"

    env = {}
    env["TRUSTSTORES_PATHS"] = tls_cert_file_path + ":" + ext_truststore_path
    env["TRUSTSTORES_PASSWORDS_PATHS"] = tls_cert_file_pass + ":" + ext_truststore_pass

    ctx.logger.info("TRUSTSTORES_PATHS: " + env["TRUSTSTORES_PATHS"])
    ctx.logger.info("TRUSTSTORES_PASSWORDS_PATHS: " + env["TRUSTSTORES_PASSWORDS_PATHS"])

    # Create the volumes and volume mounts (reuses the shared "tls-info" emptyDir)
    init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_cert_dir)]

    # Create the init container
    init_containers.append(_create_container_object("truststore-merger", docker_image, False, volume_mounts=init_volume_mounts, env=env))

400 

401def _get_file_extension(output_type): 

402 return { 

403 'p12': 'p12', 

404 'pem': 'pem', 

405 'jks': 'jks', 

406 }[output_type] 

407 

def _process_port_map(port_map):
    """
    Split a (container_port, protocol) -> host_port map into the port lists
    needed for the ClusterIP and NodePort services.
    Returns (service_ports, exposed_ports).
    """
    service_ports = []      # Ports exposed internally on the k8s network
    exposed_ports = []      # Ports to be mapped to ports on the k8s nodes via NodePort
    for (cport, proto), hport in port_map.items():
        name = "xport-{0}-{1}".format(proto[0].lower(), cport)
        cport = int(cport)
        hport = int(hport)
        # The ClusterIP port drops the leading "x" from the name ("port-...");
        # the NodePort entry keeps the full "xport-..." name
        service_ports.append(client.V1ServicePort(port=cport, protocol=proto, name=name[1:]))
        # Only ports with a nonzero host port are exposed on the k8s nodes
        if hport != 0:
            exposed_ports.append(client.V1ServicePort(port=cport, protocol=proto, node_port=hport, name=name))
    return service_ports, exposed_ports

419 

def _service_exists(location, namespace, component_name):
    """
    Return True if the k8s Service for the component exists in 'namespace'
    in the cluster at 'location'; False on any API error (including 404).
    """
    try:
        _configure_api(location)
        client.CoreV1Api().read_namespaced_service(_create_service_name(component_name), namespace)
        return True
    except client.rest.ApiException:
        return False

430 

def _patch_deployment(location, namespace, deployment, modify):
    '''
    Gets the current spec for 'deployment' in 'namespace'
    in the k8s cluster at 'location',
    uses the 'modify' function to change the spec,
    then sends the updated spec to k8s.
    '''
    _configure_api(location)
    apps_api = client.AppsV1Api()

    # Read the current spec, transform it, and patch it back
    current_spec = apps_api.read_namespaced_deployment(deployment, namespace)
    updated_spec = modify(current_spec)
    apps_api.patch_namespaced_deployment(deployment, namespace, updated_spec)

448 

def _execute_command_in_pod(location, namespace, pod_name, command):
    '''
    Execute the command (specified by an argv-style list in the "command" parameter) in
    the specified pod in the specified namespace at the specified location.
    For now at least, we use this only to
    run a notification script in a pod after a configuration change.

    The straightforward way to do this is with the V1 Core API function "connect_get_namespaced_pod_exec".
    Unfortunately due to a bug/limitation in the Python client library, we can't call it directly.
    We have to make the API call through a Websocket connection using the kubernetes.stream wrapper API.
    I'm following the example code at https://github.com/kubernetes-client/python/blob/master/examples/exec.py.
    There are several issues tracking this, in various states. It isn't clear that there will ever
    be a fix.
    - https://github.com/kubernetes-client/python/issues/58
    - https://github.com/kubernetes-client/python/issues/409
    - https://github.com/kubernetes-client/python/issues/526

    The main consequence of the workaround using "stream" is that the caller does not get an indication
    of the exit code returned by the command when it completes execution. It turns out that the
    original implementation of notification in the Docker plugin did not use this result, so we can
    still match the original notification functionality.

    The "stream" approach returns a string containing any output sent by the command to stdout or stderr.
    We'll return that so it can logged.

    Returns a dict: {"pod": pod_name, "output": <command output or "Pod not found">}.
    Re-raises any ApiException other than "pod not found".
    '''
    _configure_api(location)
    try:
        output = stream.stream(client.CoreV1Api().connect_get_namespaced_pod_exec,
                               name=pod_name,
                               namespace=namespace,
                               command=command,
                               stdout=True,
                               stderr=True,
                               stdin=False,
                               tty=False)
    except client.rest.ApiException as e:
        # If the exception indicates the pod wasn't found, it's not a fatal error.
        # It existed when we enumerated the pods for the deployment but no longer exists.
        # Unfortunately, the only way to distinguish a pod not found from any other error
        # is by looking at the reason text.
        # (The ApiException's "status" field should contain the HTTP status code, which would
        # be 404 if the pod isn't found, but empirical testing reveals that "status" is set
        # to zero.)
        if "404 not found" in e.reason.lower():
            output = "Pod not found"
        else:
            raise e

    return {"pod" : pod_name, "output" : output}

498 

def deploy(ctx, namespace, component_name, image, replicas, always_pull, k8sconfig, **kwargs):
    '''
    This will create a k8s Deployment and, if needed, one or two k8s Services.
    (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
    We're not exposing k8s to the component developer and the blueprint author.
    This is a conscious choice. We want to use k8s in a controlled, consistent way, and we want to hide
    the details from the component developer and the blueprint author.)

    namespace: the Kubernetes namespace into which the component is deployed
    component_name: the component name, used to derive names of Kubernetes entities
    image: the docker image for the component being deployed
    replica: the number of instances of the component to be deployed
    always_pull: boolean flag, indicating that Kubernetes should always pull a new copy of
       the Docker image for the component, even if it is already present on the Kubernetes node.
    k8sconfig contains:
        - image_pull_secrets: a list of names of image pull secrets that can be used for retrieving images.
          (DON'T PANIC: these are just the names of secrets held in the Kubernetes secret store.)
        - filebeat: a dictionary of filebeat sidecar parameters:
            "log_path" : mount point for log volume in filebeat container
            "data_path" : mount point for data volume in filebeat container
            "config_path" : mount point for config volume in filebeat container
            "config_subpath" : subpath for config data in filebeat container
            "config_map" : ConfigMap holding the filebeat configuration
            "image": Docker image to use for filebeat
        - tls: a dictionary of TLS-related information:
            "cert_path": mount point for certificate volume in init container
            "image": Docker image to use for TLS init container
            "component_cert_dir" : default mount point for certs
        - truststore-merger: a dictionary of trustore-merger information:
            "image_tag": docker image to use for truststore-merger init container
    kwargs may have:
        - volumes:  array of volume objects, where a volume object is:
            {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}
        - ports: array of strings in the form "container_port:host_port"
        - env: map of name-value pairs ( {name0: value0, name1: value1...}
        - log_info: an object with info for setting up ELK logging, with the form:
            {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}
        - tls_info: an object with info for setting up TLS (HTTPS), with the form:
            {"use_tls": true, "cert_directory": "/path/to/container/cert/directory" }
        - external_cert: an object with information for setting up the init container for external certificates creation, with the form:
            {"external_cert":
                "external_cert_directory": "/path/to/directory_where_certs_should_be_placed",
                "use_external_tls": true or false,
                "ca_name": "ca-name-value",
                "cert_type": "P12" or "JKS" or "PEM",
                "external_certificate_parameters":
                    "common_name": "common-name-value",
                    "sans": "sans-value"}
        - labels: dict with label-name/label-value pairs, e.g. {"cfydeployment" : "lsdfkladflksdfsjkl", "cfynode":"mycomponent"}
            These label will be set on all the pods deployed as a result of this deploy() invocation.
        - resources: dict with optional "limits" and "requests" resource requirements, each a dict containing:
            - cpu: number CPU usage, like 0.5
            - memory: string memory requirement, like "2Gi"
        - readiness: dict with health check info; if present, used to create a readiness probe for the main container.  Includes:
            - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
            - interval: period (in seconds) between probes
            - timeout: time (in seconds) to allow a probe to complete
            - endpoint: the path portion of the URL that points to the readiness endpoint for "http" and "https" types
            - path: the full path to the script to be executed in the container for "script" and "docker" types
        - liveness: dict with health check info; if present, used to create a liveness probe for the main container.  Includes:
            - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
            - interval: period (in seconds) between probes
            - timeout: time (in seconds) to allow a probe to complete
            - endpoint: the path portion of the URL that points to the liveness endpoint for "http" and "https" types
            - path: the full path to the script to be executed in the container for "script" and "docker" types
        - k8s_location: name of the Kubernetes location (cluster) where the component is to be deployed

    Returns (deployment_object, deployment_description); deployment_description
    records the namespace, location, deployment name, and service names, and is
    what undeploy()/scale() later consume.
    '''

    # Flags used by the except-clause to undo a partially-completed deployment
    deployment_ok = False
    cip_service_created = False
    deployment_description = {
        "namespace": namespace,
        "location" : kwargs.get("k8s_location"),
        "deployment": '',
        "services": []
    }

    try:

        # Get API handles
        _configure_api(kwargs.get("k8s_location"))
        core = client.CoreV1Api()
        k8s_apps_v1_api_client = client.AppsV1Api()

        # Parse the port mapping
        container_ports, port_map = parse_ports(kwargs.get("ports", []))

        # Parse the volumes list into volumes and volume_mounts for the deployment
        volumes, volume_mounts = _parse_volumes(kwargs.get("volumes", []))

        # Initialize the list of containers that will be part of the pod
        containers = []
        init_containers = []

        # Set up the ELK logging sidecar container, if needed
        _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, kwargs.get("log_info"), k8sconfig.get("filebeat"))

        # Set up TLS information
        _add_tls_init_container(ctx, init_containers, volumes, volume_mounts, kwargs.get("tls_info") or {}, k8sconfig.get("tls"))

        # Set up external TLS information
        external_cert = kwargs.get("external_cert")
        if external_cert and external_cert.get("use_external_tls"):
            _add_external_tls_init_container(ctx, init_containers, volumes, external_cert, k8sconfig.get("external_cert"))
            _add_truststore_merger_init_container(ctx, init_containers, kwargs.get("tls_info") or {}, k8sconfig.get("tls"), external_cert, k8sconfig.get("truststore_merger"))

        # Create the container for the component
        # Make it the first container in the pod
        container_args = {key: kwargs.get(key) for key in ("env", "readiness", "liveness", "resources")}
        container_args['container_ports'] = container_ports
        container_args['volume_mounts'] = volume_mounts
        containers.insert(0, _create_container_object(component_name, image, always_pull, **container_args))

        # Build the k8s Deployment object
        labels = kwargs.get("labels", {})
        labels["app"] = component_name
        dep = _create_deployment_object(component_name, containers, init_containers, replicas, volumes, labels, pull_secrets=k8sconfig["image_pull_secrets"])

        # Have k8s deploy it
        k8s_apps_v1_api_client.create_namespaced_deployment(namespace, dep)
        deployment_ok = True
        deployment_description["deployment"] = _create_deployment_name(component_name)

        # Create service(s), if a port mapping is specified
        if port_map:
            service_ports, exposed_ports = _process_port_map(port_map)

            # Create a ClusterIP service for access via the k8s network
            service = _create_service_object(_create_service_name(component_name), component_name, service_ports, None, labels, "ClusterIP")
            core.create_namespaced_service(namespace, service)
            cip_service_created = True
            deployment_description["services"].append(_create_service_name(component_name))

            # If there are ports to be exposed on the k8s nodes, create a "NodePort" service
            if exposed_ports:
                exposed_service = \
                    _create_service_object(_create_exposed_service_name(component_name), component_name, exposed_ports, '', labels, "NodePort")
                core.create_namespaced_service(namespace, exposed_service)
                deployment_description["services"].append(_create_exposed_service_name(component_name))

    except Exception as e:
        # Roll back whatever was created so a failure leaves no orphaned k8s objects.
        # If the ClusterIP service was created, delete the service:
        if cip_service_created:
            core.delete_namespaced_service(_create_service_name(component_name), namespace)
        # If the deployment was created but not the service, delete the deployment
        if deployment_ok:
            client.AppsV1Api().delete_namespaced_deployment(_create_deployment_name(component_name), namespace, body=client.V1DeleteOptions())
        raise e

    return dep, deployment_description

650 

def undeploy(deployment_description):
    ''' Remove a deployed component: delete its k8s services, then its k8s Deployment.

    "deployment_description" is the dict returned at deploy time; it supplies the
    k8s location, namespace, deployment name, and the list of service names.
    '''
    _configure_api(deployment_description["location"])
    namespace = deployment_description["namespace"]

    # Delete every service that was created for the component
    core_api = client.CoreV1Api()
    for service_name in deployment_description["services"]:
        core_api.delete_namespaced_service(service_name, namespace)

    # "Foreground" propagation makes k8s delete the underlying pods and
    # replicaset along with the deployment itself.
    delete_options = client.V1DeleteOptions(propagation_policy="Foreground")
    client.AppsV1Api().delete_namespaced_deployment(
        deployment_description["deployment"], namespace, body=delete_options)

663 

def is_available(location, namespace, component_name):
    ''' Report whether the component's Deployment is fully rolled out.

    True when the number of available replicas equals the requested replica
    count AND all replicas are running the current spec.  Usable to verify
    completion of an initial deployment, a scale operation, or an update.
    '''
    _configure_api(location)
    dep = client.AppsV1Api().read_namespaced_deployment_status(
        _create_deployment_name(component_name), namespace)
    requested = dep.spec.replicas
    return dep.status.available_replicas == requested and dep.status.updated_replicas == requested

670 

def scale(deployment_description, replicas):
    ''' Trigger a scaling operation by updating the replica count for the Deployment '''

    # Mutator handed to _patch_deployment: set the desired replica count on the spec.
    def _set_replica_count(spec):
        spec.spec.replicas = replicas
        return spec

    _patch_deployment(deployment_description["location"],
                      deployment_description["namespace"],
                      deployment_description["deployment"],
                      _set_replica_count)

679 

def upgrade(deployment_description, image, container_index = 0):
    ''' Trigger a rolling upgrade by sending a new image name/tag to k8s '''

    # Mutator handed to _patch_deployment: point the selected container at the new image.
    def _set_image(spec):
        spec.spec.template.spec.containers[container_index].image = image
        return spec

    _patch_deployment(deployment_description["location"],
                      deployment_description["namespace"],
                      deployment_description["deployment"],
                      _set_image)

688 

def rollback(deployment_description, rollback_to=0):
    '''
    Undo upgrade by rolling back to a previous revision of the deployment.
    By default, go back one revision.
    rollback_to can be used to supply a specific revision number.
    Returns the image for the app container and the replica count from the rolled-back deployment
    '''
    '''
    2018-07-13
    Currently this does not work due to a bug in the create_namespaced_deployment_rollback() method.
    The k8s python client code throws an exception while processing the response from the API.
    See:
    - https://github.com/kubernetes-client/python/issues/491
    - https://github.com/kubernetes/kubernetes/pull/63837
    The fix has been merged into the master branch but is not in the latest release.
    '''
    # NOTE(review): ExtensionsV1beta1Api and the AppsV1beta1DeploymentRollback /
    # AppsV1beta1RollbackConfig models were removed from recent versions of the
    # kubernetes python client (the extensions/v1beta1 API group is gone from
    # k8s itself) — confirm which client version this file pins before relying
    # on this function.
    _configure_api(deployment_description["location"])
    deployment = deployment_description["deployment"]
    namespace = deployment_description["namespace"]

    # Initiate the rollback
    # presumably revision=0 means "the immediately previous revision", matching
    # the docstring's default behavior — verify against the k8s rollback API.
    client.ExtensionsV1beta1Api().create_namespaced_deployment_rollback(
        deployment,
        namespace,
        client.AppsV1beta1DeploymentRollback(name=deployment, rollback_to=client.AppsV1beta1RollbackConfig(revision=rollback_to)))

    # Read back the spec for the rolled-back deployment
    # Container 0 is the main application container (the deploy path inserts it
    # at index 0), so its image plus the replica count describe the restored state.
    spec = client.AppsV1Api().read_namespaced_deployment(deployment, namespace)
    return spec.spec.template.spec.containers[0].image, spec.spec.replicas

718 

def execute_command_in_deployment(deployment_description, command):
    '''
    Run "command" (an argv-style list) in container 0 (the main application
    container) of every currently-running pod belonging to the k8s deployment
    identified by "deployment_description".

    The pod list is a snapshot: pods created after the enumeration are not
    reached, and a pod that vanishes before execution causes a failed attempt
    (not treated as fatal here).  That is acceptable for the current use case —
    notifying containers of a configuration change stored in Consul — because a
    pod that appears later picks up the new configuration at startup, and a pod
    that disappeared no longer needs it.

    k8s offers no deployment-level "exec" API, only a pod-level one, and no
    direct way to list a deployment's pods.  The deploy code therefore stamps
    each pod with a "k8sdeployment" label carrying the deployment name, which
    is what the label selector below queries.
    '''
    location = deployment_description["location"]
    _configure_api(location)
    deployment = deployment_description["deployment"]
    namespace = deployment_description["namespace"]

    # Snapshot the names of the running pods that belong to this deployment
    pod_list = client.CoreV1Api().list_namespaced_pod(
        namespace = namespace,
        label_selector = "k8sdeployment={0}".format(deployment),
        field_selector = "status.phase=Running"
    )

    # Execute the command in each pod, collecting the per-pod results
    results = []
    for pod in pod_list.items:
        results.append(_execute_command_in_pod(location, namespace, pod.metadata.name, command))
    return results