Ensure the query tool displays but does not render HTML returned by the server in the results grid. Fixes #2330.

This commit is contained in:
Murtuza Zabuawala 2017-04-10 14:07:48 +01:00 committed by Dave Page
parent b86fa15dbc
commit a2a2b8b888
7 changed files with 405 additions and 14 deletions

View File

@ -0,0 +1,189 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2017, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
from selenium.webdriver import ActionChains
from regression.python_test_utils import test_utils
from regression.feature_utils.base_feature_test import BaseFeatureTest
import time
class CheckForXssFeatureTest(BaseFeatureTest):
"""
Tests to check if pgAdmin4 is vulnerable to XSS.
Here we will check html source code for escaped characters if we
found them in the code then we are not vulnerable otherwise we might.
We will cover,
1) Browser Tree (aciTree)
2) Properties Tab (BackFrom)
3) Dependents Tab (BackGrid)
4) SQL Tab (Code Mirror)
5) Query Tool (SlickGrid)
"""
scenarios = [
("Test XSS check for panels and query tool", dict())
]
def before(self):
connection = test_utils.get_db_connection(self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'])
test_utils.drop_database(connection, "acceptance_test_db")
test_utils.create_database(self.server, "acceptance_test_db")
test_utils.create_table(self.server, "acceptance_test_db",
"<h1>X")
# This is needed to test dependents tab (eg: BackGrid)
test_utils.create_constraint(self.server, "acceptance_test_db",
"<h1>X",
"unique", "<h1 onmouseover='console.log(2);'>Y")
def runTest(self):
self.page.wait_for_spinner_to_disappear()
self._connects_to_server()
self._tables_node_expandable()
self._check_xss_in_browser_tree()
self._check_xss_in_properties_tab()
self._check_xss_in_sql_tab()
self._check_xss_in_dependents_tab()
# Query tool
self._check_xss_in_query_tool()
self._close_query_tool()
def after(self):
time.sleep(1)
self.page.remove_server(self.server)
connection = test_utils.get_db_connection(self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'])
test_utils.drop_database(connection, "acceptance_test_db")
def _connects_to_server(self):
self.page.find_by_xpath("//*[@class='aciTreeText' and .='Servers']").click()
self.page.driver.find_element_by_link_text("Object").click()
ActionChains(self.page.driver) \
.move_to_element(self.page.driver.find_element_by_link_text("Create")) \
.perform()
self.page.find_by_partial_link_text("Server...").click()
server_config = self.server
self.page.fill_input_by_field_name("name", server_config['name'])
self.page.find_by_partial_link_text("Connection").click()
self.page.fill_input_by_field_name("host", server_config['host'])
self.page.fill_input_by_field_name("port", server_config['port'])
self.page.fill_input_by_field_name("username", server_config['username'])
self.page.fill_input_by_field_name("password", server_config['db_password'])
self.page.find_by_xpath("//button[contains(.,'Save')]").click()
def _tables_node_expandable(self):
self.page.toggle_open_tree_item(self.server['name'])
self.page.toggle_open_tree_item('Databases')
self.page.toggle_open_tree_item('acceptance_test_db')
self.page.toggle_open_tree_item('Schemas')
self.page.toggle_open_tree_item('public')
self.page.toggle_open_tree_item('Tables')
self.page.select_tree_item("<h1>X")
def _check_xss_in_browser_tree(self):
# Fetch the inner html & check for escaped characters
source_code = self.page.find_by_xpath(
"//*[@id='tree']"
).get_attribute('innerHTML')
self._check_escaped_characters(
source_code,
"&lt;h1&gt;X",
"Browser tree"
)
def _check_xss_in_properties_tab(self):
self.page.click_tab("Properties")
source_code = self.page.find_by_xpath(
"//span[contains(@class,'uneditable-input')]"
).get_attribute('innerHTML')
self._check_escaped_characters(
source_code,
"&lt;h1&gt;X",
"Properties tab (Backform Control)"
)
def _check_xss_in_sql_tab(self):
self.page.click_tab("SQL")
# Fetch the inner html & check for escaped characters
source_code = self.page.find_by_xpath(
"//*[contains(@class,'CodeMirror-lines') and contains(.,'CREATE TABLE')]"
).get_attribute('innerHTML')
self._check_escaped_characters(
source_code,
"&lt;h1&gt;X",
"SQL tab (Code Mirror)"
)
def _check_xss_in_dependents_tab(self): # Create any constraint with xss name to test this
self.page.click_tab("Dependents")
source_code = self.page.find_by_xpath(
"//*[@id='5']/table/tbody/tr/td/div/div/div[2]/table/tbody/tr/td[2]"
).get_attribute('innerHTML')
self._check_escaped_characters(
source_code,
"public.&lt;h1 onmouseover='console.log(2);'&gt;Y",
"Dependents tab (BackGrid)"
)
def _check_xss_in_query_tool(self):
self.page.driver.find_element_by_link_text("Tools").click()
self.page.find_by_partial_link_text("Query Tool").click()
time.sleep(3)
self.page.driver.switch_to.frame(self.page.driver.find_element_by_tag_name('iframe'))
self.page.fill_codemirror_area_with("select '<img src=\"x\" onerror=\"console.log(1)\">'")
time.sleep(1)
self.page.find_by_id("btn-flash").click()
time.sleep(2)
source_code = self.page.find_by_xpath(
"//*[@id='0']//*[@id='datagrid']/div[5]/div/div[1]/div[2]"
).get_attribute('innerHTML')
self._check_escaped_characters(
source_code,
'&lt;img src="x" onerror="console.log(1)"&gt;',
"Query tool (SlickGrid)"
)
def _close_query_tool(self):
self.page.driver.switch_to_default_content()
self.page.click_element(
self.page.find_by_xpath("//*[@id='dockerContainer']/div/div[3]/div/div[2]/div[1]")
)
time.sleep(0.5)
self.page.driver.switch_to.frame(self.page.driver.find_element_by_tag_name('iframe'))
time.sleep(1)
self.page.click_element(self.page.find_by_xpath("//button[contains(.,'Yes')]"))
time.sleep(0.5)
self.page.driver.switch_to_default_content()
def _check_escaped_characters(self, source_code, string_to_find, source):
# For XSS we need to search against element's html code
if source_code.find(string_to_find) == -1:
# No escaped characters found
assert False, "{0} might be vulnerable to XSS ".format(source)
else:
# escaped characters found
assert True

View File

@ -0,0 +1,119 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2017, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
from selenium.webdriver import ActionChains
from regression.python_test_utils import test_utils
from regression.feature_utils.base_feature_test import BaseFeatureTest
import time
class CheckDebuggerForXssFeatureTest(BaseFeatureTest):
"""Tests to check if Debugger is vulnerable to XSS."""
scenarios = [
("Test table DDL generation", dict())
]
def before(self):
# Some test function is needed for debugger
test_utils.create_debug_function(self.server, "postgres",
"test_function")
def runTest(self):
self.page.wait_for_spinner_to_disappear()
self._connects_to_server()
self._function_node_expandable()
self._debug_function()
def after(self):
time.sleep(0.5)
test_utils.drop_debug_function(self.server, "postgres",
"test_function")
self.page.remove_server(self.server)
def _connects_to_server(self):
self.page.find_by_xpath("//*[@class='aciTreeText' and .='Servers']").click()
self.page.driver.find_element_by_link_text("Object").click()
ActionChains(self.page.driver) \
.move_to_element(self.page.driver.find_element_by_link_text("Create")) \
.perform()
self.page.find_by_partial_link_text("Server...").click()
server_config = self.server
self.page.fill_input_by_field_name("name", server_config['name'])
self.page.find_by_partial_link_text("Connection").click()
self.page.fill_input_by_field_name("host", server_config['host'])
self.page.fill_input_by_field_name("port", server_config['port'])
self.page.fill_input_by_field_name("username", server_config['username'])
self.page.fill_input_by_field_name("password", server_config['db_password'])
self.page.find_by_xpath("//button[contains(.,'Save')]").click()
def _function_node_expandable(self):
self.page.toggle_open_tree_item(self.server['name'])
self.page.toggle_open_tree_item('Databases')
self.page.toggle_open_tree_item('postgres')
self.page.toggle_open_tree_item('Schemas')
self.page.toggle_open_tree_item('public')
self.page.toggle_open_tree_item('Functions')
self.page.select_tree_item("test_function()")
def _debug_function(self):
self.page.driver.find_element_by_link_text("Object").click()
ActionChains(self.page.driver) \
.move_to_element(self.page.driver.find_element_by_link_text("Debugging")) \
.perform()
self.page.driver.find_element_by_link_text("Debug").click()
time.sleep(0.5)
# We need to check if debugger plugin is installed or not
try:
is_error = self.page.find_by_xpath(
"//div[contains(@class,'ajs-header')]"
).text
except Exception as e:
is_error = None
# If debugger plugin is not found
if is_error and is_error == "Debugger Error":
self.page.click_modal_ok()
self.skipTest("Please make sure that debugger plugin is properly configured")
else:
time.sleep(2)
self.page.driver.switch_to.frame(self.page.driver.find_element_by_tag_name('iframe'))
self.page.click_element(self.page.driver.find_elements_by_xpath("//button")[2])
time.sleep(2)
# Only this tab is vulnerable rest are BackGrid & Code Mirror control
# which are already tested in Query tool test case
self.page.click_tab("Messages")
source_code = self.page.find_by_xpath(
"//*[@id='messages']"
).get_attribute('innerHTML')
self._check_escaped_characters(
source_code,
'NOTICE: &lt;img src="x" onerror="console.log(1)"&gt;',
'Debugger'
)
self._close_debugger()
def _close_debugger(self):
time.sleep(0.5)
self.page.driver.switch_to_default_content()
time.sleep(0.5)
self.page.click_element(
self.page.find_by_xpath("//*[@id='dockerContainer']/div/div[3]/div/div[2]/div[1]")
)
def _check_escaped_characters(self, source_code, string_to_find, source):
# For XSS we need to search against element's html code
if source_code.find(string_to_find) == -1:
# No escaped characters found
assert False, "{0} might be vulnerable to XSS ".format(source)
else:
# escaped characters found
assert True

View File

@ -24,7 +24,7 @@
} else {
// Stringify only if it's json object
if (typeof value === "object" && !Array.isArray(value)) {
return JSON.stringify(value);
return _.escape(JSON.stringify(value));
} else if (Array.isArray(value)) {
var temp = [];
$.each(value, function(i, val) {
@ -34,9 +34,9 @@
temp.push(val)
}
});
return "[" + temp.join() + "]"
return _.escape("[" + temp.join() + "]")
} else {
return value;
return _.escape(value);
}
}
}
@ -49,7 +49,7 @@
return '';
}
else {
return "<span style='float:right'>" + value + "</span>";
return "<span style='float:right'>" + _.escape(value) + "</span>";
}
}
@ -70,7 +70,7 @@
return "<span class='pull-left'>[null]</span>";
}
else {
return value;
return _.escape(value);
}
}

View File

@ -1425,10 +1425,10 @@ def poll_end_execution_result(trans_id):
status = 'Success'
additional_msgs = conn.messages()
if len(additional_msgs) > 0:
additional_msgs = [msg.strip("<br>") for msg in additional_msgs]
additional_msgs = "<br>".join(additional_msgs)
additional_msgs = [msg.strip("\n") for msg in additional_msgs]
additional_msgs = "\n".join(additional_msgs)
if statusmsg:
statusmsg = additional_msgs + "<br>" + statusmsg
statusmsg = additional_msgs + "\n" + statusmsg
else:
statusmsg = additional_msgs
@ -1443,10 +1443,10 @@ def poll_end_execution_result(trans_id):
status = 'Success'
additional_msgs = conn.messages()
if len(additional_msgs) > 0:
additional_msgs = [msg.strip("<br>") for msg in additional_msgs]
additional_msgs = "<br>".join(additional_msgs)
additional_msgs = [msg.strip("\n") for msg in additional_msgs]
additional_msgs = "\n".join(additional_msgs)
if statusmsg:
statusmsg = additional_msgs + "<br>" + statusmsg
statusmsg = additional_msgs + "\n" + statusmsg
else:
statusmsg = additional_msgs
@ -1460,9 +1460,9 @@ def poll_end_execution_result(trans_id):
additional_msgs = conn.messages()
if len(additional_msgs) > 0:
additional_msgs = [msg.strip("\n") for msg in additional_msgs]
additional_msgs = "<br>".join(additional_msgs)
additional_msgs = "\n".join(additional_msgs)
if statusmsg:
statusmsg = additional_msgs + "<br>" + statusmsg
statusmsg = additional_msgs + "\n" + statusmsg
else:
statusmsg = additional_msgs
return make_json_response(data={

View File

@ -383,6 +383,9 @@ define(
// This function will update messages tab
update_messages: function(msg) {
// To prevent xss
msg = _.escape(msg);
var old_msgs='', new_msgs='';
old_msgs = pgTools.DirectDebug.messages_panel.$container.find('.messages').html();
if(old_msgs) {

View File

@ -35,7 +35,10 @@ class PgadminPage:
def click_modal_ok(self):
time.sleep(0.5)
self.click_element(self.find_by_xpath("//button[contains(.,'OK')]"))
# Find active alertify dialog in case of multiple alertify dialog & click on that dialog
self.click_element(
self.find_by_xpath("//div[contains(@class, 'alertify') and not(contains(@class, 'ajs-hidden'))]//button[.='OK']")
)
def add_server(self, server_config):
self.find_by_xpath("//*[@class='aciTreeText' and contains(.,'Servers')]").click()

View File

@ -158,6 +158,83 @@ def create_table(server, db_name, table_name):
except Exception:
traceback.print_exc(file=sys.stderr)
def create_constraint(
server, db_name, table_name,
constraint_type="unique", constraint_name="test_unique"):
try:
connection = get_db_connection(db_name,
server['username'],
server['db_password'],
server['host'],
server['port'])
old_isolation_level = connection.isolation_level
connection.set_isolation_level(0)
pg_cursor = connection.cursor()
pg_cursor.execute('''
ALTER TABLE "%s"
ADD CONSTRAINT "%s" %s (some_column)
''' % (table_name, constraint_name, constraint_type.upper())
)
connection.set_isolation_level(old_isolation_level)
connection.commit()
except Exception:
traceback.print_exc(file=sys.stderr)
def create_debug_function(server, db_name, function_name="test_func"):
try:
connection = get_db_connection(db_name,
server['username'],
server['db_password'],
server['host'],
server['port'])
old_isolation_level = connection.isolation_level
connection.set_isolation_level(0)
pg_cursor = connection.cursor()
pg_cursor.execute('''
CREATE OR REPLACE FUNCTION public."%s"()
RETURNS text
LANGUAGE 'plpgsql'
COST 100.0
VOLATILE
AS $function$
BEGIN
RAISE INFO 'This is a test function';
RAISE NOTICE '<img src="x" onerror="console.log(1)">';
RAISE NOTICE '<h1 onmouseover="console.log(1);">';
RETURN 'Hello, pgAdmin4';
END;
$function$;
''' % (function_name)
)
connection.set_isolation_level(old_isolation_level)
connection.commit()
except Exception:
traceback.print_exc(file=sys.stderr)
def drop_debug_function(server, db_name, function_name="test_func"):
try:
connection = get_db_connection(db_name,
server['username'],
server['db_password'],
server['host'],
server['port'])
old_isolation_level = connection.isolation_level
connection.set_isolation_level(0)
pg_cursor = connection.cursor()
pg_cursor.execute('''
DROP FUNCTION public."%s"();
''' % (function_name)
)
connection.set_isolation_level(old_isolation_level)
connection.commit()
except Exception:
traceback.print_exc(file=sys.stderr)
def drop_database(connection, database_name):
"""This function used to drop the database"""