Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

# ============LICENSE_START====================================================
# org.onap.dcaegen2
# =============================================================================
# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2020 Pantheon.tech. All rights reserved.
# =============================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END======================================================

19 

20import requests 

21 

### "Constants"
# Default resource-collection paths, relative to the root of the DMaaP bus
# controller API.  Each one starts with "/" and has no trailing "/".
FEEDS_PATH = '/feeds'                # data router feeds
PUBS_PATH = '/dr_pubs'               # data router publishers
SUBS_PATH = '/dr_subs'               # data router subscribers
TOPICS_PATH = '/topics'              # message router topics
CLIENTS_PATH = '/mr_clients'         # message router clients
LOCATIONS_PATH = '/dcaeLocations'    # DCAE locations known to the controller

29 

class DMaaPControllerHandle(object):
    '''
    A simple wrapper class to map DMaaP bus controller API calls into
    operations supported by the requests module.

    Unless a method's docstring says otherwise, public methods return the
    response object produced by the requests module without checking its
    status; checking it is left to the caller.
    '''

    # Keys of a request body whose values must never be written to the log
    _SENSITIVE_KEYS = ('userpwd',)

    def __init__(self, api_url, user, password, logger,
                 feeds_path=FEEDS_PATH,
                 pubs_path=PUBS_PATH,
                 subs_path=SUBS_PATH,
                 topics_path=TOPICS_PATH,
                 clients_path=CLIENTS_PATH):
        '''
        Constructor

        api_url      -- URL for the root of the Controller resource tree, no trailing "/"
        user         -- user name for HTTP basic auth
        password     -- password for HTTP basic auth
        logger       -- object whose info() method receives one preformatted message
        feeds_path, pubs_path, subs_path, topics_path, clients_path
                     -- paths (relative to api_url) of the resource collections
        '''
        self.api_url = api_url          # URL for the root of the Controller resource tree, no trailing "/"
        self.auth = (user, password)    # user name and password for HTTP basic auth
        self.logger = logger
        self.feeds_path = feeds_path
        self.pubs_path = pubs_path
        self.subs_path = subs_path
        self.topics_path = topics_path
        self.clients_path = clients_path

    ### INTERNAL FUNCTIONS ###

    def _make_url(self, path):
        '''
        Make a full URL given the path relative to the root.
        A leading "/" is added to path when it is missing.
        '''
        if not path.startswith('/'):
            path = '/' + path

        return self.api_url + path

    def _loggable(self, resource_content):
        '''
        Return a version of resource_content that is safe to log: in a dict,
        the value of every key listed in _SENSITIVE_KEYS is replaced by a
        placeholder.  The input is never modified; content that is not a
        dict is returned as is.
        '''
        if not isinstance(resource_content, dict):
            return resource_content
        return {key: ('********' if key in self._SENSITIVE_KEYS else value)
                for key, value in resource_content.items()}

    def _get_resource(self, path):
        '''
        Get the DMaaP resource at path, where path is relative to the root.
        Returns the response from requests.get.
        '''
        url = self._make_url(path)
        self.logger.info("Querying URL: {0}".format(url))
        return requests.get(url, auth=self.auth)

    def _create_resource(self, path, resource_content):
        '''
        Create a DMaaP resource by POSTing to the resource collection
        identified by path (relative to root), using resource_content as the
        JSON body of the post.  Returns the response from requests.post.
        '''
        url = self._make_url(path)
        # Log a redacted copy: publisher/subscriber bodies carry 'userpwd'
        self.logger.info("Posting to URL: {0} with body: {1}".format(url, self._loggable(resource_content)))
        return requests.post(url, auth=self.auth, json=resource_content)

    def _delete_resource(self, path):
        '''
        Delete the DMaaP resource at path, where path is relative to the root.
        Returns the response from requests.delete.
        '''
        url = self._make_url(path)
        self.logger.info("Deleting URL: {0}".format(url))
        return requests.delete(url, auth=self.auth)

    def _get_valid_location_names(self, layer_matches):
        '''
        Get the "dcaeLocationName" of every location known to the DMaaP bus
        controller whose "dcaeLayer" value satisfies the predicate
        layer_matches and whose status is "VALID".
        Raises the requests HTTP error if the query does not succeed.
        '''
        # Do these as a separate step so things like 404 get reported precisely
        locations = self._get_resource(LOCATIONS_PATH)
        locations.raise_for_status()

        return [location["dcaeLocationName"] for location in locations.json()
                if layer_matches(location['dcaeLayer'])
                and location['status'] == 'VALID']

    ### PUBLIC API ###

    # Data Router Feeds
    def create_feed(self, name, version=None, description=None, aspr_class=None, owner=None, useExisting=None):
        '''
        Create a DMaaP data router feed with the given feed name
        and (optionally) feed version, feed description, ASPR classification,
        owner, and useExisting flag.
        Optional values that are empty or None are left out of the request.
        '''
        feed_definition = {'feedName': name}
        if version:
            feed_definition['feedVersion'] = version
        if description:
            feed_definition['feedDescription'] = description
        if aspr_class:
            feed_definition['asprClassification'] = aspr_class
        if owner:
            feed_definition['owner'] = owner

        feeds_path_query = self.feeds_path
        # Deliberately strict comparison: only a value equal to True turns the
        # flag on, so a truthy non-boolean such as the string "false" does not.
        if useExisting == True:    # It's a boolean!
            feeds_path_query += "?useExisting=true"

        return self._create_resource(feeds_path_query, feed_definition)

    def get_feed_info(self, feed_id):
        '''
        Get the representation of the DMaaP data router feed whose feed id is feed_id.
        '''
        return self._get_resource("{0}/{1}".format(self.feeds_path, feed_id))

    def get_feed_info_by_name(self, feed_name):
        '''
        Get the representation of the DMaaP data router feed whose feed name is feed_name.

        Lists all feeds, then queries the first one whose "feedName" equals
        feed_name by its "feedId".  Returns the response of that second
        query, or None when no feed has that name.
        Raises the requests HTTP error if listing the feeds does not succeed.
        '''
        feeds = self._get_resource(self.feeds_path)
        # Report a failed listing precisely instead of failing later while
        # walking an error body as if it were the feed list
        feeds.raise_for_status()

        for feed in feeds.json():
            if feed["feedName"] == feed_name:
                self.logger.info("Found feed with {0}".format(feed_name))
                return self._get_resource("{0}/{1}".format(self.feeds_path, feed["feedId"]))

        self.logger.info("feed_name {0} not found".format(feed_name))
        return None

    def delete_feed(self, feed_id):
        '''
        Delete the DMaaP data router feed whose feed id is feed_id.
        '''
        return self._delete_resource("{0}/{1}".format(self.feeds_path, feed_id))

    # Data Router Publishers
    def add_publisher(self, feed_id, location, username, password, status=None):
        '''
        Add a publisher to feed feed_id at location location with user, pass, and status.
        status is left out of the request when it is empty or None.
        '''
        publisher_definition = {
            'feedId': feed_id,
            'dcaeLocationName': location,
            'username': username,
            'userpwd': password
        }

        if status:
            publisher_definition['status'] = status

        return self._create_resource(self.pubs_path, publisher_definition)

    def get_publisher_info(self, pub_id):
        '''
        Get the representation of the DMaaP data router publisher whose publisher id is pub_id
        '''
        return self._get_resource("{0}/{1}".format(self.pubs_path, pub_id))

    def delete_publisher(self, pub_id):
        '''
        Delete the DMaaP data router publisher whose publisher id is pub_id.
        '''
        return self._delete_resource("{0}/{1}".format(self.pubs_path, pub_id))

    # Data Router Subscribers
    def add_subscriber(self, feed_id, location, delivery_url, username, password, decompress, privileged, status=None):
        '''
        Add a subscriber to feed feed_id at location location with delivery URL,
        user, pass, and status.  decompress and privileged are sent as the
        'decompress' and 'privilegedSubscriber' fields of the request.
        status is left out of the request when it is empty or None.
        '''
        subscriber_definition = {
            'feedId': feed_id,
            'dcaeLocationName': location,
            'deliveryURL': delivery_url,
            'username': username,
            'userpwd': password,
            'decompress': decompress,
            'privilegedSubscriber': privileged
        }

        if status:
            subscriber_definition['status'] = status

        return self._create_resource(self.subs_path, subscriber_definition)

    def get_subscriber_info(self, sub_id):
        '''
        Get the representation of the DMaaP data router subscriber whose subscriber id is sub_id
        '''
        return self._get_resource("{0}/{1}".format(self.subs_path, sub_id))

    def delete_subscriber(self, sub_id):
        '''
        Delete the DMaaP data router subscriber whose subscriber id is sub_id.
        '''
        return self._delete_resource("{0}/{1}".format(self.subs_path, sub_id))

    # Message router topics
    def create_topic(self, name, description=None, txenable=None, owner=None, replication_case=None, global_mr_url=None, useExisting=None):
        '''
        Create a message router topic with the topic name 'name' and optionally the topic description
        'description', the 'txenable' flag, the topic owner 'owner', the replication case
        'replication_case', the global message router URL 'global_mr_url' and the 'useExisting' flag.
        Optional values that are empty or None are left out of the request;
        'txenable' is left out only when it is None, so False is sent.
        '''
        topic_definition = {'topicName': name}
        if description:
            topic_definition['topicDescription'] = description
        if owner:
            topic_definition['owner'] = owner
        if txenable is not None:    # It's a boolean!
            topic_definition['txenable'] = txenable
        if replication_case:
            topic_definition['replicationCase'] = replication_case
        if global_mr_url:
            topic_definition['globalMrURL'] = global_mr_url

        topics_path_query = self.topics_path
        # Deliberately strict comparison: only a value equal to True turns the
        # flag on, so a truthy non-boolean such as the string "false" does not.
        if useExisting == True:    # It's a boolean!
            topics_path_query += "?useExisting=true"

        return self._create_resource(topics_path_query, topic_definition)

    def get_topic_info(self, fqtn):
        '''
        Get information about the topic whose fully-qualified name is 'fqtn'
        '''
        return self._get_resource("{0}/{1}".format(self.topics_path, fqtn))

    def get_topic_fqtn_by_name(self, topic_name):
        '''
        Get the fully-qualified topic name ("fqtn") of the DMaaP message router topic
        whose topic name is topic_name.

        Lists all topics and returns the "fqtn" of the first one whose
        "topicName" equals topic_name, or None when no topic has that name.
        Raises the requests HTTP error if listing the topics does not succeed.
        '''
        topics = self._get_resource(self.topics_path)
        # Report a failed listing precisely instead of failing later while
        # walking an error body as if it were the topic list
        topics.raise_for_status()

        for topic in topics.json():
            if topic["topicName"] == topic_name:
                self.logger.info("Found existing topic with name {0}".format(topic_name))
                return topic["fqtn"]

        self.logger.info("topic_name {0} not found".format(topic_name))
        return None

    def delete_topic(self, fqtn):
        '''
        Delete the topic whose fully qualified name is 'fqtn'
        '''
        return self._delete_resource("{0}/{1}".format(self.topics_path, fqtn))

    # Message router clients (publishers and subscribers)
    def create_client(self, fqtn, location, client_role, actions):
        '''
        Creates a client authorized to access the topic with fully-qualified name 'fqtn',
        from the location 'location', using the AAF client role 'client_role'. The
        client is authorized to perform actions in the list 'actions'. (Valid
        values are 'pub', 'sub', and 'view'.)
        '''
        client_definition = {
            'fqtn': fqtn,
            'dcaeLocationName': location,
            'clientRole': client_role,
            'action': actions
        }
        return self._create_resource(self.clients_path, client_definition)

    def get_client_info(self, client_id):
        '''
        Get client information for the client whose client ID is 'client_id'
        '''
        return self._get_resource("{0}/{1}".format(self.clients_path, client_id))

    def delete_client(self, client_id):
        '''
        Delete the client whose client ID is 'client_id'
        '''
        return self._delete_resource("{0}/{1}".format(self.clients_path, client_id))

    def get_dcae_locations(self, dcae_layer):
        '''
        Get the list of location names known to the DMaaP bus controller
        whose "dcaeLayer" property matches dcae_layer and whose status is "VALID".
        Raises the requests HTTP error if the query does not succeed.
        '''
        return self._get_valid_location_names(lambda layer: layer == dcae_layer)

    def get_dcae_central_locations(self):
        '''
        Get the list of location names known to the DMaaP bus controller
        whose "dcaeLayer" property contains "central" (ignoring case)
        and whose status is "VALID".
        "dcaeLayer" contains "central" for central sites.
        Raises the requests HTTP error if the query does not succeed.
        '''
        return self._get_valid_location_names(lambda layer: 'central' in layer.lower())

310