Files
pgadmin4/web/pgadmin/browser/server_groups/servers/roles/tests/utils.py
2017-07-18 15:23:11 +01:00

180 lines
5.7 KiB
Python

##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2017, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
from __future__ import print_function
import os
import pickle
import sys
import uuid
from regression.python_test_utils import test_utils as utils
from regression.test_setup import config_data
ROLE_URL = '/browser/role/obj/'
file_name = os.path.basename(__file__)
def verify_role(server, role_name):
"""
This function calls the GET API for role to verify
:param server: server details
:type server: dict
:param role_name: role name
:type role_name: str
:return role: role record from db
:rtype role: dict
"""
try:
connection = utils.get_db_connection(server['db'],
server['username'],
server['db_password'],
server['host'],
server['port'],
server['sslmode'])
pg_cursor = connection.cursor()
pg_cursor.execute(
"SELECT * from pg_catalog.pg_roles pr WHERE pr.rolname='%s'" %
role_name)
connection.commit()
role = pg_cursor.fetchone()
connection.close()
return role
except Exception as exception:
exception = "Error while getting role: %s: line:%s %s" % (
file_name, sys.exc_traceback.tb_lineno, exception)
print(exception, file=sys.stderr)
def test_getrole(tester):
if not tester:
return None
all_id = utils.get_ids()
server_ids = all_id["sid"]
role_ids_dict = all_id["lrid"][0]
server_group = config_data['server_group']
role_response_data = []
for server_id in server_ids:
role_id = role_ids_dict[int(server_id)]
role_response_data.append(
verify_role(tester, server_group, server_id, role_id))
return role_response_data
def get_role_data(lr_pwd):
"""This function returns the role data"""
data = {
"rolcanlogin": "true",
"rolconnlimit": -1,
"rolcreaterole": "true",
"rolinherit": "true",
"rolmembership": [],
"rolname": "test_role_%s" % str(uuid.uuid4())[1:6],
"rolpassword": lr_pwd,
"rolvaliduntil": "12/27/2017",
"seclabels": [],
"variables": [{"name": "work_mem",
"database": "postgres",
"value": 65}]
}
return data
def create_role(server, role_name):
"""
This function create the role.
:param server:
:param role_name:
:return:
"""
try:
connection = utils.get_db_connection(server['db'],
server['username'],
server['db_password'],
server['host'],
server['port'],
server['sslmode'])
pg_cursor = connection.cursor()
pg_cursor.execute("CREATE ROLE %s LOGIN" % role_name)
connection.commit()
# Get 'oid' from newly created tablespace
pg_cursor.execute(
"SELECT pr.oid from pg_catalog.pg_roles pr WHERE pr.rolname='%s'" %
role_name)
oid = pg_cursor.fetchone()
role_id = ''
if oid:
role_id = oid[0]
connection.close()
return role_id
except Exception as exception:
exception = "Error while deleting role: %s: line:%s %s" % (
file_name, sys.exc_traceback.tb_lineno, exception)
print(exception, file=sys.stderr)
def write_role_id(response_data):
"""
:param response_data:
:return:
"""
lr_id = response_data['node']['_id']
server_id = response_data['node']['_pid']
pickle_id_dict = utils.get_pickle_id_dict()
# TODO: modify logic to write in file / file exists or create new check
# old file
if os.path.isfile(pickle_path):
existing_server_id = open(pickle_path, 'rb')
tol_server_id = pickle.load(existing_server_id)
pickle_id_dict = tol_server_id
if 'lrid' in pickle_id_dict:
if pickle_id_dict['lrid']:
# Add the db_id as value in dict
pickle_id_dict["lrid"][0].update({server_id: lr_id})
else:
# Create new dict with server_id and db_id
pickle_id_dict["lrid"].append({server_id: lr_id})
db_output = open(pickle_path, 'wb')
pickle.dump(pickle_id_dict, db_output)
db_output.close()
def delete_role(connection, role_name):
"""
This function use to delete the existing roles in the servers
:param connection: db connection
:type connection: connection object
:param role_name: role name
:type role_name: str
:return: None
"""
try:
pg_cursor = connection.cursor()
pg_cursor.execute(
"SELECT * FROM pg_catalog.pg_roles WHERE rolname='%s'" % role_name)
role_count = pg_cursor.fetchone()
if role_count:
old_isolation_level = connection.isolation_level
connection.set_isolation_level(0)
pg_cursor = connection.cursor()
pg_cursor.execute("DROP ROLE %s" % role_name)
connection.set_isolation_level(old_isolation_level)
connection.commit()
connection.close()
except Exception as exception:
exception = "Error while deleting role: %s: line:%s %s" % (
file_name, sys.exc_traceback.tb_lineno, exception)
print(exception, file=sys.stderr)