Improve code coverage and API test cases for Tables. Fixes #5774.

This commit is contained in:
Yogesh Mahajan 2020-09-03 12:35:56 +05:30 committed by Akshay Joshi
parent ddf5b33219
commit 3e35dc95e5
19 changed files with 3079 additions and 602 deletions

View File

@ -18,6 +18,7 @@ Housekeeping
| `Issue #5332 <https://redmine.postgresql.org/issues/5332>`_ - Improve code coverage and API test cases for Columns and Constraints (Index, Foreign Key, Check, Exclusion).
| `Issue #5344 <https://redmine.postgresql.org/issues/5344>`_ - Improve code coverage and API test cases for Grant Wizard.
| `Issue #5774 <https://redmine.postgresql.org/issues/5774>`_ - Improve code coverage and API test cases for Tables.
Bug fixes
*********

View File

@ -1,96 +0,0 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import json
import uuid
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from . import utils as tables_utils
class TableUpdateColumnTestCase(BaseTestGenerator):
"""This class will update the column node from table"""
scenarios = [
# Fetching default URL for table node.
('Add privileges for existing column',
dict(url='/browser/table/obj/')
)
]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a table.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a table.")
self.table_name = "test_table_column_put_%s" % (str(uuid.uuid4())[1:8])
self.table_id = tables_utils.create_table(
self.server, self.db_name,
self.schema_name,
self.table_name)
def runTest(self):
"""This function will fetch added table under schema node."""
table_response = tables_utils.verify_table(self.server, self.db_name,
self.table_id)
if not table_response:
raise Exception("Could not find the table to update.")
data = {
"columns": {
"changed": [{
"attnum": 1,
"attacl": {
"added": [{
"grantee": self.server["username"],
"grantor": self.server["username"],
"privileges": [
{"privilege_type": "a", "privilege": True,
"with_grant": True},
{"privilege_type": "r", "privilege": True,
"with_grant": True},
{"privilege_type": "w", "privilege": True,
"with_grant": True},
{"privilege_type": "x", "privilege": True,
"with_grant": True
}
]
}]
}
}]
}
}
response = self.tester.put(
self.url + str(utils.SERVER_GROUP) + '/' +
str(self.server_id) + '/' + str(self.db_id) + '/' +
str(self.schema_id) + '/' + str(self.table_id),
data=json.dumps(data), follow_redirects=True)
self.assertEqual(response.status_code, 200)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -9,6 +9,7 @@
import json
import uuid
from unittest.mock import patch
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
@ -23,159 +24,38 @@ from . import utils as tables_utils
class TableAddTestCase(BaseTestGenerator):
""" This class will add new collation under schema node. """
scenarios = [
# Fetching default URL for table node.
('Create Table', dict(url='/browser/table/obj/')),
('Create Range partitioned table with 2 partitions',
dict(url='/browser/table/obj/',
server_min_version=100000,
partition_type='range',
skip_msg='Partitioned table are not supported by '
'PPAS/PG 10.0 and below.'
)
),
('Create Range partitioned table with 1 default and 2'
' value based partition',
dict(url='/browser/table/obj/',
server_min_version=110000,
partition_type='range',
is_default=True,
skip_msg='Partitioned table are not supported by '
'PPAS/PG 10.0 and below.'
)
),
('Create Multilevel Range partitioned table with subpartition table',
dict(url='/browser/table/obj/',
server_min_version=100000,
partition_type='range',
multilevel_partition=True,
skip_msg='Partitioned table are not supported by '
'PPAS/PG 10.0 and below.'
)
),
('Create List partitioned table with 2 partitions',
dict(url='/browser/table/obj/',
server_min_version=100000,
partition_type='list',
skip_msg='Partitioned table are not supported by '
'PPAS/PG 10.0 and below.'
)
),
('Create Multilevel List partitioned table with subpartition table',
dict(url='/browser/table/obj/',
server_min_version=100000,
partition_type='list',
multilevel_partition=True,
skip_msg='Partitioned table are not supported by '
'PPAS/PG 10.0 and below.'
)
),
('Create Hash partitioned table with 2 partitions',
dict(url='/browser/table/obj/',
server_min_version=110000,
partition_type='hash',
skip_msg='Hash Partition are not supported by '
'PPAS/PG 11.0 and below.'
)
),
('Create Table with Identity columns',
dict(url='/browser/table/obj/',
server_min_version=100000,
skip_msg='Identity columns are not supported by '
'PPAS/PG 10.0 and below.',
columns=[{
'name': 'iden_always',
'cltype': 'bigint',
'attacl': [],
'is_primary_key': False,
'attnotnull': True,
'attlen': None,
'attprecision': None,
'attoptions': [],
'seclabels': [],
'colconstype': 'i',
'attidentity': 'a',
'seqincrement': 1,
'seqstart': 1,
'seqmin': 1,
'seqmax': 10,
'seqcache': 1,
'seqcycle': True
}, {
'name': 'iden_default',
'cltype': 'bigint',
'attacl': [],
'is_primary_key': False,
'attnotnull': True,
'attlen': None,
'attprecision': None,
'attoptions': [],
'seclabels': [],
'colconstype': 'i',
'attidentity': 'd',
'seqincrement': 2,
'seqstart': 2,
'seqmin': 2,
'seqmax': 2000,
'seqcache': 1,
'seqcycle': True
}])
),
('Create Table with Generated columns',
dict(url='/browser/table/obj/',
server_min_version=120000,
skip_msg='Generated columns are not supported by '
'PPAS/PG 12.0 and below.',
columns=[{
'name': 'm1',
'cltype': 'bigint',
'attacl': [],
'is_primary_key': False,
'attoptions': [],
'seclabels': []
}, {
'name': 'm2',
'cltype': 'bigint',
'attacl': [],
'is_primary_key': False,
'attoptions': [],
'seclabels': []
}, {
'name': 'genrated',
'cltype': 'bigint',
'attacl': [],
'is_primary_key': False,
'attnotnull': True,
'attlen': None,
'attprecision': None,
'attoptions': [],
'seclabels': [],
'colconstype': 'g',
'genexpr': 'm1*m2'
}])
)
url = '/browser/table/obj/'
]
# Generates scenarios
scenarios = utils.generate_scenarios("table_create",
tables_utils.test_cases)
def setUp(self):
# Load test data
self.data = self.test_data
# Check server version
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
if hasattr(self, 'server_min_version'):
if "server_min_version" in self.inventory_data:
server_con = server_utils.connect_server(self, self.server_id)
if not server_con["info"] == "Server connected.":
raise Exception("Could not connect to server to add "
"partitioned table.")
if server_con["data"]["version"] < self.server_min_version:
self.skipTest(self.skip_msg)
if server_con["data"]["version"] < \
self.inventory_data["server_min_version"]:
self.skipTest(self.inventory_data["skip_msg"])
# Create db connection
self.db_name = parent_node_dict["database"][-1]["db_name"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a table.")
# Create schema
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
@ -186,15 +66,20 @@ class TableAddTestCase(BaseTestGenerator):
def runTest(self):
""" This function will add table under schema node. """
if "table_name" in self.data:
self.table_name = self.data["table_name"]
else:
self.table_name = "test_table_add_%s" % (str(uuid.uuid4())[1:8])
db_user = self.server["username"]
self.table_name = "test_table_add_%s" % (str(uuid.uuid4())[1:8])
# Get the common data
data = tables_utils.get_table_common_data()
self.data.update(tables_utils.get_table_common_data())
if self.server_information and \
'server_version' in self.server_information and \
self.server_information['server_version'] >= 120000:
data['spcname'] = None
data.update({
self.data['spcname'] = None
self.data.update({
"name": self.table_name,
"relowner": db_user,
"schema": self.schema_name,
@ -217,37 +102,26 @@ class TableAddTestCase(BaseTestGenerator):
}]
})
# If column is provided in the scenario then use those columns
if hasattr(self, 'columns'):
data['columns'] = self.columns
if hasattr(self, 'partition_type'):
data['partition_type'] = self.partition_type
data['is_partitioned'] = True
if self.partition_type == 'range':
if hasattr(self, 'is_default'):
tables_utils.get_range_partitions_data(data, 'Default')
elif hasattr(self, 'multilevel_partition'):
tables_utils.get_range_partitions_data(
data, None, True)
else:
tables_utils.get_range_partitions_data(data)
elif self.partition_type == 'list':
if hasattr(self, 'multilevel_partition'):
tables_utils.get_list_partitions_data(data, True)
else:
tables_utils.get_list_partitions_data(data)
else:
tables_utils.get_hash_partitions_data(data)
# Add table
response = self.tester.post(
self.url + str(utils.SERVER_GROUP) + '/' +
str(self.server_id) + '/' + str(self.db_id) +
'/' + str(self.schema_id) + '/',
data=json.dumps(data),
content_type='html/json')
self.assertEqual(response.status_code, 200)
if self.is_positive_test:
response = tables_utils.api_create(self)
# Assert response
utils.assert_status_code(self, response)
else:
if self.mocking_required:
with patch(self.mock_data["function_name"],
side_effect=eval(self.mock_data["return_value"])):
response = tables_utils.api_create(self)
else:
if self.table_name == "":
del self.data["name"]
response = tables_utils.api_create(self)
# Assert response
utils.assert_status_code(self, response)
utils.assert_error_message(self, response)
def tearDown(self):
# Disconnect the database

View File

@ -8,6 +8,7 @@
##########################################################################
import uuid
from unittest.mock import patch
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
@ -21,12 +22,17 @@ from . import utils as tables_utils
class TableDeleteTestCase(BaseTestGenerator):
"""This class will delete new table under schema node."""
scenarios = [
# Fetching default URL for table node.
('Delete Table', dict(url='/browser/table/obj/'))
]
url = '/browser/table/obj/'
# Generates scenarios
scenarios = utils.generate_scenarios("table_delete",
tables_utils.test_cases)
def setUp(self):
# Load test data
self.data = self.test_data
# Create db connection
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
@ -35,6 +41,8 @@ class TableDeleteTestCase(BaseTestGenerator):
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a table.")
# Create schema
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
@ -42,24 +50,53 @@ class TableDeleteTestCase(BaseTestGenerator):
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a table.")
# Create table
self.table_name = "test_table_delete_%s" % (str(uuid.uuid4())[1:8])
self.table_id = tables_utils.create_table(self.server, self.db_name,
self.schema_name,
self.table_name)
def runTest(self):
"""This function will delete added table under schema node."""
# Create table
if self.is_list:
self.table_name_1 = \
"test_table_delete_%s" % (str(uuid.uuid4())[1:8])
self.table_id_1 = tables_utils.create_table(self.server,
self.db_name,
self.schema_name,
self.table_name_1
)
# Verify table creation
table_response = tables_utils.verify_table(self.server, self.db_name,
self.table_id)
if not table_response:
raise Exception("Could not find the table to delete.")
response = self.tester.delete(self.url + str(utils.SERVER_GROUP) +
'/' + str(self.server_id) + '/' +
str(self.db_id) + '/' +
str(self.schema_id) + '/' +
str(self.table_id),
follow_redirects=True)
self.assertEqual(response.status_code, 200)
def runTest(self):
"""This function will delete added table under schema node."""
if self.is_positive_test:
if self.is_list:
self.data["ids"] = [self.table_id, self.table_id_1]
response = tables_utils.api_delete(self, "")
else:
response = tables_utils.api_delete(self)
# Assert response
utils.assert_status_code(self, response)
else:
if self.mocking_required:
with patch(self.mock_data["function_name"],
side_effect=eval(self.mock_data["return_value"])):
response = tables_utils.api_delete(self)
else:
if 'table_id' in self.data:
self.table_id = self.data['table_id']
response = tables_utils.api_delete(self)
# Assert response
utils.assert_status_code(self, response)
utils.assert_error_message(self, response)
def tearDown(self):
# Disconnect the database

View File

@ -0,0 +1,75 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import uuid
from unittest.mock import patch
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from . import utils as tables_utils
class TableGetDependenciesDependentsTestCase(BaseTestGenerator):
"""This class will add new collation under schema node."""
url = '/browser/table/'
# Generates scenarios
scenarios = utils.generate_scenarios("table_dependencies_dependents",
tables_utils.test_cases)
def setUp(self):
# Load test data
self.data = self.test_data
# Create db connection
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a table.")
# Create schema
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a table.")
# Create table
self.table_name = "test_table_get_%s" % (str(uuid.uuid4())[1:8])
self.table_id = tables_utils.create_table(self.server, self.db_name,
self.schema_name,
self.table_name)
def runTest(self):
"""This function will delete added table under schema node."""
if self.is_positive_test:
if self.is_dependent:
self.url = self.url + 'dependent/'
response = tables_utils.api_get(self)
else:
self.url = self.url + 'dependency/'
response = tables_utils.api_get(self)
# Assert response
utils.assert_status_code(self, response)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -8,6 +8,7 @@
##########################################################################
import uuid
from unittest.mock import patch
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
@ -21,12 +22,17 @@ from . import utils as tables_utils
class TableGetTestCase(BaseTestGenerator):
"""This class will add new collation under schema node."""
scenarios = [
# Fetching default URL for table node.
('Fetch table Node URL', dict(url='/browser/table/obj/'))
]
url = '/browser/table/obj/'
# Generates scenarios
scenarios = utils.generate_scenarios("table_get",
tables_utils.test_cases)
def setUp(self):
# Load test data
self.data = self.test_data
# Create db connection
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
@ -35,6 +41,8 @@ class TableGetTestCase(BaseTestGenerator):
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a table.")
# Create schema
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
@ -42,20 +50,49 @@ class TableGetTestCase(BaseTestGenerator):
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a table.")
# Create table
self.table_name = "test_table_get_%s" % (str(uuid.uuid4())[1:8])
self.table_id = tables_utils.create_table(self.server, self.db_name,
self.schema_name,
self.table_name)
# Create table
if self.is_list:
self.table_name_1 = \
"test_table_delete_%s" % (str(uuid.uuid4())[1:8])
self.table_id_1 = tables_utils.create_table(self.server,
self.db_name,
self.schema_name,
self.table_name_1
)
def runTest(self):
"""This function will delete added table under schema node."""
response = self.tester.get(self.url + str(utils.SERVER_GROUP) +
'/' + str(self.server_id) + '/' +
str(self.db_id) + '/' +
str(self.schema_id) + '/' +
str(self.table_id),
follow_redirects=True)
self.assertEqual(response.status_code, 200)
if self.is_positive_test:
if self.is_list:
response = tables_utils.api_get(self, "")
else:
response = tables_utils.api_get(self)
# Assert response
utils.assert_status_code(self, response)
else:
if self.mocking_required:
with patch(self.mock_data["function_name"],
side_effect=eval(self.mock_data["return_value"])):
if self.is_list:
response = tables_utils.api_get(self, "")
else:
response = tables_utils.api_get(self)
else:
if 'table_id' in self.data:
self.table_id = self.data['table_id']
response = tables_utils.api_get(self)
# Assert response
utils.assert_status_code(self, response)
utils.assert_error_message(self, response)
def tearDown(self):
# Disconnect the database

View File

@ -0,0 +1,98 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import uuid
from unittest.mock import patch
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.utils import server_utils
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from . import utils as tables_utils
class TableGetTestCase(BaseTestGenerator):
"""This class will add new collation under schema node."""
url = '/browser/table/'
# Generates scenarios
scenarios = utils.generate_scenarios(
"test_table_get_existing_table_actions", tables_utils.test_cases)
def setUp(self):
# Load test data
self.data = self.test_data
# Update url
self.url = self.url + self.add_to_url
# Create db connection
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
# Check Server version
if "server_min_version" in self.inventory_data:
server_con = server_utils.connect_server(self, self.server_id)
if not server_con["info"] == "Server connected.":
raise Exception("Could not connect to server to add "
"partitioned table.")
if server_con["data"]["version"] < \
self.inventory_data["server_min_version"]:
self.skipTest(self.inventory_data["skip_msg"])
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a table.")
# Create schema
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a table.")
# Create table
self.table_name = "test_table_get_%s" % (str(uuid.uuid4())[1:8])
self.table_id = tables_utils.create_table(self.server, self.db_name,
self.schema_name,
self.table_name)
def runTest(self):
"""This function will delete added table under schema node."""
if self.is_positive_test:
response = tables_utils.api_get(self)
# Assert response
utils.assert_status_code(self, response)
else:
if self.mocking_required:
with patch(self.mock_data["function_name"],
side_effect=eval(self.mock_data["return_value"])):
response = tables_utils.api_get(self)
else:
if 'table_id' in self.data:
self.table_id = self.data['table_id']
response = tables_utils.api_get(self)
# Assert response
utils.assert_status_code(self, response)
utils.assert_error_message(self, response)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,98 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import uuid
from unittest.mock import patch
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from . import utils as tables_utils
class TableGetNodesTestCase(BaseTestGenerator):
"""This class will add new collation under schema node."""
url = '/browser/table/nodes/'
# Generates scenarios
scenarios = utils.generate_scenarios("table_get_nodes",
tables_utils.test_cases)
def setUp(self):
# Load test data
self.data = self.test_data
# Create db connection
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a table.")
# Create schema
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a table.")
# Create table
self.table_name = "test_table_get_%s" % (str(uuid.uuid4())[1:8])
self.table_id = tables_utils.create_table(self.server, self.db_name,
self.schema_name,
self.table_name)
# Create table
if self.is_list:
self.table_name_1 = \
"test_table_delete_%s" % (str(uuid.uuid4())[1:8])
self.table_id_1 = tables_utils.create_table(self.server,
self.db_name,
self.schema_name,
self.table_name_1)
def runTest(self):
"""This function will delete added table under schema node."""
if self.is_positive_test:
if self.is_list:
response = tables_utils.api_get(self, "")
else:
response = tables_utils.api_get(self)
# Assert response
utils.assert_status_code(self, response)
else:
if self.mocking_required:
with patch(self.mock_data["function_name"],
side_effect=eval(self.mock_data["return_value"])):
if self.is_list:
response = tables_utils.api_get(self, "")
else:
response = tables_utils.api_get(self)
else:
if 'table_id' in self.data:
self.table_id = self.data['table_id']
response = tables_utils.api_get(self)
# Assert response
utils.assert_status_code(self, response)
utils.assert_error_message(self, response)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,104 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import uuid
from unittest.mock import patch
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from . import utils as tables_utils
class TableGetPreTablecreationParametersTestCase(BaseTestGenerator):
"""This class will add new collation under schema node."""
url = '/browser/table/'
# Generates scenarios
scenarios = utils.generate_scenarios(
"table_get_pre_table_creation_parameters",
tables_utils.test_cases)
def setUp(self):
# Load test data
self.data = self.test_data
# Update url
self.url = self.url + self.add_to_url
# Create db connection
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a table.")
# Create schema
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a table.")
self.table_name = "test_table_put_%s" % (str(uuid.uuid4())[1:8])
self.table_id = tables_utils.create_table(self.server, self.db_name,
self.schema_name,
self.table_name)
# self.table_id = tables_utils.create_table_for_partition(
# self.server,
# self.db_name,
# self.schema_name,
# self.table_name,
# 'partitioned',
# 'list')
def runTest(self):
"""This function will delete added table under schema node."""
url_encode_data = None
if hasattr(self, "url_encoded_data"):
if "tid" in self.data:
self.data["tid"] = self.table_id
elif "tname" in self.data:
self.data["tname"] = self.table_name
url_encode_data = self.data
if self.is_positive_test:
response = tables_utils.api_get_pre_table_creation_params(
self, url_encode_data)
# Assert response
utils.assert_status_code(self, response)
else:
if self.mocking_required:
with patch(self.mock_data["function_name"],
side_effect=eval(self.mock_data["return_value"])):
response = tables_utils.api_get_pre_table_creation_params(
self, url_encode_data)
else:
if 'table_id' in self.data:
self.table_id = self.data['table_id']
response = tables_utils.api_get_pre_table_creation_params(
self, url_encode_data)
# Assert response
utils.assert_status_code(self, response)
utils.assert_error_message(self, response)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,99 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import uuid
from unittest.mock import patch
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from . import utils as tables_utils
class TableGetScriptSqlTestCase(BaseTestGenerator):
"""This class will add new collation under schema node."""
url = '/browser/table/'
# Generates scenarios
scenarios = utils.generate_scenarios("table_get_script_sql",
tables_utils.test_cases)
def setUp(self):
# Load test data
self.data = self.test_data
# Update url
self.url = self.url + self.add_to_url
# Create db connection
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a table.")
# Create schema
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a table.")
# Create table
self.table_name = "test_table_get_%s" % (str(uuid.uuid4())[1:8])
if "query" in self.inventory_data:
custom_query = self.inventory_data["query"]
self.table_id = tables_utils.create_table(self.server,
self.db_name,
self.schema_name,
self.table_name,
custom_query)
else:
self.table_id = tables_utils.create_table(self.server,
self.db_name,
self.schema_name,
self.table_name)
def runTest(self):
"""This function will delete added table under schema node."""
if self.is_positive_test:
response = tables_utils.api_get(self)
# Assert response
utils.assert_status_code(self, response)
else:
if self.mocking_required:
with patch(self.mock_data["function_name"],
side_effect=eval(self.mock_data["return_value"])):
if self.is_list:
response = tables_utils.api_get(self, "")
else:
response = tables_utils.api_get(self)
else:
if 'table_id' in self.data:
self.table_id = self.data['table_id']
response = tables_utils.api_get(self)
# Assert response
utils.assert_status_code(self, response)
utils.assert_error_message(self, response)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,95 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import uuid
from unittest.mock import patch
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from . import utils as tables_utils
class TableGetStatisticsTestCase(BaseTestGenerator):
"""This class will add new collation under schema node."""
url = '/browser/table/stats/'
# Generates scenarios
scenarios = utils.generate_scenarios("table_get_statistics",
tables_utils.test_cases)
def setUp(self):
# Load test data
self.data = self.test_data
# Create db connection
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a table.")
# Create schema
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a table.")
# Create table
self.table_name = "test_table_get_%s" % (str(uuid.uuid4())[1:8])
self.table_id = tables_utils.create_table(self.server, self.db_name,
self.schema_name,
self.table_name)
# Create table
if self.is_list:
self.table_name_1 = \
"test_table_get_%s" % (str(uuid.uuid4())[1:8])
self.table_id_1 = tables_utils.create_table(self.server,
self.db_name,
self.schema_name,
self.table_name_1
)
def runTest(self):
"""This function will delete added table under schema node."""
if self.is_positive_test:
if self.is_list:
response = tables_utils.api_get(self, "")
else:
response = tables_utils.api_get(self)
# Assert response
utils.assert_status_code(self, response)
else:
if self.mocking_required:
with patch(self.mock_data["function_name"],
side_effect=eval(self.mock_data["return_value"])):
if self.is_list:
response = tables_utils.api_get(self, "")
else:
response = tables_utils.api_get(self)
# Assert response
utils.assert_status_code(self, response)
utils.assert_error_message(self, response)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,79 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import uuid
from unittest.mock import patch
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from . import utils as tables_utils
class TableGetMsqlTestCase(BaseTestGenerator):
"""This class will add new collation under schema node."""
url = '/browser/table/msql/'
# Generates scenarios
scenarios = utils.generate_scenarios("table_msql",
tables_utils.test_cases)
def setUp(self):
# Load test data
self.data = self.test_data
# Create db connection
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a table.")
# Create schema
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a table.")
# Create table
self.table_name = "test_table_get_%s" % (str(uuid.uuid4())[1:8])
self.table_id = tables_utils.create_table(self.server, self.db_name,
self.schema_name,
self.table_name)
def runTest(self):
"""This function will delete added table under schema node."""
url_encode_data = self.data
if self.is_positive_test:
response = tables_utils.api_get_msql(self, url_encode_data)
# Assert response
utils.assert_status_code(self, response)
else:
if 'table_id' in self.data:
self.table_id = self.data['table_id']
response = tables_utils.api_get_msql(self, url_encode_data)
# Assert response
utils.assert_status_code(self, response)
utils.assert_error_message(self, response)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -1,175 +0,0 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import json
import uuid
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.utils import server_utils as server_utils
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from . import utils as tables_utils
class TableUpdateParameterTestCase(BaseTestGenerator):
"""This class will add new collation under schema node."""
scenarios = [
# Fetching default URL for table node.
('Enable custom auto vacuum and set the parameters for table '
'without autovacuum_enabled',
dict(url='/browser/table/obj/',
api_data={
'autovacuum_custom': True,
'vacuum_table': {
'changed': [
{'name': 'autovacuum_vacuum_cost_delay',
'value': 20},
{'name': 'autovacuum_vacuum_threshold',
'value': 20}
]
}}
)
),
('Change a parameter to zero value '
'without autovacuum_enabled',
dict(url='/browser/table/obj/',
api_data={
'vacuum_table': {
'changed': [
{'name': 'autovacuum_vacuum_cost_delay',
'value': 0}
]
}}
)
),
('Enable autovacuum_enabled',
dict(url='/browser/table/obj/',
api_data={'autovacuum_enabled': 't'}
)
),
('Reset individual parameters for table',
dict(url='/browser/table/obj/',
api_data={
'autovacuum_enabled': 'x',
'vacuum_table': {
'changed': [
{'name': 'autovacuum_vacuum_cost_delay',
'value': None},
]
}}
)
),
('Reset custom auto vacuum',
dict(url='/browser/table/obj/',
api_data={'autovacuum_custom': False}
)
),
('Enable toast custom auto vacuum and set the parameters for table '
'without autovacuum_enabled',
dict(url='/browser/table/obj/',
api_data={
'toast_autovacuum': True,
'vacuum_toast': {
'changed': [
{'name': 'autovacuum_vacuum_cost_delay',
'value': 20},
{'name': 'autovacuum_vacuum_threshold',
'value': 20}
]
}}
)
),
('Change a toast parameter to zero value '
'without autovacuum_enabled',
dict(url='/browser/table/obj/',
api_data={
'vacuum_toast': {
'changed': [
{'name': 'autovacuum_vacuum_cost_delay',
'value': 0}
]
}}
)
),
('Enable toast.autovacuum_enabled',
dict(url='/browser/table/obj/',
api_data={'toast_autovacuum_enabled': 't'}
)
),
('Reset individual toast parameters for table',
dict(url='/browser/table/obj/',
api_data={
'toast_autovacuum_enabled': 'x',
'vacuum_toast': {
'changed': [
{'name': 'autovacuum_vacuum_cost_delay',
'value': None},
]
}}
)
),
('Reset auto vacuum',
dict(url='/browser/table/obj/',
api_data={'toast_autovacuum': False}
)
),
]
table_name = "test_table_parameters_%s" % (str(uuid.uuid4())[1:8])
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a table.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a table.")
self.table_id = tables_utils.get_table_id(self.server, self.db_name,
self.table_name)
if self.table_id is None:
self.table_id = tables_utils.create_table(
self.server, self.db_name,
self.schema_name,
self.table_name)
def runTest(self):
"""This function will fetch added table under schema node."""
table_response = tables_utils.verify_table(self.server, self.db_name,
self.table_id)
if not table_response:
raise Exception("Could not find the table to update.")
data = self.api_data
data['oid'] = self.table_id
response = self.tester.put(self.url + str(utils.SERVER_GROUP) + '/' +
str(self.server_id) + '/' +
str(self.db_id) + '/' +
str(self.schema_id) + '/' +
str(self.table_id),
data=json.dumps(data),
follow_redirects=True)
self.assertEqual(response.status_code, 200)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -21,142 +21,69 @@ from regression.python_test_utils import test_utils as utils
from . import utils as tables_utils
class TableUpdateTestCase(BaseTestGenerator):
class TablePutTestCase(BaseTestGenerator):
"""This class will add new collation under schema node."""
scenarios = [
# Fetching default URL for table node.
('Update Table', dict(url='/browser/table/obj/')),
('Create partitions of existing range partitioned table',
dict(url='/browser/table/obj/',
server_min_version=100000,
partition_type='range',
mode='create'
)
),
('Create partitions with partition table of existing range '
'partitioned table',
dict(url='/browser/table/obj/',
server_min_version=100000,
partition_type='range',
mode='multilevel'
)
),
('Create partitions of existing list partitioned table',
dict(url='/browser/table/obj/',
server_min_version=100000,
partition_type='list',
mode='create'
)
),
('Create partitions with partition table of existing list '
'partitioned table',
dict(url='/browser/table/obj/',
server_min_version=100000,
partition_type='list',
mode='multilevel'
)
),
('Detach partition from existing range partitioned table',
dict(url='/browser/table/obj/',
server_min_version=100000,
partition_type='range',
mode='detach'
)
),
('Detach partition from existing list partitioned table',
dict(url='/browser/table/obj/',
server_min_version=100000,
partition_type='list',
mode='detach'
)
),
('Attach partition to existing range partitioned table',
dict(url='/browser/table/obj/',
server_min_version=100000,
partition_type='range',
mode='attach'
)
),
('Attach partition to existing list partitioned table',
dict(url='/browser/table/obj/',
server_min_version=100000,
partition_type='list',
mode='attach'
)
)
]
url = '/browser/table/obj/'
# Generates scenarios
scenarios = utils.generate_scenarios("table_put",
tables_utils.test_cases)
table_name = "test_table_parameters_%s" % (str(uuid.uuid4())[1:8])
def setUp(self):
# Load test data
self.data = self.test_data
# Create db connection
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
if hasattr(self, 'server_min_version'):
server_con = server_utils.connect_server(self, self.server_id)
if not server_con["info"] == "Server connected.":
raise Exception("Could not connect to server to add "
"partitioned table.")
if server_con["data"]["version"] < self.server_min_version:
message = "Partitioned table are not supported by " \
"PPAS/PG 10.0 and below."
self.skipTest(message)
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a table.")
# Verify schema
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a table.")
self.table_name = "test_table_put_%s" % (str(uuid.uuid4())[1:8])
self.is_partition = False
if hasattr(self, 'server_min_version'):
self.is_partition = True
self.table_id = tables_utils.create_table_for_partition(
self.server,
self.db_name,
self.schema_name,
self.table_name,
'partitioned',
self.partition_type)
else:
# Get/Create table
self.table_id = tables_utils.get_table_id(self.server, self.db_name,
self.table_name)
if self.table_id is None:
self.table_id = tables_utils.create_table(
self.server, self.db_name,
self.schema_name,
self.table_name)
def runTest(self):
"""This function will fetch added table under schema node."""
# Verify table creation
table_response = tables_utils.verify_table(self.server, self.db_name,
self.table_id)
if not table_response:
raise Exception("Could not find the table to update.")
if self.is_partition:
data = {"id": self.table_id}
tables_utils.set_partition_data(
self.server, self.db_name, self.schema_name, self.table_name,
self.partition_type, data, self.mode)
else:
data = {
"description": "This is test comment for table",
"id": self.table_id
}
def runTest(self):
"""This function will fetch added table under schema node."""
self.data['oid'] = self.table_id
response = self.tester.put(
self.url + str(utils.SERVER_GROUP) + '/' +
str(self.server_id) + '/' + str(self.db_id) + '/' +
str(self.schema_id) + '/' + str(self.table_id),
data=json.dumps(data), follow_redirects=True)
self.assertEqual(response.status_code, 200)
if "is_grant_tab" in self.inventory_data:
grant_data = {"grantee": self.server["username"],
"grantor": self.server["username"]}
self.data["columns"]["changed"][0]["attacl"]["added"][0].update(
grant_data)
if self.is_positive_test:
response = tables_utils.api_put(self)
# Assert response
utils.assert_status_code(self, response)
def tearDown(self):
# Disconnect the database

View File

@ -0,0 +1,117 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import json
import uuid
from unittest.mock import patch
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.utils import server_utils as server_utils
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from . import utils as tables_utils
class TableUpdateTestCase(BaseTestGenerator):
"""This class will add new collation under schema node."""
url = '/browser/table/obj/'
# Generates scenarios
scenarios = utils.generate_scenarios("table_put_partition",
tables_utils.test_cases)
def setUp(self):
# Load test data
self.data = self.test_data
# Create db connection
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
# Check Server version
if "server_min_version" in self.inventory_data:
server_con = server_utils.connect_server(self, self.server_id)
if not server_con["info"] == "Server connected.":
raise Exception("Could not connect to server to add "
"partitioned table.")
if server_con["data"]["version"] < \
self.inventory_data["server_min_version"]:
self.skipTest(self.inventory_data["skip_msg"])
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a table.")
# Verify schema
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a table.")
# Create table
self.table_name = "test_table_put_%s" % (str(uuid.uuid4())[1:8])
self.is_partition = self.inventory_data.get("is_partition",)
if self.is_partition:
self.table_id = tables_utils.create_table_for_partition(
self.server,
self.db_name,
self.schema_name,
self.table_name,
'partitioned',
self.inventory_data["partition_type"])
else:
self.table_id = tables_utils.create_table(
self.server, self.db_name,
self.schema_name,
self.table_name)
# Verify table creation
table_response = tables_utils.verify_table(self.server, self.db_name,
self.table_id)
if not table_response:
raise Exception("Could not find the table to update.")
def runTest(self):
"""This function will fetch added table under schema node."""
self.data["id"] = self.table_id
if self.is_partition:
tables_utils.set_partition_data(
self.server, self.db_name, self.schema_name, self.table_name,
self.inventory_data["partition_type"], self.data,
self.inventory_data["mode"])
if self.is_positive_test:
response = tables_utils.api_put(self)
# Assert response
utils.assert_status_code(self, response)
else:
if self.mocking_required:
with patch(self.mock_data["function_name"],
side_effect=eval(self.mock_data["return_value"])):
response = tables_utils.api_put(self)
# Assert response
utils.assert_status_code(self, response)
utils.assert_error_message(self, response)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -8,7 +8,7 @@
##########################################################################
import uuid
import json
from unittest.mock import patch
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
@ -20,14 +20,19 @@ from regression.python_test_utils import test_utils as utils
from . import utils as tables_utils
class TableDeleteMultipleTestCase(BaseTestGenerator):
class TableDeleteTestCase(BaseTestGenerator):
"""This class will delete new table under schema node."""
scenarios = [
# Fetching default URL for table node.
('Delete Table', dict(url='/browser/table/obj/'))
]
url = '/browser/table/reset/'
# Generates scenarios
scenarios = utils.generate_scenarios("table_delete_statistics",
tables_utils.test_cases)
def setUp(self):
# Load test data
self.data = self.test_data
# Create db connection
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
@ -36,6 +41,8 @@ class TableDeleteMultipleTestCase(BaseTestGenerator):
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a table.")
# Create schema
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
@ -43,37 +50,34 @@ class TableDeleteMultipleTestCase(BaseTestGenerator):
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a table.")
# Create table
self.table_name = "test_table_delete_%s" % (str(uuid.uuid4())[1:8])
self.table_id = tables_utils.create_table(self.server, self.db_name,
self.schema_name,
self.table_name)
self.table_name_1 = "test_table_delete_%s" % (str(uuid.uuid4())[1:8])
self.table_id_1 = tables_utils.create_table(self.server, self.db_name,
self.schema_name,
self.table_name_1
)
def runTest(self):
"""This function will delete added table under schema node."""
# Verify table creation
table_response = tables_utils.verify_table(self.server, self.db_name,
self.table_id)
if not table_response:
raise Exception("Could not find the table to delete.")
table_response = tables_utils.verify_table(self.server, self.db_name,
self.table_id_1)
if not table_response:
raise Exception("Could not find the table to delete.")
def runTest(self):
"""This function will delete added table under schema node."""
if self.is_positive_test:
response = tables_utils.api_delete(self)
data = {'ids': [self.table_id, self.table_id_1]}
response = self.tester.delete(self.url + str(utils.SERVER_GROUP) +
'/' + str(self.server_id) + '/' +
str(self.db_id) + '/' +
str(self.schema_id) + '/',
data=json.dumps(data),
content_type='html/json',
follow_redirects=True)
self.assertEqual(response.status_code, 200)
# Assert response
utils.assert_status_code(self, response)
else:
if 'table_id' in self.data:
self.table_id = self.data['table_id']
response = tables_utils.api_delete(self)
# Assert response
utils.assert_status_code(self, response)
utils.assert_error_message(self, response)
def tearDown(self):
# Disconnect the database

View File

@ -0,0 +1,88 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import uuid
from unittest.mock import patch
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from . import utils as tables_utils
class TableGetSqlTestCase(BaseTestGenerator):
"""This class will add new collation under schema node."""
url = '/browser/table/sql/'
# Generates scenarios
scenarios = utils.generate_scenarios("table_sql",
tables_utils.test_cases)
def setUp(self):
# Load test data
self.data = self.test_data
# Create db connection
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a table.")
# Create schema
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a table.")
# Create table
self.table_name = "test_table_get_%s" % (str(uuid.uuid4())[1:8])
self.table_id = tables_utils.create_table(self.server, self.db_name,
self.schema_name,
self.table_name)
# Create table
if self.is_list:
self.table_name_1 = \
"test_table_delete_%s" % (str(uuid.uuid4())[1:8])
self.table_id_1 = tables_utils.create_table(self.server,
self.db_name,
self.schema_name,
self.table_name_1
)
def runTest(self):
"""This function will delete added table under schema node."""
if self.is_positive_test:
response = tables_utils.api_get(self)
# Assert response
utils.assert_status_code(self, response)
else:
if 'table_id' in self.data:
self.table_id = self.data['table_id']
response = tables_utils.api_get(self)
# Assert response
utils.assert_status_code(self, response)
utils.assert_error_message(self, response)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -10,11 +10,89 @@
import sys
import traceback
import os
import json
from urllib.parse import urlencode
from regression.python_test_utils import test_utils as utils
# Load test data from json file.
CURRENT_PATH = os.path.dirname(os.path.realpath(__file__))
with open(CURRENT_PATH + "/table_test_data.json") as data_file:
test_cases = json.load(data_file)
def create_table(server, db_name, schema_name, table_name):
def api_create(self):
return self.tester.post("{0}{1}/{2}/{3}/{4}/".
format(self.url, utils.SERVER_GROUP,
self.server_id,
self.db_id, self.schema_id),
data=json.dumps(self.data),
content_type='html/json'
)
def api_delete(self, table_id=None):
if table_id is None:
table_id = self.table_id
return self.tester.delete("{0}{1}/{2}/{3}/{4}/{5}".
format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, table_id),
data=json.dumps(self.data),
follow_redirects=True)
def api_get(self, table_id=None):
if table_id is None:
table_id = self.table_id
return self.tester.get("{0}{1}/{2}/{3}/{4}/{5}".
format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, table_id),
follow_redirects=True
)
def api_get_msql(self, url_encode_data):
return self.tester.get("{0}{1}/{2}/{3}/{4}/{5}?{6}".
format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, self.table_id,
urlencode(url_encode_data)),
follow_redirects=True
)
def api_put(self):
return self.tester.put("{0}{1}/{2}/{3}/{4}/{5}".
format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, self.table_id),
data=json.dumps(self.data),
follow_redirects=True
)
def api_get_pre_table_creation_params(self, url_encode_data=None):
if url_encode_data is None:
return self.tester.get("{0}{1}/{2}/{3}/{4}/".
format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id),
follow_redirects=True
)
else:
return self.tester.get("{0}{1}/{2}/{3}/{4}/?{5}".
format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id,
urlencode(url_encode_data)),
follow_redirects=True
)
def create_table(server, db_name, schema_name, table_name, custom_query=None):
"""
This function creates a table under provided schema.
:param server: server details
@ -28,6 +106,12 @@ def create_table(server, db_name, schema_name, table_name):
:return table_id: table id
:rtype: int
"""
if custom_query is None:
query = "CREATE TABLE %s.%s(id serial UNIQUE NOT NULL, name text," \
" location text)" % \
(schema_name, table_name)
else:
query = eval(custom_query)
try:
connection = utils.get_db_connection(db_name,
server['username'],
@ -38,9 +122,6 @@ def create_table(server, db_name, schema_name, table_name):
old_isolation_level = connection.isolation_level
connection.set_isolation_level(0)
pg_cursor = connection.cursor()
query = "CREATE TABLE %s.%s(id serial UNIQUE NOT NULL, name text," \
" location text)" %\
(schema_name, table_name)
pg_cursor.execute(query)
connection.set_isolation_level(old_isolation_level)
connection.commit()