# ============LICENSE_START=======================================================
# org.onap.dcae
# ================================================================================
# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2020 Pantheon.tech. All rights reserved.
# Copyright (c) 2020 Nokia. All rights reserved.
# Copyright (c) 2020 J. F. Lucas. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
#

import os
import re
import uuid

from kubernetes import config, client, stream

# Default values for readiness probe
PROBE_DEFAULT_PERIOD = 15
PROBE_DEFAULT_TIMEOUT = 1

# Location of k8s cluster config file ("kubeconfig")
K8S_CONFIG_PATH = "/opt/onap/kube/kubeconfig"

# Regular expression for interval/timeout specification
INTERVAL_SPEC = re.compile("^([0-9]+)(s|m|h)?$")
# Conversion factors to seconds
FACTORS = {None: 1, "s": 1, "m": 60, "h": 3600}

# Regular expression for port mapping
# group 1: container port
# group 2: / + protocol
# group 3: protocol
# group 4: host port
PORTS = re.compile("^([0-9]+)(/(udp|UDP|tcp|TCP))?:([0-9]+)$")
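
# For example, "8080:30080" and "8080/udp:30080" both match PORTS:
# group 1 is the container port ("8080"), group 3 the protocol ("udp",
# absent in the first form, in which case TCP is assumed downstream),
# and group 4 the host port ("30080").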

# Constants for external_cert
MOUNT_PATH = "/etc/onap/oom/certservice/certs/"
KEYSTORE_PATH = MOUNT_PATH + "certServiceClient-keystore.jks"
TRUSTSTORE_PATH = MOUNT_PATH + "truststore.jks"
DEFAULT_CERT_TYPE = "p12"

def _create_deployment_name(component_name):
    return "dep-{0}".format(component_name)[:63]

def _create_service_name(component_name):
    return "{0}".format(component_name)[:63]

def _create_exposed_service_name(component_name):
    return ("x{0}".format(component_name))[:63]
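
# Note: the [:63] truncation above is likely there to keep these names within
# the 63-character limit that Kubernetes imposes on DNS label names (used for
# Service names and pod hostnames, for example).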

def _configure_api(location=None):
    # Look for a kubernetes config file
    if os.path.exists(K8S_CONFIG_PATH):
        config.load_kube_config(config_file=K8S_CONFIG_PATH, context=location, persist_config=False)
    else:
        # Maybe we're running in a k8s container and we can use info provided by k8s
        # We would like to use:
        #   config.load_incluster_config()
        # but this looks into os.environ for the kubernetes host and port, and from
        # the plugin those aren't visible. So we use the InClusterConfigLoader class,
        # where we can set the environment to what we like.
        # This is probably brittle! Maybe there's a better alternative.
        localenv = {
            config.incluster_config.SERVICE_HOST_ENV_NAME: "kubernetes.default.svc.cluster.local",
            config.incluster_config.SERVICE_PORT_ENV_NAME: "443"
        }
        config.incluster_config.InClusterConfigLoader(
            token_filename=config.incluster_config.SERVICE_TOKEN_FILENAME,
            cert_filename=config.incluster_config.SERVICE_CERT_FILENAME,
            environ=localenv
        ).load_and_set()

def _parse_interval(t):
    """
    Parse an interval specification.
    t can be
      - a simple integer quantity, interpreted as seconds
      - a string representation of a decimal integer, interpreted as seconds
      - a string consisting of a representation of a decimal integer followed by a unit,
        with "s" representing seconds, "m" representing minutes,
        and "h" representing hours
    Used for compatibility with the Docker plugin, where time intervals
    for health checks were specified as strings with a number and a unit.
    See INTERVAL_SPEC above for the regular expression that's accepted.
    """
    m = INTERVAL_SPEC.match(str(t))
    if m:
        time = int(m.group(1)) * FACTORS[m.group(2)]
    else:
        raise ValueError("Bad interval specification: {0}".format(t))
    return time
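
# For example, _parse_interval(30) and _parse_interval("30") both return 30,
# _parse_interval("2m") returns 120, and _parse_interval("1h") returns 3600.
# A spec like "1.5m" does not match INTERVAL_SPEC and raises ValueError.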

def _create_probe(hc, port):
    ''' Create a Kubernetes probe based on info in the health check dictionary hc '''
    probe_type = hc['type']
    probe = None
    period = _parse_interval(hc.get('interval', PROBE_DEFAULT_PERIOD))
    timeout = _parse_interval(hc.get('timeout', PROBE_DEFAULT_TIMEOUT))
    if probe_type in ['http', 'https']:
        probe = client.V1Probe(
            failure_threshold=1,
            initial_delay_seconds=5,
            period_seconds=period,
            timeout_seconds=timeout,
            http_get=client.V1HTTPGetAction(
                path=hc['endpoint'],
                port=port,
                scheme=probe_type.upper()
            )
        )
    elif probe_type in ['script', 'docker']:
        probe = client.V1Probe(
            failure_threshold=1,
            initial_delay_seconds=5,
            period_seconds=period,
            timeout_seconds=timeout,
            _exec=client.V1ExecAction(
                command=hc['script'].split()
            )
        )
    return probe
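
# For example, a health check dict of the form
#   {"type": "http", "interval": "15s", "timeout": "1s", "endpoint": "/healthcheck"}
# yields an httpGet probe against the port passed in, while
#   {"type": "script", "script": "/opt/app/bin/check.sh --quick"}
# yields an exec probe (the script path shown is hypothetical).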

def _create_resources(resources=None):
    if resources is not None:
        resources_obj = client.V1ResourceRequirements(
            limits=resources.get("limits"),
            requests=resources.get("requests")
        )
        return resources_obj
    else:
        return None

def _create_container_object(name, image, always_pull, **kwargs):
    # Set up environment variables
    # Copy any passed-in environment variables
    env = kwargs.get('env') or {}
    env_vars = [client.V1EnvVar(name=k, value=env[k]) for k in env]
    # Add POD_IP with the IP address of the pod running the container
    pod_ip = client.V1EnvVarSource(field_ref=client.V1ObjectFieldSelector(field_path="status.podIP"))
    env_vars.append(client.V1EnvVar(name="POD_IP", value_from=pod_ip))

    # If a health check is specified, create a readiness/liveness probe
    # (For an HTTP-based check, we assume it's at the first container port)
    readiness = kwargs.get('readiness')
    liveness = kwargs.get('liveness')
    resources = kwargs.get('resources')
    container_ports = kwargs.get('container_ports') or []

    hc_port = container_ports[0][0] if container_ports else None
    probe = _create_probe(readiness, hc_port) if readiness else None
    live_probe = _create_probe(liveness, hc_port) if liveness else None
    resources_obj = _create_resources(resources) if resources else None
    port_objs = [client.V1ContainerPort(container_port=port, protocol=proto)
                 for port, proto in container_ports]

    # Define container for pod
    return client.V1Container(
        name=name,
        image=image,
        image_pull_policy='Always' if always_pull else 'IfNotPresent',
        env=env_vars,
        ports=port_objs,
        volume_mounts=kwargs.get('volume_mounts') or [],
        resources=resources_obj,
        readiness_probe=probe,
        liveness_probe=live_probe
    )

def _create_deployment_object(component_name,
                              containers,
                              init_containers,
                              replicas,
                              volumes,
                              labels=None,
                              pull_secrets=None):

    # Avoid mutable default arguments: labels is updated in place below, which
    # would otherwise leak state between calls that rely on the default.
    labels = labels if labels is not None else {}
    pull_secrets = pull_secrets if pull_secrets is not None else []

    deployment_name = _create_deployment_name(component_name)

    # Label the pod with the deployment name, so we can find it easily
    labels.update({"k8sdeployment": deployment_name})

    # pull_secrets is a list of the names of the k8s secrets containing docker registry credentials
    # See https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod
    ips = []
    for secret in pull_secrets:
        ips.append(client.V1LocalObjectReference(name=secret))

    # Define pod template
    template = client.V1PodTemplateSpec(
        metadata=client.V1ObjectMeta(labels=labels),
        spec=client.V1PodSpec(hostname=component_name,
                              containers=containers,
                              init_containers=init_containers,
                              volumes=volumes,
                              image_pull_secrets=ips)
    )

    # Define deployment spec
    spec = client.V1DeploymentSpec(
        replicas=replicas,
        selector=client.V1LabelSelector(match_labels=labels),
        template=template
    )

    # Create deployment object
    deployment = client.V1Deployment(
        api_version="apps/v1",
        kind="Deployment",
        metadata=client.V1ObjectMeta(name=deployment_name, labels=labels),
        spec=spec
    )

    return deployment

def _create_service_object(service_name, component_name, service_ports, annotations, labels, service_type):
    service_spec = client.V1ServiceSpec(
        ports=service_ports,
        selector={"app": component_name},
        type=service_type
    )
    if annotations:
        metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels, annotations=annotations)
    else:
        metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels)

    service = client.V1Service(
        kind="Service",
        api_version="v1",
        metadata=metadata,
        spec=service_spec
    )
    return service

def parse_ports(port_list):
    '''
    Parse the port list into a list of container ports (needed to create the container)
    and into a dict of port mappings used to set up k8s services.
    '''
    container_ports = []
    port_map = {}
    for p in port_list:
        m = PORTS.match(p.strip())
        if m:
            cport = int(m.group(1))
            hport = int(m.group(4))
            if m.group(3):
                proto = (m.group(3)).upper()
            else:
                proto = "TCP"
            container_ports.append((cport, proto))
            port_map[(cport, proto)] = hport
        else:
            raise ValueError("Bad port specification: {0}".format(p))

    return container_ports, port_map
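
# For example, parse_ports(["80:0", "8443/tcp:30443"]) returns
#   ([(80, "TCP"), (8443, "TCP")], {(80, "TCP"): 0, (8443, "TCP"): 30443}).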

def _parse_volumes(volume_list):
    volumes = []
    volume_mounts = []
    for v in volume_list:
        vname = str(uuid.uuid4())
        vhost = v['host']['path']
        vcontainer = v['container']['bind']
        vro = (v['container'].get('mode') == 'ro')
        volumes.append(client.V1Volume(name=vname, host_path=client.V1HostPathVolumeSource(path=vhost)))
        volume_mounts.append(client.V1VolumeMount(name=vname, mount_path=vcontainer, read_only=vro))

    return volumes, volume_mounts
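
# For example, a volume spec of the form
#   {"host": {"path": "/var/log/app"}, "container": {"bind": "/logs", "mode": "ro"}}
# produces a hostPath volume (with a generated UUID as its name) and a read-only
# mount of that volume at /logs in the container (paths shown are hypothetical).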

def _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, log_info, filebeat):
    if not log_info or not filebeat:
        return
    log_dir = log_info.get("log_directory")
    if not log_dir:
        return
    sidecar_volume_mounts = []

    # Create the volume for component log files and volume mounts for the component and sidecar containers
    volumes.append(client.V1Volume(name="component-log", empty_dir=client.V1EmptyDirVolumeSource()))
    volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=log_dir))
    sc_path = log_info.get("alternate_fb_path") or "{0}/{1}".format(filebeat["log_path"], component_name)
    sidecar_volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=sc_path))

    # Create the volume for sidecar data and the volume mount for it
    volumes.append(client.V1Volume(name="filebeat-data", empty_dir=client.V1EmptyDirVolumeSource()))
    sidecar_volume_mounts.append(client.V1VolumeMount(name="filebeat-data", mount_path=filebeat["data_path"]))

    # Create the volume for the sidecar configuration data and the volume mount for it
    # The configuration data is in a k8s ConfigMap that should be created when DCAE is installed.
    volumes.append(
        client.V1Volume(name="filebeat-conf", config_map=client.V1ConfigMapVolumeSource(name=filebeat["config_map"])))
    sidecar_volume_mounts.append(
        client.V1VolumeMount(name="filebeat-conf", mount_path=filebeat["config_path"], sub_path=filebeat["config_subpath"]))

    # Finally create the container for the sidecar
    containers.append(_create_container_object("filebeat", filebeat["image"], False, volume_mounts=sidecar_volume_mounts))

def _add_tls_init_container(ctx, init_containers, volumes, volume_mounts, tls_info, tls_config):
    # Adds an InitContainer to the pod to set up TLS certificate information. For components that act as a
    # server (tls_info["use_tls"] is True), the InitContainer will populate a directory with server and CA certificate
    # materials in various formats. For other components (tls_info["use_tls"] is False, or tls_info is not specified),
    # the InitContainer will populate a directory with CA certificate materials in PEM and JKS formats.
    # In either case, the certificate directory is mounted onto the component container filesystem at the location
    # specified by tls_info["cert_directory"], if present, otherwise at the configured default mount point
    # (tls_config["component_cert_dir"]).
    docker_image = tls_config["image"]
    ctx.logger.info("Creating init container: TLS \n * [" + docker_image + "]")

    cert_directory = tls_info.get("cert_directory") or tls_config.get("component_cert_dir")
    env = {}
    env["TLS_SERVER"] = "true" if tls_info.get("use_tls") else "false"

    # Create the certificate volume and volume mounts
    volumes.append(client.V1Volume(name="tls-info", empty_dir=client.V1EmptyDirVolumeSource()))
    volume_mounts.append(client.V1VolumeMount(name="tls-info", mount_path=cert_directory))
    init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_config["cert_path"])]

    # Create the init container
    init_containers.append(_create_container_object("init-tls", docker_image, False, volume_mounts=init_volume_mounts, env=env))

def _add_external_tls_init_container(ctx, init_containers, volumes, external_cert, external_tls_config):
    # Adds an InitContainer to the pod which will generate external TLS certificates.
    docker_image = external_tls_config["image_tag"]
    ctx.logger.info("Creating init container: external TLS \n * [" + docker_image + "]")

    env = {}
    output_path = external_cert.get("external_cert_directory")
    if not output_path.endswith('/'):
        output_path += '/'

    env["REQUEST_URL"] = external_tls_config.get("request_url")
    env["REQUEST_TIMEOUT"] = external_tls_config.get("timeout")
    env["OUTPUT_PATH"] = output_path + "external"
    env["OUTPUT_TYPE"] = external_cert.get("cert_type")
    env["CA_NAME"] = external_cert.get("ca_name")
    env["COMMON_NAME"] = external_cert.get("external_certificate_parameters").get("common_name")
    env["ORGANIZATION"] = external_tls_config.get("organization")
    env["ORGANIZATION_UNIT"] = external_tls_config.get("organizational_unit")
    env["LOCATION"] = external_tls_config.get("location")
    env["STATE"] = external_tls_config.get("state")
    env["COUNTRY"] = external_tls_config.get("country")
    env["SANS"] = external_cert.get("external_certificate_parameters").get("sans")
    env["KEYSTORE_PATH"] = KEYSTORE_PATH
    env["KEYSTORE_PASSWORD"] = external_tls_config.get("keystore_password")
    env["TRUSTSTORE_PATH"] = TRUSTSTORE_PATH
    env["TRUSTSTORE_PASSWORD"] = external_tls_config.get("truststore_password")

    # Create the volumes and volume mounts
    sec = client.V1SecretVolumeSource(secret_name=external_tls_config.get("cert_secret_name"))
    volumes.append(client.V1Volume(name="tls-volume", secret=sec))
    init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=external_cert.get("external_cert_directory")),
                          client.V1VolumeMount(name="tls-volume", mount_path=MOUNT_PATH)]

    # Create the init container
    init_containers.append(_create_container_object("cert-service-client", docker_image, False, volume_mounts=init_volume_mounts, env=env))

def _add_cert_post_processor_init_container(ctx, init_containers, tls_info, tls_config, external_cert, cert_post_processor_config):
    # Adds an InitContainer to the pod to merge the TLS and external TLS truststores into a single file.
    docker_image = cert_post_processor_config["image_tag"]
    ctx.logger.info("Creating init container: cert post processor \n * [" + docker_image + "]")

    tls_cert_dir = tls_info.get("cert_directory") or tls_config.get("component_cert_dir")
    if not tls_cert_dir.endswith('/'):
        tls_cert_dir += '/'

    tls_cert_file_path = tls_cert_dir + "trust.jks"
    tls_cert_file_pass = tls_cert_dir + "trust.pass"

    ext_cert_dir = tls_cert_dir + "external/"

    output_type = (external_cert.get("cert_type") or DEFAULT_CERT_TYPE).lower()
    ext_truststore_path = ext_cert_dir + "truststore." + _get_file_extension(output_type)
    ext_truststore_pass = ''
    if output_type != 'pem':
        ext_truststore_pass = ext_cert_dir + "truststore.pass"

    env = {}
    env["TRUSTSTORES_PATHS"] = tls_cert_file_path + ":" + ext_truststore_path
    env["TRUSTSTORES_PASSWORDS_PATHS"] = tls_cert_file_pass + ":" + ext_truststore_pass
    env["KEYSTORE_SOURCE_PATHS"] = _get_keystore_source_paths(output_type, ext_cert_dir)
    env["KEYSTORE_DESTINATION_PATHS"] = _get_keystore_destination_paths(output_type, tls_cert_dir)

    ctx.logger.info("TRUSTSTORES_PATHS: " + env["TRUSTSTORES_PATHS"])
    ctx.logger.info("TRUSTSTORES_PASSWORDS_PATHS: " + env["TRUSTSTORES_PASSWORDS_PATHS"])
    ctx.logger.info("KEYSTORE_SOURCE_PATHS: " + env["KEYSTORE_SOURCE_PATHS"])
    ctx.logger.info("KEYSTORE_DESTINATION_PATHS: " + env["KEYSTORE_DESTINATION_PATHS"])

    # Create the volumes and volume mounts
    init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_cert_dir)]

    # Create the init container
    init_containers.append(_create_container_object("cert-post-processor", docker_image, False, volume_mounts=init_volume_mounts, env=env))

def _get_file_extension(output_type):
    return {
        'p12': 'p12',
        'pem': 'pem',
        'jks': 'jks',
    }[output_type]

def _get_keystore_source_paths(output_type, ext_cert_dir):
    source_paths_template = {
        'p12': "{0}keystore.p12:{0}keystore.pass",
        'jks': "{0}keystore.jks:{0}keystore.pass",
        'pem': "{0}keystore.pem:{0}key.pem",
    }[output_type]
    return source_paths_template.format(ext_cert_dir)

def _get_keystore_destination_paths(output_type, tls_cert_dir):
    destination_paths_template = {
        'p12': "{0}cert.p12:{0}p12.pass",
        'jks': "{0}cert.jks:{0}jks.pass",
        'pem': "{0}cert.pem:{0}key.pem",
    }[output_type]
    return destination_paths_template.format(tls_cert_dir)
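
# For example, with output_type "p12" and ext_cert_dir "/opt/app/certs/external/"
# (a hypothetical directory), _get_keystore_source_paths() returns
#   "/opt/app/certs/external/keystore.p12:/opt/app/certs/external/keystore.pass".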

def _process_port_map(port_map):
    service_ports = []  # Ports exposed internally on the k8s network
    exposed_ports = []  # Ports to be mapped to ports on the k8s nodes via NodePort
    for (cport, proto), hport in port_map.items():
        name = "xport-{0}-{1}".format(proto[0].lower(), cport)
        cport = int(cport)
        hport = int(hport)
        # The ClusterIP port name drops the leading "x" that marks the NodePort name
        service_ports.append(client.V1ServicePort(port=cport, protocol=proto, name=name[1:]))
        if hport != 0:
            exposed_ports.append(client.V1ServicePort(port=cport, protocol=proto, node_port=hport, name=name))
    return service_ports, exposed_ports
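
# For example, _process_port_map({(8080, "TCP"): 30080, (9090, "TCP"): 0})
# returns a service port for each entry but a NodePort entry only for 8080,
# since a host port of 0 means "don't expose on the k8s nodes".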

def _service_exists(location, namespace, component_name):
    exists = False
    try:
        _configure_api(location)
        client.CoreV1Api().read_namespaced_service(_create_service_name(component_name), namespace)
        exists = True
    except client.rest.ApiException:
        pass

    return exists

def _patch_deployment(location, namespace, deployment, modify):
    '''
    Gets the current spec for 'deployment' in 'namespace'
    in the k8s cluster at 'location',
    uses the 'modify' function to change the spec,
    then sends the updated spec to k8s.
    '''
    _configure_api(location)

    # Get deployment spec
    spec = client.AppsV1Api().read_namespaced_deployment(deployment, namespace)

    # Apply changes to spec
    spec = modify(spec)

    # Patch the deployment with the updated spec
    client.AppsV1Api().patch_namespaced_deployment(deployment, namespace, spec)

def _execute_command_in_pod(location, namespace, pod_name, command):
    '''
    Execute the command (specified by an argv-style list in the "command" parameter) in
    the specified pod in the specified namespace at the specified location.
    For now at least, we use this only to
    run a notification script in a pod after a configuration change.

    The straightforward way to do this is with the V1 Core API function "connect_get_namespaced_pod_exec".
    Unfortunately, due to a bug/limitation in the Python client library, we can't call it directly.
    We have to make the API call through a Websocket connection using the kubernetes.stream wrapper API.
    I'm following the example code at https://github.com/kubernetes-client/python/blob/master/examples/exec.py.
    There are several issues tracking this, in various states. It isn't clear that there will ever
    be a fix.
        - https://github.com/kubernetes-client/python/issues/58
        - https://github.com/kubernetes-client/python/issues/409
        - https://github.com/kubernetes-client/python/issues/526

    The main consequence of the workaround using "stream" is that the caller does not get an indication
    of the exit code returned by the command when it completes execution. It turns out that the
    original implementation of notification in the Docker plugin did not use this result, so we can
    still match the original notification functionality.

    The "stream" approach returns a string containing any output sent by the command to stdout or stderr.
    We'll return that so it can be logged.
    '''
    _configure_api(location)
    try:
        output = stream.stream(client.CoreV1Api().connect_get_namespaced_pod_exec,
                               name=pod_name,
                               namespace=namespace,
                               command=command,
                               stdout=True,
                               stderr=True,
                               stdin=False,
                               tty=False)
    except client.rest.ApiException as e:
        # If the exception indicates the pod wasn't found, it's not a fatal error.
        # It existed when we enumerated the pods for the deployment but no longer exists.
        # Unfortunately, the only way to distinguish "pod not found" from any other error
        # is by looking at the reason text.
        # (The ApiException's "status" field should contain the HTTP status code, which would
        # be 404 if the pod isn't found, but empirical testing reveals that "status" is set
        # to zero.)
        if "404 not found" in e.reason.lower():
            output = "Pod not found"
        else:
            raise e

    return {"pod": pod_name, "output": output}

def deploy(ctx, namespace, component_name, image, replicas, always_pull, k8sconfig, **kwargs):
    '''
    This will create a k8s Deployment and, if needed, one or two k8s Services.
    (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
    We're not exposing k8s to the component developer and the blueprint author.
    This is a conscious choice. We want to use k8s in a controlled, consistent way, and we want to hide
    the details from the component developer and the blueprint author.)

    namespace: the Kubernetes namespace into which the component is deployed
    component_name: the component name, used to derive names of Kubernetes entities
    image: the docker image for the component being deployed
    replicas: the number of instances of the component to be deployed
    always_pull: boolean flag, indicating that Kubernetes should always pull a new copy of
        the Docker image for the component, even if it is already present on the Kubernetes node.
    k8sconfig contains:
        - image_pull_secrets: a list of names of image pull secrets that can be used for retrieving images.
          (DON'T PANIC: these are just the names of secrets held in the Kubernetes secret store.)
        - filebeat: a dictionary of filebeat sidecar parameters:
            "log_path": mount point for log volume in filebeat container
            "data_path": mount point for data volume in filebeat container
            "config_path": mount point for config volume in filebeat container
            "config_subpath": subpath for config data in filebeat container
            "config_map": ConfigMap holding the filebeat configuration
            "image": Docker image to use for filebeat
        - tls: a dictionary of TLS-related information:
            "cert_path": mount point for certificate volume in init container
            "image": Docker image to use for TLS init container
            "component_cert_dir": default mount point for certs
        - cert_post_processor: a dictionary of cert_post_processor information:
            "image_tag": docker image to use for cert-post-processor init container
    kwargs may have:
        - volumes: array of volume objects, where a volume object is:
            {"host": {"path": "/path/on/host"}, "container": {"bind": "/path/on/container", "mode": "rw_or_ro"}}
        - ports: array of strings in the form "container_port:host_port" or "container_port/protocol:host_port"
        - env: map of name-value pairs ( {name0: value0, name1: value1...} )
        - log_info: an object with info for setting up ELK logging, with the form:
            {"log_directory": "/path/to/container/log/directory", "alternate_fb_path": "/alternate/sidecar/log/path"}
        - tls_info: an object with info for setting up TLS (HTTPS), with the form:
            {"use_tls": true, "cert_directory": "/path/to/container/cert/directory"}
        - external_cert: an object with information for setting up the init container for external certificates creation, with the form:
            {"external_cert":
                "external_cert_directory": "/path/to/directory_where_certs_should_be_placed",
                "use_external_tls": true or false,
                "ca_name": "ca-name-value",
                "cert_type": "P12" or "JKS" or "PEM",
                "external_certificate_parameters":
                    "common_name": "common-name-value",
                    "sans": "sans-value"}
        - labels: dict with label-name/label-value pairs, e.g. {"cfydeployment": "lsdfkladflksdfsjkl", "cfynode": "mycomponent"}
            These labels will be set on all the pods deployed as a result of this deploy() invocation.
        - resources: dict with optional "limits" and "requests" resource requirements, each a dict containing:
            - cpu: number of CPUs to use, like 0.5
            - memory: string memory requirement, like "2Gi"
        - readiness: dict with health check info; if present, used to create a readiness probe for the main container. Includes:
            - type: check is done by making an http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
            - interval: period (in seconds) between probes
            - timeout: time (in seconds) to allow a probe to complete
            - endpoint: the path portion of the URL that points to the readiness endpoint for "http" and "https" types
            - path: the full path to the script to be executed in the container for "script" and "docker" types
        - liveness: dict with health check info; if present, used to create a liveness probe for the main container. Includes:
            - type: check is done by making an http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
            - interval: period (in seconds) between probes
            - timeout: time (in seconds) to allow a probe to complete
            - endpoint: the path portion of the URL that points to the liveness endpoint for "http" and "https" types
            - path: the full path to the script to be executed in the container for "script" and "docker" types
        - k8s_location: name of the Kubernetes location (cluster) where the component is to be deployed

    '''

    deployment_ok = False
    cip_service_created = False
    deployment_description = {
        "namespace": namespace,
        "location": kwargs.get("k8s_location"),
        "deployment": '',
        "services": []
    }

    try:

        # Get API handles
        _configure_api(kwargs.get("k8s_location"))
        core = client.CoreV1Api()
        k8s_apps_v1_api_client = client.AppsV1Api()

        # Parse the port mapping
        container_ports, port_map = parse_ports(kwargs.get("ports", []))

        # Parse the volumes list into volumes and volume_mounts for the deployment
        volumes, volume_mounts = _parse_volumes(kwargs.get("volumes", []))

        # Initialize the list of containers that will be part of the pod
        containers = []
        init_containers = []

        # Set up the ELK logging sidecar container, if needed
        _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, kwargs.get("log_info"), k8sconfig.get("filebeat"))

        # Set up TLS information
        _add_tls_init_container(ctx, init_containers, volumes, volume_mounts, kwargs.get("tls_info") or {}, k8sconfig.get("tls"))

        # Set up external TLS information
        external_cert = kwargs.get("external_cert")
        if external_cert and external_cert.get("use_external_tls"):
            _add_external_tls_init_container(ctx, init_containers, volumes, external_cert, k8sconfig.get("external_cert"))
            _add_cert_post_processor_init_container(ctx, init_containers, kwargs.get("tls_info") or {}, k8sconfig.get("tls"), external_cert, k8sconfig.get("cert_post_processor"))

        # Create the container for the component
        # Make it the first container in the pod
        container_args = {key: kwargs.get(key) for key in ("env", "readiness", "liveness", "resources")}
        container_args['container_ports'] = container_ports
        container_args['volume_mounts'] = volume_mounts
        containers.insert(0, _create_container_object(component_name, image, always_pull, **container_args))

        # Build the k8s Deployment object
        labels = kwargs.get("labels", {})
        labels["app"] = component_name
        dep = _create_deployment_object(component_name, containers, init_containers, replicas, volumes, labels, pull_secrets=k8sconfig["image_pull_secrets"])

        # Have k8s deploy it
        k8s_apps_v1_api_client.create_namespaced_deployment(namespace, dep)
        deployment_ok = True
        deployment_description["deployment"] = _create_deployment_name(component_name)

        # Create service(s), if a port mapping is specified
        if port_map:
            service_ports, exposed_ports = _process_port_map(port_map)

            # Create a ClusterIP service for access via the k8s network
            service = _create_service_object(_create_service_name(component_name), component_name, service_ports, None, labels, "ClusterIP")
            core.create_namespaced_service(namespace, service)
            cip_service_created = True
            deployment_description["services"].append(_create_service_name(component_name))

            # If there are ports to be exposed on the k8s nodes, create a "NodePort" service
            if exposed_ports:
                exposed_service = \
                    _create_service_object(_create_exposed_service_name(component_name), component_name, exposed_ports, '', labels, "NodePort")
                core.create_namespaced_service(namespace, exposed_service)
                deployment_description["services"].append(_create_exposed_service_name(component_name))

    except Exception as e:
        # If the ClusterIP service was created, delete the service:
        if cip_service_created:
            core.delete_namespaced_service(_create_service_name(component_name), namespace)
        # If the deployment was created but not the service, delete the deployment
        if deployment_ok:
            client.AppsV1Api().delete_namespaced_deployment(_create_deployment_name(component_name), namespace, body=client.V1DeleteOptions())
        raise e

    return dep, deployment_description
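
# Illustrative sketch of a minimal deploy() call; the namespace, component name,
# image, and location below are hypothetical, ctx is the caller's context object
# (used here only for ctx.logger), and plugin_conf is the k8sconfig dictionary:
#
#   dep, dep_desc = deploy(ctx, "onap", "mycomponent", "example.org/mycomponent:1.0.0",
#                          replicas=1, always_pull=False, k8sconfig=plugin_conf,
#                          ports=["8080:0"], k8s_location="central")
#
# dep_desc can later be passed to scale(), upgrade(), execute_command_in_deployment(),
# and undeploy().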

def undeploy(deployment_description):
    _configure_api(deployment_description["location"])

    namespace = deployment_description["namespace"]

    # Remove any services associated with the component
    for service in deployment_description["services"]:
        client.CoreV1Api().delete_namespaced_service(service, namespace)

    # Have k8s delete the underlying pods and replicaset when deleting the deployment.
    options = client.V1DeleteOptions(propagation_policy="Foreground")
    client.AppsV1Api().delete_namespaced_deployment(deployment_description["deployment"], namespace, body=options)

def is_available(location, namespace, component_name):
    _configure_api(location)
    dep_status = client.AppsV1Api().read_namespaced_deployment_status(_create_deployment_name(component_name), namespace)
    # Check if the number of available replicas is equal to the number requested and that the replicas match the current spec
    # This check can be used to verify completion of an initial deployment, a scale operation, or an update operation
    return dep_status.status.available_replicas == dep_status.spec.replicas and dep_status.status.updated_replicas == dep_status.spec.replicas

def scale(deployment_description, replicas):
    ''' Trigger a scaling operation by updating the replica count for the Deployment '''

    def update_replica_count(spec):
        spec.spec.replicas = replicas
        return spec

    _patch_deployment(deployment_description["location"], deployment_description["namespace"], deployment_description["deployment"], update_replica_count)

def upgrade(deployment_description, image, container_index=0):
    ''' Trigger a rolling upgrade by sending a new image name/tag to k8s '''

    def update_image(spec):
        spec.spec.template.spec.containers[container_index].image = image
        return spec

    _patch_deployment(deployment_description["location"], deployment_description["namespace"], deployment_description["deployment"], update_image)

def rollback(deployment_description, rollback_to=0):
    '''
    Undo an upgrade by rolling back to a previous revision of the deployment.
    By default, go back one revision.
    rollback_to can be used to supply a specific revision number.
    Returns the image for the app container and the replica count from the rolled-back deployment.

    2018-07-13
    Currently this does not work due to a bug in the create_namespaced_deployment_rollback() method.
    The k8s python client code throws an exception while processing the response from the API.
    See:
        - https://github.com/kubernetes-client/python/issues/491
        - https://github.com/kubernetes/kubernetes/pull/63837
    The fix has been merged into the master branch but is not in the latest release.
    '''
    _configure_api(deployment_description["location"])
    deployment = deployment_description["deployment"]
    namespace = deployment_description["namespace"]

    # Initiate the rollback
    client.ExtensionsV1beta1Api().create_namespaced_deployment_rollback(
        deployment,
        namespace,
        client.AppsV1beta1DeploymentRollback(name=deployment, rollback_to=client.AppsV1beta1RollbackConfig(revision=rollback_to)))

    # Read back the spec for the rolled-back deployment
    spec = client.AppsV1Api().read_namespaced_deployment(deployment, namespace)
    return spec.spec.template.spec.containers[0].image, spec.spec.replicas

def execute_command_in_deployment(deployment_description, command):
    '''
    Enumerates the pods in the k8s deployment identified by "deployment_description",
    then executes the command (represented as an argv-style list) in "command" in
    container 0 (the main application container) of each of those pods.

    Note that the set of pods associated with a deployment can change over time. The
    enumeration is a snapshot at one point in time. The command will not be executed in
    pods that are created after the initial enumeration. If a pod disappears after the
    initial enumeration and before the command is executed, the attempt to execute the
    command will fail. This is not treated as a fatal error.

    This approach is reasonable for the one current use case for "execute_command": running a
    script to notify a container that its configuration has changed as a result of a
    policy change. In this use case, the new configuration information is stored into
    the configuration store (Consul), the pods are enumerated, and the command is executed.
    If a pod disappears after the enumeration, the fact that the command cannot be run
    doesn't matter--a nonexistent pod doesn't need to be reconfigured. Similarly, a pod that
    comes up after the enumeration will get its initial configuration from the updated version
    in Consul.

    The optimal solution here would be for k8s to provide an API call to execute a command in
    all of the pods for a deployment. Unfortunately, k8s does not provide such a call--the
    only call provided by k8s operates at the pod level, not the deployment level.

    Another interesting k8s factoid: there's no direct way to list the pods belonging to a
    particular k8s deployment. The deployment code above sets a label ("k8sdeployment") on
    the pod that has the k8s deployment name. To list the pods, the code below queries for
    pods with the label carrying the deployment name.
    '''
    location = deployment_description["location"]
    _configure_api(location)
    deployment = deployment_description["deployment"]
    namespace = deployment_description["namespace"]

    # Get names of all the running pods belonging to the deployment
    pod_names = [pod.metadata.name for pod in client.CoreV1Api().list_namespaced_pod(
        namespace=namespace,
        label_selector="k8sdeployment={0}".format(deployment),
        field_selector="status.phase=Running"
    ).items]

    # Execute command in the running pods
    return [_execute_command_in_pod(location, namespace, pod_name, command)
            for pod_name in pod_names]
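
# Illustrative example (the script path is hypothetical): after a policy update,
# one might run
#   execute_command_in_deployment(dep_desc, ["/opt/app/bin/reconfigure.sh", "policies"])
# which returns a list of {"pod": <pod name>, "output": <command output>} dicts,
# one per running pod in the deployment.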