Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# ============LICENSE_START======================================================= 

2# org.onap.dcae 

3# ================================================================================ 

4# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. 

5# Copyright (c) 2020 Pantheon.tech. All rights reserved. 

6# Copyright (c) 2020-2021 Nokia. All rights reserved. 

7# Copyright (c) 2020 J. F. Lucas. All rights reserved. 

8# ================================================================================ 

9# Licensed under the Apache License, Version 2.0 (the "License"); 

10# you may not use this file except in compliance with the License. 

11# You may obtain a copy of the License at 

12# 

13# http://www.apache.org/licenses/LICENSE-2.0 

14# 

15# Unless required by applicable law or agreed to in writing, software 

16# distributed under the License is distributed on an "AS IS" BASIS, 

17# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

18# See the License for the specific language governing permissions and 

19# limitations under the License. 

20# ============LICENSE_END========================================================= 

21# 

22from distutils import util 

23import os 

24import re 

25import uuid 

26import base64 

27 

28from binascii import hexlify 

29from kubernetes import config, client, stream 

30from .sans_parser import SansParser 

31 

# Default period and timeout for health-check probes, in seconds
PROBE_DEFAULT_PERIOD = 15
PROBE_DEFAULT_TIMEOUT = 1

# Path of the k8s cluster configuration file ("kubeconfig")
K8S_CONFIG_PATH = "/opt/onap/kube/kubeconfig"

# Interval/timeout specification: decimal digits with an optional unit suffix
INTERVAL_SPEC = re.compile("^([0-9]+)(s|m|h)?$")
# Multiplier that converts each unit suffix (or no suffix at all) to seconds
FACTORS = {None: 1, "s": 1, "m": 60, "h": 3600}

# Port mapping specification
#   group 1: container port
#   group 2: "/" followed by the protocol
#   group 3: protocol
#   group 4: host port
PORTS = re.compile("^([0-9]+)(/(udp|UDP|tcp|TCP))?:([0-9]+)$")

# Constants for external_cert
MOUNT_PATH = "/etc/onap/oom/certservice/certs/"
KEYSTORE_PATH = "{0}certServiceClient-keystore.jks".format(MOUNT_PATH)
TRUSTSTORE_PATH = "{0}truststore.jks".format(MOUNT_PATH)
DEFAULT_CERT_TYPE = "p12"

56 

57 

def _create_deployment_name(component_name):
    """Return the k8s Deployment name for a component: "dep-" plus the name, cut to 63 characters."""
    full_name = "dep-{0}".format(component_name)
    return full_name[:63]

60 

61 

def _create_service_name(component_name):
    """Return the k8s Service name for a component: the name itself, cut to 63 characters."""
    full_name = "{0}".format(component_name)
    return full_name[:63]

64 

65 

def _create_exposed_service_name(component_name):
    """Return the name of the externally exposed Service: "x" plus the component name, cut to 63 characters."""
    full_name = "x{0}".format(component_name)
    return full_name[:63]

68 

69 

def _create_exposed_v6_service_name(component_name):
    """Return the name of the exposed IPv6 Service: "x" + component name + "-ipv6", cut to 63 characters."""
    full_name = "x{0}-ipv6".format(component_name)
    return full_name[:63]

72 

73 

def _configure_api(location=None):
    """
    Configure the kubernetes client library for subsequent API calls.

    If the kubeconfig file at K8S_CONFIG_PATH exists, it is loaded with
    'location' as the context.  Otherwise in-cluster settings are loaded.
    """
    if os.path.exists(K8S_CONFIG_PATH):
        # A kubernetes config file is present: use it
        config.load_kube_config(config_file=K8S_CONFIG_PATH, context=location, persist_config=False)
        return

    # No config file.  Maybe we're running in a k8s container and can use the
    # information provided by k8s.  We would like to call
    #     config.load_incluster_config()
    # but that looks into os.environ for the kubernetes host and port, and from
    # the plugin those aren't visible.  So we drive InClusterConfigLoader directly,
    # which lets us supply the environment ourselves.
    # This is probably brittle!  Maybe there's a better alternative.
    incluster = config.incluster_config
    localenv = {
        incluster.SERVICE_HOST_ENV_NAME: "kubernetes.default.svc.cluster.local",
        incluster.SERVICE_PORT_ENV_NAME: "443",
    }
    loader = incluster.InClusterConfigLoader(
        token_filename=incluster.SERVICE_TOKEN_FILENAME,
        cert_filename=incluster.SERVICE_CERT_FILENAME,
        environ=localenv,
    )
    loader.load_and_set()

95 

96 

def _parse_interval(t):
    """
    Convert an interval specification to a number of seconds.

    t can be
      - a simple integer quantity, interpreted as seconds
      - a string representation of a decimal integer, interpreted as seconds
      - a string consisting of a representation of a decimal integer followed by a unit,
        with "s" representing seconds, "m" representing minutes,
        and "h" representing hours
    Used for compatibility with the Docker plugin, where time intervals
    for health checks were specified as strings with a number and a unit.
    See INTERVAL_SPEC for the regular expression that's accepted.

    Raises ValueError if t does not match that expression.
    """
    parsed = INTERVAL_SPEC.match(str(t))
    if not parsed:
        raise ValueError("Bad interval specification: {0}".format(t))
    quantity, unit = parsed.groups()
    return int(quantity) * FACTORS[unit]

116 

117 

def _create_probe(hc, port):
    """
    Build a Kubernetes probe from the health check dictionary hc.

    hc['type'] selects the kind of probe:
      - 'http' / 'https': an HTTP GET of hc['endpoint'] on 'port'
      - 'script' / 'docker': execution of hc['script'], split on whitespace into an argv list
    Any other type yields None.  hc['interval'] and hc['timeout'] are optional and
    fall back to PROBE_DEFAULT_PERIOD and PROBE_DEFAULT_TIMEOUT.
    """
    probe_type = hc['type']
    # Settings shared by both kinds of probe
    timing = {
        'failure_threshold': 1,
        'initial_delay_seconds': 5,
        'period_seconds': _parse_interval(hc.get('interval', PROBE_DEFAULT_PERIOD)),
        'timeout_seconds': _parse_interval(hc.get('timeout', PROBE_DEFAULT_TIMEOUT)),
    }

    if probe_type in ('http', 'https'):
        http_check = client.V1HTTPGetAction(
            path=hc['endpoint'],
            port=port,
            scheme=probe_type.upper()
        )
        return client.V1Probe(http_get=http_check, **timing)

    if probe_type in ('script', 'docker'):
        exec_check = client.V1ExecAction(command=hc['script'].split())
        return client.V1Probe(_exec=exec_check, **timing)

    # Unrecognized health check type: no probe
    return None

147 

148 

def _create_resources(resources=None):
    """
    Build a V1ResourceRequirements object from a dictionary with optional
    "limits" and "requests" entries.  Returns None when resources is None.
    """
    if resources is None:
        return None
    return client.V1ResourceRequirements(
        limits=resources.get("limits"),
        requests=resources.get("requests")
    )

158 

159 

def _create_container_object(name, image, always_pull, **kwargs):
    """
    Build the V1Container object for one container of a pod.

    Optional keyword arguments:
      env             - dictionary of environment variables (name -> value)
      readiness       - health check dictionary for the readiness probe
      liveness        - health check dictionary for the liveness probe
      resources       - dictionary with "limits" and/or "requests"
      container_ports - list of (port, protocol) pairs
      volume_mounts   - list of volume mount objects for the container
    The image pull policy is 'Always' when always_pull is true, otherwise 'IfNotPresent'.
    """
    # Environment: the variables passed in by the caller, plus POD_IP,
    # which carries the IP address of the pod running the container
    supplied_env = kwargs.get('env') or {}
    env_vars = [client.V1EnvVar(name=key, value=value) for key, value in supplied_env.items()]
    pod_ip_source = client.V1EnvVarSource(
        field_ref=client.V1ObjectFieldSelector(field_path="status.podIP"))
    env_vars.append(client.V1EnvVar(name="POD_IP", value_from=pod_ip_source))

    readiness = kwargs.get('readiness')
    liveness = kwargs.get('liveness')
    resources = kwargs.get('resources')
    container_ports = kwargs.get('container_ports') or []

    # Health checks, if specified, become readiness/liveness probes
    # (for an HTTP-based check, we assume it's at the first container port)
    hc_port = None
    if container_ports:
        hc_port = container_ports[0][0]

    readiness_probe = None
    if readiness:
        readiness_probe = _create_probe(readiness, hc_port)

    liveness_probe = None
    if liveness:
        liveness_probe = _create_probe(liveness, hc_port)

    resources_obj = None
    if resources:
        resources_obj = _create_resources(resources)

    port_objs = [client.V1ContainerPort(container_port=number, protocol=protocol)
                 for number, protocol in container_ports]

    # Define container for pod
    return client.V1Container(
        name=name,
        image=image,
        image_pull_policy='Always' if always_pull else 'IfNotPresent',
        env=env_vars,
        ports=port_objs,
        volume_mounts=kwargs.get('volume_mounts') or [],
        resources=resources_obj,
        readiness_probe=readiness_probe,
        liveness_probe=liveness_probe
    )

195 

196 

def _create_deployment_object(component_name,
                              containers,
                              init_containers,
                              replicas,
                              volumes,
                              labels=None,
                              pull_secrets=None):
    """
    Build the V1Deployment object for a component.

    The deployment is named via _create_deployment_name(component_name) and the pod
    hostname is component_name.  pull_secrets is a list of the names of the k8s secrets
    containing docker registry credentials, see
    https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod

    Note: a "k8sdeployment" entry is added to the 'labels' dictionary in place, so a
    dictionary passed in by the caller is modified.
    """
    labels = {} if labels is None else labels
    pull_secrets = [] if pull_secrets is None else pull_secrets
    deployment_name = _create_deployment_name(component_name)

    # Label the pod with the deployment name, so we can find it easily
    labels["k8sdeployment"] = deployment_name

    image_pull_secrets = [client.V1LocalObjectReference(name=secret) for secret in pull_secrets]

    # Pod template
    pod_spec = client.V1PodSpec(hostname=component_name,
                                containers=containers,
                                init_containers=init_containers,
                                volumes=volumes,
                                image_pull_secrets=image_pull_secrets)
    pod_template = client.V1PodTemplateSpec(
        metadata=client.V1ObjectMeta(labels=labels),
        spec=pod_spec
    )

    # Deployment spec: the selector uses the same labels that are put on the pods
    deployment_spec = client.V1DeploymentSpec(
        replicas=replicas,
        selector=client.V1LabelSelector(match_labels=labels),
        template=pod_template
    )

    # Deployment object
    return client.V1Deployment(
        api_version="apps/v1",
        kind="Deployment",
        metadata=client.V1ObjectMeta(name=deployment_name, labels=labels),
        spec=deployment_spec
    )

245 

246 

def _create_service_object(service_name, component_name, service_ports, annotations, labels, service_type, ip_family):
    """
    Build the V1Service object for a component.

    The service selects pods labelled app=<component_name>.  Its name is
    _create_service_name(service_name).  Annotations are attached to the metadata
    only when a non-empty 'annotations' value is supplied.
    """
    metadata_args = {"name": _create_service_name(service_name), "labels": labels}
    if annotations:
        metadata_args["annotations"] = annotations

    spec = client.V1ServiceSpec(
        ports=service_ports,
        selector={"app": component_name},
        type=service_type,
        ip_family=ip_family
    )
    return client.V1Service(
        kind="Service",
        api_version="v1",
        metadata=client.V1ObjectMeta(**metadata_args),
        spec=spec
    )

266 

267 

def create_secret_with_password(namespace, secret_prefix, password_key, password_length):
    """
    Create a K8s secret object (type Opaque) holding a generated password.

    The secret's metadata carries secret_prefix as 'generateName'; the name actually
    assigned is read back from the API response.  The base64-encoded password is
    stored under password_key.
    Returns: secret name and data key.

    Example usage:
        create_secret_with_password('onap', 'dcae-keystore-password-', 'password', 128)
    """
    encoded_password = _encode_base64(_generate_password(password_length))

    secret_metadata = {'generateName': secret_prefix, 'namespace': namespace}
    secret_data = {password_key: encoded_password}

    created = _create_k8s_secret(namespace, secret_metadata, secret_data, 'Opaque')
    return created.metadata.name, password_key

286 

287 

def _generate_password(length):
    """
    Generate a random password.

    Reads 'length' random bytes from os.urandom and returns them hex-encoded,
    so the returned string consists of 2 * length lowercase hexadecimal digits.
    """
    random_bytes = os.urandom(length)
    return hexlify(random_bytes).decode("ascii")

292 

293 

def _encode_base64(value):
    """Return the base64 encoding of the ASCII string 'value', itself as an ASCII string."""
    return base64.b64encode(value.encode("ascii")).decode("ascii")

299 

300 

def _create_k8s_secret(namespace, metadata, data, secret_type):
    """
    Create a k8s Secret in 'namespace' and return the API response.

    metadata    - metadata for the secret (e.g. a dictionary with name/generateName)
    data        - dictionary of secret data (values already base64-encoded)
    secret_type - k8s secret type, e.g. 'Opaque'
    """
    # Pass every field by keyword: the positional order of V1Secret's constructor
    # parameters is not the same in all releases of the kubernetes client.
    body = client.V1Secret(
        api_version='v1',
        kind='Secret',
        metadata=metadata,
        data=data,
        type=secret_type
    )
    return client.CoreV1Api().create_namespaced_secret(namespace, body)

308 

309 

def parse_ports(port_list):
    """
    Parse the port list into a list of container ports (needed to create the container)
    and to a set of port mappings to set up k8s services.

    Each entry of port_list is either
      - a string "<container port>[/<protocol>]:<host port>" (see PORTS), or
      - a dictionary whose 'concat' entry is a sequence of pieces that are joined
        to form such a string, with an optional 'ipv6' entry.
    The protocol defaults to TCP and is upper-cased.

    Returns (container_ports, port_map) where container_ports is a list of unique
    (container port, protocol) pairs and port_map maps
    (container port, protocol, ipv6) to the host port.

    Raises ValueError for an entry that does not match PORTS.
    """
    container_ports = []
    port_map = {}
    for p in port_list:
        ipv6 = False
        # isinstance (rather than an exact type comparison) so that dict subclasses
        # are treated as dictionaries as well
        if isinstance(p, dict):
            ipv6 = p.get('ipv6', False)
            p = "".join(str(v) for v in p['concat'])

        m = PORTS.match(p.strip())
        if not m:
            raise ValueError("Bad port specification: {0}".format(p))

        cport = int(m.group(1))
        hport = int(m.group(4))
        proto = m.group(3).upper() if m.group(3) else "TCP"

        port = (cport, proto)
        if port not in container_ports:
            container_ports.append(port)
        port_map[(cport, proto, ipv6)] = hport

    return container_ports, port_map

338 

339 

def _parse_volumes(volume_list):
    """
    Convert a list of volume specifications into k8s volumes and volume mounts.

    Each specification has a 'container' entry with the mount point ('bind') and an
    optional 'mode' ('ro' makes the mount read-only).  A 'host' entry with a 'path'
    produces a hostPath volume; a 'config_volume' entry with a 'name' produces a
    ConfigMap volume.  Every specification gets a freshly generated UUID as its volume name.

    Returns (volumes, volume_mounts).
    """
    volumes = []
    volume_mounts = []
    for spec in volume_list:
        volume_name = str(uuid.uuid4())
        mount_path = spec['container']['bind']
        read_only = spec['container'].get('mode') == 'ro'

        if 'host' in spec and 'path' in spec['host']:
            host_source = client.V1HostPathVolumeSource(path=spec['host']['path'])
            volumes.append(client.V1Volume(name=volume_name, host_path=host_source))

        if 'config_volume' in spec and 'name' in spec['config_volume']:
            config_source = client.V1ConfigMapVolumeSource(default_mode=0o0644,
                                                           name=spec['config_volume']['name'],
                                                           optional=True)
            volumes.append(client.V1Volume(name=volume_name, config_map=config_source))

        volume_mounts.append(client.V1VolumeMount(name=volume_name, mount_path=mount_path, read_only=read_only))

    return volumes, volume_mounts

358 

359 

def _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, log_info, filebeat):
    """
    Add a "filebeat" sidecar container, and the volumes it needs, to a pod description.

    Does nothing unless both log_info and filebeat are supplied and log_info has a
    "log_directory".  Otherwise 'containers', 'volumes' and 'volume_mounts' (the mounts
    of the component container) are extended in place.
    """
    if not (log_info and filebeat):
        return
    log_dir = log_info.get("log_directory")
    if not log_dir:
        return

    # Volume for the component log files, mounted in both the component and the sidecar
    volumes.append(client.V1Volume(name="component-log", empty_dir=client.V1EmptyDirVolumeSource()))
    volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=log_dir))
    sidecar_log_path = log_info.get("alternate_fb_path") or "{0}/{1}".format(filebeat["log_path"], component_name)
    sidecar_mounts = [client.V1VolumeMount(name="component-log", mount_path=sidecar_log_path)]

    # Volume for the sidecar's own data
    volumes.append(client.V1Volume(name="filebeat-data", empty_dir=client.V1EmptyDirVolumeSource()))
    sidecar_mounts.append(client.V1VolumeMount(name="filebeat-data", mount_path=filebeat["data_path"]))

    # Volume for the sidecar configuration data.
    # The configuration data is in a k8s ConfigMap that should be created when DCAE is installed.
    conf_source = client.V1ConfigMapVolumeSource(name=filebeat["config_map"])
    volumes.append(client.V1Volume(name="filebeat-conf", config_map=conf_source))
    sidecar_mounts.append(
        client.V1VolumeMount(name="filebeat-conf", mount_path=filebeat["config_path"],
                             sub_path=filebeat["config_subpath"]))

    # Finally the sidecar container itself
    sidecar = _create_container_object("filebeat", filebeat["image"], False, volume_mounts=sidecar_mounts)
    containers.append(sidecar)

389 

390 

def _add_tls_init_container(ctx, init_containers, volumes, volume_mounts, tls_info, tls_config):
    """
    Add an InitContainer ("init-tls") to the pod to set up TLS certificate information.

    The init container runs the image tls_config["image"] with the environment variable
    TLS_SERVER set to "true" when tls_info["use_tls"] is set and to "false" otherwise.
    A "tls-info" emptyDir volume is shared between the init container (mounted at
    tls_config["cert_path"]) and the component container, where it is mounted at
    tls_info["cert_directory"] if present, otherwise at tls_config["component_cert_dir"].
    'init_containers', 'volumes' and 'volume_mounts' are extended in place.
    """
    docker_image = tls_config["image"]
    ctx.logger.info("Creating init container: TLS \n * [" + docker_image + "]")

    cert_directory = tls_info.get("cert_directory") or tls_config.get("component_cert_dir")
    env = {"TLS_SERVER": "true" if tls_info.get("use_tls") else "false"}

    # Certificate volume, its mount in the component container and its mount in the init container
    volumes.append(client.V1Volume(name="tls-info", empty_dir=client.V1EmptyDirVolumeSource()))
    volume_mounts.append(client.V1VolumeMount(name="tls-info", mount_path=cert_directory))
    init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_config["cert_path"])]

    # The init container itself
    init_tls = _create_container_object("init-tls", docker_image, False,
                                        volume_mounts=init_volume_mounts, env=env)
    init_containers.append(init_tls)

414 

415 

def _add_external_tls_init_container(ctx, init_containers, volumes, external_cert, external_tls_config):
    """
    Add an InitContainer ("cert-service-client") to the pod which will generate
    external TLS certificates.

    The container's environment is assembled from external_cert (per-component
    settings) and external_tls_config (plugin configuration).  Certificates are written
    to the "external" subdirectory of external_cert["external_cert_directory"].
    'init_containers' and 'volumes' are extended in place.
    """
    docker_image = external_tls_config["image_tag"]
    ctx.logger.info("Creating init container: external TLS \n * [" + docker_image + "]")

    cert_directory = external_cert.get("external_cert_directory")
    # Make sure the directory ends with a slash before appending to it
    output_path = cert_directory if cert_directory.endswith('/') else cert_directory + '/'
    cert_params = external_cert.get("external_certificate_parameters")

    env = {
        "REQUEST_URL": external_tls_config.get("request_url"),
        "REQUEST_TIMEOUT": external_tls_config.get("timeout"),
        "OUTPUT_PATH": output_path + "external",
        "OUTPUT_TYPE": external_cert.get("cert_type"),
        "CA_NAME": external_cert.get("ca_name"),
        "COMMON_NAME": cert_params.get("common_name"),
        "ORGANIZATION": external_tls_config.get("organization"),
        "ORGANIZATION_UNIT": external_tls_config.get("organizational_unit"),
        "LOCATION": external_tls_config.get("location"),
        "STATE": external_tls_config.get("state"),
        "COUNTRY": external_tls_config.get("country"),
        "SANS": cert_params.get("sans"),
        "KEYSTORE_PATH": KEYSTORE_PATH,
        "KEYSTORE_PASSWORD": external_tls_config.get("keystore_password"),
        "TRUSTSTORE_PATH": TRUSTSTORE_PATH,
        "TRUSTSTORE_PASSWORD": external_tls_config.get("truststore_password"),
    }

    # Volume backed by the secret named in the configuration, plus the mounts for the init container
    secret_source = client.V1SecretVolumeSource(secret_name=external_tls_config.get("cert_secret_name"))
    volumes.append(client.V1Volume(name="tls-volume", secret=secret_source))
    init_volume_mounts = [
        client.V1VolumeMount(name="tls-info", mount_path=cert_directory),
        client.V1VolumeMount(name="tls-volume", mount_path=MOUNT_PATH),
    ]

    # The init container itself
    cert_client = _create_container_object("cert-service-client", docker_image, False,
                                           volume_mounts=init_volume_mounts, env=env)
    init_containers.append(cert_client)

453 

454 

def _add_cert_post_processor_init_container(ctx, init_containers, tls_info, tls_config, external_cert,
                                            cert_post_processor_config, isCertManagerIntegration):
    """
    Add an InitContainer ("cert-post-processor") to the pod to merge the TLS and
    external TLS truststores into a single file.

    The TLS certificate directory is tls_info["cert_directory"] if present, otherwise
    tls_config["component_cert_dir"]; external certificates are expected in its
    "external/" subdirectory.  The certificate type comes from external_cert["cert_type"]
    (DEFAULT_CERT_TYPE if absent).  When isCertManagerIntegration is true the
    "certmanager-certs-volume" volume is mounted on the external directory as well.
    'init_containers' is extended in place.
    """
    docker_image = cert_post_processor_config["image_tag"]
    ctx.logger.info("Creating init container: cert post processor \n * [" + docker_image + "]")

    tls_cert_dir = tls_info.get("cert_directory") or tls_config.get("component_cert_dir")
    if not tls_cert_dir.endswith('/'):
        tls_cert_dir += '/'
    ext_cert_dir = tls_cert_dir + "external/"

    output_type = (external_cert.get("cert_type") or DEFAULT_CERT_TYPE).lower()
    ext_truststore_path = ext_cert_dir + "truststore." + _get_file_extension(output_type)
    # PEM truststores have no password file
    ext_truststore_pass = (ext_cert_dir + "truststore.pass") if output_type != 'pem' else ''

    env = {
        "TRUSTSTORES_PATHS": tls_cert_dir + "trust.jks" + ":" + ext_truststore_path,
        "TRUSTSTORES_PASSWORDS_PATHS": tls_cert_dir + "trust.pass" + ":" + ext_truststore_pass,
        "KEYSTORE_SOURCE_PATHS": _get_keystore_source_paths(output_type, ext_cert_dir),
        "KEYSTORE_DESTINATION_PATHS": _get_keystore_destination_paths(output_type, tls_cert_dir),
    }
    for variable in ("TRUSTSTORES_PATHS", "TRUSTSTORES_PASSWORDS_PATHS",
                     "KEYSTORE_SOURCE_PATHS", "KEYSTORE_DESTINATION_PATHS"):
        ctx.logger.info(variable + ": " + env[variable])

    # Volume mounts for the init container
    init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_cert_dir)]
    if isCertManagerIntegration:
        init_volume_mounts.append(client.V1VolumeMount(
            name="certmanager-certs-volume", mount_path=ext_cert_dir))

    # The init container itself
    post_processor = _create_container_object("cert-post-processor", docker_image, False,
                                              volume_mounts=init_volume_mounts, env=env)
    init_containers.append(post_processor)

494 

495 

def _get_file_extension(output_type):
    """
    Map a certificate output type ('p12', 'pem' or 'jks') to its file extension.
    Raises KeyError for any other type.
    """
    extensions = dict(p12='p12', pem='pem', jks='jks')
    return extensions[output_type]

502 

503 

def _get_keystore_source_paths(output_type, ext_cert_dir):
    """
    Return the "<keystore file>:<password or key file>" pair of source paths, located in
    ext_cert_dir, for the given certificate output type ('p12', 'jks' or 'pem').
    ext_cert_dir is used as a plain prefix.  Raises KeyError for any other type.
    """
    templates = {
        'p12': "{0}keystore.p12:{0}keystore.pass",
        'jks': "{0}keystore.jks:{0}keystore.pass",
        'pem': "{0}keystore.pem:{0}key.pem",
    }
    template = templates[output_type]
    return template.format(ext_cert_dir)

511 

512 

def _get_keystore_destination_paths(output_type, tls_cert_dir):
    """
    Return the "<certificate file>:<password or key file>" pair of destination paths,
    located in tls_cert_dir, for the given certificate output type ('p12', 'jks' or 'pem').
    tls_cert_dir is used as a plain prefix.  Raises KeyError for any other type.
    """
    templates = {
        'p12': "{0}cert.p12:{0}p12.pass",
        'jks': "{0}cert.jks:{0}jks.pass",
        'pem': "{0}cert.pem:{0}key.pem",
    }
    template = templates[output_type]
    return template.format(tls_cert_dir)

520 

521 

def _process_port_map(port_map):
    """
    Turn a port map, keyed by (container port, protocol, ipv6) with the host port as
    value, into lists of V1ServicePort objects.

    Returns (service_ports, exposed_ports, exposed_ports_ipv6).  Every mapping contributes
    a service port named "port-<p>-<container port>" (duplicates are dropped), where <p>
    is the lower-cased first letter of the protocol.  A mapping with a non-zero host port
    additionally contributes a port named "xport-<p>-<container port>" with the host port
    as node port, to the ipv6 list when its ipv6 flag is set and to exposed_ports otherwise.
    """
    service_ports = []        # Ports exposed internally on the k8s network
    exposed_ports = []        # Ports to be mapped to ports on the k8s nodes via NodePort
    exposed_ports_ipv6 = []
    for (cport, proto, ipv6), hport in port_map.items():
        exposed_name = "xport-{0}-{1}".format(proto[0].lower(), cport)
        internal_name = exposed_name[1:]
        container_port = int(cport)
        node_port = int(hport)

        internal = client.V1ServicePort(port=container_port, protocol=proto, name=internal_name)
        if internal not in service_ports:
            service_ports.append(internal)

        if node_port == 0:
            # Host port 0: not exposed outside the cluster
            continue
        target = exposed_ports_ipv6 if ipv6 else exposed_ports
        target.append(client.V1ServicePort(port=container_port, protocol=proto,
                                           node_port=node_port, name=exposed_name))
    return service_ports, exposed_ports, exposed_ports_ipv6

539 

540 

def _service_exists(location, namespace, component_name):
    """
    Return True if the service for component_name can be read from 'namespace'
    in the k8s cluster at 'location'.

    Any ApiException raised while configuring the API or reading the service
    is taken to mean that the service does not exist, and yields False.
    """
    try:
        _configure_api(location)
        client.CoreV1Api().read_namespaced_service(_create_service_name(component_name), namespace)
    except client.rest.ApiException:
        return False
    return True

551 

552 

def _patch_deployment(location, namespace, deployment, modify):
    """
    Read-modify-write update of a deployment.

    Fetches the current spec for 'deployment' in 'namespace' from the k8s cluster
    at 'location', passes it to the 'modify' function, and sends whatever 'modify'
    returns back to k8s as a patch.
    """
    _configure_api(location)

    current_spec = client.AppsV1Api().read_namespaced_deployment(deployment, namespace)
    updated_spec = modify(current_spec)
    client.AppsV1Api().patch_namespaced_deployment(deployment, namespace, updated_spec)

570 

571 

def _execute_command_in_pod(location, namespace, pod_name, command):
    """
    Execute the command (specified by an argv-style list in the "command" parameter) in
    the specified pod in the specified namespace at the specified location.
    For now at least, we use this only to
    run a notification script in a pod after a configuration change.

    The straightforward way to do this is with the V1 Core API function "connect_get_namespaced_pod_exec".
    Unfortunately due to a bug/limitation in the Python client library, we can't call it directly.
    We have to make the API call through a Websocket connection using the kubernetes.stream wrapper API.
    I'm following the example code at https://github.com/kubernetes-client/python/blob/master/examples/exec.py.
    There are several issues tracking this, in various states.  It isn't clear that there will ever
    be a fix.
        - https://github.com/kubernetes-client/python/issues/58
        - https://github.com/kubernetes-client/python/issues/409
        - https://github.com/kubernetes-client/python/issues/526

    The main consequence of the workaround using "stream" is that the caller does not get an indication
    of the exit code returned by the command when it completes execution.   It turns out that the
    original implementation of notification in the Docker plugin did not use this result, so we can
    still match the original notification functionality.

    The "stream" approach returns a string containing any output sent by the command to stdout or stderr.
    We'll return that so it can be logged.

    Returns a dictionary {"pod": pod_name, "output": <command output, or "Pod not found">}.
    Raises client.rest.ApiException for any API error other than the pod not being found.
    """
    _configure_api(location)
    try:
        output = stream.stream(client.CoreV1Api().connect_get_namespaced_pod_exec,
                               name=pod_name,
                               namespace=namespace,
                               command=command,
                               stdout=True,
                               stderr=True,
                               stdin=False,
                               tty=False)
    except client.rest.ApiException as e:
        # If the exception indicates the pod wasn't found, it's not a fatal error.
        # It existed when we enumerated the pods for the deployment but no longer exists.
        # Unfortunately, the only way to distinguish a pod not found from any other error
        # is by looking at the reason text.
        # (The ApiException's "status" field should contain the HTTP status code, which would
        # be 404 if the pod isn't found, but empirical testing reveals that "status" is set
        # to zero.)
        # "reason" may be unset; treat that as "not a 404" instead of failing on None.lower()
        if "404 not found" not in (e.reason or "").lower():
            raise
        output = "Pod not found"

    return {"pod": pod_name, "output": output}

621 

622 

def _create_certificate_subject(external_tls_config):
    """
    Map parameters to custom resource subject.

    Each subject field is a single-element list holding the corresponding entry of
    external_tls_config (None when the entry is missing).
    """
    # (subject field, configuration key)
    field_sources = (
        ("organizations", "organization"),
        ("countries", "country"),
        ("localities", "location"),
        ("provinces", "state"),
        ("organizationalUnits", "organizational_unit"),
    )
    return {field: [external_tls_config.get(source)] for field, source in field_sources}

640 

641 

def _create_keystores_object(type, password_secret):
    """
    Create keystore property (JKS and PKC12 certificate) for custom resource.

    'type' becomes the single key of the returned dictionary; the keystore password is
    referenced as key "password" of the secret named password_secret.
    """
    password_ref = {"name": password_secret, "key": "password"}
    keystore = {"create": True, "passwordSecretRef": password_ref}
    return {type: keystore}

652 

653 

def _get_keystores_object_type(output_type):
    """
    Map config type to custom resource cert type: 'p12' -> 'pkcs12', 'jks' -> 'jks'.
    Raises KeyError for any other type.
    """
    keystore_types = dict(p12='pkcs12', jks='jks')
    return keystore_types[output_type]

662 

663 

def _create_projected_volume_with_password(cert_type, cert_secret_name, password_secret_name, password_secret_key):
    """
    Create the volume projections for password protected certificates.

    Two projections are returned, in this order:
      - from the secret cert_secret_name: "keystore.<ext>" and "truststore.<ext>",
        where <ext> is the file extension for cert_type
      - from the secret password_secret_name: the value stored under password_secret_key,
        projected both as "keystore.pass" and as "truststore.pass"
    The secret containing the password must be provided by the caller.
    """
    extension = _get_file_extension(cert_type)
    store_files = ["keystore." + extension, "truststore." + extension]
    store_items = [client.V1KeyToPath(key=file_name, path=file_name) for file_name in store_files]
    password_items = [client.V1KeyToPath(key=password_secret_key, path=pass_file)
                      for pass_file in ("keystore.pass", "truststore.pass")]

    stores_projection = client.V1SecretProjection(name=cert_secret_name, items=store_items)
    passwords_projection = client.V1SecretProjection(name=password_secret_name, items=password_items)

    return [client.V1VolumeProjection(secret=stores_projection),
            client.V1VolumeProjection(secret=passwords_projection)]

682 

683 

def _create_pem_projected_volume(cert_secret_name):
    """
    Create the volume projection for a pem certificate.

    The keys "tls.crt", "ca.crt" and "tls.key" of the secret cert_secret_name are projected
    as "keystore.pem", "truststore.pem" and "key.pem".  Returns a one-element list.
    """
    # (key in the secret, file name in the volume)
    key_to_file = (
        ("tls.crt", "keystore.pem"),
        ("ca.crt", "truststore.pem"),
        ("tls.key", "key.pem"),
    )
    items = [client.V1KeyToPath(key=secret_key, path=file_name) for secret_key, file_name in key_to_file]
    pem_projection = client.V1SecretProjection(name=cert_secret_name, items=items)
    return [client.V1VolumeProjection(secret=pem_projection)]

693 

694 

def create_certificate_object(ctx, cert_secret_name, external_cert_data, external_tls_config, cert_name, issuer):
    """
    Create cert-manager certificate custom resource object.

    The common name and the SANs are taken from
    external_cert_data["external_certificate_parameters"], the subject from
    external_tls_config.  The SANs are parsed with SansParser; each non-empty category
    (ips, dnss, emails, uris) is added to the spec under its custom resource field name.
    Returns the custom resource as a dictionary.
    """
    common_name = external_cert_data.get("external_certificate_parameters").get("common_name")
    subject = _create_certificate_subject(external_tls_config)

    spec = {
        "secretName": cert_secret_name,
        "commonName": common_name,
        "issuerRef": {
            "group": "certmanager.onap.org",
            "kind": "CMPv2Issuer",
            "name": issuer
        },
        "subject": subject,
    }
    custom_resource = {
        "apiVersion": "cert-manager.io/v1",
        "kind": "Certificate",
        "metadata": {"name": cert_name},
        "spec": spec,
    }

    raw_sans = external_cert_data.get("external_certificate_parameters").get("sans")
    ctx.logger.info("Read SANS property: " + str(raw_sans))
    sans = SansParser().parse_sans(raw_sans)
    ctx.logger.info("Parsed SANS: " + str(sans))

    # (category in the parsed SANs, field of the certificate spec)
    sans_fields = (
        ("ips", "ipAddresses"),
        ("dnss", "dnsNames"),
        ("emails", "emailAddresses"),
        ("uris", "uris"),
    )
    for category, spec_field in sans_fields:
        entries = sans[category]
        if len(entries) > 0:
            spec[spec_field] = entries

    return custom_resource

733 

734 

def _create_certificate_custom_resource(ctx, external_cert_data, external_tls_config, issuer, namespace, component_name, volumes, volume_mounts, deployment_description):
    """
    Create certificate custom resource for provided configuration

    Besides creating the cert-manager Certificate object in the cluster, this
    appends a projected volume ("certmanager-certs-volume") to ``volumes`` and
    the matching mount to ``volume_mounts``.

    :param ctx: context (only ctx.logger is used here)
    :param external_cert_data: object contains certificate common name and
           SANs list, plus "cert_type" and "external_cert_directory"
    :param external_tls_config: object contains information about certificate subject
    :param issuer: issuer-name
    :param namespace: namespace
    :param component_name: component name; "<component_name>-secret" and
           "<component_name>-cert" are derived from it
    :param volumes: list of deployment volume (appended to in place)
    :param volume_mounts: list of deployment volume mounts (appended to in place)
    :param deployment_description: dict with deployment information; the names
           of the created certificate and secrets are appended to its
           "certificates" and "secrets" lists
    """
    ctx.logger.info("Creating certificate custom resource")
    ctx.logger.info("External cert data: " + str(external_cert_data))

    cert_type = (external_cert_data.get("cert_type") or DEFAULT_CERT_TYPE).lower()

    api = client.CustomObjectsApi()
    cert_secret_name = component_name + "-secret"
    cert_name = component_name + "-cert"
    # Join rather than concatenate: the configured directory may or may not end
    # with "/"; plain concatenation would yield ".../dirnameexternal/" when it
    # does not.  The result is unchanged when the trailing "/" is present.
    cert_dir = os.path.join(external_cert_data.get("external_cert_directory"), "external/")
    custom_resource = create_certificate_object(ctx, cert_secret_name,
                                                external_cert_data,
                                                external_tls_config,
                                                cert_name, issuer)
    # Create the volumes
    if cert_type != 'pem':
        # Non-PEM output needs a password secret, which is also referenced
        # from the "keystores" section of the Certificate spec.
        ctx.logger.info("Creating volume with passwords")
        password_secret_name, password_secret_key = create_secret_with_password(namespace, component_name + "-cert-password", "password", 30)
        deployment_description["secrets"].append(password_secret_name)
        custom_resource.get("spec")["keystores"] = _create_keystores_object(_get_keystores_object_type(cert_type), password_secret_name)
        projected_volume_sources = _create_projected_volume_with_password(
            cert_type, cert_secret_name, password_secret_name, password_secret_key)
    else:
        ctx.logger.info("Creating PEM volume")
        projected_volume_sources = _create_pem_projected_volume(cert_secret_name)

    # Create the volume mounts
    projected_volume = client.V1ProjectedVolumeSource(sources=projected_volume_sources)
    volumes.append(client.V1Volume(name="certmanager-certs-volume", projected=projected_volume))
    volume_mounts.append(client.V1VolumeMount(name="certmanager-certs-volume", mount_path=cert_dir))

    # Create certificate custom resource
    ctx.logger.info("Certificate CRD: " + str(custom_resource))
    api.create_namespaced_custom_object(
        group="cert-manager.io",
        version="v1",
        namespace=namespace,
        plural="certificates",
        body=custom_resource
    )
    # Record the created objects only once the API call has succeeded
    deployment_description["certificates"].append(cert_name)
    deployment_description["secrets"].append(cert_secret_name)
    ctx.logger.info("CRD certificate created")

792 

793 

def deploy(ctx, namespace, component_name, image, replicas, always_pull, k8sconfig, **kwargs):
    """
    This will create a k8s Deployment and, if needed, up to three k8s Services
    (a ClusterIP service plus optional IPv4 and IPv6 NodePort services).
    (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
    We're not exposing k8s to the component developer and the blueprint author.
    This is a conscious choice.  We want to use k8s in a controlled, consistent way, and we want to hide
    the details from the component developer and the blueprint author.)

    ctx: context; ctx.logger is used for logging and ctx is passed on to the TLS/certificate helpers
    namespace:  the Kubernetes namespace into which the component is deployed
    component_name:  the component name, used to derive names of Kubernetes entities
    image: the docker image for the component being deployed
    replicas: the number of instances of the component to be deployed
    always_pull: boolean flag, indicating that Kubernetes should always pull a new copy of
       the Docker image for the component, even if it is already present on the Kubernetes node.
    k8sconfig contains:
        - image_pull_secrets: a list of names of image pull secrets that can be used for retrieving images.
          (DON'T PANIC:  these are just the names of secrets held in the Kubernetes secret store.)
        - filebeat: a dictionary of filebeat sidecar parameters:
            "log_path" : mount point for log volume in filebeat container
            "data_path" : mount point for data volume in filebeat container
            "config_path" : mount point for config volume in filebeat container
            "config_subpath" : subpath for config data in filebeat container
            "config_map" : ConfigMap holding the filebeat configuration
            "image": Docker image to use for filebeat
        - tls: a dictionary of TLS-related information:
            "cert_path": mount point for certificate volume in init container
            "image": Docker image to use for TLS init container
            "component_cert_dir" : default mount point for certs
        - external_cert: configuration passed to the external-certificate helpers
        - cmpv2_issuer: a dictionary with:
            "enabled": string parsed with strtobool; selects cert-manager (CMPv2 issuer) integration
            "name": name of the issuer used for the Certificate custom resource
        - cert_post_processor: a dictionary of cert_post_processor information:
            "image_tag": docker image to use for cert-post-processor init container
    kwargs may have:
        - volumes:  array of volume objects, where a volume object is:
            {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}
        - ports: array of strings in the form "container_port:host_port"
        - env: map of name-value pairs ( {name0: value0, name1: value1...}
        - log_info: an object with info for setting up ELK logging, with the form:
            {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}
        - tls_info: an object with info for setting up TLS (HTTPS), with the form:
            {"use_tls": true, "cert_directory": "/path/to/container/cert/directory" }
        - external_cert: an object with information for setting up the init container for external certificates creation, with the form:
            {"external_cert":
                "external_cert_directory": "/path/to/directory_where_certs_should_be_placed",
                "use_external_tls": true or false,
                "ca_name": "ca-name-value",
                "cert_type": "P12" or "JKS" or "PEM",
                "external_certificate_parameters":
                    "common_name": "common-name-value",
                    "sans": "sans-value"}
        - labels: dict with label-name/label-value pairs, e.g. {"cfydeployment" : "lsdfkladflksdfsjkl", "cfynode":"mycomponent"}
            These label will be set on all the pods deployed as a result of this deploy() invocation.
        - resources: dict with optional "limits" and "requests" resource requirements, each a dict containing:
            - cpu:    number CPU usage, like 0.5
            - memory: string memory requirement, like "2Gi"
        - readiness: dict with health check info; if present, used to create a readiness probe for the main container.  Includes:
            - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
            - interval: period (in seconds) between probes
            - timeout:  time (in seconds) to allow a probe to complete
            - endpoint: the path portion of the URL that points to the readiness endpoint for "http" and "https" types
            - path: the full path to the script to be executed in the container for "script" and "docker" types
        - liveness: dict with health check info; if present, used to create a liveness probe for the main container.  Includes:
            - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
            - interval: period (in seconds) between probes
            - timeout:  time (in seconds) to allow a probe to complete
            - endpoint: the path portion of the URL that points to the liveness endpoint for "http" and "https" types
            - path: the full path to the script to be executed in the container for "script" and "docker" types
        - k8s_location: name of the Kubernetes location (cluster) where the component is to be deployed

    Returns a tuple (dep, deployment_description): the Deployment object that was submitted and a dict
    recording the namespace, location, deployment name and the names of the services, certificates and
    secrets that were created.

    If anything fails, the services and the deployment created so far are deleted and the exception
    is re-raised.
    """

    deployment_ok = False
    deployment_description = {
        "namespace": namespace,
        "location": kwargs.get("k8s_location"),
        "deployment": '',
        "services": [],
        "certificates": [],
        "secrets": []
    }

    try:

        # Get API handles
        _configure_api(kwargs.get("k8s_location"))
        core = client.CoreV1Api()
        k8s_apps_v1_api_client = client.AppsV1Api()

        # Parse the port mapping
        container_ports, port_map = parse_ports(kwargs.get("ports", []))

        # Parse the volumes list into volumes and volume_mounts for the deployment
        volumes, volume_mounts = _parse_volumes(kwargs.get("volumes", []))

        # Initialize the list of containers that will be part of the pod
        containers = []
        init_containers = []

        # Set up the ELK logging sidecar container, if needed
        _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, kwargs.get("log_info"),
                                 k8sconfig.get("filebeat"))

        # Set up TLS information
        _add_tls_init_container(ctx, init_containers, volumes, volume_mounts, kwargs.get("tls_info") or {},
                                k8sconfig.get("tls"))

        # Set up external TLS information
        external_cert = kwargs.get("external_cert")
        cmpv2_issuer_config = k8sconfig.get("cmpv2_issuer")
        ctx.logger.info("CMPv2 Issuer properties: " + str(cmpv2_issuer_config))

        cmpv2_integration_enabled = bool(util.strtobool(cmpv2_issuer_config.get("enabled")))
        ctx.logger.info("CMPv2 integration enabled: " + str(cmpv2_integration_enabled))

        if external_cert and external_cert.get("use_external_tls"):
            if cmpv2_integration_enabled:
                # cert-manager integration: certificates come from a Certificate custom resource
                _create_certificate_custom_resource(ctx, external_cert,
                                                    k8sconfig.get("external_cert"),
                                                    cmpv2_issuer_config.get("name"),
                                                    namespace,
                                                    component_name, volumes,
                                                    volume_mounts, deployment_description)
            else:
                # otherwise an init container is added for the external certificates
                _add_external_tls_init_container(ctx, init_containers, volumes, external_cert,
                                                 k8sconfig.get("external_cert"))
            _add_cert_post_processor_init_container(ctx, init_containers, kwargs.get("tls_info") or {},
                                                    k8sconfig.get("tls"), external_cert,
                                                    k8sconfig.get("cert_post_processor"),
                                                    cmpv2_integration_enabled)

        # Create the container for the component
        # Make it the first container in the pod
        container_args = {key: kwargs.get(key) for key in ("env", "readiness", "liveness", "resources")}
        container_args['container_ports'] = container_ports
        container_args['volume_mounts'] = volume_mounts
        containers.insert(0, _create_container_object(component_name, image, always_pull, **container_args))

        # Build the k8s Deployment object
        labels = kwargs.get("labels", {})
        labels["app"] = component_name
        dep = _create_deployment_object(component_name, containers, init_containers, replicas, volumes, labels,
                                        pull_secrets=k8sconfig["image_pull_secrets"])

        # Have k8s deploy it
        k8s_apps_v1_api_client.create_namespaced_deployment(namespace, dep)
        deployment_ok = True
        deployment_description["deployment"] = _create_deployment_name(component_name)

        # Create service(s), if a port mapping is specified
        if port_map:
            service_ports, exposed_ports, exposed_ports_ipv6 = _process_port_map(port_map)

            # Create a ClusterIP service for access via the k8s network
            service = _create_service_object(_create_service_name(component_name), component_name, service_ports, None,
                                             labels, "ClusterIP", "IPv4")
            core.create_namespaced_service(namespace, service)
            deployment_description["services"].append(_create_service_name(component_name))

            # If there are ports to be exposed on the k8s nodes, create a "NodePort" service
            if exposed_ports:
                exposed_service = \
                    _create_service_object(_create_exposed_service_name(component_name), component_name, exposed_ports,
                                           '', labels, "NodePort", "IPv4")
                core.create_namespaced_service(namespace, exposed_service)
                deployment_description["services"].append(_create_exposed_service_name(component_name))

            if exposed_ports_ipv6:
                exposed_service_ipv6 = \
                    _create_service_object(_create_exposed_v6_service_name(component_name), component_name,
                                           exposed_ports_ipv6, '', labels, "NodePort", "IPv6")
                core.create_namespaced_service(namespace, exposed_service_ipv6)
                deployment_description["services"].append(_create_exposed_v6_service_name(component_name))

    except Exception:
        # Delete every service that was successfully created (each one is
        # recorded in deployment_description["services"] right after creation),
        # so that a failure after the NodePort service was created does not
        # leave that service behind.
        for created_service in deployment_description["services"]:
            core.delete_namespaced_service(created_service, namespace)
        # If the deployment was created, delete it as well
        if deployment_ok:
            client.AppsV1Api().delete_namespaced_deployment(_create_deployment_name(component_name), namespace,
                                                            body=client.V1DeleteOptions())
        raise

    return dep, deployment_description

979 

980 

def undeploy(deployment_description):
    """
    Delete the Kubernetes objects recorded in deployment_description.

    deployment_description is a dict with the keys "location", "namespace",
    "services", "secrets", "certificates" and "deployment" (the structure
    returned by deploy()).  Services are deleted first, then secrets, then
    cert-manager Certificate objects, and finally the Deployment itself.
    """
    _configure_api(deployment_description["location"])

    target_namespace = deployment_description["namespace"]

    # Services associated with the component
    for service_name in deployment_description["services"]:
        client.CoreV1Api().delete_namespaced_service(service_name, target_namespace)

    # Secrets recorded for the component
    for secret_name in deployment_description["secrets"]:
        client.CoreV1Api().delete_namespaced_secret(secret_name, target_namespace)

    # cert-manager Certificate custom resources
    for certificate_name in deployment_description["certificates"]:
        client.CustomObjectsApi().delete_namespaced_custom_object(
            group="cert-manager.io",
            version="v1",
            name=certificate_name,
            namespace=target_namespace,
            plural="certificates"
        )

    # Foreground propagation: have k8s delete the underlying pods and
    # replicaset when deleting the deployment.
    delete_options = client.V1DeleteOptions(propagation_policy="Foreground")
    client.AppsV1Api().delete_namespaced_deployment(
        deployment_description["deployment"],
        target_namespace,
        body=delete_options
    )

1005 

1006 

def is_available(location, namespace, component_name):
    """
    Report whether the component's Deployment has reached its requested state.

    Returns True when both the number of available replicas and the number of
    updated replicas equal the replica count in the Deployment spec.  This
    check can be used to verify completion of an initial deployment, a scale
    operation, or an update operation.
    """
    _configure_api(location)
    observed = client.AppsV1Api().read_namespaced_deployment_status(
        _create_deployment_name(component_name), namespace)

    requested = observed.spec.replicas
    # Not all requested replicas are available yet
    if observed.status.available_replicas != requested:
        return False
    # Available, but make sure the replicas also match the current spec
    return observed.status.updated_replicas == requested

1015 

1016 

def scale(deployment_description, replicas):
    """
    Trigger a scaling operation by updating the replica count for the Deployment.

    deployment_description supplies the "location", "namespace" and
    "deployment" name; replicas is the new replica count.
    """

    def _set_replica_count(deployment_obj):
        # Callback handed to _patch_deployment(): overwrite the replica count
        # and give the modified object back.
        deployment_obj.spec.replicas = replicas
        return deployment_obj

    _patch_deployment(
        deployment_description["location"],
        deployment_description["namespace"],
        deployment_description["deployment"],
        _set_replica_count
    )

1026 

1027 

def upgrade(deployment_description, image, container_index=0):
    """
    Trigger a rolling upgrade by sending a new image name/tag to k8s.

    deployment_description supplies the "location", "namespace" and
    "deployment" name; image is the new image, and container_index selects
    which container of the pod template receives it (default: container 0).
    """

    def _set_image(deployment_obj):
        # Callback handed to _patch_deployment(): replace the image of the
        # selected container and give the modified object back.
        pod_containers = deployment_obj.spec.template.spec.containers
        pod_containers[container_index].image = image
        return deployment_obj

    _patch_deployment(
        deployment_description["location"],
        deployment_description["namespace"],
        deployment_description["deployment"],
        _set_image
    )

1037 

1038 

def rollback(deployment_description, rollback_to=0):
    """
    Undo upgrade by rolling back to a previous revision of the deployment.
    By default, go back one revision.
    rollback_to can be used to supply a specific revision number.
    Returns the image for the app container and the replica count from the rolled-back deployment
    """
    # Known issue (noted 2018-07-13):
    # Currently this does not work due to a bug in the
    # create_namespaced_deployment_rollback() method.  The k8s python client
    # code throws an exception while processing the response from the API.
    # See:
    #   - https://github.com/kubernetes-client/python/issues/491
    #   - https://github.com/kubernetes/kubernetes/pull/63837
    # The fix has been merged into the master branch but is not in the latest release.
    # NOTE(review): the extensions/v1beta1 rollback API used below may no longer
    # be offered by current Kubernetes/client releases -- confirm before relying on it.
    _configure_api(deployment_description["location"])
    deployment = deployment_description["deployment"]
    namespace = deployment_description["namespace"]

    # Initiate the rollback
    extensions_api = client.ExtensionsV1beta1Api()
    target_revision = client.AppsV1beta1RollbackConfig(revision=rollback_to)
    rollback_request = client.AppsV1beta1DeploymentRollback(name=deployment, rollback_to=target_revision)
    extensions_api.create_namespaced_deployment_rollback(deployment, namespace, rollback_request)

    # Read back the spec for the rolled-back deployment
    rolled_back = client.AppsV1Api().read_namespaced_deployment(deployment, namespace)
    app_container = rolled_back.spec.template.spec.containers[0]
    return app_container.image, rolled_back.spec.replicas

1069 

1070 

def execute_command_in_deployment(deployment_description, command):
    """
    Run "command" (an argv-style list) in the pods of a k8s deployment.

    The running pods of the deployment identified by "deployment_description"
    are enumerated, and the command is then executed in container 0 (the main
    application container) of each of those pods.  A list with one result per
    pod is returned.

    The set of pods associated with a deployment can change over time, and
    the enumeration is a snapshot at one point in time:
      - pods created after the enumeration do not get the command;
      - if a pod disappears after the enumeration and before the command is
        executed, the attempt to execute the command will fail.  This is not
        treated as a fatal error.

    That is acceptable for the one current use case of "execute_command":
    running a script to notify a container that its configuration has changed
    as a result of a policy change.  In that use case the new configuration is
    stored into the configuration store (Consul), the pods are enumerated, and
    the command is executed.  A pod that vanished does not need to be
    reconfigured, and a pod that comes up afterwards will get its initial
    configuration from the updated version in Consul.

    The optimal solution would be a k8s API call that executes a command in
    all of the pods of a deployment, but k8s only provides such a call at the
    pod level, not the deployment level.

    There is also no direct way to list the pods belonging to a particular
    k8s deployment.  The deployment code sets a label ("k8sdeployment")
    carrying the k8s deployment name on the pods, so the pods are found by
    querying for that label.
    """
    location = deployment_description["location"]
    _configure_api(location)
    deployment = deployment_description["deployment"]
    namespace = deployment_description["namespace"]

    # Snapshot the names of all running pods belonging to the deployment
    running_pods = client.CoreV1Api().list_namespaced_pod(
        namespace=namespace,
        label_selector="k8sdeployment={0}".format(deployment),
        field_selector="status.phase=Running"
    )
    pod_names = [pod.metadata.name for pod in running_pods.items]

    # Execute the command in each pod of the snapshot, collecting the results
    results = []
    for pod_name in pod_names:
        results.append(_execute_command_in_pod(location, namespace, pod_name, command))
    return results

1116 

1117 

1118 

1119