Testsuite fixes for 9.3 and 9.2 support.

This commit is contained in:
Navnath Gadakh 2017-02-28 14:22:10 +00:00 committed by Dave Page
parent bc521dfc49
commit 1f935d699e
45 changed files with 203 additions and 95 deletions

View File

@ -28,8 +28,8 @@ class CastsDeleteTestCase(BaseTestGenerator):
self.database_info = parent_node_dict['database'][-1]
self.db_name = self.database_info['db_name']
self.server["db"] = self.db_name
self.source_type = 'circle'
self.target_type = 'line'
self.source_type = 'money'
self.target_type = 'bigint'
self.cast_id = cast_utils.create_cast(self.server, self.source_type,
self.target_type)

View File

@ -18,6 +18,7 @@ from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
from regression import test_utils as utils
from regression import parent_node_dict
from regression import trigger_funcs_utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
class EventTriggerAddTestCase(BaseTestGenerator):
@ -38,8 +39,19 @@ class EventTriggerAddTestCase(BaseTestGenerator):
self.db_name = parent_node_dict["database"][-1]["db_name"]
self.func_name = "trigger_func_%s" % str(uuid.uuid4())[1:6]
self.db_user = self.server["username"]
server_con = server_utils.connect_server(self, self.server_id)
if not server_con["info"] == "Server connected.":
raise Exception("Could not connect to server to add resource "
"groups.")
server_version = 0
if "type" in server_con["data"]:
if server_con["data"]["version"] < 90300:
message = "Event triggers are not supported by PG9.2 " \
"and PPAS9.2 and below."
self.skipTest(message)
self.function_info = trigger_funcs_utils.create_trigger_function(
self.server, self.db_name, self.schema_name, self.func_name)
self.server, self.db_name, self.schema_name, self.func_name,
server_version)
def runTest(self):
""" This function will add event trigger under test database. """

View File

@ -18,6 +18,7 @@ from regression import test_utils as utils
from regression import parent_node_dict
from regression import trigger_funcs_utils
from . import utils as event_trigger_utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
class EventTriggerDeleteTestCase(BaseTestGenerator):
@ -39,9 +40,20 @@ class EventTriggerDeleteTestCase(BaseTestGenerator):
self.db_user = self.server["username"]
self.func_name = "trigger_func_%s" % str(uuid.uuid4())[1:6]
self.trigger_name = "event_trigger_delete_%s" % (
str(uuid.uuid4())[1:6])
str(uuid.uuid4())[1:6])
server_con = server_utils.connect_server(self, self.server_id)
if not server_con["info"] == "Server connected.":
raise Exception("Could not connect to server to add resource "
"groups.")
server_version = 0
if "type" in server_con["data"]:
if server_con["data"]["version"] < 90300:
message = "Event triggers are not supported by PG9.2 " \
"and PPAS9.2 and below."
self.skipTest(message)
self.function_info = trigger_funcs_utils.create_trigger_function(
self.server, self.db_name, self.schema_name, self.func_name)
self.server, self.db_name, self.schema_name, self.func_name,
server_version)
self.event_trigger_id = event_trigger_utils.create_event_trigger(
self.server, self.db_name, self.schema_name, self.func_name,
self.trigger_name)

View File

@ -18,6 +18,7 @@ from regression import test_utils as utils
from regression import parent_node_dict
from regression import trigger_funcs_utils
from . import utils as event_trigger_utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
class EventTriggerGetTestCase(BaseTestGenerator):
@ -39,8 +40,19 @@ class EventTriggerGetTestCase(BaseTestGenerator):
self.db_user = self.server["username"]
self.func_name = "trigger_func_%s" % str(uuid.uuid4())[1:6]
self.trigger_name = "event_trigger_get_%s" % (str(uuid.uuid4())[1:6])
server_con = server_utils.connect_server(self, self.server_id)
if not server_con["info"] == "Server connected.":
raise Exception("Could not connect to server to add resource "
"groups.")
server_version = 0
if "type" in server_con["data"]:
if server_con["data"]["version"] < 90300:
message = "Event triggers are not supported by PG9.2 " \
"and PPAS9.2 and below."
self.skipTest(message)
self.function_info = trigger_funcs_utils.create_trigger_function(
self.server, self.db_name, self.schema_name, self.func_name)
self.server, self.db_name, self.schema_name, self.func_name,
server_version)
self.event_trigger_id = event_trigger_utils.create_event_trigger(
self.server, self.db_name, self.schema_name, self.func_name,
self.trigger_name)

View File

@ -18,6 +18,7 @@ from regression import test_utils as utils
from regression import parent_node_dict
from regression import trigger_funcs_utils
from . import utils as event_trigger_utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
class EventTriggerPutTestCase(BaseTestGenerator):
@ -39,8 +40,19 @@ class EventTriggerPutTestCase(BaseTestGenerator):
self.db_user = self.server["username"]
self.func_name = "trigger_func_%s" % str(uuid.uuid4())[1:6]
self.trigger_name = "event_trigger_put_%s" % (str(uuid.uuid4())[1:6])
server_con = server_utils.connect_server(self, self.server_id)
if not server_con["info"] == "Server connected.":
raise Exception("Could not connect to server to add resource "
"groups.")
server_version = 0
if "type" in server_con["data"]:
if server_con["data"]["version"] < 90300:
message = "Event triggers are not supported by PG9.2 " \
"and PPAS9.2 and below."
self.skipTest(message)
self.function_info = trigger_funcs_utils.create_trigger_function(
self.server, self.db_name, self.schema_name, self.func_name)
self.server, self.db_name, self.schema_name, self.func_name,
server_version)
self.event_trigger_id = event_trigger_utils.create_event_trigger(
self.server, self.db_name, self.schema_name, self.func_name,
self.trigger_name)
@ -68,9 +80,9 @@ class EventTriggerPutTestCase(BaseTestGenerator):
if not trigger_response:
raise Exception("Could not find event trigger.")
data = {
"comment": "This is event trigger update comment",
"id": self.event_trigger_id
}
"comment": "This is event trigger update comment",
"id": self.event_trigger_id
}
put_response = self.tester.put(
self.url + str(utils.SERVER_GROUP) + '/' +
str(self.server_id) + '/' + str(self.db_id) +
@ -82,4 +94,3 @@ class EventTriggerPutTestCase(BaseTestGenerator):
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@ -36,7 +36,7 @@ class ExtensionsDeleteTestCase(BaseTestGenerator):
self.server_id = self.schema_data['server_id']
self.db_id = self.schema_data['db_id']
self.schema_name = self.schema_data['schema_name']
self.extension_name = "postgres_fdw"
self.extension_name = "cube"
self.db_name = parent_node_dict["database"][-1]["db_name"]
self.extension_id = extension_utils.create_extension(
self.server, self.db_name, self.extension_name, self.schema_name)

View File

@ -28,7 +28,7 @@ class ExtensionsGetTestCase(BaseTestGenerator):
self.server_id = self.schema_data['server_id']
self.db_id = self.schema_data['db_id']
self.schema_name = self.schema_data['schema_name']
self.extension_name = "postgres_fdw"
self.extension_name = "cube"
self.db_name = parent_node_dict["database"][-1]["db_name"]
self.extension_id = extension_utils.create_extension(
self.server, self.db_name, self.extension_name, self.schema_name)

View File

@ -29,7 +29,7 @@ class ExtensionsPutTestCase(BaseTestGenerator):
self.server_id = self.schema_data['server_id']
self.db_id = self.schema_data['db_id']
self.schema_name = self.schema_data['schema_name']
self.extension_name = "postgres_fdw"
self.extension_name = "cube"
self.db_name = parent_node_dict["database"][-1]["db_name"]
self.extension_id = extension_utils.create_extension(
self.server, self.db_name, self.extension_name, self.schema_name)

View File

@ -15,10 +15,9 @@ from regression.test_utils import get_db_connection
def get_extension_data(schema_name):
data = {
"name": "postgres_fdw",
"name": "cube",
"relocatable": "true",
"schema": schema_name,
"version": "1.0"
"schema": schema_name
}
return data

View File

@ -37,7 +37,7 @@ class ForeignServerAddTestCase(BaseTestGenerator):
self.db_id = self.schema_data['db_id']
self.db_name = parent_node_dict["database"][-1]["db_name"]
self.schema_name = self.schema_data['schema_name']
self.extension_name = "postgres_fdw"
self.extension_name = "cube"
self.fdw_name = "fdw_{0}".format(str(uuid.uuid4())[1:6])
self.extension_id = extension_utils.create_extension(
self.server, self.db_name, self.extension_name, self.schema_name)

View File

@ -35,7 +35,7 @@ class ForeignServerDeleteTestCase(BaseTestGenerator):
self.db_id = self.schema_data['db_id']
self.db_name = parent_node_dict["database"][-1]["db_name"]
self.schema_name = self.schema_data['schema_name']
self.extension_name = "postgres_fdw"
self.extension_name = "cube"
self.fdw_name = "test_fdw_%s" % (str(uuid.uuid4())[1:6])
self.fsrv_name = "test_fsrv_%s" % (str(uuid.uuid4())[1:6])
self.extension_id = extension_utils.create_extension(

View File

@ -35,7 +35,7 @@ class ForeignServerGetTestCase(BaseTestGenerator):
self.db_id = self.schema_data['db_id']
self.db_name = parent_node_dict["database"][-1]["db_name"]
self.schema_name = self.schema_data['schema_name']
self.extension_name = "postgres_fdw"
self.extension_name = "cube"
self.fdw_name = "fdw_%s" % (str(uuid.uuid4())[1:6])
self.fsrv_name = "test_fsrv_add_%s" % (str(uuid.uuid4())[1:6])
self.extension_id = extension_utils.create_extension(

View File

@ -36,7 +36,7 @@ class ForeignServerPutTestCase(BaseTestGenerator):
self.db_id = self.schema_data['db_id']
self.db_name = parent_node_dict["database"][-1]["db_name"]
self.schema_name = self.schema_data['schema_name']
self.extension_name = "postgres_fdw"
self.extension_name = "cube"
self.fdw_name = "fdw_%s" % (str(uuid.uuid4())[1:6])
self.fsrv_name = "test_fsrv_put_%s" % (str(uuid.uuid4())[1:6])
self.extension_id = extension_utils.create_extension(

View File

@ -38,7 +38,7 @@ class UserMappingAddTestCase(BaseTestGenerator):
self.db_id = self.schema_data['db_id']
self.db_name = parent_node_dict["database"][-1]["db_name"]
self.schema_name = self.schema_data['schema_name']
self.extension_name = "postgres_fdw"
self.extension_name = "cube"
self.fdw_name = "fdw_%s" % (str(uuid.uuid4())[1:6])
self.fsrv_name = "fsrv_%s" % (str(uuid.uuid4())[1:6])
self.extension_id = extension_utils.create_extension(

View File

@ -37,7 +37,7 @@ class UserMappingDeleteTestCase(BaseTestGenerator):
self.db_id = self.schema_data['db_id']
self.db_name = parent_node_dict["database"][-1]["db_name"]
self.schema_name = self.schema_data['schema_name']
self.extension_name = "postgres_fdw"
self.extension_name = "cube"
self.fdw_name = "fdw_%s" % (str(uuid.uuid4())[1:6])
self.fsrv_name = "fsrv_%s" % (str(uuid.uuid4())[1:6])
self.extension_id = extension_utils.create_extension(

View File

@ -37,7 +37,7 @@ class UserMappingGetTestCase(BaseTestGenerator):
self.db_id = self.schema_data['db_id']
self.db_name = parent_node_dict["database"][-1]["db_name"]
self.schema_name = self.schema_data['schema_name']
self.extension_name = "postgres_fdw"
self.extension_name = "cube"
self.fdw_name = "fdw_%s" % (str(uuid.uuid4())[1:6])
self.fsrv_name = "fsrv_%s" % (str(uuid.uuid4())[1:6])
self.extension_id = extension_utils.create_extension(

View File

@ -38,7 +38,7 @@ class UserMappingPutTestCase(BaseTestGenerator):
self.db_id = self.schema_data['db_id']
self.db_name = parent_node_dict["database"][-1]["db_name"]
self.schema_name = self.schema_data['schema_name']
self.extension_name = "postgres_fdw"
self.extension_name = "cube"
self.fdw_name = "fdw_%s" % (str(uuid.uuid4())[1:6])
self.fsrv_name = "fsrv_%s" % (str(uuid.uuid4())[1:6])
self.extension_id = extension_utils.create_extension(

View File

@ -33,10 +33,7 @@ class FDWDAddTestCase(BaseTestGenerator):
self.server_id = self.schema_data['server_id']
self.db_id = self.schema_data['db_id']
self.schema_name = self.schema_data['schema_name']
self.extension_name = "postgres_fdw"
self.db_name = parent_node_dict["database"][-1]["db_name"]
self.extension_id = extension_utils.create_extension(
self.server, self.db_name, self.extension_name, self.schema_name)
def runTest(self):
"""This function will add foreign data wrapper under test database."""
@ -46,10 +43,6 @@ class FDWDAddTestCase(BaseTestGenerator):
self.db_id)
if not db_con["info"] == "Database connected.":
raise Exception("Could not connect to database.")
extension_response = extension_utils.verify_extension(
self.server, self.db_name, self.extension_name)
if not extension_response:
raise Exception("Could not find extension.")
self.data = fdw_utils.get_fdw_data(self.schema_name,
self.server['username'])
response = self.tester.post(
@ -62,7 +55,5 @@ class FDWDAddTestCase(BaseTestGenerator):
def tearDown(self):
"""This function disconnect the test database and
drop added extension."""
extension_utils.drop_extension(self.server, self.db_name,
self.extension_name)
database_utils.disconnect_database(self, self.server_id,
self.db_id)

View File

@ -34,10 +34,7 @@ class FDWDDeleteTestCase(BaseTestGenerator):
self.db_id = self.schema_data['db_id']
self.db_name = parent_node_dict["database"][-1]["db_name"]
self.schema_name = self.schema_data['schema_name']
self.extension_name = "postgres_fdw"
self.fdw_name = "fdw_{0}".format(str(uuid.uuid4())[1:6])
self.extension_id = extension_utils.create_extension(
self.server, self.db_name, self.extension_name, self.schema_name)
self.fdw_id = fdw_utils.create_fdw(self.server, self.db_name,
self.fdw_name)
@ -50,10 +47,6 @@ class FDWDDeleteTestCase(BaseTestGenerator):
self.db_id)
if not db_con["info"] == "Database connected.":
raise Exception("Could not connect to database.")
extension_response = extension_utils.verify_extension(
self.server, self.db_name, self.extension_name)
if not extension_response:
raise Exception("Could not find extension.")
fdw_response = fdw_utils.verify_fdw(self.server, self.db_name,
self.fdw_name)
if not fdw_response:
@ -68,7 +61,5 @@ class FDWDDeleteTestCase(BaseTestGenerator):
def tearDown(self):
"""This function disconnect the test database and drop added extension
and dependant objects."""
extension_utils.drop_extension(self.server, self.db_name,
self.extension_name)
database_utils.disconnect_database(self, self.server_id,
self.db_id)

View File

@ -35,10 +35,7 @@ class FDWDGetTestCase(BaseTestGenerator):
self.db_id = self.schema_data['db_id']
self.db_name = parent_node_dict["database"][-1]["db_name"]
self.schema_name = self.schema_data['schema_name']
self.extension_name = "postgres_fdw"
self.fdw_name = "fdw_{0}".format(str(uuid.uuid4())[1:4])
self.extension_id = extension_utils.create_extension(
self.server, self.db_name, self.extension_name, self.schema_name)
self.fdw_id = fdw_utils.create_fdw(self.server, self.db_name,
self.fdw_name)
@ -51,10 +48,6 @@ class FDWDGetTestCase(BaseTestGenerator):
self.db_id)
if not db_con["info"] == "Database connected.":
raise Exception("Could not connect to database.")
extension_response = extension_utils.verify_extension(
self.server, self.db_name, self.extension_name)
if not extension_response:
raise Exception("Could not find extension.")
response = self.tester.get(
self.url + str(utils.SERVER_GROUP) + '/' + str(
self.server_id) + '/' +
@ -65,7 +58,5 @@ class FDWDGetTestCase(BaseTestGenerator):
def tearDown(self):
"""This function disconnect the test database and drop added extension
and dependant objects."""
extension_utils.drop_extension(self.server, self.db_name,
self.extension_name)
database_utils.disconnect_database(self, self.server_id,
self.db_id)

View File

@ -35,10 +35,7 @@ class FDWDPutTestCase(BaseTestGenerator):
self.db_id = self.schema_data['db_id']
self.db_name = parent_node_dict["database"][-1]["db_name"]
self.schema_name = self.schema_data['schema_name']
self.extension_name = "postgres_fdw"
self.fdw_name = "fdw_put_%s".format(str(uuid.uuid4())[1:6])
self.extension_id = extension_utils.create_extension(
self.server, self.db_name, self.extension_name, self.schema_name)
self.fdw_id = fdw_utils.create_fdw(self.server, self.db_name,
self.fdw_name)
@ -51,10 +48,6 @@ class FDWDPutTestCase(BaseTestGenerator):
self.db_id)
if not db_con["info"] == "Database connected.":
raise Exception("Could not connect to database.")
extension_response = extension_utils.verify_extension(
self.server, self.db_name, self.extension_name)
if not extension_response:
raise Exception("Could not find extension.")
fdw_response = fdw_utils.verify_fdw(self.server, self.db_name,
self.fdw_name)
if not fdw_response:
@ -74,7 +67,5 @@ class FDWDPutTestCase(BaseTestGenerator):
def tearDown(self):
"""This function disconnect the test database and drop added extension
and dependant objects."""
extension_utils.drop_extension(self.server, self.db_name,
self.extension_name)
database_utils.disconnect_database(self, self.server_id,
self.db_id)

View File

@ -31,10 +31,8 @@ def get_fdw_data(schema_name, db_user):
]
}
],
"fdwhan": "%s.%s" % (schema_name, "postgres_fdw_handler"),
"fdwoptions": [],
"fdwowner": db_user,
"fdwvalue": "%s.%s" % (schema_name, "postgres_fdw_validator"),
"name": "fdw_add_%s" % (str(uuid.uuid4())[1:6])
}
return data

View File

@ -55,11 +55,11 @@ class FtsParserAddTestCase(BaseTestGenerator):
{
"name": "fts_parser_%s" % str(uuid.uuid4())[1:4],
"schema": self.schema_id,
"prsend": "btfloat4sortsupport",
"prsend": "prsd_end",
"prsheadline": "prsd_headline",
"prslextype": "dsynonym_init",
"prsstart": "int4_accum",
"prstoken": "gist_box_penalty"
"prsstart": "prsd_start",
"prstoken": "prsd_nexttoken"
}
response = self.tester.post(

View File

@ -31,8 +31,8 @@ def create_fts_parser(server, db_name, schema_name, fts_parser_name):
pg_cursor.execute(query)
query = "CREATE TEXT SEARCH PARSER " + schema_name + "." + fts_parser_name + \
"(START=int4_accum, GETTOKEN=gist_box_penalty, " \
"END=btfloat4sortsupport, LEXTYPES=dsynonym_init)"
"(START=prsd_start, GETTOKEN=prsd_nexttoken, " \
"END=prsd_end, LEXTYPES=dispell_init)"
pg_cursor.execute(query)
connection.commit()

View File

@ -16,6 +16,7 @@ from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
class TriggerFuncAddTestCase(BaseTestGenerator):
@ -32,6 +33,15 @@ class TriggerFuncAddTestCase(BaseTestGenerator):
schema_info = parent_node_dict["schema"][-1]
server_id = schema_info["server_id"]
db_id = schema_info["db_id"]
prorettypename = "event_trigger/trigger"
server_con = server_utils.connect_server(self, server_id)
if not server_con["info"] == "Server connected.":
raise Exception("Could not connect to server to add resource "
"groups.")
if "type" in server_con["data"]:
if server_con["data"]["version"] < 90300:
prorettypename = "trigger"
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
server_id, db_id)
if not db_con['data']["connected"]:
@ -66,7 +76,7 @@ class TriggerFuncAddTestCase(BaseTestGenerator):
"options": [],
"proleakproof": True,
"pronamespace": 2200,
"prorettypename": "event_trigger/trigger",
"prorettypename": prorettypename,
"prosecdef": True,
"prosrc": "BEGIN RAISE EXCEPTION 'command % is disabled',"
" tg_tag; END;",

View File

@ -31,16 +31,25 @@ class TriggerFuncDeleteTestCase(BaseTestGenerator):
self.db_name = parent_node_dict["database"][-1]["db_name"]
self.schema_name = parent_node_dict["schema"][-1]["schema_name"]
self.schema_id = parent_node_dict["schema"][-1]["schema_id"]
func_name = "test_event_delete_%s" % str(uuid.uuid4())[1:6]
db_user = self.server["username"]
self.function_info = trigger_funcs_utils.create_trigger_function(
self.server, self.db_name, self.schema_name, func_name)
def runTest(self):
""" This function will delete trigger function under database node. """
schema_info = parent_node_dict["schema"][-1]
server_id = schema_info["server_id"]
db_id = schema_info["db_id"]
func_name = "test_event_delete_%s" % str(uuid.uuid4())[1:6]
server_con = server_utils.connect_server(self, server_id)
if not server_con["info"] == "Server connected.":
raise Exception("Could not connect to server to add resource "
"groups.")
server_version = 0
if "type" in server_con["data"]:
if server_con["data"]["version"] < 90300:
server_version = server_con["data"]["version"]
self.function_info = trigger_funcs_utils.create_trigger_function(
self.server, self.db_name, self.schema_name, func_name,
server_version)
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
server_id, db_id)
if not db_con['data']["connected"]:

View File

@ -16,6 +16,7 @@ from pgadmin.browser.server_groups.servers.databases.tests import utils as \
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from . import utils as trigger_funcs_utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
class TriggerFuncGetTestCase(BaseTestGenerator):
@ -30,16 +31,26 @@ class TriggerFuncGetTestCase(BaseTestGenerator):
self.db_name = parent_node_dict["database"][-1]["db_name"]
self.schema_name = parent_node_dict["schema"][-1]["schema_name"]
self.schema_id = parent_node_dict["schema"][-1]["schema_id"]
func_name = "test_event_get_%s" % str(uuid.uuid4())[1:6]
db_user = self.server["username"]
self.function_info = trigger_funcs_utils.create_trigger_function(
self.server, self.db_name, self.schema_name, func_name)
def runTest(self):
""" This function will delete trigger function under database node. """
schema_info = parent_node_dict["schema"][-1]
server_id = schema_info["server_id"]
db_id = schema_info["db_id"]
func_name = "test_event_get_%s" % str(uuid.uuid4())[1:6]
db_user = self.server["username"]
server_con = server_utils.connect_server(self, server_id)
if not server_con["info"] == "Server connected.":
raise Exception("Could not connect to server to add resource "
"groups.")
server_version = 0
if "type" in server_con["data"]:
if server_con["data"]["version"] < 90300:
server_version = server_con["data"]["version"]
self.function_info = trigger_funcs_utils.create_trigger_function(
self.server, self.db_name, self.schema_name, func_name,
server_version)
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
server_id, db_id)
if not db_con['data']["connected"]:

View File

@ -17,6 +17,7 @@ from pgadmin.browser.server_groups.servers.databases.tests import utils as \
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from . import utils as trigger_funcs_utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
class TriggerFuncPutTestCase(BaseTestGenerator):
@ -31,16 +32,24 @@ class TriggerFuncPutTestCase(BaseTestGenerator):
self.db_name = parent_node_dict["database"][-1]["db_name"]
self.schema_name = parent_node_dict["schema"][-1]["schema_name"]
self.schema_id = parent_node_dict["schema"][-1]["schema_id"]
func_name = "test_event_put_%s" % str(uuid.uuid4())[1:6]
db_user = self.server["username"]
self.function_info = trigger_funcs_utils.create_trigger_function(
self.server, self.db_name, self.schema_name, func_name)
def runTest(self):
""" This function will update trigger function under database node. """
schema_info = parent_node_dict["schema"][-1]
server_id = schema_info["server_id"]
db_id = schema_info["db_id"]
func_name = "test_event_put_%s" % str(uuid.uuid4())[1:6]
server_con = server_utils.connect_server(self, server_id)
if not server_con["info"] == "Server connected.":
raise Exception("Could not connect to server to add resource "
"groups.")
server_version = 0
if "type" in server_con["data"]:
if server_con["data"]["version"] < 90300:
server_version = server_con["data"]["version"]
self.function_info = trigger_funcs_utils.create_trigger_function(
self.server, self.db_name, self.schema_name, func_name,
server_version)
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
server_id, db_id)
if not db_con['data']["connected"]:

View File

@ -13,7 +13,8 @@ import sys
from regression import test_utils as utils
def create_trigger_function(server, db_name, schema_name, func_name):
def create_trigger_function(server, db_name, schema_name, func_name,
server_version=0):
"""This function add the trigger function to schema"""
try:
connection = utils.get_db_connection(db_name,
@ -22,10 +23,13 @@ def create_trigger_function(server, db_name, schema_name, func_name):
server['host'],
server['port'])
pg_cursor = connection.cursor()
r_type = "event_trigger"
if server_version != 0:
r_type = "trigger"
query = "CREATE FUNCTION "+schema_name+"."+func_name+"()" \
" RETURNS event_trigger LANGUAGE 'plpgsql' STABLE LEAKPROOF" \
" RETURNS {0} LANGUAGE 'plpgsql' STABLE LEAKPROOF" \
" SECURITY DEFINER SET enable_sort=true AS $BODY$ BEGIN" \
" NULL; END; $BODY$"
" NULL; END; $BODY$".format(r_type)
pg_cursor.execute(query)
connection.commit()
# Get 'oid' from newly created function

View File

@ -42,7 +42,7 @@ class PackageAddTestCase(BaseTestGenerator):
if server_con:
if "type" in server_con["data"]:
if server_con["data"]["type"] == "pg":
message = "Packages not supported by PostgreSQL."
message = "Packages are not supported by PG."
self.skipTest(message)
def runTest(self):

View File

@ -44,7 +44,7 @@ class PackageDeleteTestCase(BaseTestGenerator):
if server_con:
if "type" in server_con["data"]:
if server_con["data"]["type"] == "pg":
message = "Packages not supported by PostgreSQL."
message = "Packages are not supported by PG."
self.skipTest(message)
self.package_id = package_utils.create_package(self.server,

View File

@ -44,7 +44,7 @@ class PackageGetTestCase(BaseTestGenerator):
if server_con:
if "type" in server_con["data"]:
if server_con["data"]["type"] == "pg":
message = "Packages not supported by PostgreSQL."
message = "Packages are not supported by PG."
self.skipTest(message)
self.package_id = package_utils.create_package(self.server,

View File

@ -46,7 +46,7 @@ class PackagePutTestCase(BaseTestGenerator):
if server_con:
if "type" in server_con["data"]:
if server_con["data"]["type"] == "pg":
message = "Packages not supported by PostgreSQL."
message = "Packages are not supported by PG."
self.skipTest(message)
self.package_id = package_utils.create_package(self.server,

View File

@ -38,7 +38,7 @@ class SynonymAddTestCase(BaseTestGenerator):
if server_con:
if "type" in server_con["data"]:
if server_con["data"]["type"] == "pg":
message = "Synonym not supported by PG."
message = "Synonym are not supported by PG."
self.skipTest(message)
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)

View File

@ -38,7 +38,7 @@ class SynonymDeleteTestCase(BaseTestGenerator):
if server_con:
if "type" in server_con["data"]:
if server_con["data"]["type"] == "pg":
message = "Synonym not supported by PG."
message = "Synonym are not supported by PG."
self.skipTest(message)
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)

View File

@ -38,7 +38,7 @@ class SynonymGetTestCase(BaseTestGenerator):
if server_con:
if "type" in server_con["data"]:
if server_con["data"]["type"] == "pg":
message = "Synonym not supported by PG."
message = "Synonym are not supported by PG."
self.skipTest(message)
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)

View File

@ -39,11 +39,15 @@ class SynonymPutTestCase(BaseTestGenerator):
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
server_con = server_utils.connect_server(self, self.server_id)
self.server_version = 0
if server_con:
if "type" in server_con["data"]:
if server_con["data"]["type"] == "pg":
message = "Synonym not supported by PG."
message = "Synonym are not supported by PG."
self.skipTest(message)
else:
if server_con["data"]["version"] >= 90200:
self.server_version = server_con["data"]["version"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
@ -76,7 +80,8 @@ class SynonymPutTestCase(BaseTestGenerator):
raise Exception("No synonym node to update.")
func_name = "test_function_synonym_%s" % str(uuid.uuid4())[1:6]
self.table_id = functions_utils.create_trigger_function(
self.server, self.db_name, self.schema_name, func_name)
self.server, self.db_name, self.schema_name, func_name,
self.server_version)
data = {
"name": self.synonym_name,

View File

@ -16,6 +16,7 @@ from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
class ViewsAddTestCase(BaseTestGenerator):
@ -67,6 +68,13 @@ class ViewsAddTestCase(BaseTestGenerator):
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
server_response = server_utils.connect_server(self, self.server_id)
if server_response["data"]["version"] < 90300 and "mview" in self.url:
message = "Materialized Views are not supported by PG9.2 " \
"and PPAS9.2 and below."
self.skipTest(message)
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:

View File

@ -16,6 +16,7 @@ from pgadmin.browser.server_groups.servers.databases.tests import utils as \
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from . import utils as views_utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
class ViewsDeleteTestCase(BaseTestGenerator):
@ -42,6 +43,13 @@ class ViewsDeleteTestCase(BaseTestGenerator):
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
server_response = server_utils.connect_server(self, self.server_id)
if server_response["data"]["version"] < 90300 and "mview" in self.url:
message = "Materialized Views are not supported by PG9.2 " \
"and PPAS9.2 and below."
self.skipTest(message)
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:

View File

@ -16,6 +16,7 @@ from pgadmin.browser.server_groups.servers.databases.tests import utils as \
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from . import utils as views_utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
class ViewsGetTestCase(BaseTestGenerator):
@ -42,6 +43,13 @@ class ViewsGetTestCase(BaseTestGenerator):
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
server_response = server_utils.connect_server(self, self.server_id)
if server_response["data"]["version"] < 90300 and "mview" in self.url:
message = "Materialized Views are not supported by PG9.2 " \
"and PPAS9.2 and below."
self.skipTest(message)
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:

View File

@ -17,6 +17,7 @@ from pgadmin.browser.server_groups.servers.databases.tests import utils as \
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from . import utils as views_utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
class ViewsUpdateTestCase(BaseTestGenerator):
@ -43,6 +44,13 @@ class ViewsUpdateTestCase(BaseTestGenerator):
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
server_response = server_utils.connect_server(self, self.server_id)
if server_response["data"]["version"] < 90300 and "mview" in self.url:
message = "Materialized Views are not supported by PG9.2 " \
"and PPAS9.2 and below."
self.skipTest(message)
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:

View File

@ -32,6 +32,11 @@ class ResourceGroupsAddTestCase(BaseTestGenerator):
if server_con["data"]["type"] == "pg":
message = "Resource groups are not supported by PG."
self.skipTest(message)
else:
if server_con["data"]["version"] < 90400:
message = "Resource groups are not supported by PPAS 9.3" \
" and below."
self.skipTest(message)
def runTest(self):
"""This function will add resource groups under server node"""

View File

@ -29,10 +29,15 @@ class ResourceGroupsDeleteTestCase(BaseTestGenerator):
"groups.")
if "type" in server_response["data"]:
if server_response["data"]["type"] == "pg":
message = "Resource groupa are not supported by PG."
message = "Resource groups are not supported by PG."
self.skipTest(message)
else:
if server_response["data"]["version"] < 90400:
message = "Resource groups are not supported by PPAS 9.3 " \
"and below."
self.skipTest(message)
self.resource_group = "test_resource_group_delete%s" % \
str(uuid.uuid4())[1:6]
str(uuid.uuid4())[1:6]
self.resource_group_id = resource_groups_utils.create_resource_groups(
self.server, self.resource_group)

View File

@ -32,6 +32,11 @@ class ResourceGroupsPutTestCase(BaseTestGenerator):
if server_response["data"]["type"] == "pg":
message = "Resource groups are not supported by PG."
self.skipTest(message)
else:
if server_response["data"]["version"] < 90400:
message = "Resource groups are not supported by PPAS 9.3" \
" and below."
self.skipTest(message)
self.resource_group_name = "test_resource_group_put%s" % \
str(uuid.uuid4())[1:6]
self.resource_group_id = resource_groups_utils.create_resource_groups(

View File

@ -31,6 +31,11 @@ class ResourceGroupsGetTestCase(BaseTestGenerator):
if server_response["data"]["type"] == "pg":
message = "Resource groups are not supported by PG."
self.skipTest(message)
else:
if server_response["data"]["version"] < 90400:
message = "Resource groups are not supported by PPAS 9.3" \
" and below."
self.skipTest(message)
self.resource_group = "test_resource_group_get%s" % \
str(uuid.uuid4())[1:6]
self.resource_group_id = resource_groups_utils.create_resource_groups(