Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# ============LICENSE_START======================================================= 

2# org.onap.dcae 

3# ================================================================================ 

4# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. 

5# Copyright (c) 2019 Pantheon.tech. All rights reserved. 

6# ================================================================================ 

7# Licensed under the Apache License, Version 2.0 (the "License"); 

8# you may not use this file except in compliance with the License. 

9# You may obtain a copy of the License at 

10# 

11# http://www.apache.org/licenses/LICENSE-2.0 

12# 

13# Unless required by applicable law or agreed to in writing, software 

14# distributed under the License is distributed on an "AS IS" BASIS, 

15# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

16# See the License for the specific language governing permissions and 

17# limitations under the License. 

18# ============LICENSE_END========================================================= 

19# 

20 

21import json 

22import logging 

23import re 

24import uuid 

25from functools import partial 

26 

27import consul 

28import requests 

29 

30logger = logging.getLogger("discovery") 

31 

32 

33class DiscoveryError(RuntimeError): 

34 pass 

35 

36class DiscoveryConnectionError(RuntimeError): 

37 pass 

38 

39class DiscoveryServiceNotFoundError(RuntimeError): 

40 pass 

41 

42class DiscoveryKVEntryNotFoundError(RuntimeError): 

43 pass 

44 

45 

46def _wrap_consul_call(consul_func, *args, **kwargs): 

47 """Wrap Consul call to map errors""" 

48 try: 

49 return consul_func(*args, **kwargs) 

50 except requests.exceptions.ConnectionError as e: 

51 raise DiscoveryConnectionError(e) 

52 

53 

54def generate_service_component_name(service_component_type): 

55 """Generate service component id used to pass into the service component 

56 instance and used as the key to the service component configuration. 

57 

58 Updated for use with Kubernetes. Sometimes the service component name gets 

59 used in Kubernetes in contexts (such as naming a Kubernetes Service) that 

60 requires the name to conform to the RFC1035 DNS "label" syntax: 

61 -- starts with an alpha 

62 -- contains only of alphanumerics and "-" 

63 -- <= 63 characters long 

64 

65 Format: 

66 s<service component id>-<service component type>, 

67 truncated to 63 characters, "_" replaced with "-" in service_component_type, 

68 other non-conforming characters removed from service_component_type 

69 """ 

70 # Random generated 

71 # Copied from cdap plugin 

72 sct = re.sub('[^A-Za-z0-9-]','',(service_component_type.replace('_','-'))) 

73 return ("s{0}-{1}".format(str(uuid.uuid4()).replace("-",""),sct))[:63] 

74 

75 

76def create_kv_conn(host): 

77 """Create connection to key-value store 

78 

79 Returns a Consul client to the specified Consul host""" 

80 try: 

81 [hostname, port] = host.split(":") 

82 return consul.Consul(host=hostname, port=int(port)) 

83 except ValueError as e: 

84 return consul.Consul(host=host) 

85 

86def push_service_component_config(kv_conn, service_component_name, config): 

87 config_string = config if isinstance(config, str) else json.dumps(config) 

88 kv_put_func = partial(_wrap_consul_call, kv_conn.kv.put) 

89 

90 if kv_put_func(service_component_name, config_string): 

91 logger.info("Added config for {0}".format(service_component_name)) 

92 else: 

93 raise DiscoveryError("Failed to push configuration") 

94 

95def remove_service_component_config(kv_conn, service_component_name): 

96 kv_delete_func = partial(_wrap_consul_call, kv_conn.kv.delete) 

97 kv_delete_func(service_component_name) 

98 

99 

100def get_kv_value(kv_conn, key): 

101 """Get a key-value entry's value from Consul 

102 

103 Raises DiscoveryKVEntryNotFoundError if entry not found 

104 """ 

105 kv_get_func = partial(_wrap_consul_call, kv_conn.kv.get) 

106 (index, val) = kv_get_func(key) 

107 

108 if val: 

109 return json.loads(val['Value']) # will raise ValueError if not JSON, let it propagate 

110 else: 

111 raise DiscoveryKVEntryNotFoundError("{0} kv entry not found".format(key)) 

112 

113 

114def _create_rel_key(service_component_name): 

115 return "{0}:rel".format(service_component_name) 

116 

117def store_relationship(kv_conn, source_name, target_name): 

118 # TODO: Rel entry may already exist in a one-to-many situation. Need to 

119 # support that. 

120 rel_key = _create_rel_key(source_name) 

121 rel_value = [target_name] if target_name else [] 

122 

123 kv_put_func = partial(_wrap_consul_call, kv_conn.kv.put) 

124 kv_put_func(rel_key, json.dumps(rel_value)) 

125 logger.info("Added relationship for {0}".format(rel_key)) 

126 

127def delete_relationship(kv_conn, service_component_name): 

128 rel_key = _create_rel_key(service_component_name) 

129 kv_get_func = partial(_wrap_consul_call, kv_conn.kv.get) 

130 index, rels = kv_get_func(rel_key) 

131 

132 if rels: 

133 rels = json.loads(rels["Value"].decode("utf-8")) 

134 kv_delete_func = partial(_wrap_consul_call, kv_conn.kv.delete) 

135 kv_delete_func(rel_key) 

136 return rels 

137 else: 

138 return [] 

139 

140def lookup_service(kv_conn, service_component_name): 

141 catalog_get_func = partial(_wrap_consul_call, kv_conn.catalog.service) 

142 index, results = catalog_get_func(service_component_name) 

143 

144 if results: 

145 return results 

146 else: 

147 raise DiscoveryServiceNotFoundError("Failed to find: {0}".format(service_component_name)) 

148 

149 

150# TODO: Note these functions have been (for the most part) shamelessly lifted from 

151# dcae-cli and should really be shared. 

152 

153def _is_healthy_pure(get_health_func, instance): 

154 """Checks to see if a component instance is running healthy 

155 

156 Pure function edition 

157 

158 Args 

159 ---- 

160 get_health_func: func(string) -> complex object 

161 Look at unittests in test_discovery to see examples 

162 instance: (string) fully qualified name of component instance 

163 

164 Returns 

165 ------- 

166 True if instance has been found and is healthy else False 

167 """ 

168 index, resp = get_health_func(instance) 

169 

170 if resp: 170 ↛ 176line 170 didn't jump to line 176, because the condition on line 170 was never false

171 def is_passing(instance): 

172 return all([check["Status"] == "passing" for check in instance["Checks"]]) 

173 

174 return any([is_passing(instance) for instance in resp]) 

175 else: 

176 return False 

177 

178def is_healthy(consul_host, instance): 

179 """Checks to see if a component instance is running healthy 

180 

181 Impure function edition 

182 

183 Args 

184 ---- 

185 consul_host: (string) host string of Consul 

186 instance: (string) fully qualified name of component instance 

187 

188 Returns 

189 ------- 

190 True if instance has been found and is healthy else False 

191 """ 

192 cons = create_kv_conn(consul_host) 

193 

194 get_health_func = partial(_wrap_consul_call, cons.health.service) 

195 return _is_healthy_pure(get_health_func, instance) 

196 

197 

198def add_to_entry(conn, key, add_name, add_value): 

199 """ 

200 Find 'key' in consul. 

201 Treat its value as a JSON string representing a dict. 

202 Extend the dict by adding an entry with key 'add_name' and value 'add_value'. 

203 Turn the resulting extended dict into a JSON string. 

204 Store the string back into Consul under 'key'. 

205 Watch out for conflicting concurrent updates. 

206 

207 Example: 

208 Key 'xyz:dmaap' has the value '{"feed00": {"feed_url" : "http://example.com/feeds/999"}}' 

209 add_to_entry('xyz:dmaap', 'topic00', {'topic_url' : 'http://example.com/topics/1229'}) 

210 should result in the value for key 'xyz:dmaap' in consul being updated to 

211 '{"feed00": {"feed_url" : "http://example.com/feeds/999"}, "topic00" : {"topic_url" : "http://example.com/topics/1229"}}' 

212 """ 

213 while True: # do until update succeeds 

214 (index, val) = conn.kv.get(key) # index gives version of key retrieved 

215 

216 if val is None: # no key yet 

217 vstring = '{}' 

218 mod_index = 0 # Use 0 as the cas index for initial insertion of the key 

219 else: 

220 vstring = val['Value'] 

221 mod_index = val['ModifyIndex'] 

222 

223 # Build the updated dict 

224 # Exceptions just propagate 

225 v = json.loads(vstring) 

226 v[add_name] = add_value 

227 new_vstring = json.dumps(v) 

228 

229 updated = conn.kv.put(key, new_vstring, cas=mod_index) # if the key has changed since retrieval, this will return false 

230 if updated: 

231 return v 

232 

233 

234def _find_matching_services(services, name_search, tags): 

235 """Find matching services given search criteria""" 

236 tags = set(tags) 

237 return [srv_name for srv_name in services 

238 if name_search in srv_name and tags <= set(services[srv_name])] 

239 

240 

241def search_services(conn, name_search, tags): 

242 """Search for services that match criteria 

243 

244 Args: 

245 ----- 

246 name_search: (string) Name to search for as a substring 

247 tags: (list) List of strings that are tags. A service must match **all** the 

248 tags in the list. 

249 

250 Retruns: 

251 -------- 

252 List of names of services that matched 

253 """ 

254 # srvs is dict where key is service name and value is list of tags 

255 catalog_get_services_func = partial(_wrap_consul_call, conn.catalog.services) 

256 index, srvs = catalog_get_services_func() 

257 

258 if srvs: 

259 matches = _find_matching_services(srvs, name_search, tags) 

260 

261 if matches: 

262 return matches 

263 

264 raise DiscoveryServiceNotFoundError( 

265 "No matches found: {0}, {1}".format(name_search, tags)) 

266 else: 

267 raise DiscoveryServiceNotFoundError("No services found")