Fix PEP-8 issues in feature_tests, dashboard, about and misc module's python code. Fixes #3082

This commit is contained in:
Murtuza Zabuawala 2018-02-09 12:57:37 +00:00 committed by Dave Page
parent 942ac733a4
commit 6f25f4d175
17 changed files with 634 additions and 361 deletions

View File

@ -11,32 +11,31 @@
MODULE_NAME = 'about'
import sys
from flask import Response, render_template, __version__, url_for
from flask_babel import gettext
from flask_security import current_user, login_required
from pgadmin.utils import PgAdminModule
from pgadmin.utils.menu import MenuItem
import config
class AboutModule(PgAdminModule):
def get_own_menuitems(self):
appname = config.APP_NAME
if hasattr(str, 'decode'):
appname = appname.decode('utf-8')
return {
'help_items': [
MenuItem(name='mnu_about',
priority=999,
module="pgAdmin.About",
callback='about_show',
icon='fa fa-info-circle',
label=gettext(u'About %(appname)s',
appname=appname
)
)
MenuItem(
name='mnu_about',
priority=999,
module="pgAdmin.About",
callback='about_show',
icon='fa fa-info-circle',
label=gettext(u'About %(appname)s', appname=appname)
)
]
}
@ -50,8 +49,8 @@ class AboutModule(PgAdminModule):
def get_exposed_url_endpoints(self):
return ['about.index']
blueprint = AboutModule(MODULE_NAME, __name__,
static_url_path='')
blueprint = AboutModule(MODULE_NAME, __name__, static_url_path='')
##########################################################################
@ -61,20 +60,29 @@ blueprint = AboutModule(MODULE_NAME, __name__,
@login_required
def index():
"""Render the about box."""
info = {'python_version': sys.version, 'flask_version': __version__}
if config.SERVER_MODE is True:
info = {
'python_version': sys.version,
'flask_version': __version__
}
if config.SERVER_MODE:
info['app_mode'] = gettext('Server')
else:
info['app_mode'] = gettext('Desktop')
info['current_user'] = current_user.email
return render_template(MODULE_NAME + '/index.html', info=info, _=gettext)
return render_template(
MODULE_NAME + '/index.html', info=info, _=gettext
)
@blueprint.route("/about.js")
@login_required
def script():
"""render the required javascript"""
return Response(response=render_template("about/about.js", _=gettext),
status=200,
mimetype="application/javascript")
return Response(
response=render_template("about/about.js", _=gettext),
status=200,
mimetype="application/javascript"
)

View File

@ -61,7 +61,8 @@ class DashboardModule(PgAdminModule):
isPrivate=False,
limit=1,
isIframe=False,
canHide=True).__dict__
canHide=True
).__dict__
]
def register_preferences(self):
@ -70,8 +71,9 @@ class DashboardModule(PgAdminModule):
Register preferences for this module.
"""
# Register options for the PG and PPAS help paths
self.dashboard_preference = Preferences('dashboards',
gettext('Dashboards'))
self.dashboard_preference = Preferences(
'dashboards', gettext('Dashboards')
)
self.session_stats_refresh = self.dashboard_preference.register(
'dashboards', 'session_stats_refresh',
@ -149,6 +151,7 @@ class DashboardModule(PgAdminModule):
'dashboard.get_config_by_server_id',
]
blueprint = DashboardModule(MODULE_NAME, __name__)
@ -168,11 +171,13 @@ def check_precondition(f):
kwargs['sid']
)
stats_type = ('activity', 'prepared', 'locks', 'config')
# Below check handle the case where existing server is deleted
# by user and python server will raise exception if this check
# is not introduce.
if g.manager is None:
if f.__name__ in ['activity', 'prepared', 'locks', 'config']:
if f.__name__ in stats_type:
return precondition_required(
gettext("Please connect to the selected server"
" to view the table.")
@ -187,7 +192,7 @@ def check_precondition(f):
# If DB not connected then return error to browser
if not g.conn.connected():
if f.__name__ in ['activity', 'prepared', 'locks', 'config']:
if f.__name__ in stats_type:
return precondition_required(
gettext("Please connect to the selected server"
" to view the table.")
@ -202,7 +207,7 @@ def check_precondition(f):
db_conn = g.manager.connection(did=kwargs['did'])
# If the selected DB not connected then return error to browser
if not db_conn.connected():
if f.__name__ in ['activity', 'prepared', 'locks', 'config']:
if f.__name__ in stats_type:
return precondition_required(
gettext("Please connect to the selected database"
" to view the table.")
@ -220,8 +225,7 @@ def check_precondition(f):
# Include server_type in template_path when server_type is gpdb
g.template_path = 'dashboard/sql/' + (
'#{0}#{1}#'.format(g.server_type, g.version)
if g.server_type == 'gpdb' else
'#{0}#'.format(g.version)
if g.server_type == 'gpdb' else '#{0}#'.format(g.version)
)
return f(*args, **kwargs)
@ -233,10 +237,14 @@ def check_precondition(f):
@login_required
def script():
"""render the required javascript"""
return Response(response=render_template("dashboard/js/dashboard.js",
_=gettext),
status=200,
mimetype="application/javascript")
return Response(
response=render_template(
"dashboard/js/dashboard.js",
_=gettext
),
status=200,
mimetype="application/javascript"
)
@blueprint.route('/', endpoint='index')
@ -283,16 +291,20 @@ def index(sid=None, did=None):
if sid is None and did is None:
return render_template('/dashboard/welcome_dashboard.html')
if did is None:
return render_template('/dashboard/server_dashboard.html',
sid=sid,
rates=rates,
version=g.version)
return render_template(
'/dashboard/server_dashboard.html',
sid=sid,
rates=rates,
version=g.version
)
else:
return render_template('/dashboard/database_dashboard.html',
sid=sid,
did=did,
rates=rates,
version=g.version)
return render_template(
'/dashboard/database_dashboard.html',
sid=sid,
did=did,
rates=rates,
version=g.version
)
def get_data(sid, did, template):
@ -327,7 +339,8 @@ def get_data(sid, did, template):
@blueprint.route('/session_stats/', endpoint='session_stats')
@blueprint.route(
'/session_stats/<int:sid>', endpoint='get_session_stats_by_sever_id')
'/session_stats/<int:sid>', endpoint='get_session_stats_by_sever_id'
)
@blueprint.route(
'/session_stats/<int:sid>/<int:did>',
endpoint='get_session_stats_by_database_id'
@ -362,7 +375,8 @@ def tps_stats(sid=None, did=None):
@blueprint.route('/ti_stats/', endpoint='ti_stats')
@blueprint.route('/ti_stats/<int:sid>', endpoint='ti_stats_by_server_id')
@blueprint.route(
'/ti_stats/<int:sid>/<int:did>', endpoint='ti_stats_by_database_id')
'/ti_stats/<int:sid>/<int:did>', endpoint='ti_stats_by_database_id'
)
@login_required
@check_precondition
def ti_stats(sid=None, did=None):
@ -377,7 +391,8 @@ def ti_stats(sid=None, did=None):
@blueprint.route('/to_stats/', endpoint='to_stats')
@blueprint.route('/to_stats/<int:sid>', endpoint='to_stats_by_server_id')
@blueprint.route(
'/to_stats/<int:sid>/<int:did>', endpoint='to_stats_by_database_id')
'/to_stats/<int:sid>/<int:did>', endpoint='to_stats_by_database_id'
)
@login_required
@check_precondition
def to_stats(sid=None, did=None):
@ -392,7 +407,8 @@ def to_stats(sid=None, did=None):
@blueprint.route('/bio_stats/', endpoint='bio_stats')
@blueprint.route('/bio_stats/<int:sid>', endpoint='bio_stats_by_server_id')
@blueprint.route(
'/bio_stats/<int:sid>/<int:did>', endpoint='bio_stats_by_database_id')
'/bio_stats/<int:sid>/<int:did>', endpoint='bio_stats_by_database_id'
)
@login_required
@check_precondition
def bio_stats(sid=None, did=None):
@ -407,7 +423,8 @@ def bio_stats(sid=None, did=None):
@blueprint.route('/activity/', endpoint='activity')
@blueprint.route('/activity/<int:sid>', endpoint='get_activity_by_server_id')
@blueprint.route(
'/activity/<int:sid>/<int:did>', endpoint='get_activity_by_database_id')
'/activity/<int:sid>/<int:did>', endpoint='get_activity_by_database_id'
)
@login_required
@check_precondition
def activity(sid=None, did=None):
@ -422,7 +439,8 @@ def activity(sid=None, did=None):
@blueprint.route('/locks/', endpoint='locks')
@blueprint.route('/locks/<int:sid>', endpoint='get_locks_by_server_id')
@blueprint.route(
'/locks/<int:sid>/<int:did>', endpoint='get_locks_by_database_id')
'/locks/<int:sid>/<int:did>', endpoint='get_locks_by_database_id'
)
@login_required
@check_precondition
def locks(sid=None, did=None):
@ -437,7 +455,8 @@ def locks(sid=None, did=None):
@blueprint.route('/prepared/', endpoint='prepared')
@blueprint.route('/prepared/<int:sid>', endpoint='get_prepared_by_server_id')
@blueprint.route(
'/prepared/<int:sid>/<int:did>', endpoint='get_prepared_by_database_id')
'/prepared/<int:sid>/<int:did>', endpoint='get_prepared_by_database_id'
)
@login_required
@check_precondition
def prepared(sid=None, did=None):
@ -488,6 +507,7 @@ def cancel_query(sid=None, did=None, pid=None):
status=200
)
@blueprint.route(
'/terminate_session/<int:sid>/<int:pid>', methods=['DELETE']
)

View File

@ -26,15 +26,18 @@ class CopySelectedQueryResultsFeatureTest(BaseFeatureTest):
]
def before(self):
connection = test_utils.get_db_connection(self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'],
self.server['sslmode'])
connection = test_utils.get_db_connection(
self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'],
self.server['sslmode']
)
test_utils.drop_database(connection, "acceptance_test_db")
test_utils.create_database(self.server, "acceptance_test_db")
test_utils.create_table(self.server, "acceptance_test_db", "test_table")
test_utils.create_table(
self.server, "acceptance_test_db", "test_table")
self.page.add_server(self.server)
def runTest(self):
@ -62,7 +65,8 @@ class CopySelectedQueryResultsFeatureTest(BaseFeatureTest):
def _copies_rows(self):
pyperclip.copy("old clipboard contents")
self.page.find_by_xpath("//*[contains(@class, 'slick-row')]/*[1]").click()
self.page.find_by_xpath(
"//*[contains(@class, 'slick-row')]/*[1]").click()
self.page.find_by_xpath("//*[@id='btn-copy-row']").click()
@ -71,7 +75,10 @@ class CopySelectedQueryResultsFeatureTest(BaseFeatureTest):
def _copies_columns(self):
pyperclip.copy("old clipboard contents")
self.page.find_by_xpath("//*[@data-test='output-column-header' and contains(., 'some_column')]").click()
self.page.find_by_xpath(
"//*[@data-test='output-column-header' and "
"contains(., 'some_column')]"
).click()
self.page.find_by_xpath("//*[@id='btn-copy-row']").click()
self.assertEqual(
@ -82,18 +89,24 @@ class CopySelectedQueryResultsFeatureTest(BaseFeatureTest):
def _copies_row_using_keyboard_shortcut(self):
pyperclip.copy("old clipboard contents")
self.page.find_by_xpath("//*[contains(@class, 'slick-row')]/*[1]").click()
self.page.find_by_xpath(
"//*[contains(@class, 'slick-row')]/*[1]").click()
ActionChains(self.page.driver).key_down(Keys.CONTROL).send_keys('c').key_up(Keys.CONTROL).perform()
ActionChains(self.page.driver).key_down(
Keys.CONTROL).send_keys('c').key_up(Keys.CONTROL).perform()
self.assertEqual('"Some-Name"\t"6"\t"some info"',
pyperclip.paste())
def _copies_column_using_keyboard_shortcut(self):
pyperclip.copy("old clipboard contents")
self.page.find_by_xpath("//*[@data-test='output-column-header' and contains(., 'some_column')]").click()
self.page.find_by_xpath(
"//*[@data-test='output-column-header' and "
"contains(., 'some_column')]"
).click()
ActionChains(self.page.driver).key_down(Keys.CONTROL).send_keys('c').key_up(Keys.CONTROL).perform()
ActionChains(self.page.driver).key_down(
Keys.CONTROL).send_keys('c').key_up(Keys.CONTROL).perform()
self.assertEqual(
"""\"Some-Name"
@ -105,12 +118,21 @@ class CopySelectedQueryResultsFeatureTest(BaseFeatureTest):
pyperclip.copy("old clipboard contents")
top_left_cell = self.page.find_by_xpath(
"//div[contains(@class, 'slick-cell') and contains(., 'Some-Other-Name')]")
bottom_right_cell = self.page.find_by_xpath("//div[contains(@class, 'slick-cell') and contains(., '14')]")
"//div[contains(@class, 'slick-cell') and "
"contains(., 'Some-Other-Name')]"
)
bottom_right_cell = self.page.find_by_xpath(
"//div[contains(@class, 'slick-cell') and contains(., '14')]")
ActionChains(self.page.driver).click_and_hold(top_left_cell).move_to_element(bottom_right_cell) \
.release(bottom_right_cell).perform()
ActionChains(self.page.driver).key_down(Keys.CONTROL).send_keys('c').key_up(Keys.CONTROL).perform()
ActionChains(
self.page.driver
).click_and_hold(top_left_cell).move_to_element(
bottom_right_cell
).release(bottom_right_cell).perform()
ActionChains(
self.page.driver
).key_down(Keys.CONTROL).send_keys('c').key_up(Keys.CONTROL).perform()
self.assertEqual("""\"Some-Other-Name"\t"22"
"Yet-Another-Name"\t"14\"""", pyperclip.paste())
@ -119,14 +141,24 @@ class CopySelectedQueryResultsFeatureTest(BaseFeatureTest):
pyperclip.copy("old clipboard contents")
top_left_cell = self.page.find_by_xpath(
"//div[contains(@class, 'slick-cell') and contains(., 'Some-Other-Name')]")
"//div[contains(@class, 'slick-cell') and "
"contains(., 'Some-Other-Name')]"
)
initial_bottom_right_cell = self.page.find_by_xpath(
"//div[contains(@class, 'slick-cell') and contains(., '14')]")
ActionChains(self.page.driver).click_and_hold(top_left_cell).move_to_element(initial_bottom_right_cell) \
.release(initial_bottom_right_cell).perform()
ActionChains(
self.page.driver
).click_and_hold(top_left_cell).move_to_element(
initial_bottom_right_cell
).release(initial_bottom_right_cell).perform()
ActionChains(self.page.driver).key_down(Keys.SHIFT).send_keys(Keys.ARROW_RIGHT).key_up(Keys.SHIFT).perform()
ActionChains(self.page.driver).key_down(Keys.CONTROL).send_keys('c').key_up(Keys.CONTROL).perform()
ActionChains(self.page.driver).key_down(Keys.SHIFT).send_keys(
Keys.ARROW_RIGHT
).key_up(Keys.SHIFT).perform()
ActionChains(self.page.driver).key_down(
Keys.CONTROL
).send_keys('c').key_up(Keys.CONTROL).perform()
self.assertEqual("""\"Some-Other-Name"\t"22"\t"some other info"
"Yet-Another-Name"\t"14"\t"cool info\"""", pyperclip.paste())
@ -134,11 +166,16 @@ class CopySelectedQueryResultsFeatureTest(BaseFeatureTest):
def _shift_resizes_column_selection(self):
pyperclip.copy("old clipboard contents")
self.page.find_by_xpath("//*[@data-test='output-column-header' and contains(., 'value')]").click()
ActionChains(self.page.driver).key_down(Keys.SHIFT).send_keys(Keys.ARROW_LEFT) \
.key_up(Keys.SHIFT).perform()
self.page.find_by_xpath(
"//*[@data-test='output-column-header' and "
"contains(., 'value')]"
).click()
ActionChains(self.page.driver).key_down(Keys.CONTROL).send_keys('c').key_up(Keys.CONTROL).perform()
ActionChains(self.page.driver).key_down(
Keys.SHIFT).send_keys(Keys.ARROW_LEFT).key_up(Keys.SHIFT).perform()
ActionChains(self.page.driver).key_down(
Keys.CONTROL).send_keys('c').key_up(Keys.CONTROL).perform()
self.assertEqual(
"""\"Some-Name"\t"6"
@ -150,7 +187,9 @@ class CopySelectedQueryResultsFeatureTest(BaseFeatureTest):
pyperclip.copy("old clipboard contents")
bottom_right_cell = self.page.find_by_xpath(
"//div[contains(@class, 'slick-cell') and contains(., 'cool info')]")
"//div[contains(@class, 'slick-cell') and "
"contains(., 'cool info')]"
)
load_button = self.page.find_by_xpath("//button[@id='btn-load-file']")
ActionChains(self.page.driver).click_and_hold(bottom_right_cell) \
@ -158,7 +197,8 @@ class CopySelectedQueryResultsFeatureTest(BaseFeatureTest):
.release(load_button) \
.perform()
ActionChains(self.page.driver).key_down(Keys.CONTROL).send_keys('c').key_up(Keys.CONTROL).perform()
ActionChains(self.page.driver).key_down(
Keys.CONTROL).send_keys('c').key_up(Keys.CONTROL).perform()
self.assertEqual('"cool info"', pyperclip.paste())
@ -166,10 +206,12 @@ class CopySelectedQueryResultsFeatureTest(BaseFeatureTest):
self.page.close_query_tool()
self.page.remove_server(self.server)
connection = test_utils.get_db_connection(self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'],
self.server['sslmode'])
connection = test_utils.get_db_connection(
self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'],
self.server['sslmode']
)
test_utils.drop_database(connection, "acceptance_test_db")

View File

@ -30,8 +30,14 @@ class KeyboardShortcutFeatureTest(BaseFeatureTest):
def before(self):
self.new_shortcuts = {
'mnu_file': {'shortcut': [Keys.ALT, Keys.SHIFT, 'i'], 'locator': 'File main menu'},
'mnu_obj': {'shortcut': [Keys.ALT, Keys.SHIFT, 'j'], 'locator': 'Object main menu'}
'mnu_file': {
'shortcut': [Keys.ALT, Keys.SHIFT, 'i'],
'locator': 'File main menu'
},
'mnu_obj': {
'shortcut': [Keys.ALT, Keys.SHIFT, 'j'],
'locator': 'Object main menu'
}
}
self.wait = WebDriverWait(self.page.driver, 10)
@ -39,8 +45,8 @@ class KeyboardShortcutFeatureTest(BaseFeatureTest):
def runTest(self):
self._update_preferences()
# On updating keyboard shortcuts, preference cache is updated.
# There is no UI event through which we can identify that the cache is updated,
# So, added time.sleep()
# There is no UI event through which we can identify that the cache
# is updated, So, added time.sleep()
time.sleep(1)
self._check_shortcuts()
@ -48,10 +54,22 @@ class KeyboardShortcutFeatureTest(BaseFeatureTest):
action = ActionChains(self.driver)
for s in self.new_shortcuts:
key_combo = self.new_shortcuts[s]['shortcut']
action.key_down(key_combo[0]).key_down(key_combo[1]).key_down(key_combo[2]).key_up(Keys.ALT).perform()
action.key_down(
key_combo[0]
).key_down(
key_combo[1]
).key_down(
key_combo[2]
).key_up(
Keys.ALT
).perform()
self.wait.until(EC.presence_of_element_located(
(By.XPATH, "//li[contains(@id, " + s + ") and contains(@class, 'open')]"))
self.wait.until(
EC.presence_of_element_located(
(By.XPATH, "//li[contains(@id, " +
s +
") and contains(@class, 'open')]")
)
)
is_open = 'open' in self.page.find_by_id(s).get_attribute('class')
@ -66,20 +84,24 @@ class KeyboardShortcutFeatureTest(BaseFeatureTest):
(By.XPATH, "//*[contains(string(), 'Show system objects?')]"))
)
self.page.find_by_css_selector(".ajs-dialog.pg-el-container .ajs-maximize").click()
self.page.find_by_css_selector(
".ajs-dialog.pg-el-container .ajs-maximize"
).click()
browser = self.page.find_by_xpath(
"//*[contains(@class,'aciTreeLi') and contains(.,'Browser')]")
browser.find_element_by_xpath(
"//*[contains(@class,'aciTreeText') and contains(.,'Keyboard shortcuts')]") \
.click()
"//*[contains(@class,'aciTreeText') and "
"contains(.,'Keyboard shortcuts')]").click()
for s in self.new_shortcuts:
key = self.new_shortcuts[s]['shortcut'][2]
locator = self.new_shortcuts[s]['locator']
file_menu = self.page.find_by_xpath(
"//div[contains(@class,'pgadmin-control-group') and contains(.,'" + locator + "')]")
"//div[contains(@class,'pgadmin-control-group') "
"and contains(.,'" + locator + "')]"
)
field = file_menu.find_element_by_name('key')
field.click()
@ -87,9 +109,10 @@ class KeyboardShortcutFeatureTest(BaseFeatureTest):
# save and close the preference dialog.
self.page.find_by_xpath(
"//*[contains(@class,'pg-alertify-button') and contains(.,'OK')]").click()
"//*[contains(@class,'pg-alertify-button') and "
"contains(.,'OK')]"
).click()
self.page.wait_for_element_to_disappear(
lambda driver: driver.find_element_by_css_selector(".ajs-modal")
)

View File

@ -25,7 +25,8 @@ try:
with open(CURRENT_PATH + '/datatype_test.json') as data_file:
test_data_configuration = json.load(data_file)
config_data = test_data_configuration['tests']
type_minimum_version = test_data_configuration['datatype_minimum_version']
type_minimum_version = \
test_data_configuration['datatype_minimum_version']
except Exception as e:
print(str(e))
@ -41,12 +42,14 @@ class PGDataypeFeatureTest(BaseFeatureTest):
]
def before(self):
connection = test_utils.get_db_connection(self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'],
self.server['sslmode'])
connection = test_utils.get_db_connection(
self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'],
self.server['sslmode']
)
self.timezone = int(test_utils.get_timezone_without_dst(connection))
@ -82,17 +85,20 @@ class PGDataypeFeatureTest(BaseFeatureTest):
(By.XPATH, "//*[contains(string(), 'Show system objects?')]"))
)
self.page.find_by_css_selector(".ajs-dialog.pg-el-container .ajs-maximize").click()
self.page.find_by_css_selector(
".ajs-dialog.pg-el-container .ajs-maximize").click()
sql_editor = self.page.find_by_xpath(
"//*[contains(@class,'aciTreeLi') and contains(.,'SQL Editor')]")
sql_editor.find_element_by_xpath(
"//*[contains(@class,'aciTreeText') and contains(.,'Options')]")\
.click()
"//*[contains(@class,'aciTreeText') and contains(.,'Options')]"
).click()
insert_bracket_pairs_control= self.page.find_by_xpath(
"//div[contains(@class,'pgadmin-control-group') and contains(.,'Insert bracket pairs?')]")
insert_bracket_pairs_control = self.page.find_by_xpath(
"//div[contains(@class,'pgadmin-control-group') and "
"contains(.,'Insert bracket pairs?')]"
)
switch_btn = insert_bracket_pairs_control.\
find_element_by_class_name('bootstrap-switch')
@ -103,7 +109,9 @@ class PGDataypeFeatureTest(BaseFeatureTest):
# save and close the preference dialog.
self.page.find_by_xpath(
"//*[contains(@class,'pg-alertify-button') and contains(.,'OK')]").click()
"//*[contains(@class,'pg-alertify-button') and "
"contains(.,'OK')]"
).click()
self.page.wait_for_element_to_disappear(
lambda driver: driver.find_element_by_css_selector(".ajs-modal")
@ -129,12 +137,14 @@ class PGDataypeFeatureTest(BaseFeatureTest):
def after(self):
self.page.remove_server(self.server)
connection = test_utils.get_db_connection(self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'],
self.server['sslmode'])
connection = test_utils.get_db_connection(
self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'],
self.server['sslmode']
)
test_utils.drop_database(connection, "acceptance_test_db")
def _schema_node_expandable(self):
@ -155,9 +165,10 @@ class PGDataypeFeatureTest(BaseFeatureTest):
wait.until(EC.presence_of_element_located(
(By.XPATH,
"//*[contains(@class,'column-type') and contains(.,'{}')]".format(batch['datatype'][0])
))
)
"//*[contains(@class,'column-type') and "
"contains(.,'{}')]".format(batch['datatype'][0])
)
))
canvas = wait.until(EC.presence_of_element_located(
(By.CSS_SELECTOR, "#datagrid .slick-viewport .grid-canvas"))
@ -168,10 +179,12 @@ class PGDataypeFeatureTest(BaseFeatureTest):
cells = canvas.find_elements_by_css_selector('.slick-cell')
# remove first element as it is row number.
cells.pop(0)
for val, cell, datatype in zip(batch['output'], cells, batch['datatype']):
for val, cell, datatype in zip(
batch['output'], cells, batch['datatype']):
expected_output = batch['output'][cnt - 2]
if not self._is_datatype_available_in_current_database(datatype):
if not self._is_datatype_available_in_current_database(
datatype):
cnt += 1
continue
@ -212,7 +225,7 @@ class PGDataypeFeatureTest(BaseFeatureTest):
if first:
query += dataformatter.format(inputdata, datatype)
else:
query += ','+dataformatter.format(inputdata, datatype)
query += ',' + dataformatter.format(inputdata, datatype)
first = False
return query + ';'
@ -236,4 +249,6 @@ class PGDataypeFeatureTest(BaseFeatureTest):
self.page.click_modal('Yes')
def _is_datatype_available_in_current_database(self, datatype):
return datatype == '' or self.database_version >= type_minimum_version[datatype]
if datatype == '':
return True
return self.database_version >= type_minimum_version[datatype]

View File

@ -26,14 +26,17 @@ class QueryToolJourneyTest(BaseFeatureTest):
]
def before(self):
connection = test_utils.get_db_connection(self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'])
connection = test_utils.get_db_connection(
self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port']
)
test_utils.drop_database(connection, "acceptance_test_db")
test_utils.create_database(self.server, "acceptance_test_db")
test_utils.create_table(self.server, "acceptance_test_db", "test_table")
test_utils.create_table(
self.server, "acceptance_test_db", "test_table")
self.page.add_server(self.server)
def runTest(self):
@ -50,8 +53,10 @@ class QueryToolJourneyTest(BaseFeatureTest):
def _test_copies_rows(self):
pyperclip.copy("old clipboard contents")
self.page.driver.switch_to.default_content()
self.page.driver.switch_to_frame(self.page.driver.find_element_by_tag_name("iframe"))
self.page.find_by_xpath("//*[contains(@class, 'slick-row')]/*[1]").click()
self.page.driver.switch_to_frame(
self.page.driver.find_element_by_tag_name("iframe"))
self.page.find_by_xpath(
"//*[contains(@class, 'slick-row')]/*[1]").click()
self.page.find_by_xpath("//*[@id='btn-copy-row']").click()
self.assertEqual('"Some-Name"\t"6"\t"some info"',
@ -61,8 +66,12 @@ class QueryToolJourneyTest(BaseFeatureTest):
pyperclip.copy("old clipboard contents")
self.page.driver.switch_to.default_content()
self.page.driver.switch_to_frame(self.page.driver.find_element_by_tag_name("iframe"))
self.page.find_by_xpath("//*[@data-test='output-column-header' and contains(., 'some_column')]").click()
self.page.driver.switch_to_frame(
self.page.driver.find_element_by_tag_name("iframe"))
self.page.find_by_xpath(
"//*[@data-test='output-column-header' and "
"contains(., 'some_column')]"
).click()
self.page.find_by_xpath("//*[@id='btn-copy-row']").click()
self.assertTrue('"Some-Name"' in pyperclip.paste())
@ -76,22 +85,32 @@ class QueryToolJourneyTest(BaseFeatureTest):
self._execute_query("SELECT * FROM table_that_doesnt_exist")
self.page.click_tab("Query History")
selected_history_entry = self.page.find_by_css_selector("#query_list .selected")
self.assertIn("SELECT * FROM table_that_doesnt_exist", selected_history_entry.text)
selected_history_entry = self.page.find_by_css_selector(
"#query_list .selected")
self.assertIn("SELECT * FROM table_that_doesnt_exist",
selected_history_entry.text)
failed_history_detail_pane = self.page.find_by_id("query_detail")
self.assertIn("Error Message relation \"table_that_doesnt_exist\" does not exist", failed_history_detail_pane.text)
self.assertIn(
"Error Message relation \"table_that_doesnt_exist\" "
"does not exist", failed_history_detail_pane.text
)
ActionChains(self.page.driver) \
.send_keys(Keys.ARROW_DOWN) \
.perform()
selected_history_entry = self.page.find_by_css_selector("#query_list .selected")
self.assertIn("SELECT * FROM test_table ORDER BY value", selected_history_entry.text)
selected_history_entry = self.page.find_by_css_selector(
"#query_list .selected")
self.assertIn("SELECT * FROM test_table ORDER BY value",
selected_history_entry.text)
selected_history_detail_pane = self.page.find_by_id("query_detail")
self.assertIn("SELECT * FROM test_table ORDER BY value", selected_history_detail_pane.text)
newly_selected_history_entry = self.page.find_by_xpath("//*[@id='query_list']/ul/li[2]")
self.assertIn("SELECT * FROM test_table ORDER BY value",
selected_history_detail_pane.text)
newly_selected_history_entry = self.page.find_by_xpath(
"//*[@id='query_list']/ul/li[2]")
self.page.click_element(newly_selected_history_entry)
selected_history_detail_pane = self.page.find_by_id("query_detail")
self.assertIn("SELECT * FROM table_that_doesnt_exist", selected_history_detail_pane.text)
self.assertIn("SELECT * FROM table_that_doesnt_exist",
selected_history_detail_pane.text)
self.__clear_query_tool()
@ -104,7 +123,8 @@ class QueryToolJourneyTest(BaseFeatureTest):
self.page.click_tab("Query History")
query_we_need_to_scroll_to = self.page.find_by_xpath("//*[@id='query_list']/ul/li[17]")
query_we_need_to_scroll_to = self.page.find_by_xpath(
"//*[@id='query_list']/ul/li[17]")
self.page.click_element(query_we_need_to_scroll_to)

View File

@ -29,12 +29,14 @@ class QueryToolFeatureTest(BaseFeatureTest):
]
def before(self):
connection = test_utils.get_db_connection(self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'],
self.server['sslmode'])
connection = test_utils.get_db_connection(
self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'],
self.server['sslmode']
)
test_utils.drop_database(connection, "acceptance_test_db")
test_utils.create_database(self.server, "acceptance_test_db")
self.page.wait_for_spinner_to_disappear()
@ -96,12 +98,14 @@ class QueryToolFeatureTest(BaseFeatureTest):
def after(self):
self.page.remove_server(self.server)
connection = test_utils.get_db_connection(self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'],
self.server['sslmode'])
connection = test_utils.get_db_connection(
self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'],
self.server['sslmode']
)
test_utils.drop_database(connection, "acceptance_test_db")
def _reset_options(self):
@ -152,7 +156,9 @@ class QueryToolFeatureTest(BaseFeatureTest):
ON_DEMAND_CHUNKS = 2
row_id_to_find = config.ON_DEMAND_RECORD_COUNT * ON_DEMAND_CHUNKS
query = """-- On demand query result on scroll, grid select all, column select all
query = """-- On demand query result on scroll
-- Grid select all
-- Column select all
SELECT generate_series(1, {}) as id1, 'dummy' as id2""".format(
config.ON_DEMAND_RECORD_COUNT * ON_DEMAND_CHUNKS)
@ -172,7 +178,8 @@ SELECT generate_series(1, {}) as id1, 'dummy' as id2""".format(
# scroll to bottom to fetch next chunk of result set.
self.driver.execute_script(
"pgAdmin.SqlEditor.jquery('.slick-viewport').scrollTop(pgAdmin.SqlEditor.jquery('.grid-canvas').height());"
"pgAdmin.SqlEditor.jquery('.slick-viewport')"
".scrollTop(pgAdmin.SqlEditor.jquery('.grid-canvas').height());"
)
canvas = wait.until(EC.presence_of_element_located(
@ -214,9 +221,10 @@ SELECT generate_series(1, {}) as id1, 'dummy' as id2""".format(
# click on first data column to select all column.
wait.until(EC.presence_of_element_located(
(
By.XPATH,
"//span[contains(@class, 'column-name') and contains(., 'id1')]"))
(
By.XPATH,
"//span[contains(@class, 'column-name') "
"and contains(., 'id1')]"))
).click()
canvas = wait.until(EC.presence_of_element_located(
@ -228,7 +236,8 @@ SELECT generate_series(1, {}) as id1, 'dummy' as id2""".format(
def _check_ondemand_result(self, row_id_to_find, canvas):
# scroll to bottom to bring last row of next chunk in viewport.
self.driver.execute_script(
"pgAdmin.SqlEditor.jquery('.slick-viewport').scrollTop(pgAdmin.SqlEditor.jquery('.grid-canvas').height());"
"pgAdmin.SqlEditor.jquery('.slick-viewport')"
".scrollTop(pgAdmin.SqlEditor.jquery('.grid-canvas').height());"
)
canvas.find_element_by_xpath(
@ -325,7 +334,8 @@ CREATE TABLE public.{}();""".format(table_name)
self.page.wait_for_query_tool_loading_indicator_to_disappear()
self.page.click_tab('Messages')
self.page.find_by_xpath(
'//div[contains(@class, "sql-editor-message") and contains(string(), "CREATE TABLE")]'
'//div[contains(@class, "sql-editor-message") and '
'contains(string(), "CREATE TABLE")]'
)
self._clear_query_tool()
@ -339,7 +349,8 @@ ROLLBACK;"""
self.page.wait_for_query_tool_loading_indicator_to_disappear()
self.page.click_tab('Messages')
self.page.find_by_xpath(
'//div[contains(@class, "sql-editor-message") and contains(string(), "ROLLBACK")]'
'//div[contains(@class, "sql-editor-message") and '
'contains(string(), "ROLLBACK")]'
)
self._clear_query_tool()
@ -347,7 +358,8 @@ ROLLBACK;"""
-- 2. (Done) Create table in public schema.
-- 3. (Done) ROLLBACK transaction.
-- 4. Check if table is *NOT* created.
SELECT relname FROM pg_class WHERE relkind IN ('r','s','t') and relnamespace = 2200::oid;"""
SELECT relname FROM pg_class
WHERE relkind IN ('r','s','t') and relnamespace = 2200::oid;"""
self.page.fill_codemirror_area_with(query)
self.page.find_by_id("btn-flash").click()
self.page.wait_for_query_tool_loading_indicator_to_disappear()
@ -355,9 +367,14 @@ SELECT relname FROM pg_class WHERE relkind IN ('r','s','t') and relnamespace = 2
canvas = wait.until(EC.presence_of_element_located(
(By.CSS_SELECTOR, "#datagrid .slick-viewport .grid-canvas")))
el = canvas.find_elements_by_xpath("//div[contains(@class, 'slick-cell') and contains(text(), '{}')]".format(table_name))
el = canvas.find_elements_by_xpath(
"//div[contains(@class, 'slick-cell') and "
"contains(text(), '{}')]".format(table_name))
assert len(el) == 0, "Table '{}' created with auto commit disabled and without any explicit commit.".format(table_name)
assert len(el) == 0, "Table '{}' created with auto commit disabled " \
"and without any explicit commit.".format(
table_name
)
def _query_tool_auto_commit_enabled(self):
@ -400,7 +417,8 @@ CREATE TABLE public.{}();""".format(table_name)
self.page.wait_for_query_tool_loading_indicator_to_disappear()
self.page.click_tab('Messages')
self.page.find_by_xpath(
'//div[contains(@class, "sql-editor-message") and contains(string(), "CREATE TABLE")]'
'//div[contains(@class, "sql-editor-message") and '
'contains(string(), "CREATE TABLE")]'
)
self._clear_query_tool()
@ -415,7 +433,8 @@ ROLLBACK;"""
self.page.wait_for_query_tool_loading_indicator_to_disappear()
self.page.click_tab('Messages')
self.page.find_by_xpath(
'//div[contains(@class, "sql-editor-message") and contains(string(), "ROLLBACK")]'
'//div[contains(@class, "sql-editor-message") and '
'contains(string(), "ROLLBACK")]'
)
self._clear_query_tool()
@ -424,7 +443,8 @@ ROLLBACK;"""
-- 3. (Done) Create table in public schema.
-- 4. (Done) ROLLBACK transaction
-- 5. Check if table is created event after ROLLBACK.
SELECT relname FROM pg_class WHERE relkind IN ('r','s','t') and relnamespace = 2200::oid;"""
SELECT relname FROM pg_class
WHERE relkind IN ('r','s','t') and relnamespace = 2200::oid;"""
self.page.fill_codemirror_area_with(query)
self.page.find_by_id("btn-flash").click()
self.page.click_tab('Data Output')
@ -433,9 +453,12 @@ SELECT relname FROM pg_class WHERE relkind IN ('r','s','t') and relnamespace = 2
canvas = wait.until(EC.presence_of_element_located(
(By.CSS_SELECTOR, "#datagrid .slick-viewport .grid-canvas")))
el = canvas.find_elements_by_xpath("//div[contains(@class, 'slick-cell') and contains(text(), '{}')]".format(table_name))
el = canvas.find_elements_by_xpath(
"//div[contains(@class, 'slick-cell') and "
"contains(text(), '{}')]".format(table_name))
assert len(el) != 0, "Table '{}' is not created with auto commit enabled.".format(table_name)
assert len(el) != 0, "Table '{}' is not created with auto " \
"commit enabled.".format(table_name)
def _query_tool_auto_rollback_enabled(self):
table_name = 'query_tool_auto_rollback_enabled_table'
@ -473,7 +496,8 @@ CREATE TABLE public.{}();""".format(table_name)
self.page.wait_for_query_tool_loading_indicator_to_disappear()
self.page.click_tab('Messages')
self.page.find_by_xpath(
'//div[contains(@class, "sql-editor-message") and contains(string(), "CREATE TABLE")]'
'//div[contains(@class, "sql-editor-message") and '
'contains(string(), "CREATE TABLE")]'
)
self._clear_query_tool()
@ -489,7 +513,8 @@ SELECT 1/0;"""
self.page.wait_for_query_tool_loading_indicator_to_disappear()
self.page.click_tab('Messages')
self.page.find_by_xpath(
'//div[contains(@class, "sql-editor-message") and contains(string(), "division by zero")]'
'//div[contains(@class, "sql-editor-message") and '
'contains(string(), "division by zero")]'
)
self._clear_query_tool()
@ -506,7 +531,8 @@ END;"""
self.page.wait_for_query_tool_loading_indicator_to_disappear()
self.page.click_tab('Messages')
self.page.find_by_xpath(
'//div[contains(@class, "sql-editor-message") and contains(string(), "Query returned successfully")]'
'//div[contains(@class, "sql-editor-message") and '
'contains(string(), "Query returned successfully")]'
)
self._clear_query_tool()
@ -516,7 +542,8 @@ END;"""
-- 4. (Done) Generate error in transaction.
-- 5. (Done) END transaction.
-- 6. Check if table is *NOT* created after ending transaction.
SELECT relname FROM pg_class WHERE relkind IN ('r','s','t') and relnamespace = 2200::oid;"""
SELECT relname FROM pg_class
WHERE relkind IN ('r','s','t') and relnamespace = 2200::oid;"""
self.page.fill_codemirror_area_with(query)
self.page.find_by_id("btn-flash").click()
self.page.wait_for_query_tool_loading_indicator_to_disappear()
@ -524,9 +551,12 @@ SELECT relname FROM pg_class WHERE relkind IN ('r','s','t') and relnamespace = 2
canvas = wait.until(EC.presence_of_element_located(
(By.CSS_SELECTOR, "#datagrid .slick-viewport .grid-canvas")))
el = canvas.find_elements_by_xpath("//div[contains(@class, 'slick-cell') and contains(text(), '{}')]".format(table_name))
el = canvas.find_elements_by_xpath(
"//div[contains(@class, 'slick-cell') and "
"contains(text(), '{}')]".format(table_name))
assert len(el) == 0, "Table '{}' created even after ROLLBACK due to sql error.".format(table_name)
assert len(el) == 0, "Table '{}' created even after ROLLBACK due to " \
"sql error.".format(table_name)
def _query_tool_cancel_query(self):
query = """-- 1. END any open transaction.
@ -548,8 +578,8 @@ SELECT 1, pg_sleep(300)"""
# if auto rollback is disabled then 'i' element will
# have 'auto-rollback fa fa-check visibility-hidden' classes
if 'auto-rollback fa fa-check' == str(auto_rollback_check.get_attribute(
'class')):
if 'auto-rollback fa fa-check' == str(
auto_rollback_check.get_attribute('class')):
auto_rollback_btn.click()
auto_commit_btn = self.page.find_by_id("btn-auto-commit")
@ -561,8 +591,8 @@ SELECT 1, pg_sleep(300)"""
# if auto commit is disabled then 'i' element will
# have 'auto-commit fa fa-check visibility-hidden' classes
if 'auto-commit fa fa-check visibility-hidden' == str(auto_commit_check.get_attribute(
'class')):
if 'auto-commit fa fa-check visibility-hidden' == str(
auto_commit_check.get_attribute('class')):
auto_commit_btn.click()
self.page.find_by_id("btn-flash").click()
@ -571,14 +601,17 @@ SELECT 1, pg_sleep(300)"""
self.page.wait_for_query_tool_loading_indicator_to_disappear()
self.page.click_tab('Messages')
self.page.find_by_xpath(
'//div[contains(@class, "sql-editor-message") and contains(string(), "canceling statement due to user request")]'
'//div[contains(@class, "sql-editor-message") and '
'contains(string(), "canceling statement due to user request")]'
)
def _test_explain_plan_feature(self):
connection = test_utils.get_db_connection(self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'],
self.server['sslmode'])
connection = test_utils.get_db_connection(
self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'],
self.server['sslmode']
)
return connection.server_version > 90100

View File

@ -19,12 +19,14 @@ class TableDdlFeatureTest(BaseFeatureTest):
]
def before(self):
connection = test_utils.get_db_connection(self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'],
self.server['sslmode'])
connection = test_utils.get_db_connection(
self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'],
self.server['sslmode']
)
test_utils.drop_database(connection, "acceptance_test_db")
test_utils.create_database(self.server, "acceptance_test_db")
@ -32,7 +34,8 @@ class TableDdlFeatureTest(BaseFeatureTest):
self.page.add_server(self.server)
def runTest(self):
test_utils.create_table(self.server, "acceptance_test_db", "test_table")
test_utils.create_table(
self.server, "acceptance_test_db", "test_table")
self.page.toggle_open_server(self.server['name'])
self.page.toggle_open_tree_item('Databases')
@ -44,14 +47,17 @@ class TableDdlFeatureTest(BaseFeatureTest):
self.page.click_tab("SQL")
self.page.find_by_xpath(
"//*[contains(@class,'CodeMirror-lines') and contains(.,'CREATE TABLE public.test_table')]")
"//*[contains(@class,'CodeMirror-lines') and "
"contains(.,'CREATE TABLE public.test_table')]")
def after(self):
self.page.remove_server(self.server)
connection = test_utils.get_db_connection(self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'],
self.server['sslmode'])
connection = test_utils.get_db_connection(
self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'],
self.server['sslmode']
)
test_utils.drop_database(connection, "acceptance_test_db")

View File

@ -22,7 +22,8 @@ CURRENT_PATH = os.path.dirname(os.path.realpath(__file__))
try:
with open(CURRENT_PATH + '/test_data.json') as data_file:
config_data = json.load(data_file)['table_insert_update_cases']['add_update']
config_data = json.load(data_file)[
'table_insert_update_cases']['add_update']
except Exception as e:
print(str(e))
@ -44,8 +45,8 @@ class CheckForViewDataTest(BaseFeatureTest):
"""
scenarios = [
("Validate Insert, Update operations in View/Edit data with given test "
"data",
("Validate Insert, Update operations in View/Edit data with "
"given test data",
dict())
]
@ -58,7 +59,8 @@ CREATE TABLE public.defaults
id serial NOT NULL,
number_defaults numeric(100) DEFAULT 1,
number_null numeric(100),
text_defaults text COLLATE pg_catalog."default" DEFAULT 'Hello World'::text,
text_defaults text COLLATE pg_catalog."default"
DEFAULT 'Hello World'::text,
text_null1 text COLLATE pg_catalog."default",
text_null2 text COLLATE pg_catalog."default",
text_null3 text COLLATE pg_catalog."default",
@ -82,14 +84,18 @@ CREATE TABLE public.defaults
def before(self):
with test_utils.Database(self.server) as (connection, _):
if connection.server_version < 90100:
self.skipTest("COLLATE is not present in PG versions below v9.1")
self.skipTest(
"COLLATE is not present in PG versions below v9.1"
)
connection = test_utils.get_db_connection(self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'],
self.server['sslmode'])
connection = test_utils.get_db_connection(
self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'],
self.server['sslmode']
)
test_utils.drop_database(connection, "acceptance_test_db")
test_utils.create_database(self.server, "acceptance_test_db")
@ -120,12 +126,14 @@ CREATE TABLE public.defaults
def after(self):
self.page.remove_server(self.server)
connection = test_utils.get_db_connection(self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'],
self.server['sslmode'])
connection = test_utils.get_db_connection(
self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'],
self.server['sslmode']
)
test_utils.drop_database(connection, "acceptance_test_db")
@staticmethod
@ -138,7 +146,7 @@ CREATE TABLE public.defaults
xpath_grid_row = "//*[contains(@class, 'ui-widget-content') " \
"and contains(@style, 'top:25px')]"
xpath_row_cell = '//div[contains(@class, "'+cell+'")]'
xpath_row_cell = '//div[contains(@class, "' + cell + '")]'
xpath_cell = '{0}{1}'.format(xpath_grid_row, xpath_row_cell)
@ -149,7 +157,7 @@ CREATE TABLE public.defaults
wait = WebDriverWait(self.driver, 5)
try:
wait.until(EC.text_to_be_present_in_element(
(By.XPATH, xpath+"//span"), str(value)),
(By.XPATH, xpath + "//span"), str(value)),
CheckForViewDataTest.TIMEOUT_STRING
)
except Exception:
@ -192,16 +200,20 @@ CREATE TABLE public.defaults
ActionChains(self.driver).send_keys(value).perform()
# Click on editor's Save button
self.page.find_by_xpath("//*[contains(@class, 'pg_text_editor')]"
"//button[contains(@class, 'fa-save')]").click()
self.page.find_by_xpath(
"//*[contains(@class, 'pg_text_editor')]"
"//button[contains(@class, 'fa-save')]"
).click()
else:
# Boolean editor test for to True click
if data[1] == 'true':
checkbox_el = cell_el.find_element_by_xpath(".//*[contains(@class, 'multi-checkbox')]")
checkbox_el = cell_el.find_element_by_xpath(
".//*[contains(@class, 'multi-checkbox')]")
checkbox_el.click()
# Boolean editor test for to False click
elif data[1] == 'false':
checkbox_el = cell_el.find_element_by_xpath(".//*[contains(@class, 'multi-checkbox')]")
checkbox_el = cell_el.find_element_by_xpath(
".//*[contains(@class, 'multi-checkbox')]")
# Sets true
checkbox_el.click()
# Sets false
@ -217,10 +229,11 @@ CREATE TABLE public.defaults
def _view_data_grid(self):
self.page.driver.find_element_by_link_text("Object").click()
ActionChains(self.page.driver) \
.move_to_element(
self.page.driver.find_element_by_link_text("View/Edit Data")) \
.perform()
ActionChains(
self.page.driver
).move_to_element(
self.page.driver.find_element_by_link_text("View/Edit Data")
).perform()
self.page.find_by_partial_link_text("All Rows").click()
# wait until datagrid frame is loaded.
@ -274,7 +287,7 @@ CREATE TABLE public.defaults
def _add_row(self):
for idx in range(1, len(config_data.keys()) + 1):
cell_xpath = CheckForViewDataTest._get_cell_xpath(
'r'+str(idx), 1
'r' + str(idx), 1
)
time.sleep(0.2)
self._update_cell(cell_xpath, config_data[str(idx)])

View File

@ -31,21 +31,26 @@ class CheckForXssFeatureTest(BaseFeatureTest):
]
def before(self):
connection = test_utils.get_db_connection(self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'],
self.server['sslmode'])
connection = test_utils.get_db_connection(
self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'],
self.server['sslmode']
)
test_utils.drop_database(connection, "acceptance_test_db")
test_utils.create_database(self.server, "acceptance_test_db")
test_utils.create_table(self.server, "acceptance_test_db",
"<h1>X")
test_utils.create_table(
self.server, "acceptance_test_db", "<h1>X"
)
# This is needed to test dependents tab (eg: BackGrid)
test_utils.create_constraint(self.server, "acceptance_test_db",
"<h1>X",
"unique", "<h1 onmouseover='console.log(2);'>Y")
test_utils.create_constraint(
self.server, "acceptance_test_db",
"<h1>X",
"unique", "<h1 onmouseover='console.log(2);'>Y"
)
def runTest(self):
self.page.wait_for_spinner_to_disappear()
@ -62,12 +67,14 @@ class CheckForXssFeatureTest(BaseFeatureTest):
def after(self):
self.page.remove_server(self.server)
connection = test_utils.get_db_connection(self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'],
self.server['sslmode'])
connection = test_utils.get_db_connection(
self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'],
self.server['sslmode']
)
test_utils.drop_database(connection, "acceptance_test_db")
def _tables_node_expandable(self):
@ -106,7 +113,8 @@ class CheckForXssFeatureTest(BaseFeatureTest):
self.page.click_tab("SQL")
# Fetch the inner html & check for escaped characters
source_code = self.page.find_by_xpath(
"//*[contains(@class,'CodeMirror-lines') and contains(.,'CREATE TABLE')]"
"//*[contains(@class,'CodeMirror-lines') and "
"contains(.,'CREATE TABLE')]"
).get_attribute('innerHTML')
self._check_escaped_characters(
@ -115,11 +123,13 @@ class CheckForXssFeatureTest(BaseFeatureTest):
"SQL tab (Code Mirror)"
)
def _check_xss_in_dependents_tab(self): # Create any constraint with xss name to test this
# Create any constraint with xss name to test this
def _check_xss_in_dependents_tab(self):
self.page.click_tab("Dependents")
source_code = self.page.find_by_xpath(
"//*[@id='5']/table/tbody/tr/td/div/div/div[2]/table/tbody/tr/td[2]"
"//*[@id='5']/table/tbody/tr/td/div/div/div[2]/"
"table/tbody/tr/td[2]"
).get_attribute('innerHTML')
self._check_escaped_characters(
@ -138,7 +148,8 @@ class CheckForXssFeatureTest(BaseFeatureTest):
self.page.find_by_id("btn-flash").click()
result_row = self.page.find_by_xpath(
"//*[contains(@class, 'ui-widget-content') and contains(@style, 'top:0px')]"
"//*[contains(@class, 'ui-widget-content') and "
"contains(@style, 'top:0px')]"
)
cells = result_row.find_elements_by_tag_name('div')
@ -154,4 +165,5 @@ class CheckForXssFeatureTest(BaseFeatureTest):
def _check_escaped_characters(self, source_code, string_to_find, source):
# For XSS we need to search against element's html code
assert source_code.find(string_to_find) != -1, "{0} might be vulnerable to XSS ".format(source)
assert source_code.find(string_to_find) != - \
1, "{0} might be vulnerable to XSS ".format(source)

View File

@ -26,11 +26,15 @@ class CheckDebuggerForXssFeatureTest(BaseFeatureTest):
def before(self):
with test_utils.Database(self.server) as (connection, _):
if connection.server_version < 90100:
self.skipTest("Functions tree node is not present in pgAdmin below PG v9.1")
self.skipTest(
"Functions tree node is not present in pgAdmin below "
"PG v9.1"
)
# Some test function is needed for debugger
test_utils.create_debug_function(self.server, "postgres",
"a_test_function")
test_utils.create_debug_function(
self.server, "postgres", "a_test_function"
)
def runTest(self):
self.page.wait_for_spinner_to_disappear()
@ -40,7 +44,7 @@ class CheckDebuggerForXssFeatureTest(BaseFeatureTest):
def after(self):
test_utils.drop_debug_function(self.server, "postgres",
"a_test_function")
"a_test_function")
self.page.remove_server(self.server)
def _function_node_expandable(self):
@ -54,17 +58,21 @@ class CheckDebuggerForXssFeatureTest(BaseFeatureTest):
def _debug_function(self):
self.page.driver.find_element_by_link_text("Object").click()
ActionChains(self.page.driver) \
.move_to_element(self.page.driver.find_element_by_link_text("Debugging")) \
.perform()
ActionChains(
self.page.driver
).move_to_element(
self.page.driver.find_element_by_link_text("Debugging")
).perform()
self.page.driver.find_element_by_link_text("Debug").click()
# We need to check if debugger plugin is installed or not
try:
wait = WebDriverWait(self.page.driver, 2)
is_error = wait.until(EC.presence_of_element_located(
(By.XPATH, "//div[contains(@class, 'alertify') and not(contains(@class, 'ajs-hidden'))]//div[contains(@class,'ajs-header')]"))
)
(By.XPATH, "//div[contains(@class, 'alertify') and "
"not(contains(@class, 'ajs-hidden'))]//div["
"contains(@class,'ajs-header')]")
))
except TimeoutException as e:
is_error = None
@ -88,7 +96,8 @@ class CheckDebuggerForXssFeatureTest(BaseFeatureTest):
)
wait.until(EC.presence_of_element_located(
(By.XPATH, "//td[contains(@class,'test_function') and contains(.,'Hello, pgAdmin4')]"))
(By.XPATH, "//td[contains(@class,'test_function') and "
"contains(.,'Hello, pgAdmin4')]"))
)
# Only this tab is vulnerable rest are BackGrid & Code Mirror
@ -107,9 +116,11 @@ class CheckDebuggerForXssFeatureTest(BaseFeatureTest):
def _close_debugger(self):
self.page.driver.switch_to_default_content()
self.page.click_element(
self.page.find_by_xpath("//*[@id='dockerContainer']/div/div[3]/div/div[2]/div[1]")
self.page.find_by_xpath(
"//*[@id='dockerContainer']/div/div[3]/div/div[2]/div[1]")
)
def _check_escaped_characters(self, source_code, string_to_find, source):
# For XSS we need to search against element's html code
assert source_code.find(string_to_find) != -1, "{0} might be vulnerable to XSS ".format(source)
assert source_code.find(string_to_find) != - \
1, "{0} might be vulnerable to XSS ".format(source)

View File

@ -23,7 +23,8 @@ class CheckRoleMembershipControlFeatureTest(BaseFeatureTest):
def before(self):
with test_utils.Database(self.server) as (connection, _):
if connection.server_version < 90100:
self.skipTest("Membership is not present in Postgres below PG v9.1")
self.skipTest(
"Membership is not present in Postgres below PG v9.1")
# Some test function is needed for debugger
test_utils.create_role(self.server, "postgres",
@ -45,11 +46,14 @@ class CheckRoleMembershipControlFeatureTest(BaseFeatureTest):
self.page.remove_server(self.server)
def _connects_to_server(self):
self.page.find_by_xpath("//*[@class='aciTreeText' and .='Servers']").click()
self.page.find_by_xpath(
"//*[@class='aciTreeText' and .='Servers']").click()
self.page.driver.find_element_by_link_text("Object").click()
ActionChains(self.page.driver) \
.move_to_element(self.page.driver.find_element_by_link_text("Create")) \
.perform()
ActionChains(
self.page.driver
).move_to_element(
self.page.driver.find_element_by_link_text("Create")
).perform()
self.page.find_by_partial_link_text("Server...").click()
server_config = self.server
@ -57,8 +61,10 @@ class CheckRoleMembershipControlFeatureTest(BaseFeatureTest):
self.page.find_by_partial_link_text("Connection").click()
self.page.fill_input_by_field_name("host", server_config['host'])
self.page.fill_input_by_field_name("port", server_config['port'])
self.page.fill_input_by_field_name("username", server_config['username'])
self.page.fill_input_by_field_name("password", server_config['db_password'])
self.page.fill_input_by_field_name(
"username", server_config['username'])
self.page.fill_input_by_field_name(
"password", server_config['db_password'])
self.page.find_by_xpath("//button[contains(.,'Save')]").click()
def _role_node_expandable(self):
@ -80,8 +86,12 @@ class CheckRoleMembershipControlFeatureTest(BaseFeatureTest):
'&lt;h1&gt;test&lt;/h1&gt;',
'Role Membership Control'
)
self.page.find_by_xpath("//button[contains(@type, 'cancel') and contains(.,'Cancel')]").click()
self.page.find_by_xpath(
"//button[contains(@type, 'cancel') and "
"contains(.,'Cancel')]"
).click()
def _check_escaped_characters(self, source_code, string_to_find, source):
# For XSS we need to search against element's html code
assert source_code.find(string_to_find) != -1, "{0} might be vulnerable to XSS ".format(source)
assert source_code.find(string_to_find) != - \
1, "{0} might be vulnerable to XSS ".format(source)

View File

@ -11,7 +11,7 @@
import pgadmin.utils.driver as driver
from flask import url_for, render_template, Response
from flask_babel import gettext as _
from flask_babel import gettext
from pgadmin.utils import PgAdminModule
from pgadmin.utils.preferences import Preferences
@ -22,18 +22,20 @@ MODULE_NAME = 'misc'
class MiscModule(PgAdminModule):
def get_own_javascripts(self):
return [{
'name': 'pgadmin.misc.explain',
'path': url_for('misc.index') + 'explain/explain',
'preloaded': False
}, {
'name': 'snap.svg',
'path': url_for(
'misc.static', filename='explain/vendor/snap.svg/' + (
'snap.svg' if config.DEBUG else 'snap.svg-min'
)),
'preloaded': False
}]
return [
{
'name': 'pgadmin.misc.explain',
'path': url_for('misc.index') + 'explain/explain',
'preloaded': False
}, {
'name': 'snap.svg',
'path': url_for(
'misc.static', filename='explain/vendor/snap.svg/' + (
'snap.svg' if config.DEBUG else 'snap.svg-min'
)),
'preloaded': False
}
]
def get_own_stylesheets(self):
stylesheets = []
@ -46,18 +48,24 @@ class MiscModule(PgAdminModule):
"""
Register preferences for this module.
"""
self.misc_preference = Preferences('miscellaneous', _('Miscellaneous'))
self.misc_preference = Preferences(
'miscellaneous', gettext('Miscellaneous')
)
lang_options = []
for lang in config.LANGUAGES:
lang_options.append({'label': config.LANGUAGES[lang],
'value': lang})
lang_options.append(
{
'label': config.LANGUAGES[lang],
'value': lang
}
)
# Register options for the User language settings
self.misc_preference.register(
'miscellaneous', 'user_language',
_("User language"), 'options', 'en',
category_label=_('User language'),
gettext("User language"), 'options', 'en',
category_label=gettext('User language'),
options=lang_options
)
@ -102,7 +110,8 @@ def explain_js():
"""
return Response(
response=render_template(
"explain/js/explain.js", _=_
"explain/js/explain.js",
_=gettext
),
status=200,
mimetype="application/javascript"

View File

@ -103,7 +103,6 @@ def acknowledge(pid):
"""
try:
BatchProcess.acknowledge(pid)
return success_return()
except LookupError as lerr:
return gone(errormsg=str(lerr))

View File

@ -55,11 +55,13 @@ if _IS_PY2:
else:
def _log(msg):
with open(_log_file, 'a') as fp:
fp.write(('INFO:: %s\n' % msg.encode('ascii', 'xmlcharrefreplace')))
fp.write(
('INFO:: %s\n' % msg.encode('ascii', 'xmlcharrefreplace'))
)
def _log_exception():
type_, value_, traceback_ = info=sys.exc_info()
type_, value_, traceback_ = info = sys.exc_info()
with open(_log_file, 'ab') as fp:
from traceback import format_exception
@ -159,7 +161,7 @@ class ProcessLogger(Thread):
Thread.__init__(self)
self.process = None
self.stream = None
self.logger = open(os.path.join(_out_dir, stream_type), 'wb')
self.logger = open(os.path.join(_out_dir, stream_type), 'wb')
def attach_process_stream(self, process, stream):
"""
@ -189,9 +191,15 @@ class ProcessLogger(Thread):
# Write into log file
if self.logger:
if msg:
self.logger.write(get_current_time(format='%y%m%d%H%M%S%f').encode('utf-8'))
self.logger.write(
get_current_time(
format='%y%m%d%H%M%S%f'
).encode('utf-8')
)
self.logger.write(b',')
self.logger.write(msg.lstrip(b'\r\n' if _IS_WIN else b'\n'))
self.logger.write(
msg.lstrip(b'\r\n' if _IS_WIN else b'\n')
)
self.logger.write(os.linesep.encode('utf-8'))
return True
@ -215,7 +223,10 @@ class ProcessLogger(Thread):
get_current_time(
format='%y%m%d%H%M%S%f'
),
msg.lstrip(b'\r\n' if _IS_WIN else b'\n'), os.linesep
msg.lstrip(
b'\r\n' if _IS_WIN else b'\n'
),
os.linesep
)
)
@ -253,9 +264,8 @@ def update_status(**kw):
if _out_dir:
status = dict(
(k, v) for k, v in kw.items() if k in [
'start_time', 'end_time', 'exit_code', 'pid'
]
(k, v) for k, v in kw.items()
if k in ('start_time', 'end_time', 'exit_code', 'pid')
)
_log('Updating the status:\n{0}'.format(json.dumps(status)))
with open(os.path.join(_out_dir, 'status'), 'w') as fp:
@ -396,7 +406,7 @@ def convert_environment_variables(env):
if not isinstance(value, str):
value = value.encode(_sys_encoding)
temp_env[key] = value
except Exception as e:
except Exception:
_log_exception()
return temp_env
@ -411,8 +421,8 @@ if __name__ == '__main__':
_fs_encoding = sys.getfilesystemencoding()
if not _fs_encoding or _fs_encoding == 'ascii':
# Fall back to 'utf-8', if we couldn't determine the file-system encoding,
# or 'ascii'.
# Fall back to 'utf-8', if we couldn't determine the file-system
# encoding or 'ascii'.
_fs_encoding = 'utf-8'
def u(_s, _encoding=_sys_encoding):
@ -442,14 +452,14 @@ if __name__ == '__main__':
# the child process to run as a daemon. And, it would run without
# depending on the status of the web-server.
if 'PGA_BGP_FOREGROUND' in os.environ and \
os.environ['PGA_BGP_FOREGROUND'] == "1":
os.environ['PGA_BGP_FOREGROUND'] == "1":
_log('[CHILD] Start process execution...')
# This is a child process running as the daemon process.
# Let's do the job assigning to it.
try:
_log('Executing the command now from the detached child...')
execute()
except:
except Exception:
_log_exception()
else:
from subprocess import CREATE_NEW_PROCESS_GROUP
@ -464,10 +474,11 @@ if __name__ == '__main__':
env['PGA_BGP_FOREGROUND'] = "1"
# We need environment variables & values in string
_log('[PARENT] Converting the environment variable in the bytes format...')
_log('[PARENT] Converting the environment variable in the '
'bytes format...')
try:
env = convert_environment_variables(env)
except Exception as e:
except Exception:
_log_exception()
kwargs = {
@ -477,7 +488,7 @@ if __name__ == '__main__':
'creationflags': CREATE_NEW_PROCESS_GROUP | DETACHED_PROCESS,
'close_fds': False,
'cwd': _out_dir,
'env': env
'env': env
}
cmd = [sys.executable]

View File

@ -66,7 +66,9 @@ class BatchProcess(object):
if 'id' in kwargs:
self._retrieve_process(kwargs['id'])
else:
self._create_process(kwargs['desc'], kwargs['cmd'], kwargs['args'])
self._create_process(
kwargs['desc'], kwargs['cmd'], kwargs['args']
)
def _retrieve_process(self, _id):
p = Process.query.filter_by(pid=_id, user_id=current_user.id).first()
@ -159,17 +161,25 @@ class BatchProcess(object):
args_csv_io, delimiter=str(','), quoting=csv.QUOTE_MINIMAL
)
if sys.version_info.major == 2:
csv_writer.writerow([a.encode('utf-8') if isinstance(a, unicode) else a for a in _args])
csv_writer.writerow(
[
a.encode('utf-8')
if isinstance(a, unicode) else a for a in _args
]
)
else:
csv_writer.writerow(_args)
args_val = args_csv_io.getvalue().strip(str('\r\n'))
j = Process(
pid=int(id), command=_cmd,
arguments=args_val.decode('utf-8', 'replace') if IS_PY2 and hasattr(args_val, 'decode') \
else args_val,
logdir=log_dir, desc=dumps(self.desc), user_id=current_user.id
pid=int(id),
command=_cmd,
arguments=args_val.decode('utf-8', 'replace')
if IS_PY2 and hasattr(args_val, 'decode') else args_val,
logdir=log_dir,
desc=dumps(self.desc),
user_id=current_user.id
)
db.session.add(j)
db.session.commit()
@ -278,7 +288,9 @@ class BatchProcess(object):
if os.name == 'nt' and IS_PY2:
command = []
for c in cmd:
command.append(c.encode('utf-8') if isinstance(c, unicode) else str(c))
command.append(
c.encode('utf-8') if isinstance(c, unicode) else str(c)
)
current_app.logger.info(
u"Executing the process executor with the arguments: %s",
@ -288,7 +300,8 @@ class BatchProcess(object):
cmd = command
else:
current_app.logger.info(
u"Executing the process executor with the arguments: %s", str(cmd)
u"Executing the process executor with the arguments: %s",
str(cmd)
)
# Make a copy of environment, and add new variables to support
@ -318,8 +331,12 @@ class BatchProcess(object):
stderr = open(stderr, "a")
p = Popen(
cmd, close_fds=False, env=env, stdout=stdout.fileno(),
stderr=stderr.fileno(), stdin=stdin.fileno(),
cmd,
close_fds=False,
env=env,
stdout=stdout.fileno(),
stderr=stderr.fileno(),
stdin=stdin.fileno(),
creationflags=(CREATE_NEW_PROCESS_GROUP | DETACHED_PROCESS)
)
else:
@ -424,10 +441,12 @@ class BatchProcess(object):
execution_time = (etime - stime).total_seconds()
if process_output:
out, out_completed = read_log(self.stdout, stdout, out, ctime,
self.ecode)
err, err_completed = read_log(self.stderr, stderr, err, ctime,
self.ecode)
out, out_completed = read_log(
self.stdout, stdout, out, ctime, self.ecode
)
err, err_completed = read_log(
self.stderr, stderr, err, ctime, self.ecode
)
else:
out_completed = err_completed = False
@ -439,8 +458,16 @@ class BatchProcess(object):
}
return {
'out': {'pos': out, 'lines': stdout, 'done': out_completed},
'err': {'pos': err, 'lines': stderr, 'done': err_completed},
'out': {
'pos': out,
'lines': stdout,
'done': out_completed
},
'err': {
'pos': err,
'lines': stderr,
'done': err_completed
},
'start_time': self.stime,
'exit_code': self.ecode,
'execution_time': execution_time
@ -475,9 +502,8 @@ class BatchProcess(object):
except ValueError as e:
current_app.logger.warning(
_("Status for the background process '{0}' could not be loaded.").format(
p.pid
)
_("Status for the background process '{0}' could "
"not be loaded.").format(p.pid)
)
current_app.logger.exception(e)
return False, False
@ -514,8 +540,8 @@ class BatchProcess(object):
if isinstance(desc, IProcessDesc):
args = []
args_csv = StringIO(
p.arguments.encode('utf-8') \
if hasattr(p.arguments, 'decode') else p.arguments
p.arguments.encode('utf-8')
if hasattr(p.arguments, 'decode') else p.arguments
)
args_reader = csv.reader(args_csv, delimiter=str(','))
for arg in args_reader:

View File

@ -253,12 +253,12 @@ def file_manager_config(trans_id):
show_hidden_files = pref.preference('show_hidden_files').get()
return Response(response=render_template(
"file_manager/js/file_manager_config.json",
_=gettext,
data=data,
file_dialog_view=file_dialog_view,
show_hidden_files=show_hidden_files
),
"file_manager/js/file_manager_config.json",
_=gettext,
data=data,
file_dialog_view=file_dialog_view,
show_hidden_files=show_hidden_files
),
status=200,
mimetype="application/json"
)
@ -301,6 +301,7 @@ def save_last_directory_visited(trans_id):
data={'status': True}
)
@blueprint.route(
"/save_file_dialog_view/<int:trans_id>", methods=["POST"],
endpoint='save_file_dialog_view'
@ -312,6 +313,7 @@ def save_file_dialog_view(trans_id):
data={'status': True}
)
@blueprint.route(
"/save_show_hidden_file_option/<int:trans_id>", methods=["PUT"],
endpoint='save_show_hidden_file_option'
@ -331,7 +333,9 @@ class Filemanager(object):
self.trans_id = trans_id
self.patherror = encode_json(
{
'Error': gettext('No permission to operate on specified path.'),
'Error': gettext(
'No permission to operate on specified path.'
),
'Code': 0
}
)
@ -407,7 +411,8 @@ class Filemanager(object):
last_dir = last_dir[:-1]
while last_dir:
if os.path.exists(
(storage_dir if storage_dir is not None else '') + last_dir):
storage_dir
if storage_dir is not None else '' + last_dir):
break
if _platform == 'win32':
index = max(last_dir.rfind('\\'), last_dir.rfind('/'))
@ -426,7 +431,8 @@ class Filemanager(object):
# create configs using above configs
configs = {
"fileroot": last_dir.replace('\\', '\\\\'), # for JS json compatibility
# for JS json compatibility
"fileroot": last_dir.replace('\\', '\\\\'),
"dialog_type": fm_type,
"title": title,
"upload": {
@ -516,7 +522,7 @@ class Filemanager(object):
drives.append(letter)
bitmask >>= 1
if (drive_name != '' and drive_name is not None and
drive_name in drives):
drive_name in drives):
return u"{0}{1}".format(drive_name, ':')
else:
return drives # return drives if no argument is passed
@ -577,7 +583,7 @@ class Filemanager(object):
try:
drive_size = getDriveSize(path)
drive_size_in_units = sizeof_fmt(drive_size)
except:
except Exception:
drive_size = 0
protected = 1 if drive_size == 0 else 0
files[file_name] = {
@ -606,10 +612,10 @@ class Filemanager(object):
}
user_dir = path
folders_only = trans_data['folders_only'] if 'folders_only' in \
trans_data else ''
files_only = trans_data['files_only'] if 'files_only' in \
trans_data else ''
folders_only = trans_data['folders_only'] \
if 'folders_only' in trans_data else ''
files_only = trans_data['files_only'] \
if 'files_only' in trans_data else ''
supported_types = trans_data['supported_types'] \
if 'supported_types' in trans_data else []
@ -622,7 +628,7 @@ class Filemanager(object):
# continue if file/folder is hidden (based on user preference)
if not is_show_hidden_files and \
(is_folder_hidden(system_path) or f.startswith('.')):
(is_folder_hidden(system_path) or f.startswith('.')):
continue
user_path = os.path.join(os.path.join(user_dir, f))
@ -645,8 +651,8 @@ class Filemanager(object):
# filter files based on file_type
if file_type is not None and file_type != "*":
if folders_only or len(supported_types) > 0 and \
file_extension not in supported_types or \
file_type != file_extension:
file_extension not in supported_types or \
file_type != file_extension:
continue
# create a list of files and folders
@ -790,8 +796,9 @@ class Filemanager(object):
}
if not path_exists(orig_path):
thefile['Error'] = gettext(u"'{0}' file does not exist.".format(
path))
thefile['Error'] = gettext(
u"'{0}' file does not exist.".format(path)
)
thefile['Code'] = -1
return thefile
@ -822,7 +829,8 @@ class Filemanager(object):
if not dir.endswith('/'):
dir += u'/'
filelist = self.list_filesystem(dir, path, trans_data, file_type, show_hidden)
filelist = self.list_filesystem(
dir, path, trans_data, file_type, show_hidden)
return filelist
def rename(self, old=None, new=None, req=None):
@ -901,7 +909,8 @@ class Filemanager(object):
}
dir = self.dir if self.dir is not None else ''
path = path.encode('utf-8').decode('utf-8') if hasattr(str, 'decode') else path
path = path.encode(
'utf-8').decode('utf-8') if hasattr(str, 'decode') else path
orig_path = u"{0}{1}".format(dir, path)
try:
@ -951,20 +960,23 @@ class Filemanager(object):
file_obj = req.files['newfile']
file_name = file_obj.filename
if hasattr(str, 'decode'):
path = req.form.get('currentpath').encode('utf-8').decode('utf-8')
path = req.form.get('currentpath').encode(
'utf-8').decode('utf-8')
file_name = file_obj.filename.encode('utf-8').decode('utf-8')
orig_path = u"{0}{1}".format(dir, path)
newName = u"{0}{1}".format(orig_path, file_name)
with open(newName, 'wb') as f:
while True:
data = file_obj.read(4194304) # 4MB chunk (4 * 1024 * 1024 Bytes)
# 4MB chunk (4 * 1024 * 1024 Bytes)
data = file_obj.read(4194304)
if not data:
break
f.write(data)
except Exception as e:
code = 0
err_msg = u"Error: {0}".format(e.strerror if hasattr(e, 'strerror') else u'Unknown')
err_msg = u"Error: {0}".format(
e.strerror if hasattr(e, 'strerror') else u'Unknown')
try:
Filemanager.check_access_permission(dir, path)
@ -998,7 +1010,8 @@ class Filemanager(object):
path = path.encode('utf-8').decode('utf-8')
try:
orig_path = u"{0}{1}".format(dir, path)
Filemanager.check_access_permission(dir, u"{}{}".format(path, name))
Filemanager.check_access_permission(
dir, u"{}{}".format(path, name))
newName = u"{0}{1}".format(orig_path, name)
if not os.path.exists(newName):
@ -1059,14 +1072,14 @@ class Filemanager(object):
# check if file type is text or binary
text_chars = bytearray([7, 8, 9, 10, 12, 13, 27]) \
+ bytearray(range(0x20, 0x7f)) \
+ bytearray(range(0x80, 0x100))
+ bytearray(range(0x20, 0x7f)) \
+ bytearray(range(0x80, 0x100))
def is_binary_string(bytes_data):
"""Checks if string data is binary"""
return bool(
bytes_data.translate(None, text_chars)
)
bytes_data.translate(None, text_chars)
)
# read the file
try:
@ -1180,11 +1193,13 @@ class Filemanager(object):
orig_path = u"{0}{1}".format(dir, path)
try:
Filemanager.check_access_permission(dir, u"{}{}".format(
path, path))
Filemanager.check_access_permission(
dir, u"{}{}".format(path, path)
)
except Exception as e:
resp = Response(gettext(u"Error: {0}".format(e)))
resp.headers['Content-Disposition'] = 'attachment; filename=' + name
resp.headers['Content-Disposition'] = \
'attachment; filename=' + name
return resp
name = path.split('/')[-1]