Fixed following issues for feature test:

1. Modified the get_chromedriver utility for supporting python version below 3.5.
2. Handled some exceptions getting intermittently on some databases.
3. Generalized some functions with additional parameters.
4. Disabled the auto-expansion of the children nodes for maintaining the synchronization.
This commit is contained in:
Shubham Agarwal 2019-12-03 19:35:48 +05:30 committed by Akshay Joshi
parent f7cc79fab1
commit 5093e6db5e
8 changed files with 93 additions and 60 deletions

View File

@ -17,7 +17,12 @@ import os
import platform import platform
import subprocess import subprocess
import sys import sys
import urllib.request try:
from urllib.request import urlopen, urlretrieve
from urllib.error import URLError
except Exception:
from urllib import urlopen, urlretrieve
URLError = Exception
import zipfile import zipfile
@ -85,13 +90,13 @@ def get_chrome_version(args):
# On Linux/Mac we run the Chrome executable with the --version flag, # On Linux/Mac we run the Chrome executable with the --version flag,
# then parse the output. # then parse the output.
try: try:
result = subprocess.run([args.chrome, '--version'], result = subprocess.Popen([args.chrome, '--version'],
stdout=subprocess.PIPE) stdout=subprocess.PIPE)
except FileNotFoundError as e: except FileNotFoundError:
print('The specified Chrome executable could not be found.') print('The specified Chrome executable could not be found.')
sys.exit(1) sys.exit(1)
version_str = result.stdout.decode("utf-8").strip() version_str = result.stdout.read().decode("utf-8")
# Check for 'Chrom' not 'Chrome' in case the user is using Chromium. # Check for 'Chrom' not 'Chrome' in case the user is using Chromium.
if "Chrom" not in version_str: if "Chrom" not in version_str:
print('The specified Chrome executable output an unexpected ' print('The specified Chrome executable output an unexpected '
@ -120,8 +125,8 @@ def get_chromedriver_version(chrome_version):
.format(chrome_version) .format(chrome_version)
try: try:
fp = urllib.request.urlopen(url) fp = urlopen(url)
except urllib.error.URLError as e: except URLError as e:
print('The chromedriver catalog URL could not be accessed: {}' print('The chromedriver catalog URL could not be accessed: {}'
.format(e)) .format(e))
sys.exit(1) sys.exit(1)
@ -173,8 +178,8 @@ print('Downloading chromedriver v{} for Chrome v{} on {}...'
.format(chromedriver_version, chrome_version, system)) .format(chromedriver_version, chrome_version, system))
try: try:
file, headers = urllib.request.urlretrieve(url) file, headers = urlretrieve(url)
except urllib.error.URLError as e: except URLError as e:
print('The chromedriver download URL could not be accessed: {}' print('The chromedriver download URL could not be accessed: {}'
.format(e)) .format(e))
sys.exit(1) sys.exit(1)

View File

@ -164,11 +164,7 @@ class PGDataypeFeatureTest(BaseFeatureTest):
self._create_enum_type() self._create_enum_type()
for batch in config_data: for batch in config_data:
query = self.construct_select_query(batch) query = self.construct_select_query(batch)
self.page.fill_codemirror_area_with(query) self.page.execute_query(query)
execute_query = self.page.find_by_css_selector(
QueryToolLocators.btn_execute_query_css)
execute_query.click()
wait = WebDriverWait(self.page.driver, 5) wait = WebDriverWait(self.page.driver, 5)
# wait for the visibility of the grid to appear # wait for the visibility of the grid to appear

View File

@ -118,8 +118,8 @@ class PGUtilitiesBackupFeatureTest(BaseFeatureTest):
(By.XPATH, (By.XPATH,
NavMenuLocators.process_watcher_alertfier)) NavMenuLocators.process_watcher_alertfier))
self.page.wait_for_element_to_disappear( self.page.wait_for_element_to_disappear(
lambda driver: driver.find_element_by_css_selector(".loading-logs") lambda driver: driver.find_element_by_css_selector(
) ".loading-logs"), 10)
if status != "Successfully completed.": if status != "Successfully completed.":
self.assertEquals(status, "Successfully completed.") self.assertEquals(status, "Successfully completed.")
@ -188,8 +188,8 @@ class PGUtilitiesBackupFeatureTest(BaseFeatureTest):
(By.XPATH, (By.XPATH,
NavMenuLocators.process_watcher_alertfier)) NavMenuLocators.process_watcher_alertfier))
self.page.wait_for_element_to_disappear( self.page.wait_for_element_to_disappear(
lambda driver: driver.find_element_by_css_selector(".loading-logs") lambda driver: driver.find_element_by_css_selector(
) ".loading-logs"), 10)
if status != "Successfully completed.": if status != "Successfully completed.":
self.assertEquals(status, "Successfully completed.") self.assertEquals(status, "Successfully completed.")
@ -215,6 +215,7 @@ class PGUtilitiesBackupFeatureTest(BaseFeatureTest):
os.remove(backup_file) os.remove(backup_file)
def after(self): def after(self):
test_gui_helper.close_process_watcher(self)
test_gui_helper.close_bgprocess_popup(self) test_gui_helper.close_bgprocess_popup(self)
self.page.remove_server(self.server) self.page.remove_server(self.server)
connection = test_utils.get_db_connection( connection = test_utils.get_db_connection(

View File

@ -37,22 +37,22 @@ class QueryToolAutoCompleteFeatureTest(BaseFeatureTest):
self.page.add_server(self.server) self.page.add_server(self.server)
self.first_schema_name = "test_schema" + \ self.first_schema_name = "test_schema" + \
str(random.randint(1000, 3000)) str(random.randint(1000, 2000))
test_utils.create_schema(self.server, self.test_db, test_utils.create_schema(self.server, self.test_db,
self.first_schema_name) self.first_schema_name)
self.second_schema_name = "comp_schema" + \ self.second_schema_name = "comp_schema" + \
str(random.randint(1000, 3000)) str(random.randint(2000, 3000))
test_utils.create_schema(self.server, self.test_db, test_utils.create_schema(self.server, self.test_db,
self.second_schema_name) self.second_schema_name)
self.first_table_name = "auto_comp_" + \ self.first_table_name = "auto_comp_" + \
str(random.randint(1000, 3000)) str(random.randint(1000, 2000))
test_utils.create_table(self.server, self.test_db, test_utils.create_table(self.server, self.test_db,
self.first_table_name) self.first_table_name)
self.second_table_name = "auto_comp_" + \ self.second_table_name = "auto_comp_" + \
str(random.randint(1000, 3000)) str(random.randint(2000, 3000))
test_utils.create_table(self.server, self.test_db, test_utils.create_table(self.server, self.test_db,
self.second_table_name) self.second_table_name)

View File

@ -218,12 +218,6 @@ SELECT generate_series(1, {}) as id1, 'dummy' as id2""".format(
QueryToolLocators.query_output_cells)) QueryToolLocators.query_output_cells))
) )
# click on first data column to select all column.
column_1 = \
self.page.find_by_css_selector(
QueryToolLocators.output_column_header_css.format('id1'))
column_1.click()
canvas = self.wait.until(EC.presence_of_element_located( canvas = self.wait.until(EC.presence_of_element_located(
(By.CSS_SELECTOR, QueryToolLocators.query_output_canvas_css))) (By.CSS_SELECTOR, QueryToolLocators.query_output_canvas_css)))
@ -235,6 +229,11 @@ SELECT generate_series(1, {}) as id1, 'dummy' as id2""".format(
scroll = 10 scroll = 10
status = False status = False
while scroll: while scroll:
# click on first data column to select all column.
column_1 = \
self.page.find_by_css_selector(
QueryToolLocators.output_column_header_css.format('id1'))
column_1.click()
canvas_ele = self.page.find_by_css_selector('.grid-canvas') canvas_ele = self.page.find_by_css_selector('.grid-canvas')
scrolling_height = canvas_ele.size['height'] scrolling_height = canvas_ele.size['height']
self.driver.execute_script( self.driver.execute_script(

View File

@ -173,7 +173,7 @@ class PgadminPage:
self.fill_codemirror_area_with(query) self.fill_codemirror_area_with(query)
self.click_execute_query_button() self.click_execute_query_button()
def click_execute_query_button(self): def click_execute_query_button(self, timeout=20):
retry = 5 retry = 5
execute_button = self.find_by_css_selector( execute_button = self.find_by_css_selector(
QueryToolLocators.btn_execute_query_css) QueryToolLocators.btn_execute_query_css)
@ -189,7 +189,7 @@ class PgadminPage:
break break
else: else:
retry -= 1 retry -= 1
self.wait_for_query_tool_loading_indicator_to_disappear() self.wait_for_query_tool_loading_indicator_to_disappear(timeout)
def check_execute_option(self, option): def check_execute_option(self, option):
""""This function will check auto commit or auto roll back based on """"This function will check auto commit or auto roll back based on
@ -689,15 +689,17 @@ class PgadminPage:
try: try:
webdriver.ActionChains(self.driver).double_click( webdriver.ActionChains(self.driver).double_click(
server_element).perform() server_element).perform()
if self.wait_for_element_to_appear(self.driver, if self.check_if_element_exist_by_xpath(
ConnectToServerDiv.ok_button): ConnectToServerDiv.ok_button):
self.fill_input_by_xpath( field = self.find_by_xpath(
ConnectToServerDiv.password_field, password) ConnectToServerDiv.password_field)
self.fill_input(field, password)
self.find_by_xpath(ConnectToServerDiv.ok_button).click() self.find_by_xpath(ConnectToServerDiv.ok_button).click()
self.wait_until_element_not_visible( self.wait_for_element_to_disappear(
ConnectToServerDiv.ok_button) lambda driver: driver.find_element_by_xpath(
if self.wait_for_element_to_be_visible( ConnectToServerDiv.ok_button))
self.driver, ConnectToServerDiv.error_message, 2): if self.check_if_element_exist_by_xpath(
ConnectToServerDiv.error_message, 2):
print( print(
"While entering password in click_and_connect_server " "While entering password in click_and_connect_server "
"function, error is occurred : " + str( "function, error is occurred : " + str(
@ -983,7 +985,7 @@ class PgadminPage:
self._wait_for("element to exist", element_if_it_exists) self._wait_for("element to exist", element_if_it_exists)
return find_method_with_args(self.driver) return find_method_with_args(self.driver)
def wait_for_element_to_disappear(self, find_method_with_args): def wait_for_element_to_disappear(self, find_method_with_args, timeout=5):
def element_if_it_disappears(driver): def element_if_it_disappears(driver):
try: try:
element = find_method_with_args(driver) element = find_method_with_args(driver)
@ -994,7 +996,8 @@ class PgadminPage:
except (NoSuchElementException, StaleElementReferenceException): except (NoSuchElementException, StaleElementReferenceException):
return True return True
return self._wait_for("element to disappear", element_if_it_disappears) return self._wait_for("element to disappear",
element_if_it_disappears, timeout)
def wait_for_reloading_indicator_to_disappear(self): def wait_for_reloading_indicator_to_disappear(self):
def reloading_indicator_has_disappeared(driver): def reloading_indicator_has_disappeared(driver):
@ -1017,7 +1020,7 @@ class PgadminPage:
self._wait_for("spinner to disappear", spinner_has_disappeared, 20) self._wait_for("spinner to disappear", spinner_has_disappeared, 20)
def wait_for_query_tool_loading_indicator_to_disappear(self): def wait_for_query_tool_loading_indicator_to_disappear(self, timeout=20):
def spinner_has_disappeared(driver): def spinner_has_disappeared(driver):
try: try:
spinner = driver.find_element_by_css_selector( spinner = driver.find_element_by_css_selector(
@ -1029,7 +1032,8 @@ class PgadminPage:
time.sleep(0.5) time.sleep(0.5)
return True return True
self._wait_for("spinner to disappear", spinner_has_disappeared, 20) self._wait_for(
"spinner to disappear", spinner_has_disappeared, timeout)
def wait_for_query_tool_loading_indicator_to_appear(self): def wait_for_query_tool_loading_indicator_to_appear(self):
status = self.check_if_element_exist_by_xpath( status = self.check_if_element_exist_by_xpath(
@ -1151,7 +1155,7 @@ class PgadminPage:
try: try:
element = self.driver.find_element(*click_locator) element = self.driver.find_element(*click_locator)
element.click() element.click()
WebDriverWait(self.driver, 2).until( WebDriverWait(self.driver, 10).until(
EC.visibility_of_element_located(verify_locator)) EC.visibility_of_element_located(verify_locator))
click_status = True click_status = True
except Exception: except Exception:

View File

@ -47,11 +47,14 @@ def close_bgprocess_popup(tester):
def close_process_watcher(tester): def close_process_watcher(tester):
attempt = 10 attempt = 10
while attempt > 0: while attempt > 0:
close_btn = tester.page.find_by_xpath( try:
NavMenuLocators.process_watcher_close_button_xpath) if not tester.page.check_if_element_exist_by_xpath(
close_btn.click() NavMenuLocators.process_watcher_close_button_xpath, 1):
if not tester.page.check_if_element_exist_by_xpath( break
NavMenuLocators.process_watcher_close_button_xpath, 1): else:
break close_btn = tester.page.find_by_xpath(
else: NavMenuLocators.process_watcher_close_button_xpath)
close_btn.click()
attempt -= 1
except Exception:
attempt -= 1 attempt -= 1

View File

@ -108,6 +108,7 @@ def clear_node_info_dict():
def create_database(server, db_name, encoding=None): def create_database(server, db_name, encoding=None):
"""This function used to create database and returns the database id""" """This function used to create database and returns the database id"""
db_id = ''
try: try:
connection = get_db_connection( connection = get_db_connection(
server['db'], server['db'],
@ -135,13 +136,13 @@ def create_database(server, db_name, encoding=None):
pg_cursor.execute("SELECT db.oid from pg_database db WHERE" pg_cursor.execute("SELECT db.oid from pg_database db WHERE"
" db.datname='%s'" % db_name) " db.datname='%s'" % db_name)
oid = pg_cursor.fetchone() oid = pg_cursor.fetchone()
db_id = ''
if oid: if oid:
db_id = oid[0] db_id = oid[0]
connection.close() connection.close()
return db_id return db_id
except Exception: except Exception:
traceback.print_exc(file=sys.stderr) traceback.print_exc(file=sys.stderr)
return db_id
def create_table(server, db_name, table_name, extra_columns=[]): def create_table(server, db_name, table_name, extra_columns=[]):
@ -777,6 +778,27 @@ def configure_preferences(default_binary_path=None):
' WHERE PID = ?', (-1, pref_tree_state_save_interval.pid) ' WHERE PID = ?', (-1, pref_tree_state_save_interval.pid)
) )
# Disable auto expand sole children tree state for tests
pref_auto_expand_sol_children = \
browser_pref.preference('auto_expand_sole_children')
user_pref = cur.execute(
'SELECT pid, uid FROM user_preferences '
'where pid=?', (pref_auto_expand_sol_children.pid,)
)
if len(user_pref.fetchall()) == 0:
cur.execute(
'INSERT INTO user_preferences(pid, uid, value)'
' VALUES (?,?,?)', (pref_auto_expand_sol_children.pid, 1, 'False')
)
else:
cur.execute(
'UPDATE user_preferences'
' SET VALUE = ?'
' WHERE PID = ?', ('False', pref_auto_expand_sol_children.pid)
)
# Disable reload warning on browser # Disable reload warning on browser
pref_confirm_on_refresh_close = \ pref_confirm_on_refresh_close = \
browser_pref.preference('confirm_on_refresh_close') browser_pref.preference('confirm_on_refresh_close')
@ -1103,18 +1125,21 @@ def get_watcher_dialogue_status(self):
"""This will get watcher dialogue status""" """This will get watcher dialogue status"""
import time import time
attempts = 120 attempts = 120
status = None
while attempts > 0: while attempts > 0:
status = self.page.find_by_css_selector( try:
".pg-bg-status-text").text status = self.page.find_by_css_selector(
".pg-bg-status-text").text
if 'Failed' in status: if 'Failed' in status:
break break
if status == 'Started' or status == 'Running...': if status == 'Started' or status == 'Running...':
attempts -= 1
time.sleep(.5)
else:
break
except Exception:
attempts -= 1 attempts -= 1
time.sleep(.5)
else:
break
return status return status