Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# org.onap.dcaegen2 

2# ============LICENSE_START==================================================== 

3# ============================================================================= 

4# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. 

5# Copyright (c) 2020 Pantheon.tech. All rights reserved. 

6# ============================================================================= 

7# Licensed under the Apache License, Version 2.0 (the "License"); 

8# you may not use this file except in compliance with the License. 

9# You may obtain a copy of the License at 

10# 

11# http://www.apache.org/licenses/LICENSE-2.0 

12# 

13# Unless required by applicable law or agreed to in writing, software 

14# distributed under the License is distributed on an "AS IS" BASIS, 

15# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

16# See the License for the specific language governing permissions and 

17# limitations under the License. 

18# ============LICENSE_END====================================================== 

19 

20""" 

21PostgreSQL plugin to manage passwords 

22""" 

23 

24from __future__ import print_function 

25import sys 

26import os 

27import re 

28import json 

29import hashlib 

30import socket 

31import traceback 

32import base64 

33import binascii 

34import collections 

35try: 

36 from urllib.parse import quote 

37except ImportError: 

38 from urllib import quote 

39 

40from cloudify import ctx 

41from cloudify.decorators import operation 

42from cloudify.exceptions import NonRecoverableError 

43from cloudify.exceptions import RecoverableError 

44 

45try: 

46 import psycopg2 

47except ImportError: 

48 # FIXME: any users of this plugin installing its dependencies in nonstandard 

49 # directories should set up PYTHONPATH accordingly, outside the program code 

50 SYSPATH = sys.path 

51 sys.path = list(SYSPATH) 

52 sys.path.append('/usr/lib64/python2.7/site-packages') 

53 import psycopg2 

54 sys.path = SYSPATH 

55 

56from pgaas.logginginterface import debug, info, warn, error 

57 

58 

59""" 

60 To set up a cluster: 

61 - https://$NEXUS/repository/raw/type_files/sshkeyshare/sshkey_types.yaml 

62 - https://$NEXUS/repository/raw/type_files/pgaas_types.yaml 

63 sharedsshkey_pgrs: 

64 type: dcae.nodes.ssh.keypair 

65 pgaas_cluster: 

66 type: dcae.nodes.pgaas.cluster 

67 properties: 

68 writerfqdn: { get_input: k8s_pgaas_instance_fqdn } 

69 readerfqdn: { get_input: k8s_pgaas_instance_fqdn } 

70 # OR: 

71 # writerfqdn: { concat: [ { get_input: location_prefix }, '-', { get_input: pgaas_cluster_name }, '-write.', { get_input: location_domain } ] } 

72 # readerfqdn: { concat: [ { get_input: location_prefix }, '-', { get_input: pgaas_cluster_name }, '.', { get_input: location_domain } ] } 

73 relationships: 

74 - type: dcae.relationships.pgaas_cluster_uses_sshkeypair 

75 target: sharedsshkey_pgrs 

76 

77 To reference an existing cluster: 

78 - https://$NEXUS/repository/raw/type_files/pgaas_types.yaml 

79 pgaas_cluster: 

80 type: dcae.nodes.pgaas.cluster 

81 properties: 

82 writerfqdn: { get_input: k8s_pgaas_instance_fqdn } 

83 # OR: writerfqdn: { concat: [ { get_input: location_prefix }, '-', 

84 # { get_input: pgaas_cluster_name }, '-write.', 

85 # { get_input: location_domain } ] } 

86 # OR: writerfqdn: { get_property: [ dns_pgrs_rw, fqdn ] } 

87 use_existing: true 

88 

89 To initialize an existing server to be managed by pgaas_plugin:: 

90 - https://$NEXUS/repository/raw/type_files/sshkeyshare/sshkey_types.yaml 

91 - https://$NEXUS/repository/raw/type_files/pgaas_types.yaml 

92 pgaas_cluster: 

93 type: dcae.nodes.pgaas.cluster 

94 properties: 

95 writerfqdn: { get_input: k8s_pgaas_instance_fqdn } 

96 readerfqdn: { get_input: k8s_pgaas_instance_fqdn } 

97 # OR: 

98 # writerfqdn: { concat: [ { get_input: location_prefix }, '-', 

99 # { get_input: pgaas_cluster_name }, '-write.', 

100 # { get_input: location_domain } ] } 

101 # readerfqdn: { concat: [ { get_input: location_prefix }, '-', 

102 # { get_input: pgaas_cluster_name }, '.', 

103 # { get_input: location_domain } ] } 

104 initialpassword: { get_input: currentpassword } 

105 relationships: 

106 - type: dcae.relationships.pgaas_cluster_uses_sshkeypair 

107 target: sharedsshkey_pgrs 

108 

109 - { get_attribute: [ pgaas_cluster, public ] } 

110 - { get_attribute: [ pgaas_cluster, base64private ] } 

111 # - { get_attribute: [ pgaas_cluster, postgrespswd ] } 

112 

113 

114 To set up a database: 

115 - http://$NEXUS/raw/type_files/pgaas_types.yaml 

116 pgaasdbtest: 

117 type: dcae.nodes.pgaas.database 

118 properties: 

119 writerfqdn: { get_input: k8s_pgaas_instance_fqdn } 

120 # OR: writerfqdn: { concat: [ { get_input: location_prefix }, '-', 

121 # { get_input: pgaas_cluster_name }, '-write.', 

122 # { get_input: location_domain } ] } 

123 # OR: writerfqdn: { get_property: [ dns_pgrs_rw, fqdn ] } 

124 name: { get_input: database_name } 

125 

126 To reference an existing database: 

127 - http://$NEXUS/raw/type_files/pgaas_types.yaml 

128 $CLUSTER_$DBNAME: 

129 type: dcae.nodes.pgaas.database 

130 properties: 

131 writerfqdn: { get_input: k8s_pgaas_instance_fqdn } 

132 # OR: writerfqdn: { concat: [ { get_input: location_prefix }, '-', 

133 # { get_input: pgaas_cluster_name }, '-write.', 

134 # { get_input: location_domain } ] } 

135 # OR: writerfqdn: { get_property: [ dns_pgrs_rw, fqdn ] } 

136 name: { get_input: database_name } 

137 use_existing: true 

138 

139 $CLUSTER_$DBNAME_admin_host: 

140 description: Hostname for $CLUSTER $DBNAME database 

141 value: { get_attribute: [ $CLUSTER_$DBNAME, admin, host ] } 

142 $CLUSTER_$DBNAME_admin_user: 

143 description: Admin Username for $CLUSTER $DBNAME database 

144 value: { get_attribute: [ $CLUSTER_$DBNAME, admin, user ] } 

145 $CLUSTER_$DBNAME_admin_password: 

146 description: Admin Password for $CLUSTER $DBNAME database 

147 value: { get_attribute: [ $CLUSTER_$DBNAME, admin, password ] } 

148 $CLUSTER_$DBNAME_user_host: 

149 description: Hostname for $CLUSTER $DBNAME database 

150 value: { get_attribute: [ $CLUSTER_$DBNAME, user, host ] } 

151 $CLUSTER_$DBNAME_user_user: 

152 description: User Username for $CLUSTER $DBNAME database 

153 value: { get_attribute: [ $CLUSTER_$DBNAME, user, user ] } 

154 $CLUSTER_$DBNAME_user_password: 

155 description: User Password for $CLUSTER $DBNAME database 

156 value: { get_attribute: [ $CLUSTER_$DBNAME, user, password ] } 

157 $CLUSTER_$DBNAME_viewer_host: 

158 description: Hostname for $CLUSTER $DBNAME database 

159 value: { get_attribute: [ $CLUSTER_$DBNAME, viewer, host ] } 

160 $CLUSTER_$DBNAME_viewer_user: 

161 description: Viewer Username for $CLUSTER $DBNAME database 

162 value: { get_attribute: [ $CLUSTER_$DBNAME, viewer, user ] } 

163 $CLUSTER_$DBNAME_viewer_password: 

164 description: Viewer Password for $CLUSTER $DBNAME database 

165 value: { get_attribute: [ $CLUSTER_$DBNAME, viewer, password ] } 

166 

167""" 

168 

169OPT_MANAGER_RESOURCES_PGAAS = "/opt/manager/resources/pgaas" 

170 

171# pylint: disable=invalid-name 

172def setOptManagerResources(o): # pylint: disable=global-statement 

173 """ 

174 Overrides the default locations of /opt/managers/resources 

175 """ 

176 # pylint: disable=global-statement 

177 global OPT_MANAGER_RESOURCES_PGAAS 

178 OPT_MANAGER_RESOURCES_PGAAS = "{}/pgaas".format(o) 

179 

180def safestr(s): 

181 """ 

182 returns a safely printable version of the string 

183 """ 

184 return quote(str(s), '') 

185 

186def raiseRecoverableError(msg): 

187 """ 

188 Print a warning message and raise a RecoverableError exception. 

189 This is a handy endpoint to add other extended debugging calls. 

190 """ 

191 warn(msg) 

192 raise RecoverableError(msg) 

193 

194def raiseNonRecoverableError(msg): 

195 """ 

196 Print an error message and raise a NonRecoverableError exception. 

197 This is a handy endpoint to add other extended debugging calls. 

198 """ 

199 error(msg) 

200 raise NonRecoverableError(msg) 

201 

202def dbexecute(crx, cmd, args=None): 

203 """ 

204 executes the SQL statement 

205 Prints the entire command for debugging purposes 

206 """ 

207 debug("executing {}".format(cmd)) 

208 crx.execute(cmd, args) 

209 

210 

211def dbexecute_trunc_print(crx, cmd, args=None): 

212 """ 

213 executes the SQL statement. 

214 Will print only the first 30 characters in the command 

215 Use this function if you are executing an SQL cmd with a password 

216 """ 

217 debug("executing {}".format(cmd[:30])) 

218 crx.execute(cmd, args) 

219 

220 

221def waithp(host, port): 

222 """ 

223 do a test connection to a host and port 

224 """ 

225 debug("waithp({0},{1})".format(safestr(host), safestr(port))) 

226 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 

227 try: 

228 sock.connect((host, int(port))) 

229 except: # pylint: disable=bare-except 

230 a, b, c = sys.exc_info() 

231 traceback.print_exception(a, b, c) 

232 sock.close() 

233 raiseRecoverableError('Server at {0}:{1} is not ready'.format(safestr(host), safestr(port))) 

234 sock.close() 

235 

236def doconn(desc): 

237 """ 

238 open an SQL connection to the PG server 

239 """ 

240 debug("doconn({},{},{})".format(desc['host'], desc['user'], desc['database'])) 

241 # debug("doconn({},{},{},{})".format(desc['host'], desc['user'], desc['database'], desc['password'])) 

242 ret = psycopg2.connect(**desc) 

243 ret.autocommit = True 

244 return ret 

245 

246def hostportion(hostport): 

247 """ 

248 return the host portion of a fqdn:port or IPv4:port or [IPv6]:port 

249 """ 

250 ipv4re = re.match(r"^([^:]+)(:(\d+))?", hostport) 

251 ipv6re = re.match(r"^[[]([^]]+)[]](:(\d+))?", hostport) 

252 if ipv4re: 

253 return ipv4re.group(1) 

254 if ipv6re: 

255 return ipv6re.group(1) 

256 raiseNonRecoverableError("invalid hostport: {}".format(hostport)) 

257 

258def portportion(hostport): 

259 """ 

260 Return the port portion of a fqdn:port or IPv4:port or [IPv6]:port. 

261 If port is not present, return 5432. 

262 """ 

263 ipv6re = re.match(r"^[[]([^]]+)[]](:(\d+))?", hostport) 

264 ipv4re = re.match(r"^([^:]+)(:(\d+))?", hostport) 

265 if ipv4re: 

266 return ipv4re.group(3) if ipv4re.group(3) else '5432' 

267 if ipv6re: 

268 return ipv6re.group(3) if ipv6re.group(3) else '5432' 

269 raiseNonRecoverableError("invalid hostport: {}".format(hostport)) 

270 

271def rootdesc(data, dbname, initialpassword=None): 

272 """ 

273 return the postgres connection information 

274 """ 

275 debug("rootdesc(..data..,{0})".format(safestr(dbname))) 

276 # pylint: disable=bad-continuation 

277 return { 

278 'database': dbname, 

279 'host': hostportion(data['rw']), 

280 'port': portportion(data['rw']), 

281 'user': 'postgres', 

282 'password': initialpassword if initialpassword else getpass(data, 'postgres', data['rw'], 'postgres') 

283 } 

284 

285def rootconn(data, dbname='postgres', initialpassword=None): 

286 """ 

287 connect to a given server as postgres, 

288 connecting to the specified database 

289 """ 

290 debug("rootconn(..data..,{0})".format(safestr(dbname))) 

291 return doconn(rootdesc(data, dbname, initialpassword)) 

292 

293def onedesc(data, dbname, role, access): 

294 """ 

295 return the connection information for a given user and dbname on a cluster 

296 """ 

297 user = '{0}_{1}'.format(dbname, role) 

298 # pylint: disable=bad-continuation 

299 return { 

300 'database': dbname, 

301 'host': hostportion(data[access]), 

302 'port': portportion(data[access]), 

303 'user': user, 

304 'password': getpass(data, user, data['rw'], dbname) 

305 } 

306 

307def dbdescs(data, dbname): 

308 """ 

309 return the entire set of information for a specific server/database 

310 """ 

311 # pylint: disable=bad-continuation 

312 return { 

313 'admin': onedesc(data, dbname, 'admin', 'rw'), 

314 'user': onedesc(data, dbname, 'user', 'rw'), 

315 'viewer': onedesc(data, dbname, 'viewer', 'ro') 

316 } 

317 

318def getpass(data, ident, hostport, dbname): 

319 """ 

320 generate the password for a given user on a specific server 

321 """ 

322 m = hashlib.sha256() 

323 m.update(ident.encode()) 

324 

325 # mix in the seed (the last line) for that database, if one exists 

326 hostport = hostport.lower() 

327 dbname = dbname.lower() 

328 hostPortDbname = '{0}/{1}:{2}'.format(OPT_MANAGER_RESOURCES_PGAAS, hostport, dbname) 

329 try: 

330 lastLine = '' 

331 with open(hostPortDbname, "r") as fp: 

332 for line in fp: 

333 lastLine = line 

334 m.update(lastLine.encode()) 

335 except IOError: 

336 pass 

337 

338 m.update(base64.b64decode(data['data'])) 

339 return m.hexdigest() 

340 

341def find_related_nodes(reltype, inst=None): 

342 """ 

343 extract the related_nodes information from the context 

344 for a specific relationship 

345 """ 

346 if inst is None: 

347 inst = ctx.instance 

348 ret = [] 

349 for rel in inst.relationships: 

350 if reltype in rel.type_hierarchy: 

351 ret.append(rel.target) 

352 return ret 

353 

354def chkfqdn(fqdn): 

355 """ 

356 verify that a FQDN is valid 

357 """ 

358 if fqdn is None: 

359 return False 

360 hp = hostportion(fqdn) 

361 # not needed right now: pp = portportion(fqdn) 

362 # TODO need to augment this for IPv6 addresses 

363 return re.match('^[a-zA-Z0-9_-]+(\\.[a-zA-Z0-9_-]+)+$', hp) is not None 

364 

365def chkdbname(dbname): 

366 """ 

367 verify that a database name is valid 

368 """ 

369 ret = re.match('[a-zA-Z][a-zA-Z0-9]{0,43}', dbname) is not None and dbname != 'postgres' 

370 if not ret: 

371 warn("Invalid dbname: {0}".format(safestr(dbname))) 

372 return ret 

373 

374def get_valid_domains(): 

375 """ 

376 Return a list of the valid names, suitable for inclusion in an error message. 

377 """ 

378 msg = '' 

379 import glob 

380 validDomains = [] 

381 for f in glob.glob('{}/*'.format(OPT_MANAGER_RESOURCES_PGAAS)): 

382 try: 

383 with open(f, "r") as fp: 

384 try: 

385 tmpdata = json.load(fp) 

386 if 'pubkey' in tmpdata: 

387 validDomains.append(os.path.basename(f)) 

388 except: # pylint: disable=bare-except 

389 pass 

390 except: # pylint: disable=bare-except 

391 pass 

392 if len(validDomains) == 0: 

393 msg += '\nNo valid PostgreSQL cluster information was found' 

394 else: 

395 msg += '\nThese are the valid PostgreSQL cluster domains found on this manager:' 

396 for v in validDomains: 

397 msg += '\n\t"{}"'.format(v) 

398 return msg 

399 

400def get_existing_clusterinfo(wfqdn, rfqdn, related): 

401 """ 

402 Retrieve all of the information specific to an existing cluster. 

403 """ 

404 if rfqdn != '': 

405 raiseNonRecoverableError('Read-only FQDN must not be specified when using an existing cluster, fqdn={0}'.format(safestr(rfqdn))) 

406 if len(related) != 0: 

407 raiseNonRecoverableError('Cluster SSH keypair must not be specified when using an existing cluster') 

408 try: 

409 fn = '{0}/{1}'.format(OPT_MANAGER_RESOURCES_PGAAS, wfqdn.lower()) 

410 with open(fn, 'r') as f: 

411 data = json.load(f) 

412 data['rw'] = wfqdn 

413 return data 

414 except Exception as e: # pylint: disable=broad-except 

415 warn("Error: {0}".format(e)) 

416 msg = 'Cluster must be deployed when using an existing cluster.\nCheck your domain name: fqdn={0}\nerr={1}'.format(safestr(wfqdn), e) 

417 if not os.path.isdir(OPT_MANAGER_RESOURCES_PGAAS): 

418 msg += '\nThe directory {} does not exist. No PostgreSQL clusters have been deployed on this manager.'.format(OPT_MANAGER_RESOURCES_PGAAS) 

419 else: 

420 msg += get_valid_domains() 

421 # warn("Stack: {0}".format(traceback.format_exc())) 

422 raiseNonRecoverableError(msg) 

423 

424def getclusterinfo(wfqdn, reuse, rfqdn, initialpassword, related): 

425 """ 

426 Retrieve all of the information specific to a cluster. 

427 if reuse, retrieve it 

428 else create and store it 

429 """ 

430 # debug("getclusterinfo({}, {}, {}, {}, ..related..)".format(safestr(wfqdn), safestr(reuse), safestr(rfqdn), safestr(initialpassword))) 

431 debug("getclusterinfo({}, {}, {}, ..related..)".format(safestr(wfqdn), safestr(reuse), safestr(rfqdn))) 

432 if not chkfqdn(wfqdn): 

433 raiseNonRecoverableError('Invalid FQDN specified for admin/read-write access, fqdn={0}'.format(safestr(wfqdn))) 

434 if reuse: 

435 return get_existing_clusterinfo(wfqdn, rfqdn, related) 

436 

437 if rfqdn == '': 

438 rfqdn = wfqdn 

439 elif not chkfqdn(rfqdn): 

440 raiseNonRecoverableError('Invalid FQDN specified for read-only access, fqdn={0}'.format(safestr(rfqdn))) 

441 if len(related) != 1: 

442 raiseNonRecoverableError('Cluster SSH keypair must be specified using a dcae.relationships.pgaas_cluster_uses_sshkeypair ' + 

443 'relationship to a dcae.nodes.sshkeypair node') 

444 data = {'ro': rfqdn, 'pubkey': related[0].instance.runtime_properties['public'], 

445 'data': related[0].instance.runtime_properties['base64private'], 'hash': 'sha256'} 

446 os.umask(0o77) 

447 try: 

448 os.makedirs('{0}'.format(OPT_MANAGER_RESOURCES_PGAAS)) 

449 except: # pylint: disable=bare-except 

450 pass 

451 try: 

452 with open('{0}/{1}'.format(OPT_MANAGER_RESOURCES_PGAAS, wfqdn.lower()), 'w') as f: 

453 f.write(json.dumps(data)) 

454 except Exception as e: # pylint: disable=broad-except 

455 warn("Error: {0}".format(e)) 

456 warn("Stack: {0}".format(traceback.format_exc())) 

457 raiseNonRecoverableError('Cannot write cluster information to {0}: fqdn={1}, err={2}'.format(OPT_MANAGER_RESOURCES_PGAAS, safestr(wfqdn), e)) 

458 data['rw'] = wfqdn 

459 if initialpassword: 

460 with rootconn(data, initialpassword=initialpassword) as conn: 

461 crr = conn.cursor() 

462 dbexecute_trunc_print(crr, "ALTER USER postgres WITH PASSWORD %s", (getpass(data, 'postgres', wfqdn, 'postgres'),)) 

463 crr.close() 

464 return data 

465 

466@operation 

467def add_pgaas_cluster(**kwargs): # pylint: disable=unused-argument 

468 """ 

469 dcae.nodes.pgaas.cluster: 

470 Record key generation data for cluster 

471 """ 

472 try: 

473 warn("add_pgaas_cluster() invoked") 

474 data = getclusterinfo(ctx.node.properties['writerfqdn'], 

475 ctx.node.properties['use_existing'], 

476 ctx.node.properties['readerfqdn'], 

477 ctx.node.properties['initialpassword'], 

478 find_related_nodes('dcae.relationships.pgaas_cluster_uses_sshkeypair')) 

479 ctx.instance.runtime_properties['public'] = data['pubkey'] 

480 ctx.instance.runtime_properties['base64private'] = data['data'] 

481 ctx.instance.runtime_properties['postgrespswd'] = getpass(data, 'postgres', ctx.node.properties['writerfqdn'], 'postgres') 

482 warn('All done') 

483 except Exception as e: # pylint: disable=broad-except 

484 ctx.logger.warn("Error: {0}".format(e)) 

485 ctx.logger.warn("Stack: {0}".format(traceback.format_exc())) 

486 raise e 

487 

488@operation 

489def rm_pgaas_cluster(**kwargs): # pylint: disable=unused-argument 

490 """ 

491 dcae.nodes.pgaas.cluster: 

492 Remove key generation data for cluster 

493 """ 

494 try: 

495 warn("rm_pgaas_cluster()") 

496 wfqdn = ctx.node.properties['writerfqdn'] 

497 if chkfqdn(wfqdn) and not ctx.node.properties['use_existing']: 

498 os.remove('{0}/{1}'.format(OPT_MANAGER_RESOURCES_PGAAS, wfqdn)) 

499 warn('All done') 

500 except Exception as e: # pylint: disable=broad-except 

501 ctx.logger.warn("Error: {0}".format(e)) 

502 ctx.logger.warn("Stack: {0}".format(traceback.format_exc())) 

503 raise e 

504 

505def dbgetinfo(refctx): 

506 """ 

507 Get the data associated with a database. 

508 Make sure the connection exists. 

509 """ 

510 wfqdn = refctx.node.properties['writerfqdn'] 

511 related = find_related_nodes('dcae.relationships.database_runson_pgaas_cluster', refctx.instance) 

512 if wfqdn == '': 

513 if len(related) != 1: 

514 raiseNonRecoverableError('Database Cluster must be specified using exactly one dcae.relationships.database_runson_pgaas_cluster relationship ' + 

515 'to a dcae.nodes.pgaas.cluster node when writerfqdn is not specified') 

516 wfqdn = related[0].node.properties['writerfqdn'] 

517 return dbgetinfo_for_update(wfqdn) 

518 

519def dbgetinfo_for_update(wfqdn): 

520 """ 

521 Get the data associated with a database. 

522 Make sure the connection exists. 

523 """ 

524 

525 if not chkfqdn(wfqdn): 

526 raiseNonRecoverableError('Invalid FQDN specified for admin/read-write access, fqdn={0}'.format(safestr(wfqdn))) 

527 ret = getclusterinfo(wfqdn, True, '', '', []) 

528 waithp(hostportion(wfqdn), portportion(wfqdn)) 

529 return ret 

530 

531@operation 

532def create_database(**kwargs): 

533 """ 

534 dcae.nodes.pgaas.database: 

535 Create a database on a cluster 

536 """ 

537 try: 

538 debug("create_database() invoked") 

539 dbname = ctx.node.properties['name'] 

540 warn("create_database({0})".format(safestr(dbname))) 

541 if not chkdbname(dbname): 

542 raiseNonRecoverableError('Unacceptable or missing database name: {0}'.format(safestr(dbname))) 

543 debug('create_database(): dbname checked out') 

544 dbinfo = dbgetinfo(ctx) 

545 debug('Got db server info') 

546 descs = dbdescs(dbinfo, dbname) 

547 ctx.instance.runtime_properties['admin'] = descs['admin'] 

548 ctx.instance.runtime_properties['user'] = descs['user'] 

549 ctx.instance.runtime_properties['viewer'] = descs['viewer'] 

550 with rootconn(dbinfo) as conn: 

551 crx = conn.cursor() 

552 dbexecute(crx, 'SELECT datname FROM pg_database WHERE datistemplate = false') 

553 existingdbs = [x[0] for x in crx] 

554 if ctx.node.properties['use_existing']: 

555 if dbname not in existingdbs: 

556 raiseNonRecoverableError('use_existing specified but database does not exist, dbname={0}'.format(safestr(dbname))) 

557 return 

558 dbexecute(crx, 'SELECT rolname FROM pg_roles') 

559 existingroles = [x[0] for x in crx] 

560 admu = descs['admin']['user'] 

561 usru = descs['user']['user'] 

562 vwru = descs['viewer']['user'] 

563 cusr = '{0}_common_user_role'.format(dbname) 

564 cvwr = '{0}_common_viewer_role'.format(dbname) 

565 schm = '{0}_db_common'.format(dbname) 

566 if admu not in existingroles: 

567 dbexecute_trunc_print(crx, 'CREATE USER {0} WITH PASSWORD %s'.format(admu), (descs['admin']['password'],)) 

568 if usru not in existingroles: 

569 dbexecute_trunc_print(crx, 'CREATE USER {0} WITH PASSWORD %s'.format(usru), (descs['user']['password'],)) 

570 if vwru not in existingroles: 

571 dbexecute_trunc_print(crx, 'CREATE USER {0} WITH PASSWORD %s'.format(vwru), (descs['viewer']['password'],)) 

572 if cusr not in existingroles: 

573 dbexecute(crx, 'CREATE ROLE {0}'.format(cusr)) 

574 if cvwr not in existingroles: 

575 dbexecute(crx, 'CREATE ROLE {0}'.format(cvwr)) 

576 if dbname not in existingdbs: 

577 dbexecute(crx, 'CREATE DATABASE {0} WITH OWNER {1}'.format(dbname, admu)) 

578 crx.close() 

579 with rootconn(dbinfo, dbname) as dbconn: 

580 crz = dbconn.cursor() 

581 for r in [cusr, cvwr, usru, vwru]: 

582 dbexecute(crz, 'REVOKE ALL ON DATABASE {0} FROM {1}'.format(dbname, r)) 

583 dbexecute(crz, 'GRANT {0} TO {1}'.format(cvwr, cusr)) 

584 dbexecute(crz, 'GRANT {0} TO {1}'.format(cusr, admu)) 

585 dbexecute(crz, 'GRANT CONNECT ON DATABASE {0} TO {1}'.format(dbname, cvwr)) 

586 dbexecute(crz, 'CREATE SCHEMA IF NOT EXISTS {0} AUTHORIZATION {1}'.format(schm, admu)) 

587 for r in [admu, cusr, cvwr, usru, vwru]: 

588 dbexecute(crz, 'ALTER ROLE {0} IN DATABASE {1} SET search_path = public, {2}'.format(r, dbname, schm)) 

589 dbexecute(crz, 'GRANT USAGE ON SCHEMA {0} to {1}'.format(schm, cvwr)) 

590 dbexecute(crz, 'GRANT CREATE ON SCHEMA {0} to {1}'.format(schm, admu)) 

591 dbexecute(crz, 'ALTER DEFAULT PRIVILEGES FOR ROLE {0} GRANT SELECT ON TABLES TO {1}'.format(admu, cvwr)) 

592 dbexecute(crz, 'ALTER DEFAULT PRIVILEGES FOR ROLE {0} GRANT INSERT, UPDATE, DELETE, TRUNCATE ON TABLES TO {1}'.format(admu, cusr)) 

593 dbexecute(crz, 'ALTER DEFAULT PRIVILEGES FOR ROLE {0} GRANT USAGE, SELECT, UPDATE ON SEQUENCES TO {1}'.format(admu, cusr)) 

594 dbexecute(crz, 'GRANT TEMP ON DATABASE {0} TO {1}'.format(dbname, cusr)) 

595 dbexecute(crz, 'GRANT {0} to {1}'.format(cusr, usru)) 

596 dbexecute(crz, 'GRANT {0} to {1}'.format(cvwr, vwru)) 

597 crz.close() 

598 warn('All done') 

599 except Exception as e: # pylint: disable=broad-except 

600 ctx.logger.warn("Error: {0}".format(e)) 

601 ctx.logger.warn("Stack: {0}".format(traceback.format_exc())) 

602 raise e 

603 

604@operation 

605def delete_database(**kwargs): # pylint: disable=unused-argument 

606 """ 

607 dcae.nodes.pgaas.database: 

608 Delete a database from a cluster 

609 """ 

610 try: 

611 debug("delete_database() invoked") 

612 dbname = ctx.node.properties['name'] 

613 warn("delete_database({0})".format(safestr(dbname))) 

614 if not chkdbname(dbname): 

615 return 

616 debug('delete_database(): dbname checked out') 

617 if ctx.node.properties['use_existing']: 

618 return 

619 debug('delete_database(): !use_existing') 

620 dbinfo = dbgetinfo(ctx) 

621 debug('Got db server info') 

622 with rootconn(dbinfo) as conn: 

623 crx = conn.cursor() 

624 admu = ctx.instance.runtime_properties['admin']['user'] 

625 usru = ctx.instance.runtime_properties['user']['user'] 

626 vwru = ctx.instance.runtime_properties['viewer']['user'] 

627 cusr = '{0}_common_user_role'.format(dbname) 

628 cvwr = '{0}_common_viewer_role'.format(dbname) 

629 dbexecute(crx, 'DROP DATABASE IF EXISTS {0}'.format(dbname)) 

630 for r in [usru, vwru, admu, cusr, cvwr]: 

631 dbexecute(crx, 'DROP ROLE IF EXISTS {0}'.format(r)) 

632 warn('All gone') 

633 except Exception as e: # pylint: disable=broad-except 

634 ctx.logger.warn("Error: {0}".format(e)) 

635 ctx.logger.warn("Stack: {0}".format(traceback.format_exc())) 

636 raise e 

637 

638############################################################# 

639# function: update_database # 

640# Purpose: Called as a workflow to change the database # 

641# passwords for all the users # 

642# # 

643# Invoked via: # 

644# cfy executions start -d <deployment-id> update_db_passwd # 

645# # 

646# Assumptions: # 

647# 1) pgaas_types.yaml must define a work flow e.g. # 

648# workflows: # 

649# update_db_passwd : # 

650# mapping : pgaas.pgaas.pgaas_plugin.update_database # 

651# 2) DB Blueprint: node_template must have properties: # 

652# writerfqdn & name (of DB) # 

653############################################################# 

654# pylint: disable=unused-argument 

655@operation 

656def update_database(refctx, **kwargs): 

657 """ 

658 dcae.nodes.pgaas.database: 

659 Update the password for a database from a cluster 

660 refctx is auto injected into the function when called as a workflow 

661 """ 

662 try: 

663 debug("update_database() invoked") 

664 

665 ################################################ 

666 # Verify refctx contains the <nodes> attribute. # 

667 # The workflow context might not be consistent # 

668 # across different cloudify versions # 

669 ################################################ 

670 if not hasattr(refctx, 'nodes'): 

671 raiseNonRecoverableError('workflow context does not contain attribute=<nodes>. dir(refctx)={}'.format(dir(refctx))) 

672 

673 ############################################ 

674 # Verify that refctx.nodes is iterable # 

675 ############################################ 

676 if not isinstance(refctx.nodes, collections.Iterable): 

677 raiseNonRecoverableError("refctx.nodes is not an iterable. Type={}".format(type(refctx.nodes))) 

678 

679 ctx_node = None 

680 ############################################## 

681 # Iterate through the nodes until we find # 

682 # one with the properties we are looking for # 

683 ############################################## 

684 for i in refctx.nodes: 

685 

686 ############################################ 

687 # Safeguard: If a given node doesn't have # 

688 # properties then skip it. # 

689 # Don't cause an exception since the nodes # 

690 # entry we are searching might still exist # 

691 ############################################ 

692 if not hasattr(i, 'properties'): 

693 warn('Encountered a ctx node that does not have attr=<properties>. dir={}'.format(dir(i))) 

694 continue 

695 

696 debug("ctx node has the following Properties: {}".format(list(i.properties.keys()))) 

697 

698 if ('name' in i.properties) and ('writerfqdn' in i.properties): 

699 ctx_node = i 

700 break 

701 

702 

703 ############################################### 

704 # If none of the nodes have properties: # 

705 # <name> and <writerfqdn> then fatal error # 

706 ############################################### 

707 if not ctx_node: 

708 raiseNonRecoverableError('Either <name> or <writerfqdn> is not found in refctx.nodes.properties.') 

709 

710 debug("name is {}".format(ctx_node.properties['name'])) 

711 debug("host is {}".format(ctx_node.properties['writerfqdn'])) 

712 

713 dbname = ctx_node.properties['name'] 

714 debug("update_database({0})".format(safestr(dbname))) 

715 

716 ########################### 

717 # dbname must be valid # 

718 ########################### 

719 if not chkdbname(dbname): 

720 raiseNonRecoverableError('dbname is null') 

721 

722 

723 hostport = ctx_node.properties['writerfqdn'] 

724 debug('update_database(): wfqdn={}'.format(hostport)) 

725 dbinfo = dbgetinfo_for_update(hostport) 

726 

727 #debug('Got db server info={}'.format(dbinfo)) 

728 

729 hostPortDbname = '{0}/{1}:{2}'.format(OPT_MANAGER_RESOURCES_PGAAS, hostport.lower(), dbname.lower()) 

730 

731 debug('update_database(): hostPortDbname={}'.format(hostPortDbname)) 

732 try: 

733 appended = False 

734 with open(hostPortDbname, "a") as fp: 

735 with open("/dev/urandom", "rb") as rp: 

736 b = rp.read(16) 

737 print(binascii.hexlify(b).decode('utf-8'), file=fp) 

738 appended = True 

739 if not appended: 

740 ctx.logger.warn("Error: the password for {} {} was not successfully changed".format(hostport, dbname)) 

741 except Exception as e: # pylint: disable=broad-except 

742 ctx.logger.warn("Error: {0}".format(e)) 

743 ctx.logger.warn("Stack: {0}".format(traceback.format_exc())) 

744 raise e 

745 

746 descs = dbdescs(dbinfo, dbname) 

747 

748 ########################################## 

749 # Verify we have expected keys # 

750 # <admin>, <user>, and <viewer> as well # 

751 # as "sub-key" <user> # 

752 ########################################## 

753 

754 if not isinstance(descs, dict): 

755 raiseNonRecoverableError('db descs has unexpected type=<{}> was expected type dict'.format(type(descs))) 

756 

757 for key in ("admin", "user", "viewer"): 

758 if key not in descs: 

759 raiseNonRecoverableError('db descs does not contain key=<{}>. Keys found for descs are: {}'.format(key, list(descs.keys()))) 

760 if 'user' not in descs[key]: 

761 raiseNonRecoverableError('db descs[{}] does not contain key=<user>. Keys found for descs[{}] are: {}'.format(key, key, list(descs[key].keys()))) 

762 

763 

764 with rootconn(dbinfo) as conn: 

765 crx = conn.cursor() 

766 

767 admu = descs['admin']['user'] 

768 usru = descs['user']['user'] 

769 vwru = descs['viewer']['user'] 

770 

771 for r in [usru, vwru, admu]: 

772 dbexecute_trunc_print(crx, "ALTER USER {} WITH PASSWORD '{}'".format(r, getpass(dbinfo, r, hostport, dbname))) 

773 #debug("user={} password={}".format(r, getpass(dbinfo, r, hostport, dbname))) 

774 

775 warn('All users updated for database {}'.format(dbname)) 

776 except Exception as e: # pylint: disable=broad-except 

777 ctx.logger.warn("Error: {0}".format(e)) 

778 ctx.logger.warn("Stack: {0}".format(traceback.format_exc())) 

779 raise e