Test suite improvements:

- Test framework support API testing with multiple server for this we need to modify test_config.json(for user it’s test_config.json.in) and test_advanced_config.json(for user it’s test_advanced_config.json.in). Server details of PG and  PPAS are included in both .in files.

- Removed the logic of logging in  the test client on each test scenario(As per Khushboo's comment in previous email).  We need this logic in test cases under ‘browser/tests/’ as for test scenarios like change password and  invalid login test cases as test client should be logged out first. So, as per this the code is slightly modified in ‘browser/tests/’.
This commit is contained in:
Navnath Gadakh
2016-07-27 15:33:36 +01:00
committed by Dave Page
parent b6e8d195dc
commit 5c3c543d2e
21 changed files with 304 additions and 230 deletions

View File

@@ -32,6 +32,7 @@ import config
# Get the config database schema version. We store this in pgadmin.model
# as it turns out that putting it in the config files isn't a great idea
from pgadmin.model import SCHEMA_VERSION
from test_utils import login_tester_account, logout_tester_account
config.SETTINGS_SCHEMA_VERSION = SCHEMA_VERSION
@@ -43,6 +44,8 @@ config.CONSOLE_LOG_LEVEL = WARNING
app = create_app()
app.config['WTF_CSRF_ENABLED'] = False
test_client = app.test_client()
# Login the test client
login_tester_account(test_client)
def get_suite(arguments, test_app_client):
@@ -65,11 +68,11 @@ def get_suite(arguments, test_app_client):
pgadmin_suite = unittest.TestSuite()
# Load the test modules which are in given package(i.e. in arguments.pkg)
if arguments.pkg is None or arguments.pkg == "all":
if arguments['pkg'] is None or arguments['pkg'] == "all":
TestsGeneratorRegistry.load_generators('pgadmin')
else:
TestsGeneratorRegistry.load_generators('pgadmin.{}.tests'.format(
arguments.pkg))
arguments['pkg']))
# Get the each test module and add into list
for key, klass in TestsGeneratorRegistry.registry.items():
@@ -130,7 +133,8 @@ class StreamToLogger(object):
if __name__ == '__main__':
# Set basic logging configuration for log file
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s:%(levelname)s:%(name)s:%(message)s',
format='%(asctime)s:%(levelname)s:%(name)s:%(message)s'
,
filename=CURRENT_PATH + "/" + "regression.log",
filemode='w'
)
@@ -139,9 +143,12 @@ if __name__ == '__main__':
stderr_logger = logging.getLogger('STDERR')
sys.stderr = StreamToLogger(stderr_logger, logging.ERROR)
args = add_arguments()
args = vars(add_arguments())
suite = get_suite(args, test_client)
tests = unittest.TextTestRunner(stream=sys.stderr, descriptions=True,
verbosity=2).run(suite)
# Logout the test client
logout_tester_account(test_client)
print("Please check output in file: %s/regression.log " % CURRENT_PATH)

View File

@@ -93,7 +93,101 @@
"test_privileges": [],
"test_securities": [],
"test_variables": []
}
},
{
"test_privileges_acl": [
{
"grantee": "enterprisedb",
"grantor": "enterprisedb",
"privileges": [
{
"privilege_type": "C",
"privilege": true,
"with_grant": true
},
{
"privilege_type": "T",
"privilege": true,
"with_grant": false
}
]
}
],
"test_conn_limit": -1,
"test_owner": "enterprisedb",
"test_fun_acl": [
{
"grantee": "enterprisedb",
"grantor": "enterprisedb",
"privileges": [
{
"privilege_type": "X",
"privilege": true,
"with_grant": false
}
]
}
],
"test_seq_acl": [
{
"grantee": "enterprisedb",
"grantor": "enterprisedb",
"privileges": [
{
"privilege_type": "r",
"privilege": true,
"with_grant": false
},
{
"privilege_type": "w",
"privilege": true,
"with_grant": false
},
{
"privilege_type": "U",
"privilege": true,
"with_grant": false
}
]
}
],
"test_tbl_acl": [
{
"grantee": "enterprisedb",
"grantor": "enterprisedb",
"privileges": [
{
"privilege_type": "a",
"privilege": true,
"with_grant": true
},
{
"privilege_type": "r",
"privilege": true,
"with_grant": false
}
]
}
],
"test_type_acl": [
{
"grantee": "enterprisedb",
"grantor": "enterprisedb",
"privileges": [
{
"privilege_type": "U",
"privilege": true,
"with_grant": false
}
]
}
],
"test_encoding": "UTF8",
"test_name": "test_db_automation",
"test_privileges": [],
"test_securities": [],
"test_variables": []
}
],
"test_db_update_data": [

View File

@@ -15,7 +15,17 @@
"test_db_port": 5432,
"test_maintenance_db": "postgres",
"test_sslmode": "prefer"
}
},
{
"test_name": "Postgres Plus Advanced Server 9.4",
"test_comment": "Postgres Plus Advanced 9.4 Server (EDB Installer)",
"test_db_username": "enterprisedb",
"test_host": "localhost",
"test_db_password": "edb",
"test_db_port": 5444,
"test_maintenance_db": "edb",
"test_sslmode": "prefer"
}
],
"test_server_update_data": [
{

View File

@@ -12,11 +12,13 @@ import pickle
import json
import uuid
from test_setup import config_data, advanced_config_data, pickle_path
from test_setup import config_data, advanced_config_data, \
pickle_path
SERVER_URL = '/browser/server/obj/'
SERVER_CONNECT_URL = 'browser/server/connect/'
DATABASE_CONNECT_URL = '/browser/database/obj/'
DATABASE_URL = '/browser/database/obj/'
DATABASE_CONNECT_URL = 'browser/database/connect/'
def get_ids(url=pickle_path):
@@ -36,6 +38,39 @@ def get_ids(url=pickle_path):
return ids
def verify_database(tester, server_group, server_id, db_id):
"""
This function verifies that database is exists and whether it connect
successfully or not
:param tester: test client
:type tester: flask test client object
:param server_group: server group id
:type server_group: int
:param server_id: server id
:type server_id: str
:param db_id: database id
:type db_id: str
:return: temp_db_con
:rtype: list
"""
# Connect to server
response = tester.post('{0}{1}/{2}'.format(
SERVER_CONNECT_URL, server_group, server_id), data=dict(
password=config_data['test_server_credentials'][0][
'test_db_password']),
follow_redirects=True)
# Connect to database
con_response = tester.post('{0}{1}/{2}/{3}'.format(
DATABASE_CONNECT_URL, server_group, server_id, db_id),
follow_redirects=True)
temp_db_con = json.loads(con_response.data.decode('utf-8'))
return temp_db_con
def test_getnodes(tester=None):
# Connect to server and database.
@@ -45,24 +80,13 @@ def test_getnodes(tester=None):
all_id = get_ids()
server_ids = all_id["sid"]
db_id = all_id["did"][0]
db_ids_dict = all_id["did"][0]
srv_grp = config_data['test_server_group']
# TODO: need to add code to handle multiple databases with servers
db_con = []
for server_id in server_ids:
# Connect to server
response = tester.post('browser/server/connect/{0}/{1}'.format(
srv_grp, server_id), data=dict(
password=config_data['test_server_credentials'][0][
'test_db_password']),
follow_redirects=True)
# Connect to database
con_response = tester.post(
'browser/database/connect/{0}/{1}/{2}'.format(
srv_grp, server_id, db_id), follow_redirects=True)
db_con = json.loads(con_response.data.decode('utf-8'))
db_id = db_ids_dict[int(server_id)]
db_con.append(verify_database(tester, srv_grp, server_id, db_id))
return db_con
@@ -76,31 +100,30 @@ def get_db_data(server_connect_data):
:rtype: dict
"""
adv_config_data = None
data = None
db_user = server_connect_data['data']['user']['name']
if db_user == "postgres":
# Get the advance test data of 'postgres' user
adv_config_data = advanced_config_data[
'test_add_database_data'][0]
else:
# Get the advance test data of 'enterprisedb' user
adv_config_data = advanced_config_data[
'test_add_database_data'][1]
# Get the config data of appropriate db user
for config_test_data in advanced_config_data['test_add_database_data']:
if db_user == config_test_data['test_owner']:
adv_config_data = config_test_data
data = {
"datacl": adv_config_data['test_privileges_acl'],
"datconnlimit": adv_config_data['test_conn_limit'],
"datowner": adv_config_data['test_owner'],
"deffuncacl": adv_config_data['test_fun_acl'],
"defseqacl": adv_config_data['test_seq_acl'],
"deftblacl": adv_config_data['test_tbl_acl'],
"deftypeacl": adv_config_data['test_type_acl'],
"encoding": adv_config_data['test_encoding'],
"name": str(uuid.uuid4())[1:8],
"privileges": adv_config_data['test_privileges'],
"securities": adv_config_data['test_securities'],
"variables": adv_config_data['test_variables']
}
if adv_config_data is not None:
data = {
"datacl": adv_config_data['test_privileges_acl'],
"datconnlimit": adv_config_data['test_conn_limit'],
"datowner": adv_config_data['test_owner'],
"deffuncacl": adv_config_data['test_fun_acl'],
"defseqacl": adv_config_data['test_seq_acl'],
"deftblacl": adv_config_data['test_tbl_acl'],
"deftypeacl": adv_config_data['test_type_acl'],
"encoding": adv_config_data['test_encoding'],
"name": str(uuid.uuid4())[1:8],
"privileges": adv_config_data['test_privileges'],
"securities": adv_config_data['test_securities'],
"variables": adv_config_data['test_variables']
}
return data
@@ -199,13 +222,18 @@ def write_db_parent_id(response_data):
"""
db_id = response_data['node']['_id']
server_id = response_data['node']['_pid']
if os.path.isfile(pickle_path):
existing_server_id = open(pickle_path, 'rb')
tol_server_id = pickle.load(existing_server_id)
pickle_id_dict = tol_server_id
pickle_id_dict["did"].append(db_id)
if 'did' in pickle_id_dict:
if pickle_id_dict['did']:
# Add the db_id as value in dict
pickle_id_dict["did"][0].update({server_id: db_id})
else:
# Create new dict with server_id and db_id
pickle_id_dict["did"].append({server_id: db_id})
db_output = open(pickle_path, 'wb')
pickle.dump(pickle_id_dict, db_output)
db_output.close()
@@ -288,7 +316,8 @@ def connect_server(tester):
['test_db_password']),
follow_redirects=True)
server_connect_detail = json.loads(response.data.decode())
connect_database(tester, server_connect_detail, server_id, server_group)
connect_database(tester, server_connect_detail, server_id,
server_group)
server_connect.append(server_connect_detail)
servers.append(server_id)
return server_connect, server_group, servers
@@ -313,7 +342,7 @@ def connect_database(tester, server_connect, server_id, server_group):
if server_connect['data']['connected']:
db_data = get_db_data(server_connect)
db_response = tester.post(
DATABASE_CONNECT_URL + str(server_group) + "/" + server_id + "/",
DATABASE_URL + str(server_group) + "/" + server_id + "/",
data=json.dumps(db_data),
content_type='html/json')
response_data = json.loads(db_response.data.decode())
@@ -341,31 +370,28 @@ def delete_server(tester):
assert response_data['success'] == 1
def delete_database(tester, db_id):
def delete_database(tester):
"""
This function used to delete the added databases
:param tester: test client object
:param db_id: database id to be delete
:type db_id: int
:return: None
"""
srv_grp = config_data['test_server_group']
all_id = get_ids()
server_ids = all_id["sid"]
# TODO: Need to modify the code , to delete the databases for all
# TODO: servers. Currently it delete only one database.
#db_id = all_id["did"][0]
db_ids_dict = all_id['did'][0]
db_con = test_getnodes(tester)
if len(db_con) == 0:
raise Exception("No database(s) to delete.")
for server_id in server_ids:
response = tester.delete(DATABASE_CONNECT_URL + str(srv_grp) + '/' + str(
server_id) + '/' + str(db_id), follow_redirects=True)
db_id = db_ids_dict[int(server_id)]
response = tester.delete(DATABASE_URL + str(srv_grp) + '/' +
str(server_id) + '/' + str(db_id),
follow_redirects=True)
assert response.status_code == 200
response_data = json.loads(response.data.decode('utf-8'))
assert response_data['success'] == 1