Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# ============LICENSE_START======================================================= 

2# org.onap.dcae 

3# ================================================================================ 

4# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. 

5# Copyright (c) 2020 Pantheon.tech. All rights reserved. 

6# Copyright (c) 2020-2021 Nokia. All rights reserved. 

7# Copyright (c) 2020 J. F. Lucas. All rights reserved. 

8# ================================================================================ 

9# Licensed under the Apache License, Version 2.0 (the "License"); 

10# you may not use this file except in compliance with the License. 

11# You may obtain a copy of the License at 

12# 

13# http://www.apache.org/licenses/LICENSE-2.0 

14# 

15# Unless required by applicable law or agreed to in writing, software 

16# distributed under the License is distributed on an "AS IS" BASIS, 

17# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

18# See the License for the specific language governing permissions and 

19# limitations under the License. 

20# ============LICENSE_END========================================================= 

21# 

22from distutils import util 

23import os 

24import re 

25import uuid 

26import base64 

27 

28from binascii import hexlify 

29from kubernetes import config, client, stream 

30from .sans_parser import SansParser 

31 

# Default values for readiness probe (in seconds)
PROBE_DEFAULT_PERIOD = 15
PROBE_DEFAULT_TIMEOUT = 1

# Location of k8s cluster config file ("kubeconfig")
K8S_CONFIG_PATH = "/opt/onap/kube/kubeconfig"

# Regular expression for interval/timeout specification:
# a decimal integer optionally followed by a unit ("s", "m" or "h")
INTERVAL_SPEC = re.compile(r"^([0-9]+)(s|m|h)?$")
# Conversion factors to seconds, keyed by the unit captured by INTERVAL_SPEC
# (None means that no unit was given)
FACTORS = {None: 1, "s": 1, "m": 60, "h": 3600}

# Regular expression for port mapping
#   group 1: container port
#   group 2: / + protocol
#   group 3: protocol
#   group 4: host port
PORTS = re.compile(r"^([0-9]+)(/(udp|UDP|tcp|TCP))?:([0-9]+)$")

# Constants for external_cert
MOUNT_PATH = "/etc/onap/oom/certservice/certs/"
DEFAULT_CERT_TYPE = "p12"

54 

55 

56def _create_deployment_name(component_name): 

57 return "dep-{0}".format(component_name)[:63] 

58 

59 

60def _create_service_name(component_name): 

61 return "{0}".format(component_name)[:63] 

62 

63 

64def _create_exposed_service_name(component_name): 

65 return ("x{0}".format(component_name))[:63] 

66 

67 

68def _create_exposed_v6_service_name(component_name): 

69 return ("x{0}-ipv6".format(component_name))[:63] 

70 

71 

def _configure_api(location=None):
    """Configure the kubernetes client library.

    If a kubeconfig file exists at K8S_CONFIG_PATH, it is loaded with
    'location' used as the context name.  Otherwise the in-cluster service
    account token and certificate files are used.
    """
    if not os.path.exists(K8S_CONFIG_PATH):
        # No kubeconfig file: maybe we're running in a k8s container and can
        # use the information provided by k8s.  We would like to call
        # config.load_incluster_config(), but that looks into os.environ for
        # the kubernetes host and port, and from the plugin those aren't
        # visible.  So we use the InClusterConfigLoader class directly, where
        # we can supply the environment ourselves.
        # This is probably brittle!  Maybe there's a better alternative.
        in_cluster = config.incluster_config
        api_endpoint_env = {
            in_cluster.SERVICE_HOST_ENV_NAME: "kubernetes.default.svc.cluster.local",
            in_cluster.SERVICE_PORT_ENV_NAME: "443"
        }
        loader = in_cluster.InClusterConfigLoader(
            token_filename=in_cluster.SERVICE_TOKEN_FILENAME,
            cert_filename=in_cluster.SERVICE_CERT_FILENAME,
            environ=api_endpoint_env
        )
        loader.load_and_set()
        return

    # A kubernetes config file is available: use it
    config.load_kube_config(config_file=K8S_CONFIG_PATH, context=location, persist_config=False)

93 

94 

def _parse_interval(t):
    """
    Convert an interval specification to a number of seconds.

    t can be
       - an integer quantity, interpreted as seconds
       - a string representation of a decimal integer, interpreted as seconds
       - a string consisting of a representation of a decimal integer followed
         by a unit: "s" for seconds, "m" for minutes, "h" for hours
    Used for compatibility with the Docker plugin, where time intervals
    for health checks were specified as strings with a number and a unit.
    See INTERVAL_SPEC for the regular expression that's accepted.

    Raises ValueError if t does not match INTERVAL_SPEC.
    """
    matched = INTERVAL_SPEC.match(str(t))
    if not matched:
        raise ValueError("Bad interval specification: {0}".format(t))
    quantity, unit = matched.group(1), matched.group(2)
    return int(quantity) * FACTORS[unit]

114 

115 

def _create_probe(hc, port):
    """
    Build a Kubernetes probe from the health check dictionary hc.

    hc['type'] selects the kind of probe:
      - 'http' / 'https': HTTP GET on hc['endpoint'] at 'port'
      - 'script' / 'docker': exec of hc['script'], split on whitespace
    Any other type yields None.
    hc['interval'] and hc['timeout'] are optional interval specifications
    (see _parse_interval); the module defaults are used when absent.
    """
    probe_type = hc['type']
    period = _parse_interval(hc.get('interval', PROBE_DEFAULT_PERIOD))
    timeout = _parse_interval(hc.get('timeout', PROBE_DEFAULT_TIMEOUT))

    # Pick the action specific to the probe type; everything else is common
    if probe_type in ('http', 'https'):
        action = {
            "http_get": client.V1HTTPGetAction(
                path=hc['endpoint'],
                port=port,
                scheme=probe_type.upper()
            )
        }
    elif probe_type in ('script', 'docker'):
        action = {
            "_exec": client.V1ExecAction(
                command=hc['script'].split()
            )
        }
    else:
        # Unknown probe type: no probe
        return None

    return client.V1Probe(
        failure_threshold=1,
        initial_delay_seconds=5,
        period_seconds=period,
        timeout_seconds=timeout,
        **action
    )

145 

146 

def _create_resources(resources=None):
    """
    Build a V1ResourceRequirements object from a dictionary with optional
    "limits" and "requests" entries.  Returns None when resources is None.
    """
    if resources is None:
        return None
    return client.V1ResourceRequirements(
        limits=resources.get("limits"),
        requests=resources.get("requests")
    )

156 

157 

def _create_container_object(name, image, always_pull, **kwargs):
    """
    Build a V1Container.

    Recognized keyword arguments (all optional):
      env:             dict of environment variable names to values
      env_from_secret: dict whose values are dicts with "env_name",
                       "secret_name" and "secret_key" entries
      readiness:       health check dictionary for the readiness probe
      liveness:        health check dictionary for the liveness probe
      resources:       dict with "limits" and/or "requests"
      container_ports: list of (port, protocol) tuples
      volume_mounts:   list of volume mount objects
    """
    # Environment variables passed in by the caller
    plain_env = kwargs.get('env') or {}
    env_vars = [client.V1EnvVar(name=var_name, value=var_value)
                for var_name, var_value in plain_env.items()]

    # POD_IP carries the IP address of the pod running the container
    pod_ip_selector = client.V1ObjectFieldSelector(field_path="status.podIP")
    env_vars.append(
        client.V1EnvVar(name="POD_IP", value_from=client.V1EnvVarSource(field_ref=pod_ip_selector)))

    # Environment variables whose values come from a k8s Secret
    if 'env_from_secret' in kwargs:
        for secret_env in kwargs.get('env_from_secret').values():
            key_ref = client.V1SecretKeySelector(key=secret_env["secret_key"], name=secret_env["secret_name"])
            env_vars.append(
                client.V1EnvVar(name=secret_env["env_name"],
                                value_from=client.V1EnvVarSource(secret_key_ref=key_ref)))

    readiness = kwargs.get('readiness')
    liveness = kwargs.get('liveness')
    resources = kwargs.get('resources')
    container_ports = kwargs.get('container_ports') or []

    # If a health check is specified, create a readiness/liveness probe
    # (For an HTTP-based check, we assume it's at the first container port)
    hc_port = None
    if container_ports:
        hc_port = container_ports[0][0]
    readiness_probe = _create_probe(readiness, hc_port) if readiness else None
    liveness_probe = _create_probe(liveness, hc_port) if liveness else None
    resource_requirements = _create_resources(resources) if resources else None
    port_objs = [client.V1ContainerPort(container_port=number, protocol=protocol)
                 for number, protocol in container_ports]

    if always_pull:
        pull_policy = 'Always'
    else:
        pull_policy = 'IfNotPresent'

    # Define container for pod
    return client.V1Container(
        name=name,
        image=image,
        image_pull_policy=pull_policy,
        env=env_vars,
        ports=port_objs,
        volume_mounts=kwargs.get('volume_mounts') or [],
        resources=resource_requirements,
        readiness_probe=readiness_probe,
        liveness_probe=liveness_probe
    )

201 

202 

def _create_deployment_object(component_name,
                              containers,
                              init_containers,
                              replicas,
                              volumes,
                              labels=None,
                              pull_secrets=None):
    """
    Build a V1Deployment ("apps/v1") for a component.

    The deployment is named by _create_deployment_name(component_name) and its
    pods use component_name as hostname.  pull_secrets is a list of the names
    of the k8s secrets containing docker registry credentials.
    See https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod

    Note: when a labels dictionary is passed in, it is updated in place with
    the "k8sdeployment" label.
    """
    labels = {} if labels is None else labels
    pull_secrets = [] if pull_secrets is None else pull_secrets
    deployment_name = _create_deployment_name(component_name)

    # Label the pod with the deployment name, so we can find it easily
    labels["k8sdeployment"] = deployment_name

    image_pull_secrets = [client.V1LocalObjectReference(name=secret_name)
                          for secret_name in pull_secrets]

    # Pod template
    pod_spec = client.V1PodSpec(hostname=component_name,
                                containers=containers,
                                init_containers=init_containers,
                                volumes=volumes,
                                image_pull_secrets=image_pull_secrets)
    pod_template = client.V1PodTemplateSpec(
        metadata=client.V1ObjectMeta(labels=labels),
        spec=pod_spec
    )

    # Deployment spec: the selector matches the same labels the pods carry
    deployment_spec = client.V1DeploymentSpec(
        replicas=replicas,
        selector=client.V1LabelSelector(match_labels=labels),
        template=pod_template
    )

    # Deployment object
    return client.V1Deployment(
        api_version="apps/v1",
        kind="Deployment",
        metadata=client.V1ObjectMeta(name=deployment_name, labels=labels),
        spec=deployment_spec
    )

251 

252 

def _create_service_object(service_name, component_name, service_ports, annotations, labels, service_type, ip_family):
    """
    Build a V1Service that selects pods labeled "app": component_name.

    The service name is derived from service_name via _create_service_name.
    Annotations are attached to the metadata only when non-empty.
    """
    service_spec = client.V1ServiceSpec(
        ports=service_ports,
        selector={"app": component_name},
        type=service_type,
        ip_family=ip_family
    )

    metadata_args = {"name": _create_service_name(service_name), "labels": labels}
    if annotations:
        metadata_args["annotations"] = annotations

    return client.V1Service(
        kind="Service",
        api_version="v1",
        metadata=client.V1ObjectMeta(**metadata_args),
        spec=service_spec
    )

272 

273 

def create_secret_with_password(namespace, secret_prefix, password_key, password_length):
    """
    Creates an Opaque K8s secret holding a generated password.

    The secret name is generated by k8s from secret_prefix ("generateName").
    The password is hex text built from password_length random bytes and is
    stored base64-encoded under password_key.
    Returns: secret name and data key.

    Example usage:
        create_secret_with_password('onap', 'dcae-keystore-password-', 'password', 128)
    """
    encoded_password = _encode_base64(_generate_password(password_length))

    secret_metadata = {'generateName': secret_prefix, 'namespace': namespace}
    secret_data = {password_key: encoded_password}

    created = _create_k8s_secret(namespace, secret_metadata, secret_data, 'Opaque')
    return created.metadata.name, password_key

292 

293 

294def _generate_password(length): 

295 rand = os.urandom(length) 

296 password = hexlify(rand) 

297 return password.decode("ascii"); 

298 

299 

300def _encode_base64(value): 

301 value_bytes = value.encode("ascii") 

302 base64_encoded_bytes = base64.b64encode(value_bytes) 

303 encoded_value = base64_encoded_bytes.decode("ascii") 

304 return encoded_value 

305 

306 

def _create_k8s_secret(namespace, metadata, data, secret_type):
    """
    Create a k8s Secret in 'namespace' and return the API response.

    metadata:    metadata for the secret (e.g. name or generateName)
    data:        dict of secret keys to base64-encoded values
    secret_type: k8s secret type, e.g. 'Opaque'
    """
    # Keyword arguments are used deliberately: the positional order of the
    # V1Secret constructor parameters belongs to the generated client model
    # and can differ between kubernetes client releases (e.g. when a field
    # such as 'immutable' is added), which would silently misassign values.
    body = client.V1Secret(
        api_version='v1',
        kind='Secret',
        metadata=metadata,
        data=data,
        type=secret_type
    )
    return client.CoreV1Api().create_namespaced_secret(namespace, body)

314 

315 

def parse_ports(port_list):
    """
    Parse the port list into a list of container ports (needed to create the container)
    and to a set of port mappings to set up k8s services.

    Each entry of port_list is either
      - a string "<container port>[/<protocol>]:<host port>" (see PORTS), or
      - a dict with a 'concat' list whose items are joined to form such a
        string, and an optional 'ipv6' flag.
    The protocol defaults to TCP and is upper-cased.

    Returns (container_ports, port_map) where container_ports is a list of
    unique (container port, protocol) tuples and port_map maps
    (container port, protocol, ipv6) to the host port.

    Raises ValueError for an entry that does not match PORTS.
    """
    container_ports = []
    port_map = {}
    for p in port_list:
        ipv6 = False
        # isinstance (rather than an exact type check) also accepts dict subclasses
        if isinstance(p, dict):
            ipv6 = p.get('ipv6', False)
            p = "".join(str(v) for v in p['concat'])
        m = PORTS.match(p.strip())
        if not m:
            raise ValueError("Bad port specification: {0}".format(p))

        cport = int(m.group(1))
        hport = int(m.group(4))
        proto = m.group(3).upper() if m.group(3) else "TCP"

        port = (cport, proto)
        if port not in container_ports:
            container_ports.append(port)
        port_map[(cport, proto, ipv6)] = hport

    return container_ports, port_map

344 

345 

def _parse_volumes(volume_list):
    """
    Convert a list of volume descriptions into k8s volumes and volume mounts.

    Each description has a 'container' dict with the mount point ('bind') and
    an optional 'mode' ('ro' makes the mount read-only), plus a 'host' dict
    with a 'path' (host path volume) and/or a 'config_volume' dict with a
    'name' (ConfigMap volume).  Each description gets a generated volume name.

    Returns (volumes, volume_mounts).
    """
    volumes = []
    volume_mounts = []
    for spec in volume_list:
        volume_name = str(uuid.uuid4())
        container_spec = spec['container']
        mount_path = container_spec['bind']
        read_only = container_spec.get('mode') == 'ro'

        if 'host' in spec and 'path' in spec['host']:
            host_source = client.V1HostPathVolumeSource(path=spec['host']['path'])
            volumes.append(client.V1Volume(name=volume_name, host_path=host_source))

        if 'config_volume' in spec and 'name' in spec['config_volume']:
            config_map_source = client.V1ConfigMapVolumeSource(default_mode=0o0644,
                                                               name=spec['config_volume']['name'],
                                                               optional=True)
            volumes.append(client.V1Volume(name=volume_name, config_map=config_map_source))

        volume_mounts.append(client.V1VolumeMount(name=volume_name, mount_path=mount_path, read_only=read_only))

    return volumes, volume_mounts

364 

365 

def _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, log_info, filebeat):
    """
    Add a "filebeat" sidecar container for ELK logging.

    Nothing is added unless log_info and filebeat are both supplied and
    log_info names a "log_directory".  Otherwise the lists 'containers',
    'volumes' and 'volume_mounts' are extended in place.
    """
    if not (log_info and filebeat):
        return
    component_log_dir = log_info.get("log_directory")
    if not component_log_dir:
        return
    filebeat_mounts = []

    # Shared volume for the component's log files: mounted on the component
    # at its log directory and on the sidecar at the filebeat log path
    volumes.append(client.V1Volume(name="component-log", empty_dir=client.V1EmptyDirVolumeSource()))
    volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=component_log_dir))
    sidecar_log_path = log_info.get("alternate_fb_path") or "{0}/{1}".format(filebeat["log_path"], component_name)
    filebeat_mounts.append(client.V1VolumeMount(name="component-log", mount_path=sidecar_log_path))

    # Volume for sidecar data, mounted on the sidecar only
    volumes.append(client.V1Volume(name="filebeat-data", empty_dir=client.V1EmptyDirVolumeSource()))
    filebeat_mounts.append(client.V1VolumeMount(name="filebeat-data", mount_path=filebeat["data_path"]))

    # Volume for the sidecar configuration data.
    # The configuration data is in a k8s ConfigMap that should be created when DCAE is installed.
    config_source = client.V1ConfigMapVolumeSource(name=filebeat["config_map"])
    volumes.append(client.V1Volume(name="filebeat-conf", config_map=config_source))
    filebeat_mounts.append(
        client.V1VolumeMount(name="filebeat-conf",
                             mount_path=filebeat["config_path"],
                             sub_path=filebeat["config_subpath"]))

    # Finally create the container for the sidecar
    sidecar = _create_container_object("filebeat", filebeat["image"], False, volume_mounts=filebeat_mounts)
    containers.append(sidecar)

395 

396 

def _add_tls_init_container(ctx, init_containers, volumes, volume_mounts, tls_info, tls_config):
    """
    Add an InitContainer ("init-tls") that sets up TLS certificate information.

    The init container is told through the TLS_SERVER environment variable
    ("true" when tls_info["use_tls"] is truthy, "false" otherwise) whether the
    component acts as a TLS server.  Per the original design notes, a server
    gets server and CA certificate materials, other components get CA
    certificate materials only.

    An emptyDir volume "tls-info" is shared between the init container
    (mounted at tls_config["cert_path"]) and the component container (mounted
    at tls_info["cert_directory"] if present, otherwise at the configured
    default tls_config["component_cert_dir"]).

    'init_containers', 'volumes' and 'volume_mounts' are extended in place.
    """
    image = tls_config["image"]
    ctx.logger.info("Creating init container: TLS \n * [" + image + "]")

    component_cert_dir = tls_info.get("cert_directory") or tls_config.get("component_cert_dir")
    init_env = {"TLS_SERVER": "true" if tls_info.get("use_tls") else "false"}

    # Certificate volume, mounted on the component container ...
    volumes.append(client.V1Volume(name="tls-info", empty_dir=client.V1EmptyDirVolumeSource()))
    volume_mounts.append(client.V1VolumeMount(name="tls-info", mount_path=component_cert_dir))
    # ... and on the init container that populates it
    init_mount = client.V1VolumeMount(name="tls-info", mount_path=tls_config["cert_path"])

    init_container = _create_container_object("init-tls", image, False, volume_mounts=[init_mount], env=init_env)
    init_containers.append(init_container)

420 

421 

def _add_external_tls_init_container(ctx, init_containers, volumes, external_cert, external_tls_config):
    """
    Add an InitContainer ("cert-service-client") which will generate external
    TLS certificates.

    The container is configured through environment variables taken from
    external_cert (component-specific settings) and external_tls_config
    (plugin-wide settings); keystore and truststore passwords are injected
    from k8s secrets.  A projected volume "tls-volume" with the keystore and
    truststore used by the client is added to 'volumes' and mounted at
    MOUNT_PATH; the "tls-info" volume is mounted at the external certificate
    directory.  'init_containers' and 'volumes' are extended in place.
    """
    image = external_tls_config["image_tag"]
    ctx.logger.info("Creating init container: external TLS \n * [" + image + "]")

    cert_dir = external_cert.get("external_cert_directory")
    if not cert_dir.endswith('/'):
        cert_dir += '/'

    keystore_secret_key = external_tls_config.get("keystore_secret_key")
    truststore_secret_key = external_tls_config.get("truststore_secret_key")

    env = {
        "REQUEST_URL": external_tls_config.get("request_url"),
        "REQUEST_TIMEOUT": external_tls_config.get("timeout"),
        "OUTPUT_PATH": cert_dir + "external",
        "OUTPUT_TYPE": external_cert.get("cert_type"),
        "CA_NAME": external_cert.get("ca_name"),
        "COMMON_NAME": external_cert.get("external_certificate_parameters").get("common_name"),
        "ORGANIZATION": external_tls_config.get("organization"),
        "ORGANIZATION_UNIT": external_tls_config.get("organizational_unit"),
        "LOCATION": external_tls_config.get("location"),
        "STATE": external_tls_config.get("state"),
        "COUNTRY": external_tls_config.get("country"),
        "SANS": external_cert.get("external_certificate_parameters").get("sans"),
        "KEYSTORE_PATH": MOUNT_PATH + keystore_secret_key,
        "TRUSTSTORE_PATH": MOUNT_PATH + truststore_secret_key,
    }
    env_from_secret = {
        "KEYSTORE_PASSWORD": {
            "env_name": "KEYSTORE_PASSWORD",
            "secret_name": external_tls_config.get("keystore_password_secret_name"),
            "secret_key": external_tls_config.get("keystore_password_secret_key"),
        },
        "TRUSTSTORE_PASSWORD": {
            "env_name": "TRUSTSTORE_PASSWORD",
            "secret_name": external_tls_config.get("truststore_password_secret_name"),
            "secret_key": external_tls_config.get("truststore_password_secret_key"),
        },
    }

    # Volume with the keystore and truststore read by the init container
    client_certs = _create_projected_tls_volume(external_tls_config.get("cert_secret_name"),
                                                keystore_secret_key,
                                                truststore_secret_key)
    volumes.append(client.V1Volume(name="tls-volume", projected=client_certs))

    init_mounts = [
        client.V1VolumeMount(name="tls-info", mount_path=external_cert.get("external_cert_directory")),
        client.V1VolumeMount(name="tls-volume", mount_path=MOUNT_PATH),
    ]

    # Create the init container
    init_container = _create_container_object("cert-service-client", image, False,
                                              volume_mounts=init_mounts,
                                              env=env,
                                              env_from_secret=env_from_secret)
    init_containers.append(init_container)

471 

472 

def _create_projected_tls_volume(secret_name, keystore_secret_key, truststore_secret_key):
    """
    Build a projected volume source exposing the keystore and truststore
    entries of the secret 'secret_name' as files named after their keys.
    """
    projected_keys = (keystore_secret_key, truststore_secret_key)
    key_paths = [client.V1KeyToPath(key=secret_key, path=secret_key) for secret_key in projected_keys]
    secret_source = client.V1SecretProjection(name=secret_name, items=key_paths)
    return client.V1ProjectedVolumeSource(sources=[client.V1VolumeProjection(secret=secret_source)])

481 

482 

def _add_cert_post_processor_init_container(ctx, init_containers, tls_info, tls_config, external_cert,
                                            cert_post_processor_config, isCertManagerIntegration):
    """
    Add an InitContainer ("cert-post-processor") that merges the TLS and
    external TLS truststores into a single file.

    The container receives the truststore, truststore-password and keystore
    source/destination paths through environment variables.  The external
    certificate type comes from external_cert["cert_type"] (DEFAULT_CERT_TYPE
    when absent); for the 'pem' type there is no external truststore password
    file.  When isCertManagerIntegration is truthy, the
    "certmanager-certs-volume" volume is additionally mounted at the external
    certificate directory.  'init_containers' is extended in place.
    """
    image = cert_post_processor_config["image_tag"]
    ctx.logger.info("Creating init container: cert post processor \n * [" + image + "]")

    tls_cert_dir = tls_info.get("cert_directory") or tls_config.get("component_cert_dir")
    if not tls_cert_dir.endswith('/'):
        tls_cert_dir += '/'
    ext_cert_dir = tls_cert_dir + "external/"

    output_type = (external_cert.get("cert_type") or DEFAULT_CERT_TYPE).lower()
    ext_truststore_path = ext_cert_dir + "truststore." + _get_file_extension(output_type)
    if output_type == 'pem':
        ext_truststore_pass = ''
    else:
        ext_truststore_pass = ext_cert_dir + "truststore.pass"

    env = {
        "TRUSTSTORES_PATHS": ":".join([tls_cert_dir + "trust.jks", ext_truststore_path]),
        "TRUSTSTORES_PASSWORDS_PATHS": ":".join([tls_cert_dir + "trust.pass", ext_truststore_pass]),
        "KEYSTORE_SOURCE_PATHS": _get_keystore_source_paths(output_type, ext_cert_dir),
        "KEYSTORE_DESTINATION_PATHS": _get_keystore_destination_paths(output_type, tls_cert_dir),
    }

    for env_name in ("TRUSTSTORES_PATHS",
                     "TRUSTSTORES_PASSWORDS_PATHS",
                     "KEYSTORE_SOURCE_PATHS",
                     "KEYSTORE_DESTINATION_PATHS"):
        ctx.logger.info(env_name + ": " + env[env_name])

    # Volume mounts for the init container
    init_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_cert_dir)]
    if isCertManagerIntegration:
        init_mounts.append(
            client.V1VolumeMount(name="certmanager-certs-volume", mount_path=ext_cert_dir))

    # Create the init container
    init_container = _create_container_object("cert-post-processor", image, False,
                                              volume_mounts=init_mounts, env=env)
    init_containers.append(init_container)

522 

523 

524def _get_file_extension(output_type): 

525 return { 

526 'p12': 'p12', 

527 'pem': 'pem', 

528 'jks': 'jks', 

529 }[output_type] 

530 

531 

532def _get_keystore_source_paths(output_type, ext_cert_dir): 

533 source_paths_template = { 

534 'p12': "{0}keystore.p12:{0}keystore.pass", 

535 'jks': "{0}keystore.jks:{0}keystore.pass", 

536 'pem': "{0}keystore.pem:{0}key.pem", 

537 }[output_type] 

538 return source_paths_template.format(ext_cert_dir) 

539 

540 

541def _get_keystore_destination_paths(output_type, tls_cert_dir): 

542 destination_paths_template = { 

543 'p12': "{0}cert.p12:{0}p12.pass", 

544 'jks': "{0}cert.jks:{0}jks.pass", 

545 'pem': "{0}cert.pem:{0}key.pem", 

546 }[output_type] 

547 return destination_paths_template.format(tls_cert_dir) 

548 

549 

def _process_port_map(port_map):
    """
    Turn a port map (see parse_ports) into lists of V1ServicePort objects.

    Returns (service_ports, exposed_ports, exposed_ports_ipv6):
      service_ports:      ports exposed internally on the k8s network
      exposed_ports:      ports mapped to node ports (host port != 0), IPv4
      exposed_ports_ipv6: same, for entries flagged as ipv6
    Exposed ports are named "xport-<t|u>-<container port>"; internal ports
    use the same name without the leading "x".
    """
    service_ports = []
    exposed_ports = []
    exposed_ports_ipv6 = []
    for (cport, proto, ipv6), hport in port_map.items():
        exposed_name = "xport-{0}-{1}".format(proto[0].lower(), cport)
        internal_name = exposed_name[1:]
        container_port = int(cport)
        node_port = int(hport)

        internal_port = client.V1ServicePort(port=container_port, protocol=proto, name=internal_name)
        if internal_port not in service_ports:
            service_ports.append(internal_port)

        # A host port of 0 means the port is not exposed outside the cluster
        if node_port == 0:
            continue
        destination = exposed_ports_ipv6 if ipv6 else exposed_ports
        destination.append(
            client.V1ServicePort(port=container_port, protocol=proto, node_port=node_port, name=exposed_name))

    return service_ports, exposed_ports, exposed_ports_ipv6

567 

568 

def _service_exists(location, namespace, component_name):
    """
    Report whether the k8s Service for component_name can be read in
    'namespace' on the cluster at 'location'.

    Any ApiException raised while configuring the API or reading the service
    is treated as "does not exist".
    """
    try:
        _configure_api(location)
        client.CoreV1Api().read_namespaced_service(_create_service_name(component_name), namespace)
    except client.rest.ApiException:
        return False
    return True

579 

580 

def _patch_deployment(location, namespace, deployment, modify):
    '''
    Update a deployment in place:
      - read the current spec for 'deployment' in 'namespace'
        from the k8s cluster at 'location',
      - pass it to the 'modify' function, which returns the changed spec,
      - send the changed spec back to k8s as a patch.
    '''
    _configure_api(location)

    current_spec = client.AppsV1Api().read_namespaced_deployment(deployment, namespace)
    updated_spec = modify(current_spec)
    client.AppsV1Api().patch_namespaced_deployment(deployment, namespace, updated_spec)

598 

599 

def _execute_command_in_pod(location, namespace, pod_name, command):
    '''
    Execute the command (specified by an argv-style list in the "command" parameter) in
    the specified pod in the specified namespace at the specified location.
    For now at least, we use this only to
    run a notification script in a pod after a configuration change.

    The straightforward way to do this is with the V1 Core API function "connect_get_namespaced_pod_exec".
    Unfortunately due to a bug/limitation in the Python client library, we can't call it directly.
    We have to make the API call through a Websocket connection using the kubernetes.stream wrapper API.
    I'm following the example code at https://github.com/kubernetes-client/python/blob/master/examples/exec.py.
    There are several issues tracking this, in various states.  It isn't clear that there will ever
    be a fix.
        - https://github.com/kubernetes-client/python/issues/58
        - https://github.com/kubernetes-client/python/issues/409
        - https://github.com/kubernetes-client/python/issues/526

    The main consequence of the workaround using "stream" is that the caller does not get an indication
    of the exit code returned by the command when it completes execution.   It turns out that the
    original implementation of notification in the Docker plugin did not use this result, so we can
    still match the original notification functionality.

    The "stream" approach returns a string containing any output sent by the command to stdout or stderr.
    We'll return that so it can be logged.

    Returns a dictionary {"pod": pod_name, "output": output}, where output is the
    command output, or "Pod not found" if the pod no longer exists.
    Any other ApiException is re-raised.
    '''
    _configure_api(location)
    try:
        output = stream.stream(client.CoreV1Api().connect_get_namespaced_pod_exec,
                               name=pod_name,
                               namespace=namespace,
                               command=command,
                               stdout=True,
                               stderr=True,
                               stdin=False,
                               tty=False)
    except client.rest.ApiException as e:
        # If the exception indicates the pod wasn't found, it's not a fatal error.
        # It existed when we enumerated the pods for the deployment but no longer exists.
        # Unfortunately, the only way to distinguish a pod not found from any other error
        # is by looking at the reason text.
        # (The ApiException's "status" field should contain the HTTP status code, which would
        # be 404 if the pod isn't found, but empirical testing reveals that "status" is set
        # to zero.)
        # The reason may be unset (None); treat that as "some other error" rather than
        # failing with an AttributeError that would hide the original exception.
        reason = e.reason or ""
        if "404 not found" not in reason.lower():
            # Bare raise re-raises the original exception with its traceback intact
            raise
        output = "Pod not found"

    return {"pod": pod_name, "output": output}

649 

650 

651def _create_certificate_subject(external_tls_config): 

652 """ 

653 Map parameters to custom resource subject 

654 """ 

655 organization = external_tls_config.get("organization") 

656 organization_unit = external_tls_config.get("organizational_unit") 

657 country = external_tls_config.get("country") 

658 location = external_tls_config.get("location") 

659 state = external_tls_config.get("state") 

660 subject = { 

661 "organizations": [organization], 

662 "countries": [country], 

663 "localities": [location], 

664 "provinces": [state], 

665 "organizationalUnits": [organization_unit] 

666 } 

667 return subject 

668 

669 

670def _create_keystores_object(type, password_secret): 

671 """ 

672 Create keystore property (JKS and PKC12 certificate) for custom resource 

673 """ 

674 return {type: { 

675 "create": True, 

676 "passwordSecretRef": { 

677 "name": password_secret, 

678 "key": "password" 

679 }}} 

680 

681 

682def _get_keystores_object_type(output_type): 

683 """ 

684 Map config type to custom resource cert type 

685 """ 

686 return { 

687 'p12': 'pkcs12', 

688 'jks': 'jks', 

689 }[output_type] 

690 

691 

def _create_projected_volume_with_password(cert_type, cert_secret_name, password_secret_name, password_secret_key):
    """
    Create the volume projections for password protected certificates.

    The first projection exposes "keystore.<ext>" and "truststore.<ext>" from
    the certificate secret; the second exposes the password stored under
    password_secret_key in the password secret as both "keystore.pass" and
    "truststore.pass".  The secret containing the password must be provided.

    Returns a list with the two V1VolumeProjection objects.
    """
    extension = _get_file_extension(cert_type)
    store_files = ["keystore." + extension, "truststore." + extension]
    cert_items = [client.V1KeyToPath(key=file_name, path=file_name) for file_name in store_files]
    password_items = [client.V1KeyToPath(key=password_secret_key, path=pass_file)
                      for pass_file in ("keystore.pass", "truststore.pass")]

    certs_projection = client.V1VolumeProjection(
        secret=client.V1SecretProjection(name=cert_secret_name, items=cert_items))
    passwords_projection = client.V1VolumeProjection(
        secret=client.V1SecretProjection(name=password_secret_name, items=password_items))

    return [certs_projection, passwords_projection]

710 

711 

def _create_pem_projected_volume(cert_secret_name):
    """
    Create the volume projection for a PEM certificate.

    The secret entries "tls.crt", "ca.crt" and "tls.key" are exposed as
    "keystore.pem", "truststore.pem" and "key.pem" respectively.
    Returns a one-element list with the V1VolumeProjection.
    """
    pem_files = (
        ("tls.crt", "keystore.pem"),
        ("ca.crt", "truststore.pem"),
        ("tls.key", "key.pem"),
    )
    key_paths = [client.V1KeyToPath(key=secret_key, path=file_name) for secret_key, file_name in pem_files]
    pem_projection = client.V1SecretProjection(name=cert_secret_name, items=key_paths)
    return [client.V1VolumeProjection(secret=pem_projection)]

721 

722 

def create_certificate_object(ctx, cert_secret_name, external_cert_data, external_tls_config, cert_name, issuer):
    """
    Create the cert-manager Certificate custom resource object (a dictionary).

    The common name and SANs come from
    external_cert_data["external_certificate_parameters"], the subject from
    external_tls_config.  SANs are parsed with SansParser; only the non-empty
    categories (IP addresses, DNS names, e-mail addresses, URIs) are added to
    the spec.  The certificate references the CMPv2Issuer named 'issuer'.
    """
    cert_parameters = external_cert_data.get("external_certificate_parameters")
    spec = {
        "secretName": cert_secret_name,
        "commonName": cert_parameters.get("common_name"),
        "issuerRef": {
            "group": "certmanager.onap.org",
            "kind": "CMPv2Issuer",
            "name": issuer
        },
        "subject": _create_certificate_subject(external_tls_config),
    }

    raw_sans = cert_parameters.get("sans")
    ctx.logger.info("Read SANS property: " + str(raw_sans))
    sans = SansParser().parse_sans(raw_sans)
    ctx.logger.info("Parsed SANS: " + str(sans))

    # (parsed SANs category, custom resource spec field)
    sans_fields = (
        ("ips", "ipAddresses"),
        ("dnss", "dnsNames"),
        ("emails", "emailAddresses"),
        ("uris", "uris"),
    )
    for category, spec_field in sans_fields:
        if len(sans[category]) > 0:
            spec[spec_field] = sans[category]

    return {
        "apiVersion": "cert-manager.io/v1",
        "kind": "Certificate",
        "metadata": {"name": cert_name},
        "spec": spec
    }

761 

762 

def _create_certificate_custom_resource(ctx, external_cert_data, external_tls_config, issuer, namespace, component_name, volumes, volume_mounts, deployment_description):
    """
    Create certificate custom resource for provided configuration

    The certificate is requested by creating a "certificates" custom object
    (group "cert-manager.io", version "v1") in the given namespace. A projected
    volume named "certmanager-certs-volume" exposing the resulting secret is
    added to "volumes" and mounted at "<external_cert_directory>/external/".
    For cert types other than "pem" a password secret is created as well.

    :param ctx: context
    :param external_cert_data: object contains certificate common name and
                               SANs list
    :param external_tls_config: object contains information about certificate subject
    :param issuer: issuer-name
    :param namespace: namespace
    :param component_name: component name
    :param volumes: list of deployment volume
    :param volume_mounts: list of deployment volume mounts
    :param deployment_description: list contains deployment information,
                                   method appends created cert and secrets
    """
    ctx.logger.info("Creating certificate custom resource")
    ctx.logger.info("External cert data: " + str(external_cert_data))

    cert_type = (external_cert_data.get("cert_type") or DEFAULT_CERT_TYPE).lower()
    # Resolve the keystore type before anything is created in the cluster, so that
    # an unsupported cert type fails without leaving a password secret behind
    keystore_type = _get_keystores_object_type(cert_type) if cert_type != 'pem' else None

    api = client.CustomObjectsApi()
    cert_secret_name = component_name + "-secret"
    cert_name = component_name + "-cert"

    # Certificates go into the "external" subdirectory of the configured directory.
    # The configured directory may or may not end with a path separator.
    cert_dir = external_cert_data.get("external_cert_directory")
    if not cert_dir.endswith("/"):
        cert_dir += "/"
    cert_dir += "external/"

    custom_resource = create_certificate_object(ctx, cert_secret_name,
                                                external_cert_data,
                                                external_tls_config,
                                                cert_name, issuer)
    # Create the sources of the projected volume
    if cert_type != 'pem':
        ctx.logger.info("Creating volume with passwords")
        password_secret_name, password_secret_key = create_secret_with_password(namespace, component_name + "-cert-password", "password", 30)
        deployment_description["secrets"].append(password_secret_name)
        custom_resource.get("spec")["keystores"] = _create_keystores_object(keystore_type, password_secret_name)
        projected_volume_sources = _create_projected_volume_with_password(
            cert_type, cert_secret_name, password_secret_name, password_secret_key)
    else:
        ctx.logger.info("Creating PEM volume")
        projected_volume_sources = _create_pem_projected_volume(cert_secret_name)

    # Create the volume and its mount
    projected_volume = client.V1ProjectedVolumeSource(sources=projected_volume_sources)
    volumes.append(client.V1Volume(name="certmanager-certs-volume", projected=projected_volume))
    volume_mounts.append(client.V1VolumeMount(name="certmanager-certs-volume", mount_path=cert_dir))

    # Create certificate custom resource
    ctx.logger.info("Certificate CRD: " + str(custom_resource))
    api.create_namespaced_custom_object(
        group="cert-manager.io",
        version="v1",
        namespace=namespace,
        plural="certificates",
        body=custom_resource
    )
    # Record what was created so that it can be removed on undeploy
    deployment_description["certificates"].append(cert_name)
    deployment_description["secrets"].append(cert_secret_name)
    ctx.logger.info("CRD certificate created")

820 

821 

def deploy(ctx, namespace, component_name, image, replicas, always_pull, k8sconfig, **kwargs):
    """
    This will create a k8s Deployment and, if needed, one or two k8s Services.
    (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
    We're not exposing k8s to the component developer and the blueprint author.
    This is a conscious choice.  We want to use k8s in a controlled, consistent way, and we want to hide
    the details from the component developer and the blueprint author.)

    namespace:  the Kubernetes namespace into which the component is deployed
    component_name:  the component name, used to derive names of Kubernetes entities
    image: the docker image for the component being deployed
    replicas: the number of instances of the component to be deployed
    always_pull: boolean flag, indicating that Kubernetes should always pull a new copy of
       the Docker image for the component, even if it is already present on the Kubernetes node.
    k8sconfig contains:
        - image_pull_secrets: a list of names of image pull secrets that can be used for retrieving images.
          (DON'T PANIC:  these are just the names of secrets held in the Kubernetes secret store.)
        - filebeat: a dictionary of filebeat sidecar parameters:
            "log_path" : mount point for log volume in filebeat container
            "data_path" : mount point for data volume in filebeat container
            "config_path" : mount point for config volume in filebeat container
            "config_subpath" :  subpath for config data in filebeat container
            "config_map" : ConfigMap holding the filebeat configuration
            "image": Docker image to use for filebeat
        - tls: a dictionary of TLS-related information:
            "cert_path": mount point for certificate volume in init container
            "image": Docker image to use for TLS init container
            "component_cert_dir" : default mount point for certs
        - cert_post_processor: a dictionary of cert_post_processor information:
            "image_tag": docker image to use for cert-post-processor init container
        - external_cert: configuration handed to the external TLS init container or
          used for the subject of the certificate custom resource
        - cmpv2_issuer: a dictionary of CMPv2 issuer information:
            "enabled": string flag (parsed with strtobool), selects the certificate custom resource
                       instead of the external TLS init container
            "name": name of the issuer referenced by the certificate custom resource
    kwargs may have:
        - volumes:  array of volume objects, where a volume object is:
            {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}
        - ports: array of strings in the form "container_port:host_port"
        - env: map of name-value pairs ( {name0: value0, name1: value1...}
        - log_info: an object with info for setting up ELK logging, with the form:
            {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}
        - tls_info: an object with info for setting up TLS (HTTPS), with the form:
            {"use_tls": true, "cert_directory": "/path/to/container/cert/directory" }
        - external_cert: an object with information for setting up external certificates creation, with the form:
            {"external_cert_directory": "/path/to/directory_where_certs_should_be_placed",
             "use_external_tls": true or false,
             "ca_name": "ca-name-value",
             "cert_type": "P12" or "JKS" or "PEM",
             "external_certificate_parameters":
                 {"common_name": "common-name-value",
                  "sans": "sans-value"}}
        - labels: dict with label-name/label-value pairs, e.g. {"cfydeployment" : "lsdfkladflksdfsjkl", "cfynode":"mycomponent"}
            These label will be set on all the pods deployed as a result of this deploy() invocation.
        - resources: dict with optional "limits" and "requests" resource requirements, each a dict containing:
            - cpu:    number CPU usage, like 0.5
            - memory: string memory requirement, like "2Gi"
        - readiness: dict with health check info; if present, used to create a readiness probe for the main container.  Includes:
            - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
            - interval: period (in seconds) between probes
            - timeout:  time (in seconds) to allow a probe to complete
            - endpoint: the path portion of the URL that points to the readiness endpoint for "http" and "https" types
            - path: the full path to the script to be executed in the container for "script" and "docker" types
        - liveness: dict with health check info; if present, used to create a liveness probe for the main container.  Includes:
            - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
            - interval: period (in seconds) between probes
            - timeout:  time (in seconds) to allow a probe to complete
            - endpoint: the path portion of the URL that points to the liveness endpoint for "http" and "https" types
            - path: the full path to the script to be executed in the container for "script" and "docker" types
        - k8s_location: name of the Kubernetes location (cluster) where the component is to be deployed

    Returns a tuple (deployment object, deployment description), where the deployment description is a
    dict recording the namespace, the location and the names of the deployment, services, certificates
    and secrets that were created.
    If anything fails, the services and the deployment created so far are deleted and the exception
    is re-raised.
    """

    deployment_ok = False
    deployment_description = {
        "namespace": namespace,
        "location": kwargs.get("k8s_location"),
        "deployment": '',
        "services": [],
        "certificates": [],
        "secrets": []
    }

    try:

        # Get API handles
        _configure_api(kwargs.get("k8s_location"))
        core = client.CoreV1Api()
        k8s_apps_v1_api_client = client.AppsV1Api()

        # Parse the port mapping
        container_ports, port_map = parse_ports(kwargs.get("ports", []))

        # Parse the volumes list into volumes and volume_mounts for the deployment
        volumes, volume_mounts = _parse_volumes(kwargs.get("volumes", []))

        # Initialize the list of containers that will be part of the pod
        containers = []
        init_containers = []

        # Set up the ELK logging sidecar container, if needed
        _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, kwargs.get("log_info"),
                                 k8sconfig.get("filebeat"))

        # Set up TLS information
        _add_tls_init_container(ctx, init_containers, volumes, volume_mounts, kwargs.get("tls_info") or {},
                                k8sconfig.get("tls"))

        # Set up external TLS information
        external_cert = kwargs.get("external_cert")
        cmpv2_issuer_config = k8sconfig.get("cmpv2_issuer")
        ctx.logger.info("CMPv2 Issuer properties: " + str(cmpv2_issuer_config))

        cmpv2_integration_enabled = bool(util.strtobool(cmpv2_issuer_config.get("enabled")))
        ctx.logger.info("CMPv2 integration enabled: " + str(cmpv2_integration_enabled))

        if external_cert and external_cert.get("use_external_tls"):
            if cmpv2_integration_enabled:
                # Certificate is requested through a certificate custom resource
                _create_certificate_custom_resource(ctx, external_cert,
                                                    k8sconfig.get("external_cert"),
                                                    cmpv2_issuer_config.get("name"),
                                                    namespace,
                                                    component_name, volumes,
                                                    volume_mounts, deployment_description)
            else:
                # Certificate is requested by an init container
                _add_external_tls_init_container(ctx, init_containers, volumes, external_cert,
                                                 k8sconfig.get("external_cert"))
            _add_cert_post_processor_init_container(ctx, init_containers, kwargs.get("tls_info") or {},
                                                    k8sconfig.get("tls"), external_cert,
                                                    k8sconfig.get("cert_post_processor"),
                                                    cmpv2_integration_enabled)

        # Create the container for the component
        # Make it the first container in the pod
        container_args = {key: kwargs.get(key) for key in ("env", "readiness", "liveness", "resources")}
        container_args['container_ports'] = container_ports
        container_args['volume_mounts'] = volume_mounts
        containers.insert(0, _create_container_object(component_name, image, always_pull, **container_args))

        # Build the k8s Deployment object
        labels = kwargs.get("labels", {})
        labels["app"] = component_name
        dep = _create_deployment_object(component_name, containers, init_containers, replicas, volumes, labels,
                                        pull_secrets=k8sconfig["image_pull_secrets"])

        # Have k8s deploy it
        k8s_apps_v1_api_client.create_namespaced_deployment(namespace, dep)
        deployment_ok = True
        deployment_description["deployment"] = _create_deployment_name(component_name)

        # Create service(s), if a port mapping is specified
        if port_map:
            service_ports, exposed_ports, exposed_ports_ipv6 = _process_port_map(port_map)

            # Each service name is recorded right after the service is created, so that
            # the failure handling below knows exactly which services exist.

            # Create a ClusterIP service for access via the k8s network
            cip_service_name = _create_service_name(component_name)
            service = _create_service_object(cip_service_name, component_name, service_ports, None,
                                             labels, "ClusterIP", "IPv4")
            core.create_namespaced_service(namespace, service)
            deployment_description["services"].append(cip_service_name)

            # If there are ports to be exposed on the k8s nodes, create a "NodePort" service
            if exposed_ports:
                exposed_service_name = _create_exposed_service_name(component_name)
                exposed_service = \
                    _create_service_object(exposed_service_name, component_name, exposed_ports,
                                           '', labels, "NodePort", "IPv4")
                core.create_namespaced_service(namespace, exposed_service)
                deployment_description["services"].append(exposed_service_name)

            if exposed_ports_ipv6:
                exposed_v6_service_name = _create_exposed_v6_service_name(component_name)
                exposed_service_ipv6 = \
                    _create_service_object(exposed_v6_service_name, component_name,
                                           exposed_ports_ipv6, '', labels, "NodePort", "IPv6")
                core.create_namespaced_service(namespace, exposed_service_ipv6)
                deployment_description["services"].append(exposed_v6_service_name)

    except Exception:
        # Delete every service created so far (the ClusterIP service and any NodePort service)
        for service_name in deployment_description["services"]:
            core.delete_namespaced_service(service_name, namespace)
        # If the deployment was created, delete the deployment
        if deployment_ok:
            client.AppsV1Api().delete_namespaced_deployment(_create_deployment_name(component_name), namespace,
                                                            body=client.V1DeleteOptions())
        # NOTE(review): certificates and secrets recorded in deployment_description are not removed here
        raise

    return dep, deployment_description

1007 

1008 

def undeploy(deployment_description):
    """
    Remove the k8s entities listed in a deployment description.

    deployment_description is a dict with:
        - location: name of the Kubernetes location (cluster) holding the component
        - namespace: the Kubernetes namespace of the component
        - deployment: name of the k8s Deployment
        - services: names of the k8s Services to delete
        - secrets: names of the k8s Secrets to delete
        - certificates: names of the cert-manager certificate custom resources to delete
    The "services", "secrets" and "certificates" lists are optional; a missing list is
    treated as empty, so a description that lacks some of them can still be undeployed.
    """
    _configure_api(deployment_description["location"])

    namespace = deployment_description["namespace"]
    core = client.CoreV1Api()

    # remove any services associated with the component
    for service in deployment_description.get("services", ()):
        core.delete_namespaced_service(service, namespace)

    # remove any secrets associated with the component
    for secret in deployment_description.get("secrets", ()):
        core.delete_namespaced_secret(secret, namespace)

    # remove any certificate custom resources associated with the component
    certificates = deployment_description.get("certificates", ())
    if certificates:
        custom_objects = client.CustomObjectsApi()
        for cert in certificates:
            custom_objects.delete_namespaced_custom_object(
                group="cert-manager.io",
                version="v1",
                name=cert,
                namespace=namespace,
                plural="certificates"
            )

    # Have k8s delete the underlying pods and replicaset when deleting the deployment.
    options = client.V1DeleteOptions(propagation_policy="Foreground")
    client.AppsV1Api().delete_namespaced_deployment(deployment_description["deployment"], namespace, body=options)

1033 

1034 

def is_available(location, namespace, component_name):
    """
    Tell whether the deployment of a component has reached the requested state.

    True when both the number of available replicas and the number of updated
    replicas are equal to the number of replicas in the deployment spec.
    This check can be used to verify completion of an initial deployment,
    a scale operation, or an update operation.
    """
    _configure_api(location)
    apps_api = client.AppsV1Api()
    deployment = apps_api.read_namespaced_deployment_status(_create_deployment_name(component_name), namespace)

    # Not all of the requested replicas are available yet
    if deployment.status.available_replicas != deployment.spec.replicas:
        return False
    # All replicas must also match the current spec
    return deployment.status.updated_replicas == deployment.spec.replicas

1043 

1044 

def scale(deployment_description, replicas):
    """
    Trigger a scaling operation: the Deployment named in deployment_description
    is patched so that its replica count becomes "replicas".
    """

    def set_replica_count(deployment):
        # Modification handed to _patch_deployment
        deployment.spec.replicas = replicas
        return deployment

    location = deployment_description["location"]
    namespace = deployment_description["namespace"]
    deployment_name = deployment_description["deployment"]
    _patch_deployment(location, namespace, deployment_name, set_replica_count)

1054 

1055 

def upgrade(deployment_description, image, container_index=0):
    """
    Trigger a rolling upgrade: the Deployment named in deployment_description
    is patched so that the container at "container_index" uses the new image name/tag.
    """

    def set_container_image(deployment):
        # Modification handed to _patch_deployment
        pod_spec = deployment.spec.template.spec
        pod_spec.containers[container_index].image = image
        return deployment

    location = deployment_description["location"]
    namespace = deployment_description["namespace"]
    deployment_name = deployment_description["deployment"]
    _patch_deployment(location, namespace, deployment_name, set_container_image)

1065 

1066 

def rollback(deployment_description, rollback_to=0):
    """
    Undo upgrade by rolling back to a previous revision of the deployment.
    By default, go back one revision.
    rollback_to can be used to supply a specific revision number.
    Returns the image for the app container and the replica count from the rolled-back deployment
    """
    # 2018-07-13
    # Currently this does not work due to a bug in the create_namespaced_deployment_rollback() method.
    # The k8s python client code throws an exception while processing the response from the API.
    # See:
    #    - https://github.com/kubernetes-client/python/issues/491
    #    - https://github.com/kubernetes/kubernetes/pull/63837
    # The fix has been merged into the master branch but is not in the latest release.
    #
    # NOTE(review): this relies on the extensions/v1beta1 rollback call, which may no longer be
    # offered by current Kubernetes releases and client versions -- confirm before relying on it.
    _configure_api(deployment_description["location"])
    deployment_name = deployment_description["deployment"]
    namespace = deployment_description["namespace"]

    # Initiate the rollback
    extensions_api = client.ExtensionsV1beta1Api()
    rollback_request = client.AppsV1beta1DeploymentRollback(
        name=deployment_name,
        rollback_to=client.AppsV1beta1RollbackConfig(revision=rollback_to))
    extensions_api.create_namespaced_deployment_rollback(deployment_name, namespace, rollback_request)

    # Read back the spec for the rolled-back deployment
    rolled_back = client.AppsV1Api().read_namespaced_deployment(deployment_name, namespace)
    app_container = rolled_back.spec.template.spec.containers[0]
    return app_container.image, rolled_back.spec.replicas

1097 

1098 

def execute_command_in_deployment(deployment_description, command):
    """
    Enumerates the pods in the k8s deployment identified by "deployment_description",
    then executes the command (represented as an argv-style list) in "command" in
    container 0 (the main application container) of each of those pods.
    Returns the list of results, one per pod.

    Note that the set of pods associated with a deployment can change over time.  The
    enumeration is a snapshot at one point in time.  The command will not be executed in
    pods that are created after the initial enumeration.   If a pod disappears after the
    initial enumeration and before the command is executed, the attempt to execute the
    command will fail.  This is not treated as a fatal error.

    This approach is reasonable for the one current use case for "execute_command":  running a
    script to notify a container that its configuration has changed as a result of a
    policy change.  In this use case, the new configuration information is stored into
    the configuration store (Consul), the pods are enumerated, and the command is executed.
    If a pod disappears after the enumeration, the fact that the command cannot be run
    doesn't matter--a nonexistent pod doesn't need to be reconfigured.  Similarly, a pod that
    comes up after the enumeration will get its initial configuration from the updated version
    in Consul.

    The optimal solution here would be for k8s to provide an API call to execute a command in
    all of the pods for a deployment.   Unfortunately, k8s does not provide such a call--the
    only call provided by k8s operates at the pod level, not the deployment level.

    Another interesting k8s factoid: there's no direct way to list the pods belonging to a
    particular k8s deployment.   The deployment code sets a label ("k8sdeployment") on
    the pod that has the k8s deployment name.  To list the pods, the code below queries for
    pods with the label carrying the deployment name.
    """
    location = deployment_description["location"]
    _configure_api(location)
    deployment = deployment_description["deployment"]
    namespace = deployment_description["namespace"]

    # Take a snapshot of the running pods belonging to the deployment
    running_pods = client.CoreV1Api().list_namespaced_pod(
        namespace=namespace,
        label_selector="k8sdeployment={0}".format(deployment),
        field_selector="status.phase=Running"
    )
    pod_names = []
    for pod in running_pods.items:
        pod_names.append(pod.metadata.name)

    # Execute command in the running pods, collecting one result per pod
    results = []
    for pod_name in pod_names:
        results.append(_execute_command_in_pod(location, namespace, pod_name, command))
    return results

1144 

1145 

1146 

1147