Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# ============LICENSE_START==================================================== 

2# org.onap.dcaegen2 

3# ============================================================================= 

4# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. 

5# Copyright (c) 2020 Pantheon.tech. All rights reserved. 

6# ============================================================================= 

7# Licensed under the Apache License, Version 2.0 (the "License"); 

8# you may not use this file except in compliance with the License. 

9# You may obtain a copy of the License at 

10# 

11# http://www.apache.org/licenses/LICENSE-2.0 

12# 

13# Unless required by applicable law or agreed to in writing, software 

14# distributed under the License is distributed on an "AS IS" BASIS, 

15# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

16# See the License for the specific language governing permissions and 

17# limitations under the License. 

18# ============LICENSE_END====================================================== 

19 

20from cloudify import ctx 

21from cloudify.decorators import operation 

22from cloudify.exceptions import NonRecoverableError 

23from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS 

24from dmaapplugin.dmaaputils import random_string 

25from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle 

26 

27# Set up a subscriber to a source feed 

28def _set_up_subscriber(dmc, source_feed_id, loc, delivery_url, username, userpw): 

29 # Add subscriber to source feed 

30 add_sub = dmc.add_subscriber(source_feed_id, loc, delivery_url, username, userpw) 

31 add_sub.raise_for_status() 

32 return add_sub.json() 

33 

def _set_up_publisher(dmc, target_feed_id, loc):
    """Set up a publisher to a target feed using freshly generated credentials.

    Args:
        dmc: Controller handle exposing ``add_publisher``.
        target_feed_id: ID of the feed the publisher is added to.
        loc: Location passed through to the controller for the publisher.

    Returns:
        A ``(publisher_id, username, userpw)`` tuple, where ``publisher_id``
        is the ``"pubId"`` field of the controller's response and the
        credentials are the generated ones that were sent to the controller.

    Raises:
        Whatever ``raise_for_status()`` raises on the response when the
        request did not succeed.
    """
    # Credentials are generated locally: 8-character name, 16-character password.
    pub_user, pub_password = random_string(8), random_string(16)
    response = dmc.add_publisher(target_feed_id, loc, pub_user, pub_password)
    response.raise_for_status()
    return response.json()["pubId"], pub_user, pub_password

42 

43# Get a central location to use when creating a publisher or subscriber 

44def _get_central_location(dmc): 

45 locations = dmc.get_dcae_central_locations() 

46 if len(locations) < 1: 

47 raise Exception('No central location found for setting up DR bridging') 

48 return locations[0] # We take the first one. Typically there will be two central locations 

49 

50 

@operation
def create_dr_bridge(**kwargs):
    """Set up a "bridge" between two feeds internal to DCAE.

    A source feed "bridges to" a target feed, meaning that anything published
    to the source feed will be delivered to subscribers to the target feed
    (as well as to subscribers of the source feed).

    The bridge is established by first adding a publisher to the target feed,
    which yields a set of publication credentials. Those credentials and the
    target feed's publish URL are then used to set up a subscriber to the
    source feed. I.e., we tell the source feed to deliver to an endpoint which
    is actually a publish endpoint for the target feed.

    On success the publisher and subscriber IDs are saved in the source
    node instance's runtime properties, keyed by the target node id.

    Args:
        **kwargs: Operation inputs; not used by this operation.

    Raises:
        NonRecoverableError: if any step fails (the error is logged first).
    """
    try:
        # Both feeds must already have been assigned a feed id.
        target_props = ctx.target.instance.runtime_properties
        if 'feed_id' not in target_props:
            raise Exception('Target feed has no feed_id property')
        target_feed_id = target_props['feed_id']

        source_props = ctx.source.instance.runtime_properties
        if 'feed_id' not in source_props:
            raise Exception('Source feed has no feed_id property')
        source_feed_id = source_props['feed_id']

        controller = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)

        # A central location seems a reasonable place for the publisher and subscriber.
        location = _get_central_location(controller)

        ctx.logger.info('Creating bridge from feed {0} to feed {1} using location {2}'.format(source_feed_id, target_feed_id, location))

        # Step 1: publisher on the target feed.
        pub_id, pub_user, pub_password = _set_up_publisher(controller, target_feed_id, location)
        ctx.logger.info("Added publisher id {0} to target feed {1} with user {2}".format(pub_id, target_feed_id, pub_user))

        # Step 2: subscriber on the source feed that delivers to the target feed's publish URL.
        publish_url = target_props['publish_url']
        sub_details = _set_up_subscriber(controller, source_feed_id, location, publish_url, pub_user, pub_password)
        sub_id = sub_details["subId"]
        ctx.logger.info("Added subscriber id {0} to source feed {1} with delivery url {2}".format(sub_id, source_feed_id, publish_url))

        # Record both IDs on the source node, indexed by the target node id.
        bridge_record = {"publisher_id": pub_id, "subscriber_id": sub_id}
        ctx.source.instance.runtime_properties[ctx.target.node.id] = bridge_record

    except Exception as e:
        ctx.logger.error("Error creating bridge: {0}".format(e))
        raise NonRecoverableError(e)

99 

@operation
def create_external_dr_bridge(**kwargs):
    """Set up a bridge from an internal DCAE feed to a feed in an external Data Router system.

    The target feed needs to be provisioned in the external Data Router
    system. A publisher to that feed must also be set up in the external Data
    Router system. The publish URL, username, and password need to be captured
    in a target node of type dcae.nodes.ExternalTargetFeed.

    The bridge is established by setting up a subscriber to the internal DCAE
    source feed using the external feed publisher parameters as delivery
    parameters for the subscriber. On success the subscriber ID is saved in
    the source node instance's runtime properties, keyed by the target node id.

    Args:
        **kwargs: Operation inputs; not used by this operation.

    Raises:
        NonRecoverableError: if any step fails (the error is logged first).
    """
    try:
        # The target node must carry the complete set of delivery parameters.
        target_node_props = ctx.target.node.properties
        if not all(key in target_node_props for key in ('url', 'username', 'userpw')):
            raise Exception ("Target feed missing url, username, and/or user pw")
        external_url = target_node_props['url']
        external_user = target_node_props['username']
        external_password = target_node_props['userpw']

        # The source feed must already have been assigned a feed id.
        source_props = ctx.source.instance.runtime_properties
        if 'feed_id' not in source_props:
            raise Exception('Source feed has no feed_id property')
        source_feed_id = source_props['feed_id']

        controller = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)

        # The subscriber is created at a central location.
        location = _get_central_location(controller)

        ctx.logger.info('Creating external bridge from feed {0} to external url {1} using location {2}'.format(source_feed_id, external_url, location))

        # Subscribe to the source feed, delivering to the external feed's publish endpoint.
        sub_details = _set_up_subscriber(controller, source_feed_id, location, external_url, external_user, external_password)
        sub_id = sub_details["subId"]
        ctx.logger.info("Added subscriber id {0} to source feed {1} with delivery url {2}".format(sub_id, source_feed_id, external_url))

        # Record the subscriber ID on the source node, indexed by the target node id.
        bridge_record = {"subscriber_id": sub_id}
        ctx.source.instance.runtime_properties[ctx.target.node.id] = bridge_record

    except Exception as e:
        ctx.logger.error("Error creating external bridge: {0}".format(e))
        raise NonRecoverableError(e)

142 

@operation
def create_external_source_dr_bridge(**kwargs):
    """Set up a bridge from a feed in an external Data Router system to an internal DCAE feed.

    The bridge is established by creating a publisher on the internal DCAE
    feed. Then a subscription to the external feed is created through manual
    provisioning in the external Data Router system, using the publish URL and
    the publisher username and password for the internal feed as the delivery
    parameters for the external subscription.

    In order to obtain the publish URL, publisher username, and password, a
    blueprint using this sort of bridge will typically have an output that
    exposes the runtime property set on the source node in this operation.
    That property is keyed by the target node id and holds ``publisher_id``,
    ``url``, ``username`` and ``userpw``.

    Args:
        **kwargs: Operation inputs; not used by this operation.

    Raises:
        NonRecoverableError: if any step fails (the error is logged first).
    """
    try:
        target_props = ctx.target.instance.runtime_properties

        # Get target feed id
        if 'feed_id' not in target_props:
            raise Exception('Target feed has no feed_id property')
        target_feed_id = target_props['feed_id']

        # Read the publish URL before creating anything, so a missing property
        # fails the operation without leaving an orphaned publisher behind.
        publish_url = target_props["publish_url"]

        dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)

        # Get a central location to use when creating a publisher
        loc = _get_central_location(dmc)

        # Create a publisher on the target feed
        publisher_id, username, userpw = _set_up_publisher(dmc, target_feed_id, loc)

        # Save the publisher info on the source node, indexed by the target node
        ctx.source.instance.runtime_properties[ctx.target.node.id] = {
            "publisher_id": publisher_id,
            "url": publish_url,
            "username": username,
            "userpw": userpw,
        }

    except Exception as e:
        ctx.logger.error("Error creating external source bridge: {0}".format(e))
        # Fail the operation like the other bridge-creation operations do;
        # otherwise the install would report success with no bridge in place.
        raise NonRecoverableError(e)

172 

173# Remove the bridge between the relationship source and target. 

174# For a bridge between 2 internal feeds, deletes the subscriber on the source feed and the publisher on the target feed. 

175# For a bridge to an external target feed, deletes the subscriber on the source feed. 

176# For a bridge from an external source feed, deletes the publisher on the target feed. 

177@operation 

178def remove_dr_bridge(**kwargs): 

179 try: 

180 

181 dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) 

182 

183 if ctx.target.node.id in ctx.source.instance.runtime_properties: 

184 

185 if 'subscriber_id' in ctx.source.instance.runtime_properties[ctx.target.node.id]: 

186 # Delete the subscription for this bridge 

187 ctx.logger.info("Removing bridge -- deleting subscriber {0}".format(ctx.source.instance.runtime_properties[ctx.target.node.id]['subscriber_id'])) 

188 dmc.delete_subscriber(ctx.source.instance.runtime_properties[ctx.target.node.id]['subscriber_id']) 

189 

190 if 'publisher_id' in ctx.source.instance.runtime_properties: 

191 # Delete the publisher for this bridge 

192 ctx.logger.info("Removing bridge -- deleting publisher {0}".format(ctx.source.instance.runtime_properties[ctx.target.node.id]['publisher_id'])) 

193 dmc.delete_publisher(ctx.source.instance.runtime_properties[ctx.target.node.id]['publisher_id']) 

194 

195 ctx.logger.info("Remove bridge from {0} to {1}".format(ctx.source.node.id, ctx.target.node.id)) 

196 

197 except Exception as e: 

198 ctx.logger.error("Error removing bridge: {0}".format(e)) 

199 # Let the uninstall workflow proceed--don't throw a NonRecoverableError