Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# ============LICENSE_START======================================================= 

2# org.onap.dcae 

3# ================================================================================ 

4# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. 

5# Copyright (c) 2020 Pantheon.tech. All rights reserved. 

6# Copyright (c) 2020-2021 Nokia. All rights reserved. 

7# Copyright (c) 2020 J. F. Lucas. All rights reserved. 

8# ================================================================================ 

9# Licensed under the Apache License, Version 2.0 (the "License"); 

10# you may not use this file except in compliance with the License. 

11# You may obtain a copy of the License at 

12# 

13# http://www.apache.org/licenses/LICENSE-2.0 

14# 

15# Unless required by applicable law or agreed to in writing, software 

16# distributed under the License is distributed on an "AS IS" BASIS, 

17# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

18# See the License for the specific language governing permissions and 

19# limitations under the License. 

20# ============LICENSE_END========================================================= 

21# 

22import os 

23import re 

24import uuid 

25import base64 

26 

27from binascii import hexlify 

28from kubernetes import config, client, stream 

29 

# Default values for readiness probe (both in seconds)
PROBE_DEFAULT_PERIOD = 15
PROBE_DEFAULT_TIMEOUT = 1

# Location of k8s cluster config file ("kubeconfig")
K8S_CONFIG_PATH = "/opt/onap/kube/kubeconfig"

# Regular expression for interval/timeout specification
# (digits followed by an optional unit: s, m, or h)
INTERVAL_SPEC = re.compile("^([0-9]+)(s|m|h)?$")
# Conversion factors to seconds; None covers a bare number (no unit)
FACTORS = {None: 1, "s": 1, "m": 60, "h": 3600}

# Regular expression for port mapping
# group 1: container port
# group 2: / + protocol
# group 3: protocol
# group 4: host port
PORTS = re.compile("^([0-9]+)(/(udp|UDP|tcp|TCP))?:([0-9]+)$")

# Constants for external_cert (paths inside the cert-service-client init container)
MOUNT_PATH = "/etc/onap/oom/certservice/certs/"
KEYSTORE_PATH = MOUNT_PATH + "certServiceClient-keystore.jks"
TRUSTSTORE_PATH = MOUNT_PATH + "truststore.jks"
DEFAULT_CERT_TYPE = "p12"

54 

55 

56def _create_deployment_name(component_name): 

57 return "dep-{0}".format(component_name)[:63] 

58 

59 

60def _create_service_name(component_name): 

61 return "{0}".format(component_name)[:63] 

62 

63 

64def _create_exposed_service_name(component_name): 

65 return ("x{0}".format(component_name))[:63] 

66 

67 

68def _create_exposed_v6_service_name(component_name): 

69 return ("x{0}-ipv6".format(component_name))[:63] 

70 

71 

def _configure_api(location=None):
    """Set up the Kubernetes client configuration.

    Prefers the on-disk kubeconfig at K8S_CONFIG_PATH (selecting the
    context named by 'location'); when that file is absent, falls back to
    in-cluster service-account credentials.
    """
    if os.path.exists(K8S_CONFIG_PATH):
        # A kubeconfig file is present -- use it, without persisting changes.
        config.load_kube_config(config_file=K8S_CONFIG_PATH, context=location, persist_config=False)
        return

    # Maybe we're running in a k8s container and we can use info provided by k8s.
    # We would like to call config.load_incluster_config(), but it reads the
    # kubernetes host and port from os.environ, and from the plugin those
    # aren't visible.  So we drive the InClusterConfigLoader class directly,
    # supplying an environment of our own making.
    # This is probably brittle!  Maybe there's a better alternative.
    synthetic_env = {
        config.incluster_config.SERVICE_HOST_ENV_NAME: "kubernetes.default.svc.cluster.local",
        config.incluster_config.SERVICE_PORT_ENV_NAME: "443"
    }
    loader = config.incluster_config.InClusterConfigLoader(
        token_filename=config.incluster_config.SERVICE_TOKEN_FILENAME,
        cert_filename=config.incluster_config.SERVICE_CERT_FILENAME,
        environ=synthetic_env
    )
    loader.load_and_set()

93 

94 

95def _parse_interval(t): 

96 """ 

97 Parse an interval specification 

98 t can be 

99 - a simple integer quantity, interpreted as seconds 

100 - a string representation of a decimal integer, interpreted as seconds 

101 - a string consisting of a represention of an decimal integer followed by a unit, 

102 with "s" representing seconds, "m" representing minutes, 

103 and "h" representing hours 

104 Used for compatibility with the Docker plugin, where time intervals 

105 for health checks were specified as strings with a number and a unit. 

106 See 'intervalspec' above for the regular expression that's accepted. 

107 """ 

108 m = INTERVAL_SPEC.match(str(t)) 

109 if m: 

110 time = int(m.group(1)) * FACTORS[m.group(2)] 

111 else: 

112 raise ValueError("Bad interval specification: {0}".format(t)) 

113 return time 

114 

115 

def _create_probe(hc, port):
    """Create a Kubernetes V1Probe from the health-check dictionary 'hc'.

    hc['type'] selects the probe mechanism:
      - "http"/"https": HTTP GET against hc['endpoint'] on 'port'
      - "script"/"docker": exec hc['script'] (split on whitespace) in the container
    Interval/timeout come from hc (Docker-plugin style strings accepted),
    falling back to the module defaults.  Returns None for any other type.
    """
    check_type = hc['type']
    shared_args = {
        'failure_threshold': 1,
        'initial_delay_seconds': 5,
        'period_seconds': _parse_interval(hc.get('interval', PROBE_DEFAULT_PERIOD)),
        'timeout_seconds': _parse_interval(hc.get('timeout', PROBE_DEFAULT_TIMEOUT)),
    }
    if check_type in ('http', 'https'):
        http_action = client.V1HTTPGetAction(
            path=hc['endpoint'],
            port=port,
            scheme=check_type.upper()
        )
        return client.V1Probe(http_get=http_action, **shared_args)
    if check_type in ('script', 'docker'):
        exec_action = client.V1ExecAction(command=hc['script'].split())
        # note: the model's keyword is '_exec' because 'exec' is reserved
        return client.V1Probe(_exec=exec_action, **shared_args)
    return None

145 

146 

def _create_resources(resources=None):
    """Translate a {"limits": ..., "requests": ...} dict into a V1ResourceRequirements (or None)."""
    if resources is None:
        return None
    return client.V1ResourceRequirements(
        limits=resources.get("limits"),
        requests=resources.get("requests")
    )

156 

157 

def _create_container_object(name, image, always_pull, **kwargs):
    """
    Build a k8s V1Container (used for the main component container as well as
    sidecars and init containers).

    name: container name
    image: Docker image reference
    always_pull: if True, imagePullPolicy is "Always", otherwise "IfNotPresent"
    kwargs may include:
      - env: dict of environment variable name/value pairs
      - readiness / liveness: health-check dicts (see _create_probe)
      - resources: dict with optional "limits"/"requests" (see _create_resources)
      - container_ports: list of (port, protocol) tuples
      - volume_mounts: list of V1VolumeMount objects
    """
    # Set up environment variables
    # Copy any passed in environment variables
    env = kwargs.get('env') or {}
    env_vars = [client.V1EnvVar(name=k, value=env[k]) for k in env]
    # Add POD_IP with the IP address of the pod running the container
    # (resolved at runtime via the k8s downward API)
    pod_ip = client.V1EnvVarSource(field_ref=client.V1ObjectFieldSelector(field_path="status.podIP"))
    env_vars.append(client.V1EnvVar(name="POD_IP", value_from=pod_ip))

    # If a health check is specified, create a readiness/liveness probe
    # (For an HTTP-based check, we assume it's at the first container port)
    readiness = kwargs.get('readiness')
    liveness = kwargs.get('liveness')
    resources = kwargs.get('resources')
    container_ports = kwargs.get('container_ports') or []

    hc_port = container_ports[0][0] if container_ports else None
    probe = _create_probe(readiness, hc_port) if readiness else None
    live_probe = _create_probe(liveness, hc_port) if liveness else None
    resources_obj = _create_resources(resources) if resources else None
    port_objs = [client.V1ContainerPort(container_port=port, protocol=proto)
                 for port, proto in container_ports]

    # Define container for pod
    return client.V1Container(
        name=name,
        image=image,
        image_pull_policy='Always' if always_pull else 'IfNotPresent',
        env=env_vars,
        ports=port_objs,
        volume_mounts=kwargs.get('volume_mounts') or [],
        resources=resources_obj,
        readiness_probe=probe,
        liveness_probe=live_probe
    )

193 

194 

def _create_deployment_object(component_name,
                              containers,
                              init_containers,
                              replicas,
                              volumes,
                              labels=None,
                              pull_secrets=None):
    """
    Build (but do not submit) a k8s V1Deployment for a component.

    component_name: used to derive the deployment name and the pod hostname
    containers / init_containers: lists of V1Container objects
    replicas: desired replica count
    volumes: list of V1Volume objects for the pod
    labels: labels applied to the deployment and its pods; NOTE: when the
        caller supplies a dict it is mutated here (the "k8sdeployment"
        label is added to it)
    pull_secrets: names of k8s secrets holding docker registry credentials
    """
    if labels is None:
        labels = {}
    if pull_secrets is None:
        pull_secrets = []
    deployment_name = _create_deployment_name(component_name)

    # Label the pod with the deployment name, so we can find it easily
    labels.update({"k8sdeployment": deployment_name})

    # pull_secrets is a list of the names of the k8s secrets containing docker registry credentials
    # See https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod
    ips = []
    for secret in pull_secrets:
        ips.append(client.V1LocalObjectReference(name=secret))

    # Define pod template
    template = client.V1PodTemplateSpec(
        metadata=client.V1ObjectMeta(labels=labels),
        spec=client.V1PodSpec(hostname=component_name,
                              containers=containers,
                              init_containers=init_containers,
                              volumes=volumes,
                              image_pull_secrets=ips)
    )

    # Define deployment spec (selector matches the labels set on the pod template)
    spec = client.V1DeploymentSpec(
        replicas=replicas,
        selector=client.V1LabelSelector(match_labels=labels),
        template=template
    )

    # Create deployment object
    deployment = client.V1Deployment(
        api_version="apps/v1",
        kind="Deployment",
        metadata=client.V1ObjectMeta(name=deployment_name, labels=labels),
        spec=spec
    )

    return deployment

243 

244 

def _create_service_object(service_name, component_name, service_ports, annotations, labels, service_type, ip_family):
    """Build (but do not submit) a k8s V1Service selecting the component's pods by the "app" label.

    service_type is e.g. "ClusterIP" or "NodePort"; annotations are attached
    to the service metadata only when non-empty.
    """
    service_spec = client.V1ServiceSpec(
        ports=service_ports,
        selector={"app": component_name},
        type=service_type,
        ip_family=ip_family
    )
    metadata_kwargs = {
        "name": _create_service_name(service_name),
        "labels": labels,
    }
    if annotations:
        metadata_kwargs["annotations"] = annotations
    return client.V1Service(
        kind="Service",
        api_version="v1",
        metadata=client.V1ObjectMeta(**metadata_kwargs),
        spec=service_spec
    )

264 

265 

def create_secret_with_password(namespace, secret_prefix, password_length):
    """
    Create a k8s Opaque Secret holding a freshly generated random password.
    Returns: (secret_name, data_key).

    The secret name is generated by k8s from 'secret_prefix' (generateName).

    Example usage:
    create_secret_with_password('onap', 'dcae-keystore-password-', 128)
    """
    data_key = 'data'
    # Generate the password and base64-encode it, as required for Secret data
    encoded_password = _encode_base64(_generate_password(password_length))

    secret_metadata = {'generateName': secret_prefix, 'namespace': namespace}
    secret_data = {data_key: encoded_password}

    response = _create_k8s_secret(namespace, secret_metadata, secret_data, 'Opaque')
    return response.metadata.name, data_key

284 

285 

286def _generate_password(length): 

287 rand = os.urandom(length) 

288 password = hexlify(rand) 

289 return password.decode("ascii"); 

290 

291 

292def _encode_base64(value): 

293 value_bytes = value.encode("ascii") 

294 base64_encoded_bytes = base64.b64encode(value_bytes) 

295 encoded_value = base64_encoded_bytes.decode("ascii") 

296 return encoded_value 

297 

298 

def _create_k8s_secret(namespace, metadata, data, secret_type):
    """Create a k8s Secret in 'namespace' and return the API response (V1Secret).

    metadata: dict (or V1ObjectMeta) for the secret's metadata
    data: dict of base64-encoded values
    secret_type: k8s secret type, e.g. 'Opaque'

    The V1Secret model is built with keyword arguments rather than relying on
    the generated constructor's positional parameter order, which is fragile
    across client versions.
    """
    body = client.V1Secret(
        api_version='v1',
        kind='Secret',
        metadata=metadata,
        data=data,
        type=secret_type
    )
    return client.CoreV1Api().create_namespaced_secret(namespace, body)

306 

307 

def parse_ports(port_list):
    """
    Parse the port list into a list of container ports (needed to create the container)
    and a map used to set up the k8s services.

    Each entry is either a string "cport[/proto]:hport" or a dict with a
    'concat' list (joined into such a string) and an optional 'ipv6' flag.

    Returns (container_ports, port_map) where container_ports is a
    deduplicated list of (container_port, PROTO) tuples and port_map maps
    (container_port, PROTO, ipv6) -> host_port.

    Raises ValueError for a malformed specification.
    """
    container_ports = []
    port_map = {}
    for spec in port_list:
        ipv6 = False
        if type(spec) is dict:
            ipv6 = "ipv6" in spec and spec['ipv6']
            spec = "".join(str(piece) for piece in spec['concat'])
        parsed = re.match("^([0-9]+)(/(udp|UDP|tcp|TCP))?:([0-9]+)$", spec.strip())
        if not parsed:
            raise ValueError("Bad port specification: {0}".format(spec))
        cport = int(parsed.group(1))
        hport = int(parsed.group(4))
        # Protocol defaults to TCP when omitted; normalize to upper case
        proto = parsed.group(3).upper() if parsed.group(3) else "TCP"
        entry = (cport, proto)
        if entry not in container_ports:
            container_ports.append(entry)
        port_map[(cport, proto, ipv6)] = hport

    return container_ports, port_map

336 

337 

def _parse_volumes(volume_list):
    """Convert plugin-style volume specs into k8s volume and volume-mount lists.

    Each spec has a 'container' dict ('bind' path, optional 'mode' == 'ro')
    plus either a 'host' dict with a 'path' (hostPath volume) or a
    'config_volume' dict with a 'name' (ConfigMap volume).  Returns
    (volumes, volume_mounts).
    """
    volumes = []
    volume_mounts = []
    for spec in volume_list:
        # Random name ties each volume to its mount within the pod spec
        volume_name = str(uuid.uuid4())
        bind_path = spec['container']['bind']
        read_only = (spec['container'].get('mode') == 'ro')
        if ('host' in spec) and ('path' in spec['host']):
            host_source = client.V1HostPathVolumeSource(path=spec['host']['path'])
            volumes.append(client.V1Volume(name=volume_name, host_path=host_source))
        if ('config_volume' in spec) and ('name' in spec['config_volume']):
            config_source = client.V1ConfigMapVolumeSource(default_mode=0o0644,
                                                           name=spec['config_volume']['name'],
                                                           optional=True)
            volumes.append(client.V1Volume(name=volume_name, config_map=config_source))
        volume_mounts.append(client.V1VolumeMount(name=volume_name, mount_path=bind_path, read_only=read_only))

    return volumes, volume_mounts

356 

357 

def _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, log_info, filebeat):
    """
    Add a filebeat sidecar container that ships the component's log files to ELK.
    Mutates 'containers', 'volumes', and 'volume_mounts' in place.

    No-op unless both 'log_info' (with a "log_directory") and the 'filebeat'
    configuration dict are provided.
    """
    if not log_info or not filebeat:
        return
    log_dir = log_info.get("log_directory")
    if not log_dir:
        return
    sidecar_volume_mounts = []

    # Create the volume for component log files and volume mounts for the component and sidecar containers
    # (an emptyDir shared between the component and the filebeat sidecar)
    volumes.append(client.V1Volume(name="component-log", empty_dir=client.V1EmptyDirVolumeSource()))
    volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=log_dir))
    sc_path = log_info.get("alternate_fb_path") or "{0}/{1}".format(filebeat["log_path"], component_name)
    sidecar_volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=sc_path))

    # Create the volume for sidecar data and the volume mount for it
    volumes.append(client.V1Volume(name="filebeat-data", empty_dir=client.V1EmptyDirVolumeSource()))
    sidecar_volume_mounts.append(client.V1VolumeMount(name="filebeat-data", mount_path=filebeat["data_path"]))

    # Create the volume for the sidecar configuration data and the volume mount for it
    # The configuration data is in a k8s ConfigMap that should be created when DCAE is installed.
    volumes.append(
        client.V1Volume(name="filebeat-conf", config_map=client.V1ConfigMapVolumeSource(name=filebeat["config_map"])))
    sidecar_volume_mounts.append(
        client.V1VolumeMount(name="filebeat-conf", mount_path=filebeat["config_path"],
                             sub_path=filebeat["config_subpath"]))

    # Finally create the container for the sidecar
    containers.append(
        _create_container_object("filebeat", filebeat["image"], False, volume_mounts=sidecar_volume_mounts))

387 

388 

def _add_tls_init_container(ctx, init_containers, volumes, volume_mounts, tls_info, tls_config):
    """
    Append an init container that sets up TLS certificate material.

    For components acting as servers (tls_info["use_tls"] is truthy), the init
    container populates a directory with server and CA certificate materials in
    various formats.  Otherwise (use_tls falsy or tls_info empty), it populates
    the directory with CA certificate materials in PEM and JKS formats only.

    The certificate directory is mounted into the component container at
    tls_info["cert_directory"] if set, otherwise at the configured default
    tls_config["component_cert_dir"].  Mutates 'init_containers', 'volumes',
    and 'volume_mounts'.
    """
    docker_image = tls_config["image"]
    ctx.logger.info("Creating init container: TLS \n * [" + docker_image + "]")

    target_dir = tls_info.get("cert_directory") or tls_config.get("component_cert_dir")
    # TLS_SERVER tells the init container whether to generate server certs too
    init_env = {"TLS_SERVER": "true" if tls_info.get("use_tls") else "false"}

    # Shared emptyDir: the init container writes certs, the component reads them
    volumes.append(client.V1Volume(name="tls-info", empty_dir=client.V1EmptyDirVolumeSource()))
    volume_mounts.append(client.V1VolumeMount(name="tls-info", mount_path=target_dir))
    init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_config["cert_path"])]

    init_containers.append(
        _create_container_object("init-tls", docker_image, False, volume_mounts=init_volume_mounts, env=init_env))

412 

413 

def _add_external_tls_init_container(ctx, init_containers, volumes, external_cert, external_tls_config):
    """
    Append an init container that obtains TLS certificates from the external
    CertService.  Request parameters are passed to the container via
    environment variables; the generated artifacts are written under
    <external_cert_directory>/external.  Mutates 'init_containers' and 'volumes'.
    """
    # Adds an InitContainer to the pod which will generate external TLS certificates.
    docker_image = external_tls_config["image_tag"]
    ctx.logger.info("Creating init container: external TLS \n * [" + docker_image + "]")

    env = {}
    output_path = external_cert.get("external_cert_directory")
    if not output_path.endswith('/'):
        output_path += '/'

    env["REQUEST_URL"] = external_tls_config.get("request_url")
    env["REQUEST_TIMEOUT"] = external_tls_config.get("timeout")
    env["OUTPUT_PATH"] = output_path + "external"
    env["OUTPUT_TYPE"] = external_cert.get("cert_type")
    env["CA_NAME"] = external_cert.get("ca_name")
    env["COMMON_NAME"] = external_cert.get("external_certificate_parameters").get("common_name")
    env["ORGANIZATION"] = external_tls_config.get("organization")
    env["ORGANIZATION_UNIT"] = external_tls_config.get("organizational_unit")
    env["LOCATION"] = external_tls_config.get("location")
    env["STATE"] = external_tls_config.get("state")
    env["COUNTRY"] = external_tls_config.get("country")
    env["SANS"] = external_cert.get("external_certificate_parameters").get("sans")
    env["KEYSTORE_PATH"] = KEYSTORE_PATH
    env["KEYSTORE_PASSWORD"] = external_tls_config.get("keystore_password")
    env["TRUSTSTORE_PATH"] = TRUSTSTORE_PATH
    env["TRUSTSTORE_PASSWORD"] = external_tls_config.get("truststore_password")

    # Create the volumes and volume mounts
    # "tls-volume" holds the CertService client credentials from a k8s secret;
    # "tls-info" is presumably the shared cert emptyDir created by
    # _add_tls_init_container -- NOTE(review): confirm callers always add it first.
    sec = client.V1SecretVolumeSource(secret_name=external_tls_config.get("cert_secret_name"))
    volumes.append(client.V1Volume(name="tls-volume", secret=sec))
    init_volume_mounts = [
        client.V1VolumeMount(name="tls-info", mount_path=external_cert.get("external_cert_directory")),
        client.V1VolumeMount(name="tls-volume", mount_path=MOUNT_PATH)]

    # Create the init container
    init_containers.append(
        _create_container_object("cert-service-client", docker_image, False, volume_mounts=init_volume_mounts, env=env))

451 

452 

def _add_cert_post_processor_init_container(ctx, init_containers, tls_info, tls_config, external_cert,
                                            cert_post_processor_config):
    """
    Append an init container that merges the internal TLS truststore and the
    external-CA truststore into a single file, so the component sees one
    combined truststore.  Mutates 'init_containers'.
    """
    # Adds an InitContainer to the pod to merge TLS and external TLS truststore into single file.
    docker_image = cert_post_processor_config["image_tag"]
    ctx.logger.info("Creating init container: cert post processor \n * [" + docker_image + "]")

    tls_cert_dir = tls_info.get("cert_directory") or tls_config.get("component_cert_dir")
    if not tls_cert_dir.endswith('/'):
        tls_cert_dir += '/'

    tls_cert_file_path = tls_cert_dir + "trust.jks"
    tls_cert_file_pass = tls_cert_dir + "trust.pass"

    # External certs are written here by the cert-service-client init container
    ext_cert_dir = tls_cert_dir + "external/"

    output_type = (external_cert.get("cert_type") or DEFAULT_CERT_TYPE).lower()
    ext_truststore_path = ext_cert_dir + "truststore." + _get_file_extension(output_type)
    ext_truststore_pass = ''
    if output_type != 'pem':
        # PEM truststores carry no password file
        ext_truststore_pass = ext_cert_dir + "truststore.pass"

    env = {"TRUSTSTORES_PATHS": tls_cert_file_path + ":" + ext_truststore_path,
           "TRUSTSTORES_PASSWORDS_PATHS": tls_cert_file_pass + ":" + ext_truststore_pass,
           "KEYSTORE_SOURCE_PATHS": _get_keystore_source_paths(output_type, ext_cert_dir),
           "KEYSTORE_DESTINATION_PATHS": _get_keystore_destination_paths(output_type, tls_cert_dir)}

    ctx.logger.info("TRUSTSTORES_PATHS: " + env["TRUSTSTORES_PATHS"])
    ctx.logger.info("TRUSTSTORES_PASSWORDS_PATHS: " + env["TRUSTSTORES_PASSWORDS_PATHS"])
    ctx.logger.info("KEYSTORE_SOURCE_PATHS: " + env["KEYSTORE_SOURCE_PATHS"])
    ctx.logger.info("KEYSTORE_DESTINATION_PATHS: " + env["KEYSTORE_DESTINATION_PATHS"])

    # Create the volumes and volume mounts
    # ("tls-info" is the shared cert volume; presumably created earlier by
    # _add_tls_init_container -- NOTE(review): confirm with callers)
    init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_cert_dir)]

    # Create the init container
    init_containers.append(
        _create_container_object("cert-post-processor", docker_image, False, volume_mounts=init_volume_mounts, env=env))

490 

491 

492def _get_file_extension(output_type): 

493 return { 

494 'p12': 'p12', 

495 'pem': 'pem', 

496 'jks': 'jks', 

497 }[output_type] 

498 

499 

500def _get_keystore_source_paths(output_type, ext_cert_dir): 

501 source_paths_template = { 

502 'p12': "{0}keystore.p12:{0}keystore.pass", 

503 'jks': "{0}keystore.jks:{0}keystore.pass", 

504 'pem': "{0}keystore.pem:{0}key.pem", 

505 }[output_type] 

506 return source_paths_template.format(ext_cert_dir) 

507 

508 

509def _get_keystore_destination_paths(output_type, tls_cert_dir): 

510 destination_paths_template = { 

511 'p12': "{0}cert.p12:{0}p12.pass", 

512 'jks': "{0}cert.jks:{0}jks.pass", 

513 'pem': "{0}cert.pem:{0}key.pem", 

514 }[output_type] 

515 return destination_paths_template.format(tls_cert_dir) 

516 

517 

def _process_port_map(port_map):
    """Split the parsed port map into k8s service port lists.

    Returns (service_ports, exposed_ports, exposed_ports_ipv6):
      - service_ports: ports exposed internally on the k8s network
      - exposed_ports / exposed_ports_ipv6: ports mapped to k8s node ports
        (host port 0 means "do not expose")
    """
    service_ports = []
    exposed_ports = []
    exposed_ports_ipv6 = []
    for (cport, proto, ipv6), hport in port_map.items():
        name = "xport-{0}-{1}".format(proto[0].lower(), cport)
        cport = int(cport)
        hport = int(hport)
        # Internal service port drops the leading "x" from the name
        internal = client.V1ServicePort(port=cport, protocol=proto, name=name[1:])
        if internal not in service_ports:
            service_ports.append(internal)
        if hport != 0:
            node_port = client.V1ServicePort(port=cport, protocol=proto, node_port=hport, name=name)
            if ipv6:
                exposed_ports_ipv6.append(node_port)
            else:
                exposed_ports.append(node_port)
    return service_ports, exposed_ports, exposed_ports_ipv6

535 

536 

def _service_exists(location, namespace, component_name):
    """Return True if the k8s Service for the component exists in 'namespace'
    at cluster 'location'.

    Any ApiException from the read is treated as "does not exist".
    """
    try:
        _configure_api(location)
        client.CoreV1Api().read_namespaced_service(_create_service_name(component_name), namespace)
    except client.rest.ApiException:
        return False
    return True

547 

548 

def _patch_deployment(location, namespace, deployment, modify):
    """Read-modify-patch a k8s Deployment.

    Gets the current spec for 'deployment' in 'namespace' in the k8s cluster
    at 'location', applies the caller-supplied 'modify' function to it, and
    patches the deployment with the result.
    """
    _configure_api(location)
    apps_api = client.AppsV1Api()

    # Get deployment spec, apply changes, and send the patch
    current_spec = apps_api.read_namespaced_deployment(deployment, namespace)
    updated_spec = modify(current_spec)
    apps_api.patch_namespaced_deployment(deployment, namespace, updated_spec)

566 

567 

def _execute_command_in_pod(location, namespace, pod_name, command):
    '''
    Execute the command (specified by an argv-style list in the "command" parameter) in
    the specified pod in the specified namespace at the specified location.
    For now at least, we use this only to
    run a notification script in a pod after a configuration change.

    The straightforward way to do this is with the V1 Core API function "connect_get_namespaced_pod_exec".
    Unfortunately due to a bug/limitation in the Python client library, we can't call it directly.
    We have to make the API call through a Websocket connection using the kubernetes.stream wrapper API.
    I'm following the example code at https://github.com/kubernetes-client/python/blob/master/examples/exec.py.
    There are several issues tracking this, in various states. It isn't clear that there will ever
    be a fix.
    - https://github.com/kubernetes-client/python/issues/58
    - https://github.com/kubernetes-client/python/issues/409
    - https://github.com/kubernetes-client/python/issues/526

    The main consequence of the workaround using "stream" is that the caller does not get an indication
    of the exit code returned by the command when it completes execution. It turns out that the
    original implementation of notification in the Docker plugin did not use this result, so we can
    still match the original notification functionality.

    The "stream" approach returns a string containing any output sent by the command to stdout or stderr.
    We'll return that so it can logged.

    Returns a dict: {"pod": pod_name, "output": <combined stdout/stderr text>}.
    '''
    _configure_api(location)
    try:
        # Exec via a websocket stream; output is the command's combined stdout/stderr
        output = stream.stream(client.CoreV1Api().connect_get_namespaced_pod_exec,
                               name=pod_name,
                               namespace=namespace,
                               command=command,
                               stdout=True,
                               stderr=True,
                               stdin=False,
                               tty=False)
    except client.rest.ApiException as e:
        # If the exception indicates the pod wasn't found, it's not a fatal error.
        # It existed when we enumerated the pods for the deployment but no longer exists.
        # Unfortunately, the only way to distinguish a pod not found from any other error
        # is by looking at the reason text.
        # (The ApiException's "status" field should contain the HTTP status code, which would
        # be 404 if the pod isn't found, but empirical testing reveals that "status" is set
        # to zero.)
        if "404 not found" in e.reason.lower():
            output = "Pod not found"
        else:
            raise e

    return {"pod": pod_name, "output": output}

617 

618 

619def deploy(ctx, namespace, component_name, image, replicas, always_pull, k8sconfig, **kwargs): 

620 """ 

621 This will create a k8s Deployment and, if needed, one or two k8s Services. 

622 (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use. 

623 We're not exposing k8s to the component developer and the blueprint author. 

624 This is a conscious choice. We want to use k8s in a controlled, consistent way, and we want to hide 

625 the details from the component developer and the blueprint author.) 

626 

627 namespace: the Kubernetes namespace into which the component is deployed 

628 component_name: the component name, used to derive names of Kubernetes entities 

629 image: the docker image for the component being deployed 

630 replica: the number of instances of the component to be deployed 

631 always_pull: boolean flag, indicating that Kubernetes should always pull a new copy of 

632 the Docker image for the component, even if it is already present on the Kubernetes node. 

633 k8sconfig contains: 

634 - image_pull_secrets: a list of names of image pull secrets that can be used for retrieving images. 

635 (DON'T PANIC: these are just the names of secrets held in the Kubernetes secret store.) 

636 - filebeat: a dictionary of filebeat sidecar parameters: 

637 "log_path" : mount point for log volume in filebeat container 

638 "data_path" : mount point for data volume in filebeat container 

639 "config_path" : mount point for config volume in filebeat container 

640 "config_subpath" : subpath for config data in filebeat container 

641 "config_map" : ConfigMap holding the filebeat configuration 

642 "image": Docker image to use for filebeat 

643 - tls: a dictionary of TLS-related information: 

644 "cert_path": mount point for certificate volume in init container 

645 "image": Docker image to use for TLS init container 

646 "component_cert_dir" : default mount point for certs 

647 - cert_post_processor: a dictionary of cert_post_processor information: 

648 "image_tag": docker image to use for cert-post-processor init container 

649 kwargs may have: 

650 - volumes: array of volume objects, where a volume object is: 

651 {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"} 

652 - ports: array of strings in the form "container_port:host_port" 

653 - env: map of name-value pairs ( {name0: value0, name1: value1...} 

654 - log_info: an object with info for setting up ELK logging, with the form: 

655 {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"} 

656 - tls_info: an object with info for setting up TLS (HTTPS), with the form: 

657 {"use_tls": true, "cert_directory": "/path/to/container/cert/directory" } 

658 - external_cert: an object with information for setting up the init container for external certificates creation, with the form: 

659 {"external_cert": 

660 "external_cert_directory": "/path/to/directory_where_certs_should_be_placed", 

661 "use_external_tls": true or false, 

662 "ca_name": "ca-name-value", 

663 "cert_type": "P12" or "JKS" or "PEM", 

664 "external_certificate_parameters": 

665 "common_name": "common-name-value", 

666 "sans": "sans-value"} 

667 - labels: dict with label-name/label-value pairs, e.g. {"cfydeployment" : "lsdfkladflksdfsjkl", "cfynode":"mycomponent"} 

668 These label will be set on all the pods deployed as a result of this deploy() invocation. 

669 - resources: dict with optional "limits" and "requests" resource requirements, each a dict containing: 

670 - cpu: number CPU usage, like 0.5 

671 - memory: string memory requirement, like "2Gi" 

672 - readiness: dict with health check info; if present, used to create a readiness probe for the main container. Includes: 

673 - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker") 

674 - interval: period (in seconds) between probes 

675 - timeout: time (in seconds) to allow a probe to complete 

676 - endpoint: the path portion of the URL that points to the readiness endpoint for "http" and "https" types 

677 - path: the full path to the script to be executed in the container for "script" and "docker" types 

678 - liveness: dict with health check info; if present, used to create a liveness probe for the main container. Includes: 

679 - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker") 

680 - interval: period (in seconds) between probes 

681 - timeout: time (in seconds) to allow a probe to complete 

682 - endpoint: the path portion of the URL that points to the liveness endpoint for "http" and "https" types 

683 - path: the full path to the script to be executed in the container for "script" and "docker" types 

684 - k8s_location: name of the Kubernetes location (cluster) where the component is to be deployed 

685 

686 """ 

687 

688 deployment_ok = False 

689 cip_service_created = False 

690 deployment_description = { 

691 "namespace": namespace, 

692 "location": kwargs.get("k8s_location"), 

693 "deployment": '', 

694 "services": [] 

695 } 

696 

697 try: 

698 

699 # Get API handles 

700 _configure_api(kwargs.get("k8s_location")) 

701 core = client.CoreV1Api() 

702 k8s_apps_v1_api_client = client.AppsV1Api() 

703 

704 # Parse the port mapping 

705 container_ports, port_map = parse_ports(kwargs.get("ports", [])) 

706 

707 # Parse the volumes list into volumes and volume_mounts for the deployment 

708 volumes, volume_mounts = _parse_volumes(kwargs.get("volumes", [])) 

709 

710 # Initialize the list of containers that will be part of the pod 

711 containers = [] 

712 init_containers = [] 

713 

714 # Set up the ELK logging sidecar container, if needed 

715 _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, kwargs.get("log_info"), 

716 k8sconfig.get("filebeat")) 

717 

718 # Set up TLS information 

719 _add_tls_init_container(ctx, init_containers, volumes, volume_mounts, kwargs.get("tls_info") or {}, 

720 k8sconfig.get("tls")) 

721 

722 # Set up external TLS information 

723 external_cert = kwargs.get("external_cert") 

724 if external_cert and external_cert.get("use_external_tls"): 

725 _add_external_tls_init_container(ctx, init_containers, volumes, external_cert, 

726 k8sconfig.get("external_cert")) 

727 _add_cert_post_processor_init_container(ctx, init_containers, kwargs.get("tls_info") or {}, 

728 k8sconfig.get("tls"), external_cert, 

729 k8sconfig.get("cert_post_processor")) 

730 

731 # Create the container for the component 

732 # Make it the first container in the pod 

733 container_args = {key: kwargs.get(key) for key in ("env", "readiness", "liveness", "resources")} 

734 container_args['container_ports'] = container_ports 

735 container_args['volume_mounts'] = volume_mounts 

736 containers.insert(0, _create_container_object(component_name, image, always_pull, **container_args)) 

737 

738 # Build the k8s Deployment object 

739 labels = kwargs.get("labels", {}) 

740 labels["app"] = component_name 

741 dep = _create_deployment_object(component_name, containers, init_containers, replicas, volumes, labels, 

742 pull_secrets=k8sconfig["image_pull_secrets"]) 

743 

744 # Have k8s deploy it 

745 k8s_apps_v1_api_client.create_namespaced_deployment(namespace, dep) 

746 deployment_ok = True 

747 deployment_description["deployment"] = _create_deployment_name(component_name) 

748 

749 # Create service(s), if a port mapping is specified 

750 if port_map: 750 ↛ 785line 750 didn't jump to line 785, because the condition on line 750 was never false

751 service_ports, exposed_ports, exposed_ports_ipv6 = _process_port_map(port_map) 

752 

753 # Create a ClusterIP service for access via the k8s network 

754 service = _create_service_object(_create_service_name(component_name), component_name, service_ports, None, 

755 labels, "ClusterIP", "IPv4") 

756 core.create_namespaced_service(namespace, service) 

757 cip_service_created = True 

758 deployment_description["services"].append(_create_service_name(component_name)) 

759 

760 # If there are ports to be exposed on the k8s nodes, create a "NodePort" service 

761 if exposed_ports: 761 ↛ 762line 761 didn't jump to line 762

762 exposed_service = \ 

763 _create_service_object(_create_exposed_service_name(component_name), component_name, exposed_ports, 

764 '', labels, "NodePort", "IPv4") 

765 core.create_namespaced_service(namespace, exposed_service) 

766 deployment_description["services"].append(_create_exposed_service_name(component_name)) 

767 

768 if exposed_ports_ipv6: 768 ↛ 769line 768 didn't jump to line 769

769 exposed_service_ipv6 = \ 

770 _create_service_object(_create_exposed_v6_service_name(component_name), component_name, 

771 exposed_ports_ipv6, '', labels, "NodePort", "IPv6") 

772 core.create_namespaced_service(namespace, exposed_service_ipv6) 

773 deployment_description["services"].append(_create_exposed_v6_service_name(component_name)) 

774 

775 except Exception as e: 

776 # If the ClusterIP service was created, delete the service: 

777 if cip_service_created: 

778 core.delete_namespaced_service(_create_service_name(component_name), namespace) 

779 # If the deployment was created but not the service, delete the deployment 

780 if deployment_ok: 

781 client.AppsV1Api().delete_namespaced_deployment(_create_deployment_name(component_name), namespace, 

782 body=client.V1DeleteOptions()) 

783 raise e 

784 

785 return dep, deployment_description 

786 

787 

def undeploy(deployment_description):
    """Remove a deployed component: delete its k8s services, then its Deployment.

    deployment_description is the dict returned by deploy(), carrying the
    location, namespace, deployment name, and list of service names.
    """
    location = deployment_description["location"]
    namespace = deployment_description["namespace"]
    _configure_api(location)

    # Delete every service that deploy() created for this component
    core = client.CoreV1Api()
    for service_name in deployment_description["services"]:
        core.delete_namespaced_service(service_name, namespace)

    # "Foreground" propagation makes k8s delete the underlying pods and
    # replicaset when deleting the deployment.
    delete_options = client.V1DeleteOptions(propagation_policy="Foreground")
    client.AppsV1Api().delete_namespaced_deployment(
        deployment_description["deployment"], namespace, body=delete_options)

800 

801 

def is_available(location, namespace, component_name):
    """Return True when every requested replica of the deployment is up to date and available.

    A deployment is considered available when the number of available replicas
    equals the requested count AND all replicas match the current spec.  This
    check can be used to verify completion of an initial deployment, a scale
    operation, or an update operation.
    """
    _configure_api(location)
    deployment = client.AppsV1Api().read_namespaced_deployment_status(
        _create_deployment_name(component_name), namespace)
    requested = deployment.spec.replicas
    return (deployment.status.available_replicas == requested
            and deployment.status.updated_replicas == requested)

810 

811 

def scale(deployment_description, replicas):
    """ Trigger a scaling operation by updating the replica count for the Deployment """

    def set_replica_count(spec):
        # Mutate the deployment spec in place; _patch_deployment pushes it back to k8s.
        spec.spec.replicas = replicas
        return spec

    _patch_deployment(deployment_description["location"],
                      deployment_description["namespace"],
                      deployment_description["deployment"],
                      set_replica_count)

821 

822 

def upgrade(deployment_description, image, container_index=0):
    """ Trigger a rolling upgrade by sending a new image name/tag to k8s """

    def set_image(spec):
        # Point the chosen container (by default the main app container,
        # which deploy() inserts at index 0) at the new image.
        spec.spec.template.spec.containers[container_index].image = image
        return spec

    _patch_deployment(deployment_description["location"],
                      deployment_description["namespace"],
                      deployment_description["deployment"],
                      set_image)

832 

833 

def rollback(deployment_description, rollback_to=0):
    """
    Undo upgrade by rolling back to a previous revision of the deployment.
    By default, go back one revision.
    rollback_to can be used to supply a specific revision number.
    Returns the image for the app container and the replica count from the rolled-back deployment
    """
    '''
    2018-07-13
    Currently this does not work due to a bug in the create_namespaced_deployment_rollback() method.
    The k8s python client code throws an exception while processing the response from the API.
    See:
    - https://github.com/kubernetes-client/python/issues/491
    - https://github.com/kubernetes/kubernetes/pull/63837
    The fix has been merged into the master branch but is not in the latest release.
    '''
    _configure_api(deployment_description["location"])
    namespace = deployment_description["namespace"]
    deployment_name = deployment_description["deployment"]

    # Ask k8s to roll the deployment back to the requested revision
    rollback_request = client.AppsV1beta1DeploymentRollback(
        name=deployment_name,
        rollback_to=client.AppsV1beta1RollbackConfig(revision=rollback_to))
    client.ExtensionsV1beta1Api().create_namespaced_deployment_rollback(
        deployment_name,
        namespace,
        rollback_request)

    # Read back the spec of the rolled-back deployment and report its
    # app-container image and replica count
    rolled_back = client.AppsV1Api().read_namespaced_deployment(deployment_name, namespace)
    return rolled_back.spec.template.spec.containers[0].image, rolled_back.spec.replicas

864 

865 

def execute_command_in_deployment(deployment_description, command):
    """
    Enumerate the pods in the k8s deployment identified by "deployment_description",
    then execute the command (an argv-style list) in container 0 (the main
    application container) of each of those pods.  Returns the list of results,
    one per pod.

    Note that the set of pods associated with a deployment can change over time.
    The enumeration is a snapshot at one point in time: the command will not be
    executed in pods created after the enumeration, and if a pod disappears
    between enumeration and execution the attempt to execute the command will
    fail.  This is not treated as a fatal error.

    This approach is reasonable for the one current use case for "execute_command":
    running a script to notify a container that its configuration has changed as
    a result of a policy change.  In that use case, the new configuration is
    stored into the configuration store (Consul), the pods are enumerated, and
    the command is executed.  A pod that disappears after the enumeration no
    longer needs to be reconfigured, and a pod that comes up afterwards gets its
    initial configuration from the updated version in Consul.

    The optimal solution would be a k8s API call that executes a command in all
    pods of a deployment, but k8s only provides a pod-level call.  There is also
    no direct way to list the pods belonging to a particular deployment: the
    deployment code above sets a "k8sdeployment" label carrying the deployment
    name on each pod, and the query below selects pods by that label.
    """
    location = deployment_description["location"]
    _configure_api(location)
    namespace = deployment_description["namespace"]
    deployment = deployment_description["deployment"]

    # Snapshot the currently-running pods that carry this deployment's label
    pod_list = client.CoreV1Api().list_namespaced_pod(
        namespace=namespace,
        label_selector="k8sdeployment={0}".format(deployment),
        field_selector="status.phase=Running")

    # Execute the command in each pod and collect the per-pod results
    results = []
    for pod in pod_list.items:
        results.append(_execute_command_in_pod(location, namespace, pod.metadata.name, command))
    return results

911 

912 

913 

914