Fixed Securtiy Hotspot reported by SonarQube.

This commit is contained in:
Akshay Joshi
2022-08-12 17:10:26 +05:30
parent b34078c6d2
commit 084203debc
54 changed files with 170 additions and 169 deletions

View File

@@ -7,7 +7,7 @@
#
##########################################################################
import random
import secrets
import string
import urllib3
import ipaddress
@@ -39,4 +39,4 @@ def get_my_ip():
def get_random_id():
""" Return a random 10 byte string """
letters = string.ascii_letters + string.digits
return ''.join(random.choice(letters) for _ in range(10))
return ''.join(secrets.choice(letters) for _ in range(10))

View File

@@ -28,12 +28,11 @@ def __generate_otp() -> str:
str: A six-digits OTP for the current user
"""
import time
import base64
import codecs
import random
import secrets
code = codecs.encode("{}{}{}".format(
time.time(), current_user.username, random.randint(1000, 9999)
time.time(), current_user.username, secrets.choice(range(1000, 9999))
).encode(), "hex")
res = 0

View File

@@ -9,7 +9,7 @@
"""A blueprint module implementing the Webserver authentication."""
import random
import secrets
import string
import config
from flask import request, current_app, session, Response, render_template, \
@@ -23,7 +23,6 @@ from pgadmin.utils.constants import WEBSERVER
from pgadmin.utils import PgAdminModule
from pgadmin.utils.csrf import pgCSRFProtect
from flask_security.utils import logout_user
from os import environ, path, remove
class WebserverModule(PgAdminModule):
@@ -91,7 +90,7 @@ class WebserverAuthentication(BaseAuthentication):
"Webserver authenticate failed.")
session['pass_enc_key'] = ''.join(
(random.choice(string.ascii_lowercase) for _ in range(10)))
(secrets.choice(string.ascii_lowercase) for _ in range(10)))
useremail = request.environ.get('mail')
if not useremail:
useremail = ''

View File

@@ -10,13 +10,13 @@
""" Implements Partitions Node """
import re
import random
import secrets
import simplejson as json
import pgadmin.browser.server_groups.servers.databases.schemas as schema
from flask import render_template, request, current_app
from flask_babel import gettext
from pgadmin.browser.server_groups.servers.databases.schemas.utils \
import DataTypeReader, VacuumSettings
import DataTypeReader
from pgadmin.utils.ajax import internal_server_error, \
make_response as ajax_response, gone
from pgadmin.browser.server_groups.servers.databases.schemas.tables.utils \
@@ -492,7 +492,7 @@ class PartitionsView(BaseTableView, DataTypeReader, SchemaDiffObjectCompare):
# the partitioned(base) table.
target_data['orig_name'] = target_data['name']
target_data['name'] = 'temp_partitioned_{0}'.format(
random.randint(1, 9999999))
secrets.choice(range(1, 9999999)))
# For PG/EPAS 11 and above when we copy the data from original
# table to temporary table for schema diff, we will have to create
# a default partition to prevent the data loss.
@@ -515,7 +515,7 @@ class PartitionsView(BaseTableView, DataTypeReader, SchemaDiffObjectCompare):
# Create temporary name for partitions
for item in source_data['partitions']:
item['temp_partition_name'] = 'partition_{0}'.format(
random.randint(1, 9999999))
secrets.choice(range(1, 9999999)))
partition_data['partitions'] = source_data['partitions']

View File

@@ -6,7 +6,7 @@
# This software is released under the PostgreSQL Licence
#
##########################################################################
import random
import secrets
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
@@ -64,7 +64,7 @@ class AllServersGetTestCase(BaseTestGenerator):
if self.is_positive_test:
if hasattr(self, 'invalid_server_group'):
self.url = self.url + '{0}/{1}?_={1}'.format(
utils.SERVER_GROUP, random.randint(1, 9999999))
utils.SERVER_GROUP, secrets.choice(range(1, 9999999)))
elif hasattr(self, 'children'):
self.url = self.url + '{0}/{1}'.format(
@@ -83,7 +83,8 @@ class AllServersGetTestCase(BaseTestGenerator):
self.connect_to_server(url)
self.url = self.url + '{0}/{1}?_={2}'.format(
utils.SERVER_GROUP, server_id, random.randint(1, 9999999))
utils.SERVER_GROUP, server_id,
secrets.choice(range(1, 9999999)))
response = self.get_server()
self.assertEquals(response.status_code,
self.expected_data["status_code"])

View File

@@ -8,7 +8,7 @@
##########################################################################
import sys
import random
import secrets
from regression.python_test_utils import test_utils
from regression.feature_utils.locators import BrowserToolBarLocators
@@ -31,7 +31,8 @@ class BrowserToolBarFeatureTest(BaseFeatureTest):
def before(self):
self.page.wait_for_spinner_to_disappear()
self.page.add_server(self.server)
self.test_table_name = "test_table" + str(random.randint(1000, 3000))
self.test_table_name = "test_table" + str(
secrets.choice(range(1000, 3000)))
test_utils.create_table(self.server, self.test_db,
self.test_table_name)

View File

@@ -7,12 +7,10 @@
#
##########################################################################
import random
import time
import secrets
from selenium.webdriver import ActionChains
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By
from regression.python_test_utils import test_utils
from regression.feature_utils.base_feature_test import BaseFeatureTest
from regression.feature_utils.locators import QueryToolLocators
@@ -33,7 +31,8 @@ class CopySelectedQueryResultsFeatureTest(BaseFeatureTest):
# Create test table with random name to avoid same name conflicts in
# parallel execution
self.test_table_name = "test_table" + str(random.randint(1000, 3000))
self.test_table_name = "test_table" + \
str(secrets.choice(range(1000, 3000)))
self.page.add_server(self.server)
test_utils.create_table(
self.server, self.test_db, self.test_table_name)

View File

@@ -8,10 +8,9 @@
##########################################################################
import os
import random
import string
import sys
import time
import tempfile
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
@@ -41,12 +40,17 @@ class CheckFileManagerFeatureTest(BaseFeatureTest):
self.wait = WebDriverWait(self.page.driver, 10)
filename = self.server_information['type'] + \
str(self.server_information['server_version'])
self.XSS_FILE = '/<img src=x ' + filename + '=alert("1")>.sql'
self.XSS_FILE = '<img src=x ' + filename + '=alert("1")>.sql'
self.tmpDir = os.path.join(tempfile.gettempdir(), 'pga4_test')
# Create temp directory
if not os.path.exists(self.tmpDir):
os.makedirs(self.tmpDir)
if self.parallel_ui_tests:
xss_file_path = self.XSS_FILE
else:
xss_file_path = '/tmp/' + self.XSS_FILE
xss_file_path = os.path.join(self.tmpDir, self.XSS_FILE)
# Remove any previous file
if os.path.isfile(xss_file_path):
os.remove(xss_file_path)
@@ -82,9 +86,12 @@ class CheckFileManagerFeatureTest(BaseFeatureTest):
(By.XPATH, QueryToolLocators.change_file_types_dd_xpath)))
# Save the file
if not self.parallel_ui_tests:
self.page.fill_input_by_css_selector(
QueryToolLocators.folder_path_css, '',
key_after_input=Keys.ENTER)
self.page.fill_input_by_css_selector(
QueryToolLocators.folder_path_css,
"/tmp/", input_keys=True, key_after_input=Keys.ENTER)
self.tmpDir, input_keys=True, key_after_input=Keys.ENTER)
self.page.find_by_css_selector(
QueryToolLocators.folder_path_css).send_keys(Keys.ENTER)
input_file_path_ele = \
@@ -101,15 +108,19 @@ class CheckFileManagerFeatureTest(BaseFeatureTest):
(By.XPATH, QueryToolLocators.change_file_types_dd_xpath)))
# Open the file
if not self.parallel_ui_tests:
self.page.fill_input_by_css_selector(
QueryToolLocators.folder_path_css, '',
key_after_input=Keys.ENTER)
self.page.fill_input_by_css_selector(
QueryToolLocators.folder_path_css,
"/tmp/", key_after_input=Keys.ENTER)
self.tmpDir, key_after_input=Keys.ENTER)
self.page.find_by_css_selector(
QueryToolLocators.folder_path_css).send_keys(Keys.ENTER)
time.sleep(2)
self.page.fill_input_by_css_selector(
QueryToolLocators.search_file_edit_box_css, self.XSS_FILE)
QueryToolLocators.search_file_edit_box_css, self.XSS_FILE,
input_keys=True)
self.wait.until(EC.visibility_of_element_located(
(By.CSS_SELECTOR, QueryToolLocators.select_file_content_css)))

View File

@@ -7,9 +7,8 @@
#
##########################################################################
import random
import secrets
import os
import time
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
@@ -67,9 +66,10 @@ class PGUtilitiesMaintenanceFeatureTest(BaseFeatureTest):
self.server['sslmode']
)
self.table_name = self.table_name + str(random.randint(100, 1000))
self.table_name = self.table_name + str(
secrets.choice(range(100, 1000)))
self.database_name = \
self.database_name + str(random.randint(100, 1000))
self.database_name + str(secrets.choice(range(100, 1000)))
test_utils.drop_database(connection, self.database_name)
test_utils.create_database(self.server, self.database_name)
test_utils.create_table(self.server, self.database_name,

View File

@@ -8,7 +8,7 @@
##########################################################################
import sys
import random
import secrets
from selenium.webdriver import ActionChains
from selenium.webdriver.common.keys import Keys
@@ -37,22 +37,22 @@ class QueryToolAutoCompleteFeatureTest(BaseFeatureTest):
self.page.add_server(self.server)
self.first_schema_name = "test_schema" + \
str(random.randint(1000, 2000))
str(secrets.choice(range(1000, 2000)))
test_utils.create_schema(self.server, self.test_db,
self.first_schema_name)
self.second_schema_name = "comp_schema" + \
str(random.randint(2000, 3000))
str(secrets.choice(range(2000, 3000)))
test_utils.create_schema(self.server, self.test_db,
self.second_schema_name)
self.first_table_name = "auto_comp_" + \
str(random.randint(1000, 2000))
str(secrets.choice(range(1000, 2000)))
test_utils.create_table(self.server, self.test_db,
self.first_table_name)
self.second_table_name = "auto_comp_" + \
str(random.randint(2000, 3000))
str(secrets.choice(range(2000, 3000)))
test_utils.create_table(self.server, self.test_db,
self.second_table_name)

View File

@@ -8,7 +8,7 @@
##########################################################################
import sys
import random
import secrets
import traceback
from selenium.webdriver import ActionChains
@@ -42,14 +42,15 @@ class QueryToolJourneyTest(BaseFeatureTest):
query_editor_tab_id = "id-query"
def before(self):
self.test_table_name = "test_table" + str(random.randint(1000, 3000))
self.test_table_name = "test_table" + str(
secrets.choice(range(1000, 3000)))
self.invalid_table_name = \
"table_that_doesnt_exist_" + str(random.randint(1000, 3000))
"table_that_doesnt_exist_" + str(secrets.choice(range(1000, 3000)))
test_utils.create_table(
self.server, self.test_db, self.test_table_name)
self.test_editable_table_name = "test_editable_table" + \
str(random.randint(1000, 3000))
str(secrets.choice(range(1000, 3000)))
create_sql = '''
CREATE TABLE "%s" (
pk_column NUMERIC PRIMARY KEY,

View File

@@ -7,7 +7,7 @@
#
##########################################################################
import random
import secrets
from regression.feature_utils.base_feature_test import BaseFeatureTest
from regression.python_test_utils import test_utils
@@ -28,7 +28,8 @@ class TableDdlFeatureTest(BaseFeatureTest):
self.page.add_server(self.server)
def runTest(self):
self.test_table_name = "test_table" + str(random.randint(1000, 3000))
self.test_table_name = "test_table" + str(
secrets.choice(range(1000, 3000)))
test_utils.create_table(self.server, self.test_db,
self.test_table_name)
self.page.expand_tables_node("Server", self.server['name'],

View File

@@ -7,7 +7,7 @@
#
##########################################################################
import random
import secrets
import time
from regression.feature_utils.base_feature_test import BaseFeatureTest
from regression.python_test_utils import test_utils
@@ -85,7 +85,8 @@ class CopySQLFeatureTest(BaseFeatureTest):
return query_tool_result
def _create_table(self):
self.test_table_name = "test_table" + str(random.randint(1000, 3000))
self.test_table_name = "test_table" + str(
secrets.choice(range(1000, 3000)))
test_utils.create_table(self.server, self.test_db,
self.test_table_name)
self.page.expand_tables_node("Server", self.server['name'],

View File

@@ -8,8 +8,7 @@
##########################################################################
import sys
import random
import time
import secrets
from regression.python_test_utils import test_utils
from regression.feature_utils.base_feature_test import BaseFeatureTest
@@ -44,7 +43,7 @@ class CheckForXssFeatureTest(BaseFeatureTest):
check_xss_chars_set2 = '&lt;script&gt;alert(1)&lt;/script&gt;'
def before(self):
self.test_table_name = "<h1>X" + str(random.randint(1000, 3000))
self.test_table_name = "<h1>X" + str(secrets.choice(range(1000, 3000)))
test_utils.create_type(
self.server, self.test_db, self.test_type_name,

View File

@@ -7,7 +7,7 @@
#
##########################################################################
import random
import secrets
from selenium.webdriver import ActionChains
from selenium.common.exceptions import TimeoutException
@@ -37,7 +37,7 @@ class CheckDebuggerForXssFeatureTest(BaseFeatureTest):
# Some test function is needed for debugger
self.function_name = "a_test_function" + \
str(random.randint(10000, 65535))
str(secrets.choice(range(10000, 65535)))
test_utils.create_debug_function(
self.server, self.test_db, self.function_name
)

View File

@@ -7,7 +7,7 @@
#
##########################################################################
import random
import secrets
from regression.python_test_utils import test_utils
from regression.feature_utils.base_feature_test import BaseFeatureTest
@@ -35,7 +35,7 @@ class CheckRoleMembershipControlFeatureTest(BaseFeatureTest):
"Membership is not present in Postgres below PG v9.1")
# create role
self.role = "test_role" + str(random.randint(10000, 65535))
self.role = "test_role" + str(secrets.choice(range(10000, 65535)))
# Some test function is needed for debugger
test_utils.create_role(self.server, "postgres",

View File

@@ -171,11 +171,11 @@ class BatchProcess(object):
)
def random_number(size):
import random
import secrets
import string
return ''.join(
random.choice(
secrets.choice(
string.ascii_uppercase + string.digits
) for _ in range(size)
)

View File

@@ -9,7 +9,7 @@
# Azure implementation
import config
import random
import secrets
from pgadmin.misc.cloud.utils import _create_server, CloudProcessDesc
from pgadmin.misc.bgprocess.processes import BatchProcess
from pgadmin import make_json_response
@@ -260,7 +260,7 @@ class Azure:
self._availability_zone = None
self._available_capabilities_list = []
self.azure_cache_name = current_user.username \
+ str(random.randint(1, 9999)) + "_msal.cache"
+ str(secrets.choice(range(1, 9999))) + "_msal.cache"
self.azure_cache_location = config.AZURE_CREDENTIAL_CACHE_DIR + '/'
##########################################################################

View File

@@ -11,7 +11,7 @@
import os
import os.path
import random
import secrets
import string
import time
from urllib.parse import unquote
@@ -19,7 +19,6 @@ from sys import platform as _platform
import config
import codecs
import pathlib
from werkzeug.exceptions import InternalServerError
import simplejson as json
from flask import render_template, Response, session, request as req, \
@@ -434,7 +433,7 @@ class Filemanager(object):
}
# Create a unique id for the transaction
trans_id = str(random.randint(1, 9999999))
trans_id = str(secrets.choice(range(1, 9999999)))
if 'fileManagerData' not in session:
file_manager_data = dict()

View File

@@ -9,9 +9,8 @@
import config
import string
import random
import secrets
import os
import re
import getpass
from pgadmin.utils.constants import ENTER_EMAIL_ADDRESS
@@ -22,7 +21,7 @@ def user_info_desktop():
print("NOTE: Configuring authentication for DESKTOP mode.")
email = config.DESKTOP_USER
p1 = ''.join([
random.choice(string.ascii_letters + string.digits)
secrets.choice(string.ascii_letters + string.digits)
for _ in range(32)
])
return email, p1

View File

@@ -8,7 +8,7 @@
##########################################################################
import time
import random
import secrets
import simplejson as json
@@ -32,7 +32,7 @@ def run_backup_job(tester, job_id, expected_params, assert_in, assert_not_in,
break
# Check the process list
response1 = tester.get('/misc/bgprocess/?_={0}'.format(
random.randint(1, 9999999)))
secrets.choice(range(1, 9999999))))
assert_equal(response1.status_code, 200)
process_list = json.loads(response1.data.decode('utf-8'))
@@ -69,12 +69,12 @@ def run_backup_job(tester, job_id, expected_params, assert_in, assert_not_in,
# Check the process details
p_details = tester.get('/misc/bgprocess/{0}?_={1}'.format(
job_id, random.randint(1, 9999999))
job_id, secrets.choice(range(1, 9999999)))
)
assert_equal(p_details.status_code, 200)
p_details = tester.get('/misc/bgprocess/{0}/{1}/{2}/?_={3}'.format(
job_id, 0, 0, random.randint(1, 9999999))
job_id, 0, 0, secrets.choice(range(1, 9999999)))
)
assert_equal(p_details.status_code, 200)
p_details_data = json.loads(p_details.data.decode('utf-8'))
@@ -88,7 +88,7 @@ def run_backup_job(tester, job_id, expected_params, assert_in, assert_not_in,
p_details = tester.get(
'/misc/bgprocess/{0}/{1}/{2}/?_={3}'.format(
job_id, out, err, random.randint(1, 9999999))
job_id, out, err, secrets.choice(range(1, 9999999)))
)
assert_equal(p_details.status_code, 200)
p_details_data = json.loads(p_details.data.decode('utf-8'))

View File

@@ -10,12 +10,11 @@
"""A blueprint module implementing the debugger"""
import simplejson as json
import random
import secrets
import re
import copy
from flask import url_for, Response, render_template, request, \
current_app
from flask import render_template, request, current_app
from flask_babel import gettext
from flask_security import login_required
from werkzeug.useragents import UserAgent
@@ -34,8 +33,8 @@ from pgadmin.model import db, DebuggerFunctionArguments
from pgadmin.tools.debugger.utils.debugger_instance import DebuggerInstance
from pgadmin.browser.server_groups.servers.databases.extensions.utils \
import get_extension_details
from pgadmin.utils.constants import PREF_LABEL_DISPLAY, \
PREF_LABEL_KEYBOARD_SHORTCUTS, MIMETYPE_APP_JS, SERVER_CONNECTION_CLOSED
from pgadmin.utils.constants import PREF_LABEL_KEYBOARD_SHORTCUTS, \
SERVER_CONNECTION_CLOSED
from pgadmin.preferences import preferences
MODULE_NAME = 'debugger'
@@ -778,7 +777,7 @@ def initialize_target(debug_type, trans_id, sid, did,
"""
# Create asynchronous connection using random connection id.
conn_id = str(random.randint(1, 9999999))
conn_id = str(secrets.choice(range(1, 9999999)))
manager = get_driver(PG_DEFAULT_DRIVER).connection_manager(sid)
conn = manager.connection(did=did, conn_id=conn_id)
data_obj = {}
@@ -1352,7 +1351,7 @@ def start_execution(trans_id, port_num):
)
# Create asynchronous connection using random connection id.
exe_conn_id = str(random.randint(1, 9999999))
exe_conn_id = str(secrets.choice(range(1, 9999999)))
try:
manager = get_driver(PG_DEFAULT_DRIVER).connection_manager(
de_inst.debugger_data['server_id'])

View File

@@ -9,7 +9,7 @@
from flask import session
from threading import Lock
import random
import secrets
debugger_sessions_lock = Lock()
@@ -17,7 +17,7 @@ debugger_sessions_lock = Lock()
class DebuggerInstance(object):
def __init__(self, trans_id=None):
if trans_id is None:
self._trans_id = str(random.randint(1, 9999999))
self._trans_id = str(secrets.choice(range(1, 9999999)))
else:
self._trans_id = str(trans_id)

View File

@@ -8,7 +8,7 @@
##########################################################################
import uuid
import random
import secrets
from pgadmin.utils.route import BaseTestGenerator
from regression.python_test_utils import test_utils as utils
from regression import parent_node_dict
@@ -34,7 +34,7 @@ class ERDClose(BaseTestGenerator):
if not db_con["info"] == "Database connected.":
raise Exception("Could not connect to database to add the schema.")
trans_id = random.randint(1, 9999999)
trans_id = secrets.choice(range(1, 9999999))
url = '/erd/initialize/{trans_id}/{sgid}/{sid}/{did}'.format(
trans_id=trans_id, sgid=self.sgid, sid=self.sid, did=self.did)

View File

@@ -9,7 +9,7 @@
import json
import uuid
import random
import secrets
from pgadmin.utils.route import BaseTestGenerator
from regression.python_test_utils import test_utils as utils
from regression import parent_node_dict
@@ -35,7 +35,7 @@ class ERDInitialize(BaseTestGenerator):
if not db_con["info"] == "Database connected.":
raise Exception("Could not connect to database to add the schema.")
trans_id = random.randint(1, 9999999)
trans_id = secrets.choice(range(1, 9999999))
url = '/erd/initialize/{trans_id}/{sgid}/{sid}/{did}'.format(
trans_id=trans_id, sgid=self.sgid, sid=self.sid, did=self.did)

View File

@@ -8,13 +8,11 @@
##########################################################################
import uuid
import random
import secrets
from pgadmin.utils.route import BaseTestGenerator
from regression.python_test_utils import test_utils as utils
from regression import parent_node_dict
from regression.test_setup import config_data
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
class ERDPanel(BaseTestGenerator):
@@ -26,7 +24,7 @@ class ERDPanel(BaseTestGenerator):
self.sgid = config_data["server_group"]
def runTest(self):
trans_id = random.randint(1, 9999999)
trans_id = secrets.choice(range(1, 9999999))
url = '/erd/panel/{trans_id}?sgid={sgid}&sid={sid}&server_type=pg' \
'&did={did}&gen=false'.\
format(trans_id=trans_id, sgid=self.sgid, sid=self.sid,

View File

@@ -9,7 +9,7 @@
import json
import uuid
import random
import secrets
from pgadmin.utils.route import BaseTestGenerator
from regression.python_test_utils import test_utils as utils
from regression import parent_node_dict
@@ -35,7 +35,7 @@ class ERDPrequisite(BaseTestGenerator):
if not db_con["info"] == "Database connected.":
raise Exception("Could not connect to database to add the schema.")
trans_id = random.randint(1, 9999999)
trans_id = secrets.choice(range(1, 9999999))
url = '/erd/prequisite/{trans_id}/{sgid}/{sid}/{did}'.format(
trans_id=trans_id, sgid=self.sgid, sid=self.sid, did=self.did)

View File

@@ -9,7 +9,7 @@
import json
import uuid
import random
import secrets
from pgadmin.utils.route import BaseTestGenerator
from regression.python_test_utils import test_utils as utils
from regression import parent_node_dict
@@ -63,7 +63,7 @@ class ERDSql(BaseTestGenerator):
if not db_con["info"] == "Database connected.":
raise Exception("Could not connect to database to add the schema.")
trans_id = random.randint(1, 9999999)
trans_id = secrets.choice(range(1, 9999999))
url = '/erd/sql/{trans_id}/{sgid}/{sid}/{did}'.format(
trans_id=trans_id, sgid=self.sgid, sid=self.sid, did=self.did)

View File

@@ -9,7 +9,7 @@
import json
import uuid
import random
import secrets
from pgadmin.utils.route import BaseTestGenerator
from regression.python_test_utils import test_utils as utils
from regression import parent_node_dict
@@ -65,7 +65,7 @@ class ERDTables(BaseTestGenerator):
if not db_con["info"] == "Database connected.":
raise Exception("Could not connect to database to add the schema.")
trans_id = random.randint(1, 9999999)
trans_id = secrets.choice(range(1, 9999999))
url = '/erd/tables/{trans_id}/{sgid}/{sid}/{did}'.format(
trans_id=trans_id, sgid=self.sgid, sid=self.sid, did=self.did)

View File

@@ -8,10 +8,9 @@
##########################################################################
import time
import random
import secrets
import simplejson as json
import uuid
import re
from regression import parent_node_dict
@@ -46,7 +45,7 @@ def run_import_export_job(tester, job_id, expected_params, assert_in,
break
# Check the process list
response1 = tester.get('/misc/bgprocess/?_={0}'.format(
random.randint(1, 9999999)))
secrets.choice(range(1, 9999999))))
assert_equal(response1.status_code, 200)
process_list = json.loads(response1.data.decode('utf-8'))
@@ -89,12 +88,12 @@ def run_import_export_job(tester, job_id, expected_params, assert_in,
# Check the process details
p_details = tester.get('/misc/bgprocess/{0}?_={1}'.format(
job_id, random.randint(1, 9999999))
job_id, secrets.choice(range(1, 9999999)))
)
assert_equal(p_details.status_code, 200)
p_details = tester.get('/misc/bgprocess/{0}/{1}/{2}/?_={3}'.format(
job_id, 0, 0, random.randint(1, 9999999))
job_id, 0, 0, secrets.choice(range(1, 9999999)))
)
assert_equal(p_details.status_code, 200)
p_details_data = json.loads(p_details.data.decode('utf-8'))
@@ -108,7 +107,7 @@ def run_import_export_job(tester, job_id, expected_params, assert_in,
p_details = tester.get(
'/misc/bgprocess/{0}/{1}/{2}/?_={3}'.format(
job_id, out, err, random.randint(1, 9999999))
job_id, out, err, secrets.choice(range(1, 9999999)))
)
assert_equal(p_details.status_code, 200)
p_details_data = json.loads(p_details.data.decode('utf-8'))

View File

@@ -12,9 +12,9 @@ functionality"""
import json
import os
import random
import secrets
from flask import url_for, Response, render_template, request
from flask import Response, render_template, request
from flask_babel import gettext as _
from flask_security import login_required, current_user
from pgadmin.utils import PgAdminModule
@@ -138,7 +138,8 @@ def load_servers():
if 'Servers' in data:
for server in data["Servers"]:
obj = data["Servers"][server]
server_id = server + '_' + str(random.randint(1, 9999))
server_id = server + '_' + str(
secrets.choice(range(1, 9999)))
if obj['Group'] in groups:
groups[obj['Group']]['children'].append(

View File

@@ -8,7 +8,7 @@
##########################################################################
import time
import random
import secrets
import simplejson as json
import os
@@ -80,7 +80,7 @@ class MaintenanceJobTest(BaseTestGenerator):
break
# Check the process list
response1 = self.tester.get('/misc/bgprocess/?_={0}'.format(
random.randint(1, 9999999)))
secrets.choice(range(1, 9999999))))
self.assertEqual(response1.status_code, 200)
process_list = json.loads(response1.data.decode('utf-8'))
@@ -105,13 +105,13 @@ class MaintenanceJobTest(BaseTestGenerator):
# Check the process details
p_details = self.tester.get('/misc/bgprocess/{0}?_={1}'.format(
job_id, random.randint(1, 9999999))
job_id, secrets.choice(range(1, 9999999)))
)
self.assertEqual(p_details.status_code, 200)
p_details = self.tester.get(
'/misc/bgprocess/{0}/{1}/{2}/?_={3}'.format(
job_id, 0, 0, random.randint(1, 9999999)
job_id, 0, 0, secrets.choice(range(1, 9999999))
)
)
self.assertEqual(p_details.status_code, 200)
@@ -125,7 +125,7 @@ class MaintenanceJobTest(BaseTestGenerator):
p_details = self.tester.get(
'/misc/bgprocess/{0}/{1}/{2}/?_={3}'.format(
job_id, out, err, random.randint(1, 9999999))
job_id, out, err, secrets.choice(range(1, 9999999)))
)
self.assertEqual(p_details.status_code, 200)
p_details_data = json.loads(p_details.data.decode('utf-8'))

View File

@@ -1,5 +1,5 @@
import uuid
import random
import secrets
import sys
from pgadmin.utils.route import BaseTestGenerator
from regression.python_test_utils import test_utils as utils
@@ -19,7 +19,7 @@ class PSQLPanel(BaseTestGenerator):
def runTest(self):
if sys.platform == 'win32':
self.skipTest('PSQL disabled for windows')
trans_id = random.randint(1, 9999999)
trans_id = secrets.choice(range(1, 9999999))
url = '/psql/panel/{trans_id}?sgid={sgid}&sid={sid}&did={did}' \
'&server_type=pg&db={db_name}&theme={theme}'.\
format(trans_id=trans_id, sgid=self.sgid, sid=self.sid,

View File

@@ -35,9 +35,6 @@ class PSQLStartProcess(BaseTestGenerator):
assert received[0]['name'] == 'connected'
assert received[0]['args'][0]['sid'] != ''
import random
trans_id = random.randint(1, 9999999)
data = {
'sid': self.sid,
'db': 'postgres',

View File

@@ -8,7 +8,7 @@
##########################################################################
import time
import random
import secrets
import os
import simplejson as json
@@ -120,7 +120,7 @@ class RestoreJobTest(BaseTestGenerator):
break
# Check the process list
response1 = self.tester.get('/misc/bgprocess/?_={0}'.format(
random.randint(1, 9999999)))
secrets.choice(range(1, 9999999))))
self.assertEqual(response1.status_code, 200)
process_list = json.loads(response1.data.decode('utf-8'))
@@ -150,14 +150,14 @@ class RestoreJobTest(BaseTestGenerator):
# Check the process details
p_details = self.tester.get('/misc/bgprocess/{0}?_={1}'.format(
job_id, random.randint(1, 9999999))
job_id, secrets.choice(range(1, 9999999)))
)
self.assertEqual(p_details.status_code, 200)
json.loads(p_details.data.decode('utf-8'))
p_details = self.tester.get(
'/misc/bgprocess/{0}/{1}/{2}/?_={3}'.format(
job_id, 0, 0, random.randint(1, 9999999)
job_id, 0, 0, secrets.choice(range(1, 9999999))
)
)
self.assertEqual(p_details.status_code, 200)
@@ -172,7 +172,7 @@ class RestoreJobTest(BaseTestGenerator):
p_details = self.tester.get(
'/misc/bgprocess/{0}/{1}/{2}/?_={3}'.format(
job_id, out, err, random.randint(1, 9999999))
job_id, out, err, secrets.choice(range(1, 9999999)))
)
self.assertEqual(p_details.status_code, 200)
p_details_data = json.loads(p_details.data.decode('utf-8'))

View File

@@ -10,7 +10,7 @@
"""A blueprint module implementing the schema_diff frame."""
import simplejson as json
import pickle
import random
import secrets
import copy
from flask import Response, session, url_for, request
@@ -200,7 +200,7 @@ def initialize():
trans_id = None
try:
# Create a unique id for the transaction
trans_id = str(random.randint(1, 9999999))
trans_id = str(secrets.choice(range(1, 9999999)))
if 'schemaDiff' not in session:
schema_diff_data = dict()

View File

@@ -10,7 +10,7 @@
import uuid
import json
import os
import random
import secrets
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
@@ -146,7 +146,7 @@ class SchemaDiffTestCase(BaseTestGenerator):
response_data = self.compare()
diff_file = os.path.join(self.sql_folder, 'diff_{0}.sql'.format(
str(random.randint(1, 99999))))
str(secrets.choice(range(1, 99999)))))
file_obj = open(diff_file, 'a')
for diff in response_data['data']:

View File

@@ -11,7 +11,7 @@
import os
import pickle
import re
import random
import secrets
from urllib.parse import unquote
from threading import Lock
@@ -203,7 +203,7 @@ def initialize_viewdata(trans_id, cmd_type, obj_type, sgid, sid, did, obj_id):
filter_sql = request.args or request.form
# Create asynchronous connection using random connection id.
conn_id = str(random.randint(1, 9999999))
conn_id = str(secrets.choice(range(1, 9999999)))
try:
manager = get_driver(PG_DEFAULT_DRIVER).connection_manager(sid)
# default_conn is same connection which is created when user connect to
@@ -404,7 +404,7 @@ def _connect(conn, **kwargs):
def _init_sqleditor(trans_id, connect, sgid, sid, did, **kwargs):
# Create asynchronous connection using random connection id.
conn_id = str(random.randint(1, 9999999))
conn_id = str(secrets.choice(range(1, 9999999)))
manager = get_driver(PG_DEFAULT_DRIVER).connection_manager(sid)
@@ -503,7 +503,7 @@ def update_sqleditor_connection(trans_id, sgid, sid, did):
req_args['recreate'] == '1'):
connect = False
new_trans_id = str(random.randint(1, 9999999))
new_trans_id = str(secrets.choice(range(1, 9999999)))
kwargs = {
'user': data['user'],
'role': data['role'] if 'role' in data else None,

View File

@@ -15,7 +15,7 @@ from pgadmin.browser.server_groups.servers.databases.tests import utils as \
from regression.python_test_utils import test_utils
import json
from pgadmin.utils import server_utils
import random
import secrets
class TestDownloadCSV(BaseTestGenerator):
@@ -96,7 +96,8 @@ class TestDownloadCSV(BaseTestGenerator):
]
def setUp(self):
self._db_name = 'download_results_' + str(random.randint(10000, 65535))
self._db_name = 'download_results_' + str(
secrets.choice(range(10000, 65535)))
self._sid = self.server_information['server_id']
server_con = server_utils.connect_server(self, self._sid)
@@ -133,7 +134,7 @@ class TestDownloadCSV(BaseTestGenerator):
raise Exception("Could not connect to the database.")
# Initialize query tool
self.trans_id = str(random.randint(1, 9999999))
self.trans_id = str(secrets.choice(range(1, 9999999)))
url = self.init_url.format(
self.trans_id, test_utils.SERVER_GROUP, self._sid, self._did)
response = self.tester.post(url)

View File

@@ -14,7 +14,7 @@ from pgadmin.browser.server_groups.servers.databases.tests import utils as \
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
import random
import secrets
class TestEditorHistory(BaseTestGenerator):
@@ -69,7 +69,7 @@ class TestEditorHistory(BaseTestGenerator):
raise Exception("Could not connect to the database.")
# Initialize query tool
self.trans_id = str(random.randint(1, 9999999))
self.trans_id = str(secrets.choice(range(1, 9999999)))
url = '/sqleditor/initialize/sqleditor/{0}/{1}/{2}/{3}'.format(
self.trans_id, utils.SERVER_GROUP, self.server_id, self.db_id)
response = self.tester.post(url)

View File

@@ -14,7 +14,7 @@ from pgadmin.browser.server_groups.servers.databases.tests import utils as \
from regression.python_test_utils import test_utils
import json
from pgadmin.utils import server_utils
import random
import secrets
class TestEncodingCharset(BaseTestGenerator):
@@ -237,7 +237,7 @@ class TestEncodingCharset(BaseTestGenerator):
def setUp(self):
self.encode_db_name = 'encoding_' + self.db_encoding + \
str(random.randint(10000, 65535))
str(secrets.choice(range(10000, 65535)))
self.encode_sid = self.server_information['server_id']
server_con = server_utils.connect_server(self, self.encode_sid)
@@ -261,7 +261,7 @@ class TestEncodingCharset(BaseTestGenerator):
raise Exception("Could not connect to the database.")
# Initialize query tool
self.trans_id = str(random.randint(1, 9999999))
self.trans_id = str(secrets.choice(range(1, 9999999)))
url = '/sqleditor/initialize/sqleditor/{0}/{1}/{2}/{3}'\
.format(self.trans_id, test_utils.SERVER_GROUP, self.encode_sid,
self.encode_did)

View File

@@ -8,7 +8,7 @@
##########################################################################
import json
import random
import secrets
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
@@ -33,7 +33,7 @@ class TestExplainPlan(BaseTestGenerator):
raise Exception("Could not connect to the database.")
# Initialize query tool
self.trans_id = str(random.randint(1, 9999999))
self.trans_id = str(secrets.choice(range(1, 9999999)))
url = '/sqleditor/initialize/sqleditor/{0}/{1}/{2}/{3}'.format(
self.trans_id, utils.SERVER_GROUP, self.server_id, self.db_id)
response = self.tester.post(url)

View File

@@ -14,7 +14,7 @@ from pgadmin.browser.server_groups.servers.databases.tests import utils as \
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
import random
import secrets
class TestMacros(BaseTestGenerator):
@@ -105,7 +105,7 @@ class TestMacros(BaseTestGenerator):
raise Exception("Could not connect to the database.")
# Initialize query tool
self.trans_id = str(random.randint(1, 9999999))
self.trans_id = str(secrets.choice(range(1, 9999999)))
url = '/sqleditor/initialize/sqleditor/{0}/{1}/{2}/{3}'.format(
self.trans_id, utils.SERVER_GROUP, self.server_id, self.db_id)
response = self.tester.post(url)

View File

@@ -14,7 +14,7 @@ from pgadmin.browser.server_groups.servers.databases.tests import utils as \
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
import random
import secrets
class TestPollQueryTool(BaseTestGenerator):
@@ -76,7 +76,7 @@ NOTICE: Hello, world!
raise Exception("Could not connect to the database.")
# Initialize query tool
self.trans_id = str(random.randint(1, 9999999))
self.trans_id = str(secrets.choice(range(1, 9999999)))
url = '/sqleditor/initialize/sqleditor/{0}/{1}/{2}/{3}'.format(
self.trans_id, utils.SERVER_GROUP, self.server_id, self.db_id)
response = self.tester.post(url)

View File

@@ -7,8 +7,7 @@
#
##########################################################################
import sys
import random
import secrets
from pgadmin.utils.route import BaseTestGenerator
from regression.python_test_utils import test_utils
@@ -66,7 +65,7 @@ class TestSQLASCIIEncoding(BaseTestGenerator):
def setUp(self):
self.encode_db_name = 'test_encoding_' + self.db_encoding + \
str(random.randint(1000, 65535))
str(secrets.choice(range(1000, 65535)))
self.encode_sid = self.server_information['server_id']
server_con = server_utils.connect_server(self, self.encode_sid)

View File

@@ -8,7 +8,7 @@
##########################################################################
import json
import random
import secrets
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
@@ -303,7 +303,7 @@ class TestTransactionControl(BaseTestGenerator):
raise Exception("Could not connect to the database.")
def _initialize_query_tool(self):
self.trans_id = str(random.randint(1, 9999999))
self.trans_id = str(secrets.choice(range(1, 9999999)))
url = '/sqleditor/initialize/sqleditor/{0}/{1}/{2}/{3}'.format(
self.trans_id, utils.SERVER_GROUP, self.server_id, self.db_id)
response = self.tester.post(url)
@@ -320,7 +320,7 @@ class TestTransactionControl(BaseTestGenerator):
def _create_test_table(self):
test_table_name = "test_for_updatable_resultset" + \
str(random.randint(1000, 9999))
str(secrets.choice(range(1000, 9999)))
create_sql = """
DROP TABLE IF EXISTS "%s";

View File

@@ -9,7 +9,7 @@
import uuid
import json
import random
import secrets
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
@@ -96,7 +96,7 @@ class TestViewData(BaseTestGenerator):
table_id = result[0][0]
# Initialize query tool
self.trans_id = str(random.randint(1, 9999999))
self.trans_id = str(secrets.choice(range(1, 9999999)))
url = '/sqleditor/initialize/viewdata/{0}/3/table/{1}/{2}/{3}/{4}' \
.format(self.trans_id, test_utils.SERVER_GROUP, self.server_id,
self.db_id, table_id)

View File

@@ -10,7 +10,7 @@
"""Start executing the query in async mode."""
import pickle
import random
import secrets
from flask import Response
from flask_babel import gettext
@@ -35,7 +35,7 @@ class StartRunningQuery:
def __init__(self, blueprint_object, logger):
self.http_session = None
self.blueprint_object = blueprint_object
self.connection_id = str(random.randint(1, 9999999))
self.connection_id = str(secrets.choice(range(1, 9999999)))
self.logger = logger
def execute(self, sql, trans_id, http_session, connect=False):

View File

@@ -7,8 +7,7 @@
#
##########################################################################
import json
import random
import secrets
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
@@ -133,7 +132,7 @@ class TestQueryUpdatableResultset(BaseTestGenerator):
def setUp(self):
self.test_table_name = "test_for_updatable_resultset" + \
str(random.randint(1000, 9999))
str(secrets.choice(range(1000, 9999)))
self._initialize_database_connection()
self._initialize_query_tool()
self._initialize_urls()
@@ -202,7 +201,7 @@ class TestQueryUpdatableResultset(BaseTestGenerator):
raise Exception("Could not connect to the database.")
def _initialize_query_tool(self):
self.trans_id = str(random.randint(1, 9999999))
self.trans_id = str(secrets.choice(range(1, 9999999)))
url = '/sqleditor/initialize/sqleditor/{0}/{1}/{2}/{3}'.format(
self.trans_id, utils.SERVER_GROUP, self.server_id, self.db_id)
response = self.tester.post(url)

View File

@@ -8,7 +8,7 @@
##########################################################################
import json
import random
import secrets
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
@@ -920,7 +920,7 @@ class TestSaveChangedData(BaseTestGenerator):
raise Exception("Could not connect to the database.")
def _initialize_query_tool(self):
self.trans_id = str(random.randint(1, 9999999))
self.trans_id = str(secrets.choice(range(1, 9999999)))
url = '/sqleditor/initialize/sqleditor/{0}/{1}/{2}/{3}'.format(
self.trans_id, utils.SERVER_GROUP, self.server_id, self.db_id)
response = self.tester.post(url)
@@ -934,7 +934,7 @@ class TestSaveChangedData(BaseTestGenerator):
def _create_test_table(self):
self.test_table_name = "test_for_save_data" + \
str(random.randint(1000, 9999))
str(secrets.choice(range(1000, 9999)))
create_sql = """
DROP TABLE IF EXISTS "%s";

View File

@@ -13,17 +13,16 @@ It is a wrapper around the actual psycopg2 driver, and connection
object.
"""
import random
import secrets
import select
import datetime
from collections import deque
import psycopg2
from flask import g, current_app, session
from flask import g, current_app
from flask_babel import gettext
from flask_security import current_user
from pgadmin.utils.crypto import decrypt, encrypt
from pgadmin.utils.crypto import decrypt
from psycopg2.extensions import encodings
from os import environ
import config
from pgadmin.model import User
@@ -39,7 +38,6 @@ from .encoding import get_encoding, configure_driver_encodings
from pgadmin.utils import csv
from pgadmin.utils.master_password import get_crypt_key
from io import StringIO
from pgadmin.utils.constants import KERBEROS
from pgadmin.utils.locker import ConnectionLocker
_ = gettext
@@ -968,7 +966,7 @@ WHERE db.datname = current_database()""")
if not status:
return False, str(cur)
query_id = random.randint(1, 9999999)
query_id = secrets.choice(range(1, 9999999))
dsn = self.conn.get_dsn_parameters()
current_app.logger.log(
@@ -1042,7 +1040,7 @@ WHERE db.datname = current_database()""")
if not status:
return False, str(cur)
query_id = random.randint(1, 9999999)
query_id = secrets.choice(range(1, 9999999))
encoding = self.python_encoding
@@ -1114,7 +1112,7 @@ WHERE db.datname = current_database()""")
if not status:
return False, str(cur)
query_id = random.randint(1, 9999999)
query_id = secrets.choice(range(1, 9999999))
dsn = self.conn.get_dsn_parameters()
current_app.logger.log(
@@ -1202,7 +1200,7 @@ WHERE db.datname = current_database()""")
if not status:
return False, str(cur)
query_id = random.randint(1, 9999999)
query_id = secrets.choice(range(1, 9999999))
dsn = self.conn.get_dsn_parameters()
current_app.logger.log(
25,
@@ -1261,7 +1259,7 @@ WHERE db.datname = current_database()""")
if not status:
return False, str(cur)
query_id = random.randint(1, 9999999)
query_id = secrets.choice(range(1, 9999999))
dsn = self.conn.get_dsn_parameters()
current_app.logger.log(
25,

View File

@@ -9,7 +9,7 @@
import os
import subprocess
import signal
import random
import secrets
import time
from selenium.common.exceptions import WebDriverException
@@ -26,7 +26,7 @@ class AppStarter:
def start_app(self):
""" This function start the subprocess to start pgAdmin app """
random_server_port = str(random.randint(10000, 65535))
random_server_port = str(secrets.choice(range(10000, 65535)))
env = {
"PGADMIN_INT_PORT": random_server_port,
"SQLITE_PATH": str(self.app_config.TEST_SQLITE_PATH)

View File

@@ -16,7 +16,7 @@ import psycopg2
import sqlite3
import shutil
from functools import partial
import random
import secrets
import importlib
from selenium.webdriver.support.wait import WebDriverWait
@@ -1740,7 +1740,7 @@ def create_users_for_parallel_tests(tester):
@param tester: test client
@return: uer details dict
"""
login_username = 'ui_test_user' + str(random.randint(1000, 9999)) +\
login_username = 'ui_test_user' + str(secrets.choice(range(1000, 9999))) +\
'@edb.com'
user_details = {'login_username': login_username,
'login_password': 'adminedb'}

View File

@@ -18,7 +18,7 @@ import signal
import sys
import traceback
import json
import random
import secrets
import threading
import time
import unittest
@@ -478,7 +478,7 @@ def execute_test(test_module_list_passed, server_passed, driver_passed,
# parallel execution on different platforms. This database will be
# used across all feature tests.
test_db_name = "acceptance_test_db" + \
str(random.randint(10000, 65535))
str(secrets.choice(range(10000, 65535)))
connection = test_utils.get_db_connection(
server_passed['db'],
server_passed['username'],