pgadmin4/web/pgadmin/feature_tests/query_tool_tests.py
Usman Muzaffar be26fc540c Many fixes to the stability of the feature tests, including:
tree toggle issue
Query tool inteliSence issue eg. when there is only one option and drop down is not shown
Backup and restore windows locator changes
Fixes required due to resolving rm # 4041
Dependent tab not showing data sometime, so refreshed the page and handled it
Due to change of logic for auto commit, did the required changes
Due to fix of RM 4062, did the required workaround which broke the test case.
2019-03-21 12:04:37 +00:00

856 lines
32 KiB
Python

##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2019, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
from __future__ import print_function
import sys
from selenium.common.exceptions import StaleElementReferenceException
from selenium.webdriver import ActionChains
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from regression.python_test_utils import test_utils
from regression.feature_utils.base_feature_test import BaseFeatureTest
import config
from .locators import QueryToolLocatorsCss
class QueryToolFeatureTest(BaseFeatureTest):
"""
This feature test will test the different query tool features.
"""
scenarios = [
("Query tool feature test", dict())
]
def before(self):
self.page.wait_for_spinner_to_disappear()
self.page.add_server(self.server)
self._locate_database_tree_node()
self.page.open_query_tool()
self.page.wait_for_spinner_to_disappear()
self._reset_options()
def runTest(self):
# on demand result set on scrolling.
print("\nOn demand query result... ",
file=sys.stderr, end="")
self._on_demand_result()
self._clear_query_tool()
# explain query with verbose and cost
print("Explain query with verbose and cost... ",
file=sys.stderr, end="")
if self._supported_server_version():
self._query_tool_explain_with_verbose_and_cost()
print("OK.", file=sys.stderr)
self._clear_query_tool()
else:
print("Skipped.", file=sys.stderr)
# explain analyze query with buffers and timing
print("Explain analyze query with buffers and timing... ",
file=sys.stderr, end="")
if self._supported_server_version():
self._query_tool_explain_analyze_with_buffers_and_timing()
print("OK.", file=sys.stderr)
self._clear_query_tool()
else:
print("Skipped.", file=sys.stderr)
# auto commit disabled.
print("Auto commit disabled... ", file=sys.stderr, end="")
self._query_tool_auto_commit_disabled()
print("OK.", file=sys.stderr)
self._clear_query_tool()
# auto commit enabled.
print("Auto commit enabled... ", file=sys.stderr, end="")
self._query_tool_auto_commit_enabled()
print("OK.", file=sys.stderr)
self._clear_query_tool()
# auto rollback enabled.
print("Auto rollback enabled...", file=sys.stderr, end="")
self._query_tool_auto_rollback_enabled()
print(" OK.", file=sys.stderr)
self._clear_query_tool()
# cancel query.
print("Cancel query... ", file=sys.stderr, end="")
self._query_tool_cancel_query()
print("OK.", file=sys.stderr)
self._clear_query_tool()
# Notify Statements.
print("Capture Notify Statements... ", file=sys.stderr, end="")
self._query_tool_notify_statements()
self._clear_query_tool()
# explain query with JIT stats
print("Explain query with JIT stats... ",
file=sys.stderr, end="")
if self._supported_jit_on_server():
self._query_tool_explain_check_jit_stats()
print("OK.", file=sys.stderr)
self._clear_query_tool()
else:
print("Skipped.", file=sys.stderr)
def after(self):
self.page.remove_server(self.server)
def _reset_options(self):
# this will set focus to correct iframe.
self.page.fill_codemirror_area_with('')
explain_op = self.page.find_by_css_selector(
QueryToolLocatorsCss.btn_explain_options_dropdown)
explain_op.click()
# disable Explain options and auto rollback only if they are enabled.
for op in (QueryToolLocatorsCss.btn_explain_verbose,
QueryToolLocatorsCss.btn_explain_costs,
QueryToolLocatorsCss.btn_explain_buffers,
QueryToolLocatorsCss.btn_explain_timing):
btn = self.page.find_by_css_selector(op)
check = btn.find_element_by_tag_name('i')
if 'visibility-hidden' not in check.get_attribute('class'):
btn.click()
query_op = self.page.find_by_css_selector(
QueryToolLocatorsCss.btn_query_dropdown)
query_op.click()
# disable auto rollback only if they are enabled
btn = self.page.find_by_css_selector(
QueryToolLocatorsCss.btn_auto_rollback)
check = btn.find_element_by_tag_name('i')
if 'visibility-hidden' not in check.get_attribute('class'):
btn.click()
# enable autocommit only if it's disabled
btn = self.page.find_by_css_selector(
QueryToolLocatorsCss.btn_auto_commit)
check = btn.find_element_by_tag_name('i')
if 'visibility-hidden' in check.get_attribute('class'):
btn.click()
# close menu
query_op.click()
def _locate_database_tree_node(self):
self.page.toggle_open_tree_item(self.server['name'])
self.page.toggle_open_tree_item('Databases')
self.page.toggle_open_tree_item(self.test_db)
def _clear_query_tool(self):
self.page.click_element(self.page.find_by_css_selector(
QueryToolLocatorsCss.btn_clear_dropdown)
)
ActionChains(self.driver) \
.move_to_element(self.page.find_by_css_selector(
QueryToolLocatorsCss.btn_clear)).perform()
self.page.click_element(
self.page.find_by_css_selector(QueryToolLocatorsCss.btn_clear)
)
self.page.click_modal('Yes')
def _on_demand_result(self):
ON_DEMAND_CHUNKS = 2
row_id_to_find = config.ON_DEMAND_RECORD_COUNT * ON_DEMAND_CHUNKS
query = """-- On demand query result on scroll
-- Grid select all
-- Column select all
SELECT generate_series(1, {}) as id1, 'dummy' as id2""".format(
config.ON_DEMAND_RECORD_COUNT * ON_DEMAND_CHUNKS)
print("\nOn demand result set on scrolling... ",
file=sys.stderr, end="")
wait = WebDriverWait(self.page.driver, 10)
self.page.fill_codemirror_area_with(query)
self.page.find_by_css_selector(
QueryToolLocatorsCss.btn_execute_query).click()
# wait for header of the table to be visible
wait.until(EC.visibility_of_element_located(
(By.XPATH,'//div[@class="slick-header-columns"]')))
wait.until(EC.presence_of_element_located(
(By.XPATH,
'//span[@data-row="0" and text()="1"]'))
)
# scroll to bottom to fetch next chunk of result set.
self.driver.execute_script(
"pgAdmin.SqlEditor.jquery('.slick-viewport')"
".scrollTop(pgAdmin.SqlEditor.jquery('.grid-canvas').height());"
)
canvas = wait.until(EC.presence_of_element_located(
(By.CSS_SELECTOR, "#datagrid .slick-viewport .grid-canvas")))
self._check_ondemand_result(row_id_to_find, canvas)
print("OK.", file=sys.stderr)
print("On demand result set on grid select all... ",
file=sys.stderr, end="")
self.page.find_by_css_selector(
QueryToolLocatorsCss.btn_execute_query).click()
# wait for header of the table to be visible
wait.until(EC.visibility_of_element_located(
(By.XPATH,'//div[@class="slick-header-columns"]')))
# wait for first row to contain value
wait.until(EC.presence_of_element_located(
(By.XPATH,
'//span[@data-row="0" and text()="1"]'))
)
wait.until(EC.presence_of_element_located(
(By.CSS_SELECTOR, ".slick-header-column"))).click()
canvas = wait.until(EC.presence_of_element_located(
(By.CSS_SELECTOR, "#datagrid .slick-viewport .grid-canvas")))
self._check_ondemand_result(row_id_to_find, canvas)
print("OK.", file=sys.stderr)
print("On demand result set on column select all... ",
file=sys.stderr, end="")
self.page.find_by_css_selector(
QueryToolLocatorsCss.btn_execute_query).click()
self.page.wait_for_query_tool_loading_indicator_to_disappear()
# wait for header of the table to be visible
wait.until(EC.visibility_of_element_located(
(By.XPATH,'//div[@class="slick-header-columns"]')))
wait.until(EC.presence_of_element_located(
(By.XPATH,
'//span[@data-row="0" and text()="1"]'))
)
# click on first data column to select all column.
wait.until(EC.presence_of_element_located(
(
By.XPATH,
"//span[contains(@class, 'column-name') "
"and contains(., 'id1')]"))
).click()
canvas = wait.until(EC.presence_of_element_located(
(By.CSS_SELECTOR, "#datagrid .slick-viewport .grid-canvas")))
self._check_ondemand_result(row_id_to_find, canvas)
print("OK.", file=sys.stderr)
def _check_ondemand_result(self, row_id_to_find, canvas):
# scroll to bottom to bring last row of next chunk in viewport.
self.driver.execute_script(
"pgAdmin.SqlEditor.jquery('.slick-viewport')"
".scrollTop(pgAdmin.SqlEditor.jquery('.grid-canvas').height());"
)
canvas.find_element_by_xpath(
'//span[text()="{}"]'.format(row_id_to_find)
)
def _query_tool_explain_with_verbose_and_cost(self):
query = """-- Explain query with verbose and cost
SELECT generate_series(1, 1000) as id order by id desc"""
wait = WebDriverWait(self.page.driver, 10)
self.page.fill_codemirror_area_with(query)
explain_op = self.page.find_by_css_selector(
QueryToolLocatorsCss.btn_explain_options_dropdown)
explain_op.click()
# disable Explain options and auto rollback only if they are enabled.
for op in (QueryToolLocatorsCss.btn_explain_verbose,
QueryToolLocatorsCss.btn_explain_costs):
self.page.find_by_css_selector(op).click()
explain_op.click()
self.page.find_by_css_selector(
QueryToolLocatorsCss.btn_explain).click()
self.page.wait_for_query_tool_loading_indicator_to_disappear()
self.page.click_tab('Data Output')
canvas = wait.until(EC.presence_of_element_located(
(By.CSS_SELECTOR, "#datagrid .slick-viewport .grid-canvas"))
)
# Search for 'Output' word in result (verbose option)
canvas.find_element_by_xpath("//*[contains(string(), 'Output')]")
# Search for 'Total Cost' word in result (cost option)
canvas.find_element_by_xpath("//*[contains(string(),'Total Cost')]")
def _query_tool_explain_analyze_with_buffers_and_timing(self):
query = """-- Explain analyze query with buffers and timing
SELECT generate_series(1, 1000) as id order by id desc"""
wait = WebDriverWait(self.page.driver, 10)
self.page.fill_codemirror_area_with(query)
explain_op = self.page.find_by_css_selector(
QueryToolLocatorsCss.btn_explain_options_dropdown)
explain_op.click()
# disable Explain options and auto rollback only if they are enabled.
for op in (QueryToolLocatorsCss.btn_explain_buffers,
QueryToolLocatorsCss.btn_explain_timing):
self.page.find_by_css_selector(op).click()
explain_op.click()
self.page.find_by_css_selector(
QueryToolLocatorsCss.btn_explain_analyze).click()
self.page.wait_for_query_tool_loading_indicator_to_disappear()
self.page.click_tab('Data Output')
canvas = wait.until(EC.presence_of_element_located(
(By.CSS_SELECTOR, "#datagrid .slick-viewport .grid-canvas"))
)
# Search for 'Shared Read Blocks' word in result (buffers option)
canvas.find_element_by_xpath(
"//*[contains(string(), 'Shared Read Blocks')]"
)
# Search for 'Actual Total Time' word in result (timing option)
canvas.find_element_by_xpath(
"//*[contains(string(), 'Actual Total Time')]"
)
def _query_tool_auto_commit_disabled(self):
table_name = 'query_tool_auto_commit_disabled_table'
query = """-- 1. Disable auto commit.
-- 2. Create table in public schema.
-- 3. ROLLBACK transaction.
-- 4. Check if table is *NOT* created.
CREATE TABLE public.{}();""".format(table_name)
wait = WebDriverWait(self.page.driver, 10)
self.page.fill_codemirror_area_with(query)
# open auto commit option and disable it
query_op = self.page.find_by_css_selector(
QueryToolLocatorsCss.btn_query_dropdown)
query_op.click()
self.page.find_by_css_selector(
QueryToolLocatorsCss.btn_auto_commit).click()
# close option
query_op.click()
# execute query
self.page.find_by_css_selector(
QueryToolLocatorsCss.btn_execute_query).click()
self.page.wait_for_query_tool_loading_indicator_to_disappear()
self.page.click_tab('Messages')
self.page.find_by_xpath(
'//div[contains(@class, "sql-editor-message") and '
'contains(string(), "CREATE TABLE")]'
)
# do the ROLLBACK and check if the table is present or not
self._clear_query_tool()
query = """-- 1. (Done) Disable auto commit.
-- 2. (Done) Create table in public schema.
-- 3. ROLLBACK transaction.
-- 4. Check if table is *NOT* created.
ROLLBACK;"""
self.page.fill_codemirror_area_with(query)
self.page.find_by_css_selector(
QueryToolLocatorsCss.btn_execute_query).click()
self.page.wait_for_query_tool_loading_indicator_to_disappear()
self.page.click_tab('Messages')
self.page.find_by_xpath(
'//div[contains(@class, "sql-editor-message") and '
'contains(string(), "ROLLBACK")]'
)
self._clear_query_tool()
query = """-- 1. (Done) Disable auto commit.
-- 2. (Done) Create table in public schema.
-- 3. (Done) ROLLBACK transaction.
-- 4. Check if table is *NOT* created.
SELECT relname FROM pg_class
WHERE relkind IN ('r','s','t') and relnamespace = 2200::oid;"""
self.page.fill_codemirror_area_with(query)
self.page.find_by_css_selector(
QueryToolLocatorsCss.btn_execute_query).click()
self.page.wait_for_query_tool_loading_indicator_to_disappear()
self.page.click_tab('Data Output')
canvas = wait.until(EC.presence_of_element_located(
(By.CSS_SELECTOR, "#datagrid .slick-viewport .grid-canvas")))
el = canvas.find_elements_by_xpath(
"//div[contains(@class, 'slick-cell') and "
"contains(text(), '{}')]".format(table_name))
assert len(el) == 0, "Table '{}' created with auto commit disabled " \
"and without any explicit commit.".format(
table_name
)
# again roll back so that the auto commit drop down is enabled
query = """-- 1. (Done) Disable auto commit.
-- 2. (Done) Create table in public schema.
-- 3. ROLLBACK transaction.
-- 4. Check if table is *NOT* created.
ROLLBACK;"""
self.page.fill_codemirror_area_with(query)
self.page.find_by_css_selector(
QueryToolLocatorsCss.btn_execute_query).click()
self.page.wait_for_query_tool_loading_indicator_to_disappear()
def _query_tool_auto_commit_enabled(self):
query = """-- 1. Enable auto commit.
-- 2. END any open transaction.
-- 3. Create table in public schema.
-- 4. ROLLBACK transaction
-- 5. Check if table is created event after ROLLBACK.
END;"""
self.page.fill_codemirror_area_with(query)
wait = WebDriverWait(self.page.driver, 10)
query_op = self.page.find_by_css_selector(
QueryToolLocatorsCss.btn_query_dropdown)
query_op.click()
self.page.find_by_css_selector(
QueryToolLocatorsCss.btn_auto_commit).click()
query_op.click()
self.page.find_by_css_selector(
QueryToolLocatorsCss.btn_execute_query).click()
self.page.wait_for_query_tool_loading_indicator_to_disappear()
self._clear_query_tool()
table_name = 'query_tool_auto_commit_enabled_table'
query = """-- 1. (Done) END any open transaction.
-- 2. Enable auto commit.
-- 3. Create table in public schema.
-- 4. ROLLBACK transaction
-- 5. Check if table is created event after ROLLBACK.
CREATE TABLE public.{}();""".format(table_name)
self.page.fill_codemirror_area_with(query)
self.page.find_by_css_selector(
QueryToolLocatorsCss.btn_execute_query).click()
self.page.wait_for_query_tool_loading_indicator_to_disappear()
self.page.click_tab('Messages')
self.page.find_by_xpath(
'//div[contains(@class, "sql-editor-message") and '
'contains(string(), "CREATE TABLE")]'
)
self._clear_query_tool()
query = """-- 1. (Done) END any open transaction if any.
-- 2. (Done) Enable auto commit.
-- 3. (Done) Create table in public schema.
-- 4. ROLLBACK transaction
-- 5. Check if table is created event after ROLLBACK.
ROLLBACK;"""
self.page.fill_codemirror_area_with(query)
self.page.find_by_css_selector(
QueryToolLocatorsCss.btn_execute_query).click()
self.page.wait_for_query_tool_loading_indicator_to_disappear()
self.page.click_tab('Messages')
self.page.find_by_xpath(
'//div[contains(@class, "sql-editor-message") and '
'contains(string(), "ROLLBACK")]'
)
self._clear_query_tool()
query = """-- 1. (Done) END any open transaction if any.
-- 2. (Done) Enable auto commit.
-- 3. (Done) Create table in public schema.
-- 4. (Done) ROLLBACK transaction
-- 5. Check if table is created event after ROLLBACK.
SELECT relname FROM pg_class
WHERE relkind IN ('r','s','t') and relnamespace = 2200::oid;"""
self.page.fill_codemirror_area_with(query)
self.page.find_by_css_selector(
QueryToolLocatorsCss.btn_execute_query).click()
self.page.click_tab('Data Output')
self.page.wait_for_query_tool_loading_indicator_to_disappear()
canvas = wait.until(EC.presence_of_element_located(
(By.CSS_SELECTOR, "#datagrid .slick-viewport .grid-canvas")))
el = canvas.find_elements_by_xpath(
"//div[contains(@class, 'slick-cell') and "
"contains(text(), '{}')]".format(table_name))
assert len(el) != 0, "Table '{}' is not created with auto " \
"commit enabled.".format(table_name)
def _query_tool_auto_rollback_enabled(self):
table_name = 'query_tool_auto_rollback_enabled_table'
query = """-- 1. Enable auto rollback and disable auto commit.
-- 2. END any open transaction.
-- 3. Create table in public schema.
-- 4. Generate error in transaction.
-- 5. END transaction.
-- 6. Check if table is *NOT* created after ending transaction.
END;"""
wait = WebDriverWait(self.page.driver, 10)
self.page.fill_codemirror_area_with(query)
query_op = self.page.find_by_css_selector(
QueryToolLocatorsCss.btn_query_dropdown)
query_op.click()
# uncheckt auto commit and check auto-rollback
self.uncheck_execute_option('auto_commit')
self.check_execute_option('auto_rollback')
query_op.click()
self.page.find_by_css_selector(
QueryToolLocatorsCss.btn_execute_query).click()
self.page.wait_for_query_tool_loading_indicator_to_disappear()
self._clear_query_tool()
query = """-- 1. (Done) END any open transaction.
-- 2. Enable auto rollback and disable auto commit.
-- 3. Create table in public schema.
-- 4. Generate error in transaction.
-- 5. END transaction.
-- 6. Check if table is *NOT* created after ending transaction.
CREATE TABLE public.{}();""".format(table_name)
wait = WebDriverWait(self.page.driver, 10)
self.page.fill_codemirror_area_with(query)
self.page.find_by_css_selector(
QueryToolLocatorsCss.btn_execute_query).click()
self.page.wait_for_query_tool_loading_indicator_to_disappear()
self.page.click_tab('Messages')
self.page.find_by_xpath(
'//div[contains(@class, "sql-editor-message") and '
'contains(string(), "CREATE TABLE")]'
)
self._clear_query_tool()
query = """-- 1. (Done) END any open transaction.
-- 2. (Done) Enable auto rollback and disable auto commit.
-- 3. (Done) Create table in public schema.
-- 4. Generate error in transaction.
-- 5. END transaction.
-- 6. Check if table is *NOT* created after ending transaction.
SELECT 1/0;"""
self.page.fill_codemirror_area_with(query)
self.page.find_by_css_selector(
QueryToolLocatorsCss.btn_execute_query).click()
self.page.wait_for_query_tool_loading_indicator_to_disappear()
self.page.click_tab('Messages')
self.page.find_by_xpath(
'//div[contains(@class, "sql-editor-message") and '
'contains(string(), "division by zero")]'
)
self._clear_query_tool()
query = """-- 1. (Done) END any open transaction.
-- 2. (Done) Enable auto rollback and disable auto commit.
-- 3. (Done) Create table in public schema.
-- 4. (Done) Generate error in transaction.
-- 5. END transaction.
-- 6. Check if table is *NOT* created after ending transaction.
END;"""
self.page.fill_codemirror_area_with(query)
self.page.find_by_css_selector(
QueryToolLocatorsCss.btn_execute_query).click()
self.page.wait_for_query_tool_loading_indicator_to_disappear()
self.page.click_tab('Messages')
self.page.find_by_xpath(
'//div[contains(@class, "sql-editor-message") and '
'contains(string(), "Query returned successfully")]'
)
self._clear_query_tool()
query = """-- 1. (Done) END any open transaction.
-- 2. (Done) Enable auto rollback and disable auto commit.
-- 3. (Done) Create table in public schema.
-- 4. (Done) Generate error in transaction.
-- 5. (Done) END transaction.
-- 6. Check if table is *NOT* created after ending transaction.
SELECT relname FROM pg_class
WHERE relkind IN ('r','s','t') and relnamespace = 2200::oid;"""
self.page.fill_codemirror_area_with(query)
self.page.find_by_css_selector(
QueryToolLocatorsCss.btn_execute_query).click()
self.page.wait_for_query_tool_loading_indicator_to_disappear()
self.page.click_tab('Data Output')
canvas = wait.until(EC.presence_of_element_located(
(By.CSS_SELECTOR, "#datagrid .slick-viewport .grid-canvas")))
el = canvas.find_elements_by_xpath(
"//div[contains(@class, 'slick-cell') and "
"contains(text(), '{}')]".format(table_name))
assert len(el) == 0, "Table '{}' created even after ROLLBACK due to " \
"sql error.".format(table_name)
def _query_tool_cancel_query(self):
query = """-- 1. END any open transaction.
-- 2. Enable auto commit and Disable auto rollback.
-- 3. Execute long running query.
-- 4. Cancel long running query execution.
END;
SELECT 1, pg_sleep(300)"""
self.page.fill_codemirror_area_with(query)
# query_button drop can be disabled so enable
commit_button = self.page.find_by_css_selector("#btn-commit")
if not commit_button.get_attribute('disabled'):
commit_button.click()
query_op = self.page.find_by_css_selector(
QueryToolLocatorsCss.btn_query_dropdown)
query_op.click()
# enable auto-commit and disable auto-rollback
self.check_execute_option('auto_commit')
self.uncheck_execute_option('auto_rollback')
# close drop down
query_op.click()
self.page.find_by_css_selector(
QueryToolLocatorsCss.btn_execute_query).click()
self.page.find_by_xpath("//*[@id='fetching_data']")
self.page.find_by_css_selector(
QueryToolLocatorsCss.btn_cancel_query).click()
self.page.wait_for_query_tool_loading_indicator_to_disappear()
self.page.click_tab('Messages')
self.page.find_by_xpath(
'//div[contains(@class, "sql-editor-message") and '
'(contains(string(), "canceling statement due to user request") '
'or contains(string(), "Execution Cancelled!"))]'
)
def _supported_server_version(self):
connection = test_utils.get_db_connection(
self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'],
self.server['sslmode']
)
return connection.server_version > 90100
def _query_tool_notify_statements(self):
wait = WebDriverWait(self.page.driver, 60)
print("\n\tListen on an event... ", file=sys.stderr, end="")
self.page.fill_codemirror_area_with("LISTEN foo;")
self.page.find_by_css_selector(
QueryToolLocatorsCss.btn_execute_query).click()
self.page.wait_for_query_tool_loading_indicator_to_disappear()
self.page.click_tab('Messages')
wait.until(EC.text_to_be_present_in_element(
(By.CSS_SELECTOR, ".sql-editor-message"), "LISTEN")
)
print("OK.", file=sys.stderr)
self._clear_query_tool()
print("\tNotify event without data... ", file=sys.stderr, end="")
self.page.fill_codemirror_area_with("NOTIFY foo;")
self.page.find_by_css_selector(
QueryToolLocatorsCss.btn_execute_query).click()
self.page.wait_for_query_tool_loading_indicator_to_disappear()
self.page.click_tab('Notifications')
wait.until(EC.text_to_be_present_in_element(
(By.CSS_SELECTOR, "td.channel"), "foo")
)
print("OK.", file=sys.stderr)
self._clear_query_tool()
print("\tNotify event with data... ", file=sys.stderr, end="")
if self._supported_server_version():
self.page.fill_codemirror_area_with("SELECT pg_notify('foo', "
"'Hello')")
self.page.find_by_css_selector(
QueryToolLocatorsCss.btn_execute_query).click()
self.page.wait_for_query_tool_loading_indicator_to_disappear()
self.page.click_tab('Notifications')
wait.until(WaitForAnyElementWithText(
(By.CSS_SELECTOR, 'td.payload'), "Hello"))
print("OK.", file=sys.stderr)
self._clear_query_tool()
else:
print("Skipped.", file=sys.stderr)
def _supported_jit_on_server(self):
connection = test_utils.get_db_connection(
self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'],
self.server['sslmode']
)
pg_cursor = connection.cursor()
pg_cursor.execute('select version()')
version_string = pg_cursor.fetchone()
# check if jit is turned on
jit_enabled = False
try:
pg_cursor.execute('show jit')
show_jit = pg_cursor.fetchone()
if show_jit[0] == 'on':
jit_enabled = True
except:
pass
is_edb = False
if len(version_string) > 0:
is_edb = 'EnterpriseDB' in version_string[0]
connection.close()
return connection.server_version >= 110000 and jit_enabled
def _query_tool_explain_check_jit_stats(self):
wait = WebDriverWait(self.page.driver, 10)
self.page.fill_codemirror_area_with("SET jit_above_cost=10;")
self.page.find_by_css_selector(
QueryToolLocatorsCss.btn_execute_query).click()
self.page.wait_for_query_tool_loading_indicator_to_disappear()
self._clear_query_tool()
self.page.fill_codemirror_area_with("SELECT count(*) FROM pg_class;")
explain_op = self.page.find_by_css_selector(
QueryToolLocatorsCss.btn_explain_options_dropdown)
explain_op.click()
# disable Explain options and only enable COST option
for op in (QueryToolLocatorsCss.btn_explain_verbose,
QueryToolLocatorsCss.btn_explain_costs,
QueryToolLocatorsCss.btn_explain_buffers,
QueryToolLocatorsCss.btn_explain_timing):
btn = self.page.find_by_css_selector(op)
check = btn.find_element_by_tag_name('i')
if 'visibility-hidden' not in check.get_attribute('class'):
btn.click()
# click cost button
cost_btn = self.page.find_by_css_selector(
QueryToolLocatorsCss.btn_explain_costs)
cost_btn.click()
# close explain options
explain_op.click()
self.page.find_by_css_selector(
QueryToolLocatorsCss.btn_explain_analyze).click()
self.page.wait_for_query_tool_loading_indicator_to_disappear()
self.page.click_tab('Data Output')
canvas = wait.until(EC.presence_of_element_located(
(By.CSS_SELECTOR, "#datagrid .slick-viewport .grid-canvas"))
)
# Search for 'Output' word in result (verbose option)
canvas.find_element_by_xpath("//*[contains(string(), 'JIT')]")
self._clear_query_tool()
def check_execute_option(self, option):
""""This function will check auto commit or auto roll back based on
user input. If button is already checked, no action will be taken"""
if option == 'auto_commit':
check_status = self.driver.find_element_by_css_selector(
QueryToolLocatorsCss.btn_auto_commit_check_status)
if 'visibility-hidden' in check_status.get_attribute('class'):
self.page.find_by_css_selector(QueryToolLocatorsCss.
btn_auto_commit).click()
if option == 'auto_rollback':
check_status = self.driver.find_element_by_css_selector(
QueryToolLocatorsCss.btn_auto_rollback_check_status)
if 'visibility-hidden' in check_status.get_attribute('class'):
self.page.find_by_css_selector(QueryToolLocatorsCss.
btn_auto_rollback).click()
def uncheck_execute_option(self, option):
""""This function will uncheck auto commit or auto roll back based on
user input. If button is already unchecked, no action will be taken"""
if option == 'auto_commit':
check_status = self.driver.find_element_by_css_selector(
QueryToolLocatorsCss.btn_auto_commit_check_status)
if 'visibility-hidden' not in check_status.get_attribute('class'):
self.page.find_by_css_selector(QueryToolLocatorsCss.
btn_auto_commit).click()
if option == 'auto_rollback':
check_status = self.driver.find_element_by_css_selector(
QueryToolLocatorsCss.btn_auto_rollback_check_status)
if 'visibility-hidden' not in check_status.get_attribute('class'):
self.page.find_by_css_selector(QueryToolLocatorsCss.
btn_auto_rollback).click()
class WaitForAnyElementWithText(object):
def __init__(self, locator, text):
self.locator = locator
self.text = text
def __call__(self, driver):
try:
elements = EC._find_elements(driver, self.locator)
for elem in elements:
if self.text in elem.text:
return True
return False
except StaleElementReferenceException:
return False