Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# ============LICENSE_START==================================================== 

2# org.onap.dcaegen2 

3# ============================================================================= 

4# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. 

5# Copyright (c) 2020 Pantheon.tech. All rights reserved. 

6# Modifications Copyright (c) 2021 Nordix Foundation. 

7# ============================================================================= 

8# Licensed under the Apache License, Version 2.0 (the "License"); 

9# you may not use this file except in compliance with the License. 

10# You may obtain a copy of the License at 

11# 

12# http://www.apache.org/licenses/LICENSE-2.0 

13# 

14# Unless required by applicable law or agreed to in writing, software 

15# distributed under the License is distributed on an "AS IS" BASIS, 

16# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

17# See the License for the specific language governing permissions and 

18# limitations under the License. 

19# ============LICENSE_END====================================================== 

20 

21import requests 

22 

23### "Constants" 

24FEEDS_PATH = '/feeds' 

25PUBS_PATH = '/dr_pubs' 

26SUBS_PATH = '/dr_subs' 

27TOPICS_PATH = '/topics' 

28CLIENTS_PATH = '/mr_clients' 

29LOCATIONS_PATH = '/dcaeLocations' 

30 

31class DMaaPControllerHandle(object): 

32 ''' 

33 A simple wrapper class to map DMaaP bus controller API calls into operations supported by the requests module 

34 ''' 

35 

36 def __init__(self, api_url, user, password, logger, 

37 feeds_path = FEEDS_PATH, 

38 pubs_path = PUBS_PATH, 

39 subs_path = SUBS_PATH, 

40 topics_path = TOPICS_PATH, 

41 clients_path = CLIENTS_PATH): 

42 ''' 

43 Constructor 

44 ''' 

45 self.api_url = api_url # URL for the root of the Controller resource tree, no trailing "/" 

46 self.auth = (user, password) # user name and password for HTTP basic auth 

47 self.logger = logger 

48 self.feeds_path = feeds_path 

49 self.pubs_path = pubs_path 

50 self.subs_path = subs_path 

51 self.topics_path = topics_path 

52 self.clients_path = clients_path 

53 

54 

55 ### INTERNAL FUNCTIONS ### 

56 

57 def _make_url(self, path): 

58 ''' 

59 Make a full URL given the path relative to the root 

60 ''' 

61 if not path.startswith('/'): 

62 path = '/' + path 

63 

64 return self.api_url + path 

65 

66 def _get_resource(self, path): 

67 ''' 

68 Get the DMaaP resource at path, where path is relative to the root. 

69 ''' 

70 url = self._make_url(path) 

71 self.logger.info("Querying URL: {0}".format(url)) 

72 return requests.get(url, auth=self.auth) 

73 

74 def _create_resource(self, path, resource_content): 

75 ''' 

76 Create a DMaaP resource by POSTing to the resource collection 

77 identified by path (relative to root), using resource_content as the body of the post 

78 ''' 

79 url = self._make_url(path) 

80 self.logger.info("Posting to URL: {0} with body: {1}".format(url, resource_content)) 

81 return requests.post(url, auth=self.auth, json=resource_content) 

82 

83 def _delete_resource(self, path): 

84 ''' 

85 Delete the DMaaP resource at path, where path is relative to the root. 

86 ''' 

87 url = self._make_url(path) 

88 self.logger.info("Deleting URL: {0}".format(url)) 

89 return requests.delete(url, auth=self.auth) 

90 

91 ### PUBLIC API ### 

92 

93 # Data Router Feeds 

94 def create_feed(self, name, version=None, description=None, aspr_class=None, owner=None, useExisting=None): 

95 ''' 

96 Create a DMaaP data router feed with the given feed name 

97 and (optionally) feed version, feed description, ASPR classification, 

98 owner, and useExisting flag 

99 ''' 

100 feed_definition = {'feedName' : name} 

101 if version: 

102 feed_definition['feedVersion'] = version 

103 if description: 

104 feed_definition['feedDescription'] = description 

105 if aspr_class: 

106 feed_definition['asprClassification'] = aspr_class 

107 if owner: 

108 feed_definition['owner'] = owner 

109 feeds_path_query = self.feeds_path 

110 if useExisting == True: # It's a boolean! 

111 feeds_path_query += "?useExisting=true" 

112 

113 return self._create_resource(feeds_path_query, feed_definition) 

114 

115 def get_feed_info(self, feed_id): 

116 ''' 

117 Get the representation of the DMaaP data router feed whose feed id is feed_id. 

118 ''' 

119 return self._get_resource("{0}/{1}".format(self.feeds_path, feed_id)) 

120 

121 def get_feed_info_by_name(self, feed_name): 

122 ''' 

123 Get the representation of the DMaaP data router feed whose feed name is feed_name. 

124 ''' 

125 feeds = self._get_resource("{0}".format(self.feeds_path)) 

126 feed_list = feeds.json() 

127 for feed in feed_list: 

128 if feed["feedName"] == feed_name: 

129 self.logger.info("Found feed with {0}".format(feed_name)) 

130 feed_id = feed["feedId"] 

131 return self._get_resource("{0}/{1}".format(self.feeds_path, feed_id)) 

132 

133 self.logger.info("feed_name {0} not found".format(feed_name)) 

134 return None 

135 

136 def delete_feed(self, feed_id): 

137 ''' 

138 Delete the DMaaP data router feed whose feed id is feed_id. 

139 ''' 

140 return self._delete_resource("{0}/{1}".format(self.feeds_path, feed_id)) 

141 

142 # Data Router Publishers 

143 def add_publisher(self, feed_id, location, username, password, status=None): 

144 ''' 

145 Add a publisher to feed feed_id at location location with user, pass, and status 

146 ''' 

147 publisher_definition = { 

148 'feedId' : feed_id, 

149 'dcaeLocationName' : location, 

150 'username' : username, 

151 'userpwd' : password 

152 } 

153 

154 if status: 

155 publisher_definition['status'] = status 

156 

157 return self._create_resource(self.pubs_path, publisher_definition) 

158 

159 def get_publisher_info(self, pub_id): 

160 ''' 

161 Get the representation of the DMaaP data router publisher whose publisher id is pub_id 

162 ''' 

163 return self._get_resource("{0}/{1}".format(self.pubs_path, pub_id)) 

164 

165 def delete_publisher(self, pub_id): 

166 ''' 

167 Delete the DMaaP data router publisher whose publisher id is id. 

168 ''' 

169 return self._delete_resource("{0}/{1}".format(self.pubs_path, pub_id)) 

170 

171 

172 # Data Router SUbscrihers 

173 def add_subscriber(self, feed_id, location, delivery_url, username, password, decompress, privileged, status=None): 

174 ''' 

175 Add a publisher to feed feed_id at location location with user, pass, and status 

176 ''' 

177 subscriber_definition = { 

178 'feedId' : feed_id, 

179 'dcaeLocationName' : location, 

180 'deliveryURL' : delivery_url, 

181 'username' : username, 

182 'userpwd' : password, 

183 'decompress': decompress, 

184 'privilegedSubscriber': privileged 

185 } 

186 

187 if status: 

188 subscriber_definition['status'] = status 

189 

190 return self._create_resource(self.subs_path, subscriber_definition) 

191 

192 def get_subscriber_info(self, sub_id): 

193 ''' 

194 Get the representation of the DMaaP data router subscriber whose subscriber id is sub_id 

195 ''' 

196 return self._get_resource("{0}/{1}".format(self.subs_path, sub_id)) 

197 

198 def delete_subscriber(self, sub_id): 

199 ''' 

200 Delete the DMaaP data router subscriber whose subscriber id is sub_id. 

201 ''' 

202 return self._delete_resource("{0}/{1}".format(self.subs_path, sub_id)) 

203 

204 # Message router topics 

205 def create_topic(self, name, description = None, tnx_enabled = None, owner = None, replication_case = None, global_mr_url = None, use_existing = None): 

206 ''' 

207 Create a message router topic with the topic name 'name' and optionally the topic_description 

208 'description', the 'tnxEnabled' flag, the 'useExisting' flag and the topic owner 'owner'. 

209 ''' 

210 topic_definition = {'topicName' : name} 

211 if description: 

212 topic_definition['topicDescription'] = description 

213 if owner: 

214 topic_definition['owner'] = owner 

215 if tnx_enabled is not None: # It's a boolean! 

216 topic_definition['tnxEnabled'] = tnx_enabled 

217 if replication_case: 

218 topic_definition['replicationCase'] = replication_case 

219 if global_mr_url: 

220 topic_definition['globalMrURL'] = global_mr_url 

221 topics_path_query = self.topics_path 

222 if use_existing: # It's a boolean! 

223 topics_path_query += "?useExisting=true" 

224 

225 return self._create_resource(topics_path_query, topic_definition) 

226 

227 def get_topic_info(self, fqtn): 

228 ''' 

229 Get information about the topic whose fully-qualified name is 'fqtn' 

230 ''' 

231 return self._get_resource("{0}/{1}".format(self.topics_path, fqtn)) 

232 

233 def get_topic_fqtn_by_name(self, topic_name): 

234 ''' 

235 Get the representation of the DMaaP message router topic fqtn whose topic name is topic_name. 

236 ''' 

237 topics = self._get_resource("{0}".format(self.topics_path)) 

238 topic_list = topics.json() 

239 for topic in topic_list: 

240 if topic["topicName"] == topic_name: 

241 self.logger.info("Found existing topic with name {0}".format(topic_name)) 

242 fqtn = topic["fqtn"] 

243 return fqtn 

244 

245 self.logger.info("topic_name {0} not found".format(topic_name)) 

246 return None 

247 

248 def delete_topic(self, fqtn): 

249 ''' 

250 Delete the topic whose fully qualified name is 'fqtn' 

251 ''' 

252 return self._delete_resource("{0}/{1}".format(self.topics_path, fqtn)) 

253 

254 # Message route clients (publishers and subscribers 

255 def create_client(self, fqtn, location, client_role, actions): 

256 ''' 

257 Creates a client authorized to access the topic with fully-qualified name 'fqtn', 

258 from the location 'location', using the AAF client role 'client_role'. The 

259 client is authorized to perform actions in the list 'actions'. (Valid 

260 values are 'pub', 'sub', and 'view' 

261 ''' 

262 client_definition = { 

263 'fqtn' : fqtn, 

264 'dcaeLocationName' : location, 

265 'clientRole' : client_role, 

266 'action' : actions 

267 } 

268 return self._create_resource(self.clients_path, client_definition) 

269 

270 def get_client_info(self, client_id): 

271 ''' 

272 Get client information for the client whose client ID is 'client_id' 

273 ''' 

274 return self._get_resource("{0}/{1}".format(self.clients_path, client_id)) 

275 

276 def delete_client(self, client_id): 

277 ''' 

278 Delete the client whose client ID is 'client_id' 

279 ''' 

280 return self._delete_resource("{0}/{1}".format(self.clients_path, client_id)) 

281 

282 def get_dcae_locations(self, dcae_layer): 

283 ''' 

284 Get the list of location names known to the DMaaP bus controller 

285 whose "dcaeLayer" property matches dcae_layer and whose status is "VALID". 

286 ''' 

287 # Do these as a separate step so things like 404 get reported precisely 

288 locations = self._get_resource(LOCATIONS_PATH) 

289 locations.raise_for_status() 

290 

291 # pull out location names for VALID locations with matching dcae_layer 

292 return [location["dcaeLocationName"] for location in locations.json() 

293 if location['dcaeLayer'] == dcae_layer 

294 and location['status'] == 'VALID'] 

295 

296 def get_dcae_central_locations(self): 

297 ''' 

298 Get the list of location names known to the DMaaP bus controller 

299 whose "dcaeLayer" property contains "central" (ignoring case) 

300 and whose status is "VALID". 

301 "dcaeLayer" contains "central" for central sites. 

302 ''' 

303 # Do these as a separate step so things like 404 get reported precisely 

304 locations = self._get_resource(LOCATIONS_PATH) 

305 locations.raise_for_status() 

306 

307 # pull out location names for VALID central locations 

308 return [location["dcaeLocationName"] for location in locations.json() 

309 if 'central' in location['dcaeLayer'].lower() 

310 and location['status'] == 'VALID'] 

311