Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# ============LICENSE_START==================================================== 

2# org.onap.dcaegen2 

3# ============================================================================= 

4# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. 

5# Copyright (c) 2020 Pantheon.tech. All rights reserved. 

6# ============================================================================= 

7# Licensed under the Apache License, Version 2.0 (the "License"); 

8# you may not use this file except in compliance with the License. 

9# You may obtain a copy of the License at 

10# 

11# http://www.apache.org/licenses/LICENSE-2.0 

12# 

13# Unless required by applicable law or agreed to in writing, software 

14# distributed under the License is distributed on an "AS IS" BASIS, 

15# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

16# See the License for the specific language governing permissions and 

17# limitations under the License. 

18# ============LICENSE_END====================================================== 

19 

20from cloudify import ctx 

21from cloudify.decorators import operation 

22from cloudify.exceptions import NonRecoverableError 

23from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, CONSUL_HOST 

24from dmaapplugin.dmaaputils import random_string 

25from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle 

26from consulif.consulif import ConsulHandle 

27 

28# Lifecycle operations for DMaaP Data Router 

29# publish and subscribe relationships 

30 

31@operation 

32def add_dr_publisher(**kwargs): 

33 ''' 

34 Sets up the source of the publishes_relationship as a publisher to the feed that 

35 is the target of the relationship 

36 Assumes target (the feed) has the following runtime properties set 

37 - feed_id 

38 - log_url 

39 - publish_url 

40 Assumes source (the publisher) has a runtime property whose name matches the node name of the feed. 

41 This is a dictionary containing one property: 

42 - location (the dcaeLocationName to pass when adding the publisher to the feed) 

43 Generates a user name and password that the publisher will need to use when publishing 

44 Adds the following properties to the dictionary above: 

45 - publish_url 

46 - log_url 

47 - username 

48 - password 

49 ''' 

50 try: 

51 # Make sure we have a name under which to store DMaaP configuration 

52 # Check early so we don't needlessly create DMaaP entities 

53 if 'service_component_name' not in ctx.source.instance.runtime_properties: 

54 raise Exception("Source node does not have 'service_component_name' in runtime_properties") 

55 

56 target_feed = ctx.target.node.id 

57 ctx.logger.info("Attempting to add publisher {0} to feed {1}".format(ctx.source.node.id, target_feed)) 

58 

59 # Set up the parameters for the add_publisher request to the DMaaP bus controller 

60 feed_id = ctx.target.instance.runtime_properties["feed_id"] 

61 location = ctx.source.instance.runtime_properties[target_feed]["location"] 

62 username = random_string(8) 

63 password = random_string(16) 

64 

65 # Make the request to add the publisher to the feed 

66 dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) 

67 add_pub = dmc.add_publisher(feed_id, location, username, password) 

68 add_pub.raise_for_status() 

69 publisher_info = add_pub.json() 

70 publisher_id = publisher_info["pubId"] 

71 ctx.logger.info("Added publisher id {0} to feed {1} at {2}, with user {3}, pass {4}".format(publisher_id, feed_id, location, username, password)) 

72 

73 # Set runtime properties on the source 

74 ctx.source.instance.runtime_properties[target_feed] = { 

75 "publisher_id" : publisher_id, 

76 "location" : location, 

77 "publish_url" : ctx.target.instance.runtime_properties["publish_url"], 

78 "log_url" : ctx.target.instance.runtime_properties["log_url"], 

79 "username" : username, 

80 "password" : password 

81 } 

82 

83 # Set key in Consul 

84 ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger) 

85 cpy = dict(ctx.source.instance.runtime_properties[target_feed]) 

86 ch.add_to_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']), target_feed, cpy) 

87 

88 except Exception as e: 

89 ctx.logger.error("Error adding publisher to feed: {er}".format(er=e)) 

90 raise NonRecoverableError(e) 

91 

92 

93@operation 

94def delete_dr_publisher(**kwargs): 

95 ''' 

96 Deletes publisher (the source of the publishes_files relationship) 

97 from the feed (the target of the relationship). 

98 Assumes that the 'publisher_id' property was added to the dictionary of feed-related properties, 

99 when the publisher was added to the feed. 

100 ''' 

101 

102 try: 

103 # Make sure we have a name under which to store DMaaP configuration 

104 # Check early so we don't needlessly create DMaaP entities 

105 if 'service_component_name' not in ctx.source.instance.runtime_properties: 

106 raise Exception("Source node does not have 'service_component_name' in runtime_properties") 

107 

108 # Get the publisher id 

109 target_feed = ctx.target.node.id 

110 publisher_id = ctx.source.instance.runtime_properties[target_feed]["publisher_id"] 

111 ctx.logger.info("Attempting to delete publisher {0}".format(publisher_id)) 

112 

113 # Make the request 

114 dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) 

115 del_result = dmc.delete_publisher(publisher_id) 

116 del_result.raise_for_status() 

117 

118 ctx.logger.info("Deleted publisher {0}".format(publisher_id)) 

119 

120 # Attempt to remove the entire ":dmaap" entry from the Consul KV store 

121 # Will quietly do nothing if the entry has already been removed 

122 ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger) 

123 ch.delete_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name'])) 

124 

125 except Exception as e: 

126 ctx.logger.error("Error deleting publisher: {er}".format(er=e)) 

127 # don't raise a NonRecoverable error here--let the uninstall workflow continue 

128 

129 

130@operation 

131def add_dr_subscriber(**kwargs): 

132 ''' 

133 Sets up the source of the subscribes_to_files relationship as a subscriber to the 

134 feed that is the target of the relationship. 

135 Assumes target (the feed) has the following runtime property set 

136 - feed_id 

137 Assumes source (the subscriber) has a runtime property whose name matches the node name of the feed. 

138 This is a dictionary containing the following properties: 

139 - location (the dcaeLocationName to pass when adding the publisher to the feed) 

140 - delivery_url (the URL to which data router will deliver files) 

141 - username (the username data router will use when delivering files) 

142 - password (the password data router will use when delivering files) 

143 Adds a property to the dictionary above: 

144 - subscriber_id (used to delete the subscriber in the uninstall workflow 

145 ''' 

146 try: 

147 target_feed = ctx.target.node.id 

148 ctx.logger.info("Attempting to add subscriber {0} to feed {1}".format(ctx.source.node.id, target_feed)) 

149 

150 # Get the parameters for the call 

151 feed_id = ctx.target.instance.runtime_properties["feed_id"] 

152 feed = ctx.source.instance.runtime_properties[target_feed] 

153 location = feed["location"] 

154 delivery_url = feed["delivery_url"] 

155 username = feed["username"] 

156 password = feed["password"] 

157 decompress = feed["decompress"] if "decompress" in feed else False 

158 privileged = feed["privileged"] if "privileged" in feed else False 

159 

160 # Make the request to add the subscriber to the feed 

161 dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) 

162 add_sub = dmc.add_subscriber(feed_id, location, delivery_url,username, password, decompress, privileged) 

163 add_sub.raise_for_status() 

164 subscriber_info = add_sub.json() 

165 subscriber_id = subscriber_info["subId"] 

166 ctx.logger.info("Added subscriber id {0} to feed {1} at {2}".format(subscriber_id, feed_id, location)) 

167 

168 # Add subscriber_id to the runtime properties 

169 # ctx.source.instance.runtime_properties[target_feed]["subscriber_id"] = subscriber_id 

170 ctx.source.instance.runtime_properties[target_feed] = { 

171 "subscriber_id": subscriber_id, 

172 "location" : location, 

173 "delivery_url" : delivery_url, 

174 "username" : username, 

175 "password" : password, 

176 "decompress": decompress, 

177 "privilegedSubscriber": privileged 

178 } 

179 ctx.logger.info("on source: {0}".format(ctx.source.instance.runtime_properties[target_feed])) 

180 

181 # Set key in Consul 

182 ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger) 

183 cpy = dict(ctx.source.instance.runtime_properties[target_feed]) 

184 ch.add_to_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']), target_feed, cpy) 

185 

186 except Exception as e: 

187 ctx.logger.error("Error adding subscriber to feed: {er}".format(er=e)) 

188 raise NonRecoverableError(e) 

189 

190 

191@operation 

192def delete_dr_subscriber(**kwargs): 

193 ''' 

194 Deletes subscriber (the source of the subscribes_to_files relationship) 

195 from the feed (the target of the relationship). 

196 Assumes that the source node's runtime properties dictionary for the target feed 

197 includes 'subscriber_id', set when the publisher was added to the feed. 

198 ''' 

199 try: 

200 # Get the subscriber id 

201 target_feed = ctx.target.node.id 

202 subscriber_id = ctx.source.instance.runtime_properties[target_feed]["subscriber_id"] 

203 ctx.logger.info("Attempting to delete subscriber {0} from feed {1}".format(subscriber_id, target_feed)) 

204 

205 # Make the request 

206 dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) 

207 del_result = dmc.delete_subscriber(subscriber_id) 

208 del_result.raise_for_status() 

209 

210 ctx.logger.info("Deleted subscriber {0}".format(subscriber_id)) 

211 

212 # Attempt to remove the entire ":dmaap" entry from the Consul KV store 

213 # Will quietly do nothing if the entry has already been removed 

214 ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger) 

215 ch.delete_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name'])) 

216 

217 except Exception as e: 

218 ctx.logger.error("Error deleting subscriber: {er}".format(er=e)) 

219 # don't raise a NonRecoverable error here--let the uninstall workflow continue