Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

# ============LICENSE_START====================================================
# org.onap.dcaegen2
# =============================================================================
# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2020 Pantheon.tech. All rights reserved.
# =============================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END======================================================

19 

20import consul 

21import json 

22try: 

23 from urllib.parse import urlparse 

24except ImportError: 

25 from urlparse import urlparse 

26 

27 

class ConsulHandle(object):
    '''
    Provide access to Consul KV store and service discovery
    '''

    def __init__(self, api_url, user, password, logger):
        '''
        Build a Consul client from the host, port and scheme parsed out of api_url.

        Note: user, password and logger are accepted but are not used by
        this constructor.
        '''
        u = urlparse(api_url)
        self.ch = consul.Consul(host=u.hostname, port=u.port, scheme=u.scheme)

    def get_config(self, key):
        '''
        Get configuration information from Consul using the provided key.
        It should be in JSON form.  Convert it to a dictionary and return it.

        Raises Exception if there is no entry stored under 'key'.
        Raises ValueError if the stored value is not valid JSON.
        '''
        (_, val) = self.ch.kv.get(key)
        if val is None:
            # kv.get yields None for a key that does not exist; fail with a
            # clear message instead of an opaque TypeError on val['Value'].
            raise Exception('Could not find configuration for key "{0}"'.format(key))
        return json.loads(val['Value'])  # ValueError on bad JSON propagates

    def get_service(self, service_name):
        '''
        Look up the service named service_name in Consul.
        Return the service address and port as a (address, port) tuple.

        Raises Exception if Consul has no listing for service_name.
        '''
        (_, listings) = self.ch.catalog.service(service_name)
        if not listings:  # catalog.service returns an empty array if service not found
            raise Exception('Could not find service information for "{0}"'.format(service_name))

        service = listings[0]  # Could be multiple listings, but we take the first
        # Most services should have a non-empty 'ServiceAddress';
        # "external" services will have 'Address' only.
        service_address = service.get('ServiceAddress') or service['Address']
        service_port = service['ServicePort']
        return service_address, service_port

    def add_to_entry(self, key, add_name, add_value):
        '''
        Find 'key' in consul.
        Treat its value as a JSON string representing a dict.
        Extend the dict by adding an entry with key 'add_name' and value 'add_value'.
        Turn the resulting extended dict into a JSON string.
        Store the string back into Consul under 'key'.
        Watch out for conflicting concurrent updates.

        Example:
        Key 'xyz:dmaap' has the value '{"feed00": {"feed_url" : "http://example.com/feeds/999"}}'
        add_to_entry('xyz:dmaap', 'topic00', {'topic_url' : 'http://example.com/topics/1229'})
        should result in the value for key 'xyz:dmaap' in consul being updated to
        '{"feed00": {"feed_url" : "http://example.com/feeds/999"}, "topic00" : {"topic_url" : "http://example.com/topics/1229"}}'
        '''
        while True:  # do until update succeeds
            (_, val) = self.ch.kv.get(key)

            if val is None:  # no key yet
                vstring = '{}'
                mod_index = 0  # Use 0 as the cas index for initial insertion of the key
            else:
                vstring = val['Value']
                mod_index = val['ModifyIndex']  # version of the value we just read

            # Build the updated dict
            # Exceptions just propagate
            entry = json.loads(vstring)
            entry[add_name] = add_value

            # Check-and-set: if the key has changed since retrieval, put
            # returns a false value and we re-read and try again.
            if self.ch.kv.put(key, json.dumps(entry), cas=mod_index):
                return

    def delete_entry(self, entry_name):
        '''
        Delete an entire key-value entry whose key is 'entry_name' from the Consul KV store.

        Note that the kv.delete() operation always returns True,
        whether an entry with key 'entry_name' exists or not.  This doesn't seem like
        a great design, but it means it's safe to try to delete the same entry repeatedly.

        Note also in our application for this plugin, the uninstall workflow will always delete all of the topics and
        feeds we've stored into the 'component_name:dmaap' entry.

        Given the two foregoing notes, it is safe for this plugin to attempt to delete the entire
        'component_name:dmaap' entry any time it performs an 'unlink' operation for a publishes or
        subscribes relationship.  The first unlink will actually remove the entry, the subsequent ones
        will harmlessly try to remove it again.

        The 'correct' approach would be to have a delete_from_entry(self, key, delete_name) that fetches
        the entry from Consul, removes only the topic or feed being unlinked, and then puts the resulting
        entry back into Consul.  It would be very similar to add_to_entry.  When there's nothing left
        in the entry, then the entire entry would be deleted.
        '''
        self.ch.kv.delete(entry_name)