Coverage for dmaapplugin/mr_lifecycle.py : 65%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# ============LICENSE_START====================================================
2# org.onap.dcaegen2
3# =============================================================================
4# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
5# Copyright (c) 2020 Pantheon.tech. All rights reserved.
6# =============================================================================
7# Licensed under the Apache License, Version 2.0 (the "License");
8# you may not use this file except in compliance with the License.
9# You may obtain a copy of the License at
10#
11# http://www.apache.org/licenses/LICENSE-2.0
12#
13# Unless required by applicable law or agreed to in writing, software
14# distributed under the License is distributed on an "AS IS" BASIS,
15# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16# See the License for the specific language governing permissions and
17# limitations under the License.
18# ============LICENSE_END======================================================
20from cloudify import ctx
21from cloudify.decorators import operation
22from cloudify.exceptions import NonRecoverableError
23from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, DMAAP_OWNER
24from dmaapplugin.dmaaputils import random_string
25from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle
27# Lifecycle operations for DMaaP Message Router topics
@operation
def create_topic(**kwargs):
    '''
    Create a Message Router topic through the DMaaP controller.

    All of the following node properties are optional:
      'topic_name'        - name for the topic; when absent, empty or made up only of
                            whitespace, a name from random_string(12) is used instead
      'topic_description' - defaults to "No description provided"
      'txenable'          - Message Router flag indicating whether transactions are
                            enabled on the topic; defaults to False
      'replication_case'  - defaults to None
      'global_mr_url'     - defaults to None
      'useExisting'       - flag indicating whether an already-existing topic is to be
                            used; defaults to False
    On success, the 'fqtn' field of the controller's JSON reply is stored in the
    instance runtime_properties under 'fqtn'.
    Any exception is logged and re-raised wrapped in NonRecoverableError.
    '''
    def _node_property(key, fallback):
        # Value of the node property when it is defined, otherwise the fallback.
        properties = ctx.node.properties
        return properties[key] if key in properties else fallback

    try:
        # Settle on a usable topic name first
        if "topic_name" not in ctx.node.properties:
            topic_name = random_string(12)
        else:
            requested_name = ctx.node.properties["topic_name"]
            is_blank = requested_name == '' or requested_name.isspace()
            topic_name = random_string(12) if is_blank else requested_name

        # Remaining settings simply fall back to their defaults
        topic_description = _node_property("topic_description", "No description provided")
        txenable = _node_property("txenable", False)
        replication_case = _node_property("replication_case", None)
        global_mr_url = _node_property("global_mr_url", None)
        useExisting = _node_property("useExisting", False)

        # Ask the controller to create the topic
        controller = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
        ctx.logger.info("Attempting to create topic name {0}".format(topic_name))
        response = controller.create_topic(
            topic_name,
            topic_description,
            txenable,
            DMAAP_OWNER,
            replication_case,
            global_mr_url,
            useExisting)
        response.raise_for_status()

        # Keep the fully-qualified topic name for later lifecycle operations
        ctx.instance.runtime_properties["fqtn"] = response.json()["fqtn"]

    except Exception as err:
        ctx.logger.error("Error creating topic: {er}".format(er=err))
        raise NonRecoverableError(err)
@operation
def get_existing_topic(**kwargs):
    '''
    Get data for an existing topic.
    Uses the 'fqtn' node property when it is defined; otherwise looks the fqtn up
    through the DMaaP controller using the 'topic_name' node property.
    One of the two properties must be defined.
    Copies the resulting fqtn to 'fqtn' in runtime properties for consistency
    with a newly-created topic.
    While there's no real need to make a call to the DMaaP bus controller when
    the fqtn is supplied, we do so just to make sure the fqtn is known to the
    controller, so we don't run into problems when we try to add a publisher or
    subscriber later.
    Any exception (missing properties, unknown topic name, failed controller
    request) is logged and re-raised wrapped in NonRecoverableError.
    '''
    try:
        dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
        if "fqtn" in ctx.node.properties:
            fqtn = ctx.node.properties["fqtn"]
        elif "topic_name" in ctx.node.properties:
            topic_name = ctx.node.properties["topic_name"]
            ctx.logger.info("Attempting to get fqtn for existing topic {0}".format(topic_name))
            fqtn = dmc.get_topic_fqtn_by_name(topic_name)
            if fqtn is None:
                raise ValueError("Not find existing topic with name " + topic_name)
        else:
            # Neither property is defined, so there is no topic name to mention here;
            # referencing one would fail with UnboundLocalError and hide this error.
            raise ValueError("Either fqtn or topic_name must be defined to get existing topic")

        # Confirm that the controller actually knows about this fqtn
        ctx.logger.info("Attempting to get info for existing topic {0}".format(fqtn))
        t = dmc.get_topic_info(fqtn)
        t.raise_for_status()

        ctx.instance.runtime_properties["fqtn"] = fqtn

    except Exception as e:
        ctx.logger.error("Error getting existing topic: {er}".format(er=e))
        raise NonRecoverableError(e)
@operation
def delete_topic(**kwargs):
    '''
    Delete the topic. Expects the instance runtime property "fqtn" to have been
    set when the topic was created.
    Failures (including a missing "fqtn" runtime property or a failed controller
    request) are logged but deliberately not raised, so that the uninstall
    workflow can continue.
    '''
    try:
        fqtn = ctx.instance.runtime_properties["fqtn"]
        dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
        ctx.logger.info("Attempting to delete topic {0}".format(fqtn))
        t = dmc.delete_topic(fqtn)
        t.raise_for_status()

    except Exception as e:
        # Report the operation that actually failed (this is a delete, not a lookup)
        ctx.logger.error("Error deleting topic: {er}".format(er=e))
        # don't raise a NonRecoverable error here--let the uninstall workflow continue