Add test cases for tables, types, views and their children.

This commit is contained in:
Navnath Gadakh 2016-10-14 11:20:51 -07:00 committed by Dave Page
parent 22dadacb0f
commit 404d4efd2e
65 changed files with 4220 additions and 4 deletions

View File

@ -998,7 +998,7 @@ It may have been removed by another user.
sql_header = """
-- CATALOG: {0}
-- DROP SCHEMA {0};
-- DROP SCHEMA {0};(
""".format(old_data['name'])
if hasattr(str, 'decode'):

View File

@ -39,6 +39,33 @@ def create_trigger_function(server, db_name, schema_name, func_name):
traceback.print_exc(file=sys.stderr)
def create_trigger_function_with_trigger(server, db_name, schema_name,
func_name):
"""This function add the trigger function to schema"""
try:
connection = utils.get_db_connection(db_name,
server['username'],
server['db_password'],
server['host'],
server['port'])
pg_cursor = connection.cursor()
query = "CREATE FUNCTION "+schema_name+"."+func_name+"()" \
" RETURNS trigger LANGUAGE 'plpgsql' STABLE LEAKPROOF" \
" SECURITY DEFINER SET enable_sort=true AS $BODY$ BEGIN" \
" NULL; END; $BODY$"
pg_cursor.execute(query)
connection.commit()
# Get 'oid' from newly created function
pg_cursor.execute("SELECT pro.oid, pro.proname FROM"
" pg_proc pro WHERE pro.proname='%s'" %
func_name)
functions = pg_cursor.fetchone()
connection.close()
return functions
except Exception:
traceback.print_exc(file=sys.stderr)
def verify_trigger_function(server, db_name, func_name):
"""This function verifies the trigger function in db"""
connection = utils.get_db_connection(db_name,

View File

@ -0,0 +1,15 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
from pgadmin.utils.route import BaseTestGenerator
class ColumnsTestGenerator(BaseTestGenerator):
def runTest(self):
return []

View File

@ -0,0 +1,74 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import json
import uuid
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
import utils as tables_utils
class ColumnAddTestCase(BaseTestGenerator):
"""This class will add new column under table node."""
scenarios = [
('Add table Node URL', dict(url='/browser/column/obj/'))
]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a table.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a table.")
self.table_name = "table_column_%s" % (str(uuid.uuid4())[1:6])
self.table_id = tables_utils.create_table(self.server, self.db_name,
self.schema_name,
self.table_name)
def runTest(self):
"""This function will add column under table node."""
self.column_name = "test_column_add_%s" % (str(uuid.uuid4())[1:6])
data = {"name": self.column_name,
"cltype": "\"char\"",
"attacl": [],
"is_primary_key": False,
"attnotnull": False,
"attlen": False,
"attprecision": None,
"attoptions": [],
"seclabels": []
}
# Add table
response = self.tester.post(
self.url + str(utils.SERVER_GROUP) + '/' +
str(self.server_id) + '/' + str(self.db_id) +
'/' + str(self.schema_id) + '/' + str(self.table_id) + '/',
data=json.dumps(data),
content_type='html/json')
self.assertEquals(response.status_code, 200)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,73 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import uuid
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
import utils as tables_utils
from . import utils as columns_utils
class ColumnDeleteTestCase(BaseTestGenerator):
"""This class will delete column under table node."""
scenarios = [
('Delete table Node URL', dict(url='/browser/column/obj/'))
]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a table.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a table.")
self.table_name = "table_column_%s" % (str(uuid.uuid4())[1:6])
self.table_id = tables_utils.create_table(self.server, self.db_name,
self.schema_name,
self.table_name)
self.column_name = "test_column_delete_%s" % (str(uuid.uuid4())[1:6])
self.column_id = columns_utils.create_column(self.server,
self.db_name,
self.schema_name,
self.table_name,
self.column_name)
def runTest(self):
"""This function will drop column under table node."""
col_response = columns_utils.verify_column(self.server, self.db_name,
self.column_name)
if not col_response:
raise Exception("Could not find the column to drop.")
response = self.tester.delete(self.url + str(utils.SERVER_GROUP) +
'/' + str(self.server_id) + '/' +
str(self.db_id) + '/' +
str(self.schema_id) + '/' +
str(self.table_id) + '/' +
str(self.column_id),
follow_redirects=True)
self.assertEquals(response.status_code, 200)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,69 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import uuid
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
import utils as tables_utils
from . import utils as columns_utils
class ColumnGetTestCase(BaseTestGenerator):
"""This class will get column under table node."""
scenarios = [
('Fetch table Node URL', dict(url='/browser/column/obj/'))
]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a table.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a table.")
self.table_name = "table_column_%s" % (str(uuid.uuid4())[1:6])
self.table_id = tables_utils.create_table(self.server, self.db_name,
self.schema_name,
self.table_name)
self.column_name = "test_column_delete_%s" % (str(uuid.uuid4())[1:6])
self.column_id = columns_utils.create_column(self.server,
self.db_name,
self.schema_name,
self.table_name,
self.column_name)
def runTest(self):
"""This function will fetch the column under table node."""
response = self.tester.get(self.url + str(utils.SERVER_GROUP) +
'/' + str(self.server_id) + '/' +
str(self.db_id) + '/' +
str(self.schema_id) + '/' +
str(self.table_id) + '/' +
str(self.column_id),
follow_redirects=True)
self.assertEquals(response.status_code, 200)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,81 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import uuid
import json
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
import utils as tables_utils
from . import utils as columns_utils
class ColumnPutTestCase(BaseTestGenerator):
"""This class will update the column under table node."""
scenarios = [
('Put table Node URL', dict(url='/browser/column/obj/'))
]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a table.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a table.")
self.table_name = "table_column_%s" % (str(uuid.uuid4())[1:6])
self.table_id = tables_utils.create_table(self.server, self.db_name,
self.schema_name,
self.table_name)
self.column_name = "test_column_put_%s" % (str(uuid.uuid4())[1:6])
self.column_id = columns_utils.create_column(self.server,
self.db_name,
self.schema_name,
self.table_name,
self.column_name)
def runTest(self):
"""This function will update the column under table node."""
col_response = columns_utils.verify_column(self.server, self.db_name,
self.column_name)
if not col_response:
raise Exception("Could not find the column to update.")
data = {
"attnum": self.column_id,
"name": self.column_name,
"description": "This is test comment for column"
}
response = self.tester.put(
self.url + str(utils.SERVER_GROUP) + '/' +
str(self.server_id) + '/' +
str(self.db_id) + '/' +
str(self.schema_id) + '/' +
str(self.table_id) + '/' +
str(self.column_id),
data=json.dumps(data),
follow_redirects=True)
self.assertEquals(response.status_code, 200)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,86 @@
# ##########################################################################
#
# #pgAdmin 4 - PostgreSQL Tools
#
# #Copyright (C) 2013 - 2016, The pgAdmin Development Team
# #This software is released under the PostgreSQL Licence
#
# ##########################################################################
from __future__ import print_function
import traceback
import sys
from regression import test_utils as utils
def create_column(server, db_name, schema_name, table_name, col_name):
"""
This function creates a column under provided table.
:param server: server details
:type server: dict
:param db_name: database name
:type db_name: str
:param schema_name: schema name
:type schema_name: str
:param table_name: table name
:type table_name: str
:param col_name: column name
:type col_name: str
:return table_id: table id
:rtype: int
"""
try:
connection = utils.get_db_connection(db_name,
server['username'],
server['db_password'],
server['host'],
server['port'])
old_isolation_level = connection.isolation_level
connection.set_isolation_level(0)
pg_cursor = connection.cursor()
query = "ALTER TABLE %s.%s ADD COLUMN %s char" % \
(schema_name, table_name, col_name)
pg_cursor.execute(query)
connection.set_isolation_level(old_isolation_level)
connection.commit()
# Get column position of newly added column
pg_cursor.execute("select attnum from pg_attribute where"
" attname='%s'" % col_name)
col = pg_cursor.fetchone()
col_pos = ''
if col:
col_pos = col[0]
connection.close()
return col_pos
except Exception:
traceback.print_exc(file=sys.stderr)
raise
def verify_column(server, db_name, col_name):
"""
This function verifies table exist in database or not.
:param server: server details
:type server: dict
:param db_name: database name
:type db_name: str
:param col_name: column name
:type col_name: str
:return table: table record from database
:rtype: tuple
"""
try:
connection = utils.get_db_connection(db_name,
server['username'],
server['db_password'],
server['host'],
server['port'])
pg_cursor = connection.cursor()
pg_cursor.execute("select * from pg_attribute where attname='%s'" %
col_name)
col = pg_cursor.fetchone()
connection.close()
return col
except Exception:
traceback.print_exc(file=sys.stderr)
raise

View File

@ -0,0 +1,14 @@
# ##########################################################################
# #
# # pgAdmin 4 - PostgreSQL Tools
# #
# # Copyright (C) 2013 - 2016, The pgAdmin Development Team
# # This software is released under the PostgreSQL Licence
# #
# ##########################################################################
# from pgadmin.utils.route import BaseTestGenerator
#
#
# class CheckConstraintTestGenerator(BaseTestGenerator):
# def runTest(self):
# return []

View File

@ -0,0 +1,73 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import json
import uuid
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
import utils as tables_utils
class CheckConstraintAddTestCase(BaseTestGenerator):
"""This class will add check constraint to existing table"""
scenarios = [
('Add check constraint to table',
dict(url='/browser/check_constraints/obj/'))
]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a check "
"constraint.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a check "
"constraint.")
self.table_name = "table_checkconstraint_add_%s" % \
(str(uuid.uuid4())[1:6])
self.table_id = tables_utils.create_table(self.server,
self.db_name,
self.schema_name,
self.table_name)
def runTest(self):
"""This function will add check constraint to table."""
check_constraint_name = "test_checkconstraint_add_%s" % \
(str(uuid.uuid4())[1:6])
data = {"name": check_constraint_name,
"consrc": " (id > 0)",
"convalidated": True,
"comment": "this is test comment"}
response = self.tester.post(
self.url + str(utils.SERVER_GROUP) + '/' +
str(self.server_id) + '/' + str(self.db_id) +
'/' + str(self.schema_id) + '/' + str(self.table_id) + '/',
data=json.dumps(data),
content_type='html/json')
self.assertEquals(response.status_code, 200)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,79 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import uuid
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
import utils as tables_utils
from . import utils as chk_constraint_utils
class CheckConstraintDeleteTestCase(BaseTestGenerator):
"""This class will delete check constraint to existing table"""
scenarios = [
('Delete check constraint to table',
dict(url='/browser/check_constraints/obj/'))
]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to delete a check "
"constraint.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to delete a check "
"constraint.")
self.table_name = "table_checkconstraint_delete_%s" % \
(str(uuid.uuid4())[1:6])
self.table_id = tables_utils.create_table(self.server,
self.db_name,
self.schema_name,
self.table_name)
self.check_constraint_name = "test_checkconstraint_delete_%s" % \
(str(uuid.uuid4())[1:6])
self.check_constraint_id = \
chk_constraint_utils.create_check_constraint(
self.server, self.db_name, self.schema_name, self.table_name,
self.check_constraint_name)
def runTest(self):
"""This function will delete check constraint to table."""
chk_constraint = chk_constraint_utils.verify_check_constraint(
self.server, self.db_name, self.check_constraint_name)
if not chk_constraint:
raise Exception("Could not find the check constraint to delete.")
response = self.tester.delete(
"{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id,
self.table_id,
self.check_constraint_id),
follow_redirects=True
)
self.assertEquals(response.status_code, 200)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,75 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import uuid
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
import utils as tables_utils
from . import utils as chk_constraint_utils
class CheckConstraintGetTestCase(BaseTestGenerator):
"""This class will fetch check constraint to existing table"""
scenarios = [
('Fetch check constraint to table',
dict(url='/browser/check_constraints/obj/'))
]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to fetch a check "
"constraint.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to fetch a check "
"constraint.")
self.table_name = "table_checkconstraint_get_%s" % \
(str(uuid.uuid4())[1:6])
self.table_id = tables_utils.create_table(self.server,
self.db_name,
self.schema_name,
self.table_name)
self.check_constraint_name = "test_checkconstraint_get_%s" % \
(str(uuid.uuid4())[1:6])
self.check_constraint_id = \
chk_constraint_utils.create_check_constraint(
self.server, self.db_name, self.schema_name, self.table_name,
self.check_constraint_name)
def runTest(self):
"""This function will fetch check constraint to table."""
response = self.tester.get(
"{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id,
self.table_id,
self.check_constraint_id),
follow_redirects=True
)
self.assertEquals(response.status_code, 200)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,82 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import uuid
import json
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
import utils as tables_utils
from . import utils as chk_constraint_utils
class CheckConstraintPutTestCase(BaseTestGenerator):
"""This class will update check constraint to existing table"""
scenarios = [
('Update check constraint to table',
dict(url='/browser/check_constraints/obj/'))
]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to update a check "
"constraint.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to update a check "
"constraint.")
self.table_name = "table_checkconstraint_put_%s" % \
(str(uuid.uuid4())[1:6])
self.table_id = tables_utils.create_table(self.server,
self.db_name,
self.schema_name,
self.table_name)
self.check_constraint_name = "test_checkconstraint_put_%s" % \
(str(uuid.uuid4())[1:6])
self.check_constraint_id = \
chk_constraint_utils.create_check_constraint(
self.server, self.db_name, self.schema_name, self.table_name,
self.check_constraint_name)
def runTest(self):
"""This function will delete check constraint to table."""
chk_constraint = chk_constraint_utils.verify_check_constraint(
self.server, self.db_name, self.check_constraint_name)
if not chk_constraint:
raise Exception("Could not find the check constraint to update.")
data = {"oid": self.check_constraint_id,
"comment": "This is test comment for check constraint."}
response = self.tester.put(
"{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id,
self.table_id,
self.check_constraint_id),
data=json.dumps(data),
follow_redirects=True)
self.assertEquals(response.status_code, 200)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,89 @@
# ##########################################################################
#
# #pgAdmin 4 - PostgreSQL Tools
#
# #Copyright (C) 2013 - 2016, The pgAdmin Development Team
# #This software is released under the PostgreSQL Licence
#
# ##########################################################################
from __future__ import print_function
import traceback
import sys
from regression import test_utils as utils
def create_check_constraint(server, db_name, schema_name, table_name,
check_constraint_name):
"""
This function creates a check constraint under provided table.
:param server: server details
:type server: dict
:param db_name: database name
:type db_name: str
:param schema_name: schema name
:type schema_name: str
:param table_name: table name
:type table_name: str
:param check_constraint_name: constraint name
:type check_constraint_name: str
:return chk_constraint_id: check constraint id
:rtype: int
"""
try:
connection = utils.get_db_connection(db_name,
server['username'],
server['db_password'],
server['host'],
server['port'])
old_isolation_level = connection.isolation_level
connection.set_isolation_level(0)
pg_cursor = connection.cursor()
query = "ALTER TABLE %s.%s ADD CONSTRAINT %s CHECK ( (id > 0)) " \
"NOT VALID; COMMENT ON CONSTRAINT %s ON %s.%s IS " \
"'this is test comment'" % (schema_name, table_name,
check_constraint_name,
check_constraint_name,
schema_name, table_name)
pg_cursor.execute(query)
connection.set_isolation_level(old_isolation_level)
connection.commit()
# Get oid of newly added check constraint
pg_cursor.execute(
"SELECT oid FROM pg_constraint where conname='%s'" %
check_constraint_name)
chk_constraint_record = pg_cursor.fetchone()
connection.close()
chk_constraint_id = chk_constraint_record[0]
return chk_constraint_id
except Exception:
traceback.print_exc(file=sys.stderr)
def verify_check_constraint(server, db_name, check_constraint_name):
"""
This function verifies check constraint constraint exist or not.
:param server: server details
:type server: dict
:param db_name: database name
:type db_name: str
:param check_constraint_name: constraint name
:type check_constraint_name: str
:return chk_constraint_record: check constraint record from database
:rtype: tuple
"""
try:
connection = utils.get_db_connection(db_name,
server['username'],
server['db_password'],
server['host'],
server['port'])
pg_cursor = connection.cursor()
pg_cursor.execute(
"SELECT oid FROM pg_constraint where conname='%s'" %
check_constraint_name)
chk_constraint_record = pg_cursor.fetchone()
connection.close()
return chk_constraint_record
except Exception:
traceback.print_exc(file=sys.stderr)

View File

@ -0,0 +1,15 @@
# ##########################################################################
# #
# # pgAdmin 4 - PostgreSQL Tools
# #
# # Copyright (C) 2013 - 2016, The pgAdmin Development Team
# # This software is released under the PostgreSQL Licence
# #
# ##########################################################################
# from pgadmin.utils.route import BaseTestGenerator
#
#
# class ForeignKeyTestGenerator(BaseTestGenerator):
#
# def runTest(self):
# return []

View File

@ -0,0 +1,79 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import json
import uuid
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
import utils as tables_utils
class ForeignKeyAddTestCase(BaseTestGenerator):
"""This class will add foreign key to existing table"""
scenarios = [
('Add foreign Key constraint to table',
dict(url='/browser/foreign_key/obj/'))
]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a foreign "
"key constraint.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a foreign "
"key constraint.")
self.local_table_name = "table_foreignkey_%s" % \
(str(uuid.uuid4())[1:6])
self.local_table_id = tables_utils.create_table(self.server,
self.db_name,
self.schema_name,
self.local_table_name)
self.foreign_table_name = "table_foreignkey_%s" % \
(str(uuid.uuid4())[1:6])
self.foreign_table_id = tables_utils.create_table(
self.server, self.db_name, self.schema_name,
self.foreign_table_name)
def runTest(self):
"""This function will add foreign key table column."""
foreignkey_name = "test_foreignkey_add_%s" % \
(str(uuid.uuid4())[1:6])
data = {"name": foreignkey_name,
"columns": [{"local_column": "id",
"references": self.foreign_table_id,
"referenced": "id"}],
"confupdtype": "a", "confdeltype": "a", "autoindex": False}
response = self.tester.post(
self.url + str(utils.SERVER_GROUP) + '/' +
str(self.server_id) + '/' + str(self.db_id) +
'/' + str(self.schema_id) + '/' + str(self.local_table_id) + '/',
data=json.dumps(data),
content_type='html/json')
self.assertEquals(response.status_code, 200)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,83 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import uuid
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
import utils as tables_utils
from . import utils as fk_utils
class ForeignKeyDeleteTestCase(BaseTestGenerator):
"""This class will delete foreign key to existing table"""
scenarios = [
('Delete foreign Key constraint.',
dict(url='/browser/foreign_key/obj/'))
]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception(
"Could not connect to database to delete a foreign "
"key constraint.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to delete a foreign "
"key constraint.")
self.local_table_name = "local_table_foreignkey_delete_%s" % \
(str(uuid.uuid4())[1:6])
self.local_table_id = tables_utils.create_table(
self.server, self.db_name, self.schema_name, self.local_table_name)
self.foreign_table_name = "foreign_table_foreignkey_delete_%s" % \
(str(uuid.uuid4())[1:6])
self.foreign_table_id = tables_utils.create_table(
self.server, self.db_name, self.schema_name,
self.foreign_table_name)
self.foreign_key_name = "test_foreignkey_delete_%s" % \
(str(uuid.uuid4())[1:6])
self.foreign_key_id = fk_utils.create_foreignkey(
self.server, self.db_name, self.schema_name, self.local_table_name,
self.foreign_table_name)
def runTest(self):
"""This function will delete foreign key attached to table column."""
fk_response = fk_utils.verify_foreignkey(self.server, self.db_name,
self.local_table_name)
if not fk_response:
raise Exception("Could not find the foreign key constraint to "
"delete.")
response = self.tester.delete(
"{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id,
self.local_table_id,
self.foreign_key_id),
follow_redirects=True
)
self.assertEquals(response.status_code, 200)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,78 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import uuid
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
import utils as tables_utils
from . import utils as fk_utils
class ForeignGetDeleteTestCase(BaseTestGenerator):
"""This class will fetch foreign key from existing table"""
scenarios = [
('Fetch foreign Key constraint.',
dict(url='/browser/foreign_key/obj/'))
]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception(
"Could not connect to database to fetch a foreign "
"key constraint.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to fetch a foreign "
"key constraint.")
self.local_table_name = "local_table_foreignkey_get_%s" % \
(str(uuid.uuid4())[1:6])
self.local_table_id = tables_utils.create_table(
self.server, self.db_name, self.schema_name, self.local_table_name)
self.foreign_table_name = "foreign_table_foreignkey_get_%s" % \
(str(uuid.uuid4())[1:6])
self.foreign_table_id = tables_utils.create_table(
self.server, self.db_name, self.schema_name,
self.foreign_table_name)
self.foreign_key_name = "test_foreignkey_get_%s" % \
(str(uuid.uuid4())[1:6])
self.foreign_key_id = fk_utils.create_foreignkey(
self.server, self.db_name, self.schema_name, self.local_table_name,
self.foreign_table_name)
def runTest(self):
"""This function will delete foreign key attached to table column."""
response = self.tester.get(
"{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id,
self.local_table_id,
self.foreign_key_id),
follow_redirects=True
)
self.assertEquals(response.status_code, 200)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,82 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import uuid
import json
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
import utils as tables_utils
from . import utils as fk_utils
class ForeignPutDeleteTestCase(BaseTestGenerator):
"""This class will update foreign key from existing table"""
scenarios = [
('Fetch foreign Key constraint.',
dict(url='/browser/foreign_key/obj/'))
]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception(
"Could not connect to database to fetch a foreign "
"key constraint.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to fetch a foreign "
"key constraint.")
self.local_table_name = "local_table_foreignkey_get_%s" % \
(str(uuid.uuid4())[1:6])
self.local_table_id = tables_utils.create_table(
self.server, self.db_name, self.schema_name, self.local_table_name)
self.foreign_table_name = "foreign_table_foreignkey_get_%s" % \
(str(uuid.uuid4())[1:6])
self.foreign_table_id = tables_utils.create_table(
self.server, self.db_name, self.schema_name,
self.foreign_table_name)
self.foreign_key_name = "test_foreignkey_get_%s" % \
(str(uuid.uuid4())[1:6])
self.foreign_key_id = fk_utils.create_foreignkey(
self.server, self.db_name, self.schema_name, self.local_table_name,
self.foreign_table_name)
def runTest(self):
"""This function will update foreign key attached to table column."""
data = {"oid": self.foreign_key_id,
"comment": "This is TEST comment for foreign key constraint."
}
response = self.tester.put(
"{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id,
self.local_table_id,
self.foreign_key_id),
data=json.dumps(data),
follow_redirects=True)
self.assertEquals(response.status_code, 200)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,88 @@
# ##########################################################################
#
# #pgAdmin 4 - PostgreSQL Tools
#
# #Copyright (C) 2013 - 2016, The pgAdmin Development Team
# #This software is released under the PostgreSQL Licence
#
# ##########################################################################
from __future__ import print_function
import traceback
import sys
from regression import test_utils as utils
def create_foreignkey(server, db_name, schema_name, local_table_name,
foreign_table_name):
"""
This function creates a column under provided table.
:param server: server details
:type server: dict
:param db_name: database name
:type db_name: str
:param schema_name: schema name
:type schema_name: str
:param local_table_name: local table name
:type local_table_name: str
:param foreign_table_name: foreign table name
:type foreign_table_name: str
:return table_id: table id
:rtype: int
"""
try:
connection = utils.get_db_connection(db_name,
server['username'],
server['db_password'],
server['host'],
server['port'])
old_isolation_level = connection.isolation_level
connection.set_isolation_level(0)
pg_cursor = connection.cursor()
query = "ALTER TABLE %s.%s ADD FOREIGN KEY (id) REFERENCES %s.%s " \
"(id) MATCH SIMPLE ON UPDATE NO ACTION ON DELETE NO ACTION" % \
(
schema_name, local_table_name, schema_name,
foreign_table_name)
pg_cursor.execute(query)
connection.set_isolation_level(old_isolation_level)
connection.commit()
# Get oid of newly added foreign key
pg_cursor.execute(
"SELECT oid FROM pg_constraint where conname='%s_id_fkey'" %
local_table_name)
fk_record = pg_cursor.fetchone()
connection.close()
fk_id = fk_record[0]
return fk_id
except Exception:
traceback.print_exc(file=sys.stderr)
def verify_foreignkey(server, db_name, local_table_name):
"""
This function verifies foreign key constraint exist or not.
:param server: server details
:type server: dict
:param db_name: database name
:type db_name: str
:param local_table_name: local table name
:type local_table_name: str
:return table: table record from database
:rtype: tuple
"""
try:
connection = utils.get_db_connection(db_name,
server['username'],
server['db_password'],
server['host'],
server['port'])
pg_cursor = connection.cursor()
pg_cursor.execute(
"SELECT oid FROM pg_constraint where conname='%s_id_fkey'" %
local_table_name)
fk_record = pg_cursor.fetchone()
connection.close()
return fk_record
except Exception:
traceback.print_exc(file=sys.stderr)

View File

@ -577,7 +577,6 @@ class IndexConstraintView(PGChildNodeView):
data=data, conn=self.conn,
constraint_name=self.constraint_name
)
status, msg = self.conn.execute_scalar(SQL)
if not status:
self.end_transaction()

View File

@ -0,0 +1,15 @@
# ##########################################################################
# #
# # pgAdmin 4 - PostgreSQL Tools
# #
# # Copyright (C) 2013 - 2016, The pgAdmin Development Team
# # This software is released under the PostgreSQL Licence
# #
# ##########################################################################
# from pgadmin.utils.route import BaseTestGenerator
#
#
# class IndexConstraintTestGenerator(BaseTestGenerator):
#
# def runTest(self):
# return []

View File

@ -0,0 +1,84 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import json
import uuid
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
import utils as tables_utils
class IndexConstraintAddTestCase(BaseTestGenerator):
"""This class will add index constraint(primary key or unique key) to
table column"""
primary_key_name = "test_primarykey_add_%s" % \
(str(uuid.uuid4())[1:6])
primary_key_data = {"name": primary_key_name,
"spcname": "pg_default",
"columns": [{"column": "id"}]
}
unique_key_name = "test_uniquekey_add_%s" % \
(str(uuid.uuid4())[1:6])
unique_key_data = {"name": unique_key_name,
"spcname": "pg_default",
"columns": [{"column": "id"}]}
scenarios = [
('Add primary Key constraint to table',
dict(url='/browser/primary_key/obj/', data=primary_key_data)),
('Add unique Key constraint to table',
dict(url='/browser/unique_constraint/obj/', data=unique_key_data))
]
@classmethod
def setUpClass(cls):
cls.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
cls.server_id = schema_info["server_id"]
cls.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(cls, utils.SERVER_GROUP,
cls.server_id, cls.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a "
"index constraint(primary key or unique key).")
cls.schema_id = schema_info["schema_id"]
cls.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(cls.server,
cls.db_name,
cls.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a index "
"constraint(primary key or unique key).")
cls.table_name = "table_indexconstraint_%s" % \
(str(uuid.uuid4())[1:6])
cls.table_id = tables_utils.create_table(cls.server,
cls.db_name,
cls.schema_name,
cls.table_name)
def runTest(self):
"""This function will add index constraint(primary key or unique key)
to table column."""
response = self.tester.post(
self.url + str(utils.SERVER_GROUP) + '/' +
str(self.server_id) + '/' + str(self.db_id) +
'/' + str(self.schema_id) + '/' + str(self.table_id) + '/',
data=json.dumps(self.data),
content_type='html/json')
self.assertEquals(response.status_code, 200)
@classmethod
def tearDownClass(cls):
# Disconnect the database
database_utils.disconnect_database(cls, cls.server_id, cls.db_id)

View File

@ -0,0 +1,85 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import uuid
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
import utils as tables_utils
from . import utils as index_constraint_utils
class IndexConstraintDeleteTestCase(BaseTestGenerator):
"""This class will delete index constraint(primary key or unique key) of
table column"""
primary_key_name = "test_primarykey_delete_%s" % \
(str(uuid.uuid4())[1:6])
unique_key_name = "test_uniquekey_delete_%s" % \
(str(uuid.uuid4())[1:6])
scenarios = [
('Delete primary Key constraint of table',
dict(url='/browser/primary_key/obj/', name=primary_key_name,
type="PRIMARY KEY")),
('Delete unique Key constraint of table',
dict(url='/browser/unique_constraint/obj/', name=unique_key_name,
type="UNIQUE"))
]
@classmethod
def setUpClass(cls):
cls.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
cls.server_id = schema_info["server_id"]
cls.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(cls, utils.SERVER_GROUP,
cls.server_id, cls.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a "
"index constraint(primary key or unique key).")
cls.schema_id = schema_info["schema_id"]
cls.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(cls.server,
cls.db_name,
cls.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a index "
"constraint(primary key or unique key).")
cls.table_name = "table_indexconstraint_%s" % \
(str(uuid.uuid4())[1:6])
cls.table_id = tables_utils.create_table(cls.server,
cls.db_name,
cls.schema_name,
cls.table_name)
def runTest(self):
"""This function will delete index constraint(primary key or
unique key) of table column."""
index_constraint_id = \
index_constraint_utils.create_index_constraint(
self.server, self.db_name, self.schema_name, self.table_name,
self.name, self.type)
response = self.tester.delete(
"{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id,
self.table_id,
index_constraint_id),
follow_redirects=True
)
self.assertEquals(response.status_code, 200)
@classmethod
def tearDownClass(cls):
# Disconnect the database
database_utils.disconnect_database(cls, cls.server_id, cls.db_id)

View File

@ -0,0 +1,85 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import uuid
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
import utils as tables_utils
from . import utils as index_constraint_utils
class IndexConstraintGetTestCase(BaseTestGenerator):
"""This class will fetch the index constraint(primary key or unique key) of
table column"""
primary_key_name = "test_primarykey_delete_%s" % \
(str(uuid.uuid4())[1:6])
unique_key_name = "test_uniquekey_delete_%s" % \
(str(uuid.uuid4())[1:6])
scenarios = [
('Fetch primary Key constraint of table',
dict(url='/browser/primary_key/obj/', name=primary_key_name,
type="PRIMARY KEY")),
('Fetch unique Key constraint of table',
dict(url='/browser/unique_constraint/obj/', name=unique_key_name,
type="UNIQUE"))
]
@classmethod
def setUpClass(cls):
cls.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
cls.server_id = schema_info["server_id"]
cls.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(cls, utils.SERVER_GROUP,
cls.server_id, cls.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a "
"index constraint(primary key or unique key).")
cls.schema_id = schema_info["schema_id"]
cls.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(cls.server,
cls.db_name,
cls.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a index "
"constraint(primary key or unique key).")
cls.table_name = "table_indexconstraint_%s" % \
(str(uuid.uuid4())[1:6])
cls.table_id = tables_utils.create_table(cls.server,
cls.db_name,
cls.schema_name,
cls.table_name)
def runTest(self):
"""This function will fetch the index constraint(primary key or
unique key) of table column."""
index_constraint_id = \
index_constraint_utils.create_index_constraint(
self.server, self.db_name, self.schema_name, self.table_name,
self.name, self.type)
response = self.tester.get(
"{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id,
self.table_id,
index_constraint_id),
follow_redirects=True
)
self.assertEquals(response.status_code, 200)
@classmethod
def tearDownClass(cls):
# Disconnect the database
database_utils.disconnect_database(cls, cls.server_id, cls.db_id)

View File

@ -0,0 +1,88 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import uuid
import json
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
import utils as tables_utils
from . import utils as index_constraint_utils
class IndexConstraintUpdateTestCase(BaseTestGenerator):
"""This class will update index constraint(primary key or unique key) of
table column"""
primary_key_name = "test_primarykey_put_%s" % \
(str(uuid.uuid4())[1:6])
unique_key_name = "test_uniquekey_put_%s" % \
(str(uuid.uuid4())[1:6])
data = {"oid": "", "comment": "this is test comment"}
scenarios = [
('Update primary Key constraint of table',
dict(url='/browser/primary_key/obj/', name=primary_key_name,
type="PRIMARY KEY", data=data)),
('Update unique Key constraint of table',
dict(url='/browser/unique_constraint/obj/', name=unique_key_name,
type="UNIQUE", data=data))
]
@classmethod
def setUpClass(cls):
cls.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
cls.server_id = schema_info["server_id"]
cls.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(cls, utils.SERVER_GROUP,
cls.server_id, cls.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a "
"index constraint(primary key or unique key).")
cls.schema_id = schema_info["schema_id"]
cls.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(cls.server,
cls.db_name,
cls.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a index "
"constraint(primary key or unique key).")
cls.table_name = "table_indexconstraint_%s" % \
(str(uuid.uuid4())[1:6])
cls.table_id = tables_utils.create_table(cls.server,
cls.db_name,
cls.schema_name,
cls.table_name)
def runTest(self):
"""This function will update index constraint(primary key or
unique key) of table column."""
index_constraint_id = \
index_constraint_utils.create_index_constraint(
self.server, self.db_name, self.schema_name, self.table_name,
self.name, self.type)
self.data["oid"] = index_constraint_id
response = self.tester.put(
"{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, self.table_id,
index_constraint_id
),
data=json.dumps(self.data),
follow_redirects=True)
self.assertEquals(response.status_code, 200)
@classmethod
def tearDownClass(cls):
# Disconnect the database
database_utils.disconnect_database(cls, cls.server_id, cls.db_id)

View File

@ -0,0 +1,86 @@
# ##########################################################################
#
# #pgAdmin 4 - PostgreSQL Tools
#
# #Copyright (C) 2013 - 2016, The pgAdmin Development Team
# #This software is released under the PostgreSQL Licence
#
# ##########################################################################
from __future__ import print_function
import traceback
import sys
from regression import test_utils as utils
def create_index_constraint(server, db_name, schema_name, table_name,
key_name, key_type):
"""
This function creates a index constraint(PK or UK) under provided table.
:param server: server details
:type server: dict
:param db_name: database name
:type db_name: str
:param schema_name: schema name
:type schema_name: str
:param table_name: table name
:type table_name: str
:param key_name: test name for primary or unique key
:type key_name: str
:param key_type: key type i.e. primary or unique key
:type key_type: str
:return oid: key constraint id
:rtype: int
"""
try:
connection = utils.get_db_connection(db_name,
server['username'],
server['db_password'],
server['host'],
server['port'])
old_isolation_level = connection.isolation_level
connection.set_isolation_level(0)
pg_cursor = connection.cursor()
query = "ALTER TABLE %s.%s ADD CONSTRAINT %s %s (id)" % \
(schema_name, table_name, key_name, key_type)
pg_cursor.execute(query)
connection.set_isolation_level(old_isolation_level)
connection.commit()
# Get oid of newly added index constraint
pg_cursor.execute(
"SELECT conindid FROM pg_constraint where conname='%s'" % key_name)
index_constraint = pg_cursor.fetchone()
connection.close()
oid = index_constraint[0]
return oid
except Exception:
traceback.print_exc(file=sys.stderr)
def verify_index_constraint(server, db_name, table_name):
"""
This function verifies that index constraint(PK or UK) is exists or not.
:param server: server details
:type server: dict
:param db_name: database name
:type db_name: str
:param table_name: table name
:type table_name: str
:return index_constraint: index constraint record from database
:rtype: tuple
"""
try:
connection = utils.get_db_connection(db_name,
server['username'],
server['db_password'],
server['host'],
server['port'])
pg_cursor = connection.cursor()
pg_cursor.execute(
"SELECT oid FROM pg_constraint where conname='%s'" %
table_name)
index_constraint = pg_cursor.fetchone()
connection.close()
return index_constraint
except Exception:
traceback.print_exc(file=sys.stderr)

View File

@ -0,0 +1,15 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
from pgadmin.utils.route import BaseTestGenerator
class ConstraintsTestGenerator(BaseTestGenerator):
def runTest(self):
return []

View File

@ -0,0 +1,15 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
from pgadmin.utils.route import BaseTestGenerator
class IndexesTestGenerator(BaseTestGenerator):
def runTest(self):
return []

View File

@ -0,0 +1,68 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import json
import uuid
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
import utils as tables_utils
class IndexesAddTestCase(BaseTestGenerator):
"""This class will add new index to existing table column"""
scenarios = [
('Add index Node URL', dict(url='/browser/index/obj/'))
]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a table.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a table.")
self.table_name = "table_for_column_%s" % (str(uuid.uuid4())[1:6])
self.table_id = tables_utils.create_table(self.server, self.db_name,
self.schema_name,
self.table_name)
def runTest(self):
"""This function will add index to existing table column."""
self.index_name = "test_index_add_%s" % (str(uuid.uuid4())[1:6])
data = {"name": self.index_name,
"spcname": "pg_default",
"amname": "btree",
"columns": [
{"colname": "id", "sort_order": False, "nulls": False}]}
response = self.tester.post(
self.url + str(utils.SERVER_GROUP) + '/' +
str(self.server_id) + '/' + str(self.db_id) +
'/' + str(self.schema_id) + '/' + str(self.table_id) + '/',
data=json.dumps(data),
content_type='html/json')
self.assertEquals(response.status_code, 200)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,81 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import uuid
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
import utils as tables_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tables.column. \
tests import utils as columns_utils
from . import utils as indexes_utils
class IndexesDeleteTestCase(BaseTestGenerator):
"""This class will delete the existing index of column."""
scenarios = [
('Delete index Node URL', dict(url='/browser/index/obj/'))
]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a table.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a table.")
self.table_name = "table_column_%s" % (str(uuid.uuid4())[1:6])
self.table_id = tables_utils.create_table(self.server, self.db_name,
self.schema_name,
self.table_name)
self.column_name = "test_column_delete_%s" % (str(uuid.uuid4())[1:6])
self.column_id = columns_utils.create_column(self.server,
self.db_name,
self.schema_name,
self.table_name,
self.column_name)
self.index_name = "test_index_delete_%s" % (str(uuid.uuid4())[1:6])
self.index_id = indexes_utils.create_index(self.server, self.db_name,
self.schema_name,
self.table_name,
self.index_name,
self.column_name)
def runTest(self):
"""This function will delete index of existing column."""
index_response = indexes_utils.verify_index(self.server, self.db_name,
self.index_name)
if not index_response:
raise Exception("Could not find the index to delete.")
response = self.tester.delete(self.url + str(utils.SERVER_GROUP) +
'/' + str(self.server_id) + '/' +
str(self.db_id) + '/' +
str(self.schema_id) + '/' +
str(self.table_id) + '/' +
str(self.index_id),
follow_redirects=True)
self.assertEquals(response.status_code, 200)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,76 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import uuid
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
import utils as tables_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tables.column. \
tests import utils as columns_utils
from . import utils as indexes_utils
class IndexesGetTestCase(BaseTestGenerator):
"""This class will fetch the existing index of column."""
scenarios = [
('Fetch index Node URL', dict(url='/browser/index/obj/'))
]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a table.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a table.")
self.table_name = "table_column_%s" % (str(uuid.uuid4())[1:6])
self.table_id = tables_utils.create_table(self.server, self.db_name,
self.schema_name,
self.table_name)
self.column_name = "test_column_delete_%s" % (str(uuid.uuid4())[1:6])
self.column_id = columns_utils.create_column(self.server,
self.db_name,
self.schema_name,
self.table_name,
self.column_name)
self.index_name = "test_index_delete_%s" % (str(uuid.uuid4())[1:6])
self.index_id = indexes_utils.create_index(self.server, self.db_name,
self.schema_name,
self.table_name,
self.index_name,
self.column_name)
def runTest(self):
"""This function will fetch the existing column index."""
response = self.tester.get(
"{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, self.table_id,
self.index_id),
follow_redirects=True)
self.assertEquals(response.status_code, 200)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,84 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import uuid
import json
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
import utils as tables_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tables.column. \
tests import utils as columns_utils
from . import utils as indexes_utils
class IndexesUpdateTestCase(BaseTestGenerator):
"""This class will update the existing index of column."""
scenarios = [
('Put index Node URL', dict(url='/browser/index/obj/'))
]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a table.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a table.")
self.table_name = "table_column_%s" % (str(uuid.uuid4())[1:6])
self.table_id = tables_utils.create_table(self.server, self.db_name,
self.schema_name,
self.table_name)
self.column_name = "test_column_delete_%s" % (str(uuid.uuid4())[1:6])
self.column_id = columns_utils.create_column(self.server,
self.db_name,
self.schema_name,
self.table_name,
self.column_name)
self.index_name = "test_index_delete_%s" % (str(uuid.uuid4())[1:6])
self.index_id = indexes_utils.create_index(self.server, self.db_name,
self.schema_name,
self.table_name,
self.index_name,
self.column_name)
def runTest(self):
"""This function will update the index of existing column."""
index_response = indexes_utils.verify_index(self.server, self.db_name,
self.index_name)
if not index_response:
raise Exception("Could not find the index to update.")
data = {"oid": self.index_id,
"description": "This is test comment for index"}
response = self.tester.put(
"{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, self.table_id,
self.index_id),
data=json.dumps(data),
follow_redirects=True)
self.assertEquals(response.status_code, 200)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,90 @@
# ##########################################################################
#
# #pgAdmin 4 - PostgreSQL Tools
#
# #Copyright (C) 2013 - 2016, The pgAdmin Development Team
# #This software is released under the PostgreSQL Licence
#
# ##########################################################################
from __future__ import print_function
import traceback
import sys
from regression import test_utils as utils
def create_index(server, db_name, schema_name, table_name, index_name,
col_name):
"""
This function will add the new index to existing column.
:param server: server details
:type server: dict
:param db_name: database name
:type db_name: str
:param schema_name: schema name
:type schema_name: str
:param table_name: table name
:type table_name: str
:param index_name: index name
:type index_name: str
:param col_name: column name
:type col_name: str
:return table_id: table id
:rtype: int
"""
try:
connection = utils.get_db_connection(db_name,
server['username'],
server['db_password'],
server['host'],
server['port'])
old_isolation_level = connection.isolation_level
connection.set_isolation_level(0)
pg_cursor = connection.cursor()
query = "CREATE INDEX %s ON %s.%s USING btree (%s ASC NULLS LAST) " \
"TABLESPACE pg_default" % (index_name, schema_name,
table_name, col_name)
pg_cursor.execute(query)
connection.set_isolation_level(old_isolation_level)
connection.commit()
# Get oid of newly added index
pg_cursor.execute("select oid from pg_class where relname='%s'" %
index_name)
index_record = pg_cursor.fetchone()
index_oid = ''
if index_record:
index_oid = index_record[0]
connection.close()
return index_oid
except Exception:
traceback.print_exc(file=sys.stderr)
raise
def verify_index(server, db_name, index_name):
"""
This function verifies index exist or not.
:param server: server details
:type server: dict
:param db_name: database name
:type db_name: str
:param index_name: index name
:type index_name: str
:return table: table record from database
:rtype: tuple
"""
try:
connection = utils.get_db_connection(db_name,
server['username'],
server['db_password'],
server['host'],
server['port'])
pg_cursor = connection.cursor()
pg_cursor.execute("select * from pg_class where relname='%s'" %
index_name)
index_record = pg_cursor.fetchone()
connection.close()
return index_record
except Exception:
traceback.print_exc(file=sys.stderr)
raise

View File

@ -0,0 +1,15 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
from pgadmin.utils.route import BaseTestGenerator
class RulesTestGenerator(BaseTestGenerator):
def runTest(self):
return []

View File

@ -0,0 +1,69 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import json
import uuid
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
import utils as tables_utils
class RulesAddTestCase(BaseTestGenerator):
"""This class will add new rule under table node."""
scenarios = [
('Add rule Node URL', dict(url='/browser/rule/obj/'))
]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a rule.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a rule.")
self.table_name = "table_column_%s" % (str(uuid.uuid4())[1:6])
self.table_id = tables_utils.create_table(self.server, self.db_name,
self.schema_name,
self.table_name)
def runTest(self):
"""This function will rule under table node."""
rule_name = "test_rule_add_%s" % (str(uuid.uuid4())[1:6])
data = {"schema": self.schema_name,
"view": self.table_name,
"name": rule_name,
"event": "Update"
}
response = self.tester.post(
"{0}{1}/{2}/{3}/{4}/{5}/".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, self.table_id),
data=json.dumps(data),
content_type='html/json'
)
self.assertEquals(response.status_code, 200)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,72 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import uuid
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
import utils as tables_utils
from . import utils as rules_utils
class RulesDeleteTestCase(BaseTestGenerator):
"""This class will delete rule under table node."""
scenarios = [
('Delete rule Node URL', dict(url='/browser/rule/obj/'))
]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to delete rule.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to delete rule.")
self.table_name = "table_column_%s" % (str(uuid.uuid4())[1:6])
self.table_id = tables_utils.create_table(self.server, self.db_name,
self.schema_name,
self.table_name)
self.rule_name = "test_rule_delete_%s" % (str(uuid.uuid4())[1:6])
self.rule_id = rules_utils.create_rule(self.server, self.db_name,
self.schema_name,
self.table_name,
self.rule_name)
def runTest(self):
"""This function will delete rule under table node."""
rule_response = rules_utils.verify_rule(self.server, self.db_name,
self.rule_name)
if not rule_response:
raise Exception("Could not find the rule to delete.")
response = self.tester.delete(
"{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, self.table_id,
self.rule_id),
follow_redirects=True
)
self.assertEquals(response.status_code, 200)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,68 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import uuid
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
import utils as tables_utils
from . import utils as rules_utils
class RulesGetTestCase(BaseTestGenerator):
"""This class will fetch the rule under table node."""
scenarios = [
('Fetch rule Node URL', dict(url='/browser/rule/obj/'))
]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to delete rule.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to delete rule.")
self.table_name = "table_column_%s" % (str(uuid.uuid4())[1:6])
self.table_id = tables_utils.create_table(self.server, self.db_name,
self.schema_name,
self.table_name)
self.rule_name = "test_rule_delete_%s" % (str(uuid.uuid4())[1:6])
self.rule_id = rules_utils.create_rule(self.server, self.db_name,
self.schema_name,
self.table_name,
self.rule_name)
def runTest(self):
"""This function will fetch the rule under table node."""
response = self.tester.get(
"{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, self.table_id,
self.rule_id),
follow_redirects=True
)
self.assertEquals(response.status_code, 200)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,76 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import uuid
import json
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
import utils as tables_utils
from . import utils as rules_utils
class RulesUpdateTestCase(BaseTestGenerator):
"""This class will update the rule under table node."""
scenarios = [
('Put rule Node URL', dict(url='/browser/rule/obj/'))
]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to delete rule.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to delete rule.")
self.table_name = "table_column_%s" % (str(uuid.uuid4())[1:6])
self.table_id = tables_utils.create_table(self.server, self.db_name,
self.schema_name,
self.table_name)
self.rule_name = "test_rule_delete_%s" % (str(uuid.uuid4())[1:6])
self.rule_id = rules_utils.create_rule(self.server, self.db_name,
self.schema_name,
self.table_name,
self.rule_name)
def runTest(self):
"""This function will update the rule under table node."""
rule_response = rules_utils.verify_rule(self.server, self.db_name,
self.rule_name)
data = {"id": self.rule_id,
"comment": "This is testing comment."
}
if not rule_response:
raise Exception("Could not find the rule to update.")
response = self.tester.put(
"{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, self.table_id,
self.rule_id),
data=json.dumps(data),
follow_redirects=True)
self.assertEquals(response.status_code, 200)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,86 @@
# ##########################################################################
#
# #pgAdmin 4 - PostgreSQL Tools
#
# #Copyright (C) 2013 - 2016, The pgAdmin Development Team
# #This software is released under the PostgreSQL Licence
#
# ##########################################################################
from __future__ import print_function
import traceback
import sys
from regression import test_utils as utils
def create_rule(server, db_name, schema_name, table_name, rule_name):
"""
This function creates a rule under provided table.
:param server: server details
:type server: dict
:param db_name: database name
:type db_name: str
:param schema_name: schema name
:type schema_name: str
:param table_name: table name
:type table_name: str
:param rule_name: rule name
:type rule_name: str
:return rule_id: role id
:rtype: int
"""
try:
connection = utils.get_db_connection(db_name,
server['username'],
server['db_password'],
server['host'],
server['port'])
old_isolation_level = connection.isolation_level
connection.set_isolation_level(0)
pg_cursor = connection.cursor()
query = "CREATE OR REPLACE RULE %s AS ON UPDATE TO %s.%s DO NOTHING" %\
(rule_name, schema_name, table_name)
pg_cursor.execute(query)
connection.set_isolation_level(old_isolation_level)
connection.commit()
# Get role oid of newly added rule
pg_cursor.execute("select oid from pg_rewrite where rulename='%s'" %
rule_name)
rule = pg_cursor.fetchone()
rule_id = ''
if rule:
rule_id = rule[0]
connection.close()
return rule_id
except Exception:
traceback.print_exc(file=sys.stderr)
raise
def verify_rule(server, db_name, rule_name):
"""
This function verifies rule exist in database or not.
:param server: server details
:type server: dict
:param db_name: database name
:type db_name: str
:param rule_name: rule name
:type rule_name: str
:return rule: rule record from database
:rtype: tuple
"""
try:
connection = utils.get_db_connection(db_name,
server['username'],
server['db_password'],
server['host'],
server['port'])
pg_cursor = connection.cursor()
pg_cursor.execute("select * from pg_rewrite where rulename='%s'" %
rule_name)
rule = pg_cursor.fetchone()
connection.close()
return rule
except Exception:
traceback.print_exc(file=sys.stderr)
raise

View File

@ -0,0 +1,16 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
from pgadmin.utils.route import BaseTestGenerator
class TablesTestGenerator(BaseTestGenerator):
def runTest(self):
return []

View File

@ -0,0 +1,182 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import json
import uuid
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.tablespaces.tests import utils as \
tablespace_utils
from . import utils as table_utils
class TableAddTestCase(BaseTestGenerator):
""" This class will add new collation under schema node. """
scenarios = [
# Fetching default URL for table node.
('Fetch table Node URL', dict(url='/browser/table/obj/'))
]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a table.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a table.")
def runTest(self):
""" This function will add table under schema node. """
db_user = self.server["username"]
self.table_name = "test_table_add_%s" % (str(uuid.uuid4())[1:6])
data = {
"check_constraint": [],
"coll_inherits": "[]",
"columns": [
{
"name": "empno",
"cltype": "numeric",
"attacl": [],
"is_primary_key": False,
"attoptions": [],
"seclabels": []
},
{
"name": "empname",
"cltype": "character[]",
"attacl": [],
"is_primary_key": False,
"attoptions": [],
"seclabels": []
},
{"name": "DOJ",
"cltype": "date[]",
"attacl": [],
"is_primary_key": False,
"attoptions": [],
"seclabels": []
}
],
"exclude_constraint": [],
"fillfactor": "11",
"hastoasttable": True,
"like_constraints": True,
"like_default_value": True,
"like_relation": "pg_catalog.pg_tables",
"name": self.table_name,
"primary_key": [],
"relacl": [
{
"grantee": db_user,
"grantor": db_user,
"privileges":
[
{
"privilege_type": "a",
"privilege": True,
"with_grant": True
},
{
"privilege_type": "r",
"privilege": True,
"with_grant": False
},
{
"privilege_type": "w",
"privilege": True,
"with_grant": False
}
]
}
],
"relhasoids": True,
"relowner": db_user,
"schema": self.schema_name,
"seclabels": [],
"spcname": "pg_default",
"unique_constraint": [],
"vacuum_table": [
{
"name": "autovacuum_analyze_scale_factor"
},
{
"name": "autovacuum_analyze_threshold"
},
{
"name": "autovacuum_freeze_max_age"
},
{
"name": "autovacuum_vacuum_cost_delay"
},
{
"name": "autovacuum_vacuum_cost_limit"
},
{
"name": "autovacuum_vacuum_scale_factor"
},
{
"name": "autovacuum_vacuum_threshold"
},
{
"name": "autovacuum_freeze_min_age"
},
{
"name": "autovacuum_freeze_table_age"
}
],
"vacuum_toast": [
{
"name": "autovacuum_freeze_max_age"
},
{
"name": "autovacuum_vacuum_cost_delay"
},
{
"name": "autovacuum_vacuum_cost_limit"
},
{
"name": "autovacuum_vacuum_scale_factor"
},
{
"name": "autovacuum_vacuum_threshold"
},
{
"name": "autovacuum_freeze_min_age"
},
{
"name": "autovacuum_freeze_table_age"
}
]
}
# Add table
response = self.tester.post(
self.url + str(utils.SERVER_GROUP) + '/' +
str(self.server_id) + '/' + str(self.db_id) +
'/' + str(self.schema_id) + '/',
data=json.dumps(data),
content_type='html/json')
self.assertEquals(response.status_code, 200)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,65 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import uuid
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from . import utils as tables_utils
class TableDeleteTestCase(BaseTestGenerator):
"""This class will delete new table under schema node."""
scenarios = [
# Fetching default URL for table node.
('Fetch table Node URL', dict(url='/browser/table/obj/'))
]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a table.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a table.")
self.table_name = "test_table_delete_%s" % (str(uuid.uuid4())[1:6])
self.table_id = tables_utils.create_table(self.server, self.db_name,
self.schema_name,
self.table_name)
def runTest(self):
"""This function will delete added table under schema node."""
table_response = tables_utils.verify_table(self.server, self.db_name,
self.table_id)
if not table_response:
raise Exception("Could not find the table to delete.")
response = self.tester.delete(self.url + str(utils.SERVER_GROUP) +
'/' + str(self.server_id) + '/' +
str(self.db_id) + '/' +
str(self.schema_id) + '/' +
str(self.table_id),
follow_redirects=True)
self.assertEquals(response.status_code, 200)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,61 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import uuid
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from . import utils as tables_utils
class TableGetTestCase(BaseTestGenerator):
"""This class will add new collation under schema node."""
scenarios = [
# Fetching default URL for table node.
('Fetch table Node URL', dict(url='/browser/table/obj/'))
]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a table.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a table.")
self.table_name = "test_table_get_%s" % (str(uuid.uuid4())[1:6])
self.table_id = tables_utils.create_table(self.server, self.db_name,
self.schema_name,
self.table_name)
def runTest(self):
"""This function will delete added table under schema node."""
response = self.tester.get(self.url + str(utils.SERVER_GROUP) +
'/' + str(self.server_id) + '/' +
str(self.db_id) + '/' +
str(self.schema_id) + '/' +
str(self.table_id),
follow_redirects=True)
self.assertEquals(response.status_code, 200)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,69 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import uuid
import json
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from . import utils as tables_utils
class TableUpdateTestCase(BaseTestGenerator):
"""This class will add new collation under schema node."""
scenarios = [
# Fetching default URL for table node.
('Fetch table Node URL', dict(url='/browser/table/obj/'))
]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a table.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a table.")
self.table_name = "test_table_put_%s" % (str(uuid.uuid4())[1:6])
self.table_id = tables_utils.create_table(self.server, self.db_name,
self.schema_name,
self.table_name)
def runTest(self):
"""This function will fetch added table under schema node."""
table_response = tables_utils.verify_table(self.server, self.db_name,
self.table_id)
if not table_response:
raise Exception("Could not find the table to update.")
data = {
"description": "This is test comment for table",
"id": self.table_id
}
response = self.tester.put(
self.url + str(utils.SERVER_GROUP) + '/' +
str(self.server_id) + '/' + str(self.db_id) + '/' +
str(self.schema_id) + '/' + str(self.table_id),
data=json.dumps(data), follow_redirects=True)
self.assertEquals(response.status_code, 200)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,85 @@
# ##########################################################################
#
# #pgAdmin 4 - PostgreSQL Tools
#
# #Copyright (C) 2013 - 2016, The pgAdmin Development Team
# #This software is released under the PostgreSQL Licence
#
# ##########################################################################
from __future__ import print_function
import traceback
import sys
from regression import test_utils as utils
def create_table(server, db_name, schema_name, table_name):
"""
This function creates a table under provided schema.
:param server: server details
:type server: dict
:param db_name: database name
:type db_name: str
:param schema_name: schema name
:type schema_name: str
:param table_name: table name
:type table_name: str
:return table_id: table id
:rtype: int
"""
try:
connection = utils.get_db_connection(db_name,
server['username'],
server['db_password'],
server['host'],
server['port'])
old_isolation_level = connection.isolation_level
connection.set_isolation_level(0)
pg_cursor = connection.cursor()
query = "CREATE TABLE %s.%s(id serial UNIQUE NOT NULL, name text," \
" location text)" %\
(schema_name, table_name)
pg_cursor.execute(query)
connection.set_isolation_level(old_isolation_level)
connection.commit()
# Get 'oid' from newly created table
pg_cursor.execute("select oid from pg_class where relname='%s'" %
table_name)
table = pg_cursor.fetchone()
table_id = ''
if table:
table_id = table[0]
connection.close()
return table_id
except Exception:
traceback.print_exc(file=sys.stderr)
raise
def verify_table(server, db_name, table_id):
"""
This function verifies table exist in database or not.
:param server: server details
:type server: dict
:param db_name: database name
:type db_name: str
:param table_id: schema name
:type table_id: int
:return table: table record from database
:rtype: tuple
"""
try:
connection = utils.get_db_connection(db_name,
server['username'],
server['db_password'],
server['host'],
server['port'])
pg_cursor = connection.cursor()
pg_cursor.execute("SELECT * FROM pg_class tb WHERE tb.oid=%s" %
table_id)
table = pg_cursor.fetchone()
connection.close()
return table
except Exception:
traceback.print_exc(file=sys.stderr)
raise

View File

@ -0,0 +1,15 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
from pgadmin.utils.route import BaseTestGenerator
class TriggersTestGenerator(BaseTestGenerator):
def runTest(self):
return []

View File

@ -0,0 +1,78 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import json
import uuid
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
import utils as tables_utils
from pgadmin.browser.server_groups.servers.databases.schemas.functions.tests \
import utils as trigger_funcs_utils
class TriggersAddTestCase(BaseTestGenerator):
"""This class will add new trigger under table node."""
scenarios = [
('Add trigger Node URL', dict(url='/browser/trigger/obj/'))
]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a trigger.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a trigger.")
self.table_name = "table_trigger_%s" % (str(uuid.uuid4())[1:6])
self.table_id = tables_utils.create_table(self.server, self.db_name,
self.schema_name,
self.table_name)
self.func_name = "trigger_func_add_%s" % str(uuid.uuid4())[1:6]
self.function_info = \
trigger_funcs_utils.create_trigger_function_with_trigger(
self.server, self.db_name, self.schema_name, self.func_name)
def runTest(self):
"""This function will trigger under table node."""
trigger_name = "test_trigger_add_%s" % (str(uuid.uuid4())[1:6])
data = {"name": trigger_name,
"is_row_trigger": True,
"fires": "BEFORE",
"columns": [],
"tfunction": "{0}.{1}".format(self.schema_name,
self.func_name),
"evnt_insert": True
}
response = self.tester.post(
"{0}{1}/{2}/{3}/{4}/{5}/".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, self.table_id),
data=json.dumps(data),
content_type='html/json'
)
self.assertEquals(response.status_code, 200)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,81 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import uuid
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
import utils as tables_utils
from pgadmin.browser.server_groups.servers.databases.schemas.functions.tests \
import utils as trigger_funcs_utils
from . import utils as triggers_utils
class TriggersDeleteTestCase(BaseTestGenerator):
"""This class will delete trigger under table node."""
scenarios = [
('Delete trigger Node URL', dict(url='/browser/trigger/obj/'))
]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to delete trigger.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to delete trigger.")
self.table_name = "table_trigger_%s" % (str(uuid.uuid4())[1:6])
self.table_id = tables_utils.create_table(self.server, self.db_name,
self.schema_name,
self.table_name)
self.func_name = "trigger_func_delete_%s" % str(uuid.uuid4())[1:6]
self.function_info = \
trigger_funcs_utils.create_trigger_function_with_trigger(
self.server, self.db_name, self.schema_name, self.func_name)
self.trigger_name = "test_trigger_delete_%s" % (str(uuid.uuid4())[1:6])
self.trigger_id = triggers_utils.create_trigger(self.server,
self.db_name,
self.schema_name,
self.table_name,
self.trigger_name,
self.func_name)
def runTest(self):
"""This function will delete trigger under table node."""
trigger_response = triggers_utils.verify_trigger(self.server,
self.db_name,
self.trigger_name)
if not trigger_response:
raise Exception("Could not find the trigger to delete.")
response = self.tester.delete(
"{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, self.table_id,
self.trigger_id),
follow_redirects=True
)
self.assertEquals(response.status_code, 200)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,76 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import uuid
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
import utils as tables_utils
from pgadmin.browser.server_groups.servers.databases.schemas.functions.tests \
import utils as trigger_funcs_utils
from . import utils as triggers_utils
class TriggersGetTestCase(BaseTestGenerator):
"""This class will fetch trigger under table node."""
scenarios = [
('Fetch trigger Node URL', dict(url='/browser/trigger/obj/'))
]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to get a trigger.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to get a trigger.")
self.table_name = "table_trigger_%s" % (str(uuid.uuid4())[1:6])
self.table_id = tables_utils.create_table(self.server, self.db_name,
self.schema_name,
self.table_name)
self.func_name = "trigger_func_get_%s" % str(uuid.uuid4())[1:6]
self.function_info = \
trigger_funcs_utils.create_trigger_function_with_trigger(
self.server, self.db_name, self.schema_name, self.func_name)
self.trigger_name = "test_trigger_get_%s" % (str(uuid.uuid4())[1:6])
self.trigger_id = triggers_utils.create_trigger(self.server,
self.db_name,
self.schema_name,
self.table_name,
self.trigger_name,
self.func_name)
def runTest(self):
"""This function will fetch trigger under table node."""
response = self.tester.get(
"{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, self.table_id,
self.trigger_id),
follow_redirects=True
)
self.assertEquals(response.status_code, 200)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,87 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import uuid
import json
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
import utils as tables_utils
from pgadmin.browser.server_groups.servers.databases.schemas.functions.tests \
import utils as trigger_funcs_utils
from . import utils as triggers_utils
class TriggersUpdateTestCase(BaseTestGenerator):
"""This class will update trigger under table node."""
scenarios = [
('Put trigger Node URL', dict(url='/browser/trigger/obj/'))
]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception(
"Could not connect to database to update a trigger.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to update a trigger.")
self.table_name = "table_trigger_%s" % (str(uuid.uuid4())[1:6])
self.table_id = tables_utils.create_table(self.server, self.db_name,
self.schema_name,
self.table_name)
self.func_name = "trigger_func_add_%s" % str(uuid.uuid4())[1:6]
self.function_info = \
trigger_funcs_utils.create_trigger_function_with_trigger(
self.server, self.db_name, self.schema_name, self.func_name)
self.trigger_name = "test_trigger_delete_%s" % (str(uuid.uuid4())[1:6])
self.trigger_id = triggers_utils.create_trigger(self.server,
self.db_name,
self.schema_name,
self.table_name,
self.trigger_name,
self.func_name)
def runTest(self):
"""This function will update trigger under table node."""
trigger_response = triggers_utils.verify_trigger(self.server,
self.db_name,
self.trigger_name)
if not trigger_response:
raise Exception("Could not find the trigger to delete.")
data = {"id": self.trigger_id,
"description": "This is test comment."
}
response = self.tester.put(
"{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, self.table_id,
self.trigger_id),
data=json.dumps(data),
follow_redirects=True
)
self.assertEquals(response.status_code, 200)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,90 @@
# ##########################################################################
#
# #pgAdmin 4 - PostgreSQL Tools
#
# #Copyright (C) 2013 - 2016, The pgAdmin Development Team
# #This software is released under the PostgreSQL Licence
#
# ##########################################################################
from __future__ import print_function
import traceback
import sys
from regression import test_utils as utils
def create_trigger(server, db_name, schema_name, table_name, trigger_name,
trigger_func_name):
"""
This function creates a column under provided table.
:param server: server details
:type server: dict
:param db_name: database name
:type db_name: str
:param schema_name: schema name
:type schema_name: str
:param table_name: table name
:type table_name: str
:param trigger_name: trigger name
:type trigger_name: str
:param trigger_func_name: trigger function name
:type trigger_func_name: str
:return trigger_id: trigger id
:rtype: int
"""
try:
connection = utils.get_db_connection(db_name,
server['username'],
server['db_password'],
server['host'],
server['port'])
old_isolation_level = connection.isolation_level
connection.set_isolation_level(0)
pg_cursor = connection.cursor()
query = "CREATE TRIGGER %s BEFORE INSERT ON %s.%s FOR EACH ROW " \
"EXECUTE PROCEDURE %s.%s()" % (trigger_name, schema_name,
table_name, schema_name,
trigger_func_name)
pg_cursor.execute(query)
connection.set_isolation_level(old_isolation_level)
connection.commit()
pg_cursor.execute("SELECT oid FROM pg_trigger where tgname='%s'" %
trigger_name)
trigger = pg_cursor.fetchone()
trigger_id = ''
if trigger:
trigger_id = trigger[0]
connection.close()
return trigger_id
except Exception:
traceback.print_exc(file=sys.stderr)
raise
def verify_trigger(server, db_name, trigger_name):
"""
This function verifies table exist in database or not.
:param server: server details
:type server: dict
:param db_name: database name
:type db_name: str
:param trigger_name: column name
:type trigger_name: str
:return table: table record from database
:rtype: tuple
"""
try:
connection = utils.get_db_connection(db_name,
server['username'],
server['db_password'],
server['host'],
server['port'])
pg_cursor = connection.cursor()
pg_cursor.execute("SELECT oid FROM pg_trigger where tgname='%s'" %
trigger_name)
trigger = pg_cursor.fetchone()
connection.close()
return trigger
except Exception:
traceback.print_exc(file=sys.stderr)
raise

View File

@ -0,0 +1,16 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
from pgadmin.utils.route import BaseTestGenerator
class TypesTestGenerator(BaseTestGenerator):
def runTest(self):
return []

View File

@ -0,0 +1,68 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import json
import uuid
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
class TypesAddTestCase(BaseTestGenerator):
""" This class will add type under schema node. """
scenarios = [
('Add type under schema node', dict(url='/browser/type/obj/'))
]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a type.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a type.")
def runTest(self):
""" This function will add type under schema node. """
db_user = self.server["username"]
self.type_name = "test_type_add_%s" % (str(uuid.uuid4())[1:6])
data = {"name": self.type_name,
"is_sys_type": False,
"typtype": "c",
"typeowner": db_user,
"schema": self.schema_name,
"composite": [{"member_name": "one", "type": "abstime",
"is_tlength": False, "is_precision": False},
{"member_name": "two", "type": "\"char\"[]",
"is_tlength": False, "is_precision": False}],
"enum": [], "typacl": [], "seclabels": []}
response = self.tester.post(
self.url + str(utils.SERVER_GROUP) + '/' +
str(self.server_id) + '/' + str(self.db_id) +
'/' + str(self.schema_id) + '/',
data=json.dumps(data),
content_type='html/json')
self.assertEquals(response.status_code, 200)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,65 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import uuid
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from . import utils as types_utils
class TypesDeleteTestCase(BaseTestGenerator):
""" This class will delete type under schema node. """
scenarios = [
('Delete type under schema node', dict(url='/browser/type/obj/'))
]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to delete a type.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to delete a type.")
self.type_name = "test_type_delete_%s" % (str(uuid.uuid4())[1:6])
self.type_id = types_utils.create_type(self.server, self.db_name,
self.schema_name, self.type_name
)
def runTest(self):
""" This function will delete type under schema node. """
type_response = types_utils.verify_type(self.server, self.db_name,
self.type_name)
if not type_response:
raise Exception("Could not find the type to delete.")
response = self.tester.delete(
"{0}{1}/{2}/{3}/{4}/{5}".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, self.type_id
),
follow_redirects=True
)
self.assertEquals(response.status_code, 200)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,61 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import uuid
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from . import utils as types_utils
class TypesGetTestCase(BaseTestGenerator):
""" This class will get the type under schema node. """
scenarios = [
('Get type under schema node', dict(url='/browser/type/obj/'))
]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to get a type.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to get a type.")
self.type_name = "test_type_get_%s" % (str(uuid.uuid4())[1:6])
self.type_id = types_utils.create_type(self.server, self.db_name,
self.schema_name, self.type_name
)
def runTest(self):
""" This function will get a type under schema node. """
response = self.tester.get(
"{0}{1}/{2}/{3}/{4}/{5}".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, self.type_id
),
follow_redirects=True
)
self.assertEquals(response.status_code, 200)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,68 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import uuid
import json
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from . import utils as types_utils
class TypesUpdateTestCase(BaseTestGenerator):
""" This class will update type under schema node. """
scenarios = [
('Update type under schema node', dict(url='/browser/type/obj/'))
]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to update a type.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to update a type.")
self.type_name = "test_type_put_%s" % (str(uuid.uuid4())[1:6])
self.type_id = types_utils.create_type(self.server, self.db_name,
self.schema_name, self.type_name
)
def runTest(self):
""" This function will update type under schema node. """
type_response = types_utils.verify_type(self.server, self.db_name,
self.type_name)
if not type_response:
raise Exception("Could not find the type to update.")
data = {"id": self.type_id,
"description": "this is test comment."}
response = self.tester.put(
"{0}{1}/{2}/{3}/{4}/{5}".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, self.type_id
),
data=json.dumps(data),
follow_redirects=True)
self.assertEquals(response.status_code, 200)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,82 @@
# ##########################################################################
#
# #pgAdmin 4 - PostgreSQL Tools
#
# #Copyright (C) 2013 - 2016, The pgAdmin Development Team
# #This software is released under the PostgreSQL Licence
#
# ##########################################################################
from __future__ import print_function
import traceback
import sys
from regression import test_utils as utils
def create_type(server, db_name, schema_name, type_name):
"""
This function creates a type under provided schema.
:param server: server details
:type server: dict
:param db_name: database name
:type db_name: str
:param schema_name: schema name
:type schema_name: str
:param type_name: type name
:type type_name: str
:return type_id: type id
:rtype: int
"""
try:
connection = utils.get_db_connection(db_name,
server['username'],
server['db_password'],
server['host'],
server['port'])
old_isolation_level = connection.isolation_level
connection.set_isolation_level(0)
pg_cursor = connection.cursor()
query = 'CREATE TYPE %s.%s AS (one "char", two "char"[]); ' \
'ALTER TYPE %s.%s OWNER TO %s' % (schema_name, type_name,
schema_name, type_name,
server['username'])
pg_cursor.execute(query)
connection.set_isolation_level(old_isolation_level)
connection.commit()
# Get 'oid' from newly created type
pg_cursor.execute(
"select oid from pg_type where typname='%s'" % type_name)
schema_type = pg_cursor.fetchone()
type_id = schema_type[0]
connection.close()
return type_id
except Exception:
traceback.print_exc(file=sys.stderr)
def verify_type(server, db_name, type_name):
"""
This function verifies type exist in database or not.
:param server: server details
:type server: dict
:param db_name: database name
:type db_name: str
:param type_name: type name
:type type_name: str
:return schema_type: type record from database
:rtype: tuple
"""
try:
connection = utils.get_db_connection(db_name,
server['username'],
server['db_password'],
server['host'],
server['port'])
pg_cursor = connection.cursor()
pg_cursor.execute(
"select oid from pg_type where typname='%s'" % type_name)
schema_type = pg_cursor.fetchone()
connection.close()
return schema_type
except Exception:
traceback.print_exc(file=sys.stderr)

View File

@ -0,0 +1,16 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
from pgadmin.utils.route import BaseTestGenerator
class ViewsTestGenerator(BaseTestGenerator):
def generate_tests(self):
return

View File

@ -0,0 +1,95 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import uuid
import json
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
class ViewsAddTestCase(BaseTestGenerator):
"""This class will add new view under schema node."""
view_name = "test_view_add_%s" % (str(uuid.uuid4())[1:6])
v_data = {"schema": "",
"owner": "",
"datacl": [],
"seclabels": [],
"name": view_name,
"definition": "SELECT 'Hello World';"
}
m_view_name = "test_mview_add_%s" % (str(uuid.uuid4())[1:6])
m_view_data = {"spcname": "pg_default",
"toast_autovacuum_enabled": False,
"autovacuum_enabled": False,
"schema": "",
"owner": "",
"vacuum_table": [
{"name": "autovacuum_analyze_scale_factor"},
{"name": "autovacuum_analyze_threshold"},
{"name": "autovacuum_freeze_max_age"},
{"name": "autovacuum_vacuum_cost_delay"},
{"name": "autovacuum_vacuum_cost_limit"},
{"name": "autovacuum_vacuum_scale_factor"},
{"name": "autovacuum_vacuum_threshold"},
{"name": "autovacuum_freeze_min_age"},
{"name": "autovacuum_freeze_table_age"}],
"vacuum_toast": [{"name": "autovacuum_freeze_max_age"},
{"name": "autovacuum_vacuum_cost_delay"},
{"name": "autovacuum_vacuum_cost_limit"},
{"name": "autovacuum_vacuum_scale_factor"},
{"name": "autovacuum_vacuum_threshold"},
{"name": "autovacuum_freeze_min_age"},
{"name": "autovacuum_freeze_table_age"}],
"datacl": [],
"seclabels": [],
"name": m_view_name,
"definition": "SELECT 'test_pgadmin';"}
scenarios = [
('Add view under schema node', dict(url='/browser/view/obj/',
data=v_data)),
('Add materialized view under schema node',
dict(url='/browser/mview/obj/', data=m_view_data))
]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add view.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add the view.")
def runTest(self):
"""This function will add view under schema node."""
db_user = self.server["username"]
self.data["schema"] = self.schema_name
self.data["owner"] = db_user
response = self.tester.post(
self.url + str(utils.SERVER_GROUP) + '/' + str(self.server_id)
+ '/' + str(self.db_id) + '/' + str(self.schema_id) + '/',
data=json.dumps(self.data), content_type='html/json')
self.assertEquals(response.status_code, 200)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,79 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import uuid
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from . import utils as views_utils
class ViewsDeleteTestCase(BaseTestGenerator):
"""This class will delete the view/mview under schema node."""
view_sql = "CREATE OR REPLACE VIEW %s.%s AS SELECT 'Hello World'; " \
"ALTER TABLE %s.%s OWNER TO %s"
m_view_sql = "CREATE MATERIALIZED VIEW %s.%s TABLESPACE pg_default AS " \
"SELECT 'test_pgadmin' WITH NO DATA;ALTER TABLE %s.%s OWNER" \
" TO %s"
scenarios = [
('Delete view under schema node', dict(url='/browser/view/obj/',
view_name=
"test_view_delete_%s" %
(str(uuid.uuid4())[1:6]),
sql_query=view_sql)),
('Delete materialized view under schema node',
dict(url='/browser/mview/obj/',
view_name="test_mview_delete_%s" % (str(uuid.uuid4())[1:6]),
sql_query=m_view_sql))
]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to delete view.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to delete the view.")
self.view_id = views_utils.create_view(self.server,
self.db_name,
self.schema_name,
self.sql_query,
self.view_name)
def runTest(self):
"""This function will delete the view/mview under schema node."""
view_response = views_utils.verify_view(self.server, self.db_name,
self.view_name)
if not view_response:
raise Exception("Could not find the view to delete.")
response = self.tester.delete(
"{0}{1}/{2}/{3}/{4}/{5}".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, self.view_id
),
follow_redirects=True
)
self.assertEquals(response.status_code, 200)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,75 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import uuid
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from . import utils as views_utils
class ViewsGetTestCase(BaseTestGenerator):
"""This class will fetch the view under schema node."""
view_sql = "CREATE OR REPLACE VIEW %s.%s AS SELECT 'Hello World'; " \
"ALTER TABLE %s.%s OWNER TO %s"
m_view_sql = "CREATE MATERIALIZED VIEW %s.%s TABLESPACE pg_default AS " \
"SELECT 'test_pgadmin' WITH NO DATA;ALTER TABLE %s.%s OWNER" \
" TO %s"
scenarios = [
('Get view under schema node', dict(url='/browser/view/obj/',
view_name=
"test_view_get_%s" %
(str(uuid.uuid4())[1:6]),
sql_query=view_sql)),
('Get materialized view under schema node',
dict(url='/browser/mview/obj/',
view_name="test_mview_get_%s" % (str(uuid.uuid4())[1:6]),
sql_query=m_view_sql))
]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to fetch the view.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to fetch the view.")
self.view_id = views_utils.create_view(self.server,
self.db_name,
self.schema_name,
self.sql_query,
self.view_name)
def runTest(self):
"""This function will fetch the view/mview under schema node."""
response = self.tester.get(
"{0}{1}/{2}/{3}/{4}/{5}".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, self.view_id
),
follow_redirects=True
)
self.assertEquals(response.status_code, 200)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,83 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import uuid
import json
from regression import test_utils as utils
from regression import parent_node_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from . import utils as views_utils
class ViewsUpdateTestCase(BaseTestGenerator):
"""This class will update the view/mview under schema node."""
view_sql = "CREATE OR REPLACE VIEW %s.%s AS SELECT 'Hello World'; " \
"ALTER TABLE %s.%s OWNER TO %s"
m_view_sql = "CREATE MATERIALIZED VIEW %s.%s TABLESPACE pg_default AS " \
"SELECT 'test_pgadmin' WITH NO DATA;ALTER TABLE %s.%s OWNER" \
" TO %s"
scenarios = [
('Update view under schema node', dict(url='/browser/view/obj/',
view_name=
"test_view_put_%s" %
(str(uuid.uuid4())[1:6]),
sql_query=view_sql)),
('Update materialized view under schema node',
dict(url='/browser/mview/obj/',
view_name="test_mview_put_%s" % (str(uuid.uuid4())[1:6]),
sql_query=m_view_sql))
]
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to update a view.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to update a view.")
self.view_id = views_utils.create_view(self.server,
self.db_name,
self.schema_name,
self.sql_query,
self.view_name)
def runTest(self):
"""This function will update the view/mview under schema node."""
view_response = views_utils.verify_view(self.server, self.db_name,
self.view_name)
if not view_response:
raise Exception("Could not find the view to update.")
data = {"id": self.view_id,
"comment": "This is test comment"
}
response = self.tester.put(
"{0}{1}/{2}/{3}/{4}/{5}".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, self.view_id
),
data=json.dumps(data),
follow_redirects=True)
self.assertEquals(response.status_code, 200)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -0,0 +1,84 @@
# ##########################################################################
#
# #pgAdmin 4 - PostgreSQL Tools
#
# #Copyright (C) 2013 - 2016, The pgAdmin Development Team
# #This software is released under the PostgreSQL Licence
#
# ##########################################################################
from __future__ import print_function
import traceback
import sys
from regression import test_utils as utils
def create_view(server, db_name, schema_name, sql_query, view_name):
"""
This function creates a table under provided schema.
:param server: server details
:type server: dict
:param db_name: database name
:type db_name: str
:param schema_name: schema name
:type schema_name: str
:param sql_query: sql query to create view
:type sql_query: str
:param view_name: view name
:type view_name: str
:return view_id: view id
:rtype: int
"""
try:
connection = utils.get_db_connection(db_name,
server['username'],
server['db_password'],
server['host'],
server['port'])
old_isolation_level = connection.isolation_level
connection.set_isolation_level(0)
pg_cursor = connection.cursor()
query = sql_query % (schema_name, view_name, schema_name, view_name,
server['username'])
pg_cursor.execute(query)
connection.set_isolation_level(old_isolation_level)
connection.commit()
# Get 'oid' from newly created view
pg_cursor.execute("select oid from pg_class where relname='%s'" %
view_name)
view = pg_cursor.fetchone()
view_id = view[0]
connection.close()
return view_id
except Exception:
traceback.print_exc(file=sys.stderr)
raise
def verify_view(server, db_name, view_name):
"""
This function verifies view exist in database or not.
:param server: server details
:type server: dict
:param db_name: database name
:type db_name: str
:param view_name: view name
:type view_name: str
:return table: table record from database
:rtype: tuple
"""
try:
connection = utils.get_db_connection(db_name,
server['username'],
server['db_password'],
server['host'],
server['port'])
pg_cursor = connection.cursor()
pg_cursor.execute("select oid from pg_class where relname='%s'" %
view_name)
view = pg_cursor.fetchone()
connection.close()
return view
except Exception:
traceback.print_exc(file=sys.stderr)
raise

View File

@ -136,7 +136,7 @@ def connect_database(self, server_group, server_id, db_id):
db_con = self.tester.post('{0}{1}/{2}/{3}'.format(
DATABASE_CONNECT_URL, server_group, server_id, db_id),
follow_redirects=True)
self.assertEquals(db_con.status_code, 200)
assert db_con.status_code == 200
db_con = json.loads(db_con.data.decode('utf-8'))
return db_con
@ -146,4 +146,4 @@ def disconnect_database(self, server_id, db_id):
db_con = self.tester.delete('{0}{1}/{2}/{3}'.format(
'browser/database/connect/', utils.SERVER_GROUP, server_id, db_id),
follow_redirects=True)
self.assertEquals(db_con.status_code, 200)
assert db_con.status_code == 200