Coverage for pgaas/pgaas_plugin.py : 81%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# org.onap.dcaegen2
2# ============LICENSE_START====================================================
3# =============================================================================
4# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
5# Copyright (c) 2020 Pantheon.tech. All rights reserved.
6# =============================================================================
7# Licensed under the Apache License, Version 2.0 (the "License");
8# you may not use this file except in compliance with the License.
9# You may obtain a copy of the License at
10#
11# http://www.apache.org/licenses/LICENSE-2.0
12#
13# Unless required by applicable law or agreed to in writing, software
14# distributed under the License is distributed on an "AS IS" BASIS,
15# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16# See the License for the specific language governing permissions and
17# limitations under the License.
18# ============LICENSE_END======================================================
20"""
21PostgreSQL plugin to manage passwords
22"""
24from __future__ import print_function
25import sys
26import os
27import re
28import json
29import hashlib
30import socket
31import traceback
32import base64
33import binascii
34import collections
35try:
36 from urllib.parse import quote
37except ImportError:
38 from urllib import quote
40from cloudify import ctx
41from cloudify.decorators import operation
42from cloudify.exceptions import NonRecoverableError
43from cloudify.exceptions import RecoverableError
45try:
46 import psycopg2
47except ImportError:
48 # FIXME: any users of this plugin installing its dependencies in nonstandard
49 # directories should set up PYTHONPATH accordingly, outside the program code
50 SYSPATH = sys.path
51 sys.path = list(SYSPATH)
52 sys.path.append('/usr/lib64/python2.7/site-packages')
53 import psycopg2
54 sys.path = SYSPATH
56from pgaas.logginginterface import debug, info, warn, error
59"""
60 To set up a cluster:
61 - https://$NEXUS/repository/raw/type_files/sshkeyshare/sshkey_types.yaml
62 - https://$NEXUS/repository/raw/type_files/pgaas_types.yaml
63 sharedsshkey_pgrs:
64 type: dcae.nodes.ssh.keypair
65 pgaas_cluster:
66 type: dcae.nodes.pgaas.cluster
67 properties:
68 writerfqdn: { get_input: k8s_pgaas_instance_fqdn }
69 readerfqdn: { get_input: k8s_pgaas_instance_fqdn }
70 # OR:
71 # writerfqdn: { concat: [ { get_input: location_prefix }, '-', { get_input: pgaas_cluster_name }, '-write.', { get_input: location_domain } ] }
72 # readerfqdn: { concat: [ { get_input: location_prefix }, '-', { get_input: pgaas_cluster_name }, '.', { get_input: location_domain } ] }
73 relationships:
74 - type: dcae.relationships.pgaas_cluster_uses_sshkeypair
75 target: sharedsshkey_pgrs
77 To reference an existing cluster:
78 - https://$NEXUS/repository/raw/type_files/pgaas_types.yaml
79 pgaas_cluster:
80 type: dcae.nodes.pgaas.cluster
81 properties:
82 writerfqdn: { get_input: k8s_pgaas_instance_fqdn }
83 # OR: writerfqdn: { concat: [ { get_input: location_prefix }, '-',
84 # { get_input: pgaas_cluster_name }, '-write.',
85 # { get_input: location_domain } ] }
86 # OR: writerfqdn: { get_property: [ dns_pgrs_rw, fqdn ] }
87 use_existing: true
89 To initialize an existing server to be managed by pgaas_plugin::
90 - https://$NEXUS/repository/raw/type_files/sshkeyshare/sshkey_types.yaml
91 - https://$NEXUS/repository/raw/type_files/pgaas_types.yaml
92 pgaas_cluster:
93 type: dcae.nodes.pgaas.cluster
94 properties:
95 writerfqdn: { get_input: k8s_pgaas_instance_fqdn }
96 readerfqdn: { get_input: k8s_pgaas_instance_fqdn }
97 # OR:
98 # writerfqdn: { concat: [ { get_input: location_prefix }, '-',
99 # { get_input: pgaas_cluster_name }, '-write.',
100 # { get_input: location_domain } ] }
101 # readerfqdn: { concat: [ { get_input: location_prefix }, '-',
102 # { get_input: pgaas_cluster_name }, '.',
103 # { get_input: location_domain } ] }
104 initialpassword: { get_input: currentpassword }
105 relationships:
106 - type: dcae.relationships.pgaas_cluster_uses_sshkeypair
107 target: sharedsshkey_pgrs
109 - { get_attribute: [ pgaas_cluster, public ] }
110 - { get_attribute: [ pgaas_cluster, base64private ] }
111 # - { get_attribute: [ pgaas_cluster, postgrespswd ] }
114 To set up a database:
115 - http://$NEXUS/raw/type_files/pgaas_types.yaml
116 pgaasdbtest:
117 type: dcae.nodes.pgaas.database
118 properties:
119 writerfqdn: { get_input: k8s_pgaas_instance_fqdn }
120 # OR: writerfqdn: { concat: [ { get_input: location_prefix }, '-',
121 # { get_input: pgaas_cluster_name }, '-write.',
122 # { get_input: location_domain } ] }
123 # OR: writerfqdn: { get_property: [ dns_pgrs_rw, fqdn ] }
124 name: { get_input: database_name }
126 To reference an existing database:
127 - http://$NEXUS/raw/type_files/pgaas_types.yaml
128 $CLUSTER_$DBNAME:
129 type: dcae.nodes.pgaas.database
130 properties:
131 writerfqdn: { get_input: k8s_pgaas_instance_fqdn }
132 # OR: writerfqdn: { concat: [ { get_input: location_prefix }, '-',
133 # { get_input: pgaas_cluster_name }, '-write.',
134 # { get_input: location_domain } ] }
135 # OR: writerfqdn: { get_property: [ dns_pgrs_rw, fqdn ] }
136 name: { get_input: database_name }
137 use_existing: true
139 $CLUSTER_$DBNAME_admin_host:
140 description: Hostname for $CLUSTER $DBNAME database
141 value: { get_attribute: [ $CLUSTER_$DBNAME, admin, host ] }
142 $CLUSTER_$DBNAME_admin_user:
143 description: Admin Username for $CLUSTER $DBNAME database
144 value: { get_attribute: [ $CLUSTER_$DBNAME, admin, user ] }
145 $CLUSTER_$DBNAME_admin_password:
146 description: Admin Password for $CLUSTER $DBNAME database
147 value: { get_attribute: [ $CLUSTER_$DBNAME, admin, password ] }
148 $CLUSTER_$DBNAME_user_host:
149 description: Hostname for $CLUSTER $DBNAME database
150 value: { get_attribute: [ $CLUSTER_$DBNAME, user, host ] }
151 $CLUSTER_$DBNAME_user_user:
152 description: User Username for $CLUSTER $DBNAME database
153 value: { get_attribute: [ $CLUSTER_$DBNAME, user, user ] }
154 $CLUSTER_$DBNAME_user_password:
155 description: User Password for $CLUSTER $DBNAME database
156 value: { get_attribute: [ $CLUSTER_$DBNAME, user, password ] }
157 $CLUSTER_$DBNAME_viewer_host:
158 description: Hostname for $CLUSTER $DBNAME database
159 value: { get_attribute: [ $CLUSTER_$DBNAME, viewer, host ] }
160 $CLUSTER_$DBNAME_viewer_user:
161 description: Viewer Username for $CLUSTER $DBNAME database
162 value: { get_attribute: [ $CLUSTER_$DBNAME, viewer, user ] }
163 $CLUSTER_$DBNAME_viewer_password:
164 description: Viewer Password for $CLUSTER $DBNAME database
165 value: { get_attribute: [ $CLUSTER_$DBNAME, viewer, password ] }
167"""
169OPT_MANAGER_RESOURCES_PGAAS = "/opt/manager/resources/pgaas"
171# pylint: disable=invalid-name
172def setOptManagerResources(o): # pylint: disable=global-statement
173 """
174 Overrides the default locations of /opt/managers/resources
175 """
176 # pylint: disable=global-statement
177 global OPT_MANAGER_RESOURCES_PGAAS
178 OPT_MANAGER_RESOURCES_PGAAS = "{}/pgaas".format(o)
180def safestr(s):
181 """
182 returns a safely printable version of the string
183 """
184 return quote(str(s), '')
186def raiseRecoverableError(msg):
187 """
188 Print a warning message and raise a RecoverableError exception.
189 This is a handy endpoint to add other extended debugging calls.
190 """
191 warn(msg)
192 raise RecoverableError(msg)
194def raiseNonRecoverableError(msg):
195 """
196 Print an error message and raise a NonRecoverableError exception.
197 This is a handy endpoint to add other extended debugging calls.
198 """
199 error(msg)
200 raise NonRecoverableError(msg)
202def dbexecute(crx, cmd, args=None):
203 """
204 executes the SQL statement
205 Prints the entire command for debugging purposes
206 """
207 debug("executing {}".format(cmd))
208 crx.execute(cmd, args)
211def dbexecute_trunc_print(crx, cmd, args=None):
212 """
213 executes the SQL statement.
214 Will print only the first 30 characters in the command
215 Use this function if you are executing an SQL cmd with a password
216 """
217 debug("executing {}".format(cmd[:30]))
218 crx.execute(cmd, args)
221def waithp(host, port):
222 """
223 do a test connection to a host and port
224 """
225 debug("waithp({0},{1})".format(safestr(host), safestr(port)))
226 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
227 try:
228 sock.connect((host, int(port)))
229 except: # pylint: disable=bare-except
230 a, b, c = sys.exc_info()
231 traceback.print_exception(a, b, c)
232 sock.close()
233 raiseRecoverableError('Server at {0}:{1} is not ready'.format(safestr(host), safestr(port)))
234 sock.close()
236def doconn(desc):
237 """
238 open an SQL connection to the PG server
239 """
240 debug("doconn({},{},{})".format(desc['host'], desc['user'], desc['database']))
241 # debug("doconn({},{},{},{})".format(desc['host'], desc['user'], desc['database'], desc['password']))
242 ret = psycopg2.connect(**desc)
243 ret.autocommit = True
244 return ret
246def hostportion(hostport):
247 """
248 return the host portion of a fqdn:port or IPv4:port or [IPv6]:port
249 """
250 ipv4re = re.match(r"^([^:]+)(:(\d+))?", hostport)
251 ipv6re = re.match(r"^[[]([^]]+)[]](:(\d+))?", hostport)
252 if ipv4re:
253 return ipv4re.group(1)
254 if ipv6re:
255 return ipv6re.group(1)
256 raiseNonRecoverableError("invalid hostport: {}".format(hostport))
258def portportion(hostport):
259 """
260 Return the port portion of a fqdn:port or IPv4:port or [IPv6]:port.
261 If port is not present, return 5432.
262 """
263 ipv6re = re.match(r"^[[]([^]]+)[]](:(\d+))?", hostport)
264 ipv4re = re.match(r"^([^:]+)(:(\d+))?", hostport)
265 if ipv4re:
266 return ipv4re.group(3) if ipv4re.group(3) else '5432'
267 if ipv6re:
268 return ipv6re.group(3) if ipv6re.group(3) else '5432'
269 raiseNonRecoverableError("invalid hostport: {}".format(hostport))
271def rootdesc(data, dbname, initialpassword=None):
272 """
273 return the postgres connection information
274 """
275 debug("rootdesc(..data..,{0})".format(safestr(dbname)))
276 # pylint: disable=bad-continuation
277 return {
278 'database': dbname,
279 'host': hostportion(data['rw']),
280 'port': portportion(data['rw']),
281 'user': 'postgres',
282 'password': initialpassword if initialpassword else getpass(data, 'postgres', data['rw'], 'postgres')
283 }
285def rootconn(data, dbname='postgres', initialpassword=None):
286 """
287 connect to a given server as postgres,
288 connecting to the specified database
289 """
290 debug("rootconn(..data..,{0})".format(safestr(dbname)))
291 return doconn(rootdesc(data, dbname, initialpassword))
293def onedesc(data, dbname, role, access):
294 """
295 return the connection information for a given user and dbname on a cluster
296 """
297 user = '{0}_{1}'.format(dbname, role)
298 # pylint: disable=bad-continuation
299 return {
300 'database': dbname,
301 'host': hostportion(data[access]),
302 'port': portportion(data[access]),
303 'user': user,
304 'password': getpass(data, user, data['rw'], dbname)
305 }
307def dbdescs(data, dbname):
308 """
309 return the entire set of information for a specific server/database
310 """
311 # pylint: disable=bad-continuation
312 return {
313 'admin': onedesc(data, dbname, 'admin', 'rw'),
314 'user': onedesc(data, dbname, 'user', 'rw'),
315 'viewer': onedesc(data, dbname, 'viewer', 'ro')
316 }
318def getpass(data, ident, hostport, dbname):
319 """
320 generate the password for a given user on a specific server
321 """
322 m = hashlib.sha256()
323 m.update(ident.encode())
325 # mix in the seed (the last line) for that database, if one exists
326 hostport = hostport.lower()
327 dbname = dbname.lower()
328 hostPortDbname = '{0}/{1}:{2}'.format(OPT_MANAGER_RESOURCES_PGAAS, hostport, dbname)
329 try:
330 lastLine = ''
331 with open(hostPortDbname, "r") as fp:
332 for line in fp:
333 lastLine = line
334 m.update(lastLine.encode())
335 except IOError:
336 pass
338 m.update(base64.b64decode(data['data']))
339 return m.hexdigest()
341def find_related_nodes(reltype, inst=None):
342 """
343 extract the related_nodes information from the context
344 for a specific relationship
345 """
346 if inst is None:
347 inst = ctx.instance
348 ret = []
349 for rel in inst.relationships:
350 if reltype in rel.type_hierarchy:
351 ret.append(rel.target)
352 return ret
354def chkfqdn(fqdn):
355 """
356 verify that a FQDN is valid
357 """
358 if fqdn is None:
359 return False
360 hp = hostportion(fqdn)
361 # not needed right now: pp = portportion(fqdn)
362 # TODO need to augment this for IPv6 addresses
363 return re.match('^[a-zA-Z0-9_-]+(\\.[a-zA-Z0-9_-]+)+$', hp) is not None
365def chkdbname(dbname):
366 """
367 verify that a database name is valid
368 """
369 ret = re.match('[a-zA-Z][a-zA-Z0-9]{0,43}', dbname) is not None and dbname != 'postgres'
370 if not ret:
371 warn("Invalid dbname: {0}".format(safestr(dbname)))
372 return ret
374def get_valid_domains():
375 """
376 Return a list of the valid names, suitable for inclusion in an error message.
377 """
378 msg = ''
379 import glob
380 validDomains = []
381 for f in glob.glob('{}/*'.format(OPT_MANAGER_RESOURCES_PGAAS)):
382 try:
383 with open(f, "r") as fp:
384 try:
385 tmpdata = json.load(fp)
386 if 'pubkey' in tmpdata:
387 validDomains.append(os.path.basename(f))
388 except: # pylint: disable=bare-except
389 pass
390 except: # pylint: disable=bare-except
391 pass
392 if len(validDomains) == 0:
393 msg += '\nNo valid PostgreSQL cluster information was found'
394 else:
395 msg += '\nThese are the valid PostgreSQL cluster domains found on this manager:'
396 for v in validDomains:
397 msg += '\n\t"{}"'.format(v)
398 return msg
400def get_existing_clusterinfo(wfqdn, rfqdn, related):
401 """
402 Retrieve all of the information specific to an existing cluster.
403 """
404 if rfqdn != '':
405 raiseNonRecoverableError('Read-only FQDN must not be specified when using an existing cluster, fqdn={0}'.format(safestr(rfqdn)))
406 if len(related) != 0:
407 raiseNonRecoverableError('Cluster SSH keypair must not be specified when using an existing cluster')
408 try:
409 fn = '{0}/{1}'.format(OPT_MANAGER_RESOURCES_PGAAS, wfqdn.lower())
410 with open(fn, 'r') as f:
411 data = json.load(f)
412 data['rw'] = wfqdn
413 return data
414 except Exception as e: # pylint: disable=broad-except
415 warn("Error: {0}".format(e))
416 msg = 'Cluster must be deployed when using an existing cluster.\nCheck your domain name: fqdn={0}\nerr={1}'.format(safestr(wfqdn), e)
417 if not os.path.isdir(OPT_MANAGER_RESOURCES_PGAAS):
418 msg += '\nThe directory {} does not exist. No PostgreSQL clusters have been deployed on this manager.'.format(OPT_MANAGER_RESOURCES_PGAAS)
419 else:
420 msg += get_valid_domains()
421 # warn("Stack: {0}".format(traceback.format_exc()))
422 raiseNonRecoverableError(msg)
424def getclusterinfo(wfqdn, reuse, rfqdn, initialpassword, related):
425 """
426 Retrieve all of the information specific to a cluster.
427 if reuse, retrieve it
428 else create and store it
429 """
430 # debug("getclusterinfo({}, {}, {}, {}, ..related..)".format(safestr(wfqdn), safestr(reuse), safestr(rfqdn), safestr(initialpassword)))
431 debug("getclusterinfo({}, {}, {}, ..related..)".format(safestr(wfqdn), safestr(reuse), safestr(rfqdn)))
432 if not chkfqdn(wfqdn):
433 raiseNonRecoverableError('Invalid FQDN specified for admin/read-write access, fqdn={0}'.format(safestr(wfqdn)))
434 if reuse:
435 return get_existing_clusterinfo(wfqdn, rfqdn, related)
437 if rfqdn == '':
438 rfqdn = wfqdn
439 elif not chkfqdn(rfqdn):
440 raiseNonRecoverableError('Invalid FQDN specified for read-only access, fqdn={0}'.format(safestr(rfqdn)))
441 if len(related) != 1:
442 raiseNonRecoverableError('Cluster SSH keypair must be specified using a dcae.relationships.pgaas_cluster_uses_sshkeypair ' +
443 'relationship to a dcae.nodes.sshkeypair node')
444 data = {'ro': rfqdn, 'pubkey': related[0].instance.runtime_properties['public'],
445 'data': related[0].instance.runtime_properties['base64private'], 'hash': 'sha256'}
446 os.umask(0o77)
447 try:
448 os.makedirs('{0}'.format(OPT_MANAGER_RESOURCES_PGAAS))
449 except: # pylint: disable=bare-except
450 pass
451 try:
452 with open('{0}/{1}'.format(OPT_MANAGER_RESOURCES_PGAAS, wfqdn.lower()), 'w') as f:
453 f.write(json.dumps(data))
454 except Exception as e: # pylint: disable=broad-except
455 warn("Error: {0}".format(e))
456 warn("Stack: {0}".format(traceback.format_exc()))
457 raiseNonRecoverableError('Cannot write cluster information to {0}: fqdn={1}, err={2}'.format(OPT_MANAGER_RESOURCES_PGAAS, safestr(wfqdn), e))
458 data['rw'] = wfqdn
459 if initialpassword:
460 with rootconn(data, initialpassword=initialpassword) as conn:
461 crr = conn.cursor()
462 dbexecute_trunc_print(crr, "ALTER USER postgres WITH PASSWORD %s", (getpass(data, 'postgres', wfqdn, 'postgres'),))
463 crr.close()
464 return data
466@operation
467def add_pgaas_cluster(**kwargs): # pylint: disable=unused-argument
468 """
469 dcae.nodes.pgaas.cluster:
470 Record key generation data for cluster
471 """
472 try:
473 warn("add_pgaas_cluster() invoked")
474 data = getclusterinfo(ctx.node.properties['writerfqdn'],
475 ctx.node.properties['use_existing'],
476 ctx.node.properties['readerfqdn'],
477 ctx.node.properties['initialpassword'],
478 find_related_nodes('dcae.relationships.pgaas_cluster_uses_sshkeypair'))
479 ctx.instance.runtime_properties['public'] = data['pubkey']
480 ctx.instance.runtime_properties['base64private'] = data['data']
481 ctx.instance.runtime_properties['postgrespswd'] = getpass(data, 'postgres', ctx.node.properties['writerfqdn'], 'postgres')
482 warn('All done')
483 except Exception as e: # pylint: disable=broad-except
484 ctx.logger.warn("Error: {0}".format(e))
485 ctx.logger.warn("Stack: {0}".format(traceback.format_exc()))
486 raise e
488@operation
489def rm_pgaas_cluster(**kwargs): # pylint: disable=unused-argument
490 """
491 dcae.nodes.pgaas.cluster:
492 Remove key generation data for cluster
493 """
494 try:
495 warn("rm_pgaas_cluster()")
496 wfqdn = ctx.node.properties['writerfqdn']
497 if chkfqdn(wfqdn) and not ctx.node.properties['use_existing']:
498 os.remove('{0}/{1}'.format(OPT_MANAGER_RESOURCES_PGAAS, wfqdn))
499 warn('All done')
500 except Exception as e: # pylint: disable=broad-except
501 ctx.logger.warn("Error: {0}".format(e))
502 ctx.logger.warn("Stack: {0}".format(traceback.format_exc()))
503 raise e
505def dbgetinfo(refctx):
506 """
507 Get the data associated with a database.
508 Make sure the connection exists.
509 """
510 wfqdn = refctx.node.properties['writerfqdn']
511 related = find_related_nodes('dcae.relationships.database_runson_pgaas_cluster', refctx.instance)
512 if wfqdn == '':
513 if len(related) != 1:
514 raiseNonRecoverableError('Database Cluster must be specified using exactly one dcae.relationships.database_runson_pgaas_cluster relationship ' +
515 'to a dcae.nodes.pgaas.cluster node when writerfqdn is not specified')
516 wfqdn = related[0].node.properties['writerfqdn']
517 return dbgetinfo_for_update(wfqdn)
519def dbgetinfo_for_update(wfqdn):
520 """
521 Get the data associated with a database.
522 Make sure the connection exists.
523 """
525 if not chkfqdn(wfqdn):
526 raiseNonRecoverableError('Invalid FQDN specified for admin/read-write access, fqdn={0}'.format(safestr(wfqdn)))
527 ret = getclusterinfo(wfqdn, True, '', '', [])
528 waithp(hostportion(wfqdn), portportion(wfqdn))
529 return ret
531@operation
532def create_database(**kwargs):
533 """
534 dcae.nodes.pgaas.database:
535 Create a database on a cluster
536 """
537 try:
538 debug("create_database() invoked")
539 dbname = ctx.node.properties['name']
540 warn("create_database({0})".format(safestr(dbname)))
541 if not chkdbname(dbname):
542 raiseNonRecoverableError('Unacceptable or missing database name: {0}'.format(safestr(dbname)))
543 debug('create_database(): dbname checked out')
544 dbinfo = dbgetinfo(ctx)
545 debug('Got db server info')
546 descs = dbdescs(dbinfo, dbname)
547 ctx.instance.runtime_properties['admin'] = descs['admin']
548 ctx.instance.runtime_properties['user'] = descs['user']
549 ctx.instance.runtime_properties['viewer'] = descs['viewer']
550 with rootconn(dbinfo) as conn:
551 crx = conn.cursor()
552 dbexecute(crx, 'SELECT datname FROM pg_database WHERE datistemplate = false')
553 existingdbs = [x[0] for x in crx]
554 if ctx.node.properties['use_existing']:
555 if dbname not in existingdbs:
556 raiseNonRecoverableError('use_existing specified but database does not exist, dbname={0}'.format(safestr(dbname)))
557 return
558 dbexecute(crx, 'SELECT rolname FROM pg_roles')
559 existingroles = [x[0] for x in crx]
560 admu = descs['admin']['user']
561 usru = descs['user']['user']
562 vwru = descs['viewer']['user']
563 cusr = '{0}_common_user_role'.format(dbname)
564 cvwr = '{0}_common_viewer_role'.format(dbname)
565 schm = '{0}_db_common'.format(dbname)
566 if admu not in existingroles:
567 dbexecute_trunc_print(crx, 'CREATE USER {0} WITH PASSWORD %s'.format(admu), (descs['admin']['password'],))
568 if usru not in existingroles:
569 dbexecute_trunc_print(crx, 'CREATE USER {0} WITH PASSWORD %s'.format(usru), (descs['user']['password'],))
570 if vwru not in existingroles:
571 dbexecute_trunc_print(crx, 'CREATE USER {0} WITH PASSWORD %s'.format(vwru), (descs['viewer']['password'],))
572 if cusr not in existingroles:
573 dbexecute(crx, 'CREATE ROLE {0}'.format(cusr))
574 if cvwr not in existingroles:
575 dbexecute(crx, 'CREATE ROLE {0}'.format(cvwr))
576 if dbname not in existingdbs:
577 dbexecute(crx, 'CREATE DATABASE {0} WITH OWNER {1}'.format(dbname, admu))
578 crx.close()
579 with rootconn(dbinfo, dbname) as dbconn:
580 crz = dbconn.cursor()
581 for r in [cusr, cvwr, usru, vwru]:
582 dbexecute(crz, 'REVOKE ALL ON DATABASE {0} FROM {1}'.format(dbname, r))
583 dbexecute(crz, 'GRANT {0} TO {1}'.format(cvwr, cusr))
584 dbexecute(crz, 'GRANT {0} TO {1}'.format(cusr, admu))
585 dbexecute(crz, 'GRANT CONNECT ON DATABASE {0} TO {1}'.format(dbname, cvwr))
586 dbexecute(crz, 'CREATE SCHEMA IF NOT EXISTS {0} AUTHORIZATION {1}'.format(schm, admu))
587 for r in [admu, cusr, cvwr, usru, vwru]:
588 dbexecute(crz, 'ALTER ROLE {0} IN DATABASE {1} SET search_path = public, {2}'.format(r, dbname, schm))
589 dbexecute(crz, 'GRANT USAGE ON SCHEMA {0} to {1}'.format(schm, cvwr))
590 dbexecute(crz, 'GRANT CREATE ON SCHEMA {0} to {1}'.format(schm, admu))
591 dbexecute(crz, 'ALTER DEFAULT PRIVILEGES FOR ROLE {0} GRANT SELECT ON TABLES TO {1}'.format(admu, cvwr))
592 dbexecute(crz, 'ALTER DEFAULT PRIVILEGES FOR ROLE {0} GRANT INSERT, UPDATE, DELETE, TRUNCATE ON TABLES TO {1}'.format(admu, cusr))
593 dbexecute(crz, 'ALTER DEFAULT PRIVILEGES FOR ROLE {0} GRANT USAGE, SELECT, UPDATE ON SEQUENCES TO {1}'.format(admu, cusr))
594 dbexecute(crz, 'GRANT TEMP ON DATABASE {0} TO {1}'.format(dbname, cusr))
595 dbexecute(crz, 'GRANT {0} to {1}'.format(cusr, usru))
596 dbexecute(crz, 'GRANT {0} to {1}'.format(cvwr, vwru))
597 crz.close()
598 warn('All done')
599 except Exception as e: # pylint: disable=broad-except
600 ctx.logger.warn("Error: {0}".format(e))
601 ctx.logger.warn("Stack: {0}".format(traceback.format_exc()))
602 raise e
604@operation
605def delete_database(**kwargs): # pylint: disable=unused-argument
606 """
607 dcae.nodes.pgaas.database:
608 Delete a database from a cluster
609 """
610 try:
611 debug("delete_database() invoked")
612 dbname = ctx.node.properties['name']
613 warn("delete_database({0})".format(safestr(dbname)))
614 if not chkdbname(dbname):
615 return
616 debug('delete_database(): dbname checked out')
617 if ctx.node.properties['use_existing']:
618 return
619 debug('delete_database(): !use_existing')
620 dbinfo = dbgetinfo(ctx)
621 debug('Got db server info')
622 with rootconn(dbinfo) as conn:
623 crx = conn.cursor()
624 admu = ctx.instance.runtime_properties['admin']['user']
625 usru = ctx.instance.runtime_properties['user']['user']
626 vwru = ctx.instance.runtime_properties['viewer']['user']
627 cusr = '{0}_common_user_role'.format(dbname)
628 cvwr = '{0}_common_viewer_role'.format(dbname)
629 dbexecute(crx, 'DROP DATABASE IF EXISTS {0}'.format(dbname))
630 for r in [usru, vwru, admu, cusr, cvwr]:
631 dbexecute(crx, 'DROP ROLE IF EXISTS {0}'.format(r))
632 warn('All gone')
633 except Exception as e: # pylint: disable=broad-except
634 ctx.logger.warn("Error: {0}".format(e))
635 ctx.logger.warn("Stack: {0}".format(traceback.format_exc()))
636 raise e
638#############################################################
639# function: update_database #
640# Purpose: Called as a workflow to change the database #
641# passwords for all the users #
642# #
643# Invoked via: #
644# cfy executions start -d <deployment-id> update_db_passwd #
645# #
646# Assumptions: #
647# 1) pgaas_types.yaml must define a work flow e.g. #
648# workflows: #
649# update_db_passwd : #
650# mapping : pgaas.pgaas.pgaas_plugin.update_database #
651# 2) DB Blueprint: node_template must have properties: #
652# writerfqdn & name (of DB) #
653#############################################################
654# pylint: disable=unused-argument
655@operation
656def update_database(refctx, **kwargs):
657 """
658 dcae.nodes.pgaas.database:
659 Update the password for a database from a cluster
660 refctx is auto injected into the function when called as a workflow
661 """
662 try:
663 debug("update_database() invoked")
665 ################################################
666 # Verify refctx contains the <nodes> attribute. #
667 # The workflow context might not be consistent #
668 # across different cloudify versions #
669 ################################################
670 if not hasattr(refctx, 'nodes'):
671 raiseNonRecoverableError('workflow context does not contain attribute=<nodes>. dir(refctx)={}'.format(dir(refctx)))
673 ############################################
674 # Verify that refctx.nodes is iterable #
675 ############################################
676 if not isinstance(refctx.nodes, collections.Iterable):
677 raiseNonRecoverableError("refctx.nodes is not an iterable. Type={}".format(type(refctx.nodes)))
679 ctx_node = None
680 ##############################################
681 # Iterate through the nodes until we find #
682 # one with the properties we are looking for #
683 ##############################################
684 for i in refctx.nodes:
686 ############################################
687 # Safeguard: If a given node doesn't have #
688 # properties then skip it. #
689 # Don't cause an exception since the nodes #
690 # entry we are searching might still exist #
691 ############################################
692 if not hasattr(i, 'properties'):
693 warn('Encountered a ctx node that does not have attr=<properties>. dir={}'.format(dir(i)))
694 continue
696 debug("ctx node has the following Properties: {}".format(list(i.properties.keys())))
698 if ('name' in i.properties) and ('writerfqdn' in i.properties):
699 ctx_node = i
700 break
703 ###############################################
704 # If none of the nodes have properties: #
705 # <name> and <writerfqdn> then fatal error #
706 ###############################################
707 if not ctx_node:
708 raiseNonRecoverableError('Either <name> or <writerfqdn> is not found in refctx.nodes.properties.')
710 debug("name is {}".format(ctx_node.properties['name']))
711 debug("host is {}".format(ctx_node.properties['writerfqdn']))
713 dbname = ctx_node.properties['name']
714 debug("update_database({0})".format(safestr(dbname)))
716 ###########################
717 # dbname must be valid #
718 ###########################
719 if not chkdbname(dbname):
720 raiseNonRecoverableError('dbname is null')
723 hostport = ctx_node.properties['writerfqdn']
724 debug('update_database(): wfqdn={}'.format(hostport))
725 dbinfo = dbgetinfo_for_update(hostport)
727 #debug('Got db server info={}'.format(dbinfo))
729 hostPortDbname = '{0}/{1}:{2}'.format(OPT_MANAGER_RESOURCES_PGAAS, hostport.lower(), dbname.lower())
731 debug('update_database(): hostPortDbname={}'.format(hostPortDbname))
732 try:
733 appended = False
734 with open(hostPortDbname, "a") as fp:
735 with open("/dev/urandom", "rb") as rp:
736 b = rp.read(16)
737 print(binascii.hexlify(b).decode('utf-8'), file=fp)
738 appended = True
739 if not appended:
740 ctx.logger.warn("Error: the password for {} {} was not successfully changed".format(hostport, dbname))
741 except Exception as e: # pylint: disable=broad-except
742 ctx.logger.warn("Error: {0}".format(e))
743 ctx.logger.warn("Stack: {0}".format(traceback.format_exc()))
744 raise e
746 descs = dbdescs(dbinfo, dbname)
748 ##########################################
749 # Verify we have expected keys #
750 # <admin>, <user>, and <viewer> as well #
751 # as "sub-key" <user> #
752 ##########################################
754 if not isinstance(descs, dict):
755 raiseNonRecoverableError('db descs has unexpected type=<{}> was expected type dict'.format(type(descs)))
757 for key in ("admin", "user", "viewer"):
758 if key not in descs:
759 raiseNonRecoverableError('db descs does not contain key=<{}>. Keys found for descs are: {}'.format(key, list(descs.keys())))
760 if 'user' not in descs[key]:
761 raiseNonRecoverableError('db descs[{}] does not contain key=<user>. Keys found for descs[{}] are: {}'.format(key, key, list(descs[key].keys())))
764 with rootconn(dbinfo) as conn:
765 crx = conn.cursor()
767 admu = descs['admin']['user']
768 usru = descs['user']['user']
769 vwru = descs['viewer']['user']
771 for r in [usru, vwru, admu]:
772 dbexecute_trunc_print(crx, "ALTER USER {} WITH PASSWORD '{}'".format(r, getpass(dbinfo, r, hostport, dbname)))
773 #debug("user={} password={}".format(r, getpass(dbinfo, r, hostport, dbname)))
775 warn('All users updated for database {}'.format(dbname))
776 except Exception as e: # pylint: disable=broad-except
777 ctx.logger.warn("Error: {0}".format(e))
778 ctx.logger.warn("Stack: {0}".format(traceback.format_exc()))
779 raise e