Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# ============LICENSE_START======================================================= 

2# org.onap.dcae 

3# ================================================================================ 

4# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. 

5# Copyright (c) 2020 Pantheon.tech. All rights reserved. 

6# Copyright (c) 2020 Nokia. All rights reserved. 

7# ================================================================================ 

8# Licensed under the Apache License, Version 2.0 (the "License"); 

9# you may not use this file except in compliance with the License. 

10# You may obtain a copy of the License at 

11# 

12# http://www.apache.org/licenses/LICENSE-2.0 

13# 

14# Unless required by applicable law or agreed to in writing, software 

15# distributed under the License is distributed on an "AS IS" BASIS, 

16# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

17# See the License for the specific language governing permissions and 

18# limitations under the License. 

19# ============LICENSE_END========================================================= 

20 

21# Lifecycle interface calls for containerized components 

22 

23# Needed by Cloudify Manager to load google.auth for the Kubernetes python client 

24from . import cloudify_importer 

25 

26import time, copy 

27import json 

28from cloudify import ctx 

29from cloudify.decorators import operation 

30from cloudify.exceptions import NonRecoverableError, RecoverableError 

31from onap_dcae_dcaepolicy_lib import Policies 

32from k8splugin import discovery as dis 

33from k8splugin.decorators import monkeypatch_loggers, wrap_error_handling_start, \ 

34 merge_inputs_for_start, merge_inputs_for_create, wrap_error_handling_update 

35from k8splugin.exceptions import DockerPluginDeploymentError 

36from k8splugin import utils 

37from configure import configure 

38import k8sclient 

39 

# Load the plugin configuration once, at import time, and expose the values
# the lifecycle operations below rely on as module-level constants.
plugin_conf = configure.configure()

# Consul / service discovery
CONSUL_HOST = plugin_conf.get("consul_host")
CONSUL_INTERNAL_NAME = plugin_conf.get("consul_dns_name")

# Kubernetes deployment defaults
DCAE_NAMESPACE = plugin_conf.get("namespace")
DEFAULT_MAX_WAIT = plugin_conf.get("max_wait")
DEFAULT_K8S_LOCATION = plugin_conf.get("default_k8s_location")

# TLS: the "tls" section is optional, in which case the cert dir is None
COMPONENT_CERT_DIR = plugin_conf.get("tls", {}).get("component_cert_dir")

# Config binding service.
# NOTE(review): unlike "tls", a missing "cbs" section fails at import time
# (AttributeError on None) -- confirm that fail-fast behaviour is intended.
CBS_BASE_URL = plugin_conf.get("cbs").get("base_url")

# Used to construct delivery urls for data router subscribers. Data router in FTL
# requires https but this author believes that ONAP is to be defaulted to http.
DEFAULT_SCHEME = "http"

# Keys used in kwargs, node properties and runtime properties
SERVICE_COMPONENT_NAME = "service_component_name"
CONTAINER_ID = "container_id"
APPLICATION_CONFIG = "application_config"
K8S_DEPLOYMENT = "k8s_deployment"
RESOURCE_KW = "resource_config"
LOCATION_ID = "location_id"

61 

62# Utility methods 

63 

64# Lifecycle interface calls for dcae.nodes.DockerContainer 

65 

def _setup_for_discovery(**kwargs):
    """Push the component's application configuration into Consul.

    Reads ``name`` and the application config (``APPLICATION_CONFIG`` key)
    from kwargs and stores the config under that name through the discovery
    helper module.

    Returns:
        The kwargs dict, unchanged, so the call can be chained.

    Raises:
        RecoverableError: the discovery layer reported a connection error.
        NonRecoverableError: anything else went wrong (including a missing
            kwarg); the error is logged first.
    """
    try:
        component_name = kwargs['name']
        # application_config arrives as a YAML map, i.e. already a dict, so
        # it is pushed as-is without any preprocessing.
        config = kwargs[APPLICATION_CONFIG]

        kv_conn = dis.create_kv_conn(CONSUL_HOST)
        dis.push_service_component_config(kv_conn, component_name, config)
    except dis.DiscoveryConnectionError as conn_err:
        raise RecoverableError(conn_err)
    except Exception as err:
        ctx.logger.error("Unexpected error while pushing configuration: {0}"
                         .format(str(err)))
        raise NonRecoverableError(err)
    else:
        return kwargs

84 

85def _generate_component_name(**kwargs): 

86 """Generate component name""" 

87 service_component_type = kwargs['service_component_type'] 

88 name_override = kwargs['service_component_name_override'] 

89 

90 kwargs['name'] = name_override if name_override \ 

91 else dis.generate_service_component_name(service_component_type) 

92 return kwargs 

93 

def _done_for_create(**kwargs):
    """Finish the create operation by persisting kwargs as runtime properties.

    Copies ``name`` to the ``SERVICE_COMPONENT_NAME`` key, then writes the
    whole kwargs dict into the node instance's runtime properties.

    Returns:
        The kwargs dict, including the service component name key.
    """
    component_name = kwargs['name']
    kwargs[SERVICE_COMPONENT_NAME] = component_name

    # This is the single place in the create pipeline where the Cloudify
    # context is written, which lets the earlier steps stay free of
    # runtime-property handling.
    ctx.instance.runtime_properties.update(kwargs)

    ctx.logger.info("Done setting up: {0}".format(component_name))
    return kwargs

105 

def _get_resources(**kwargs):
    """Return the resource configuration passed in kwargs, if any.

    Looks up the ``RESOURCE_KW`` key and logs the value at debug level.

    Returns:
        The value stored under ``RESOURCE_KW``, or None when the key is absent.
    """
    # **kwargs is always a dict (never None), so no None guard is needed.
    resources = kwargs.get(RESOURCE_KW)
    ctx.logger.debug("{0}: {1}".format(RESOURCE_KW, resources))
    return resources

112 

def _get_location():
    """Return the k8s location configured on the node.

    Falls back to DEFAULT_K8S_LOCATION when the ``location_id`` node
    property is missing, None, or zero-length.
    """
    node_props = ctx.node.properties
    if LOCATION_ID in node_props and node_props[LOCATION_ID]:
        return node_props[LOCATION_ID]
    return DEFAULT_K8S_LOCATION

117 

@merge_inputs_for_create
@monkeypatch_loggers
@Policies.gather_policies_to_node()
@operation
def create_for_components(**create_inputs):
    """Create step for service components

    This interface is responsible for:

    1. Generating service component name
    2. Populating config information into Consul
    """
    # Each stage takes and returns the kwargs dict; run them in order.
    named = _generate_component_name(**create_inputs)
    docker_ready = _enhance_docker_params(**named)
    discovered = _setup_for_discovery(**docker_ready)
    _done_for_create(**discovered)

135 

136 

137def _parse_streams(**kwargs): 

138 """Parse streams and setup for DMaaP plugin""" 

139 # The DMaaP plugin requires this plugin to set the runtime properties 

140 # keyed by the node name. 

141 for stream in kwargs["streams_publishes"]: 

142 kwargs[stream["name"]] = stream 

143 

144 for stream in kwargs["streams_subscribes"]: 

145 if stream["type"] == "data_router": 

146 

147 # Don't want to mutate the source 

148 stream = copy.deepcopy(stream) 

149 

150 # Set up the delivery URL 

151 # Using service_component_name as the host name in the subscriber URL 

152 # will work in a single-cluster ONAP deployment. Whether it will also work 

153 # in a multi-cluster ONAP deployment--with a central location and one or 

154 # more remote ("edge") locations depends on how networking and DNS is set 

155 # up in a multi-cluster deployment 

156 service_component_name = kwargs["name"] 

157 ports, _ = k8sclient.parse_ports(kwargs["ports"]) 

158 dport, _ = ports[0] 

159 subscriber_host = "{host}:{port}".format(host=service_component_name, port=dport) 

160 

161 scheme = stream.get("scheme", DEFAULT_SCHEME) 

162 if "route" not in stream: 162 ↛ 163line 162 didn't jump to line 163, because the condition on line 162 was never true

163 raise NonRecoverableError("'route' key missing from data router subscriber") 

164 path = stream["route"] 

165 stream["delivery_url"] = "{scheme}://{host}/{path}".format( 

166 scheme=scheme, host=subscriber_host, path=path) 

167 

168 # If username and password has not been provided then generate it. The 

169 # DMaaP plugin doesn't generate for subscribers. The generation code 

170 # and length of username password has been lifted from the DMaaP 

171 # plugin. 

172 if not stream.get("username", None): 

173 stream["username"] = utils.random_string(8) 

174 if not stream.get("password", None): 

175 stream["password"] = utils.random_string(10) 

176 

177 kwargs[stream["name"]] = stream 

178 

179 return kwargs 

180 

@merge_inputs_for_create
@monkeypatch_loggers
@Policies.gather_policies_to_node()
@operation
def create_for_components_with_streams(**create_inputs):
    """Create step for service components that use DMaaP

    This interface is responsible for:

    1. Generating service component name
    2. Setup runtime properties for DMaaP plugin
    3. Populating application config into Consul
    """
    # Each stage takes and returns the kwargs dict; run them in order.
    named = _generate_component_name(**create_inputs)
    docker_ready = _enhance_docker_params(**named)
    with_streams = _parse_streams(**docker_ready)
    discovered = _setup_for_discovery(**with_streams)
    _done_for_create(**discovered)

200 

def _verify_k8s_deployment(location, service_component_name, max_wait):
    """Poll until the k8s Deployment is available or attempts run out.

    Args:
    -----
    location (string): location of the k8s cluster where the component was deployed
    service_component_name: component's service component name
    max_wait (integer): limit on how many availability checks to make, which
        translates to seconds because each sleep is one second. 0 means infinite.

    Return:
    -------
    True if deployment is ready within the maximum wait time, False otherwise
    """
    # Number of the check that would come next; starts at 1 and is bumped
    # after every failed check.
    next_attempt = 1

    while not k8sclient.is_available(location, DCAE_NAMESPACE, service_component_name):
        next_attempt += 1

        # Give up once the next check would exceed a positive max_wait
        if 0 < max_wait < next_attempt:
            return False

        time.sleep(1)

    return True

229 

def _create_and_start_container(container_name, image, **kwargs):
    '''
    Create a k8s Deployment (and whatever else k8sclient.deploy sets up) for
    the component, and record the result in the runtime properties.

    (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
    We're not exposing k8s to the component developer and the blueprint author.
    This is a conscious choice.  We want to use k8s in a controlled, consistent way, and we want to hide
    the details from the component developer and the blueprint author.)

    kwargs may have:
        - volumes: array of volume objects, where a volume object is:
            {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}}
        - ports: array of strings in the form "container_port:host_port"
        - envs: map of name-value pairs ( {name0: value0, name1: value1...} );
            these override the default environment built here
        - always_pull_image: boolean, forwarded to the deployment as "always_pull".
            If true, the image pull policy is "Always" so that a fresh copy of the
            image is always pulled. Otherwise, the policy is "IfNotPresent"
        - log_info: an object with info for setting up ELK logging, with the form:
            {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}
        - tls_info: an object with information for setting up the component to act as a TLS server, with the form:
            {"use_tls" : true_or_false, "cert_directory": "/path/to/directory_where_certs_should_be_placed" }
        - external_cert: an object with information for setting up the init container
            for external certificates creation, with the form:
            {"external_cert_directory": "/path/to/directory_where_certs_should_be_placed",
             "use_external_tls": true or false,
             "ca_name": "ca-name-value",
             "cert_type": "P12" or "JKS" or "PEM",
             "external_certificate_parameters":
                 {"common_name": "common-name-value",
                  "sans": "sans-value"}}
        - replicas: number of replicas to be launched initially (default 1)
        - resource_config: resource settings, forwarded as "resources"
        - labels: map of labels, forwarded to the deployment
        - readiness: object with information needed to create a readiness check
        - liveness: object with information needed to create a liveness check
        - k8s_location: name of the Kubernetes location (cluster) where the component is to be deployed

    Returns kwargs with the deployment description added under K8S_DEPLOYMENT.
    '''
    # The CA cert path comes from the component's own TLS settings when
    # given, otherwise from the plugin-wide default directory.
    tls_settings = kwargs.get("tls_info") or {}
    cert_dir = tls_settings.get("cert_directory") or COMPONENT_CERT_DIR

    # Default environment; caller-supplied envs take precedence.
    env = {}
    env["CONSUL_HOST"] = CONSUL_INTERNAL_NAME
    env["CONFIG_BINDING_SERVICE"] = "config-binding-service"
    env["DCAE_CA_CERTPATH"] = "{0}/cacert.pem".format(cert_dir)
    env["CBS_CONFIG_URL"] = "{0}/{1}".format(CBS_BASE_URL, container_name)
    env.update(kwargs.get("envs", {}))

    ctx.logger.info("Starting k8s deployment for {}, image: {}, env: {}, kwargs: {}".format(container_name, image, env, kwargs))
    ctx.logger.info("Passing k8sconfig: {}".format(plugin_conf))

    replicas = kwargs.get("replicas", 1)
    resource_config = _get_resources(**kwargs)

    deploy_options = {
        "replicas": replicas,
        "always_pull": kwargs.get("always_pull_image", False),
        "k8sconfig": plugin_conf,
        "resources": resource_config,
        "volumes": kwargs.get("volumes", []),
        "ports": kwargs.get("ports", []),
        "tls_info": kwargs.get("tls_info"),
        "external_cert": kwargs.get("external_cert"),
        "env": env,
        "labels": kwargs.get("labels", {}),
        "log_info": kwargs.get("log_info"),
        "readiness": kwargs.get("readiness"),
        "liveness": kwargs.get("liveness"),
        "k8s_location": kwargs.get("k8s_location"),
    }
    _, dep = k8sclient.deploy(DCAE_NAMESPACE, container_name, image, **deploy_options)

    # Keep the deployment description around: later operations (scale,
    # update, undeploy) read it back from the runtime properties.
    runtime_props = ctx.instance.runtime_properties
    runtime_props[K8S_DEPLOYMENT] = dep
    kwargs[K8S_DEPLOYMENT] = dep
    runtime_props["replicas"] = replicas

    ctx.logger.info ("k8s deployment initiated successfully for {0}: {1}".format(container_name, dep))
    return kwargs

300 

def _parse_cloudify_context(**kwargs):
    """Copy what the start pipeline needs out of the Cloudify context.

    This function is deliberately impure: it reads ``ctx`` so that the later
    pipeline steps can get these values from kwargs.

    Returns:
        kwargs extended with ``deployment_id``, ``labels``, ``k8s_location``
        and, when present in the node properties, ``log_info``, ``tls_info``,
        ``external_cert``, ``replicas`` and ``always_pull_image``.
    """
    node_props = ctx.node.properties
    kwargs["deployment_id"] = ctx.deployment.id

    # Labels for the Kubernetes pods. Node and instance values are cut to
    # 63 characters, the limit the original comment cites for the name segment.
    # NOTE(review): the deployment id is not truncated -- confirm it cannot
    # exceed the limit.
    kwargs["labels"] = {
        "cfydeployment" : ctx.deployment.id,
        "cfynode": ctx.node.name[:63],
        "cfynodeinstance": ctx.instance.id[:63]
    }

    # Centralized logging info is only usable when it names a log directory
    if "log_info" in node_props and "log_directory" in node_props["log_info"]:
        kwargs["log_info"] = node_props["log_info"]

    # Optional settings copied verbatim when the blueprint defines them:
    # TLS info, external TLS info, replica count and the always-pull flag.
    for optional_key in ("tls_info", "external_cert", "replicas", "always_pull_image"):
        if optional_key in node_props:
            kwargs[optional_key] = node_props[optional_key]

    kwargs["k8s_location"] = _get_location()

    return kwargs

338 

339def _enhance_docker_params(**kwargs): 

340 ''' 

341 Set up Docker environment variables and readiness/liveness check info 

342 and inject into kwargs. 

343 ''' 

344 

345 # Get info for setting up readiness/liveness probe, if present 

346 docker_config = kwargs.get("docker_config", {}) 

347 if "healthcheck" in docker_config: 347 ↛ 348line 347 didn't jump to line 348, because the condition on line 347 was never true

348 kwargs["readiness"] = docker_config["healthcheck"] 

349 if "livehealthcheck" in docker_config: 349 ↛ 350line 349 didn't jump to line 350, because the condition on line 349 was never true

350 kwargs["liveness"] = docker_config["livehealthcheck"] 

351 

352 envs = kwargs.get("envs", {}) 

353 

354 kwargs["envs"] = envs 

355 

356 def combine_params(key, docker_config, kwargs): 

357 v = docker_config.get(key, []) + kwargs.get(key, []) 

358 kwargs[key] = v 

359 return kwargs 

360 

361 # Add the lists of ports and volumes unintelligently - meaning just add the 

362 # lists together with no deduping. 

363 kwargs = combine_params("ports", docker_config, kwargs) 

364 kwargs = combine_params("volumes", docker_config, kwargs) 

365 

366 # Merge env vars from kwarg inputs and docker_config 

367 kwargs["envs"].update(docker_config.get("envs", {})) 

368 

369 

370 return kwargs 

371 

def _create_and_start_component(**kwargs):
    """Deploy the component's container and record the deployment in kwargs.

    Reads ``image`` and the service component name from kwargs, forwards a
    hand-picked subset of kwargs to _create_and_start_container, and stores
    the returned deployment description under K8S_DEPLOYMENT.

    Returns:
        The kwargs dict with K8S_DEPLOYMENT set.
    """
    image = kwargs["image"]
    service_component_name = kwargs[SERVICE_COMPONENT_NAME]

    # Only selected keys are passed on: handing over all of kwargs confuses
    # _create_and_start_container because of duplicate variables.
    forwarded_defaults = (
        ("volumes", []),
        ("ports", None),
        ("envs", {}),
        ("log_info", {}),
        ("tls_info", {}),
        ("external_cert", {}),
        ("labels", {}),
        ("resource_config", {}),
        ("readiness", {}),
        ("liveness", {}),
        ("k8s_location", None),
    )
    sub_kwargs = {key: kwargs.get(key, default) for key, default in forwarded_defaults}

    deployed = _create_and_start_container(service_component_name, image, **sub_kwargs)
    kwargs[K8S_DEPLOYMENT] = deployed[K8S_DEPLOYMENT]

    return kwargs

395 

def _verify_component(**kwargs):
    """Wait for the component's k8s deployment to become ready.

    Uses ``max_wait`` from kwargs, or DEFAULT_MAX_WAIT when absent. If the
    deployment never becomes ready, the k8s artifacts recorded in kwargs are
    undeployed, the Consul configuration is cleaned up, and an error is raised.

    Returns:
        The kwargs dict, unchanged, when the deployment is ready.

    Raises:
        DockerPluginDeploymentError: the deployment did not become ready.
    """
    service_component_name = kwargs[SERVICE_COMPONENT_NAME]

    max_wait = kwargs.get("max_wait", DEFAULT_MAX_WAIT)
    ctx.logger.info("Waiting up to {0} secs for {1} to become ready".format(max_wait, service_component_name))

    if _verify_k8s_deployment(kwargs.get("k8s_location"), service_component_name, max_wait):
        ctx.logger.info("k8s deployment is ready for: {0}".format(service_component_name))
        return kwargs

    # Not ready within "max_wait": delete the k8s components created so far
    # and remove the configuration from Consul before failing.
    ctx.logger.error("k8s deployment never became ready for {0}".format(service_component_name))
    if (K8S_DEPLOYMENT in kwargs) and (len(kwargs[K8S_DEPLOYMENT]["deployment"]) > 0):
        deployment_description = kwargs[K8S_DEPLOYMENT]
        ctx.logger.info("attempting to delete k8s artifacts: {0}".format(deployment_description))
        k8sclient.undeploy(deployment_description)
        ctx.logger.info("deleted k8s artifacts: {0}".format(deployment_description))
    cleanup_discovery(**kwargs)
    raise DockerPluginDeploymentError("k8s deployment never became ready for {0}".format(service_component_name))

417 

def _done_for_start(**kwargs):
    """Finish the start operation by persisting kwargs as runtime properties.

    Returns:
        The kwargs dict, unchanged.
    """
    runtime_props = ctx.instance.runtime_properties
    runtime_props.update(kwargs)

    component_name = kwargs["name"]
    ctx.logger.info("Done starting: {0}".format(component_name))
    return kwargs

422 

@wrap_error_handling_start
@merge_inputs_for_start
@monkeypatch_loggers
@operation
def create_and_start_container_for_components(**start_inputs):
    """Initiate Kubernetes deployment for service components

    This operation method is to be used with the ContainerizedServiceComponent
    node type. After initiating a Kubernetes deployment, the plugin will verify with Kubernetes
    that the app is up and responding successfully to readiness probes.
    """
    # Each stage takes and returns the kwargs dict; run them in order.
    with_context = _parse_cloudify_context(**start_inputs)
    deployed = _create_and_start_component(**with_context)
    verified = _verify_component(**deployed)
    _done_for_start(**verified)

438 

@wrap_error_handling_start
@monkeypatch_loggers
@operation
def create_and_start_container(**kwargs):
    """Initiate a Kubernetes deployment for the generic ContainerizedApplication node type

    The component name and image come from the node properties; the name is
    also stored in the runtime properties under SERVICE_COMPONENT_NAME.
    All other deployment settings are taken from kwargs.
    """
    node_props = ctx.node.properties

    component_name = node_props["name"]
    ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = component_name

    container_image = node_props["image"]
    kwargs["k8s_location"] = _get_location()

    _create_and_start_container(component_name, container_image, **kwargs)

451 

@monkeypatch_loggers
@operation
def stop_and_remove_container(**kwargs):
    """Delete the Kubernetes deployment recorded in the runtime properties.

    Deletion is best-effort: any error from the undeploy call is logged and
    not re-raised.
    """
    runtime_props = ctx.instance.runtime_properties

    if K8S_DEPLOYMENT not in runtime_props:
        # A previous install workflow may have failed before any Kubernetes
        # deployment info was recorded, so there is nothing to undeploy.
        ctx.logger.info("No k8s deployment information, not attempting to delete k8s deployment")
        return

    try:
        k8sclient.undeploy(runtime_props[K8S_DEPLOYMENT])
    except Exception as err:
        ctx.logger.error("Unexpected error while deleting k8s deployment: {0}"
                         .format(str(err)))

469 

@wrap_error_handling_update
@monkeypatch_loggers
@operation
def scale(replicas, **kwargs):
    """Change number of replicas in the deployment

    Args:
        replicas: requested replica count; requests that are not greater
            than zero are ignored (and logged).
        **kwargs: may carry ``max_wait``, the limit on readiness checks
            (defaults to DEFAULT_MAX_WAIT).

    After asking k8s to scale, the new count is stored in the runtime
    properties and the deployment is polled for readiness. A deployment that
    does not become ready in time is reported with an error log entry; no
    exception is raised for it.
    """
    runtime_props = ctx.instance.runtime_properties
    service_component_name = runtime_props[SERVICE_COMPONENT_NAME]

    if not replicas > 0:
        ctx.logger.info("Ignoring request to scale {0} to zero replicas".format(service_component_name))
        return

    current_replicas = runtime_props["replicas"]
    ctx.logger.info("Scaling {0} from {1} to {2} replica(s)".format(service_component_name, current_replicas, replicas))
    deployment_description = runtime_props[K8S_DEPLOYMENT]
    k8sclient.scale(deployment_description, replicas)
    runtime_props["replicas"] = replicas

    # Verify that the scaling took place as expected
    max_wait = kwargs.get("max_wait", DEFAULT_MAX_WAIT)
    ctx.logger.info("Waiting up to {0} secs for {1} to scale and become ready".format(max_wait, service_component_name))
    if _verify_k8s_deployment(deployment_description["location"], service_component_name, max_wait):
        ctx.logger.info("Scaling complete: {0} from {1} to {2} replica(s)".format(service_component_name, current_replicas, replicas))
    else:
        # Make the timeout visible instead of finishing silently
        ctx.logger.error("Scaling {0} from {1} to {2} replica(s) did not become ready within {3} secs".format(
            service_component_name, current_replicas, replicas, max_wait))

492 

@wrap_error_handling_update
@monkeypatch_loggers
@operation
def update_image(image, **kwargs):
    """Restart component with a new Docker image

    Args:
        image: the new image; a falsy value is ignored (and logged).
        **kwargs: may carry ``max_wait``, the limit on readiness checks
            (defaults to DEFAULT_MAX_WAIT).

    After asking k8s to upgrade, the new image is stored in the runtime
    properties and the deployment is polled for readiness. A deployment that
    does not become ready in time is reported with an error log entry; no
    exception is raised for it.
    """
    runtime_props = ctx.instance.runtime_properties
    service_component_name = runtime_props[SERVICE_COMPONENT_NAME]

    if not image:
        ctx.logger.info("Ignoring update_image request for {0} with unusable image '{1}'".format(service_component_name, str(image)))
        return

    current_image = runtime_props["image"]
    ctx.logger.info("Updating app image for {0} from {1} to {2}".format(service_component_name, current_image, image))
    deployment_description = runtime_props[K8S_DEPLOYMENT]
    k8sclient.upgrade(deployment_description, image)
    runtime_props["image"] = image

    # Verify that the update took place as expected
    max_wait = kwargs.get("max_wait", DEFAULT_MAX_WAIT)
    ctx.logger.info("Waiting up to {0} secs for {1} to be updated and become ready".format(max_wait, service_component_name))
    if _verify_k8s_deployment(deployment_description["location"], service_component_name, max_wait):
        ctx.logger.info("Update complete: {0} from {1} to {2}".format(service_component_name, current_image, image))
    else:
        # Make the timeout visible instead of finishing silently
        ctx.logger.error("Update of {0} from {1} to {2} did not become ready within {3} secs".format(
            service_component_name, current_image, image, max_wait))

515 

516#TODO: implement rollback operation when kubernetes python client fix is available. 

517# (See comments in k8sclient.py.) 

518# In the meantime, it's possible to undo an update_image operation by doing a second 

519# update_image that specifies the older image. 

520 

@monkeypatch_loggers
@Policies.cleanup_policies_on_node
@operation
def cleanup_discovery(**kwargs):
    """Delete the component's configuration from Consul.

    The component name is read from the runtime properties; when none was
    recorded the call only logs and returns.

    Raises:
        RecoverableError: the discovery layer reported a connection error.
    """
    runtime_props = ctx.instance.runtime_properties

    if SERVICE_COMPONENT_NAME not in runtime_props:
        # When another node in the blueprint fails install, this node may
        # never have generated a service component name, so there is
        # nothing to delete from Consul.
        ctx.logger.info ("No service_component_name, not attempting to delete config from Consul")
        return

    service_component_name = runtime_props[SERVICE_COMPONENT_NAME]
    try:
        kv_conn = dis.create_kv_conn(CONSUL_HOST)
        dis.remove_service_component_config(kv_conn, service_component_name)
    except dis.DiscoveryConnectionError as conn_err:
        raise RecoverableError(conn_err)

539 

540def _notify_container(**kwargs): 

541 """ 

542 Notify container using the policy section in the docker_config. 

543 Notification consists of running a script in the application container 

544 in each pod in the Kubernetes deployment associated with this node. 

545 Return the list of notification results. 

546 """ 

547 dc = kwargs["docker_config"] 

548 resp = [] 

549 

550 if "policy" in dc and dc["policy"].get("trigger_type") == "docker": 550 ↛ 553line 550 didn't jump to line 553, because the condition on line 550 was never true

551 # Build the command to execute in the container 

552 # SCRIPT_PATH policies {"policies" : ...., "updated_policies" : ..., "removed_policies": ...} 

553 script_path = dc["policy"]["script_path"] 

554 policy_data = { 

555 "policies": kwargs["policies"], 

556 "updated_policies": kwargs["updated_policies"], 

557 "removed_policies": kwargs["removed_policies"] 

558 } 

559 

560 command = [script_path, "policies", json.dumps(policy_data)] 

561 

562 # Execute the command 

563 deployment_description = ctx.instance.runtime_properties[K8S_DEPLOYMENT] 

564 resp = k8sclient.execute_command_in_deployment(deployment_description, command) 

565 

566 # else the default is no trigger 

567 

568 return resp 

569 

@operation
@monkeypatch_loggers
@Policies.update_policies_on_node()
def policy_update(updated_policies, removed_policies=None, policies=None, **kwargs):
    """Policy update task

    This method is responsible for updating the application configuration and
    notifying the applications that the change has occurred. This is to be used
    for the dcae.interfaces.policy.policy_update operation.

    :updated_policies: contains the list of changed policy-configs when configs_only=True
        (default) Use configs_only=False to bring the full policy objects in :updated_policies:.
    """
    runtime_props = ctx.instance.runtime_properties
    service_component_name = runtime_props[SERVICE_COMPONENT_NAME]
    ctx.logger.info("policy_update for {0}-- updated_policies: {1}, removed_policies: {2}, policies: {3}"
                    .format(service_component_name, updated_policies, removed_policies, policies))

    # Notify from a deep copy of the runtime properties, so adding the policy
    # data does not touch the stored properties.
    update_inputs = copy.deepcopy(runtime_props)
    for key, value in (("updated_policies", updated_policies),
                       ("removed_policies", removed_policies),
                       ("policies", policies)):
        update_inputs[key] = value

    results = _notify_container(**update_inputs)
    ctx.logger.info("policy_update complete for {0}--notification results: {1}".format(service_component_name,json.dumps(results)))