Implement Selenium Grid to run multiple tests across different browsers, operating systems, and machines in parallel. Fixes #5255

This commit is contained in:
Yogesh Mahajan 2020-05-11 12:11:31 +05:30 committed by Akshay Joshi
parent b64896f558
commit 1294c089a8
29 changed files with 1452 additions and 376 deletions

View File

@ -13,7 +13,7 @@ New features
Housekeeping
************
| `Issue #5255 <https://redmine.postgresql.org/issues/5255>`_ - Implement Selenium Grid to run multiple tests across different browsers, operating systems, and machines in parallel.
| `Issue #5334 <https://redmine.postgresql.org/issues/5334>`_ - Improve code coverage and API test cases for the Rules module.
| `Issue #5443 <https://redmine.postgresql.org/issues/5443>`_ - Remove support for Python 2.
| `Issue #5444 <https://redmine.postgresql.org/issues/5444>`_ - Cleanup Python detection in the runtime project file.

View File

@ -0,0 +1,277 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# #########################################################################
# Updates browser images(selenoid-docker) depending on arguments passed while
# running this script.
# e.g. --chrome /usr/bin/google-chrome --firefox /usr/bin/firefox
# Access details about switches using help
# e.g. --help
import argparse
import os
import subprocess
import sys
import traceback
import requests
import json
def read_command_line():
"""Read the command line arguments.
Returns:
ArgumentParser: The parsed arguments object
"""
parser = argparse.ArgumentParser(
description='Get latest browser images(chrome & firefox) for selenoid.'
'e.g. - --chrome /usr/bin/google-chrome --firefox '
'/usr/bin/firefox')
parser.add_argument("--chrome", metavar="CHROME",
help="the Chrome executable path")
parser.add_argument("--firefox", metavar="FIREFOX",
help="the firefox executable path")
args_val = parser.parse_args()
return args_val
def get_browser_version(browser_name, executable_path):
"""
Function returns browser version for specified browser using executable
path passed in arguments.
:param browser_name:
:param executable_path: e.g. /usr/bin/firefox
:return: browser version
"""
# On Linux/Mac we run the browser executable with the --version flag,
# then parse the output.
browser_version_val = None
try:
result = subprocess.Popen([executable_path, '--version'],
stdout=subprocess.PIPE)
except FileNotFoundError:
print('The specified browser executable could not be found.')
sys.exit(1)
version_str = result.stdout.read().decode("utf-8")
if browser_name.lower() == "chrome":
# Check for 'Chrom' not 'Chrome' in case the user is using Chromium.
if "Chrom" not in version_str:
print('The specified Chrome executable output an unexpected '
'version string: {}.'.format(version_str))
sys.exit(1)
# On some linux distro `chrome--version` gives output like
# 'Google Chrome 80.0.3987.132 unknown\n'
# so we need to check and remove the unknown string from the version
if version_str.endswith("unknown\n"):
version_str = version_str.strip("unknown\n").strip()
chrome_version = '.'.join(version_str.split()[-1].split('.')[:-2])
# Make sure browser version has only 1 decimal point
if chrome_version.count('.') != 1:
print('The specified Chrome executable output an unexpected '
'version string: {}.'.format(version_str))
sys.exit(1)
browser_version_val = chrome_version
elif browser_name.lower() == "firefox":
if "Firefox" not in version_str:
print('The specified Firefox executable output an unexpected '
'version string: {}.'.format(version_str))
sys.exit(1)
# Some time firefox --version gives output like
# 'Running without a11y support!
# Mozilla Firefox 68.7.0esr'
# Other output - [root@localhost local]# /usr/bin/firefox --version
# Mozilla Firefox 75.0
if 'esr' in version_str:
firefox_version = '.'.join(
version_str.split()[-1].split('.')[:-2]) + '.0'
else:
firefox_version = '.'.join(
version_str.split()[-1].split('.')[:-1]) + '.0'
# Make sure browser version has only 1 decimal point
if firefox_version.count('.') != 1:
print('The specified Chrome executable output an unexpected '
'version string: {}.'.format(version_str))
sys.exit(1)
browser_version_val = firefox_version
else:
print("{0} is not recognised ".format(browser_name))
sys.exit(1)
return browser_version_val
def check_and_download_vnc_browser_image(browser_name, browser_version):
"""
Function checks presence for vnc images for passed browser
at docker.io/selenoid/ registry
:param browser_name:
:param browser_version:
:return:true if browser image is available & downloaded else false
"""
res = requests.get(
'https://registry.hub.docker.com/v2/repositories/selenoid/vnc_' +
browser_name + '/tags/')
res = res.json()
version_tag = []
if len(res['results']) > 0:
for result in res['results']:
if 'name' in result:
version_tag.append(result['name'])
vnc_image_available = False
image_name = 'vnc_' + browser_name + ':' + browser_version
for idx, tag in enumerate(version_tag):
if browser_version == tag:
command = 'docker pull selenoid/vnc_' + browser_name + ':' \
+ browser_version
print(' VNC image is available & downloading now... {0}'.format(
command))
try:
subprocess.call([command], shell=True, stdout=subprocess.PIPE)
vnc_image_available = True
except Exception:
traceback.print_exc(file=sys.stderr)
print(
'{0}} Image found but could not download.'.format(command))
sys.exit(1)
break
elif idx == len(version_tag):
print("{0} Image is not available.".format(image_name))
vnc_image_available = False
else:
pass
return vnc_image_available
def reload_selenoid_config():
"""
Function runs command to refresh selenoid configuration
:return: true if command execution for selenoid reload is successful
else false
"""
command = 'docker kill -s HUP selenoid'
reload_successful = False
try:
subprocess.call([command], shell=True, stdout=subprocess.PIPE)
print(" Selenoid Configuration is reloaded.")
reload_successful = True
except Exception:
traceback.print_exc(file=sys.stderr)
print('Error while reloading selenoid configuration.')
sys.exit(1)
return reload_successful
def edit_browsers_json(browser_name, browser_version):
"""
Function edits browsers.json which is used by selenoid to
load browser configuration.
Default path for this file is
"user_home_dir + '/.aerokube/selenoid/browsers.json'"
Currently this is hardcoded, might need to modify
if we want to pass customize browsers.json
:param browser_name:
:param browser_version:
:return:
"""
file_edited = True
# Read existing browsers.json
json_file = open(file_path, 'r')
existing_data = json.load(json_file)
updated_data = None
# Update data for new browser images
if browser_name.lower() == 'chrome':
version_data = existing_data['chrome']['versions']
if browser_version in version_data.keys():
print(" {0}:{1} is already updated in browsers.json.".format(
browser_name, browser_version))
file_edited = True
else:
data_to_insert = dict(
{browser_version: {
'image': 'selenoid/vnc_chrome:' + browser_version,
'port': '4444', 'path': '/'}})
(existing_data['chrome']['versions']).update(data_to_insert)
updated_data = existing_data
print(updated_data)
elif browser_name.lower() == 'firefox':
version_data = existing_data['firefox']['versions']
if browser_version in version_data.keys():
print(" {0}:{1} is already updated in browsers.json.".format(
browser_name, browser_version))
file_edited = True
else:
data_to_insert = dict(
{browser_version: {
'image': 'selenoid/vnc_firefox:' + browser_version,
'port': '4444', 'path': '/'}})
(existing_data['firefox']['versions']).update(data_to_insert)
updated_data = existing_data
else:
print("Browser version not matched")
file_edited = False
# Write updated data in browsers.json
if updated_data is not None:
json_file = open(file_path, 'w')
json.dump(updated_data, json_file)
print(" 'browsers.json' is updated for {0} {1}".format(
browser_name, browser_version))
file_edited = True
return file_edited
# Main Program starts here
# Read command line arguments & get list of browser_name, executable path.
args = vars(read_command_line())
# Get path path for browsers.json
user_home_dir = os.getenv("HOME")
file_path = user_home_dir + '/.aerokube/selenoid/browsers.json'
print("***** Updating '{0}' for new browser versions.*****".format(file_path))
# Iterate over arguments passed
for browser, executable_path in args.items():
if executable_path is not None:
# Get browser name
browser_name = browser
# Get browser version
browser_version = get_browser_version(browser, executable_path)
print(
" Browser version for {0} is {1} in current executable path ".
format(browser_name, browser_version))
# Download vnc browser image.
download_new_image = check_and_download_vnc_browser_image(
browser_name, browser_version)
# If browser vnc image is available, then edit browsers.json
if download_new_image:
if edit_browsers_json(browser_name, browser_version):
print(
" File 'browsers.json' is updated for {0} - {1} \n".format(
browser_name, browser_version))
else:
print(
" File 'browsers.json' can NOT be updated for {0} - {1} \n"
.format(browser_name, browser_version))
else:
print(" Browser image is not available for {0}, {1}".format(
browser_name, browser_version))
# Reload selenoid configuration
if reload_selenoid_config():
print(
"***** Updated '{0}' for new browser versions.*****".format(file_path))

View File

@ -562,6 +562,10 @@ try:
except ImportError:
pass
# Override DEFAULT_SERVE value from environment variable.
if 'PGADMIN_CONFIG_DEFAULT_SERVER' in os.environ:
DEFAULT_SERVER = os.environ['PGADMIN_CONFIG_DEFAULT_SERVER']
# Disable USER_INACTIVITY_TIMEOUT when SERVER_MODE=False
if not SERVER_MODE:
USER_INACTIVITY_TIMEOUT = 0

View File

@ -44,31 +44,30 @@ class IndexConstraintAddTestCase(BaseTestGenerator):
dict(url='/browser/unique_constraint/obj/', data=unique_key_data))
]
@classmethod
def setUpClass(cls):
cls.db_name = parent_node_dict["database"][-1]["db_name"]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
cls.server_id = schema_info["server_id"]
cls.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(cls, utils.SERVER_GROUP,
cls.server_id, cls.db_id)
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a "
"index constraint(primary key or unique key).")
cls.schema_id = schema_info["schema_id"]
cls.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(cls.server,
cls.db_name,
cls.schema_name)
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a index "
"constraint(primary key or unique key).")
cls.table_name = "table_indexconstraint_%s" % \
(str(uuid.uuid4())[1:8])
cls.table_id = tables_utils.create_table(cls.server,
cls.db_name,
cls.schema_name,
cls.table_name)
self.table_name = "table_indexconstraint_%s" % \
(str(uuid.uuid4())[1:8])
self.table_id = tables_utils.create_table(self.server,
self.db_name,
self.schema_name,
self.table_name)
def runTest(self):
"""This function will add index constraint(primary key or unique key)
@ -81,10 +80,9 @@ class IndexConstraintAddTestCase(BaseTestGenerator):
content_type='html/json')
self.assertEquals(response.status_code, 200)
@classmethod
def tearDownClass(cls):
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(cls, cls.server_id, cls.db_id)
database_utils.disconnect_database(self, self.server_id, self.db_id)
class ConstraintsUsingIndexAddTestCase(BaseTestGenerator):
@ -117,30 +115,28 @@ class ConstraintsUsingIndexAddTestCase(BaseTestGenerator):
dict(url='/browser/unique_constraint/obj/', data=unique_key_data))
]
@classmethod
def setUpClass(cls):
cls.db_name = parent_node_dict["database"][-1]["db_name"]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
cls.server_id = schema_info["server_id"]
cls.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(cls, utils.SERVER_GROUP,
cls.server_id, cls.db_id)
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a "
"constraint using index.")
cls.schema_id = schema_info["schema_id"]
cls.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(cls.server,
cls.db_name,
cls.schema_name)
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a index "
"constraint(primary key or unique key).")
cls.table_name = "table_constraint_%s" % (str(uuid.uuid4())[1:8])
cls.table_id = tables_utils.create_table(cls.server,
cls.db_name,
cls.schema_name,
cls.table_name)
self.table_name = "table_constraint_%s" % (str(uuid.uuid4())[1:8])
self.table_id = tables_utils.create_table(self.server, self.db_name,
self.schema_name,
self.table_name)
def runTest(self):
"""This function will add index constraint(primary key or unique key)
@ -158,7 +154,6 @@ class ConstraintsUsingIndexAddTestCase(BaseTestGenerator):
content_type='html/json')
self.assertEquals(response.status_code, 200)
@classmethod
def tearDownClass(cls):
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(cls, cls.server_id, cls.db_id)
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -38,31 +38,30 @@ class IndexConstraintDeleteTestCase(BaseTestGenerator):
type="UNIQUE"))
]
@classmethod
def setUpClass(cls):
cls.db_name = parent_node_dict["database"][-1]["db_name"]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
cls.server_id = schema_info["server_id"]
cls.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(cls, utils.SERVER_GROUP,
cls.server_id, cls.db_id)
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a "
"index constraint(primary key or unique key).")
cls.schema_id = schema_info["schema_id"]
cls.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(cls.server,
cls.db_name,
cls.schema_name)
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a index "
"constraint(primary key or unique key).")
cls.table_name = "table_indexconstraint_%s" % \
(str(uuid.uuid4())[1:8])
cls.table_id = tables_utils.create_table(cls.server,
cls.db_name,
cls.schema_name,
cls.table_name)
self.table_name = "table_indexconstraint_%s" % \
(str(uuid.uuid4())[1:8])
self.table_id = tables_utils.create_table(self.server,
self.db_name,
self.schema_name,
self.table_name)
def runTest(self):
"""This function will delete index constraint(primary key or
@ -81,7 +80,6 @@ class IndexConstraintDeleteTestCase(BaseTestGenerator):
)
self.assertEquals(response.status_code, 200)
@classmethod
def tearDownClass(cls):
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(cls, cls.server_id, cls.db_id)
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -38,31 +38,30 @@ class IndexConstraintGetTestCase(BaseTestGenerator):
type="UNIQUE"))
]
@classmethod
def setUpClass(cls):
cls.db_name = parent_node_dict["database"][-1]["db_name"]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
cls.server_id = schema_info["server_id"]
cls.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(cls, utils.SERVER_GROUP,
cls.server_id, cls.db_id)
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a "
"index constraint(primary key or unique key).")
cls.schema_id = schema_info["schema_id"]
cls.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(cls.server,
cls.db_name,
cls.schema_name)
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a index "
"constraint(primary key or unique key).")
cls.table_name = "table_indexconstraint_%s" % \
(str(uuid.uuid4())[1:8])
cls.table_id = tables_utils.create_table(cls.server,
cls.db_name,
cls.schema_name,
cls.table_name)
self.table_name = "table_indexconstraint_%s" % \
(str(uuid.uuid4())[1:8])
self.table_id = tables_utils.create_table(self.server,
self.db_name,
self.schema_name,
self.table_name)
def runTest(self):
"""This function will fetch the index constraint(primary key or
@ -81,7 +80,6 @@ class IndexConstraintGetTestCase(BaseTestGenerator):
)
self.assertEquals(response.status_code, 200)
@classmethod
def tearDownClass(cls):
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(cls, cls.server_id, cls.db_id)
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -40,31 +40,30 @@ class IndexConstraintUpdateTestCase(BaseTestGenerator):
type="UNIQUE", data=data))
]
@classmethod
def setUpClass(cls):
cls.db_name = parent_node_dict["database"][-1]["db_name"]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
cls.server_id = schema_info["server_id"]
cls.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(cls, utils.SERVER_GROUP,
cls.server_id, cls.db_id)
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a "
"index constraint(primary key or unique key).")
cls.schema_id = schema_info["schema_id"]
cls.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(cls.server,
cls.db_name,
cls.schema_name)
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a index "
"constraint(primary key or unique key).")
cls.table_name = "table_indexconstraint_%s" % \
(str(uuid.uuid4())[1:8])
cls.table_id = tables_utils.create_table(cls.server,
cls.db_name,
cls.schema_name,
cls.table_name)
self.table_name = "table_indexconstraint_%s" % \
(str(uuid.uuid4())[1:8])
self.table_id = tables_utils.create_table(self.server,
self.db_name,
self.schema_name,
self.table_name)
def runTest(self):
"""This function will update index constraint(primary key or
@ -84,7 +83,6 @@ class IndexConstraintUpdateTestCase(BaseTestGenerator):
follow_redirects=True)
self.assertEquals(response.status_code, 200)
@classmethod
def tearDownClass(cls):
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(cls, cls.server_id, cls.db_id)
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -125,8 +125,9 @@ class TableUpdateParameterTestCase(BaseTestGenerator):
),
]
@classmethod
def setUpClass(self):
table_name = "test_table_parameters_%s" % (str(uuid.uuid4())[1:8])
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
@ -142,12 +143,14 @@ class TableUpdateParameterTestCase(BaseTestGenerator):
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a table.")
self.table_name = "test_table_parameters_%s" % (str(uuid.uuid4())[1:8])
self.table_id = tables_utils.create_table(
self.server, self.db_name,
self.schema_name,
self.table_name)
self.table_id = tables_utils.get_table_id(self.server, self.db_name,
self.table_name)
if self.table_id is None:
self.table_id = tables_utils.create_table(
self.server, self.db_name,
self.schema_name,
self.table_name)
def runTest(self):
"""This function will fetch added table under schema node."""
@ -167,7 +170,6 @@ class TableUpdateParameterTestCase(BaseTestGenerator):
follow_redirects=True)
self.assertEquals(response.status_code, 200)
@classmethod
def tearDownClass(self):
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -483,3 +483,26 @@ def get_hash_partitions_data(data):
}]
data['partition_keys'] = \
[{'key_type': 'column', 'pt_column': 'empno'}]
def get_table_id(server, db_name, table_name):
try:
connection = utils.get_db_connection(db_name,
server['username'],
server['db_password'],
server['host'],
server['port'],
server['sslmode'])
pg_cursor = connection.cursor()
pg_cursor.execute("select oid from pg_class where relname='%s'" %
table_name)
table = pg_cursor.fetchone()
if table:
table_id = table[0]
else:
table_id = None
connection.close()
return table_id
except Exception:
traceback.print_exc(file=sys.stderr)
raise

View File

@ -125,8 +125,9 @@ class MViewsUpdateParameterTestCase(BaseTestGenerator):
),
]
@classmethod
def setUpClass(self):
m_view_name = "test_mview_put_%s" % (str(uuid.uuid4())[1:8])
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
@ -150,17 +151,19 @@ class MViewsUpdateParameterTestCase(BaseTestGenerator):
if not schema_response:
raise Exception("Could not find the schema to update a mview.")
self.m_view_name = "test_mview_put_%s" % (str(uuid.uuid4())[1:8])
m_view_sql = "CREATE MATERIALIZED VIEW %s.%s TABLESPACE pg_default " \
"AS SELECT 'test_pgadmin' WITH NO DATA;ALTER TABLE " \
"%s.%s OWNER TO %s"
self.m_view_id = views_utils.create_view(self.server,
self.db_name,
self.schema_name,
m_view_sql,
self.m_view_id = views_utils.get_view_id(self.server, self.db_name,
self.m_view_name)
if self.m_view_id is None:
m_view_sql = "CREATE MATERIALIZED VIEW %s.%s TABLESPACE " \
"pg_default AS SELECT 'test_pgadmin' WITH NO " \
"DATA;ALTER TABLE %s.%s OWNER TO %s"
self.m_view_id = views_utils.create_view(self.server,
self.db_name,
self.schema_name,
m_view_sql,
self.m_view_name)
def runTest(self):
"""This function will update the view/mview under schema node."""
mview_response = views_utils.verify_view(self.server, self.db_name,
@ -180,7 +183,6 @@ class MViewsUpdateParameterTestCase(BaseTestGenerator):
follow_redirects=True)
self.assertEquals(response.status_code, 200)
@classmethod
def tearDownClass(self):
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -48,8 +48,7 @@ class MViewsUpdateParameterTestCase(BaseTestGenerator):
),
]
@classmethod
def setUpClass(self):
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
@ -143,7 +142,6 @@ class MViewsUpdateParameterTestCase(BaseTestGenerator):
# On success we get job_id from server
self.assertTrue('job_id' in response.json['data'])
@classmethod
def tearDownClass(self):
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -86,3 +86,28 @@ def verify_view(server, db_name, view_name):
except Exception:
traceback.print_exc(file=sys.stderr)
raise
def get_view_id(server, db_name, view_name):
try:
connection = utils.get_db_connection(db_name,
server['username'],
server['db_password'],
server['host'],
server['port'],
server['sslmode'])
old_isolation_level = connection.isolation_level
connection.set_isolation_level(0)
pg_cursor = connection.cursor()
# Get 'oid' from newly created view
pg_cursor.execute("select oid from pg_class where relname='%s'" %
view_name)
view = pg_cursor.fetchone()
view_id = None
if view:
view_id = view[0]
connection.close()
return view_id
except Exception:
traceback.print_exc(file=sys.stderr)
raise

View File

@ -8,7 +8,6 @@
##########################################################################
from __future__ import print_function
import pyperclip
import random
from selenium.webdriver import ActionChains
@ -60,8 +59,18 @@ class CopySelectedQueryResultsFeatureTest(BaseFeatureTest):
self._mouseup_outside_grid_still_makes_a_selection()
self._copies_rows_with_header()
def paste_values_to_scratch_pad(self):
self.page.driver.switch_to.default_content()
self.page.driver.switch_to_frame(
self.page.driver.find_element_by_tag_name("iframe"))
scratch_pad_ele = self.page.find_by_css_selector(
QueryToolLocators.scratch_pad_css)
self.page.paste_values(scratch_pad_ele)
clipboard_text = scratch_pad_ele.get_attribute("value")
scratch_pad_ele.clear()
return clipboard_text
def _copies_rows(self):
pyperclip.copy("old clipboard contents")
first_row = self.page.find_by_xpath(
QueryToolLocators.output_row_xpath.format(1))
first_row.click()
@ -70,14 +79,14 @@ class CopySelectedQueryResultsFeatureTest(BaseFeatureTest):
QueryToolLocators.copy_button_css)
copy_button.click()
clipboard_text = self.paste_values_to_scratch_pad()
self.assertEqual('"Some-Name"\t6\t"some info"',
pyperclip.paste())
clipboard_text)
def _copies_rows_with_header(self):
self.page.find_by_css_selector('#btn-copy-row-dropdown').click()
self.page.find_by_css_selector('a#btn-copy-with-header').click()
pyperclip.copy("old clipboard contents")
select_all = self.page.find_by_xpath(
QueryToolLocators.select_all_column)
select_all.click()
@ -86,13 +95,14 @@ class CopySelectedQueryResultsFeatureTest(BaseFeatureTest):
QueryToolLocators.copy_button_css)
copy_button.click()
clipboard_text = self.paste_values_to_scratch_pad()
self.assertEqual("""\"some_column"\t"value"\t"details"
\"Some-Name"\t6\t"some info"
\"Some-Other-Name"\t22\t"some other info"
\"Yet-Another-Name"\t14\t"cool info\"""", pyperclip.paste())
\"Yet-Another-Name"\t14\t"cool info\"""", clipboard_text)
def _copies_columns(self):
pyperclip.copy("old clipboard contents")
column = self.page.find_by_css_selector(
QueryToolLocators.output_column_header_css.format('some_column'))
column.click()
@ -101,14 +111,15 @@ class CopySelectedQueryResultsFeatureTest(BaseFeatureTest):
QueryToolLocators.copy_button_css)
copy_button.click()
clipboard_text = self.paste_values_to_scratch_pad()
self.assertEqual(
"""\"Some-Name"
"Some-Other-Name"
"Yet-Another-Name\"""",
pyperclip.paste())
clipboard_text)
def _copies_row_using_keyboard_shortcut(self):
pyperclip.copy("old clipboard contents")
first_row = self.page.find_by_xpath(
QueryToolLocators.output_row_xpath.format(1))
first_row.click()
@ -116,11 +127,12 @@ class CopySelectedQueryResultsFeatureTest(BaseFeatureTest):
ActionChains(self.page.driver).key_down(
Keys.CONTROL).send_keys('c').key_up(Keys.CONTROL).perform()
clipboard_text = self.paste_values_to_scratch_pad()
self.assertEqual('"Some-Name"\t6\t"some info"',
pyperclip.paste())
clipboard_text)
def _copies_column_using_keyboard_shortcut(self):
pyperclip.copy("old clipboard contents")
column = self.page.find_by_css_selector(
QueryToolLocators.output_column_header_css.format('some_column'))
column.click()
@ -128,15 +140,15 @@ class CopySelectedQueryResultsFeatureTest(BaseFeatureTest):
ActionChains(self.page.driver).key_down(
Keys.CONTROL).send_keys('c').key_up(Keys.CONTROL).perform()
clipboard_text = self.paste_values_to_scratch_pad()
self.assertEqual(
"""\"Some-Name"
"Some-Other-Name"
"Yet-Another-Name\"""",
pyperclip.paste())
clipboard_text)
def _copies_rectangular_selection(self):
pyperclip.copy("old clipboard contents")
top_left_cell = \
self.page.find_by_xpath(
QueryToolLocators.output_column_data_xpath.
@ -154,12 +166,12 @@ class CopySelectedQueryResultsFeatureTest(BaseFeatureTest):
self.page.driver
).key_down(Keys.CONTROL).send_keys('c').key_up(Keys.CONTROL).perform()
clipboard_text = self.paste_values_to_scratch_pad()
self.assertEqual(
'"Some-Other-Name"\t22\n"Yet-Another-Name"\t14', pyperclip.paste())
'"Some-Other-Name"\t22\n"Yet-Another-Name"\t14', clipboard_text)
def _shift_resizes_rectangular_selection(self):
pyperclip.copy("old clipboard contents")
top_left_cell = self.page.find_by_xpath(
QueryToolLocators.output_column_data_xpath.
format('Some-Other-Name')
@ -180,12 +192,12 @@ class CopySelectedQueryResultsFeatureTest(BaseFeatureTest):
Keys.CONTROL
).send_keys('c').key_up(Keys.CONTROL).perform()
clipboard_text = self.paste_values_to_scratch_pad()
self.assertEqual("""\"Some-Other-Name"\t22\t"some other info"
"Yet-Another-Name"\t14\t"cool info\"""", pyperclip.paste())
"Yet-Another-Name"\t14\t"cool info\"""", clipboard_text)
def _shift_resizes_column_selection(self):
pyperclip.copy("old clipboard contents")
column = self.page.find_by_css_selector(
QueryToolLocators.output_column_header_css.format('value')
)
@ -197,13 +209,13 @@ class CopySelectedQueryResultsFeatureTest(BaseFeatureTest):
ActionChains(self.page.driver).key_down(
Keys.CONTROL).send_keys('c').key_up(Keys.CONTROL).perform()
clipboard_text = self.paste_values_to_scratch_pad()
self.assertEqual(
'"Some-Name"\t6\n"Some-Other-Name"\t22\n"Yet-Another-Name"\t14',
pyperclip.paste())
clipboard_text)
def _mouseup_outside_grid_still_makes_a_selection(self):
pyperclip.copy("old clipboard contents")
bottom_right_cell = self.page.find_by_xpath(
QueryToolLocators.output_column_data_xpath.format('cool info')
)
@ -218,7 +230,9 @@ class CopySelectedQueryResultsFeatureTest(BaseFeatureTest):
ActionChains(self.page.driver).key_down(
Keys.CONTROL).send_keys('c').key_up(Keys.CONTROL).perform()
self.assertIn('"cool info"', pyperclip.paste())
clipboard_text = self.paste_values_to_scratch_pad()
self.assertIn('"cool info"', clipboard_text)
def after(self):
self.page.close_query_tool()

View File

@ -38,7 +38,8 @@ class CheckFileManagerFeatureTest(BaseFeatureTest):
self.page.add_server(self.server)
self.wait = WebDriverWait(self.page.driver, 10)
self.XSS_FILE = '/tmp/<img src=x onmouseover=alert("1")>.sql'
self.XSS_FILE = '/tmp/<img src=x ' + self.server['name'][:13] \
+ '=alert("1")>.sql'
# Remove any previous file
if os.path.isfile(self.XSS_FILE):
os.remove(self.XSS_FILE)
@ -67,7 +68,7 @@ class CheckFileManagerFeatureTest(BaseFeatureTest):
self.page.open_query_tool()
def _create_new_file(self):
self.page.find_by_css_selector(QueryToolLocators.btn_save_file)\
self.page.find_by_css_selector(QueryToolLocators.btn_save_file) \
.click()
# Set the XSS value in input
self.page.find_by_css_selector('.change_file_types')
@ -112,8 +113,8 @@ class CheckFileManagerFeatureTest(BaseFeatureTest):
self.page.wait_for_query_tool_loading_indicator_to_disappear()
self._check_escaped_characters(
contents,
'&lt;img src=x onmouseover=alert("1")&gt;.sql',
'File manager'
'&lt;img src=x ' + self.server['name'][:13] +
'=alert("1")&gt;.sql', 'File manager'
)
def _check_escaped_characters(self, source_code, string_to_find, source):

View File

@ -94,24 +94,33 @@ class KeyboardShortcutFeatureTest(BaseFeatureTest):
NavMenuLocators.preference_menu_item_css)
pref_menu_item.click()
# Wait till the preference dialogue box is displayed by checking the
# visibility of Show System Object label
self.wait.until(EC.presence_of_element_located(
(By.XPATH, NavMenuLocators.show_system_objects_pref_label_xpath))
)
maximize_button = self.page.find_by_css_selector(
NavMenuLocators.maximize_pref_dialogue_css)
maximize_button.click()
browser_node = self.page.find_by_xpath(
NavMenuLocators.specified_preference_tree_node.format('Browser'))
if self.page.find_by_xpath(
NavMenuLocators.specified_pref_node_exp_status.
format('Browser')).get_attribute('aria-expanded') == 'false':
ActionChains(self.driver).double_click(browser_node).perform()
display_node = self.page.find_by_xpath(
NavMenuLocators.specified_sub_node_of_pref_tree_node.format(
'Browser', 'Display'))
attempt = 5
while attempt > 0:
display_node.click()
# After clicking the element gets loaded in to the dom but still
# not visible, hence sleeping for a sec.
time.sleep(1)
if self.page.wait_for_element_to_be_visible(
self.driver,
NavMenuLocators.show_system_objects_pref_label_xpath, 3):
break
else:
attempt -= 1
maximize_button = self.page.find_by_css_selector(
NavMenuLocators.maximize_pref_dialogue_css)
maximize_button.click()
keyboard_node = self.page.find_by_xpath(
NavMenuLocators.specified_sub_node_of_pref_tree_node.format(
'Browser', 'Keyboard shortcuts'))

View File

@ -92,6 +92,18 @@ class PGDataypeFeatureTest(BaseFeatureTest):
wait = WebDriverWait(self.page.driver, 10)
browser_node = self.page.find_by_xpath(
NavMenuLocators.specified_preference_tree_node.format('Browser'))
if self.page.find_by_xpath(
NavMenuLocators.specified_pref_node_exp_status.
format('Browser')).get_attribute('aria-expanded') == 'false':
ActionChains(self.driver).double_click(browser_node).perform()
self.page.retry_click(
(By.XPATH, NavMenuLocators.specified_sub_node_of_pref_tree_node.
format('Browser', 'Display')),
(By.XPATH, NavMenuLocators.show_system_objects_pref_label_xpath))
# Wait till the preference dialogue box is displayed by checking the
# visibility of Show System Object label
wait.until(EC.presence_of_element_located(

View File

@ -18,6 +18,7 @@ from regression.python_test_utils import test_utils
from regression.python_test_utils import test_gui_helper
from regression.feature_utils.locators import NavMenuLocators
from regression.feature_utils.tree_area_locators import TreeAreaLocators
from selenium.webdriver import ActionChains
class PGUtilitiesBackupFeatureTest(BaseFeatureTest):
@ -56,6 +57,7 @@ class PGUtilitiesBackupFeatureTest(BaseFeatureTest):
self.server['sslmode']
)
test_utils.drop_database(connection, self.database_name)
self._update_preferences()
db_id = test_utils.create_database(self.server, self.database_name)
if not db_id:
self.assertTrue(False, "Database {} is not "
@ -130,7 +132,7 @@ class PGUtilitiesBackupFeatureTest(BaseFeatureTest):
self._check_detailed_window_for_xss('Backup')
else:
command = self.page.find_by_css_selector(
NavMenuLocators.process_watcher_detailed_command_canvas_css).\
NavMenuLocators.process_watcher_detailed_command_canvas_css). \
text
self.assertIn(self.server['name'], str(command))
@ -199,7 +201,7 @@ class PGUtilitiesBackupFeatureTest(BaseFeatureTest):
self._check_detailed_window_for_xss('Restore')
else:
command = self.page.find_by_css_selector(
NavMenuLocators.process_watcher_detailed_command_canvas_css).\
NavMenuLocators.process_watcher_detailed_command_canvas_css). \
text
self.assertIn(self.server['name'], str(command))
@ -242,3 +244,74 @@ class PGUtilitiesBackupFeatureTest(BaseFeatureTest):
# For XSS we need to search against element's html code
assert source_code.find(string_to_find) != - \
1, "{0} might be vulnerable to XSS ".format(source)
def _update_preferences(self):
"""
Function updates preferences for binary path.
"""
file_menu = self.page.find_by_css_selector(
NavMenuLocators.file_menu_css)
file_menu.click()
pref_menu_item = self.page.find_by_css_selector(
NavMenuLocators.preference_menu_item_css)
pref_menu_item.click()
wait = WebDriverWait(self.page.driver, 10)
# Wait till the preference dialogue box is displayed by checking the
# visibility of Show System Object label
wait.until(EC.presence_of_element_located(
(By.XPATH, NavMenuLocators.show_system_objects_pref_label_xpath))
)
maximize_button = self.page.find_by_css_selector(
NavMenuLocators.maximize_pref_dialogue_css)
maximize_button.click()
path = self.page.find_by_xpath(
NavMenuLocators.specified_preference_tree_node.format('Paths'))
if self.page.find_by_xpath(
NavMenuLocators.specified_pref_node_exp_status.format('Paths')). \
get_attribute('aria-expanded') == 'false':
ActionChains(self.driver).double_click(path).perform()
binary_path = self.page.find_by_xpath(
NavMenuLocators.specified_sub_node_of_pref_tree_node.format(
'Paths', 'Binary paths'))
binary_path.click()
default_binary_path = self.server['default_binary_paths']
if default_binary_path is not None:
server_types = default_binary_path.keys()
for serv in server_types:
if serv == 'pg':
path_input = self.page.find_by_xpath(
"//label[text()='PostgreSQL Binary "
"Path']/following-sibling::div//input")
path_input.clear()
path_input.click()
path_input.send_keys(default_binary_path['pg'])
elif serv == 'gpdb':
path_input = self.page.find_by_xpath(
"//label[text()='Greenplum Database Binary "
"Path']/following-sibling::div//input")
path_input.clear()
path_input.click()
path_input.send_keys(default_binary_path['gpdb'])
elif serv == 'ppas':
path_input = self.page.find_by_xpath(
"//label[text()='EDB Advanced Server Binary "
"Path']/following-sibling::div//input")
path_input.clear()
path_input.click()
path_input.send_keys(default_binary_path['ppas'])
else:
print('Binary path Key is Incorrect')
# save and close the preference dialog.
self.page.click_modal('Save')
self.page.wait_for_element_to_disappear(
lambda driver: driver.find_element_by_css_selector(".ajs-modal")
)

View File

@ -9,7 +9,6 @@
from __future__ import print_function
import sys
import pyperclip
import random
from selenium.webdriver import ActionChains
@ -90,7 +89,6 @@ class QueryToolJourneyTest(BaseFeatureTest):
print(" OK.", file=sys.stderr)
def _test_copies_rows(self):
pyperclip.copy("old clipboard contents")
self.page.driver.switch_to.default_content()
self.page.driver.switch_to_frame(
self.page.driver.find_element_by_tag_name("iframe"))
@ -103,12 +101,21 @@ class QueryToolJourneyTest(BaseFeatureTest):
QueryToolLocators.copy_button_css)
copy_row.click()
self.page.driver.switch_to.default_content()
self.page.driver.switch_to_frame(
self.page.driver.find_element_by_tag_name("iframe"))
scratch_pad_ele = self.page.find_by_css_selector(
QueryToolLocators.scratch_pad_css)
self.page.paste_values(scratch_pad_ele)
clipboard_text = scratch_pad_ele.get_attribute("value")
self.assertEqual('"Some-Name"\t6\t"some info"',
pyperclip.paste())
clipboard_text)
scratch_pad_ele.clear()
def _test_copies_columns(self):
pyperclip.copy("old clipboard contents")
self.page.driver.switch_to.default_content()
self.page.driver.switch_to_frame(
self.page.driver.find_element_by_tag_name("iframe"))
@ -121,9 +128,20 @@ class QueryToolJourneyTest(BaseFeatureTest):
QueryToolLocators.copy_button_css)
copy_btn.click()
self.assertTrue('"Some-Name"' in pyperclip.paste())
self.assertTrue('"Some-Other-Name"' in pyperclip.paste())
self.assertTrue('"Yet-Another-Name"' in pyperclip.paste())
self.page.driver.switch_to.default_content()
self.page.driver.switch_to_frame(
self.page.driver.find_element_by_tag_name("iframe"))
scratch_pad_ele = self.page.find_by_css_selector(
QueryToolLocators.scratch_pad_css)
self.page.paste_values(scratch_pad_ele)
clipboard_text = scratch_pad_ele.get_attribute("value")
self.assertTrue('"Some-Name"' in clipboard_text)
self.assertTrue('"Some-Other-Name"' in clipboard_text)
self.assertTrue('"Yet-Another-Name"' in clipboard_text)
scratch_pad_ele.clear()
def _test_history_tab(self):
self.page.clear_query_tool()
@ -370,10 +388,10 @@ class QueryToolJourneyTest(BaseFeatureTest):
self.page.find_by_css_selector(
QueryToolLocators.btn_clear_dropdown)
)
ActionChains(self.driver)\
ActionChains(self.driver) \
.move_to_element(
self.page.find_by_css_selector(
QueryToolLocators.btn_clear_history)).perform()
self.page.find_by_css_selector(
QueryToolLocators.btn_clear_history)).perform()
self.page.click_element(
self.page.find_by_css_selector(QueryToolLocators.btn_clear_history)
)

View File

@ -131,13 +131,15 @@ CREATE TABLE public.nonintpkey
self.test_db, 'public')
self._load_config_data('table_insert_update_cases')
data_local = config_data
# iterate on both tables
for cnt in (1, 2):
self._perform_test_for_table('defaults_{0}'.format(str(cnt)))
self._perform_test_for_table('defaults_{0}'.format(str(cnt)),
data_local)
# test nonint pkey table
self._load_config_data('table_insert_update_nonint')
self._perform_test_for_table('nonintpkey')
data_local = config_data
self._perform_test_for_table('nonintpkey', data_local)
def after(self):
self.page.remove_server(self.server)
@ -167,7 +169,7 @@ CREATE TABLE public.nonintpkey
global config_data
config_data = config_data_json[config_key]
def _perform_test_for_table(self, table_name):
def _perform_test_for_table(self, table_name, config_data_local):
self.page.click_a_tree_node(
table_name,
TreeAreaLocators.sub_nodes_of_tables_node)
@ -176,20 +178,21 @@ CREATE TABLE public.nonintpkey
self.page.wait_for_query_tool_loading_indicator_to_disappear()
# Run test to insert a new row in table with default values
self._add_row()
self._add_row(config_data_local)
self._verify_row_data(row_height=0,
config_check_data=config_data['add'])
config_check_data=config_data_local['add'])
# Run test to copy/paste a row
self._copy_paste_row()
self._copy_paste_row(config_data_local)
self._update_row()
self._update_row(config_data_local)
self.page.click_tab("Messages")
self._verify_messsages("")
self.page.click_tab("Data Output")
updated_row_data = {
i: config_data['update'][i] if i in config_data['update'] else val
for i, val in config_data['add'].items()
i: config_data_local['update'][i] if i in config_data_local[
'update'] else val
for i, val in config_data_local['add'].items()
}
self._verify_row_data(row_height=0,
config_check_data=updated_row_data)
@ -221,7 +224,6 @@ CREATE TABLE public.nonintpkey
Returns: None
"""
self.wait.until(EC.visibility_of_element_located(
(By.XPATH, xpath)), CheckForViewDataTest.TIMEOUT_STRING
)
@ -238,7 +240,7 @@ CREATE TABLE public.nonintpkey
if value == 'clear':
cell_el.find_element_by_css_selector('input').clear()
else:
ActionChains(self.driver).send_keys(value).\
ActionChains(self.driver).send_keys(value). \
send_keys(Keys.ENTER).perform()
elif cell_type in ['text', 'json', 'text[]', 'boolean[]']:
text_area_ele = self.page.find_by_css_selector(
@ -290,7 +292,7 @@ CREATE TABLE public.nonintpkey
self.page.driver.find_element_by_tag_name('iframe')
)
def _copy_paste_row(self):
def _copy_paste_row(self, config_data_l):
row0_cell0_xpath = CheckForViewDataTest._get_cell_xpath("r0", 1)
self.page.find_by_xpath(row0_cell0_xpath).click()
@ -300,12 +302,12 @@ CREATE TABLE public.nonintpkey
QueryToolLocators.paste_button_css).click()
# Update primary key of copied cell
self._add_update_save_row(config_data['copy'], row=2)
self._add_update_save_row(config_data_l['copy'], row=2)
# Verify row 1 and row 2 data
updated_row_data = {
i: config_data['copy'][i] if i in config_data['copy'] else val
for i, val in config_data['add'].items()
i: config_data_l['copy'][i] if i in config_data_l['copy'] else val
for i, val in config_data_l['add'].items()
}
self._verify_row_data(row_height=25,
config_check_data=updated_row_data)
@ -329,11 +331,11 @@ CREATE TABLE public.nonintpkey
# save ajax is completed.
time.sleep(2)
def _add_row(self):
self._add_update_save_row(config_data['add'], 1)
def _add_row(self, config_data_l):
self._add_update_save_row(config_data_l['add'], 1)
def _update_row(self):
self._add_update_save_row(config_data['update'], 1)
def _update_row(self, config_data_l):
self._add_update_save_row(config_data_l['update'], 1)
def _verify_messsages(self, text):
messages_ele = self.page.find_by_css_selector(

View File

@ -211,17 +211,23 @@ class CheckForXssFeatureTest(BaseFeatureTest):
"Query tool (History Entry)"
)
# Check for history details message
history_ele = self.driver\
.find_element_by_css_selector(".query-detail .content-value")
source_code = history_ele.get_attribute('innerHTML')
retry = 2
while retry > 0:
try:
history_ele = self.driver \
.find_element_by_css_selector(
".query-detail .content-value")
source_code = history_ele.get_attribute('innerHTML')
break
except StaleElementReferenceException:
retry -= 1
self._check_escaped_characters(
source_code,
'&lt;script&gt;alert(1)&lt;/script&gt;',
"Query tool (History Details-Message)"
)
retry = 2
while retry > 0:
try:

View File

@ -120,9 +120,8 @@ class BaseTestGenerator(unittest.TestCase):
self.skipTest('cannot run in: %s' %
server_con['data']['type'])
@classmethod
def setTestServer(cls, server):
cls.server = server
def setTestServer(self, server):
self.server = server
@abstractmethod
def runTest(self):
@ -137,17 +136,14 @@ class BaseTestGenerator(unittest.TestCase):
def setTestClient(cls, test_client):
cls.tester = test_client
@classmethod
def setDriver(cls, driver):
cls.driver = driver
def setDriver(self, driver):
self.driver = driver
@classmethod
def setServerInformation(cls, server_information):
cls.server_information = server_information
def setServerInformation(self, server_information):
self.server_information = server_information
@classmethod
def setTestDatabaseName(cls, database_name):
cls.test_db = database_name
def setTestDatabaseName(self, database_name):
self.test_db = database_name
@classmethod
def setReSQLModuleList(cls, module_list):

View File

@ -141,6 +141,61 @@ Python Tests:
and registered automatically by its module name in
'pgadmin4/web/pgadmin/utils/test.py' file.
- To run Feature Tests in parallel using selenoid(grid + docker), selenoid
need to be installed. Steps to install selenoid -
- Install & Start docker
$yum -y install docker docker-registry
$vi /etc/sysconfig/docker # in OPTIONS add --selinux-enabled=false
$systemctl enable docker.service
$systemctl start docker.service
$systemctl status docker.service
- Install & Start Selenoid
$curl -s https://aerokube.com/cm/bash | bash
$./cm selenoid start --vnc --args "-limit 3 -cpu 1.5 -mem 1.5g"
$./cm selenoid-ui start
Check selenoid status -
http://<IP address of Selenoid Installed machine>:4444/status
- Should show json with browsers details
http://<IP address of Selenoid Installed machine>:8080/#/
- Capabilities shows available browser
Note : In --args "-limit 3 -cpu 1.5 -mem 1.5g"
-limit 3 :limits maximum parallel sessions(dockers) in selenoid,
-cpu :limit memory and CPU usage,
-mem :limit memory per session.
Generally max parallel session is the number of cores * 1.5 2
You can list available flags by using ./cm selenoid args
Additional Information about tool
- https://aerokube.com/selenoid/latest/
- Update 'test_config.json' with selenoid config information
pgAdmin_default_server -
It is the IP address for the machine where pgadmin source code is
present.Value should NOT be '127.0.0.1' even though everything runs
on the same machine.
You can get it on linux running command 'ifconfig | grep inet'
e.g. - 192.168.143.121
max_parallel_sessions -
This is other way to control number of tests to be run in parallel.
This should be equal or less than limit specified while setting up
selenoid
selenoid_url -
Url should be formed as below -
http://<IP address of Selenoid Installed machine>:4444/wd/hub/
e.g. - selenoid_url": "http://192.168.143.121:4444/wd/hub"
If source code & selenoid servers are on same machine then
selenoid url value can be - "http://localhost:4444/wd/hub"
browsers_list -
List of browser name & version enclosed in {} on which tests to be
executed.
Make sure list contains those browsers & versions only which are shown
in capabilities tab while in selenoid status web-page.
If version is mention as null, then latest version available in
selenoid server will be used for execution.
e.g. - [ {"name": "Chrome","version": "80.0"},
{"name": "Firefox","version": "74.0"}]
- Change to the regression test directory:
run 'cd web/regression'
@ -193,6 +248,10 @@ Python Tests:
Example 3) Exclude reverse engineered SQL test framework for all modules
run 'python runtests.py --exclude resql'
- Execute ui selenium tests in parallel using selenoid(selenium grid + docker)
Example : --pkg feature_tests --parallel
Code Coverage:
---------------

View File

@ -61,11 +61,16 @@ class AppStarter:
raise Exception('Unable to start python server even after '
'retrying 60 times.')
launch_browser(0)
if self.driver is not None:
launch_browser(0)
else:
return "http://" + self.app_config.DEFAULT_SERVER + ":" \
+ random_server_port
def stop_app(self):
""" This function stop the started app by killing process """
self.driver.quit()
if self.driver is not None:
self.driver.quit()
# os.killpg supported in Mac and Unix as this function not supported in
# Windows
try:

View File

@ -172,6 +172,8 @@ class QueryToolLocators:
new_row_xpath = "//div[contains(@class, 'new-row')]"
scratch_pad_css = ".sql-scratch > textarea"
copy_button_css = "#btn-copy-row"
paste_button_css = "#btn-paste-row"
@ -217,9 +219,9 @@ class QueryToolLocators:
btn_commit = "#btn-commit"
show_query_internally_btn = \
"//div[label[normalize-space(" \
"text())='Show queries generated internally by pgAdmin?']]" \
"//div[contains(@class,'toggle btn')]"
"//div[label[contains(normalize-space(text())," \
"'Show queries generated internally by')]]//" \
"div[contains(@class,'toggle btn')]"
editable_column_icon_xpath = "//div[contains(@class," \
" 'editable-column-header-icon')]" \

View File

@ -88,11 +88,17 @@ class PgadminPage:
(By.CSS_SELECTOR, "button[type='save'].btn.btn-primary")))
self.find_by_css_selector("button[type='save'].btn.btn-primary").\
click()
WebDriverWait(self.driver, 10).until(
EC.visibility_of_element_located(
(By.XPATH,
"//*[@id='tree']//*[.='" + server_config['name'] + "']")))
try:
WebDriverWait(self.driver, 10).until(
EC.visibility_of_element_located(
(By.XPATH,
"//*[@id='tree']//*[.='" + server_config['name'] + "']")))
except TimeoutException:
self.toggle_open_servers_group()
WebDriverWait(self.driver, 10).until(
EC.visibility_of_element_located(
(By.XPATH,
"//*[@id='tree']//*[.='" + server_config['name'] + "']")))
def open_query_tool(self):
self.driver.find_element_by_link_text("Tools").click()
@ -910,7 +916,11 @@ class PgadminPage:
return element
except (NoSuchElementException, WebDriverException):
return False
time.sleep(1)
self.driver.switch_to.default_content()
self.driver.switch_to_frame(
self.driver.find_element_by_tag_name("iframe"))
self.find_by_xpath("//a[text()='Query Editor']").click()
codemirror_ele = WebDriverWait(
self.driver, timeout=self.timeout, poll_frequency=0.01)\
.until(find_codemirror,
@ -1161,3 +1171,34 @@ class PgadminPage:
except Exception:
attempt += 1
return click_status
def paste_values(self, el=None):
"""
Function paste values in scratch pad
:param el:
"""
actions = ActionChains(self.driver)
if el:
# Must step
el.click()
if self.driver.capabilities["platformName"] == 'mac':
# FF step
el.send_keys(Keys.COMMAND + "v")
# Chrome Step
actions.key_down(Keys.SHIFT)
actions.send_keys(Keys.INSERT)
actions.key_up(Keys.SHIFT)
actions.perform()
else:
el.send_keys(Keys.CONTROL + "v")
def wait_for_element_to_be_visible(self, driver, xpath, time_value=20):
"""This will wait until an element is visible on page"""
element_located_status = False
try:
if WebDriverWait(driver, time_value).until(
EC.visibility_of_element_located((By.XPATH, xpath))):
element_located_status = True
except TimeoutException:
element_located_status = False
return element_located_status

View File

@ -8,6 +8,8 @@
##########################################################################
from __future__ import print_function
import fileinput
import traceback
import os
import sys
@ -16,7 +18,17 @@ import psycopg2
import sqlite3
import shutil
from functools import partial
from selenium.webdriver.support.wait import WebDriverWait
from testtools.testcase import clone_test_with_new_id
import re
import time
from selenium.common.exceptions import WebDriverException
import urllib.request as urllib
import json
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support import expected_conditions as ec
import config
import regression
@ -1216,3 +1228,242 @@ def create_expected_output(parameters, actual_data):
actual_data.remove(value)
break
return expected_output
def is_parallel_ui_tests(args):
"""
This function checks for coverage args exists in command line args
:return: boolean
"""
if "parallel" in args and args["parallel"]:
return True
return False
def get_selenium_grid_status_and_browser_list(selenoid_url):
"""
This function checks selenoid status for given url
:param selrnoid_url:
:return: status of selenoid & list of browsers available with selenoid if
status is up
"""
selenoid_status = False
browser_list = []
try:
selenoid_status = get_selenium_grid_status_json(selenoid_url)
if selenoid_status:
available_browsers = selenoid_status["browsers"]
list_of_browsers = test_setup.config_data['selenoid_config'][
'browsers_list']
for browser in list_of_browsers:
if browser["name"].lower() in available_browsers.keys():
versions = available_browsers[(browser["name"].lower())]
if browser["version"] is None:
print("Specified version of browser is None. Hence "
"latest version of {0} available with selenoid "
"server will be used.\n".format(browser["name"]))
browser_list.append(browser)
elif browser["version"] in versions.keys():
browser_list.append(browser)
else:
print(
"Available {0} versions {1}".format(
browser["name"], versions.keys()))
print("Specified Version = {0}".format(
browser["version"]))
else:
print("{0} is NOT available".format(browser["name"]))
except Exception as e:
(str(e))
print("Unable to find Selenoid Status")
return selenoid_status, browser_list
def is_feature_test_included(arguments):
"""
:param arguments: his is command line arguments for module name to
which test suite will run
:return: boolean value whether to execute feature tests or NOT &
browser name if feature_test_tobe_included = True
"""
exclude_pkgs = []
if arguments['exclude'] is not None:
exclude_pkgs += arguments['exclude'].split(',')
feature_test_tobe_included = 'feature_tests' not in exclude_pkgs and \
(arguments['pkg'] is None or arguments[
'pkg'] == "all" or
arguments['pkg'] == "feature_tests")
return feature_test_tobe_included
def launch_url_in_browser(driver_instance, url, title='pgAdmin 4', timeout=40):
"""
Function launches urls in specified driver instance
:param driver_instance:browser instance
:param url:url to be launched
:param title:web-page tile on successful launch default is 'pgAdmin 4'
:param timeout:in seconds for getting specified title default is 20sec
:return:
"""
count = timeout / 5
while count > 0:
try:
driver_instance.get(url)
wait = WebDriverWait(driver_instance, 10)
wait.until(ec.title_is(title))
break
except WebDriverException as e:
time.sleep(6)
count -= 1
if count == 0:
exception_msg = 'Web-page title did not match to {0}. ' \
'Please check url {1} accessible on ' \
'internet.'.format(title, url)
raise Exception(exception_msg)
def get_remote_webdriver(hub_url, browser, browser_ver, test_name):
"""
This functions returns remote web-driver instance created in selenoid
machine.
:param hub_url
:param browser: browser name
:param browser_ver: version for browser
:param test_name: test name
:return: remote web-driver instance for specified browser
"""
test_name = browser + browser_ver + "_" + test_name + "-" + time.strftime(
"%m_%d_%y_%H_%M_%S", time.localtime())
driver_local = None
desired_capabilities = {
"version": browser_ver,
"enableVNC": True,
"enableVideo": True,
"enableLog": True,
"videoName": test_name + ".mp4",
"logName": test_name + ".log",
"name": test_name,
"timeZone": "Asia/Kolkata"
}
if browser == 'firefox':
profile = webdriver.FirefoxProfile()
profile.set_preference("dom.disable_beforeunload", True)
desired_capabilities["browserName"] = "firefox"
desired_capabilities["requireWindowFocus"] = True
desired_capabilities["enablePersistentHover"] = False
driver_local = webdriver.Remote(
command_executor=hub_url,
desired_capabilities=desired_capabilities, browser_profile=profile)
elif browser == 'chrome':
options = Options()
options.add_argument("--window-size=1280,1024")
desired_capabilities["browserName"] = "chrome"
driver_local = webdriver.Remote(
command_executor=hub_url,
desired_capabilities=desired_capabilities, options=options)
else:
print("Specified browser does not exist.")
# maximize browser window
driver_local.maximize_window()
# driver_local.implicitly_wait(2)
return driver_local
def get_parallel_sequential_module_list(module_list):
"""
Functions segregate parallel & sequential modules
:param module_list: Complete list of modules
:return: parallel & sequential module lists
"""
# list of files consisting tests that needs to be
# executed sequentially
sequential_tests_file = [
'pgadmin.feature_tests.pg_utilities_backup_restore_test',
'pgadmin.feature_tests.pg_utilities_maintenance_test',
'pgadmin.feature_tests.keyboard_shortcut_test']
# list of tests can be executed in parallel
parallel_tests = list(module_list)
for module in module_list:
if str(module[0]) in sequential_tests_file:
parallel_tests.remove(module)
# list of tests can be executed in sequentially
sequential_tests = list(
filter(lambda i: i not in parallel_tests,
module_list))
# return parallel & sequential lists
return parallel_tests, sequential_tests
def get_browser_details(browser_info_dict, url):
"""
Function extracts browser name & version from browser info dict
in test_config.json
:param browser_info_dict:
:return: browser name & version
"""
browser_name = browser_info_dict["name"].lower()
browser_version = browser_info_dict["version"]
if browser_version is None:
selenoid_status = get_selenium_grid_status_json(url)
versions = selenoid_status["browsers"][browser_name]
browser_version = max(versions)
return browser_name, browser_version
def print_test_summary(complete_module_list, parallel_testlist,
sequential_tests_list, browser_name, browser_version):
"""
Prints test summary about total, parallel, sequential, browser name,
browser version information
:param complete_module_list:
:param parallel_testlist:
:param sequential_tests_list:
:param browser_name:
:param browser_version:
"""
print(
"=================================================================",
file=sys.stderr
)
print(
"Total Tests # {0}\nParallel Tests # {1}, "
"Sequential Tests # {2}".format(
len(complete_module_list), len(parallel_testlist),
len(sequential_tests_list)),
file=sys.stderr)
print("Browser: [Name:{0}, Version: {1}]".format(
browser_name.capitalize(), browser_version),
file=sys.stderr)
print(
"=================================================================\n",
file=sys.stderr
)
def get_selenium_grid_status_json(selenoid_url):
"""
Functions returns json response received from selenoid server
:param selenoid_url:
:return:
"""
try:
selenoid_status = urllib.urlopen(
"http://" + re.split('/', (re.split('//', selenoid_url, 1)[1]))[
0] + "/status", timeout=10)
selenoid_status = json.load(selenoid_status)
if isinstance(selenoid_status, dict):
return selenoid_status
except Exception as e:
print("Unable to find Selenoid Status.Kindly check url passed -'{0}'".
format(selenoid_url))
return None

View File

@ -21,7 +21,6 @@ fixtures==3.0.0
linecache2==1.0.0
pbr==3.1.1
pycodestyle>=2.5.0
pyperclip~=1.6.0
python-mimeparse==1.6.0
testscenarios==0.5.0
testtools==2.3.0

View File

@ -21,7 +21,8 @@ import traceback
import json
import random
import coverage
import threading
import time
import unittest
if sys.version_info < (3, 4):
@ -136,7 +137,7 @@ scenarios.apply_scenario = test_utils.apply_scenario
def get_suite(module_list, test_server, test_app_client, server_information,
test_db_name):
test_db_name, driver_passed):
"""
This function add the tests to test suite and return modified test suite
variable.
@ -166,7 +167,7 @@ def get_suite(module_list, test_server, test_app_client, server_information,
obj.setApp(app)
obj.setTestClient(test_app_client)
obj.setTestServer(test_server)
obj.setDriver(driver)
obj.setDriver(driver_passed)
obj.setServerInformation(server_information)
obj.setTestDatabaseName(test_db_name)
scenario = scenarios.generate_scenarios(obj)
@ -207,57 +208,62 @@ def get_test_modules(arguments):
exclude_pkgs += arguments['exclude'].split(',')
if 'feature_tests' not in exclude_pkgs and \
(arguments['pkg'] is None or arguments['pkg'] == "all" or
arguments['pkg'] == "feature_tests"):
(arguments['pkg'] is None or arguments['pkg'] == "all" or
arguments['pkg'] == "feature_tests"):
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.desired_capabilities import \
DesiredCapabilities
if arguments['pkg'] == "feature_tests":
exclude_pkgs.extend(['resql'])
default_browser = 'chrome'
if not test_utils.is_parallel_ui_tests(args):
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.desired_capabilities import \
DesiredCapabilities
# Check default browser provided through command line. If provided
# then use that browser as default browser else check for the setting
# provided in test_config.json file.
if (
'default_browser' in arguments and
arguments['default_browser'] is not None
):
default_browser = arguments['default_browser'].lower()
elif (
test_setup.config_data and
"default_browser" in test_setup.config_data
):
default_browser = test_setup.config_data['default_browser'].lower()
default_browser = 'chrome'
if default_browser == 'firefox':
cap = DesiredCapabilities.FIREFOX
cap['requireWindowFocus'] = True
cap['enablePersistentHover'] = False
profile = webdriver.FirefoxProfile()
profile.set_preference("dom.disable_beforeunload", True)
driver = webdriver.Firefox(capabilities=cap,
firefox_profile=profile)
driver.implicitly_wait(1)
else:
options = Options()
if test_setup.config_data:
if 'headless_chrome' in test_setup.config_data:
if test_setup.config_data['headless_chrome']:
options.add_argument("--headless")
options.add_argument("--no-sandbox")
options.add_argument("--disable-setuid-sandbox")
options.add_argument("--window-size=1280,1024")
options.add_argument("--disable-infobars")
options.add_experimental_option('w3c', False)
driver = webdriver.Chrome(chrome_options=options)
# Check default browser provided through command line. If provided
# then use that browser as default browser else check for the
# setting provided in test_config.json file.
if (
'default_browser' in arguments and
arguments['default_browser'] is not None
):
default_browser = arguments['default_browser'].lower()
elif (
test_setup.config_data and
"default_browser" in test_setup.config_data
):
default_browser = test_setup.config_data[
'default_browser'].lower()
# maximize browser window
driver.maximize_window()
if default_browser == 'firefox':
cap = DesiredCapabilities.FIREFOX
cap['requireWindowFocus'] = True
cap['enablePersistentHover'] = False
profile = webdriver.FirefoxProfile()
profile.set_preference("dom.disable_beforeunload", True)
driver = webdriver.Firefox(capabilities=cap,
firefox_profile=profile)
driver.implicitly_wait(1)
else:
options = Options()
if test_setup.config_data:
if 'headless_chrome' in test_setup.config_data:
if test_setup.config_data['headless_chrome']:
options.add_argument("--headless")
options.add_argument("--no-sandbox")
options.add_argument("--disable-setuid-sandbox")
options.add_argument("--window-size=1280,1024")
options.add_argument("--disable-infobars")
options.add_experimental_option('w3c', False)
driver = webdriver.Chrome(chrome_options=options)
app_starter = AppStarter(driver, config)
app_starter.start_app()
# maximize browser window
driver.maximize_window()
app_starter = AppStarter(driver, config)
app_starter.start_app()
handle_cleanup = test_utils.get_cleanup_handler(test_client, app_starter)
# Register cleanup function to cleanup on exit
@ -319,6 +325,9 @@ def add_arguments():
'--modules',
help='Executes the feature test for specific modules in pkg'
)
parser.add_argument('--parallel', nargs='?', const=True,
type=bool, default=False,
help='Enable parallel Feature Tests')
arg = parser.parse_args()
return arg
@ -404,117 +413,213 @@ class StreamToLogger(object):
pass
if __name__ == '__main__':
# Failure detected?
failure = False
test_result = dict()
cov = None
# Set signal handler for cleanup
signal_list = dir(signal)
required_signal_list = ['SIGTERM', 'SIGABRT', 'SIGQUIT', 'SIGINT']
# Get the OS wise supported signals
supported_signal_list = [sig for sig in required_signal_list if
sig in signal_list]
for sig in supported_signal_list:
signal.signal(getattr(signal, sig), sig_handler)
# Set basic logging configuration for log file
fh = logging.FileHandler(CURRENT_PATH + '/' +
'regression.log', 'w', 'utf-8')
fh.setLevel(logging.DEBUG)
fh.setFormatter(logging.Formatter(config.FILE_LOG_FORMAT))
logger = logging.getLogger()
logger.addHandler(fh)
# Create logger to write log in the logger file as well as on console
stderr_logger = logging.getLogger('STDERR')
sys.stderr = StreamToLogger(stderr_logger, logging.ERROR)
args = vars(add_arguments())
# Get test module list
def execute_test(test_module_list_passed, server_passed, driver_passed):
"""
Function executes actually test
:param test_module_list_passed:
:param server_passed:
:param driver_passed:
:return:
"""
try:
test_module_list = get_test_modules(args)
except Exception as e:
print(str(e))
sys.exit(1)
# Login the test client
test_utils.login_tester_account(test_client)
print("\n=============Running the test cases for '%s' ============="
% server_passed['name'], file=sys.stderr)
# Create test server
server_information = \
test_utils.create_parent_server_node(server_passed)
servers_info = test_utils.get_config_data()
node_name = "all"
if args['pkg'] is not None:
node_name = args['pkg'].split('.')[-1]
# Create test database with random number to avoid conflict in
# parallel execution on different platforms. This database will be
# used across all feature tests.
test_db_name = "acceptance_test_db" + \
str(random.randint(10000, 65535))
connection = test_utils.get_db_connection(
server_passed['db'],
server_passed['username'],
server_passed['db_password'],
server_passed['host'],
server_passed['port'],
server_passed['sslmode']
)
# Start coverage
if test_utils.is_coverage_enabled(args):
cov = coverage.Coverage(config_file=COVERAGE_CONFIG_FILE)
cov.start()
# Add the server version in server information
server_information['server_version'] = connection.server_version
server_information['type'] = server_passed['type']
try:
for server in servers_info:
print("\n=============Running the test cases for '%s'============="
% server['name'], file=sys.stderr)
# Create test server
server_information = test_utils.create_parent_server_node(server)
# Drop the database if already exists.
test_utils.drop_database(connection, test_db_name)
# Create test database with random number to avoid conflict in
# parallel execution on different platforms. This database will be
# used across all feature tests.
test_db_name = "acceptance_test_db" + \
str(random.randint(10000, 65535))
connection = test_utils.get_db_connection(
server['db'],
server['username'],
server['db_password'],
server['host'],
server['port'],
server['sslmode']
)
# Create database
test_utils.create_database(server_passed, test_db_name)
# Add the server version in server information
server_information['server_version'] = connection.server_version
server_information['type'] = server['type']
# Configure preferences for the test cases
test_utils.configure_preferences(
default_binary_path=server_passed['default_binary_paths'])
# Drop the database if already exists.
# Get unit test suit
suite = get_suite(test_module_list_passed,
server_passed,
test_client,
server_information, test_db_name, driver_passed)
# Run unit test suit created
tests = unittest.TextTestRunner(stream=sys.stderr,
descriptions=True,
verbosity=2).run(suite)
# processing results
ran_tests, failed_cases, skipped_cases, passed_cases = \
get_tests_result(tests)
# This is required when some tests are running parallel
# & some sequential in case of parallel ui tests
if threading.current_thread().getName() == "sequential_tests":
try:
if test_result[server_passed['name']][0] is not None:
ran_tests = test_result[server_passed['name']][0] + \
ran_tests
failed_cases.update(test_result[server_passed['name']][1])
skipped_cases.update(test_result[server_passed['name']][2])
passed_cases.update(test_result[server_passed['name']][3])
test_result[server_passed['name']] = [ran_tests, failed_cases,
skipped_cases,
passed_cases]
except KeyError:
pass
# Add final results server wise in test_result dict
test_result[server_passed['name']] = [ran_tests, failed_cases,
skipped_cases, passed_cases]
# Set empty list for 'passed' parameter for each testRun.
# So that it will not append same test case name
# unittest.result.TestResult.passed = []
# Drop the testing database created initially
if connection:
test_utils.drop_database(connection, test_db_name)
# Create database
test_utils.create_database(server, test_db_name)
# Configure preferences for the test cases
test_utils.configure_preferences(
default_binary_path=server['default_binary_paths'])
connection.close()
suite = get_suite(test_module_list,
server,
test_client,
server_information, test_db_name)
tests = unittest.TextTestRunner(stream=sys.stderr,
descriptions=True,
verbosity=2).run(suite)
# Delete test server
test_utils.delete_test_server(test_client)
except Exception as exc:
traceback.print_exc(file=sys.stderr)
print(str(exc))
print("Exception in {0}".format(threading.current_thread().ident))
finally:
# Delete web-driver instance
thread_name = "parallel_tests" + server_passed['name']
if threading.currentThread().getName() == thread_name:
driver_passed.quit()
time.sleep(20)
ran_tests, failed_cases, skipped_cases, passed_cases = \
get_tests_result(tests)
test_result[server['name']] = [ran_tests, failed_cases,
skipped_cases, passed_cases]
# Print info about completed tests
print(
"\n=============Completed the test cases for '%s'============="
% server_passed['name'], file=sys.stderr)
# Set empty list for 'passed' parameter for each testRun.
# So that it will not append same test case name
unittest.result.TestResult.passed = []
if len(failed_cases) > 0:
failure = True
def run_parallel_tests(url_client, servers_details, parallel_tests_lists,
name_of_browser, version_of_browser, max_thread_count):
"""
Function used to run tests in parallel
:param url_client:
:param servers_details:
:param parallel_tests_lists:
:param name_of_browser:
:param version_of_browser:
:param max_thread_count:
"""
driver_object = None
try:
# Thread list
threads_list = []
# Create thread for each server
for ser in servers_details:
# Logic to add new threads
while True:
# If active thread count <= max_thread_count, add new thread
if threading.activeCount() <= max_thread_count:
# Get remote web-driver instance at server level
driver_object = \
test_utils.get_remote_webdriver(hub_url,
name_of_browser,
version_of_browser,
ser['name'])
# Launch client url in browser
test_utils.launch_url_in_browser(driver_object, url_client)
# Drop the testing database created initially
if connection:
test_utils.drop_database(connection, test_db_name)
connection.close()
# Add name for thread
thread_name = "parallel_tests" + ser['name']
# Delete test server
test_utils.delete_test_server(test_client)
except SystemExit:
if handle_cleanup:
handle_cleanup()
# Start thread
t = threading.Thread(target=execute_test, name=thread_name,
args=(parallel_tests_lists, ser,
driver_object))
threads_list.append(t)
t.start()
time.sleep(3)
break
# else sleep for 10 seconds
else:
time.sleep(10)
# Start threads in parallel
for t in threads_list:
t.join()
except Exception as exc:
# Print exception stack trace
traceback.print_exc(file=sys.stderr)
print(str(exc))
# Clean driver object created
if driver_object is not None:
driver_object.quit()
def run_sequential_tests(url_client, servers_details, sequential_tests_lists,
name_of_browser, version_of_browser):
"""
Function is used to execute tests that needs to be run in sequential
manner.
:param url_client:
:param servers_details:
:param sequential_tests_lists:
:param name_of_browser:
:param version_of_browser:
:return:
"""
driver_object = None
try:
# Get remote web-driver instance
driver_object = test_utils.get_remote_webdriver(hub_url,
name_of_browser,
version_of_browser,
"Sequential_Tests")
# Launch client url in browser
test_utils.launch_url_in_browser(driver_object, url_client)
# Add name for thread
thread_name = "sequential_tests"
# Start thread
for ser in servers_details:
t = threading.Thread(target=execute_test,
name=thread_name,
args=(sequential_tests_lists, ser,
driver_object))
t.start()
t.join()
except Exception as exc:
# Print exception stack trace
traceback.print_exc(file=sys.stderr)
print(str(exc))
finally:
# Clean driver object created
driver_object.quit()
def print_test_results():
print(
"\n==============================================================="
"=======",
@ -543,6 +648,10 @@ if __name__ == '__main__':
total_passed_cases = int(
test_result[server_res][0]) - total_failed - total_skipped
if len(failed_cases) > 0:
global failure
failure = True
print(
"%s:\n\n\t%s test%s passed\n\t%s test%s failed%s%s"
"\n\t%s test%s skipped%s%s\n" %
@ -578,12 +687,162 @@ if __name__ == '__main__':
file=sys.stderr
)
if __name__ == '__main__':
# Failure detected?
failure = False
test_result = dict()
cov = None
# Set signal handler for cleanup
signal_list = dir(signal)
required_signal_list = ['SIGTERM', 'SIGABRT', 'SIGQUIT', 'SIGINT']
# Get the OS wise supported signals
supported_signal_list = [sig for sig in required_signal_list if
sig in signal_list]
for sig in supported_signal_list:
signal.signal(getattr(signal, sig), sig_handler)
# Set basic logging configuration for log file
fh = logging.FileHandler(CURRENT_PATH + '/' +
'regression.log', 'w', 'utf-8')
fh.setLevel(logging.DEBUG)
fh.setFormatter(logging.Formatter('[%(thread)d] ' +
config.FILE_LOG_FORMAT))
logger = logging.getLogger()
logger.addHandler(fh)
# Create logger to write log in the logger file as well as on console
stderr_logger = logging.getLogger('STDERR')
sys.stderr = StreamToLogger(stderr_logger, logging.ERROR)
args = vars(add_arguments())
# Get test module list
try:
test_module_list = get_test_modules(args)
except Exception as e:
print(str(e))
sys.exit(1)
# Login the test client
test_utils.login_tester_account(test_client)
servers_info = test_utils.get_config_data()
node_name = "all"
if args['pkg'] is not None:
node_name = args['pkg'].split('.')[-1]
# Start coverage
if test_utils.is_coverage_enabled(args):
cov = coverage.Coverage(config_file=COVERAGE_CONFIG_FILE)
cov.start()
# Check if feature tests included & parallel tests switch passed
if test_utils.is_feature_test_included(args) and \
test_utils.is_parallel_ui_tests(args):
# Get selenium config dict
selenoid_config = test_setup.config_data['selenoid_config']
# Set DEFAULT_SERVER value
default_server = selenoid_config['pgAdmin_default_server']
os.environ["PGADMIN_CONFIG_DEFAULT_SERVER"] = str(default_server)
config.DEFAULT_SERVER = str(default_server)
# Get hub url
hub_url = selenoid_config['selenoid_url']
# Get selenium grid status & list of available browser out passed
selenium_grid_status, list_of_browsers \
= test_utils.get_selenium_grid_status_and_browser_list(hub_url)
# Execute tests if selenium-grid is up
if selenium_grid_status and len(list_of_browsers) > 0:
app_starter_local = None
# run across browsers
for browser_info in list_of_browsers:
try:
# browser info
browser_name, browser_version = \
test_utils.get_browser_details(browser_info, hub_url)
# tests lists can be executed in parallel & sequentially
parallel_tests, sequential_tests = \
test_utils.get_parallel_sequential_module_list(
test_module_list)
# Print test summary
test_utils.print_test_summary(test_module_list,
parallel_tests,
sequential_tests,
browser_name,
browser_version)
# Create app form source code
app_starter_local = AppStarter(None, config)
client_url = app_starter_local.start_app()
# Running Parallel tests
if len(parallel_tests) > 0:
parallel_sessions = int(selenoid_config[
'max_parallel_sessions'])
run_parallel_tests(client_url, servers_info,
parallel_tests, browser_name,
browser_version, parallel_sessions)
# Wait till all threads started in parallel are finished
while True:
try:
if threading.activeCount() <= 1:
break
else:
time.sleep(10)
except Exception as e:
traceback.print_exc(file=sys.stderr)
print(str(e))
# Sequential Tests
if len(sequential_tests) > 0:
run_sequential_tests(client_url, servers_info,
sequential_tests, browser_name,
browser_version)
# Clean up environment
if app_starter_local:
app_starter_local.stop_app()
except SystemExit:
if app_starter_local:
app_starter_local.stop_app()
if handle_cleanup:
handle_cleanup()
# Pause before printing result in order not to mix output
time.sleep(5)
# Print note for completion of execution in a browser.
print(
"\n============= Test execution with {0} is "
"completed.=============".format(browser_name),
file=sys.stderr)
print_test_results()
del os.environ["PGADMIN_CONFIG_DEFAULT_SERVER"]
else:
try:
for server in servers_info:
thread = threading.Thread(target=execute_test, args=(
test_module_list, server, driver))
thread.start()
thread.join()
except SystemExit:
if handle_cleanup:
handle_cleanup()
print_test_results()
# Stop code coverage
if test_utils.is_coverage_enabled(args):
cov.stop()
cov.save()
# # Print coverage only if coverage args given in command line
# Print coverage only if coverage args given in command line
if test_utils.is_coverage_enabled(args):
test_utils.print_and_store_coverage_report(cov)

View File

@ -54,6 +54,15 @@
"key_file": ""
}
}],
"selenoid_config": {
"pgAdmin_default_server":"IP address of machine where source code is going to be executed",
"max_parallel_sessions": "3",
"selenoid_url": "http://<IP address of Selenoid Installed machine>:4444/wd/hub",
"browsers_list":[
{"name": "Chrome", "version": null},
{"name": "Firefox", "version": null}
]
},
"server_group": 1,
"server_credentials": [
{