Coverage for k8splugin/tasks.py : 47%

# ============LICENSE_START=======================================================
# org.onap.dcae
# ================================================================================
# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2020 Pantheon.tech. All rights reserved.
# Copyright (c) 2020 Nokia. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
# Lifecycle interface calls for containerized components

# Needed by Cloudify Manager to load google.auth for the Kubernetes python client
from . import cloudify_importer
import time
import copy
import json
from cloudify import ctx
from cloudify.decorators import operation
from cloudify.exceptions import NonRecoverableError, RecoverableError
from onap_dcae_dcaepolicy_lib import Policies
from k8splugin import discovery as dis
from k8splugin.decorators import monkeypatch_loggers, wrap_error_handling_start, \
    merge_inputs_for_start, merge_inputs_for_create, wrap_error_handling_update
from k8splugin.exceptions import DockerPluginDeploymentError
from k8splugin import utils
from configure import configure
import k8sclient

# Get configuration
plugin_conf = configure.configure()
CONSUL_HOST = plugin_conf.get("consul_host")
CONSUL_INTERNAL_NAME = plugin_conf.get("consul_dns_name")
DCAE_NAMESPACE = plugin_conf.get("namespace")
DEFAULT_MAX_WAIT = plugin_conf.get("max_wait")
DEFAULT_K8S_LOCATION = plugin_conf.get("default_k8s_location")
COMPONENT_CERT_DIR = plugin_conf.get("tls", {}).get("component_cert_dir")
CBS_BASE_URL = plugin_conf.get("cbs", {}).get("base_url")
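
# A sketch (an assumption, not the authoritative schema) of the mapping that
# configure.configure() returns, based only on the keys read above; all values
# are illustrative placeholders:
#
#   {
#       "consul_host": "consul",
#       "consul_dns_name": "consul.onap.svc.cluster.local",
#       "namespace": "onap",
#       "max_wait": 1800,
#       "default_k8s_location": "central",
#       "tls": {"component_cert_dir": "/opt/dcae/cacert"},
#       "cbs": {"base_url": "https://config-binding-service:10443/service_component_all"}
#   }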

# Used to construct delivery urls for data router subscribers. Data router in FTL
# requires https, but ONAP is expected to default to http.
DEFAULT_SCHEME = "http"

# Property keys
SERVICE_COMPONENT_NAME = "service_component_name"
CONTAINER_ID = "container_id"
APPLICATION_CONFIG = "application_config"
K8S_DEPLOYMENT = "k8s_deployment"
RESOURCE_KW = "resource_config"
LOCATION_ID = "location_id"

# External cert parameters
EXT_CERT_DIR = "external_cert_directory"
EXT_CA_NAME = "ca_name"
EXT_CERT_PARAMS = "external_certificate_parameters"
EXT_COMMON_NAME = "common_name"
EXT_CERT_ERROR_MESSAGE = "Provided blueprint is incorrect. It specifies external_cert without all the required parameters. " \
                         "Required parameters are: {0}, {1}, {2}.{3}".format(EXT_CERT_DIR, EXT_CA_NAME, EXT_CERT_PARAMS, EXT_COMMON_NAME)

# Utility methods

# Lifecycle interface calls for dcae.nodes.DockerContainer

def _setup_for_discovery(**kwargs):
    """Setup for config discovery"""
    try:
        name = kwargs['name']
        application_config = kwargs[APPLICATION_CONFIG]

        # NOTE: application_config is no longer a json string; it comes in as a
        # YAML map, which translates to a dict, so no preprocessing is needed.
        conn = dis.create_kv_conn(CONSUL_HOST)
        dis.push_service_component_config(conn, name, application_config)
        return kwargs
    except dis.DiscoveryConnectionError as e:
        raise RecoverableError(e)
    except Exception as e:
        ctx.logger.error("Unexpected error while pushing configuration: {0}"
                         .format(str(e)))
        raise NonRecoverableError(e)
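
# Sketch of what _setup_for_discovery does, assuming (this is an assumption
# about discovery.push_service_component_config, which is defined elsewhere)
# that the config dict is stored in the Consul KV store under the component
# name:
#
#   _setup_for_discovery(name="s1a2b3c4-my-component",
#                        application_config={"collector_port": 8080})
#   # hypothetical resulting KV entry:
#   #   "s1a2b3c4-my-component" -> '{"collector_port": 8080}'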

def _generate_component_name(**kwargs):
    """Generate component name"""
    service_component_type = kwargs['service_component_type']
    name_override = kwargs['service_component_name_override']

    kwargs['name'] = name_override if name_override \
        else dis.generate_service_component_name(service_component_type)
    return kwargs

def _done_for_create(**kwargs):
    """Wrap up create operation"""
    name = kwargs['name']
    kwargs[SERVICE_COMPONENT_NAME] = name
    # All updates to the runtime_properties happen here. Nothing else mutates
    # the context, and doing the update in one place keeps the other functions
    # pure (in the sense of not dealing with CloudifyContext).
    ctx.instance.runtime_properties.update(kwargs)
    ctx.logger.info("Done setting up: {0}".format(name))
    return kwargs

def _get_resources(**kwargs):
    # kwargs is always a dict (possibly empty), so test for truthiness rather
    # than "is not None"
    if kwargs:
        ctx.logger.debug("{0}: {1}".format(RESOURCE_KW, kwargs.get(RESOURCE_KW)))
        return kwargs.get(RESOURCE_KW)
    ctx.logger.info("set resources to None")
    return None

def _get_location():
    ''' Get the k8s location property. Set to the default if the property is missing, None, or zero-length '''
    return ctx.node.properties.get("location_id") or DEFAULT_K8S_LOCATION

@merge_inputs_for_create
@monkeypatch_loggers
@Policies.gather_policies_to_node()
@operation
def create_for_components(**create_inputs):
    """Create step for service components

    This interface is responsible for:

    1. Generating service component name
    2. Populating config information into Consul
    """
    _done_for_create(
        **_setup_for_discovery(
            **_enhance_docker_params(
                **_generate_component_name(
                    **create_inputs))))

def _parse_streams(**kwargs):
    """Parse streams and setup for DMaaP plugin"""
    # The DMaaP plugin requires this plugin to set the runtime properties
    # keyed by the node name.
    for stream in kwargs["streams_publishes"]:
        kwargs[stream["name"]] = stream

    for stream in kwargs["streams_subscribes"]:
        if stream["type"] == "data_router":

            # Don't want to mutate the source
            stream = copy.deepcopy(stream)

            # Set up the delivery URL. Using service_component_name as the host
            # name in the subscriber URL will work in a single-cluster ONAP
            # deployment. Whether it also works in a multi-cluster ONAP
            # deployment--with a central location and one or more remote ("edge")
            # locations--depends on how networking and DNS are set up in that
            # deployment.
            service_component_name = kwargs["name"]
            ports, _ = k8sclient.parse_ports(kwargs["ports"])
            dport, _ = ports[0]
            subscriber_host = "{host}:{port}".format(host=service_component_name, port=dport)

            scheme = stream.get("scheme", DEFAULT_SCHEME)
            if "route" not in stream:
                raise NonRecoverableError("'route' key missing from data router subscriber")
            path = stream["route"]
            stream["delivery_url"] = "{scheme}://{host}/{path}".format(
                scheme=scheme, host=subscriber_host, path=path)

            # If username and password have not been provided, generate them;
            # the DMaaP plugin doesn't generate them for subscribers. The
            # generation code and the username/password lengths are lifted from
            # the DMaaP plugin.
            if not stream.get("username", None):
                stream["username"] = utils.random_string(8)
            if not stream.get("password", None):
                stream["password"] = utils.random_string(10)

        kwargs[stream["name"]] = stream

    return kwargs
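
# A hypothetical data_router subscriber stream (illustrative values) before and
# after _parse_streams. Given kwargs with name="s1a2b3c4-my-component" and
# ports=["8080:0"], an input stream of
#
#   {"name": "feed00", "type": "data_router", "route": "identity",
#    "scheme": "https"}
#
# would be recorded in the runtime properties with roughly these additions:
#
#   {"delivery_url": "https://s1a2b3c4-my-component:8080/identity",
#    "username": "<8 random chars>", "password": "<10 random chars>"}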

@merge_inputs_for_create
@monkeypatch_loggers
@Policies.gather_policies_to_node()
@operation
def create_for_components_with_streams(**create_inputs):
    """Create step for service components that use DMaaP

    This interface is responsible for:

    1. Generating service component name
    2. Setting up runtime properties for the DMaaP plugin
    3. Populating application config into Consul
    """
    _done_for_create(
        **_setup_for_discovery(
            **_parse_streams(
                **_enhance_docker_params(
                    **_generate_component_name(
                        **create_inputs)))))

def _verify_k8s_deployment(location, service_component_name, max_wait):
    """Verify that the k8s Deployment is ready

    Args:
    -----
    location (string): location of the k8s cluster where the component was deployed
    service_component_name: component's service component name
    max_wait (integer): limit on how many attempts to make, which translates to
        seconds because each sleep is one second. 0 means infinite.

    Return:
    -------
    True if deployment is ready within the maximum wait time, False otherwise
    """
    num_attempts = 1

    while True:
        if k8sclient.is_available(location, DCAE_NAMESPACE, service_component_name):
            return True
        num_attempts += 1

        if 0 < max_wait < num_attempts:
            return False

        time.sleep(1)
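
# Sketch of the polling behavior above (not part of the plugin): with
# max_wait == 3, k8sclient.is_available is consulted up to 3 times, one second
# apart, before giving up; with max_wait == 0 the loop polls until the
# Deployment reports ready.
#
#   ready = _verify_k8s_deployment("central", "s1a2b3c4-my-component", 3)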

def _fail_if_external_cert_incorrect(external_cert):
    if not (external_cert.get(EXT_CERT_DIR)
            and external_cert.get(EXT_CA_NAME)
            and external_cert.get(EXT_CERT_PARAMS)
            and external_cert.get(EXT_CERT_PARAMS).get(EXT_COMMON_NAME)):
        ctx.logger.error(EXT_CERT_ERROR_MESSAGE)
        raise NonRecoverableError(EXT_CERT_ERROR_MESSAGE)

def _create_and_start_container(container_name, image, **kwargs):
    '''
    This will create a k8s Deployment and, if needed, a k8s Service or two.
    (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
    We're not exposing k8s to the component developer and the blueprint author.
    This is a conscious choice. We want to use k8s in a controlled, consistent way, and we want to hide
    the details from the component developer and the blueprint author.)

    kwargs may have:
        - volumes: array of volume objects, where a volume object is:
            {"host": {"path": "/path/on/host"}, "container": {"bind": "/path/on/container", "mode": "rw_or_ro"}}
        - ports: array of strings in the form "container_port:host_port"
        - envs: map of name-value pairs ( {name0: value0, name1: value1...} )
        - always_pull: boolean. If true, sets image pull policy to "Always"
          so that a fresh copy of the image is always pulled. Otherwise, sets
          image pull policy to "IfNotPresent"
        - log_info: an object with info for setting up ELK logging, with the form:
            {"log_directory": "/path/to/container/log/directory", "alternate_fb_path": "/alternate/sidecar/log/path"}
        - tls_info: an object with information for setting up the component to act as a TLS server, with the form:
            {"use_tls": true_or_false, "cert_directory": "/path/to/directory_where_certs_should_be_placed"}
        - external_cert: an object with information for setting up the init container for external certificates creation, with the form:
            {"external_cert_directory": "/path/to/directory_where_certs_should_be_placed",
             "use_external_tls": true or false,
             "ca_name": "ca-name-value",
             "cert_type": "P12" or "JKS" or "PEM",
             "external_certificate_parameters": {
                 "common_name": "common-name-value",
                 "sans": "sans-value"}}
        - replicas: number of replicas to be launched initially
        - readiness: object with information needed to create a readiness check
        - liveness: object with information needed to create a liveness check
        - k8s_location: name of the Kubernetes location (cluster) where the component is to be deployed
    '''
    tls_info = kwargs.get("tls_info") or {}
    external_cert = kwargs.get("external_cert")
    if external_cert and external_cert.get("use_external_tls"):
        _fail_if_external_cert_incorrect(external_cert)
    cert_dir = tls_info.get("cert_directory") or COMPONENT_CERT_DIR
    env = {"CONSUL_HOST": CONSUL_INTERNAL_NAME,
           "CONFIG_BINDING_SERVICE": "config-binding-service",
           "DCAE_CA_CERTPATH": "{0}/cacert.pem".format(cert_dir),
           "CBS_CONFIG_URL": "{0}/{1}".format(CBS_BASE_URL, container_name)
           }
    env.update(kwargs.get("envs", {}))
    ctx.logger.info("Starting k8s deployment for {}, image: {}, env: {}, kwargs: {}".format(container_name, image, env, kwargs))
    ctx.logger.info("Passing k8sconfig: {}".format(plugin_conf))
    replicas = kwargs.get("replicas", 1)
    resource_config = _get_resources(**kwargs)
    _, dep = k8sclient.deploy(
        ctx,
        DCAE_NAMESPACE,
        container_name,
        image,
        replicas=replicas,
        always_pull=kwargs.get("always_pull_image", False),
        k8sconfig=plugin_conf,
        resources=resource_config,
        volumes=kwargs.get("volumes", []),
        ports=kwargs.get("ports", []),
        tls_info=kwargs.get("tls_info"),
        external_cert=kwargs.get("external_cert"),
        env=env,
        labels=kwargs.get("labels", {}),
        log_info=kwargs.get("log_info"),
        readiness=kwargs.get("readiness"),
        liveness=kwargs.get("liveness"),
        k8s_location=kwargs.get("k8s_location"))

    # Capture the result of deployment for future use
    ctx.instance.runtime_properties[K8S_DEPLOYMENT] = dep
    kwargs[K8S_DEPLOYMENT] = dep
    ctx.instance.runtime_properties["replicas"] = replicas
    ctx.logger.info("k8s deployment initiated successfully for {0}: {1}".format(container_name, dep))
    return kwargs
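
# A sketch (an assumption inferred from the keys this module reads back, not a
# documented contract) of the deployment description that k8sclient.deploy
# returns and that is saved under runtime_properties["k8s_deployment"]. The
# "deployment" and "location" keys are relied on by _verify_component, scale,
# and update_image below:
#
#   {"deployment": "dep-s1a2b3c4-my-component",
#    "location": "central",
#    "namespace": "onap"}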

def _parse_cloudify_context(**kwargs):
    """Parse Cloudify context

    Extract what is needed. This is an impure function because it requires ctx.
    """
    kwargs["deployment_id"] = ctx.deployment.id

    # Set some labels for the Kubernetes pods
    # The name segment is required and must be 63 characters or less
    kwargs["labels"] = {
        "cfydeployment": ctx.deployment.id,
        "cfynode": ctx.node.name[:63],
        "cfynodeinstance": ctx.instance.id[:63]
    }

    # Pick up the centralized logging info
    if "log_info" in ctx.node.properties and "log_directory" in ctx.node.properties["log_info"]:
        kwargs["log_info"] = ctx.node.properties["log_info"]

    # Pick up TLS info if present
    if "tls_info" in ctx.node.properties:
        kwargs["tls_info"] = ctx.node.properties["tls_info"]

    # Pick up external TLS info if present
    if "external_cert" in ctx.node.properties:
        kwargs["external_cert"] = ctx.node.properties["external_cert"]

    # Pick up replica count and always_pull_image flag
    if "replicas" in ctx.node.properties:
        kwargs["replicas"] = ctx.node.properties["replicas"]
    if "always_pull_image" in ctx.node.properties:
        kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"]

    # Pick up location
    kwargs["k8s_location"] = _get_location()

    return kwargs
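
# Example of the pod labels produced above (hypothetical IDs); these make it
# easy to select a deployment's pods, e.g. with a kubectl label selector:
#
#   {"cfydeployment": "demo_deployment",
#    "cfynode": "my-component",
#    "cfynodeinstance": "my-component_x7k2p9"}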

def _enhance_docker_params(**kwargs):
    '''
    Set up Docker environment variables and readiness/liveness check info
    and inject them into kwargs.
    '''

    # Get info for setting up readiness/liveness probe, if present
    docker_config = kwargs.get("docker_config", {})
    if "healthcheck" in docker_config:
        kwargs["readiness"] = docker_config["healthcheck"]
    if "livehealthcheck" in docker_config:
        kwargs["liveness"] = docker_config["livehealthcheck"]

    envs = kwargs.get("envs", {})
    kwargs["envs"] = envs

    def combine_params(key, docker_config, kwargs):
        v = docker_config.get(key, []) + kwargs.get(key, [])
        kwargs[key] = v
        return kwargs

    # Add the lists of ports and volumes naively--just concatenate the
    # lists with no deduplication.
    kwargs = combine_params("ports", docker_config, kwargs)
    kwargs = combine_params("volumes", docker_config, kwargs)

    # Merge env vars from kwarg inputs and docker_config
    kwargs["envs"].update(docker_config.get("envs", {}))

    return kwargs
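
# Worked example of combine_params (illustrative values): the docker_config
# list is concatenated ahead of the operation-input list, duplicates included.
#
#   docker_config = {"ports": ["8080:0"]}
#   kwargs = {"ports": ["8443:0", "8080:0"]}
#   kwargs = combine_params("ports", docker_config, kwargs)
#   # kwargs["ports"] is now ["8080:0", "8443:0", "8080:0"]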

def _create_and_start_component(**kwargs):
    """Create and start component (container)"""
    image = kwargs["image"]
    service_component_name = kwargs[SERVICE_COMPONENT_NAME]
    # Pass a manually selected subset of kwargs: passing everything confuses
    # _create_and_start_container because kwargs also contains keys that
    # duplicate its named parameters.
    sub_kwargs = {
        "volumes": kwargs.get("volumes", []),
        "ports": kwargs.get("ports", None),
        "envs": kwargs.get("envs", {}),
        "log_info": kwargs.get("log_info", {}),
        "tls_info": kwargs.get("tls_info", {}),
        "external_cert": kwargs.get("external_cert", {}),
        "labels": kwargs.get("labels", {}),
        "resource_config": kwargs.get("resource_config", {}),
        "readiness": kwargs.get("readiness", {}),
        "liveness": kwargs.get("liveness", {}),
        "k8s_location": kwargs.get("k8s_location")}
    returned_args = _create_and_start_container(service_component_name, image, **sub_kwargs)
    kwargs[K8S_DEPLOYMENT] = returned_args[K8S_DEPLOYMENT]

    return kwargs

def _verify_component(**kwargs):
    """Verify deployment is ready"""
    service_component_name = kwargs[SERVICE_COMPONENT_NAME]

    max_wait = kwargs.get("max_wait", DEFAULT_MAX_WAIT)
    ctx.logger.info("Waiting up to {0} secs for {1} to become ready".format(max_wait, service_component_name))

    if _verify_k8s_deployment(kwargs.get("k8s_location"), service_component_name, max_wait):
        ctx.logger.info("k8s deployment is ready for: {0}".format(service_component_name))
    else:
        # The component did not become ready within the "max_wait" interval.
        # Delete the k8s components created already and remove configuration from Consul.
        ctx.logger.error("k8s deployment never became ready for {0}".format(service_component_name))
        if (K8S_DEPLOYMENT in kwargs) and (len(kwargs[K8S_DEPLOYMENT]["deployment"]) > 0):
            ctx.logger.info("attempting to delete k8s artifacts: {0}".format(kwargs[K8S_DEPLOYMENT]))
            k8sclient.undeploy(kwargs[K8S_DEPLOYMENT])
            ctx.logger.info("deleted k8s artifacts: {0}".format(kwargs[K8S_DEPLOYMENT]))
        cleanup_discovery(**kwargs)
        raise DockerPluginDeploymentError("k8s deployment never became ready for {0}".format(service_component_name))

    return kwargs

def _done_for_start(**kwargs):
    ctx.instance.runtime_properties.update(kwargs)
    ctx.logger.info("Done starting: {0}".format(kwargs["name"]))
    return kwargs

@wrap_error_handling_start
@merge_inputs_for_start
@monkeypatch_loggers
@operation
def create_and_start_container_for_components(**start_inputs):
    """Initiate Kubernetes deployment for service components

    This operation method is to be used with the ContainerizedServiceComponent
    node type. After initiating a Kubernetes deployment, the plugin will verify with Kubernetes
    that the app is up and responding successfully to readiness probes.
    """
    _done_for_start(
        **_verify_component(
            **_create_and_start_component(
                **_parse_cloudify_context(**start_inputs))))

@wrap_error_handling_start
@monkeypatch_loggers
@operation
def create_and_start_container(**kwargs):
    """Initiate a Kubernetes deployment for the generic ContainerizedApplication node type"""
    service_component_name = ctx.node.properties["name"]
    ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = service_component_name

    image = ctx.node.properties["image"]
    kwargs["k8s_location"] = _get_location()

    _create_and_start_container(service_component_name, image, **kwargs)

@monkeypatch_loggers
@operation
def stop_and_remove_container(**kwargs):
    """Delete Kubernetes deployment"""
    if K8S_DEPLOYMENT in ctx.instance.runtime_properties:
        try:
            deployment_description = ctx.instance.runtime_properties[K8S_DEPLOYMENT]
            k8sclient.undeploy(deployment_description)

        except Exception as e:
            ctx.logger.error("Unexpected error while deleting k8s deployment: {0}"
                             .format(str(e)))
    else:
        # A previous install workflow may have failed,
        # and no Kubernetes deployment info was recorded in runtime_properties.
        # No need to run the undeploy operation
        ctx.logger.info("No k8s deployment information, not attempting to delete k8s deployment")

@wrap_error_handling_update
@monkeypatch_loggers
@operation
def scale(replicas, **kwargs):
    """Change number of replicas in the deployment"""
    service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME]

    if replicas > 0:
        current_replicas = ctx.instance.runtime_properties["replicas"]
        ctx.logger.info("Scaling {0} from {1} to {2} replica(s)".format(service_component_name, current_replicas, replicas))
        deployment_description = ctx.instance.runtime_properties[K8S_DEPLOYMENT]
        k8sclient.scale(deployment_description, replicas)
        ctx.instance.runtime_properties["replicas"] = replicas

        # Verify that the scaling took place as expected
        max_wait = kwargs.get("max_wait", DEFAULT_MAX_WAIT)
        ctx.logger.info("Waiting up to {0} secs for {1} to scale and become ready".format(max_wait, service_component_name))
        if _verify_k8s_deployment(deployment_description["location"], service_component_name, max_wait):
            ctx.logger.info("Scaling complete: {0} from {1} to {2} replica(s)".format(service_component_name, current_replicas, replicas))

    else:
        ctx.logger.info("Ignoring request to scale {0} to zero replicas".format(service_component_name))
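
# Hypothetical invocation of the scale operation (values are illustrative):
# scale to 3 replicas, allowing up to 300 seconds for the Deployment to become
# ready again. Note that a request for zero replicas is ignored rather than
# treated as an undeploy.
#
#   scale(replicas=3, max_wait=300)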

@wrap_error_handling_update
@monkeypatch_loggers
@operation
def update_image(image, **kwargs):
    """ Restart component with a new Docker image """

    service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME]
    if image:
        current_image = ctx.instance.runtime_properties["image"]
        ctx.logger.info("Updating app image for {0} from {1} to {2}".format(service_component_name, current_image, image))
        deployment_description = ctx.instance.runtime_properties[K8S_DEPLOYMENT]
        k8sclient.upgrade(deployment_description, image)
        ctx.instance.runtime_properties["image"] = image

        # Verify that the update took place as expected
        max_wait = kwargs.get("max_wait", DEFAULT_MAX_WAIT)
        ctx.logger.info("Waiting up to {0} secs for {1} to be updated and become ready".format(max_wait, service_component_name))
        if _verify_k8s_deployment(deployment_description["location"], service_component_name, max_wait):
            ctx.logger.info("Update complete: {0} from {1} to {2}".format(service_component_name, current_image, image))

    else:
        ctx.logger.info("Ignoring update_image request for {0} with unusable image '{1}'".format(service_component_name, str(image)))

# TODO: implement rollback operation when kubernetes python client fix is available.
# (See comments in k8sclient.py.)
# In the meantime, it's possible to undo an update_image operation by doing a second
# update_image that specifies the older image.
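
# Sketch of the manual rollback described above (image references are
# illustrative):
#
#   update_image(image="nexus3.onap.org:10001/onap/my-component:1.1.0")  # upgrade
#   update_image(image="nexus3.onap.org:10001/onap/my-component:1.0.0")  # roll back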

@monkeypatch_loggers
@Policies.cleanup_policies_on_node
@operation
def cleanup_discovery(**kwargs):
    """Delete configuration from Consul"""
    if SERVICE_COMPONENT_NAME in ctx.instance.runtime_properties:
        service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME]

        try:
            conn = dis.create_kv_conn(CONSUL_HOST)
            dis.remove_service_component_config(conn, service_component_name)
        except dis.DiscoveryConnectionError as e:
            raise RecoverableError(e)
    else:
        # When another node in the blueprint fails install,
        # this node may not have generated a service component name.
        # There's nothing to delete from Consul.
        ctx.logger.info("No service_component_name, not attempting to delete config from Consul")

def _notify_container(**kwargs):
    """
    Notify container using the policy section in the docker_config.
    Notification consists of running a script in the application container
    in each pod in the Kubernetes deployment associated with this node.
    Return the list of notification results.
    """
    dc = kwargs["docker_config"]
    resp = []

    if "policy" in dc and dc["policy"].get("trigger_type") == "docker":
        # Build the command to execute in the container:
        # SCRIPT_PATH policies {"policies" : ...., "updated_policies" : ..., "removed_policies": ...}
        script_path = dc["policy"]["script_path"]
        policy_data = {
            "policies": kwargs["policies"],
            "updated_policies": kwargs["updated_policies"],
            "removed_policies": kwargs["removed_policies"]
        }

        command = [script_path, "policies", json.dumps(policy_data)]

        # Execute the command
        deployment_description = ctx.instance.runtime_properties[K8S_DEPLOYMENT]
        resp = k8sclient.execute_command_in_deployment(deployment_description, command)

    # else the default is no trigger

    return resp
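
# Example of the command assembled above (script path and payload are
# hypothetical), as executed in each pod's application container:
#
#   ["/opt/app/scripts/reconfigure.sh", "policies",
#    '{"policies": [...], "updated_policies": [...], "removed_policies": [...]}']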

@operation
@monkeypatch_loggers
@Policies.update_policies_on_node()
def policy_update(updated_policies, removed_policies=None, policies=None, **kwargs):
    """Policy update task

    This method is responsible for updating the application configuration and
    notifying the applications that the change has occurred. This is to be used
    for the dcae.interfaces.policy.policy_update operation.

    :updated_policies: contains the list of changed policy-configs when configs_only=True
        (default). Use configs_only=False to bring the full policy objects in :updated_policies:.
    """
    service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME]
    ctx.logger.info("policy_update for {0}-- updated_policies: {1}, removed_policies: {2}, policies: {3}"
                    .format(service_component_name, updated_policies, removed_policies, policies))
    update_inputs = copy.deepcopy(ctx.instance.runtime_properties)
    update_inputs["updated_policies"] = updated_policies
    update_inputs["removed_policies"] = removed_policies
    update_inputs["policies"] = policies

    resp = _notify_container(**update_inputs)
    ctx.logger.info("policy_update complete for {0}--notification results: {1}".format(service_component_name, json.dumps(resp)))