Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# ============LICENSE_START======================================================= 

2# org.onap.dcae 

3# ================================================================================ 

4# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. 

5# Copyright (c) 2020 Pantheon.tech. All rights reserved. 

6# Copyright (c) 2020 Nokia. All rights reserved. 

7# Copyright (c) 2020 J. F. Lucas. All rights reserved. 

8# ================================================================================ 

9# Licensed under the Apache License, Version 2.0 (the "License"); 

10# you may not use this file except in compliance with the License. 

11# You may obtain a copy of the License at 

12# 

13# http://www.apache.org/licenses/LICENSE-2.0 

14# 

15# Unless required by applicable law or agreed to in writing, software 

16# distributed under the License is distributed on an "AS IS" BASIS, 

17# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

18# See the License for the specific language governing permissions and 

19# limitations under the License. 

20# ============LICENSE_END========================================================= 

21# 

22import os 

23import re 

24import uuid 

25 

26from kubernetes import config, client, stream 

27 

# Default values for readiness probe (plain ints are interpreted as seconds
# by _parse_interval)
PROBE_DEFAULT_PERIOD = 15
PROBE_DEFAULT_TIMEOUT = 1

# Location of k8s cluster config file ("kubeconfig")
K8S_CONFIG_PATH = "/opt/onap/kube/kubeconfig"

# Regular expression for interval/timeout specification
INTERVAL_SPEC = re.compile("^([0-9]+)(s|m|h)?$")
# Conversion factors to seconds
FACTORS = {None: 1, "s": 1, "m": 60, "h": 3600}

# Regular expression for port mapping
# group 1: container port
# group 2: / + protocol
# group 3: protocol
# group 4: host port
PORTS = re.compile("^([0-9]+)(/(udp|UDP|tcp|TCP))?:([0-9]+)$")

# Constants for external_cert (paths used by the cert-service-client init container)
MOUNT_PATH = "/etc/onap/oom/certservice/certs/"
KEYSTORE_PATH = MOUNT_PATH + "certServiceClient-keystore.jks"
TRUSTSTORE_PATH = MOUNT_PATH + "truststore.jks"
DEFAULT_CERT_TYPE = "p12"

52 

def _create_deployment_name(component_name):
    """Derive the k8s Deployment name for a component, truncated to the 63-char k8s name limit."""
    full_name = "dep-{0}".format(component_name)
    return full_name[:63]

55 

def _create_service_name(component_name):
    """Derive the k8s Service name for a component, truncated to the 63-char k8s name limit."""
    service_name = "{0}".format(component_name)
    return service_name[:63]

58 

def _create_exposed_service_name(component_name):
    """Derive the NodePort ("exposed") Service name: 'x' prefix, truncated to 63 chars."""
    exposed_name = "x{0}".format(component_name)
    return exposed_name[:63]

61 

def _create_exposed_v6_service_name(component_name):
    """Derive the IPv6 NodePort Service name: 'x' prefix plus '-ipv6' suffix, truncated to 63 chars."""
    exposed_v6_name = "x{0}-ipv6".format(component_name)
    return exposed_v6_name[:63]

64 

def _configure_api(location=None):
    """
    Configure the Kubernetes client library for the target cluster.

    location: name of a kubeconfig context to select; None uses the file's
        current context.  Only consulted when a kubeconfig file is present.

    If a kubeconfig file exists at K8S_CONFIG_PATH, load it (without
    persisting changes back to the file).  Otherwise fall back to in-cluster
    configuration, assuming we are running inside a k8s pod.
    """
    # Look for a kubernetes config file
    if os.path.exists(K8S_CONFIG_PATH):
        config.load_kube_config(config_file=K8S_CONFIG_PATH, context=location, persist_config=False)
    else:
        # Maybe we're running in a k8s container and we can use info provided by k8s
        # We would like to use:
        #   config.load_incluster_config()
        # but this looks into os.environ for kubernetes host and port, and from
        # the plugin those aren't visible. So we use the InClusterConfigLoader class,
        # where we can set the environment to what we like.
        # This is probably brittle! Maybe there's a better alternative.
        localenv = {
            config.incluster_config.SERVICE_HOST_ENV_NAME : "kubernetes.default.svc.cluster.local",
            config.incluster_config.SERVICE_PORT_ENV_NAME : "443"
        }
        config.incluster_config.InClusterConfigLoader(
            token_filename=config.incluster_config.SERVICE_TOKEN_FILENAME,
            cert_filename=config.incluster_config.SERVICE_CERT_FILENAME,
            environ=localenv
        ).load_and_set()

86 

def _parse_interval(t):
    """
    Parse an interval specification into a number of seconds.

    t can be
    - a simple integer quantity, interpreted as seconds
    - a string representation of a decimal integer, interpreted as seconds
    - a string consisting of a decimal integer followed by a unit, with
      "s" meaning seconds, "m" meaning minutes, and "h" meaning hours

    Used for compatibility with the Docker plugin, where time intervals
    for health checks were specified as strings with a number and a unit.
    See INTERVAL_SPEC above for the regular expression that's accepted.
    Raises ValueError if the specification doesn't match.
    """
    matched = INTERVAL_SPEC.match(str(t))
    if not matched:
        raise ValueError("Bad interval specification: {0}".format(t))
    return int(matched.group(1)) * FACTORS[matched.group(2)]

106 

def _create_probe(hc, port):
    """
    Create a Kubernetes V1Probe from the health check dictionary hc.

    hc['type'] selects the mechanism:
      - "http"/"https": HTTP GET against hc['endpoint'] on the given port
      - "script"/"docker": exec hc['script'] (whitespace-split) in the container
    Any other type yields None.  Interval/timeout default to the module-level
    probe defaults and accept the same formats as _parse_interval.
    """
    probe_kind = hc['type']
    period = _parse_interval(hc.get('interval', PROBE_DEFAULT_PERIOD))
    timeout = _parse_interval(hc.get('timeout', PROBE_DEFAULT_TIMEOUT))
    common_settings = dict(
        failure_threshold=1,
        initial_delay_seconds=5,
        period_seconds=period,
        timeout_seconds=timeout
    )
    if probe_kind in ('http', 'https'):
        action = client.V1HTTPGetAction(
            path=hc['endpoint'],
            port=port,
            scheme=probe_kind.upper()
        )
        return client.V1Probe(http_get=action, **common_settings)
    if probe_kind in ('script', 'docker'):
        action = client.V1ExecAction(command=hc['script'].split())
        return client.V1Probe(_exec=action, **common_settings)
    return None

136 

def _create_resources(resources=None):
    """Build a V1ResourceRequirements from a dict with optional 'limits'/'requests' keys, or return None."""
    if resources is None:
        return None
    return client.V1ResourceRequirements(
        limits=resources.get("limits"),
        requests=resources.get("requests")
    )

146 

def _create_container_object(name, image, always_pull, **kwargs):
    """
    Build a V1Container for a pod.

    name: container name
    image: Docker image reference
    always_pull: True -> image_pull_policy 'Always', False -> 'IfNotPresent'
    kwargs may include:
      - env: dict of environment variable name/value pairs
      - readiness / liveness: health check dicts (see _create_probe)
      - resources: dict with optional "limits"/"requests" (see _create_resources)
      - container_ports: list of (port, protocol) tuples
      - volume_mounts: list of V1VolumeMount objects
    """
    # Set up environment variables
    # Copy any passed in environment variables
    env = kwargs.get('env') or {}
    env_vars = [client.V1EnvVar(name=k, value=env[k]) for k in env]
    # Add POD_IP with the IP address of the pod running the container
    pod_ip = client.V1EnvVarSource(field_ref=client.V1ObjectFieldSelector(field_path="status.podIP"))
    env_vars.append(client.V1EnvVar(name="POD_IP", value_from=pod_ip))

    # If a health check is specified, create a readiness/liveness probe
    # (For an HTTP-based check, we assume it's at the first container port)
    readiness = kwargs.get('readiness')
    liveness = kwargs.get('liveness')
    resources = kwargs.get('resources')
    container_ports = kwargs.get('container_ports') or []

    hc_port = container_ports[0][0] if container_ports else None
    probe = _create_probe(readiness, hc_port) if readiness else None
    live_probe = _create_probe(liveness, hc_port) if liveness else None
    resources_obj = _create_resources(resources) if resources else None
    port_objs = [client.V1ContainerPort(container_port=port, protocol=proto)
                 for port, proto in container_ports]

    # Define container for pod
    return client.V1Container(
        name=name,
        image=image,
        image_pull_policy='Always' if always_pull else 'IfNotPresent',
        env=env_vars,
        ports=port_objs,
        volume_mounts=kwargs.get('volume_mounts') or [],
        resources=resources_obj,
        readiness_probe=probe,
        liveness_probe=live_probe
    )

182 

def _create_deployment_object(component_name,
                              containers,
                              init_containers,
                              replicas,
                              volumes,
                              labels=None,
                              pull_secrets=None):
    """
    Build a V1Deployment for the component.

    component_name: used to derive the deployment name and the pod hostname
    containers: list of V1Container objects for the pod
    init_containers: list of V1Container objects run as init containers
    replicas: number of pod replicas
    volumes: list of V1Volume objects for the pod
    labels: dict of labels applied to the deployment and its pods; updated
        in place with a "k8sdeployment" label (callers may rely on this)
    pull_secrets: names of k8s secrets holding Docker registry credentials

    Returns the V1Deployment object (not yet sent to k8s).
    """
    # Fix for the mutable-default-argument pitfall: the previous defaults
    # (labels={}, pull_secrets=[]) were shared across calls, and labels is
    # mutated below, so a stale "k8sdeployment" label could leak between
    # deployments created with the default.
    if labels is None:
        labels = {}
    if pull_secrets is None:
        pull_secrets = []

    deployment_name = _create_deployment_name(component_name)

    # Label the pod with the deployment name, so we can find it easily
    labels.update({"k8sdeployment": deployment_name})

    # pull_secrets is a list of the names of the k8s secrets containing docker registry credentials
    # See https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod
    ips = [client.V1LocalObjectReference(name=secret) for secret in pull_secrets]

    # Define pod template
    template = client.V1PodTemplateSpec(
        metadata=client.V1ObjectMeta(labels=labels),
        spec=client.V1PodSpec(hostname=component_name,
                              containers=containers,
                              init_containers=init_containers,
                              volumes=volumes,
                              image_pull_secrets=ips)
    )

    # Define deployment spec
    spec = client.V1DeploymentSpec(
        replicas=replicas,
        selector=client.V1LabelSelector(match_labels=labels),
        template=template
    )

    # Create deployment object
    deployment = client.V1Deployment(
        api_version="apps/v1",
        kind="Deployment",
        metadata=client.V1ObjectMeta(name=deployment_name, labels=labels),
        spec=spec
    )

    return deployment

228 

229 

def _create_service_object(service_name, component_name, service_ports, annotations, labels, service_type, ip_family):
    """
    Build a V1Service that selects the component's pods via the "app" label.

    service_name: base name for the service (truncated by _create_service_name)
    component_name: value of the "app" selector label
    service_ports: list of V1ServicePort objects
    annotations: optional dict of annotations (omitted from metadata when falsy)
    labels: dict of labels for the service metadata
    service_type: k8s service type, e.g. "ClusterIP" or "NodePort"
    ip_family: "IPv4" or "IPv6"
    """
    service_spec = client.V1ServiceSpec(
        ports=service_ports,
        selector={"app": component_name},
        type=service_type,
        ip_family=ip_family
    )
    metadata_kwargs = {"name": _create_service_name(service_name), "labels": labels}
    if annotations:
        metadata_kwargs["annotations"] = annotations
    return client.V1Service(
        kind="Service",
        api_version="v1",
        metadata=client.V1ObjectMeta(**metadata_kwargs),
        spec=service_spec
    )

249 

def parse_ports(port_list):
    '''
    Parse the port list into a list of container ports (needed to create the container)
    and to a set of port mappings to set up k8s services.

    Each entry is either a string of the form "container_port[/protocol]:host_port"
    or a dict {"concat": [parts...], "ipv6": bool} whose parts are joined into
    such a string.  Protocol defaults to "TCP".  Returns
    (container_ports, port_map) where container_ports is a deduplicated list of
    (port, protocol) tuples and port_map maps (port, protocol, ipv6) -> host port.
    Raises ValueError for a specification that doesn't match PORTS.
    '''
    container_ports = []
    port_map = {}
    for p in port_list:
        ipv6 = False
        # Fix: use isinstance so dict subclasses are accepted too
        # (the original `type(p) is dict` rejected them).
        if isinstance(p, dict):
            ipv6 = "ipv6" in p and p['ipv6']
            p = "".join(str(v) for v in p['concat'])
        m = PORTS.match(p.strip())
        if not m:
            raise ValueError("Bad port specification: {0}".format(p))
        cport = int(m.group(1))
        hport = int(m.group(4))
        # group 3 is the optional protocol; normalize to upper case
        proto = (m.group(3)).upper() if m.group(3) else "TCP"
        port = (cport, proto)
        if port not in container_ports:
            container_ports.append(port)
        port_map[(cport, proto, ipv6)] = hport
    return container_ports, port_map

278 

279 

def _parse_volumes(volume_list):
    """
    Convert Docker-style host-path volume specs into k8s volumes and mounts.

    Each spec looks like
      {"host": {"path": ...}, "container": {"bind": ..., "mode": "rw"|"ro"}}
    Returns (volumes, volume_mounts): parallel lists of V1Volume and
    V1VolumeMount objects linked by a generated UUID name.
    """
    volumes = []
    volume_mounts = []
    for spec in volume_list:
        # A random unique name ties each volume to its mount
        volume_name = str(uuid.uuid4())
        host_path = spec['host']['path']
        bind_path = spec['container']['bind']
        read_only = spec['container'].get('mode') == 'ro'
        volumes.append(client.V1Volume(name=volume_name,
                                       host_path=client.V1HostPathVolumeSource(path=host_path)))
        volume_mounts.append(client.V1VolumeMount(name=volume_name,
                                                  mount_path=bind_path,
                                                  read_only=read_only))
    return volumes, volume_mounts

292 

def _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, log_info, filebeat):
    """
    Add a filebeat sidecar container (plus supporting volumes and mounts) so the
    component's log files can be shipped to ELK.

    containers, volumes, volume_mounts: pod-level lists, extended in place
    component_name: used to build the sidecar-side log path
    log_info: dict with "log_directory" (where the component writes logs) and
        optional "alternate_fb_path" (override for the sidecar-side mount point)
    filebeat: dict of filebeat parameters ("image", "log_path", "data_path",
        "config_path", "config_subpath", "config_map")

    No-op if log_info or filebeat is missing, or no log directory is configured.
    """
    if not log_info or not filebeat:
        return
    log_dir = log_info.get("log_directory")
    if not log_dir:
        return
    sidecar_volume_mounts = []

    # Create the volume for component log files and volume mounts for the component and sidecar containers
    volumes.append(client.V1Volume(name="component-log", empty_dir=client.V1EmptyDirVolumeSource()))
    volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=log_dir))
    sc_path = log_info.get("alternate_fb_path") or "{0}/{1}".format(filebeat["log_path"], component_name)
    sidecar_volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=sc_path))

    # Create the volume for sidecar data and the volume mount for it
    volumes.append(client.V1Volume(name="filebeat-data", empty_dir=client.V1EmptyDirVolumeSource()))
    sidecar_volume_mounts.append(client.V1VolumeMount(name="filebeat-data", mount_path=filebeat["data_path"]))

    # Create the volume for the sidecar configuration data and the volume mount for it
    # The configuration data is in a k8s ConfigMap that should be created when DCAE is installed.
    volumes.append(
        client.V1Volume(name="filebeat-conf", config_map=client.V1ConfigMapVolumeSource(name=filebeat["config_map"])))
    sidecar_volume_mounts.append(
        client.V1VolumeMount(name="filebeat-conf", mount_path=filebeat["config_path"], sub_path=filebeat["config_subpath"]))

    # Finally create the container for the sidecar
    containers.append(_create_container_object("filebeat", filebeat["image"], False, volume_mounts=sidecar_volume_mounts))

320 

def _add_tls_init_container(ctx, init_containers, volumes, volume_mounts, tls_info, tls_config):
    """
    Add an init container that sets up TLS certificate material for the component.

    For components that act as a server (tls_info["use_tls"] is True), the init
    container will populate a directory with server and CA certificate materials
    in various formats.  For other components (tls_info["use_tls"] is False, or
    tls_info is not specified), it will populate the directory with CA
    certificate materials in PEM and JKS formats.

    In either case, the certificate directory is mounted onto the component
    container's filesystem at the location specified by
    tls_info["cert_directory"], if present, otherwise at the configured default
    mount point (tls_config["component_cert_dir"]).

    init_containers, volumes, volume_mounts: pod-level lists, extended in place.
    """
    docker_image = tls_config["image"]
    ctx.logger.info("Creating init container: TLS \n * [" + docker_image + "]")

    cert_directory = tls_info.get("cert_directory") or tls_config.get("component_cert_dir")
    env = {}
    # The init container image switches between server/CA-only modes on TLS_SERVER
    env["TLS_SERVER"] = "true" if tls_info.get("use_tls") else "false"

    # Create the certificate volume and volume mounts
    volumes.append(client.V1Volume(name="tls-info", empty_dir=client.V1EmptyDirVolumeSource()))
    volume_mounts.append(client.V1VolumeMount(name="tls-info", mount_path=cert_directory))
    init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_config["cert_path"])]

    # Create the init container
    init_containers.append(_create_container_object("init-tls", docker_image, False, volume_mounts=init_volume_mounts, env=env))

343 

def _add_external_tls_init_container(ctx, init_containers, volumes, external_cert, external_tls_config):
    """
    Add an init container (cert-service-client) that generates external TLS
    certificates for the component.

    init_containers, volumes: pod-level lists, extended in place
    external_cert: per-component certificate request parameters
        (directory, cert type, CA name, common name, SANs)
    external_tls_config: plugin-level configuration (image tag, request URL,
        timeout, subject fields, keystore/truststore passwords, secret name)
    """
    docker_image = external_tls_config["image_tag"]
    ctx.logger.info("Creating init container: external TLS \n * [" + docker_image + "]")

    env = {}
    output_path = external_cert.get("external_cert_directory")
    if not output_path.endswith('/'):
        output_path += '/'

    env["REQUEST_URL"] = external_tls_config.get("request_url")
    env["REQUEST_TIMEOUT"] = external_tls_config.get("timeout")
    # Certificates are written into an "external" subdirectory of the cert directory
    env["OUTPUT_PATH"] = output_path + "external"
    env["OUTPUT_TYPE"] = external_cert.get("cert_type")
    env["CA_NAME"] = external_cert.get("ca_name")
    env["COMMON_NAME"] = external_cert.get("external_certificate_parameters").get("common_name")
    env["ORGANIZATION"] = external_tls_config.get("organization")
    env["ORGANIZATION_UNIT"] = external_tls_config.get("organizational_unit")
    env["LOCATION"] = external_tls_config.get("location")
    env["STATE"] = external_tls_config.get("state")
    env["COUNTRY"] = external_tls_config.get("country")
    env["SANS"] = external_cert.get("external_certificate_parameters").get("sans")
    env["KEYSTORE_PATH"] = KEYSTORE_PATH
    env["KEYSTORE_PASSWORD"] = external_tls_config.get("keystore_password")
    env["TRUSTSTORE_PATH"] = TRUSTSTORE_PATH
    env["TRUSTSTORE_PASSWORD"] = external_tls_config.get("truststore_password")

    # Create the volumes and volume mounts
    # "tls-volume" holds the client's credentials, taken from a k8s secret
    sec = client.V1SecretVolumeSource(secret_name=external_tls_config.get("cert_secret_name"))
    volumes.append(client.V1Volume(name="tls-volume", secret=sec))
    # NOTE(review): mounts the "tls-info" volume without creating it here --
    # presumably _add_tls_init_container has already added it; confirm call order
    init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=external_cert.get("external_cert_directory")),
                          client.V1VolumeMount(name="tls-volume", mount_path=MOUNT_PATH)]

    # Create the init container
    init_containers.append(_create_container_object("cert-service-client", docker_image, False, volume_mounts=init_volume_mounts, env=env))

379 

380 

def _add_cert_post_processor_init_container(ctx, init_containers, tls_info, tls_config, external_cert, cert_post_processor_config):
    """
    Add an init container that merges the TLS truststore and the external TLS
    truststore into a single file and sets up keystore copy paths.

    init_containers: pod-level list, extended in place
    tls_info / tls_config: used to locate the component's certificate directory
    external_cert: used to determine the external cert output type (p12/jks/pem)
    cert_post_processor_config: holds the init container "image_tag"
    """
    docker_image = cert_post_processor_config["image_tag"]
    ctx.logger.info("Creating init container: cert post processor \n * [" + docker_image + "]")

    tls_cert_dir = tls_info.get("cert_directory") or tls_config.get("component_cert_dir")
    if not tls_cert_dir.endswith('/'):
        tls_cert_dir += '/'

    tls_cert_file_path = tls_cert_dir + "trust.jks"
    tls_cert_file_pass = tls_cert_dir + "trust.pass"

    # External certs live in the "external" subdirectory
    # (written there by _add_external_tls_init_container)
    ext_cert_dir = tls_cert_dir + "external/"

    output_type = (external_cert.get("cert_type") or DEFAULT_CERT_TYPE).lower()
    ext_truststore_path = ext_cert_dir + "truststore." + _get_file_extension(output_type)
    ext_truststore_pass = ''
    # PEM output gets no truststore password file
    if output_type != 'pem':
        ext_truststore_pass = ext_cert_dir + "truststore.pass"

    env = {}
    env["TRUSTSTORES_PATHS"] = tls_cert_file_path + ":" + ext_truststore_path
    env["TRUSTSTORES_PASSWORDS_PATHS"] = tls_cert_file_pass + ":" + ext_truststore_pass
    env["KEYSTORE_SOURCE_PATHS"] = _get_keystore_source_paths(output_type, ext_cert_dir)
    env["KEYSTORE_DESTINATION_PATHS"] = _get_keystore_destination_paths(output_type, tls_cert_dir)

    ctx.logger.info("TRUSTSTORES_PATHS: " + env["TRUSTSTORES_PATHS"])
    ctx.logger.info("TRUSTSTORES_PASSWORDS_PATHS: " + env["TRUSTSTORES_PASSWORDS_PATHS"])
    ctx.logger.info("KEYSTORE_SOURCE_PATHS: " + env["KEYSTORE_SOURCE_PATHS"])
    ctx.logger.info("KEYSTORE_DESTINATION_PATHS: " + env["KEYSTORE_DESTINATION_PATHS"])

    # Create the volumes and volume mounts
    # Reuses the "tls-info" volume added by _add_tls_init_container
    init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_cert_dir)]

    # Create the init container
    init_containers.append(_create_container_object("cert-post-processor", docker_image, False, volume_mounts=init_volume_mounts, env=env))

417 

418 

def _get_file_extension(output_type):
    """Map a certificate output type to its file extension; raises KeyError for unknown types."""
    extensions = {
        'p12': 'p12',
        'pem': 'pem',
        'jks': 'jks',
    }
    return extensions[output_type]

425 

def _get_keystore_source_paths(output_type, ext_cert_dir):
    """
    Return the colon-separated source paths (keystore file and password/key file)
    for the given external cert type, rooted at ext_cert_dir.
    Raises KeyError for unknown types.
    """
    if output_type == 'p12':
        template = "{0}keystore.p12:{0}keystore.pass"
    elif output_type == 'jks':
        template = "{0}keystore.jks:{0}keystore.pass"
    elif output_type == 'pem':
        template = "{0}keystore.pem:{0}key.pem"
    else:
        raise KeyError(output_type)
    return template.format(ext_cert_dir)

433 

def _get_keystore_destination_paths(output_type, tls_cert_dir):
    """
    Return the colon-separated destination paths (cert file and password/key file)
    for the given cert type, rooted at tls_cert_dir.
    Raises KeyError for unknown types.
    """
    templates = {
        'p12': "{0}cert.p12:{0}p12.pass",
        'jks': "{0}cert.jks:{0}jks.pass",
        'pem': "{0}cert.pem:{0}key.pem",
    }
    template = templates[output_type]
    return template.format(tls_cert_dir)

441 

442 

def _process_port_map(port_map):
    """
    Split a port map into cluster-internal service ports and NodePort-exposed
    ports (IPv4 and IPv6).

    port_map: dict mapping (container_port, protocol, ipv6) -> host_port;
        a host port of 0 means "do not expose on the nodes".
    Returns (service_ports, exposed_ports, exposed_ports_ipv6).
    """
    service_ports = []        # Ports exposed internally on the k8s network
    exposed_ports = []        # Ports to be mapped to ports on the k8s nodes via NodePort
    exposed_ports_ipv6 = []
    for (cport, proto, ipv6), hport in port_map.items():
        # Exposed port name is "xport-<p>-<cport>"; the internal one drops the "x"
        exposed_name = "xport-{0}-{1}".format(proto[0].lower(), cport)
        cport, hport = int(cport), int(hport)
        internal_port = client.V1ServicePort(port=cport, protocol=proto, name=exposed_name[1:])
        if internal_port not in service_ports:
            service_ports.append(internal_port)
        if hport != 0:
            node_port = client.V1ServicePort(port=cport, protocol=proto, node_port=hport, name=exposed_name)
            target_list = exposed_ports_ipv6 if ipv6 else exposed_ports
            target_list.append(node_port)
    return service_ports, exposed_ports, exposed_ports_ipv6

460 

def _service_exists(location, namespace, component_name):
    """Return True if the component's ClusterIP service exists in the namespace, False otherwise."""
    try:
        _configure_api(location)
        client.CoreV1Api().read_namespaced_service(_create_service_name(component_name), namespace)
        return True
    except client.rest.ApiException:
        # Any API error (typically 404) is treated as "service does not exist"
        return False

471 

def _patch_deployment(location, namespace, deployment, modify):
    '''
    Gets the current spec for 'deployment' in 'namespace'
    in the k8s cluster at 'location',
    uses the 'modify' function to change the spec,
    then sends the updated spec to k8s.

    location: k8s cluster/context name, passed to _configure_api
    namespace: k8s namespace holding the deployment
    deployment: name of the deployment to patch
    modify: callable taking the current V1Deployment and returning the
        modified V1Deployment to apply
    '''
    _configure_api(location)

    # Get deployment spec
    spec = client.AppsV1Api().read_namespaced_deployment(deployment, namespace)

    # Apply changes to spec
    spec = modify(spec)

    # Patch the deploy with updated spec
    client.AppsV1Api().patch_namespaced_deployment(deployment, namespace, spec)

489 

def _execute_command_in_pod(location, namespace, pod_name, command):
    '''
    Execute the command (specified by an argv-style list in the "command" parameter) in
    the specified pod in the specified namespace at the specified location.
    For now at least, we use this only to
    run a notification script in a pod after a configuration change.

    Returns a dict of the form {"pod": pod_name, "output": <command output or
    "Pod not found">}.

    The straightforward way to do this is with the V1 Core API function "connect_get_namespaced_pod_exec".
    Unfortunately due to a bug/limitation in the Python client library, we can't call it directly.
    We have to make the API call through a Websocket connection using the kubernetes.stream wrapper API.
    I'm following the example code at https://github.com/kubernetes-client/python/blob/master/examples/exec.py.
    There are several issues tracking this, in various states.  It isn't clear that there will ever
    be a fix.
        - https://github.com/kubernetes-client/python/issues/58
        - https://github.com/kubernetes-client/python/issues/409
        - https://github.com/kubernetes-client/python/issues/526

    The main consequence of the workaround using "stream" is that the caller does not get an indication
    of the exit code returned by the command when it completes execution.  It turns out that the
    original implementation of notification in the Docker plugin did not use this result, so we can
    still match the original notification functionality.

    The "stream" approach returns a string containing any output sent by the command to stdout or stderr.
    We'll return that so it can be logged.
    '''
    _configure_api(location)
    try:
        output = stream.stream(client.CoreV1Api().connect_get_namespaced_pod_exec,
                               name=pod_name,
                               namespace=namespace,
                               command=command,
                               stdout=True,
                               stderr=True,
                               stdin=False,
                               tty=False)
    except client.rest.ApiException as e:
        # If the exception indicates the pod wasn't found, it's not a fatal error.
        # It existed when we enumerated the pods for the deployment but no longer exists.
        # Unfortunately, the only way to distinguish a pod not found from any other error
        # is by looking at the reason text.
        # (The ApiException's "status" field should contain the HTTP status code, which would
        # be 404 if the pod isn't found, but empirical testing reveals that "status" is set
        # to zero.)
        if "404 not found" in e.reason.lower():
            output = "Pod not found"
        else:
            raise e

    return {"pod" : pod_name, "output" : output}

539 

540def deploy(ctx, namespace, component_name, image, replicas, always_pull, k8sconfig, **kwargs): 

541 ''' 

542 This will create a k8s Deployment and, if needed, one or two k8s Services. 

543 (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use. 

544 We're not exposing k8s to the component developer and the blueprint author. 

545 This is a conscious choice. We want to use k8s in a controlled, consistent way, and we want to hide 

546 the details from the component developer and the blueprint author.) 

547 

548 namespace: the Kubernetes namespace into which the component is deployed 

549 component_name: the component name, used to derive names of Kubernetes entities 

550 image: the docker image for the component being deployed 

551 replica: the number of instances of the component to be deployed 

552 always_pull: boolean flag, indicating that Kubernetes should always pull a new copy of 

553 the Docker image for the component, even if it is already present on the Kubernetes node. 

554 k8sconfig contains: 

555 - image_pull_secrets: a list of names of image pull secrets that can be used for retrieving images. 

556 (DON'T PANIC: these are just the names of secrets held in the Kubernetes secret store.) 

557 - filebeat: a dictionary of filebeat sidecar parameters: 

558 "log_path" : mount point for log volume in filebeat container 

559 "data_path" : mount point for data volume in filebeat container 

560 "config_path" : mount point for config volume in filebeat container 

561 "config_subpath" : subpath for config data in filebeat container 

562 "config_map" : ConfigMap holding the filebeat configuration 

563 "image": Docker image to use for filebeat 

564 - tls: a dictionary of TLS-related information: 

565 "cert_path": mount point for certificate volume in init container 

566 "image": Docker image to use for TLS init container 

567 "component_cert_dir" : default mount point for certs 

568 - cert_post_processor: a dictionary of cert_post_processor information: 

569 "image_tag": docker image to use for cert-post-processor init container 

570 kwargs may have: 

571 - volumes: array of volume objects, where a volume object is: 

572 {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"} 

573 - ports: array of strings in the form "container_port:host_port" 

574 - env: map of name-value pairs ( {name0: value0, name1: value1...} 

575 - log_info: an object with info for setting up ELK logging, with the form: 

576 {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"} 

577 - tls_info: an object with info for setting up TLS (HTTPS), with the form: 

578 {"use_tls": true, "cert_directory": "/path/to/container/cert/directory" } 

579 - external_cert: an object with information for setting up the init container for external certificates creation, with the form: 

580 {"external_cert": 

581 "external_cert_directory": "/path/to/directory_where_certs_should_be_placed", 

582 "use_external_tls": true or false, 

583 "ca_name": "ca-name-value", 

584 "cert_type": "P12" or "JKS" or "PEM", 

585 "external_certificate_parameters": 

586 "common_name": "common-name-value", 

587 "sans": "sans-value"} 

588 - labels: dict with label-name/label-value pairs, e.g. {"cfydeployment" : "lsdfkladflksdfsjkl", "cfynode":"mycomponent"} 

589 These label will be set on all the pods deployed as a result of this deploy() invocation. 

590 - resources: dict with optional "limits" and "requests" resource requirements, each a dict containing: 

591 - cpu: number CPU usage, like 0.5 

592 - memory: string memory requirement, like "2Gi" 

593 - readiness: dict with health check info; if present, used to create a readiness probe for the main container. Includes: 

594 - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker") 

595 - interval: period (in seconds) between probes 

596 - timeout: time (in seconds) to allow a probe to complete 

597 - endpoint: the path portion of the URL that points to the readiness endpoint for "http" and "https" types 

598 - path: the full path to the script to be executed in the container for "script" and "docker" types 

599 - liveness: dict with health check info; if present, used to create a liveness probe for the main container. Includes: 

600 - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker") 

601 - interval: period (in seconds) between probes 

602 - timeout: time (in seconds) to allow a probe to complete 

603 - endpoint: the path portion of the URL that points to the liveness endpoint for "http" and "https" types 

604 - path: the full path to the script to be executed in the container for "script" and "docker" types 

605 - k8s_location: name of the Kubernetes location (cluster) where the component is to be deployed 

606 

607 ''' 

608 

609 deployment_ok = False 

610 cip_service_created = False 

611 deployment_description = { 

612 "namespace": namespace, 

613 "location" : kwargs.get("k8s_location"), 

614 "deployment": '', 

615 "services": [] 

616 } 

617 

618 try: 

619 

620 # Get API handles 

621 _configure_api(kwargs.get("k8s_location")) 

622 core = client.CoreV1Api() 

623 k8s_apps_v1_api_client = client.AppsV1Api() 

624 

625 # Parse the port mapping 

626 container_ports, port_map = parse_ports(kwargs.get("ports", [])) 

627 

628 # Parse the volumes list into volumes and volume_mounts for the deployment 

629 volumes, volume_mounts = _parse_volumes(kwargs.get("volumes", [])) 

630 

631 # Initialize the list of containers that will be part of the pod 

632 containers = [] 

633 init_containers = [] 

634 

635 # Set up the ELK logging sidecar container, if needed 

636 _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, kwargs.get("log_info"), k8sconfig.get("filebeat")) 

637 

638 # Set up TLS information 

639 _add_tls_init_container(ctx, init_containers, volumes, volume_mounts, kwargs.get("tls_info") or {}, k8sconfig.get("tls")) 

640 

641 # Set up external TLS information 

642 external_cert = kwargs.get("external_cert") 

643 if external_cert and external_cert.get("use_external_tls"): 

644 _add_external_tls_init_container(ctx, init_containers, volumes, external_cert, k8sconfig.get("external_cert")) 

645 _add_cert_post_processor_init_container(ctx, init_containers, kwargs.get("tls_info") or {}, k8sconfig.get("tls"), external_cert, k8sconfig.get("cert_post_processor")) 

646 

647 # Create the container for the component 

648 # Make it the first container in the pod 

649 container_args = {key: kwargs.get(key) for key in ("env", "readiness", "liveness", "resources")} 

650 container_args['container_ports'] = container_ports 

651 container_args['volume_mounts'] = volume_mounts 

652 containers.insert(0, _create_container_object(component_name, image, always_pull, **container_args)) 

653 

654 # Build the k8s Deployment object 

655 labels = kwargs.get("labels", {}) 

656 labels["app"] = component_name 

657 dep = _create_deployment_object(component_name, containers, init_containers, replicas, volumes, labels, pull_secrets=k8sconfig["image_pull_secrets"]) 

658 

659 # Have k8s deploy it 

660 k8s_apps_v1_api_client.create_namespaced_deployment(namespace, dep) 

661 deployment_ok = True 

662 deployment_description["deployment"] = _create_deployment_name(component_name) 

663 

664 # Create service(s), if a port mapping is specified 

665 if port_map: 665 ↛ 699line 665 didn't jump to line 699, because the condition on line 665 was never false

666 service_ports, exposed_ports, exposed_ports_ipv6 = _process_port_map(port_map) 

667 

668 # Create a ClusterIP service for access via the k8s network 

669 service = _create_service_object(_create_service_name(component_name), component_name, service_ports, None, 

670 labels, "ClusterIP", "IPv4") 

671 core.create_namespaced_service(namespace, service) 

672 cip_service_created = True 

673 deployment_description["services"].append(_create_service_name(component_name)) 

674 

675 # If there are ports to be exposed on the k8s nodes, create a "NodePort" service 

676 if exposed_ports: 676 ↛ 677line 676 didn't jump to line 677

677 exposed_service = \ 

678 _create_service_object(_create_exposed_service_name(component_name), component_name, exposed_ports, 

679 '', labels, "NodePort", "IPv4") 

680 core.create_namespaced_service(namespace, exposed_service) 

681 deployment_description["services"].append(_create_exposed_service_name(component_name)) 

682 

683 if exposed_ports_ipv6: 683 ↛ 684line 683 didn't jump to line 684

684 exposed_service_ipv6 = \ 

685 _create_service_object(_create_exposed_v6_service_name(component_name), component_name, 

686 exposed_ports_ipv6, '', labels, "NodePort", "IPv6") 

687 core.create_namespaced_service(namespace, exposed_service_ipv6) 

688 deployment_description["services"].append(_create_exposed_v6_service_name(component_name)) 

689 

690 except Exception as e: 

691 # If the ClusterIP service was created, delete the service: 

692 if cip_service_created: 

693 core.delete_namespaced_service(_create_service_name(component_name), namespace) 

694 # If the deployment was created but not the service, delete the deployment 

695 if deployment_ok: 

696 client.AppsV1Api().delete_namespaced_deployment(_create_deployment_name(component_name), namespace, body=client.V1DeleteOptions()) 

697 raise e 

698 

699 return dep, deployment_description 

700 

def undeploy(deployment_description):
    ''' Remove a deployed component: delete its services, then its k8s Deployment. '''
    _configure_api(deployment_description["location"])
    namespace = deployment_description["namespace"]

    # Delete every service that was created for the component
    core_api = client.CoreV1Api()
    for service_name in deployment_description["services"]:
        core_api.delete_namespaced_service(service_name, namespace)

    # "Foreground" propagation makes k8s delete the pods and replicaset owned
    # by the deployment before the deployment object itself goes away.
    delete_options = client.V1DeleteOptions(propagation_policy="Foreground")
    client.AppsV1Api().delete_namespaced_deployment(
        deployment_description["deployment"], namespace, body=delete_options)

713 

def is_available(location, namespace, component_name):
    ''' Check whether a component's k8s Deployment is fully available and up to date. '''
    _configure_api(location)
    dep = client.AppsV1Api().read_namespaced_deployment_status(
        _create_deployment_name(component_name), namespace)
    # Available when every requested replica is both available and running the
    # current spec.  This single check covers completion of an initial
    # deployment, a scale operation, or an update operation.
    wanted = dep.spec.replicas
    return dep.status.available_replicas == wanted and dep.status.updated_replicas == wanted

720 

def scale(deployment_description, replicas):
    ''' Trigger a scaling operation by updating the replica count for the Deployment '''

    def _set_replica_count(spec):
        # Mutate the fetched deployment spec in place; _patch_deployment
        # sends the modified spec back to k8s.
        spec.spec.replicas = replicas
        return spec

    _patch_deployment(deployment_description["location"],
                      deployment_description["namespace"],
                      deployment_description["deployment"],
                      _set_replica_count)

729 

def upgrade(deployment_description, image, container_index = 0):
    ''' Trigger a rolling upgrade by sending a new image name/tag to k8s '''

    def _set_image(spec):
        # Swap the image for the target container (index 0 is the main
        # application container); k8s then performs the rolling update.
        spec.spec.template.spec.containers[container_index].image = image
        return spec

    _patch_deployment(deployment_description["location"],
                      deployment_description["namespace"],
                      deployment_description["deployment"],
                      _set_image)

738 

def rollback(deployment_description, rollback_to=0):
    '''
    Undo upgrade by rolling back to a previous revision of the deployment.
    By default, go back one revision.
    rollback_to can be used to supply a specific revision number.
    Returns the image for the app container and the replica count from the rolled-back deployment
    '''
    '''
    2018-07-13
    Currently this does not work due to a bug in the create_namespaced_deployment_rollback() method.
    The k8s python client code throws an exception while processing the response from the API.
    See:
    - https://github.com/kubernetes-client/python/issues/491
    - https://github.com/kubernetes/kubernetes/pull/63837
    The fix has been merged into the master branch but is not in the latest release.
    '''
    # NOTE(review): beyond the client bug noted above, the rollback subresource
    # lived under extensions/v1beta1, which was removed from Kubernetes in
    # v1.16 -- this call presumably fails outright against current clusters
    # and current python-client releases.  Confirm before relying on it; the
    # modern equivalent is patching the deployment back to a prior
    # ReplicaSet's pod template (what "kubectl rollout undo" does).
    _configure_api(deployment_description["location"])
    deployment = deployment_description["deployment"]
    namespace = deployment_description["namespace"]

    # Initiate the rollback
    # revision=0 means "previous revision" in the rollback API.
    client.ExtensionsV1beta1Api().create_namespaced_deployment_rollback(
        deployment,
        namespace,
        client.AppsV1beta1DeploymentRollback(name=deployment, rollback_to=client.AppsV1beta1RollbackConfig(revision=rollback_to)))

    # Read back the spec for the rolled-back deployment
    spec = client.AppsV1Api().read_namespaced_deployment(deployment, namespace)
    return spec.spec.template.spec.containers[0].image, spec.spec.replicas

768 

def execute_command_in_deployment(deployment_description, command):
    '''
    Run "command" (an argv-style list) in container 0 (the main application
    container) of every currently-running pod belonging to the k8s deployment
    identified by "deployment_description", returning the per-pod results.

    The set of pods backing a deployment changes over time; the enumeration
    below is a snapshot at one point in time.  Pods created after the
    enumeration will not run the command, and a pod that disappears before
    the command reaches it causes a failed execution attempt, which is not
    treated as fatal.  That is acceptable for the one current use case --
    running a script to tell a container that its configuration changed after
    a policy update.  The new configuration is stored in the configuration
    store (Consul) first, so a pod that comes up after the enumeration gets
    its initial configuration from the updated version anyway, and a pod that
    is gone no longer needs reconfiguring.

    The ideal would be a k8s API call that executes a command across a whole
    deployment, but k8s only provides a pod-level exec call, and there is no
    direct way to list the pods belonging to a deployment.  The deployment
    code above therefore puts a "k8sdeployment" label carrying the deployment
    name on each pod, and the query below selects pods by that label.
    '''
    location = deployment_description["location"]
    _configure_api(location)
    namespace = deployment_description["namespace"]
    selector = "k8sdeployment={0}".format(deployment_description["deployment"])

    # Snapshot the running pods that currently back the deployment
    running_pods = client.CoreV1Api().list_namespaced_pod(
        namespace=namespace,
        label_selector=selector,
        field_selector="status.phase=Running"
    )

    # Execute the command in each pod and collect the results
    return [_execute_command_in_pod(location, namespace, pod.metadata.name, command)
            for pod in running_pods.items]