Coverage for dmaapplugin/mr_lifecycle.py : 67%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# ============LICENSE_START====================================================
2# org.onap.dcaegen2
3# =============================================================================
4# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
5# Copyright (c) 2020 Pantheon.tech. All rights reserved.
6# Modifications Copyright (c) 2021 Nordix Foundation.
7# =============================================================================
8# Licensed under the Apache License, Version 2.0 (the "License");
9# you may not use this file except in compliance with the License.
10# You may obtain a copy of the License at
11#
12# http://www.apache.org/licenses/LICENSE-2.0
13#
14# Unless required by applicable law or agreed to in writing, software
15# distributed under the License is distributed on an "AS IS" BASIS,
16# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17# See the License for the specific language governing permissions and
18# limitations under the License.
19# ============LICENSE_END======================================================
21from cloudify import ctx
22from cloudify.decorators import operation
23from cloudify.exceptions import NonRecoverableError
24from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, DMAAP_OWNER
25from dmaapplugin.dmaaputils import random_string
26from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle
28# Lifecycle operations for DMaaP Message Router topics
29@operation
30def create_topic(**kwargs):
31 '''
32 Creates a message router topic.
33 Allows 'topic_name', 'topic_description', 'tnxEnabled', 'replication_case', 'global_mr_url',
34 and 'useExisting' as optional node properties. If 'topic_name' is not set,
35 generates a random one.
36 Sets 'fqtn' in the instance runtime_properties.
37 Note that 'tnxEnabled' is a Message Router flag indicating whether transactions
38 are enabled on the topic.
39 Note that 'useExisting' is a flag indicating whether DBCL will use existing topic if
40 the topic already exists.
41 '''
42 try:
43 # Make sure there's a topic_name
44 if "topic_name" in ctx.node.properties:
45 topic_name = ctx.node.properties["topic_name"]
46 if topic_name == '' or topic_name.isspace():
47 topic_name = random_string(12)
48 else:
49 topic_name = random_string(12)
51 # Make sure there's a topic description
52 topic_description = ctx.node.properties.get("topic_description", "No description provided")
54 # ..and the truly optional setting
55 tnx_enabled = ctx.node.properties.get("tnxEnabled", False)
57 replication_case = ctx.node.properties.get("replication_case")
59 global_mr_url = ctx.node.properties.get("global_mr_url")
61 use_existing = ctx.node.properties.get("useExisting", False)
63 # Make the request to the controller
64 dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
65 ctx.logger.info("Attempting to create topic name {0}".format(topic_name))
66 t = dmc.create_topic(topic_name, topic_description, tnx_enabled, DMAAP_OWNER, replication_case, global_mr_url, use_existing)
67 t.raise_for_status()
69 # Capture important properties from the result
70 topic = t.json()
71 ctx.instance.runtime_properties["fqtn"] = topic["fqtn"]
73 except Exception as e:
74 ctx.logger.error("Error creating topic: {er}".format(er=e))
75 raise NonRecoverableError(e)
77@operation
78def get_existing_topic(**kwargs):
79 '''
80 Get data for an existing topic.
81 Expects 'fqtn' as a node property.
82 Copies this property to 'fqtn' in runtime properties for consistency
83 with a newly-created topic.
84 While there's no real need to make a call to the DMaaP bus controller,
85 we do so just to make sure the fqtn is known to the controller, so we
86 don't run into problems when we try to add a publisher or subscriber later.
87 '''
88 try:
89 dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
90 fqtn_input = False
91 if "fqtn" in ctx.node.properties:
92 fqtn = ctx.node.properties["fqtn"]
93 fqtn_input = True
94 elif "topic_name" in ctx.node.properties:
95 topic_name = ctx.node.properties["topic_name"]
96 ctx.logger.info("Attempting to get fqtn for existing topic {0}".format(topic_name))
97 fqtn = dmc.get_topic_fqtn_by_name(topic_name)
98 if fqtn is None:
99 raise ValueError("Not find existing topic with name " + topic_name)
100 else:
101 ctx.logger.error("Not find existing topic with name {0}".format(topic_name))
102 raise ValueError("Either fqtn or topic_name must be defined to get existing topic")
104 ctx.logger.info("Attempting to get info for existing topic {0}".format(fqtn))
105 t = dmc.get_topic_info(fqtn)
106 t.raise_for_status()
108 ctx.instance.runtime_properties["fqtn"] = fqtn
110 except Exception as e:
111 ctx.logger.error("Error getting existing topic: {er}".format(er=e))
112 raise NonRecoverableError(e)
114@operation
115def delete_topic(**kwargs):
116 '''
117 Delete the topic. Expects the instance runtime property "fqtn" to have been
118 set when the topic was created.
119 '''
120 try:
121 fqtn = ctx.instance.runtime_properties["fqtn"]
122 dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
123 ctx.logger.info("Attempting to delete topic {0}".format(fqtn))
124 t = dmc.delete_topic(fqtn)
125 t.raise_for_status()
127 except Exception as e:
128 ctx.logger.error("Error getting existing topic: {er}".format(er=e))
129 # don't raise a NonRecoverable error here--let the uninstall workflow continue