Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# ============LICENSE_START======================================================= 

2# org.onap.dcae 

3# ================================================================================ 

4# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. 

5# Copyright (c) 2020 Pantheon.tech. All rights reserved. 

6# Copyright (c) 2020-2021 Nokia. All rights reserved. 

7# ================================================================================ 

8# Licensed under the Apache License, Version 2.0 (the "License"); 

9# you may not use this file except in compliance with the License. 

10# You may obtain a copy of the License at 

11# 

12# http://www.apache.org/licenses/LICENSE-2.0 

13# 

14# Unless required by applicable law or agreed to in writing, software 

15# distributed under the License is distributed on an "AS IS" BASIS, 

16# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

17# See the License for the specific language governing permissions and 

18# limitations under the License. 

19# ============LICENSE_END========================================================= 

20 

21# Lifecycle interface calls for containerized components 

22 

23# Needed by Cloudify Manager to load google.auth for the Kubernetes python client 

24from . import cloudify_importer 

25 

26import time, copy 

27import json 

28from cloudify import ctx 

29from cloudify.decorators import operation 

30from cloudify.exceptions import NonRecoverableError, RecoverableError 

31from onap_dcae_dcaepolicy_lib import Policies 

32from k8splugin import discovery as dis 

33from k8splugin.decorators import monkeypatch_loggers, wrap_error_handling_start, \ 

34 merge_inputs_for_start, merge_inputs_for_create, wrap_error_handling_update 

35from k8splugin.exceptions import DockerPluginDeploymentError 

36from k8splugin import utils 

37from configure import configure 

38import k8sclient 

39 

40# Get configuration 

41plugin_conf = configure.configure() 

42CONSUL_HOST = plugin_conf.get("consul_host") 

43CONSUL_INTERNAL_NAME = plugin_conf.get("consul_dns_name") 

44DCAE_NAMESPACE = plugin_conf.get("namespace") 

45DEFAULT_MAX_WAIT = plugin_conf.get("max_wait") 

46DEFAULT_K8S_LOCATION = plugin_conf.get("default_k8s_location") 

47COMPONENT_CERT_DIR = plugin_conf.get("tls",{}).get("component_cert_dir") 

48CBS_BASE_URL = plugin_conf.get("cbs").get("base_url") 

49 

50# Used to construct delivery urls for data router subscribers. Data router in FTL 

51# requires https but this author believes that ONAP is to be defaulted to http. 

52DEFAULT_SCHEME = "http" 

53 

54# Property keys 

55SERVICE_COMPONENT_NAME = "service_component_name" 

56CONTAINER_ID = "container_id" 

57APPLICATION_CONFIG = "application_config" 

58K8S_DEPLOYMENT = "k8s_deployment" 

59RESOURCE_KW = "resource_config" 

60LOCATION_ID = "location_id" 

61 

62# External cert parameters 

63EXT_CERT_DIR = "external_cert_directory" 

64EXT_CA_NAME = "ca_name" 

65EXT_CERT_PARAMS = "external_certificate_parameters" 

66EXT_COMMON_NAME = "common_name" 

67EXT_CERT_ERROR_MESSAGE = "Provided blueprint is incorrect. It specifies external_cert without all the required parameters. " \ 

68 "Required parameters are: {0}, {1}, {2}.{3}".format(EXT_CERT_DIR, EXT_CA_NAME, EXT_CERT_PARAMS, EXT_COMMON_NAME) 

69 

70# Utility methods 

71 

72# Lifecycle interface calls for dcae.nodes.DockerContainer 

73 

74def _setup_for_discovery(**kwargs): 

75 """Setup for config discovery""" 

76 try: 

77 name = kwargs['name'] 

78 application_config = kwargs[APPLICATION_CONFIG] 

79 

80 # NOTE: application_config is no longer a json string and is inputed as a 

81 # YAML map which translates to a dict. We don't have to do any 

82 # preprocessing anymore. 

83 conn = dis.create_kv_conn(CONSUL_HOST) 

84 dis.push_service_component_config(conn, name, application_config) 

85 return kwargs 

86 except dis.DiscoveryConnectionError as e: 86 ↛ 88line 86 didn't jump to line 88

87 raise RecoverableError(e) 

88 except Exception as e: 

89 ctx.logger.error("Unexpected error while pushing configuration: {0}" 

90 .format(str(e))) 

91 raise NonRecoverableError(e) 

92 

93def _generate_component_name(**kwargs): 

94 """Generate component name""" 

95 service_component_type = kwargs['service_component_type'] 

96 name_override = kwargs['service_component_name_override'] 

97 

98 kwargs['name'] = name_override if name_override \ 

99 else dis.generate_service_component_name(service_component_type) 

100 return kwargs 

101 

102def _done_for_create(**kwargs): 

103 """Wrap up create operation""" 

104 name = kwargs['name'] 

105 kwargs[SERVICE_COMPONENT_NAME] = name 

106 # All updates to the runtime_properties happens here. I don't see a reason 

107 # why we shouldn't do this because the context is not being mutated by 

108 # something else and will keep the other functions pure (pure in the sense 

109 # not dealing with CloudifyContext). 

110 ctx.instance.runtime_properties.update(kwargs) 

111 ctx.logger.info("Done setting up: {0}".format(name)) 

112 return kwargs 

113 

114def _get_resources(**kwargs): 

115 if kwargs is not None: 

116 ctx.logger.debug("{0}: {1}".format(RESOURCE_KW, kwargs.get(RESOURCE_KW))) 

117 return kwargs.get(RESOURCE_KW) 

118 ctx.logger.info("set resources to None") 

119 return None 

120 

121def _get_location(): 

122 ''' Get the k8s location property. Set to the default if the property is missing, None, or zero-length ''' 

123 return ctx.node.properties["location_id"] if "location_id" in ctx.node.properties and ctx.node.properties["location_id"] \ 

124 else DEFAULT_K8S_LOCATION 

125 

126@merge_inputs_for_create 

127@monkeypatch_loggers 

128@Policies.gather_policies_to_node() 

129@operation 

130def create_for_components(**create_inputs): 

131 """Create step for service components 

132 

133 This interface is responsible for: 

134 

135 1. Generating service component name 

136 2. Populating config information into Consul 

137 """ 

138 _done_for_create( 

139 **_setup_for_discovery( 

140 **_enhance_docker_params( 

141 **_generate_component_name( 

142 **create_inputs)))) 

143 

144 

145def _parse_streams(**kwargs): 

146 """Parse streams and setup for DMaaP plugin""" 

147 # The DMaaP plugin requires this plugin to set the runtime properties 

148 # keyed by the node name. 

149 for stream in kwargs["streams_publishes"]: 

150 kwargs[stream["name"]] = stream 

151 

152 for stream in kwargs["streams_subscribes"]: 

153 if stream["type"] == "data_router": 

154 

155 # Don't want to mutate the source 

156 stream = copy.deepcopy(stream) 

157 

158 # Set up the delivery URL 

159 # Using service_component_name as the host name in the subscriber URL 

160 # will work in a single-cluster ONAP deployment. Whether it will also work 

161 # in a multi-cluster ONAP deployment--with a central location and one or 

162 # more remote ("edge") locations depends on how networking and DNS is set 

163 # up in a multi-cluster deployment 

164 service_component_name = kwargs["name"] 

165 ports, _ = k8sclient.parse_ports(kwargs["ports"]) 

166 dport, _ = ports[0] 

167 subscriber_host = "{host}:{port}".format(host=service_component_name, port=dport) 

168 

169 scheme = stream.get("scheme", DEFAULT_SCHEME) 

170 if "route" not in stream: 170 ↛ 171line 170 didn't jump to line 171, because the condition on line 170 was never true

171 raise NonRecoverableError("'route' key missing from data router subscriber") 

172 path = stream["route"] 

173 stream["delivery_url"] = "{scheme}://{host}/{path}".format( 

174 scheme=scheme, host=subscriber_host, path=path) 

175 

176 # If username and password has not been provided then generate it. The 

177 # DMaaP plugin doesn't generate for subscribers. The generation code 

178 # and length of username password has been lifted from the DMaaP 

179 # plugin. 

180 if not stream.get("username", None): 

181 stream["username"] = utils.random_string(8) 

182 if not stream.get("password", None): 

183 stream["password"] = utils.random_string(10) 

184 

185 kwargs[stream["name"]] = stream 

186 

187 return kwargs 

188 

189@merge_inputs_for_create 

190@monkeypatch_loggers 

191@Policies.gather_policies_to_node() 

192@operation 

193def create_for_components_with_streams(**create_inputs): 

194 """Create step for service components that use DMaaP 

195 

196 This interface is responsible for: 

197 

198 1. Generating service component name 

199 2. Setup runtime properties for DMaaP plugin 

200 3. Populating application config into Consul 

201 """ 

202 _done_for_create( 

203 **_setup_for_discovery( 

204 **_parse_streams( 

205 **_enhance_docker_params( 

206 **_generate_component_name( 

207 **create_inputs))))) 

208 

209def _verify_k8s_deployment(location, service_component_name, max_wait): 

210 """Verify that the k8s Deployment is ready 

211 

212 Args: 

213 ----- 

214 location (string): location of the k8s cluster where the component was deployed 

215 service_component_name: component's service component name 

216 max_wait (integer): limit to how may attempts to make which translates to 

217 seconds because each sleep is one second. 0 means infinite. 

218 

219 Return: 

220 ------- 

221 True if deployment is ready within the maximum wait time, False otherwise 

222 """ 

223 num_attempts = 1 

224 

225 while True: 

226 if k8sclient.is_available(location, DCAE_NAMESPACE, service_component_name): 

227 return True 

228 else: 

229 num_attempts += 1 

230 

231 if max_wait > 0 and max_wait < num_attempts: 

232 return False 

233 

234 time.sleep(1) 

235 

236 return True 

237 

238def _fail_if_external_cert_incorrect(external_cert): 

239 if not (external_cert.get(EXT_CERT_DIR) 

240 and external_cert.get(EXT_CA_NAME) 

241 and external_cert.get(EXT_CERT_PARAMS) 

242 and external_cert.get(EXT_CERT_PARAMS).get(EXT_COMMON_NAME)): 

243 ctx.logger.error(EXT_CERT_ERROR_MESSAGE) 

244 raise NonRecoverableError(EXT_CERT_ERROR_MESSAGE) 

245 

246def _create_and_start_container(container_name, image, **kwargs): 

247 ''' 

248 This will create a k8s Deployment and, if needed, a k8s Service or two. 

249 (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use. 

250 We're not exposing k8s to the component developer and the blueprint author. 

251 This is a conscious choice. We want to use k8s in a controlled, consistent way, and we want to hide 

252 the details from the component developer and the blueprint author.) 

253 

254 kwargs may have: 

255 - volumes: array of volume objects, where a volume object is: 

256 {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"} 

257 {'config_volume': {'name': 'myConfigMap'}, 'container': {'bind': '/path/on/config.yaml', 'mode': 'ro'} 

258 - ports: array of strings in the form "container_port:host_port" 

259 - envs: map of name-value pairs ( {name0: value0, name1: value1...} ) 

260 - always_pull: boolean. If true, sets image pull policy to "Always" 

261 so that a fresh copy of the image is always pull. Otherwise, sets 

262 image pull policy to "IfNotPresent" 

263 - log_info: an object with info for setting up ELK logging, with the form: 

264 {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}" 

265 - tls_info: an object with information for setting up the component to act as a TLS server, with the form: 

266 {"use_tls" : true_or_false, "cert_directory": "/path/to/directory_where_certs_should_be_placed" } 

267 - external_cert: an object with information for setting up the init container for external certificates creation, with the form: 

268 {"external_cert": 

269 "external_cert_directory": "/path/to/directory_where_certs_should_be_placed", 

270 "use_external_tls": true or false, 

271 "ca_name": "ca-name-value", 

272 "cert_type": "P12" or "JKS" or "PEM", 

273 "external_certificate_parameters": 

274 "common_name": "common-name-value", 

275 "sans": "sans-value"} 

276 - replicas: number of replicas to be launched initially 

277 - readiness: object with information needed to create a readiness check 

278 - liveness: object with information needed to create a liveness check 

279 - k8s_location: name of the Kubernetes location (cluster) where the component is to be deployed 

280 ''' 

281 tls_info = kwargs.get("tls_info") or {} 

282 external_cert = kwargs.get("external_cert") 

283 if external_cert and external_cert.get("use_external_tls"): 

284 _fail_if_external_cert_incorrect(external_cert) 

285 cert_dir = tls_info.get("cert_directory") or COMPONENT_CERT_DIR 

286 env = { "CONSUL_HOST": CONSUL_INTERNAL_NAME, 

287 "CONFIG_BINDING_SERVICE": "config-binding-service", 

288 "DCAE_CA_CERTPATH" : "{0}/cacert.pem".format(cert_dir), 

289 "CBS_CONFIG_URL" : "{0}/{1}".format(CBS_BASE_URL, container_name) 

290 } 

291 env.update(kwargs.get("envs", {})) 

292 ctx.logger.info("Starting k8s deployment for {}, image: {}, env: {}, kwargs: {}".format(container_name, image, env, kwargs)) 

293 ctx.logger.info("Passing k8sconfig: {}".format(plugin_conf)) 

294 replicas = kwargs.get("replicas", 1) 

295 resource_config = _get_resources(**kwargs) 

296 _, dep = k8sclient.deploy( 

297 ctx, 

298 DCAE_NAMESPACE, 

299 container_name, 

300 image, 

301 replicas=replicas, 

302 always_pull=kwargs.get("always_pull_image", False), 

303 k8sconfig=plugin_conf, 

304 resources=resource_config, 

305 volumes=kwargs.get("volumes", []), 

306 ports=kwargs.get("ports", []), 

307 tls_info=kwargs.get("tls_info"), 

308 external_cert=kwargs.get("external_cert"), 

309 env=env, 

310 labels=kwargs.get("labels", {}), 

311 log_info=kwargs.get("log_info"), 

312 readiness=kwargs.get("readiness"), 

313 liveness=kwargs.get("liveness"), 

314 k8s_location=kwargs.get("k8s_location")) 

315 

316 # Capture the result of deployment for future use 

317 ctx.instance.runtime_properties[K8S_DEPLOYMENT] = dep 

318 kwargs[K8S_DEPLOYMENT] = dep 

319 ctx.instance.runtime_properties["replicas"] = replicas 

320 ctx.logger.info ("k8s deployment initiated successfully for {0}: {1}".format(container_name, dep)) 

321 return kwargs 

322 

323def _parse_cloudify_context(**kwargs): 

324 """Parse Cloudify context 

325 

326 Extract what is needed. This is impure function because it requires ctx. 

327 """ 

328 kwargs["deployment_id"] = ctx.deployment.id 

329 

330 # Set some labels for the Kubernetes pods 

331 # The name segment is required and must be 63 characters or less 

332 kwargs["labels"] = { 

333 "cfydeployment" : ctx.deployment.id, 

334 "cfynode": ctx.node.name[:63], 

335 "cfynodeinstance": ctx.instance.id[:63] 

336 } 

337 

338 # Pick up the centralized logging info 

339 if "log_info" in ctx.node.properties and "log_directory" in ctx.node.properties["log_info"]: 

340 kwargs["log_info"] = ctx.node.properties["log_info"] 

341 

342 # Pick up TLS info if present 

343 if "tls_info" in ctx.node.properties: 

344 kwargs["tls_info"] = ctx.node.properties["tls_info"] 

345 

346 # Pick up external TLS info if present 

347 if "external_cert" in ctx.node.properties: 

348 kwargs["external_cert"] = ctx.node.properties["external_cert"] 

349 

350 # Pick up replica count and always_pull_image flag 

351 if "replicas" in ctx.node.properties: 

352 kwargs["replicas"] = ctx.node.properties["replicas"] 

353 if "always_pull_image" in ctx.node.properties: 

354 kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"] 

355 

356 # Pick up location 

357 kwargs["k8s_location"] = _get_location() 

358 

359 return kwargs 

360 

361def _enhance_docker_params(**kwargs): 

362 ''' 

363 Set up Docker environment variables and readiness/liveness check info 

364 and inject into kwargs. 

365 ''' 

366 

367 # Get info for setting up readiness/liveness probe, if present 

368 docker_config = kwargs.get("docker_config", {}) 

369 if "healthcheck" in docker_config: 369 ↛ 370line 369 didn't jump to line 370, because the condition on line 369 was never true

370 kwargs["readiness"] = docker_config["healthcheck"] 

371 if "livehealthcheck" in docker_config: 371 ↛ 372line 371 didn't jump to line 372, because the condition on line 371 was never true

372 kwargs["liveness"] = docker_config["livehealthcheck"] 

373 

374 envs = kwargs.get("envs", {}) 

375 

376 kwargs["envs"] = envs 

377 

378 def combine_params(key, docker_config, kwargs): 

379 v = docker_config.get(key, []) + kwargs.get(key, []) 

380 kwargs[key] = v 

381 return kwargs 

382 

383 # Add the lists of ports and volumes unintelligently - meaning just add the 

384 # lists together with no deduping. 

385 kwargs = combine_params("ports", docker_config, kwargs) 

386 kwargs = combine_params("volumes", docker_config, kwargs) 

387 

388 # Merge env vars from kwarg inputs and docker_config 

389 kwargs["envs"].update(docker_config.get("envs", {})) 

390 

391 

392 return kwargs 

393 

394def _create_and_start_component(**kwargs): 

395 """Create and start component (container)""" 

396 image = kwargs["image"] 

397 service_component_name = kwargs[SERVICE_COMPONENT_NAME] 

398 # Need to be picky and manually select out pieces because just using kwargs 

399 # which contains everything confused the execution of 

400 # _create_and_start_container because duplicate variables exist 

401 sub_kwargs = { 

402 "volumes": kwargs.get("volumes", []), 

403 "ports": kwargs.get("ports", None), 

404 "envs": kwargs.get("envs", {}), 

405 "log_info": kwargs.get("log_info", {}), 

406 "tls_info": kwargs.get("tls_info", {}), 

407 "external_cert": kwargs.get("external_cert", {}), 

408 "labels": kwargs.get("labels", {}), 

409 "resource_config": kwargs.get("resource_config",{}), 

410 "readiness": kwargs.get("readiness",{}), 

411 "liveness": kwargs.get("liveness",{}), 

412 "k8s_location": kwargs.get("k8s_location")} 

413 returned_args = _create_and_start_container(service_component_name, image, **sub_kwargs) 

414 kwargs[K8S_DEPLOYMENT] = returned_args[K8S_DEPLOYMENT] 

415 

416 return kwargs 

417 

418def _verify_component(**kwargs): 

419 """Verify deployment is ready""" 

420 service_component_name = kwargs[SERVICE_COMPONENT_NAME] 

421 

422 max_wait = kwargs.get("max_wait", DEFAULT_MAX_WAIT) 

423 ctx.logger.info("Waiting up to {0} secs for {1} to become ready".format(max_wait, service_component_name)) 

424 

425 if _verify_k8s_deployment(kwargs.get("k8s_location"), service_component_name, max_wait): 

426 ctx.logger.info("k8s deployment is ready for: {0}".format(service_component_name)) 

427 else: 

428 # The component did not become ready within the "max_wait" interval. 

429 # Delete the k8s components created already and remove configuration from Consul. 

430 ctx.logger.error("k8s deployment never became ready for {0}".format(service_component_name)) 

431 if (K8S_DEPLOYMENT in kwargs) and (len(kwargs[K8S_DEPLOYMENT]["deployment"]) > 0): 

432 ctx.logger.info("attempting to delete k8s artifacts: {0}".format(kwargs[K8S_DEPLOYMENT])) 

433 k8sclient.undeploy(kwargs[K8S_DEPLOYMENT]) 

434 ctx.logger.info("deleted k8s artifacts: {0}".format(kwargs[K8S_DEPLOYMENT])) 

435 cleanup_discovery(**kwargs) 

436 raise DockerPluginDeploymentError("k8s deployment never became ready for {0}".format(service_component_name)) 

437 

438 return kwargs 

439 

440def _done_for_start(**kwargs): 

441 ctx.instance.runtime_properties.update(kwargs) 

442 ctx.logger.info("Done starting: {0}".format(kwargs["name"])) 

443 return kwargs 

444 

445@wrap_error_handling_start 

446@merge_inputs_for_start 

447@monkeypatch_loggers 

448@operation 

449def create_and_start_container_for_components(**start_inputs): 

450 """Initiate Kubernetes deployment for service components 

451 

452 This operation method is to be used with the ContainerizedServiceComponent 

453 node type. After initiating a Kubernetes deployment, the plugin will verify with Kubernetes 

454 that the app is up and responding successfully to readiness probes. 

455 """ 

456 _done_for_start( 

457 **_verify_component( 

458 **_create_and_start_component( 

459 **_parse_cloudify_context(**start_inputs)))) 

460 

461@wrap_error_handling_start 

462@monkeypatch_loggers 

463@operation 

464def create_and_start_container(**kwargs): 

465 """Initiate a Kubernetes deployment for the generic ContainerizedApplication node type""" 

466 service_component_name = ctx.node.properties["name"] 

467 ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = service_component_name 

468 

469 image = ctx.node.properties["image"] 

470 kwargs["k8s_location"] = _get_location() 

471 

472 _create_and_start_container(service_component_name, image,**kwargs) 

473 

474@monkeypatch_loggers 

475@operation 

476def stop_and_remove_container(**kwargs): 

477 """Delete Kubernetes deployment""" 

478 if K8S_DEPLOYMENT in ctx.instance.runtime_properties: 

479 try: 

480 deployment_description = ctx.instance.runtime_properties[K8S_DEPLOYMENT] 

481 k8sclient.undeploy(deployment_description) 

482 

483 except Exception as e: 

484 ctx.logger.error("Unexpected error while deleting k8s deployment: {0}" 

485 .format(str(e))) 

486 else: 

487 # A previous install workflow may have failed, 

488 # and no Kubernetes deployment info was recorded in runtime_properties. 

489 # No need to run the undeploy operation 

490 ctx.logger.info("No k8s deployment information, not attempting to delete k8s deployment") 

491 

492@wrap_error_handling_update 

493@monkeypatch_loggers 

494@operation 

495def scale(replicas, **kwargs): 

496 """Change number of replicas in the deployment""" 

497 service_component_name = ctx.instance.runtime_properties["service_component_name"] 

498 

499 if replicas > 0: 

500 current_replicas = ctx.instance.runtime_properties["replicas"] 

501 ctx.logger.info("Scaling {0} from {1} to {2} replica(s)".format(service_component_name, current_replicas, replicas)) 

502 deployment_description = ctx.instance.runtime_properties[K8S_DEPLOYMENT] 

503 k8sclient.scale(deployment_description, replicas) 

504 ctx.instance.runtime_properties["replicas"] = replicas 

505 

506 # Verify that the scaling took place as expected 

507 max_wait = kwargs.get("max_wait", DEFAULT_MAX_WAIT) 

508 ctx.logger.info("Waiting up to {0} secs for {1} to scale and become ready".format(max_wait, service_component_name)) 

509 if _verify_k8s_deployment(deployment_description["location"], service_component_name, max_wait): 

510 ctx.logger.info("Scaling complete: {0} from {1} to {2} replica(s)".format(service_component_name, current_replicas, replicas)) 

511 

512 else: 

513 ctx.logger.info("Ignoring request to scale {0} to zero replicas".format(service_component_name)) 

514 

515@wrap_error_handling_update 

516@monkeypatch_loggers 

517@operation 

518def update_image(image, **kwargs): 

519 """ Restart component with a new Docker image """ 

520 

521 service_component_name = ctx.instance.runtime_properties["service_component_name"] 

522 if image: 

523 current_image = ctx.instance.runtime_properties["image"] 

524 ctx.logger.info("Updating app image for {0} from {1} to {2}".format(service_component_name, current_image, image)) 

525 deployment_description = ctx.instance.runtime_properties[K8S_DEPLOYMENT] 

526 k8sclient.upgrade(deployment_description, image) 

527 ctx.instance.runtime_properties["image"] = image 

528 

529 # Verify that the update took place as expected 

530 max_wait = kwargs.get("max_wait", DEFAULT_MAX_WAIT) 

531 ctx.logger.info("Waiting up to {0} secs for {1} to be updated and become ready".format(max_wait, service_component_name)) 

532 if _verify_k8s_deployment(deployment_description["location"], service_component_name, max_wait): 

533 ctx.logger.info("Update complete: {0} from {1} to {2}".format(service_component_name, current_image, image)) 

534 

535 else: 

536 ctx.logger.info("Ignoring update_image request for {0} with unusable image '{1}'".format(service_component_name, str(image))) 

537 

538#TODO: implement rollback operation when kubernetes python client fix is available. 

539# (See comments in k8sclient.py.) 

540# In the meantime, it's possible to undo an update_image operation by doing a second 

541# update_image that specifies the older image. 

542 

543@monkeypatch_loggers 

544@Policies.cleanup_policies_on_node 

545@operation 

546def cleanup_discovery(**kwargs): 

547 """Delete configuration from Consul""" 

548 if SERVICE_COMPONENT_NAME in ctx.instance.runtime_properties: 

549 service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] 

550 

551 try: 

552 conn = dis.create_kv_conn(CONSUL_HOST) 

553 dis.remove_service_component_config(conn, service_component_name) 

554 except dis.DiscoveryConnectionError as e: 

555 raise RecoverableError(e) 

556 else: 

557 # When another node in the blueprint fails install, 

558 # this node may not have generated a service component name. 

559 # There's nothing to delete from Consul. 

560 ctx.logger.info ("No service_component_name, not attempting to delete config from Consul") 

561 

562def _notify_container(**kwargs): 

563 """ 

564 Notify container using the policy section in the docker_config. 

565 Notification consists of running a script in the application container 

566 in each pod in the Kubernetes deployment associated with this node. 

567 Return the list of notification results. 

568 """ 

569 dc = kwargs["docker_config"] 

570 resp = [] 

571 

572 if "policy" in dc and dc["policy"].get("trigger_type") == "docker": 572 ↛ 575line 572 didn't jump to line 575, because the condition on line 572 was never true

573 # Build the command to execute in the container 

574 # SCRIPT_PATH policies {"policies" : ...., "updated_policies" : ..., "removed_policies": ...} 

575 script_path = dc["policy"]["script_path"] 

576 policy_data = { 

577 "policies": kwargs["policies"], 

578 "updated_policies": kwargs["updated_policies"], 

579 "removed_policies": kwargs["removed_policies"] 

580 } 

581 

582 command = [script_path, "policies", json.dumps(policy_data)] 

583 

584 # Execute the command 

585 deployment_description = ctx.instance.runtime_properties[K8S_DEPLOYMENT] 

586 resp = k8sclient.execute_command_in_deployment(deployment_description, command) 

587 

588 # else the default is no trigger 

589 

590 return resp 

591 

592@operation 

593@monkeypatch_loggers 

594@Policies.update_policies_on_node() 

595def policy_update(updated_policies, removed_policies=None, policies=None, **kwargs): 

596 """Policy update task 

597 

598 This method is responsible for updating the application configuration and 

599 notifying the applications that the change has occurred. This is to be used 

600 for the dcae.interfaces.policy.policy_update operation. 

601 

602 :updated_policies: contains the list of changed policy-configs when configs_only=True 

603 (default) Use configs_only=False to bring the full policy objects in :updated_policies:. 

604 """ 

605 service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] 

606 ctx.logger.info("policy_update for {0}-- updated_policies: {1}, removed_policies: {2}, policies: {3}" 

607 .format(service_component_name, updated_policies, removed_policies, policies)) 

608 update_inputs = copy.deepcopy(ctx.instance.runtime_properties) 

609 update_inputs["updated_policies"] = updated_policies 

610 update_inputs["removed_policies"] = removed_policies 

611 update_inputs["policies"] = policies 

612 

613 resp = _notify_container(**update_inputs) 

614 ctx.logger.info("policy_update complete for {0}--notification results: {1}".format(service_component_name,json.dumps(resp)))