Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# ============LICENSE_START======================================================= 

2# org.onap.dcae 

3# ================================================================================ 

4# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. 

5# Copyright (c) 2020 Pantheon.tech. All rights reserved. 

6# Copyright (c) 2020-2021 Nokia. All rights reserved. 

7# Copyright (c) 2020 J. F. Lucas. All rights reserved. 

8# ================================================================================ 

9# Licensed under the Apache License, Version 2.0 (the "License"); 

10# you may not use this file except in compliance with the License. 

11# You may obtain a copy of the License at 

12# 

13# http://www.apache.org/licenses/LICENSE-2.0 

14# 

15# Unless required by applicable law or agreed to in writing, software 

16# distributed under the License is distributed on an "AS IS" BASIS, 

17# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

18# See the License for the specific language governing permissions and 

19# limitations under the License. 

20# ============LICENSE_END========================================================= 

21# 

22import os 

23import re 

24import uuid 

25 

26from kubernetes import config, client, stream 

27 

# Defaults (in seconds) used for a probe when the health check does not give them
PROBE_DEFAULT_PERIOD = 15
PROBE_DEFAULT_TIMEOUT = 1

# Path of the k8s cluster config file ("kubeconfig")
K8S_CONFIG_PATH = "/opt/onap/kube/kubeconfig"

# Interval/timeout specification: a decimal integer with an optional unit suffix
INTERVAL_SPEC = re.compile("^([0-9]+)(s|m|h)?$")
# Multiplier converting each unit suffix to seconds (None: no suffix was given)
FACTORS = {None: 1, "s": 1, "m": 60, "h": 3600}

# Port mapping specification: "<container port>[/<protocol>]:<host port>"
#   group 1: container port
#   group 2: / + protocol
#   group 3: protocol
#   group 4: host port
PORTS = re.compile("^([0-9]+)(/(udp|UDP|tcp|TCP))?:([0-9]+)$")

# Constants for external_cert
MOUNT_PATH = "/etc/onap/oom/certservice/certs/"
KEYSTORE_PATH = "{0}certServiceClient-keystore.jks".format(MOUNT_PATH)
TRUSTSTORE_PATH = "{0}truststore.jks".format(MOUNT_PATH)
DEFAULT_CERT_TYPE = "p12"

52 

53 

54def _create_deployment_name(component_name): 

55 return "dep-{0}".format(component_name)[:63] 

56 

57 

58def _create_service_name(component_name): 

59 return "{0}".format(component_name)[:63] 

60 

61 

62def _create_exposed_service_name(component_name): 

63 return ("x{0}".format(component_name))[:63] 

64 

65 

66def _create_exposed_v6_service_name(component_name): 

67 return ("x{0}-ipv6".format(component_name))[:63] 

68 

69 

def _configure_api(location=None):
    """
    Load the configuration used by the kubernetes client.

    If the kubeconfig file at K8S_CONFIG_PATH exists, it is loaded with
    'location' as the context name. Otherwise an in-cluster configuration
    is loaded, using the service account token and certificate files.
    """
    if os.path.exists(K8S_CONFIG_PATH):
        config.load_kube_config(config_file=K8S_CONFIG_PATH, context=location, persist_config=False)
        return

    # No kubeconfig file: maybe we're running in a k8s container and can use info provided by k8s.
    # We would like to call config.load_incluster_config(), but that looks into os.environ
    # for the kubernetes host and port, and from the plugin those aren't visible.
    # So we drive the InClusterConfigLoader class directly, handing it an environment of our own.
    # This is probably brittle! Maybe there's a better alternative.
    incluster = config.incluster_config
    substitute_env = {
        incluster.SERVICE_HOST_ENV_NAME: "kubernetes.default.svc.cluster.local",
        incluster.SERVICE_PORT_ENV_NAME: "443"
    }
    loader = incluster.InClusterConfigLoader(
        token_filename=incluster.SERVICE_TOKEN_FILENAME,
        cert_filename=incluster.SERVICE_CERT_FILENAME,
        environ=substitute_env
    )
    loader.load_and_set()

91 

92 

def _parse_interval(t):
    """
    Parse an interval specification and return it as a number of seconds.
    t can be
       - a simple integer quantity, interpreted as seconds
       - a string representation of a decimal integer, interpreted as seconds
       - a string consisting of a representation of a decimal integer followed by a unit,
         with "s" representing seconds, "m" representing minutes,
         and "h" representing hours
    Used for compatibility with the Docker plugin, where time intervals
    for health checks were specified as strings with a number and a unit.
    See INTERVAL_SPEC above for the regular expression that's accepted.

    Raises ValueError if t does not match INTERVAL_SPEC.
    """
    parsed = INTERVAL_SPEC.match(str(t))
    if not parsed:
        raise ValueError("Bad interval specification: {0}".format(t))
    quantity, unit = parsed.group(1), parsed.group(2)
    return int(quantity) * FACTORS[unit]

112 

113 

def _create_probe(hc, port):
    """
    Create a Kubernetes probe based on info in the health check dictionary hc.

    hc['type'] selects the kind of probe:
      - 'http' / 'https': HTTP GET on hc['endpoint'] at 'port', using the matching scheme
      - 'script' / 'docker': exec of hc['script'], split on whitespace into an argv list
    hc['interval'] and hc['timeout'] are optional interval specifications
    (see _parse_interval); the PROBE_DEFAULT_* values are used when absent.

    Returns a V1Probe, or None if hc['type'] is none of the types above.
    """
    probe_type = hc['type']
    # Settings shared by every kind of probe
    settings = {
        'failure_threshold': 1,
        'initial_delay_seconds': 5,
        'period_seconds': _parse_interval(hc.get('interval', PROBE_DEFAULT_PERIOD)),
        'timeout_seconds': _parse_interval(hc.get('timeout', PROBE_DEFAULT_TIMEOUT)),
    }

    if probe_type in ('http', 'https'):
        http_action = client.V1HTTPGetAction(
            path=hc['endpoint'],
            port=port,
            scheme=probe_type.upper()
        )
        return client.V1Probe(http_get=http_action, **settings)

    if probe_type in ('script', 'docker'):
        exec_action = client.V1ExecAction(command=hc['script'].split())
        return client.V1Probe(_exec=exec_action, **settings)

    # Unrecognized probe type: no probe
    return None

143 

144 

def _create_resources(resources=None):
    """
    Build a V1ResourceRequirements from a dict with optional "limits" and
    "requests" entries. Returns None when resources is None.
    """
    if resources is None:
        return None
    return client.V1ResourceRequirements(
        limits=resources.get("limits"),
        requests=resources.get("requests")
    )

154 

155 

def _create_container_object(name, image, always_pull, **kwargs):
    """
    Build the V1Container for one container of the pod.

    name:        container name
    image:       docker image for the container
    always_pull: if true the image pull policy is 'Always', otherwise 'IfNotPresent'
    kwargs may have:
      - env: map of environment variable names to values
      - container_ports: list of (port, protocol) tuples
      - volume_mounts: list of V1VolumeMount objects
      - resources: dict passed to _create_resources
      - readiness / liveness: health check dicts passed to _create_probe
    """
    # Environment: whatever was passed in, plus POD_IP holding the IP address
    # of the pod running the container
    passed_env = kwargs.get('env') or {}
    env_vars = [client.V1EnvVar(name=var, value=passed_env[var]) for var in passed_env]
    pod_ip_source = client.V1EnvVarSource(
        field_ref=client.V1ObjectFieldSelector(field_path="status.podIP"))
    env_vars.append(client.V1EnvVar(name="POD_IP", value_from=pod_ip_source))

    readiness = kwargs.get('readiness')
    liveness = kwargs.get('liveness')
    resources = kwargs.get('resources')
    container_ports = kwargs.get('container_ports') or []

    # Probes are created only for the health checks that were specified.
    # (For an HTTP-based check, we assume it's at the first container port)
    hc_port = None
    if container_ports:
        hc_port = container_ports[0][0]
    readiness_probe = _create_probe(readiness, hc_port) if readiness else None
    liveness_probe = _create_probe(liveness, hc_port) if liveness else None

    resources_obj = _create_resources(resources) if resources else None

    port_objs = []
    for port, proto in container_ports:
        port_objs.append(client.V1ContainerPort(container_port=port, protocol=proto))

    if always_pull:
        pull_policy = 'Always'
    else:
        pull_policy = 'IfNotPresent'

    # Define container for pod
    return client.V1Container(
        name=name,
        image=image,
        image_pull_policy=pull_policy,
        env=env_vars,
        ports=port_objs,
        volume_mounts=kwargs.get('volume_mounts') or [],
        resources=resources_obj,
        readiness_probe=readiness_probe,
        liveness_probe=liveness_probe
    )

191 

192 

def _create_deployment_object(component_name,
                              containers,
                              init_containers,
                              replicas,
                              volumes,
                              labels=None,
                              pull_secrets=None):
    """
    Build the V1Deployment ("apps/v1") for a component.

    component_name:  used as the pod hostname and to derive the deployment name
    containers:      containers of the pod
    init_containers: init containers of the pod
    replicas:        number of replicas in the deployment spec
    volumes:         volumes of the pod
    labels:          labels for the deployment and pod, also used as the selector.
                     NOTE: a dict passed in here is modified in place
                     (a "k8sdeployment" entry is added).
    pull_secrets:    list of the names of the k8s secrets containing docker registry credentials
                     See https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod
    """
    labels = {} if labels is None else labels
    pull_secrets = [] if pull_secrets is None else pull_secrets
    deployment_name = _create_deployment_name(component_name)

    # Label the pod with the deployment name, so we can find it easily
    labels["k8sdeployment"] = deployment_name

    secret_refs = [client.V1LocalObjectReference(name=secret_name) for secret_name in pull_secrets]

    # Pod template
    pod_spec = client.V1PodSpec(hostname=component_name,
                                containers=containers,
                                init_containers=init_containers,
                                volumes=volumes,
                                image_pull_secrets=secret_refs)
    pod_template = client.V1PodTemplateSpec(
        metadata=client.V1ObjectMeta(labels=labels),
        spec=pod_spec
    )

    # Deployment spec
    deployment_spec = client.V1DeploymentSpec(
        replicas=replicas,
        selector=client.V1LabelSelector(match_labels=labels),
        template=pod_template
    )

    # Deployment object
    return client.V1Deployment(
        api_version="apps/v1",
        kind="Deployment",
        metadata=client.V1ObjectMeta(name=deployment_name, labels=labels),
        spec=deployment_spec
    )

241 

242 

def _create_service_object(service_name, component_name, service_ports, annotations, labels, service_type, ip_family):
    """
    Build a V1Service that selects the pods labelled "app": component_name.

    service_name:  name for the service (passed through _create_service_name)
    service_ports: list of V1ServicePort objects
    annotations:   metadata annotations; only set on the service when non-empty
    labels:        metadata labels
    service_type:  k8s service type, e.g. "ClusterIP"
    ip_family:     IP family for the service spec, e.g. "IPv4"
    """
    spec = client.V1ServiceSpec(
        ports=service_ports,
        selector={"app": component_name},
        type=service_type,
        ip_family=ip_family
    )

    metadata_args = {"name": _create_service_name(service_name), "labels": labels}
    if annotations:
        metadata_args["annotations"] = annotations

    return client.V1Service(
        kind="Service",
        api_version="v1",
        metadata=client.V1ObjectMeta(**metadata_args),
        spec=spec
    )

262 

263 

def parse_ports(port_list):
    """
    Parse the port list into a list of container ports (needed to create the container)
    and to a set of port mappings to set up k8s services.

    Each entry of port_list is either
      - a string of the form "container_port[/protocol]:host_port" (see PORTS), or
      - a dict with a 'concat' list whose items are joined into such a string,
        and an optional 'ipv6' flag.
    The protocol defaults to "TCP" and is always reported in upper case.

    Returns (container_ports, port_map) where
      - container_ports is a list of unique (container_port, protocol) tuples,
        in order of first appearance
      - port_map maps (container_port, protocol, ipv6) to the host port

    Raises ValueError for an entry that is not a valid port specification.
    """
    container_ports = []
    port_map = {}
    for spec in port_list:
        ipv6 = False
        if isinstance(spec, dict):
            ipv6 = spec.get("ipv6", False)
            spec = "".join(str(v) for v in spec['concat'])
        # str() so that a non-string entry (e.g. a bare integer port) is reported
        # as a bad specification instead of failing on .strip()
        m = PORTS.match(str(spec).strip())
        if not m:
            raise ValueError("Bad port specification: {0}".format(spec))

        cport = int(m.group(1))
        hport = int(m.group(4))
        proto = m.group(3).upper() if m.group(3) else "TCP"

        port = (cport, proto)
        if port not in container_ports:
            container_ports.append(port)
        port_map[(cport, proto, ipv6)] = hport

    return container_ports, port_map

292 

293 

def _parse_volumes(volume_list):
    """
    Convert the list of volume descriptions into k8s volumes and volume mounts.

    Each entry of volume_list is a dict with
      - "container": {"bind": <mount path in the container>, "mode": <"ro" for a read-only mount>}
      - "host": {"path": <path on the host>} for a host path volume, and/or
      - "config_volume": {"name": <ConfigMap name>} for a ConfigMap volume
    A random (uuid4) name ties each volume to its volume mount.

    Returns (volumes, volume_mounts): a list of V1Volume and a list of V1VolumeMount.
    """
    volumes = []
    volume_mounts = []
    for v in volume_list:
        vname = str(uuid.uuid4())
        vcontainer = v['container']['bind']
        vro = (v['container'].get('mode') == 'ro')
        if 'host' in v:
            vhost = v['host']['path']
            volumes.append(client.V1Volume(name=vname, host_path=client.V1HostPathVolumeSource(path=vhost)))
        if 'config_volume' in v:
            vconfig_volume = v['config_volume']['name']
            # defaultMode is an integer field in the k8s API, so the file mode
            # is given as an octal integer, not as the string "0644"
            config_map = client.V1ConfigMapVolumeSource(default_mode=0o644,
                                                        name=vconfig_volume,
                                                        optional=True)
            volumes.append(client.V1Volume(name=vname, config_map=config_map))
        volume_mounts.append(client.V1VolumeMount(name=vname, mount_path=vcontainer, read_only=vro))

    return volumes, volume_mounts

312 

313 

def _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, log_info, filebeat):
    """
    Add a "filebeat" sidecar container, and the volumes it needs, to the pod.

    Nothing is added unless both log_info and filebeat are given and
    log_info has a non-empty "log_directory".

    containers, volumes, volume_mounts are lists that are extended in place:
      - containers gets the sidecar container
      - volumes gets the "component-log", "filebeat-data" and "filebeat-conf" volumes
      - volume_mounts (the component's mounts) gets the mount of "component-log" at the log directory
    log_info: {"log_directory": ..., "alternate_fb_path": ...}
    filebeat: dict with "log_path", "data_path", "config_path", "config_subpath", "config_map", "image"
    """
    if not (log_info and filebeat):
        return
    component_log_dir = log_info.get("log_directory")
    if not component_log_dir:
        return
    fb_mounts = []

    # Volume for component log files, mounted in both the component and the sidecar.
    # In the sidecar it goes at the alternate path if one is given,
    # otherwise under the filebeat log path in a directory named after the component.
    volumes.append(client.V1Volume(name="component-log", empty_dir=client.V1EmptyDirVolumeSource()))
    volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=component_log_dir))
    fb_log_path = log_info.get("alternate_fb_path") or "{0}/{1}".format(filebeat["log_path"], component_name)
    fb_mounts.append(client.V1VolumeMount(name="component-log", mount_path=fb_log_path))

    # Volume for sidecar data, mounted in the sidecar only
    volumes.append(client.V1Volume(name="filebeat-data", empty_dir=client.V1EmptyDirVolumeSource()))
    fb_mounts.append(client.V1VolumeMount(name="filebeat-data", mount_path=filebeat["data_path"]))

    # Volume for the sidecar configuration data, mounted in the sidecar only.
    # The configuration data is in a k8s ConfigMap that should be created when DCAE is installed.
    fb_config_source = client.V1ConfigMapVolumeSource(name=filebeat["config_map"])
    volumes.append(client.V1Volume(name="filebeat-conf", config_map=fb_config_source))
    fb_mounts.append(
        client.V1VolumeMount(name="filebeat-conf", mount_path=filebeat["config_path"],
                             sub_path=filebeat["config_subpath"]))

    # Finally the sidecar container itself
    sidecar = _create_container_object("filebeat", filebeat["image"], False, volume_mounts=fb_mounts)
    containers.append(sidecar)

343 

344 

def _add_tls_init_container(ctx, init_containers, volumes, volume_mounts, tls_info, tls_config):
    """
    Adds an InitContainer ("init-tls") to the pod to set up TLS certificate information.

    For components that act as a server (tls_info["use_tls"] is True), the InitContainer will
    populate a directory with server and CA certificate materials in various formats.
    For other components (tls_info["use_tls"] is False, or tls_info is not specified),
    the InitContainer will populate a directory with CA certificate materials in PEM and JKS formats.
    In either case, the certificate directory is mounted onto the component container filesystem
    at the location specified by tls_info["cert_directory"], if present, otherwise at the
    configured default mount point (tls_config["component_cert_dir"]).

    init_containers, volumes and volume_mounts are lists that are extended in place.
    """
    docker_image = tls_config["image"]
    ctx.logger.info("Creating init container: TLS \n  * [" + docker_image + "]")

    component_cert_dir = tls_info.get("cert_directory") or tls_config.get("component_cert_dir")
    if tls_info.get("use_tls"):
        tls_server = "true"
    else:
        tls_server = "false"
    env = {"TLS_SERVER": tls_server}

    # One "tls-info" volume, mounted in the component at the certificate directory
    # and in the init container at the configured certificate path
    volumes.append(client.V1Volume(name="tls-info", empty_dir=client.V1EmptyDirVolumeSource()))
    volume_mounts.append(client.V1VolumeMount(name="tls-info", mount_path=component_cert_dir))
    init_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_config["cert_path"])]

    # The init container itself
    init_container = _create_container_object("init-tls", docker_image, False,
                                              volume_mounts=init_mounts, env=env)
    init_containers.append(init_container)

368 

369 

def _add_external_tls_init_container(ctx, init_containers, volumes, external_cert, external_tls_config):
    """
    Adds an InitContainer ("cert-service-client") to the pod which will generate
    external TLS certificates.

    external_cert:       per-component settings ("external_cert_directory", "cert_type", "ca_name",
                         "external_certificate_parameters" with "common_name" and "sans")
    external_tls_config: configured settings ("image_tag", "request_url", "timeout", certificate
                         subject fields, keystore/truststore passwords, "cert_secret_name")
    init_containers and volumes are lists that are extended in place.
    The init container mounts the "tls-info" volume, which this function does not create.
    """
    docker_image = external_tls_config["image_tag"]
    ctx.logger.info("Creating init container: external TLS \n  * [" + docker_image + "]")

    # Certificates are written to the "external" entry under the external cert directory
    cert_dir = external_cert.get("external_cert_directory")
    output_path = cert_dir if cert_dir.endswith('/') else cert_dir + '/'

    env = {
        "REQUEST_URL": external_tls_config.get("request_url"),
        "REQUEST_TIMEOUT": external_tls_config.get("timeout"),
        "OUTPUT_PATH": output_path + "external",
        "OUTPUT_TYPE": external_cert.get("cert_type"),
        "CA_NAME": external_cert.get("ca_name"),
        "COMMON_NAME": external_cert.get("external_certificate_parameters").get("common_name"),
        "ORGANIZATION": external_tls_config.get("organization"),
        "ORGANIZATION_UNIT": external_tls_config.get("organizational_unit"),
        "LOCATION": external_tls_config.get("location"),
        "STATE": external_tls_config.get("state"),
        "COUNTRY": external_tls_config.get("country"),
        "SANS": external_cert.get("external_certificate_parameters").get("sans"),
        "KEYSTORE_PATH": KEYSTORE_PATH,
        "KEYSTORE_PASSWORD": external_tls_config.get("keystore_password"),
        "TRUSTSTORE_PATH": TRUSTSTORE_PATH,
        "TRUSTSTORE_PASSWORD": external_tls_config.get("truststore_password"),
    }

    # Secret volume mounted at MOUNT_PATH, where KEYSTORE_PATH and TRUSTSTORE_PATH point
    secret_source = client.V1SecretVolumeSource(secret_name=external_tls_config.get("cert_secret_name"))
    volumes.append(client.V1Volume(name="tls-volume", secret=secret_source))
    init_mounts = [
        client.V1VolumeMount(name="tls-info", mount_path=external_cert.get("external_cert_directory")),
        client.V1VolumeMount(name="tls-volume", mount_path=MOUNT_PATH)]

    # The init container itself
    init_container = _create_container_object("cert-service-client", docker_image, False,
                                              volume_mounts=init_mounts, env=env)
    init_containers.append(init_container)

407 

408 

def _add_cert_post_processor_init_container(ctx, init_containers, tls_info, tls_config, external_cert,
                                            cert_post_processor_config):
    """
    Adds an InitContainer ("cert-post-processor") to the pod to merge TLS and
    external TLS truststore into single file.

    The container is configured through environment variables holding ":"-separated paths:
      - TRUSTSTORES_PATHS / TRUSTSTORES_PASSWORDS_PATHS: the TLS truststore and the external one
        (for the "pem" type there is no external password file, so that part is empty)
      - KEYSTORE_SOURCE_PATHS / KEYSTORE_DESTINATION_PATHS: see _get_keystore_source_paths
        and _get_keystore_destination_paths
    The certificate type is external_cert["cert_type"] in lower case, DEFAULT_CERT_TYPE if absent.
    init_containers is a list that is extended in place.
    """
    docker_image = cert_post_processor_config["image_tag"]
    ctx.logger.info("Creating init container: cert post processor \n  * [" + docker_image + "]")

    # Directory holding the TLS certificates, always with a trailing slash
    tls_cert_dir = tls_info.get("cert_directory") or tls_config.get("component_cert_dir")
    if not tls_cert_dir.endswith('/'):
        tls_cert_dir = tls_cert_dir + '/'
    ext_cert_dir = tls_cert_dir + "external/"

    output_type = (external_cert.get("cert_type") or DEFAULT_CERT_TYPE).lower()

    truststores = [tls_cert_dir + "trust.jks",
                   ext_cert_dir + "truststore." + _get_file_extension(output_type)]
    truststore_passwords = [tls_cert_dir + "trust.pass",
                            '' if output_type == 'pem' else ext_cert_dir + "truststore.pass"]

    env = {"TRUSTSTORES_PATHS": ":".join(truststores),
           "TRUSTSTORES_PASSWORDS_PATHS": ":".join(truststore_passwords),
           "KEYSTORE_SOURCE_PATHS": _get_keystore_source_paths(output_type, ext_cert_dir),
           "KEYSTORE_DESTINATION_PATHS": _get_keystore_destination_paths(output_type, tls_cert_dir)}

    for var_name in ("TRUSTSTORES_PATHS", "TRUSTSTORES_PASSWORDS_PATHS",
                     "KEYSTORE_SOURCE_PATHS", "KEYSTORE_DESTINATION_PATHS"):
        ctx.logger.info(var_name + ": " + env[var_name])

    # The init container works on the "tls-info" volume, mounted at the TLS certificate directory
    init_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_cert_dir)]

    # The init container itself
    init_container = _create_container_object("cert-post-processor", docker_image, False,
                                              volume_mounts=init_mounts, env=env)
    init_containers.append(init_container)

446 

447 

448def _get_file_extension(output_type): 

449 return { 

450 'p12': 'p12', 

451 'pem': 'pem', 

452 'jks': 'jks', 

453 }[output_type] 

454 

455 

456def _get_keystore_source_paths(output_type, ext_cert_dir): 

457 source_paths_template = { 

458 'p12': "{0}keystore.p12:{0}keystore.pass", 

459 'jks': "{0}keystore.jks:{0}keystore.pass", 

460 'pem': "{0}keystore.pem:{0}key.pem", 

461 }[output_type] 

462 return source_paths_template.format(ext_cert_dir) 

463 

464 

465def _get_keystore_destination_paths(output_type, tls_cert_dir): 

466 destination_paths_template = { 

467 'p12': "{0}cert.p12:{0}p12.pass", 

468 'jks': "{0}cert.jks:{0}jks.pass", 

469 'pem': "{0}cert.pem:{0}key.pem", 

470 }[output_type] 

471 return destination_paths_template.format(tls_cert_dir) 

472 

473 

def _process_port_map(port_map):
    """
    Turn a port map (as built by parse_ports) into lists of V1ServicePort objects.

    port_map maps (container_port, protocol, ipv6) to a host port.

    Returns (service_ports, exposed_ports, exposed_ports_ipv6):
      - service_ports: ports exposed internally on the k8s network (one per distinct
        port, named "port-<p>-<container_port>" where <p> is the first letter of the
        protocol in lower case)
      - exposed_ports: ports to be mapped to ports on the k8s nodes via NodePort
        (named "xport-<p>-<container_port>"); only entries with a non-zero host port
      - exposed_ports_ipv6: same as exposed_ports, for the entries flagged ipv6
    """
    service_ports = []
    exposed_ports = []
    exposed_ports_ipv6 = []
    for (cport, proto, ipv6), hport in port_map.items():
        exposed_name = "xport-{0}-{1}".format(proto[0].lower(), cport)
        # The internal port uses the same name without the leading "x"
        internal_name = exposed_name[1:]
        container_port = int(cport)
        node_port = int(hport)

        internal_port = client.V1ServicePort(port=container_port, protocol=proto, name=internal_name)
        if internal_port not in service_ports:
            service_ports.append(internal_port)

        # A host port of 0 means the port is not exposed on the nodes
        if node_port == 0:
            continue
        exposed_port = client.V1ServicePort(port=container_port, protocol=proto,
                                            node_port=node_port, name=exposed_name)
        if ipv6:
            exposed_ports_ipv6.append(exposed_port)
        else:
            exposed_ports.append(exposed_port)
    return service_ports, exposed_ports, exposed_ports_ipv6

491 

492 

def _service_exists(location, namespace, component_name):
    """
    Return True if the service for 'component_name' can be read from 'namespace'
    in the k8s cluster at 'location', False if the read raises an ApiException.

    Note that any ApiException counts as "does not exist", whatever its cause.
    """
    try:
        _configure_api(location)
        service_name = _create_service_name(component_name)
        client.CoreV1Api().read_namespaced_service(service_name, namespace)
    except client.rest.ApiException:
        return False
    return True

503 

504 

def _patch_deployment(location, namespace, deployment, modify):
    '''
    Gets the current spec for 'deployment' in 'namespace'
    in the k8s cluster at 'location',
    uses the 'modify' function to change the spec,
    then sends the updated spec to k8s.

    'modify' is called with the spec that was read and must return the spec to send.
    '''
    _configure_api(location)
    apps_api = client.AppsV1Api()

    # Read the current spec, let the caller change it, and send the result back as a patch
    current_spec = apps_api.read_namespaced_deployment(deployment, namespace)
    updated_spec = modify(current_spec)
    apps_api.patch_namespaced_deployment(deployment, namespace, updated_spec)

522 

523 

def _execute_command_in_pod(location, namespace, pod_name, command):
    '''
    Execute the command (specified by an argv-style list in the "command" parameter) in
    the specified pod in the specified namespace at the specified location.
    For now at least, we use this only to
    run a notification script in a pod after a configuration change.

    The straightforward way to do this is with the V1 Core API function "connect_get_namespaced_pod_exec".
    Unfortunately due to a bug/limitation in the Python client library, we can't call it directly.
    We have to make the API call through a Websocket connection using the kubernetes.stream wrapper API.
    I'm following the example code at https://github.com/kubernetes-client/python/blob/master/examples/exec.py.
    There are several issues tracking this, in various states.  It isn't clear that there will ever
    be a fix.
        - https://github.com/kubernetes-client/python/issues/58
        - https://github.com/kubernetes-client/python/issues/409
        - https://github.com/kubernetes-client/python/issues/526

    The main consequence of the workaround using "stream" is that the caller does not get an indication
    of the exit code returned by the command when it completes execution.   It turns out that the
    original implementation of notification in the Docker plugin did not use this result, so we can
    still match the original notification functionality.

    The "stream" approach returns a string containing any output sent by the command to stdout or stderr.
    We'll return that so it can be logged.

    Returns a dict {"pod": pod_name, "output": output}, where output is the command's output,
    or "Pod not found" if the pod no longer exists.
    Raises client.rest.ApiException for any API error other than the pod not being found.
    '''
    _configure_api(location)
    try:
        output = stream.stream(client.CoreV1Api().connect_get_namespaced_pod_exec,
                               name=pod_name,
                               namespace=namespace,
                               command=command,
                               stdout=True,
                               stderr=True,
                               stdin=False,
                               tty=False)
    except client.rest.ApiException as e:
        # If the exception indicates the pod wasn't found, it's not a fatal error.
        # It existed when we enumerated the pods for the deployment but no longer exists.
        # Unfortunately, the only way to distinguish a pod not found from any other error
        # is by looking at the reason text.
        # (The ApiException's "status" field should contain the HTTP status code, which would
        # be 404 if the pod isn't found, but empirical testing reveals that "status" is set
        # to zero.)
        # The reason may be None; treat that as "not a 404" so that the original
        # exception is re-raised instead of an AttributeError from None.lower().
        reason = e.reason or ""
        if "404 not found" in reason.lower():
            output = "Pod not found"
        else:
            raise

    return {"pod": pod_name, "output": output}

573 

574 

575def deploy(ctx, namespace, component_name, image, replicas, always_pull, k8sconfig, **kwargs): 

576 """ 

577 This will create a k8s Deployment and, if needed, one or two k8s Services. 

578 (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use. 

579 We're not exposing k8s to the component developer and the blueprint author. 

580 This is a conscious choice. We want to use k8s in a controlled, consistent way, and we want to hide 

581 the details from the component developer and the blueprint author.) 

582 

583 namespace: the Kubernetes namespace into which the component is deployed 

584 component_name: the component name, used to derive names of Kubernetes entities 

585 image: the docker image for the component being deployed 

586 replica: the number of instances of the component to be deployed 

587 always_pull: boolean flag, indicating that Kubernetes should always pull a new copy of 

588 the Docker image for the component, even if it is already present on the Kubernetes node. 

589 k8sconfig contains: 

590 - image_pull_secrets: a list of names of image pull secrets that can be used for retrieving images. 

591 (DON'T PANIC: these are just the names of secrets held in the Kubernetes secret store.) 

592 - filebeat: a dictionary of filebeat sidecar parameters: 

593 "log_path" : mount point for log volume in filebeat container 

594 "data_path" : mount point for data volume in filebeat container 

595 "config_path" : mount point for config volume in filebeat container 

596 "config_subpath" : subpath for config data in filebeat container 

597 "config_map" : ConfigMap holding the filebeat configuration 

598 "image": Docker image to use for filebeat 

599 - tls: a dictionary of TLS-related information: 

600 "cert_path": mount point for certificate volume in init container 

601 "image": Docker image to use for TLS init container 

602 "component_cert_dir" : default mount point for certs 

603 - cert_post_processor: a dictionary of cert_post_processor information: 

604 "image_tag": docker image to use for cert-post-processor init container 

605 kwargs may have: 

606 - volumes: array of volume objects, where a volume object is: 

607 {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"} 

608 - ports: array of strings in the form "container_port:host_port" 

609 - env: map of name-value pairs ( {name0: value0, name1: value1...} 

610 - log_info: an object with info for setting up ELK logging, with the form: 

611 {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"} 

612 - tls_info: an object with info for setting up TLS (HTTPS), with the form: 

613 {"use_tls": true, "cert_directory": "/path/to/container/cert/directory" } 

614 - external_cert: an object with information for setting up the init container for external certificates creation, with the form: 

615 {"external_cert": 

616 "external_cert_directory": "/path/to/directory_where_certs_should_be_placed", 

617 "use_external_tls": true or false, 

618 "ca_name": "ca-name-value", 

619 "cert_type": "P12" or "JKS" or "PEM", 

620 "external_certificate_parameters": 

621 "common_name": "common-name-value", 

622 "sans": "sans-value"} 

623 - labels: dict with label-name/label-value pairs, e.g. {"cfydeployment" : "lsdfkladflksdfsjkl", "cfynode":"mycomponent"} 

624 These label will be set on all the pods deployed as a result of this deploy() invocation. 

625 - resources: dict with optional "limits" and "requests" resource requirements, each a dict containing: 

626 - cpu: number CPU usage, like 0.5 

627 - memory: string memory requirement, like "2Gi" 

628 - readiness: dict with health check info; if present, used to create a readiness probe for the main container. Includes: 

629 - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker") 

630 - interval: period (in seconds) between probes 

631 - timeout: time (in seconds) to allow a probe to complete 

632 - endpoint: the path portion of the URL that points to the readiness endpoint for "http" and "https" types 

633 - path: the full path to the script to be executed in the container for "script" and "docker" types 

634 - liveness: dict with health check info; if present, used to create a liveness probe for the main container. Includes: 

635 - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker") 

636 - interval: period (in seconds) between probes 

637 - timeout: time (in seconds) to allow a probe to complete 

638 - endpoint: the path portion of the URL that points to the liveness endpoint for "http" and "https" types 

639 - path: the full path to the script to be executed in the container for "script" and "docker" types 

640 - k8s_location: name of the Kubernetes location (cluster) where the component is to be deployed 

641 

642 """ 

643 

644 deployment_ok = False 

645 cip_service_created = False 

646 deployment_description = { 

647 "namespace": namespace, 

648 "location": kwargs.get("k8s_location"), 

649 "deployment": '', 

650 "services": [] 

651 } 

652 

653 try: 

654 

655 # Get API handles 

656 _configure_api(kwargs.get("k8s_location")) 

657 core = client.CoreV1Api() 

658 k8s_apps_v1_api_client = client.AppsV1Api() 

659 

660 # Parse the port mapping 

661 container_ports, port_map = parse_ports(kwargs.get("ports", [])) 

662 

663 # Parse the volumes list into volumes and volume_mounts for the deployment 

664 volumes, volume_mounts = _parse_volumes(kwargs.get("volumes", [])) 

665 

666 # Initialize the list of containers that will be part of the pod 

667 containers = [] 

668 init_containers = [] 

669 

670 # Set up the ELK logging sidecar container, if needed 

671 _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, kwargs.get("log_info"), 

672 k8sconfig.get("filebeat")) 

673 

674 # Set up TLS information 

675 _add_tls_init_container(ctx, init_containers, volumes, volume_mounts, kwargs.get("tls_info") or {}, 

676 k8sconfig.get("tls")) 

677 

678 # Set up external TLS information 

679 external_cert = kwargs.get("external_cert") 

680 if external_cert and external_cert.get("use_external_tls"): 

681 _add_external_tls_init_container(ctx, init_containers, volumes, external_cert, 

682 k8sconfig.get("external_cert")) 

683 _add_cert_post_processor_init_container(ctx, init_containers, kwargs.get("tls_info") or {}, 

684 k8sconfig.get("tls"), external_cert, 

685 k8sconfig.get("cert_post_processor")) 

686 

687 # Create the container for the component 

688 # Make it the first container in the pod 

689 container_args = {key: kwargs.get(key) for key in ("env", "readiness", "liveness", "resources")} 

690 container_args['container_ports'] = container_ports 

691 container_args['volume_mounts'] = volume_mounts 

692 containers.insert(0, _create_container_object(component_name, image, always_pull, **container_args)) 

693 

694 # Build the k8s Deployment object 

695 labels = kwargs.get("labels", {}) 

696 labels["app"] = component_name 

697 dep = _create_deployment_object(component_name, containers, init_containers, replicas, volumes, labels, 

698 pull_secrets=k8sconfig["image_pull_secrets"]) 

699 

700 # Have k8s deploy it 

701 k8s_apps_v1_api_client.create_namespaced_deployment(namespace, dep) 

702 deployment_ok = True 

703 deployment_description["deployment"] = _create_deployment_name(component_name) 

704 

705 # Create service(s), if a port mapping is specified 

706 if port_map: 706 ↛ 741line 706 didn't jump to line 741, because the condition on line 706 was never false

707 service_ports, exposed_ports, exposed_ports_ipv6 = _process_port_map(port_map) 

708 

709 # Create a ClusterIP service for access via the k8s network 

710 service = _create_service_object(_create_service_name(component_name), component_name, service_ports, None, 

711 labels, "ClusterIP", "IPv4") 

712 core.create_namespaced_service(namespace, service) 

713 cip_service_created = True 

714 deployment_description["services"].append(_create_service_name(component_name)) 

715 

716 # If there are ports to be exposed on the k8s nodes, create a "NodePort" service 

717 if exposed_ports: 717 ↛ 718line 717 didn't jump to line 718

718 exposed_service = \ 

719 _create_service_object(_create_exposed_service_name(component_name), component_name, exposed_ports, 

720 '', labels, "NodePort", "IPv4") 

721 core.create_namespaced_service(namespace, exposed_service) 

722 deployment_description["services"].append(_create_exposed_service_name(component_name)) 

723 

724 if exposed_ports_ipv6: 724 ↛ 725line 724 didn't jump to line 725

725 exposed_service_ipv6 = \ 

726 _create_service_object(_create_exposed_v6_service_name(component_name), component_name, 

727 exposed_ports_ipv6, '', labels, "NodePort", "IPv6") 

728 core.create_namespaced_service(namespace, exposed_service_ipv6) 

729 deployment_description["services"].append(_create_exposed_v6_service_name(component_name)) 

730 

731 except Exception as e: 

732 # If the ClusterIP service was created, delete the service: 

733 if cip_service_created: 

734 core.delete_namespaced_service(_create_service_name(component_name), namespace) 

735 # If the deployment was created but not the service, delete the deployment 

736 if deployment_ok: 

737 client.AppsV1Api().delete_namespaced_deployment(_create_deployment_name(component_name), namespace, 

738 body=client.V1DeleteOptions()) 

739 raise e 

740 

741 return dep, deployment_description 

742 

743 

def undeploy(deployment_description):
    """
    Remove a previously deployed component from Kubernetes.

    deployment_description is a dict that supplies:
        - location: passed to _configure_api to select the cluster
        - namespace: the k8s namespace holding the component's resources
        - services: names of the k8s services to delete
        - deployment: name of the k8s deployment to delete

    The services are deleted first, then the deployment itself.
    """
    _configure_api(deployment_description["location"])
    target_namespace = deployment_description["namespace"]

    # Delete each service recorded for the component
    for service_name in deployment_description["services"]:
        client.CoreV1Api().delete_namespaced_service(service_name, target_namespace)

    # "Foreground" propagation asks k8s to delete the underlying pods and replicaset
    # as part of deleting the deployment.
    foreground_delete = client.V1DeleteOptions(propagation_policy="Foreground")
    client.AppsV1Api().delete_namespaced_deployment(
        deployment_description["deployment"],
        target_namespace,
        body=foreground_delete)

756 

757 

def is_available(location, namespace, component_name):
    """
    Report whether the k8s deployment for component_name has reached its requested state.

    Returns True only when both the number of available replicas and the number of
    updated replicas reported in the deployment status equal the replica count in the
    deployment spec. This check can be used to verify completion of an initial
    deployment, a scale operation, or an update operation.
    """
    _configure_api(location)
    current = client.AppsV1Api().read_namespaced_deployment_status(
        _create_deployment_name(component_name),
        namespace)

    # Not every requested replica is available yet
    if current.status.available_replicas != current.spec.replicas:
        return False

    # All replicas are available; they must also all match the current spec
    return current.status.updated_replicas == current.spec.replicas

766 

767 

def scale(deployment_description, replicas):
    """
    Trigger a scaling operation by updating the replica count for the Deployment.

    deployment_description supplies the "location", "namespace" and "deployment" name;
    replicas is the new replica count to store in the deployment spec.
    """

    def _set_replica_count(deployment_spec):
        # Modifier callback handed to _patch_deployment: overwrite the replica count
        # on the object it is given and hand the same object back.
        deployment_spec.spec.replicas = replicas
        return deployment_spec

    _patch_deployment(
        deployment_description["location"],
        deployment_description["namespace"],
        deployment_description["deployment"],
        _set_replica_count)

777 

778 

def upgrade(deployment_description, image, container_index=0):
    """
    Trigger a rolling upgrade by sending a new image name/tag to k8s.

    deployment_description supplies the "location", "namespace" and "deployment" name;
    image is the new image reference; container_index selects which container in the
    pod template receives it (default: container 0).
    """

    def _set_container_image(deployment_spec):
        # Modifier callback handed to _patch_deployment: replace the image of the
        # selected container on the object it is given and hand the same object back.
        target_container = deployment_spec.spec.template.spec.containers[container_index]
        target_container.image = image
        return deployment_spec

    _patch_deployment(
        deployment_description["location"],
        deployment_description["namespace"],
        deployment_description["deployment"],
        _set_container_image)

788 

789 

def rollback(deployment_description, rollback_to=0):
    """
    Undo an upgrade by rolling the deployment back to a previous revision.

    By default, go back one revision; rollback_to can be used to supply a specific
    revision number.

    Returns a tuple (image, replicas): the image of container 0 and the replica count,
    read back from the deployment after the rollback request has been issued.
    """
    # 2018-07-13
    # Currently this does not work due to a bug in the create_namespaced_deployment_rollback() method.
    # The k8s python client code throws an exception while processing the response from the API.
    # See:
    #   - https://github.com/kubernetes-client/python/issues/491
    #   - https://github.com/kubernetes/kubernetes/pull/63837
    # The fix has been merged into the master branch but is not in the latest release.
    #
    # NOTE(review): the extensions/v1beta1 rollback call and the AppsV1beta1* model classes used
    # below may not exist in newer Kubernetes / client releases -- confirm against the versions in use.
    _configure_api(deployment_description["location"])
    deployment = deployment_description["deployment"]
    namespace = deployment_description["namespace"]

    # Initiate the rollback
    extensions_api = client.ExtensionsV1beta1Api()
    target_revision = client.AppsV1beta1RollbackConfig(revision=rollback_to)
    rollback_request = client.AppsV1beta1DeploymentRollback(name=deployment, rollback_to=target_revision)
    extensions_api.create_namespaced_deployment_rollback(deployment, namespace, rollback_request)

    # Read back the spec for the rolled-back deployment
    rolled_back = client.AppsV1Api().read_namespaced_deployment(deployment, namespace)
    app_container = rolled_back.spec.template.spec.containers[0]
    return app_container.image, rolled_back.spec.replicas

820 

821 

def execute_command_in_deployment(deployment_description, command):
    """
    Run a command in every running pod of a k8s deployment.

    Enumerates the pods in the k8s deployment identified by "deployment_description",
    then executes "command" (an argv-style list) in each of those pods via
    _execute_command_in_pod. Returns a list with one result per pod.

    The set of pods associated with a deployment can change over time, and the
    enumeration is a snapshot at one point in time:
      - the command is not executed in pods created after the enumeration;
      - if a pod disappears after the enumeration and before the command runs, the
        attempt to execute the command in that pod will fail. This is not treated
        as a fatal error.

    This is reasonable for the one current use case for "execute_command": running a
    script to notify a container that its configuration has changed as a result of a
    policy change. In that use case the new configuration is stored into the
    configuration store (Consul), the pods are enumerated, and the command is executed.
    A pod that has disappeared doesn't need to be reconfigured, and a pod that comes up
    after the enumeration gets its initial configuration from the updated version in
    Consul.

    The optimal solution would be a k8s API call that executes a command in all of the
    pods of a deployment, but the only call k8s provides operates at the pod level, not
    the deployment level.

    There is also no direct way to list the pods belonging to a particular k8s
    deployment. The deployment code sets a label ("k8sdeployment") carrying the k8s
    deployment name on the pod, so the pods are found by querying for that label.
    """
    location = deployment_description["location"]
    _configure_api(location)
    deployment = deployment_description["deployment"]
    namespace = deployment_description["namespace"]

    # Snapshot the names of all running pods that carry this deployment's label
    running_pods = client.CoreV1Api().list_namespaced_pod(
        namespace=namespace,
        label_selector="k8sdeployment={0}".format(deployment),
        field_selector="status.phase=Running"
    )
    pod_names = []
    for pod in running_pods.items:
        pod_names.append(pod.metadata.name)

    # Execute the command in each pod from the snapshot, collecting the results in order
    results = []
    for pod_name in pod_names:
        results.append(_execute_command_in_pod(location, namespace, pod_name, command))
    return results

866 for pod_name in pod_names]