Coverage for dmaapcontrollerif/dmaap_requests.py : 65%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
# ============LICENSE_START====================================================
# org.onap.dcaegen2
# =============================================================================
# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2020 Pantheon.tech. All rights reserved.
# Modifications Copyright (c) 2021 Nordix Foundation.
# =============================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END======================================================
21import requests
### "Constants"
# Default resource-collection paths, relative to the controller API root.
# They are used as the default values of the DMaaPControllerHandle constructor.
FEEDS_PATH = '/feeds'
PUBS_PATH = '/dr_pubs'
SUBS_PATH = '/dr_subs'
TOPICS_PATH = '/topics'
CLIENTS_PATH = '/mr_clients'
LOCATIONS_PATH = '/dcaeLocations'


class DMaaPControllerHandle(object):
    '''
    A simple wrapper class to map DMaaP bus controller API calls into operations supported by the requests module.

    Unless stated otherwise, every public method returns the response object
    produced by the underlying requests call without inspecting its status.
    No timeout is passed to requests, so a call blocks for as long as the
    controller takes to answer.
    '''

    # Keys of a request body whose values are masked before the body is logged
    _SENSITIVE_KEYS = ('userpwd',)
    _REDACTED = '********'

    def __init__(self, api_url, user, password, logger,
                 feeds_path=FEEDS_PATH,
                 pubs_path=PUBS_PATH,
                 subs_path=SUBS_PATH,
                 topics_path=TOPICS_PATH,
                 clients_path=CLIENTS_PATH,
                 locations_path=LOCATIONS_PATH):
        '''
        Constructor

        api_url        -- URL for the root of the Controller resource tree, no trailing "/"
        user, password -- credentials used for HTTP basic auth on every request
        logger         -- object whose info() method receives one formatted message per request
        *_path         -- path of each resource collection, relative to api_url
        '''
        self.api_url = api_url          # URL for the root of the Controller resource tree, no trailing "/"
        self.auth = (user, password)    # user name and password for HTTP basic auth
        self.logger = logger
        self.feeds_path = feeds_path
        self.pubs_path = pubs_path
        self.subs_path = subs_path
        self.topics_path = topics_path
        self.clients_path = clients_path
        self.locations_path = locations_path

    ### INTERNAL FUNCTIONS ###

    def _make_url(self, path):
        '''
        Make a full URL given the path relative to the root.
        A leading "/" is added to path when it is missing.
        '''
        if not path.startswith('/'):
            path = '/' + path

        return self.api_url + path

    @classmethod
    def _loggable_body(cls, resource_content):
        '''
        Return a copy of resource_content that is safe to write to the log:
        values stored under the keys in _SENSITIVE_KEYS are replaced by a mask.
        The argument itself is never modified; anything that is not a dict is
        returned unchanged.
        '''
        if not isinstance(resource_content, dict):
            return resource_content
        return {key: (cls._REDACTED if key in cls._SENSITIVE_KEYS else value)
                for key, value in resource_content.items()}

    def _get_resource(self, path):
        '''
        Get the DMaaP resource at path, where path is relative to the root.
        '''
        url = self._make_url(path)
        self.logger.info("Querying URL: {0}".format(url))
        return requests.get(url, auth=self.auth)

    def _create_resource(self, path, resource_content):
        '''
        Create a DMaaP resource by POSTing to the resource collection
        identified by path (relative to root), using resource_content as the body of the post.
        '''
        url = self._make_url(path)
        # Publisher and subscriber bodies carry a plaintext password ('userpwd');
        # log a masked copy, but send the real content.
        self.logger.info("Posting to URL: {0} with body: {1}".format(
            url, self._loggable_body(resource_content)))
        return requests.post(url, auth=self.auth, json=resource_content)

    def _delete_resource(self, path):
        '''
        Delete the DMaaP resource at path, where path is relative to the root.
        '''
        url = self._make_url(path)
        self.logger.info("Deleting URL: {0}".format(url))
        return requests.delete(url, auth=self.auth)

    def _get_locations(self):
        '''
        Fetch the location collection and return its parsed JSON content.
        '''
        # Do these as a separate step so things like 404 get reported precisely
        locations = self._get_resource(self.locations_path)
        locations.raise_for_status()
        return locations.json()

    ### PUBLIC API ###

    # Data Router Feeds
    def create_feed(self, name, version=None, description=None, aspr_class=None, owner=None, useExisting=None):
        '''
        Create a DMaaP data router feed with the given feed name
        and (optionally) feed version, feed description, ASPR classification,
        owner, and useExisting flag.

        Optional values that are empty or None are left out of the request body.
        '''
        feed_definition = {'feedName': name}
        if version:
            feed_definition['feedVersion'] = version
        if description:
            feed_definition['feedDescription'] = description
        if aspr_class:
            feed_definition['asprClassification'] = aspr_class
        if owner:
            feed_definition['owner'] = owner
        feeds_path_query = self.feeds_path
        # NOTE(review): kept as an equality test, so only True (or 1) enables the
        # flag. A plain truthiness test would also accept strings such as
        # "false" -- confirm what callers pass before relaxing it.
        if useExisting == True:  # It's a boolean!
            feeds_path_query += "?useExisting=true"

        return self._create_resource(feeds_path_query, feed_definition)

    def get_feed_info(self, feed_id):
        '''
        Get the representation of the DMaaP data router feed whose feed id is feed_id.
        '''
        return self._get_resource("{0}/{1}".format(self.feeds_path, feed_id))

    def get_feed_info_by_name(self, feed_name):
        '''
        Get the representation of the DMaaP data router feed whose feed name is feed_name.

        The feed collection is fetched and scanned for the first entry whose
        "feedName" matches; that feed is then fetched by its "feedId".
        Returns None when no feed has that name. The collection response is
        parsed without checking its HTTP status.
        '''
        feeds = self._get_resource("{0}".format(self.feeds_path))
        feed_list = feeds.json()
        for feed in feed_list:
            if feed["feedName"] == feed_name:
                self.logger.info("Found feed with {0}".format(feed_name))
                feed_id = feed["feedId"]
                return self._get_resource("{0}/{1}".format(self.feeds_path, feed_id))

        self.logger.info("feed_name {0} not found".format(feed_name))
        return None

    def delete_feed(self, feed_id):
        '''
        Delete the DMaaP data router feed whose feed id is feed_id.
        '''
        return self._delete_resource("{0}/{1}".format(self.feeds_path, feed_id))

    # Data Router Publishers
    def add_publisher(self, feed_id, location, username, password, status=None):
        '''
        Add a publisher to feed feed_id at location location with user, pass, and status.
        status is only sent when it is non-empty.
        '''
        publisher_definition = {
            'feedId': feed_id,
            'dcaeLocationName': location,
            'username': username,
            'userpwd': password,
        }

        if status:
            publisher_definition['status'] = status

        return self._create_resource(self.pubs_path, publisher_definition)

    def get_publisher_info(self, pub_id):
        '''
        Get the representation of the DMaaP data router publisher whose publisher id is pub_id.
        '''
        return self._get_resource("{0}/{1}".format(self.pubs_path, pub_id))

    def delete_publisher(self, pub_id):
        '''
        Delete the DMaaP data router publisher whose publisher id is pub_id.
        '''
        return self._delete_resource("{0}/{1}".format(self.pubs_path, pub_id))

    # Data Router Subscribers
    def add_subscriber(self, feed_id, location, delivery_url, username, password, decompress, privileged, status=None):
        '''
        Add a subscriber to feed feed_id at location location with delivery URL,
        user, pass, decompress flag, privileged flag, and status.
        status is only sent when it is non-empty.
        '''
        subscriber_definition = {
            'feedId': feed_id,
            'dcaeLocationName': location,
            'deliveryURL': delivery_url,
            'username': username,
            'userpwd': password,
            'decompress': decompress,
            'privilegedSubscriber': privileged,
        }

        if status:
            subscriber_definition['status'] = status

        return self._create_resource(self.subs_path, subscriber_definition)

    def get_subscriber_info(self, sub_id):
        '''
        Get the representation of the DMaaP data router subscriber whose subscriber id is sub_id.
        '''
        return self._get_resource("{0}/{1}".format(self.subs_path, sub_id))

    def delete_subscriber(self, sub_id):
        '''
        Delete the DMaaP data router subscriber whose subscriber id is sub_id.
        '''
        return self._delete_resource("{0}/{1}".format(self.subs_path, sub_id))

    # Message router topics
    def create_topic(self, name, description=None, tnx_enabled=None, owner=None, replication_case=None, global_mr_url=None, use_existing=None):
        '''
        Create a message router topic with the topic name 'name' and optionally the topic description
        'description', the 'tnxEnabled' flag, the topic owner 'owner', the replication case,
        the global MR URL and the 'useExisting' flag.

        Optional values that are empty or None are left out of the request body,
        except tnx_enabled, which is sent whenever it is not None.
        '''
        topic_definition = {'topicName': name}
        if description:
            topic_definition['topicDescription'] = description
        if owner:
            topic_definition['owner'] = owner
        if tnx_enabled is not None:  # It's a boolean! False must still be sent
            topic_definition['tnxEnabled'] = tnx_enabled
        if replication_case:
            topic_definition['replicationCase'] = replication_case
        if global_mr_url:
            topic_definition['globalMrURL'] = global_mr_url
        topics_path_query = self.topics_path
        if use_existing:  # It's a boolean!
            topics_path_query += "?useExisting=true"

        return self._create_resource(topics_path_query, topic_definition)

    def get_topic_info(self, fqtn):
        '''
        Get information about the topic whose fully-qualified name is 'fqtn'.
        '''
        return self._get_resource("{0}/{1}".format(self.topics_path, fqtn))

    def get_topic_fqtn_by_name(self, topic_name):
        '''
        Get the fqtn of the DMaaP message router topic whose topic name is topic_name.

        Returns the "fqtn" value of the first topic in the collection whose
        "topicName" matches, or None when there is no such topic. The
        collection response is parsed without checking its HTTP status.
        '''
        topics = self._get_resource("{0}".format(self.topics_path))
        topic_list = topics.json()
        for topic in topic_list:
            if topic["topicName"] == topic_name:
                self.logger.info("Found existing topic with name {0}".format(topic_name))
                fqtn = topic["fqtn"]
                return fqtn

        self.logger.info("topic_name {0} not found".format(topic_name))
        return None

    def delete_topic(self, fqtn):
        '''
        Delete the topic whose fully qualified name is 'fqtn'.
        '''
        return self._delete_resource("{0}/{1}".format(self.topics_path, fqtn))

    # Message router clients (publishers and subscribers)
    def create_client(self, fqtn, location, client_role, actions):
        '''
        Creates a client authorized to access the topic with fully-qualified name 'fqtn',
        from the location 'location', using the AAF client role 'client_role'. The
        client is authorized to perform actions in the list 'actions'. (Valid
        values are 'pub', 'sub', and 'view'.)
        '''
        client_definition = {
            'fqtn': fqtn,
            'dcaeLocationName': location,
            'clientRole': client_role,
            'action': actions,
        }
        return self._create_resource(self.clients_path, client_definition)

    def get_client_info(self, client_id):
        '''
        Get client information for the client whose client ID is 'client_id'.
        '''
        return self._get_resource("{0}/{1}".format(self.clients_path, client_id))

    def delete_client(self, client_id):
        '''
        Delete the client whose client ID is 'client_id'.
        '''
        return self._delete_resource("{0}/{1}".format(self.clients_path, client_id))

    def get_dcae_locations(self, dcae_layer):
        '''
        Get the list of location names known to the DMaaP bus controller
        whose "dcaeLayer" property matches dcae_layer and whose status is "VALID".

        Raises whatever raise_for_status() raises when the controller answers
        with an error status.
        '''
        # pull out location names for VALID locations with matching dcae_layer
        return [location["dcaeLocationName"] for location in self._get_locations()
                if location['dcaeLayer'] == dcae_layer
                and location['status'] == 'VALID']

    def get_dcae_central_locations(self):
        '''
        Get the list of location names known to the DMaaP bus controller
        whose "dcaeLayer" property contains "central" (ignoring case)
        and whose status is "VALID".
        "dcaeLayer" contains "central" for central sites.

        Raises whatever raise_for_status() raises when the controller answers
        with an error status.
        '''
        # pull out location names for VALID central locations
        return [location["dcaeLocationName"] for location in self._get_locations()
                if 'central' in location['dcaeLayer'].lower()
                and location['status'] == 'VALID']