Add schema and database child node regression tests.

This commit is contained in:
Priyanka Shendge 2016-08-23 11:50:41 +01:00 committed by Dave Page
parent 0ce8b031f8
commit a535eddfcd
51 changed files with 3998 additions and 21 deletions

View File

@ -0,0 +1,16 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from pgadmin.utils.route import BaseTestGenerator
class CastTestGenerator(BaseTestGenerator):
def runTest(self):
return []

View File

@ -0,0 +1,65 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from . import utils as cast_utils
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
class CastsAddTestCase(BaseTestGenerator):
scenarios = [
# Fetching default URL for cast node.
('Check Cast Node', dict(url='/browser/cast/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function perform the following tasks:
1. Add and connect to the test server(s)
2. Add database(s) connected to server(s)
:return: None
"""
# Add the server
server_utils.add_server(cls.tester)
# Connect to server
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add database
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
def runTest(self):
""" This function will add cast under database node. """
cast_utils.add_cast(self.tester)
@classmethod
def tearDownClass(cls):
"""
This function deletes the added cast, database, server and the
'parent_id.pkl' file which is created in setUpClass.
:return: None
"""
cast_utils.delete_cast(cls.tester)
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,102 @@
# # #################################################################
# #
# # pgAdmin 4 - PostgreSQL Tools
# #
# # Copyright (C) 2013 - 2016, The pgAdmin Development Team
# # This software is released under the PostgreSQL Licence
# #
# # ##################################################################
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from regression.test_utils import get_ids
from . import utils as cast_utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
from regression.test_setup import advanced_config_data
import json
class CastsDeleteTestCase(BaseTestGenerator):
""" This class will fetch the cast node added under database node. """
scenarios = [
# Fetching default URL for cast node.
('Check Cast Node', dict(url='/browser/cast/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function perform the following tasks:
1. Add and connect to the test server(s)
2. Add database(s) connected to server(s)
3. Add cast(s) to databases
:return: None
"""
# Add the server
server_utils.add_server(cls.tester)
# Connect to server
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add database
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
# Add cast(s) to database(s)
cast_utils.add_cast(cls.tester)
def runTest(self):
""" This function will delete added cast(s)."""
all_id = get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
cast_ids_dict = all_id["cid"][0]
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
db_con = database_utils.verify_database(self.tester,
utils.SERVER_GROUP,
server_id,
db_id)
if len(db_con) == 0:
raise Exception("No database(s) to delete for server id %s"
% server_id)
cast_id = cast_ids_dict[server_id]
cast_get_data = cast_utils.verify_cast(self.tester,
utils.SERVER_GROUP,
server_id,
db_id, cast_id)
if cast_get_data.status_code == 200:
delete_response = self.tester.delete(
self.url + str(utils.SERVER_GROUP) + '/' +
str(server_id) + '/' + str(db_id) +
'/' + str(cast_id),
follow_redirects=True)
response_data = json.loads(delete_response.data.decode('utf-8'))
self.assertTrue(response_data['success'], 1)
@classmethod
def tearDownClass(cls):
"""
This function delete the added cast, database, server and the
'parent_id.pkl' file which is created in setUpClass.
:return: None
"""
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,87 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from regression.test_utils import get_ids
from . import utils as cast_utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
class CastsGetTestCase(BaseTestGenerator):
""" This class will fetch the cast node added under database node. """
scenarios = [
# Fetching default URL for cast node.
('Check Cast Node', dict(url='/browser/cast/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function used to add the sever, database, and cast
:return: None
"""
# Add the server
server_utils.add_server(cls.tester)
# Connect to server
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add database
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
cast_utils.add_cast(cls.tester)
def runTest(self):
""" This function will get added cast."""
all_id = get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
cast_ids_dict = all_id["cid"][0]
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
db_con = database_utils.verify_database(self.tester,
utils.SERVER_GROUP,
server_id,
db_id)
if len(db_con) == 0:
raise Exception("No database(s) to delete for server id %s"
% server_id)
cast_id = cast_ids_dict[server_id]
cast_get_data = cast_utils.verify_cast(self.tester,
utils.SERVER_GROUP,
server_id,
db_id, cast_id)
self.assertEquals(cast_get_data.status_code, 200)
@classmethod
def tearDownClass(cls):
"""
This function deletes the added cast, database, server and the
'parent_id.pkl' file which is created in setup() function.
:return: None
"""
cast_utils.delete_cast(cls.tester)
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,107 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from regression.test_utils import get_ids
from . import utils as cast_utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
from regression.test_setup import advanced_config_data
import json
class CastsPutTestCase(BaseTestGenerator):
""" This class will fetch the cast node added under database node. """
scenarios = [
# Fetching default URL for cast node.
('Check Cast Node', dict(url='/browser/cast/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function perform the following tasks:
1. Add and connect to the test server(s)
2. Add database(s) connected to server(s)
3. Add cast(s) to databases
:return: None
"""
# Add the server
server_utils.add_server(cls.tester)
# Connect to server
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add database
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
cast_utils.add_cast(cls.tester)
def runTest(self):
""" This function will update added cast."""
all_id = get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
cast_ids_dict = all_id["cid"][0]
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
db_con = database_utils.verify_database(self.tester,
utils.SERVER_GROUP,
server_id,
db_id)
if len(db_con) == 0:
raise Exception("No database(s) to delete for server id %s"
% server_id)
cast_id = cast_ids_dict[server_id]
cast_get_data = cast_utils.verify_cast(self.tester,
utils.SERVER_GROUP,
server_id,
db_id, cast_id)
if cast_get_data.status_code == 200:
data = {
"description": advanced_config_data["cast_update_data"]
["comment"],
"id": cast_id
}
put_response = self.tester.put(
self.url + str(utils.SERVER_GROUP) + '/' +
str(server_id) + '/' + str(
db_id) +
'/' + str(cast_id),
data=json.dumps(data),
follow_redirects=True)
self.assertEquals(put_response.status_code, 200)
@classmethod
def tearDownClass(cls):
"""
This function deletes the added cast, database, server and the
'parent_id.pkl' file which is created in setUpClass.
:return: None
"""
cast_utils.delete_cast(cls.tester)
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,145 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import os
import pickle
import json
from regression.test_setup import advanced_config_data, pickle_path
from regression import test_utils as utils
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
CAST_URL = '/browser/cast/obj/'
def get_cast_config_data(server_connect_data):
adv_config_data = None
db_user = server_connect_data['data']['user']['name']
# Get the config data of appropriate db user
for config_test_data in advanced_config_data['casts_credentials']:
if db_user == config_test_data['owner']:
adv_config_data = config_test_data
data = {
"castcontext": adv_config_data
['cast_context'],
"encoding": adv_config_data
['encoding'],
"name": adv_config_data
['name'],
"srctyp": adv_config_data
['source_type'],
"trgtyp": adv_config_data
['target_type']
}
return data
def add_cast(tester):
"""
This function add the cast in the existing database
:param tester: test object
:type tester: flask test object
:return:None
"""
all_id = utils.get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
server_group = utils.config_data['server_group']
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
db_con = database_utils.verify_database(tester, server_group,
server_id, db_id)
if db_con['data']['connected']:
server_connect_response = server_utils.verify_server(
tester, server_group, server_id)
data = get_cast_config_data(server_connect_response)
response = tester.post(CAST_URL + str(server_group) + '/' +
str(server_id) + '/' + str(
db_id) + '/',
data=json.dumps(data),
content_type='html/json')
assert response.status_code == 200
response_data = json.loads(response.data.decode('utf-8'))
write_cast_info(response_data, server_id)
def write_cast_info(response_data, server_id):
"""
This function writes the server's details to file parent_id.pkl
:param response_data: server's data
:type response_data: list of dictionary
:param pickle_id_dict: contains ids of server,database,tables etc.
:type pickle_id_dict: dict
:return: None
"""
cast_id = response_data['node']['_id']
pickle_id_dict = utils.get_pickle_id_dict()
if os.path.isfile(pickle_path):
existing_server_id = open(pickle_path, 'rb')
tol_server_id = pickle.load(existing_server_id)
pickle_id_dict = tol_server_id
if 'cid' in pickle_id_dict:
if pickle_id_dict['cid']:
# Add the cast_id as value in dict
pickle_id_dict["cid"][0].update({server_id: cast_id})
else:
# Create new dict with server_id and cast_id
pickle_id_dict["cid"].append({server_id: cast_id})
cast_output = open(pickle_path, 'wb')
pickle.dump(pickle_id_dict, cast_output)
cast_output.close()
def verify_cast(tester, server_group, server_id, db_id, cast_id):
cast_response = tester.get(CAST_URL + str(server_group) + '/' +
str(server_id) + '/' + str(db_id) +
'/' + str(cast_id),
content_type='html/json')
return cast_response
def delete_cast(tester):
all_id = utils.get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
cast_ids_dict = all_id["cid"][0]
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
db_con = database_utils.verify_database(tester, utils.SERVER_GROUP,
server_id,
db_id)
if len(db_con) == 0:
raise Exception("No database(s) to delete for server id %s"
% server_id)
cast_id = cast_ids_dict[server_id]
cast_get_data = verify_cast(tester, utils.SERVER_GROUP,
server_id,
db_id, cast_id)
if cast_get_data.status_code == 200:
delete_response = tester.delete(
CAST_URL + str(utils.SERVER_GROUP) + '/' +
str(server_id) + '/' + str(db_id) +
'/' + str(cast_id),
follow_redirects=True)
return delete_response

View File

@ -0,0 +1,16 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from pgadmin.utils.route import BaseTestGenerator
class EventTriggerGeneratorTestCase(BaseTestGenerator):
def runTest(self):
return []

View File

@ -0,0 +1,79 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.schemas.functions.tests \
import utils as func_utils
from . import utils as event_trigger_utils
class EventTriggerAddTestCase(BaseTestGenerator):
""" This class will add new event trigger under schema node. """
scenarios = [
# Fetching default URL for event trigger node.
('Fetch Event Trigger Node URL',
dict(url='/browser/event_trigger/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function perform the following tasks:
1. Add and connect to the test server(s)
2. Add database(s) connected to server(s)
3. Add schemas to connected database(s)
4. Add trigger function(s) to schema(s)
:return: None
"""
# Add the server
server_utils.add_server(cls.tester)
# Connect to servers
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add databases to connected servers
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
schema_utils.add_schemas(cls.tester)
func_utils.add_trigger_function(cls.tester, cls.server_connect_response,
cls.server_ids)
def runTest(self):
""" This function will add event trigger under database node. """
event_trigger_utils.add_event_trigger(self.tester)
@classmethod
def tearDownClass(cls):
"""This function deletes the added schema, database, server and parent
id file
:return: None
"""
event_trigger_utils.delete_event_trigger(cls.tester)
func_utils.delete_trigger_function(cls.tester)
schema_utils.delete_schema(cls.tester)
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,88 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.schemas.functions.tests \
import utils as func_utils
from . import utils as event_trigger_utils
import json
class EventTriggerDeleteTestCase(BaseTestGenerator):
""" This class will fetch added event trigger under database node. """
scenarios = [
# Fetching default URL for event trigger node.
('Fetch Event Trigger Node URL',
dict(url='/browser/event_trigger/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function perform the following tasks:
1. Add and connect to the test server(s)
2. Add database(s) connected to server(s)
3. Add schemas to connected database(s)
4. Add trigger function(s) to schema(s)
5. Add event trigger(s) to database(s)
:return: None
"""
# Add the server
server_utils.add_server(cls.tester)
# Connect to servers
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add databases to connected servers
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
schema_utils.add_schemas(cls.tester)
func_utils.add_trigger_function(cls.tester, cls.server_connect_response,
cls.server_ids)
event_trigger_utils.add_event_trigger(cls.tester)
def runTest(self):
""" This function will delete event trigger under database node. """
del_response = event_trigger_utils.delete_event_trigger(self.tester)
del_respdata = json.loads(del_response.data.decode("utf-8"))
self.assertTrue(del_respdata['success'], 1)
@classmethod
def tearDownClass(cls):
"""
This function delete the added schema, database, server and parent
id file
:return: None
"""
func_utils.delete_trigger_function(cls.tester)
schema_utils.delete_schema(cls.tester)
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,97 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import\
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.schemas.functions.tests \
import utils as func_utils
from . import utils as event_trigger_utils
class EventTriggerGetTestCase(BaseTestGenerator):
""" This class will fetch added event trigger under schema node. """
scenarios = [
# Fetching default URL for event trigger node.
('Fetch Event Trigger Node URL',
dict(url='/browser/event_trigger/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function perform the following tasks:
1. Add and connect to the test server(s)
2. Add database(s) connected to server(s)
3. Add schemas to connected database(s)
4. Add trigger function(s) to schema(s)
5. Add event trigger(s) to database(s)
:return: None
"""
# Add the server
server_utils.add_server(cls.tester)
# Connect to servers
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add databases to connected servers
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
schema_utils.add_schemas(cls.tester)
func_utils.add_trigger_function(cls.tester, cls.server_connect_response,
cls.server_ids)
event_trigger_utils.add_event_trigger(cls.tester)
def runTest(self):
""" This function will fetch event trigger under database node. """
all_id = utils.get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
event_trigger_ids_dict = all_id["etid"][0]
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
event_trigger_id = event_trigger_ids_dict[server_id]
response = event_trigger_utils.verify_event_trigger(
self.tester, utils.SERVER_GROUP, server_id, db_id,
event_trigger_id)
self.assertEquals(response.status_code, 200)
@classmethod
def tearDownClass(cls):
"""This function deletes the added schema, database, server and parent
id file
:return: None
"""
event_trigger_utils.delete_event_trigger(cls.tester)
func_utils.delete_trigger_function(cls.tester)
schema_utils.delete_schema(cls.tester)
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,114 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.schemas.functions.tests \
import utils as func_utils
from . import utils as event_trigger_utils
import json
from regression.test_setup import advanced_config_data
class EventTriggerPutTestCase(BaseTestGenerator):
""" This class will fetch added event trigger under database node. """
scenarios = [
# Fetching default URL for event trigger node.
('Fetch Event Trigger Node URL',
dict(url='/browser/event_trigger/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function perform the following tasks:
1. Add and connect to the test server(s)
2. Add database(s) connected to server(s)
3. Add schemas to connected database(s)
4. Add trigger function(s) to schema(s)
5. Add event trigger(s) to database(s)
:return: None
"""
# Add the server
server_utils.add_server(cls.tester)
# Connect to servers
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add databases to connected servers
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
schema_utils.add_schemas(cls.tester)
func_utils.add_trigger_function(cls.tester, cls.server_connect_response,
cls.server_ids)
event_trigger_utils.add_event_trigger(cls.tester)
def runTest(self):
""" This function will update event trigger under database node. """
all_id = utils.get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
event_trigger_ids_dict = all_id["etid"][0]
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
event_trigger_id = event_trigger_ids_dict[server_id]
response = event_trigger_utils.verify_event_trigger(
self.tester, utils.SERVER_GROUP, server_id, db_id,
event_trigger_id)
if response.status_code == 200:
data = \
{
"comment":
advanced_config_data['event_trigger_update_data']
['comment'],
"id": event_trigger_id
}
put_response = self.tester.put(
self.url + str(utils.SERVER_GROUP) + '/' +
str(server_id) + '/' + str(db_id) +
'/' + str(event_trigger_id),
data=json.dumps(data),
follow_redirects=True)
self.assertEquals(put_response.status_code, 200)
@classmethod
def tearDownClass(cls):
"""This function deletes the added schema, database, server and parent
id file
:return: None
"""
event_trigger_utils.delete_event_trigger(cls.tester)
func_utils.delete_trigger_function(cls.tester)
schema_utils.delete_schema(cls.tester)
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,148 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import os
import pickle
import json
import uuid
from regression.test_setup import advanced_config_data, pickle_path
from regression import test_utils as utils
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.schemas.functions.tests \
import utils as func_utils
EVENT_TRIGGER_URL = '/browser/event_trigger/obj/'
def get_event_trigger_config_data(schema_name, server_connect_data,
trigger_func_name):
adv_config_data = None
db_user = server_connect_data['data']['user']['name']
# Get the config data of appropriate db user
for config_test_data in advanced_config_data['event_trigger_credentials']:
if db_user == config_test_data['owner']:
adv_config_data = config_test_data
data = {
"enabled": adv_config_data['enable'],
"eventfunname": "{0}.{1}".format(schema_name, trigger_func_name),
"eventname": adv_config_data['event_name'],
"eventowner": adv_config_data['owner'],
"name": "event_trigger_{}".format(str(uuid.uuid4())[1:4]),
"providers": adv_config_data['provider']
}
return data
def add_event_trigger(tester):
all_id = utils.get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
schema_info_dict = all_id["scid"][0]
trigger_func_info_dict = all_id["tfnid"][0]
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
db_con = database_utils.verify_database(tester, utils.SERVER_GROUP,
server_id, db_id)
if db_con['data']["connected"]:
server_connect_response = server_utils.verify_server(
tester, utils.SERVER_GROUP, server_id)
schema_info = schema_info_dict[int(server_id)]
trigger_func_list = trigger_func_info_dict[int(server_id)]
trigger_func_info = \
filter(lambda x: x[2] == "event_trigger", trigger_func_list)[0]
trigger_func_name = trigger_func_info[1].replace("()", "")
trigger_func_id = trigger_func_info[0]
trigger_func_response = \
func_utils.verify_trigger_function(tester, server_id,
db_id, schema_info[0],
trigger_func_id)
if trigger_func_response.status_code == 200:
data = get_event_trigger_config_data(
schema_info[1],
server_connect_response, trigger_func_name)
response = tester.post(
EVENT_TRIGGER_URL + str(utils.SERVER_GROUP) + '/' +
str(server_id) + '/' + str(db_id) +
'/', data=json.dumps(data),
content_type='html/json')
assert response.status_code == 200
response_data = json.loads(response.data.decode('utf-8'))
write_event_trigger_info(response_data, server_id)
def write_event_trigger_info(response_data, server_id):
"""
This function writes the schema id into parent_id.pkl
:param response_data: extension add response data
:type response_data: dict
:param server_id: server id
:type server_id: str
:return: None
"""
event_trigger_id = response_data['node']['_id']
pickle_id_dict = utils.get_pickle_id_dict()
if os.path.isfile(pickle_path):
existing_server_id = open(pickle_path, 'rb')
tol_server_id = pickle.load(existing_server_id)
pickle_id_dict = tol_server_id
if 'etid' in pickle_id_dict:
if pickle_id_dict['etid']:
# Add the event_trigger_id as value in dict
pickle_id_dict["etid"][0].update({server_id: event_trigger_id})
else:
# Create new dict with server_id and event_trigger_id
pickle_id_dict["etid"].append({server_id: event_trigger_id})
event_trigger_output = open(pickle_path, 'wb')
pickle.dump(pickle_id_dict, event_trigger_output)
event_trigger_output.close()
def verify_event_trigger(tester, server_group, server_id, db_id,
event_trigger_id):
response = tester.get(EVENT_TRIGGER_URL + str(server_group) + '/' +
str(server_id) + '/' + str(db_id) +
'/' + str(event_trigger_id),
content_type='html/json')
return response
def delete_event_trigger(tester):
all_id = utils.get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
event_trigger_ids_dict = all_id["etid"][0]
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
event_trigger_id = event_trigger_ids_dict[server_id]
response = verify_event_trigger(tester,
utils.SERVER_GROUP,
server_id,
db_id,
event_trigger_id)
if response.status_code == 200:
del_response = tester.delete(
EVENT_TRIGGER_URL + str(utils.SERVER_GROUP) + '/' +
str(server_id) + '/' +
str(db_id) + '/' +
str(event_trigger_id),
follow_redirects=True)
return del_response

View File

@ -0,0 +1,16 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from pgadmin.utils.route import BaseTestGenerator
class ExtensionGeneratorTestCase(BaseTestGenerator):
def runTest(self):
return []

View File

@ -0,0 +1,72 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from . import utils as extension_utils
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import\
utils as schema_utils
class ExtensionsAddTestCase(BaseTestGenerator):
scenarios = [
# Fetching default URL for extension node.
('Check Extension Node', dict(url='/browser/extension/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function perform the following tasks:
1. Add and connect to the test server(s)
2. Add database(s) connected to server(s)
3. Add schemas to connected database(s)
:return: None
"""
# Add the server
server_utils.add_server(cls.tester)
# Connect to servers
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add databases to connected servers
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
schema_utils.add_schemas(cls.tester)
def runTest(self):
""" This function will add extension under 1st server of tree node. """
extension_utils.add_extensions(self.tester)
@classmethod
def tearDownClass(cls):
"""This function deletes the added schema, database, server and parent
id file
:return: None
"""
extension_utils.delete_extension(cls.tester)
schema_utils.delete_schema(cls.tester)
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,74 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import json
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from . import utils as extension_utils
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
class ExtensionsDeleteTestCase(BaseTestGenerator):
scenarios = [
# Fetching default URL for extension node.
('Check Extension Node', dict(url='/browser/extension/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function perform the following tasks:
1. Add and connect to the test server(s)
2. Add database(s) connected to server(s)
3. Add schemas to connected database(s)
4. Add extension(s) to schema(s)
:return: None
"""
# Add the server
server_utils.add_server(cls.tester)
# Connect to servers
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add databases to connected servers
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
# Add schema(s) to database(s)
schema_utils.add_schemas(cls.tester)
# Add extension(s) to schema(s)
extension_utils.add_extensions(cls.tester)
def runTest(self):
""" This function will add extension under 1st server of tree node. """
delete_respdata = extension_utils.delete_extension(self.tester)
self.assertTrue(delete_respdata['success'], 1)
@classmethod
def tearDownClass(cls):
"""This function deletes the added schema, database, server and parent
id file
"""
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,87 @@
#################################################################
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
##################################################################
import json
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from . import utils as extension_utils
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
class ExtensionsGetTestCase(BaseTestGenerator):
scenarios = [
# Fetching default URL for extension node.
('Check Extension Node', dict(url='/browser/extension/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function perform the following tasks:
1. Add and connect to the test server(s)
2. Add database(s) connected to server(s)
3. Add schemas to connected database(s)
4. Add extension(s) to schema(s)
:return: None
"""
# Add the server
server_utils.add_server(cls.tester)
# Connect to servers
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add databases to connected servers
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
schema_utils.add_schemas(cls.tester)
extension_utils.add_extensions(cls.tester)
def runTest(self):
""" This function will add extension under 1st server of tree node. """
all_id = utils.get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
extension_ids_dict = all_id["eid"][0]
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
extension_id = extension_ids_dict[server_id]
response = extension_utils.verify_extensions(self.tester,
utils.SERVER_GROUP,
server_id, db_id,
extension_id)
self.assertEquals(response.status_code, 200)
@classmethod
def tearDownClass(cls):
"""This function deletes the added schema, database, server and parent
id file
"""
extension_utils.delete_extension(cls.tester)
schema_utils.delete_schema(cls.tester)
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,104 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import json
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from . import utils as extension_utils
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
class ExtensionsPutTestCase(BaseTestGenerator):
scenarios = [
# Fetching default URL for extension node.
('Check Extension Node', dict(url='/browser/extension/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function perform the following tasks:
1. Add and connect to the test server(s)
2. Add database(s) connected to server(s)
3. Add schemas to connected database(s)
4. Add extension(s) to schema(s)
:return: None
"""
# Add the server
server_utils.add_server(cls.tester)
# Connect to servers
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add databases to connected servers
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
schema_utils.add_schemas(cls.tester)
extension_utils.add_extensions(cls.tester)
def runTest(self):
""" This function will add extension under 1st server of tree node. """
all_id = utils.get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
extension_ids_dict = all_id["eid"][0]
schema_info_dict = all_id["scid"][0]
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
extension_id = extension_ids_dict[server_id]
response = extension_utils.verify_extensions(self.tester,
utils.SERVER_GROUP,
server_id, db_id,
extension_id)
if response.status_code == 200:
schema_name = schema_info_dict[int(server_id)][1]
data = \
{
"id": extension_id,
"schema": schema_name
}
put_response = self.tester.put(
self.url + str(utils.SERVER_GROUP) + '/' +
str(server_id) + '/' + str(db_id) +
'/' + str(extension_id),
data=json.dumps(data),
follow_redirects=True)
self.assertEquals(put_response.status_code, 200)
@classmethod
def tearDownClass(cls):
"""This function deletes the added schema, database, server and parent
id file
"""
extension_utils.delete_extension(cls.tester)
schema_utils.delete_schema(cls.tester)
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,134 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import os
import pickle
import json
from regression.test_setup import advanced_config_data, pickle_path
from regression import test_utils as utils
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
EXTENSION_URL = '/browser/extension/obj/'
def get_extension_config_data(schema_name, server_connect_data):
adv_config_data = None
db_user = server_connect_data['data']['user']['name']
# Get the config data of appropriate db user
for config_test_data in advanced_config_data['extension_credentials']:
if db_user == config_test_data['owner']:
adv_config_data = config_test_data
data = {
"name": adv_config_data['name'],
"relocatable": adv_config_data['relocate'],
"schema": schema_name,
"version": adv_config_data['version']
}
return data
def write_extension_info(response_data, server_id):
"""
This function writes the schema id into parent_id.pkl
:param response_data: extension add response data
:type response_data: dict
:param server_id: server id
:type server_id: str
:return: None
"""
extension_id = response_data['node']['_id']
# schema_name = str(response_data['node']['label'])
pickle_id_dict = utils.get_pickle_id_dict()
if os.path.isfile(pickle_path):
existing_server_id = open(pickle_path, 'rb')
tol_server_id = pickle.load(existing_server_id)
pickle_id_dict = tol_server_id
if 'eid' in pickle_id_dict:
if pickle_id_dict['eid']:
# Add the extension_id as value in dict
pickle_id_dict["eid"][0].update({server_id: extension_id})
else:
# Create new dict with server_id and extension_id
pickle_id_dict["eid"].append({server_id: extension_id})
extension_output = open(pickle_path, 'wb')
pickle.dump(pickle_id_dict, extension_output)
extension_output.close()
def add_extensions(tester):
all_id = utils.get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
schema_info_dict = all_id["scid"][0]
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
db_con = database_utils.verify_database(tester, utils.SERVER_GROUP,
server_id, db_id)
if db_con['data']["connected"]:
server_connect_response = server_utils.verify_server(
tester, utils.SERVER_GROUP, server_id)
schema_name = schema_info_dict[int(server_id)][1]
data = get_extension_config_data(schema_name,
server_connect_response)
response = tester.post(
EXTENSION_URL + str(utils.SERVER_GROUP) + '/' +
str(server_id) + '/' + str(
db_id) + '/',
data=json.dumps(data),
content_type='html/json')
assert response.status_code == 200
response_data = json.loads(response.data.decode('utf-8'))
write_extension_info(response_data, server_id)
def verify_extensions(tester, server_group, server_id, db_id, extension_id):
response = tester.get(EXTENSION_URL + str(server_group) + '/' +
str(server_id) + '/' + str(db_id) +
'/' + str(extension_id),
content_type='html/json')
return response
def delete_extension(tester):
all_id = utils.get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
extension_ids_dict = all_id["eid"][0]
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
extension_id = extension_ids_dict[server_id]
response = verify_extensions(tester,
utils.SERVER_GROUP,
server_id, db_id,
extension_id)
if response.status_code == 200:
delete_response = tester.delete(
EXTENSION_URL + str(utils.SERVER_GROUP) + '/' +
str(server_id) + '/' +
str(db_id) + '/' +
str(extension_id),
follow_redirects=True)
delete_respdata = json.loads(delete_response.data.decode())
return delete_respdata

View File

@ -0,0 +1,16 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from pgadmin.utils.route import BaseTestGenerator
class ForeignServersGeneratorTestCase(BaseTestGenerator):
def runTest(self):
return []

View File

@ -0,0 +1,88 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from pgadmin.browser.server_groups.servers.databases.extensions.tests import\
utils as extension_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.foreign_data_wrappers.tests \
import utils as fdw_utils
from . import utils as fsrv_utils
class ForeignServerAddTestCase(BaseTestGenerator):
"""
This class will add foreign server under database node.
"""
scenarios = [
# Fetching default URL for foreign server node.
('Check FSRV Node', dict(url='/browser/foreign_server/obj/'))
]
@classmethod
def setUpClass(cls):
""""
This function perform the following tasks:
1. Add and connect to the test server(s)
2. Add database(s) connected to server(s)
3. Add schemas to connected database(s)
4. Add extension(s) to schema(s)
5. Add foreign data wrapper(s) to extension(s)
:return: None"
"""
# Add the server(s)
server_utils.add_server(cls.tester)
# Connect to server(s)
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add database(s) to connected server(s)
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
# Add schema(s) under connected database(s)
schema_utils.add_schemas(cls.tester)
# Add extension(s) to schema(s)
extension_utils.add_extensions(cls.tester)
# Add foreign data wrapper(s) to extension(s)
fdw_utils.add_fdw(cls.tester)
def runTest(self):
""" This function will add foreign server under database node. """
fsrv_utils.add_fsrv(self.tester)
@classmethod
def tearDownClass(cls):
"""This function deletes the added schema, database, server and parent
id file
:return: None
"""
fsrv_utils.delete_fsrv(cls.tester)
fdw_utils.delete_fdw(cls.tester)
extension_utils.delete_extension(cls.tester)
schema_utils.delete_schema(cls.tester)
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,94 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from pgadmin.browser.server_groups.servers.databases.extensions.tests import\
utils as extension_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.foreign_data_wrappers.tests\
import utils as fdw_utils
from . import utils as fsrv_utils
class ForeignServerDeleteTestCase(BaseTestGenerator):
"""
This class will add foreign server under FDW node.
"""
scenarios = [
# Fetching default URL for foreign server node.
('Check FSRV Node', dict(url='/browser/foreign_server/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function perform the following tasks:
1. Add and connect to the test server(s)
2. Add database(s) connected to server(s)
3. Add schemas to connected database(s)
4. Add extension(s) to schema(s)
5. Add foreign data wrapper(s) to extension(s)
6. Add foreign server(s) to foreign data wrapper(s)
:return: None
"""
# Add the server
server_utils.add_server(cls.tester)
# Connect to servers
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add databases to connected servers
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
# Add schema(s) under connected database(s)
schema_utils.add_schemas(cls.tester)
# Add extension(s) to schema(s)
extension_utils.add_extensions(cls.tester)
# Add foreign data wrapper(s) to extension(s)
fdw_utils.add_fdw(cls.tester)
# Add foreign server(s) to foreign data wrapper
fsrv_utils.add_fsrv(cls.tester)
#
def runTest(self):
""" This function will delete foreign server under FDW node. """
delete_respdata = fsrv_utils.delete_fsrv(self.tester)
self.assertTrue(delete_respdata['success'], 1)
@classmethod
def tearDownClass(cls):
"""This function deletes the added schema, database, server and parent
id file
:return: None
"""
fdw_utils.delete_fdw(cls.tester)
extension_utils.delete_extension(cls.tester)
schema_utils.delete_schema(cls.tester)
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,110 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from pgadmin.browser.server_groups.servers.databases.extensions.tests import\
utils as extension_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.foreign_data_wrappers.tests\
import utils as fdw_utils
from . import utils as fsrv_utils
class ForeignServerGetTestCase(BaseTestGenerator):
"""
This class will add foreign server under FDW node.
"""
scenarios = [
# Fetching default URL for foreign server node.
('Check FSRV Node', dict(url='/browser/foreign_server/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function perform the following tasks:
1. Add and connect to the test server(s)
2. Add database(s) connected to server(s)
3. Add schemas to connected database(s)
4. Add extension(s) to schema(s)
5. Add foreign data wrapper(s) to extension(s)
6. Add foreign server(s) to foreign data wrapper(s)
:return: None
"""
# Add the server
server_utils.add_server(cls.tester)
# Connect to servers
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add databases to connected servers
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
# Add schema(s) under connected database(s)
schema_utils.add_schemas(cls.tester)
# Add extension(s) to schema(s)
extension_utils.add_extensions(cls.tester)
# Add foreign data wrapper(s) to extension(s)
fdw_utils.add_fdw(cls.tester)
# Add foreign server(s) to foreign data wrapper
fsrv_utils.add_fsrv(cls.tester)
#
def runTest(self):
""" This function will fetch foreign server under FDW node. """
all_id = utils.get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
fdw_ids_dict = all_id["fid"][0]
fsrv_ids_dict = all_id["fsid"][0]
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
fdw_id = fdw_ids_dict[server_id]
fsrv_id = fsrv_ids_dict[server_id]
response = fsrv_utils.verify_fsrv(self.tester, utils.SERVER_GROUP,
server_id, db_id,
fdw_id, fsrv_id)
self.assertEquals(response.status_code, 200)
@classmethod
def tearDownClass(cls):
"""This function deletes the added schema, database, server and parent
id file
:return: None
"""
fsrv_utils.delete_fsrv(cls.tester)
fdw_utils.delete_fdw(cls.tester)
extension_utils.delete_extension(cls.tester)
schema_utils.delete_schema(cls.tester)
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,128 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from pgadmin.browser.server_groups.servers.databases.extensions.tests import\
utils as extension_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.foreign_data_wrappers.tests\
import utils as fdw_utils
from . import utils as fsrv_utils
from regression.test_setup import advanced_config_data
import json
class ForeignServerPutTestCase(BaseTestGenerator):
"""
This class will add foreign server under FDW node.
"""
scenarios = [
# Fetching default URL for foreign server node.
('Check FSRV Node', dict(url='/browser/foreign_server/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function perform the following tasks:
1. Add and connect to the test server(s)
2. Add database(s) connected to server(s)
3. Add schemas to connected database(s)
4. Add extension(s) to schema(s)
5. Add foreign data wrapper(s) to extension(s)
6. Add foreign server(s) to foreign data wrapper(s)
:return: None
"""
# Add the server
server_utils.add_server(cls.tester)
# Connect to servers
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add databases to connected servers
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
# Add schema(s) under connected database(s)
schema_utils.add_schemas(cls.tester)
# Add extension(s) to schema(s)
extension_utils.add_extensions(cls.tester)
# Add foreign data wrapper(s) to extension(s)
fdw_utils.add_fdw(cls.tester)
# Add foreign server(s) to foreign data wrapper
fsrv_utils.add_fsrv(cls.tester)
#
def runTest(self):
""" This function will update foreign server under FDW node. """
all_id = utils.get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
fdw_ids_dict = all_id["fid"][0]
fsrv_ids_dict = all_id["fsid"][0]
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
fdw_id = fdw_ids_dict[server_id]
fsrv_id = fsrv_ids_dict[server_id]
response = fsrv_utils.verify_fsrv(self.tester, utils.SERVER_GROUP,
server_id, db_id,
fdw_id, fsrv_id)
if response.status_code == 200:
data = \
{
"description": advanced_config_data['FSRV_update_data']
['comment'],
"id": fsrv_id
}
put_response = self.tester.put(
self.url + str(utils.SERVER_GROUP) + '/' +
str(server_id) + '/' + str(db_id) +
'/' + str(fdw_id) + '/' +
str(fsrv_id), data=json.dumps(data),
follow_redirects=True)
self.assertEquals(put_response.status_code, 200)
@classmethod
def tearDownClass(cls):
"""
This function deletes the added foreign server(s) ,
foreign data wrapper(s), extension(s), schema(s), database(s), server(s)
and parent id file
:return: None
"""
fsrv_utils.delete_fsrv(cls.tester)
fdw_utils.delete_fdw(cls.tester)
extension_utils.delete_extension(cls.tester)
schema_utils.delete_schema(cls.tester)
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,149 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import os
import pickle
import json
from regression.test_setup import advanced_config_data, pickle_path
from regression import test_utils as utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
import uuid
from pgadmin.browser.server_groups.servers.databases.foreign_data_wrappers.tests \
import utils as fdw_utils
FSRV_URL = '/browser/foreign_server/obj/'
def get_fsrv_config_data(server_connect_data):
adv_config_data = None
db_user = server_connect_data['data']['user']['name']
# Get the config data of appropriate db user
for config_test_data in advanced_config_data['fsrv_credentials']:
if db_user == config_test_data['owner']:
adv_config_data = config_test_data
data = {
"fsrvacl": adv_config_data['fsrv_acl'],
"fsrvoptions": adv_config_data['fsrv_options'],
"fsrvowner": adv_config_data['owner'],
"name": "fsrv_{}".format(str(uuid.uuid4())[1:4])
}
return data
def add_fsrv(tester):
all_id = utils.get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
fdw_ids_dict = all_id["fid"][0]
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
db_con = database_utils.verify_database(tester,
utils.SERVER_GROUP,
server_id, db_id)
if db_con['data']['connected']:
server_connect_response = server_utils.verify_server(
tester, utils.SERVER_GROUP, server_id)
fdw_id = fdw_ids_dict[server_id]
response = fdw_utils.verify_fdws(tester,
utils.SERVER_GROUP,
server_id, db_id,
fdw_id)
if response.status_code == 200:
data = get_fsrv_config_data(server_connect_response)
response = tester.post(
FSRV_URL + str(utils.SERVER_GROUP) + '/' +
str(server_id) + '/' + str(
db_id) +
'/' + str(fdw_id) + '/',
data=json.dumps(data),
content_type='html/json')
assert response.status_code == 200
response_data = json.loads(response.data.decode())
write_fsrv_info(response_data, server_id)
def write_fsrv_info(response_data, server_id):
"""
This function writes the schema id into parent_id.pkl
:param response_data: foreign server add response data
:type response_data: dict
:param server_id: server id
:type server_id: str
:return: None
"""
fsrv_id = response_data['node']['_id']
pickle_id_dict = utils.get_pickle_id_dict()
if os.path.isfile(pickle_path):
existing_server_id = open(pickle_path, 'rb')
tol_server_id = pickle.load(existing_server_id)
pickle_id_dict = tol_server_id
if 'fsid' in pickle_id_dict:
if pickle_id_dict['fsid']:
# Add the FSRV_id as value in dict
pickle_id_dict["fsid"][0].update({server_id: fsrv_id})
else:
# Create new dict with server_id and fsrv_id
pickle_id_dict["fsid"].append({server_id: fsrv_id})
fsrv_output = open(pickle_path, 'wb')
pickle.dump(pickle_id_dict, fsrv_output)
fsrv_output.close()
def verify_fsrv(tester, server_group, server_id, db_id, fdw_id, fsrv_id):
response = tester.get(FSRV_URL + str(server_group) + '/' +
str(server_id) + '/' + str(db_id) +
'/' + str(fdw_id) + '/' + str(fsrv_id),
content_type='html/json')
return response
def delete_fsrv(tester):
all_id = utils.get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
fdw_ids_dict = all_id["fid"][0]
fsrv_ids_dict = all_id["fsid"][0]
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
fdw_id = fdw_ids_dict[server_id]
fsrv_id = fsrv_ids_dict[server_id]
response = verify_fsrv(tester, utils.SERVER_GROUP,
server_id, db_id,
fdw_id, fsrv_id)
if response.status_code == 200:
delete_response = tester.delete(
FSRV_URL + str(utils.SERVER_GROUP) +
'/' + str(server_id) + '/' +
str(db_id) + '/' +
str(fdw_id) + '/' +
str(fsrv_id),
follow_redirects=True)
delete_respdata = json.loads(delete_response.data.decode())
return delete_respdata

View File

@ -0,0 +1,16 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from pgadmin.utils.route import BaseTestGenerator
class UserMappingGeneratorTestCase(BaseTestGenerator):
def runTest(self):
return []

View File

@ -0,0 +1,98 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from pgadmin.browser.server_groups.servers.databases.extensions.tests import\
utils as extension_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.foreign_data_wrappers.tests\
import utils as fdw_utils
from pgadmin.browser.server_groups.servers.databases.foreign_data_wrappers.\
foreign_servers.tests import utils as fsrv_utils
from . import utils as um_utils
class UserMappingAddTestCase(BaseTestGenerator):
"""
This class will add user mapping under foreign server node.
"""
scenarios = [
# Fetching default URL for user mapping node.
('Check user mapping Node', dict(url='/browser/user_mapping/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function perform the following tasks:
1. Add and connect to the test server(s)
2. Add database(s) connected to server(s)
3. Add schemas to connected database(s)
4. Add extension(s) to schema(s)
5. Add foreign data wrapper(s) to extension(s)
6. Add foreign server(s) to foreign data wrapper(s)
:return: None
"""
# Add the server
server_utils.add_server(cls.tester)
# Connect to servers
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add databases to connected servers
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
# Add schema(s) under connected database(s)
schema_utils.add_schemas(cls.tester)
# Add extension(s) to schema(s)
extension_utils.add_extensions(cls.tester)
# Add foreign data wrapper(s) to extension(s)
fdw_utils.add_fdw(cls.tester)
# Add foreign server(s) to foreign data wrapper
fsrv_utils.add_fsrv(cls.tester)
def runTest(self):
""" This function will add user mapping under foreign server node. """
um_utils.add_um(self.tester)
@classmethod
def tearDownClass(cls):
"""
This function deletes the added user mapping(s), foreign server(s),
foreign data wrapper(s), extension(s), schema(s), database(s),
server(s) and parent id file
:return: None
"""
um_utils.delete_um(cls.tester)
fsrv_utils.delete_fsrv(cls.tester)
fdw_utils.delete_fdw(cls.tester)
extension_utils.delete_extension(cls.tester)
schema_utils.delete_schema(cls.tester)
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,102 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from pgadmin.browser.server_groups.servers.databases.extensions.tests import\
utils as extension_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.foreign_data_wrappers.tests \
import utils as fdw_utils
from pgadmin.browser.server_groups.servers.databases.foreign_data_wrappers.\
foreign_servers.tests import utils as fsrv_utils
from . import utils as um_utils
class UserMappingDeleteTestCase(BaseTestGenerator):
"""
This class will delete user mapping under foreign server node.
"""
scenarios = [
# Fetching default URL for user mapping node.
('Check user mapping Node', dict(url='/browser/user_mapping/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function perform the following tasks:
1. Add and connect to the test server(s)
2. Add database(s) connected to server(s)
3. Add schemas to connected database(s)
4. Add extension(s) to schema(s)
5. Add foreign data wrapper(s) to extension(s)
6. Add foreign server(s) to foreign data wrapper(s)
7. Add user mapping(s) to foreign server(s)
:return: None
"""
# Add the server
server_utils.add_server(cls.tester)
# Connect to servers
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add databases to connected servers
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
# Add schema(s) under connected database(s)
schema_utils.add_schemas(cls.tester)
# Add extension(s) to schema(s)
extension_utils.add_extensions(cls.tester)
# Add foreign data wrapper(s) to extension(s)
fdw_utils.add_fdw(cls.tester)
# Add foreign server(s) to foreign data wrapper
fsrv_utils.add_fsrv(cls.tester)
# Add user mapping(s) to foreign server(s)
um_utils.add_um(cls.tester)
def runTest(self):
""" This function delete user mapping under foreign server node. """
delete_respdata = um_utils.delete_um(self.tester)
self.assertTrue(delete_respdata['success'], 1)
@classmethod
def tearDownClass(cls):
"""
This function deletes the added foreign server(s) ,
foreign data wrapper(s), extension(s), schema(s), database(s),
server(s) and parent id file
:return: None
"""
fsrv_utils.delete_fsrv(cls.tester)
fdw_utils.delete_fdw(cls.tester)
extension_utils.delete_extension(cls.tester)
schema_utils.delete_schema(cls.tester)
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,120 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from pgadmin.browser.server_groups.servers.databases.extensions.tests import\
utils as extension_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.foreign_data_wrappers.tests\
import utils as fdw_utils
from pgadmin.browser.server_groups.servers.databases.foreign_data_wrappers.\
foreign_servers.tests import utils as fsrv_utils
from . import utils as um_utils
class UserMappingGetTestCase(BaseTestGenerator):
"""
This class will add user mapping under foreign server node.
"""
scenarios = [
# Fetching default URL for user mapping node.
('Check user mapping Node', dict(url='/browser/user_mapping/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function perform the following tasks:
1. Add and connect to the test server(s)
2. Add database(s) connected to server(s)
3. Add schemas to connected database(s)
4. Add extension(s) to schema(s)
5. Add foreign data wrapper(s) to extension(s)
6. Add foreign server(s) to foreign data wrapper(s)
7. Add user mapping(s) to foreign server(s)
:return: None
"""
# Add the server
server_utils.add_server(cls.tester)
# Connect to servers
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add databases to connected servers
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
# Add schema(s) under connected database(s)
schema_utils.add_schemas(cls.tester)
# Add extension(s) to schema(s)
extension_utils.add_extensions(cls.tester)
# Add foreign data wrapper(s) to extension(s)
fdw_utils.add_fdw(cls.tester)
# Add foreign server(s) to foreign data wrapper
fsrv_utils.add_fsrv(cls.tester)
# Add user mapping(s) to foreign server(s)
um_utils.add_um(cls.tester)
def runTest(self):
""" This function will fetch user mapping added to foreign server
node. """
all_id = utils.get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
fdw_ids_dict = all_id["fid"][0]
fsrv_ids_dict = all_id["fsid"][0]
um_ids_dict = all_id["umid"][0]
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
fdw_id = fdw_ids_dict[server_id]
fsrv_id = fsrv_ids_dict[server_id]
um_id = um_ids_dict[server_id]
response = um_utils.verify_um(self.tester, utils.SERVER_GROUP,
server_id, db_id,
fdw_id, fsrv_id, um_id)
self.assertEquals(response.status_code, 200)
@classmethod
def tearDownClass(cls):
"""
This function delete the added foreign server(s) ,
foreign data wrapper(s), extension(s), schema(s), database(s),
server(s) and parent id file
:return: None
"""
um_utils.delete_um(cls.tester)
fsrv_utils.delete_fsrv(cls.tester)
fdw_utils.delete_fdw(cls.tester)
extension_utils.delete_extension(cls.tester)
schema_utils.delete_schema(cls.tester)
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,138 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from pgadmin.browser.server_groups.servers.databases.extensions.tests import\
utils as extension_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.foreign_data_wrappers.tests \
import utils as fdw_utils
from pgadmin.browser.server_groups.servers.databases.foreign_data_wrappers.\
foreign_servers.tests import utils as fsrv_utils
from . import utils as um_utils
from regression.test_setup import advanced_config_data
import json
class UserMappingPutTestCase(BaseTestGenerator):
"""
This class will update user mapping under foreign server node.
"""
scenarios = [
# Fetching default URL for user mapping node.
('Check user mapping Node', dict(url='/browser/user_mapping/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function perform the following tasks:
1. Add and connect to the test server(s)
2. Add database(s) connected to server(s)
3. Add schemas to connected database(s)
4. Add extension(s) to schema(s)
5. Add foreign data wrapper(s) to extension(s)
6. Add foreign server(s) to foreign data wrapper(s)
7. Add user mapping(s) to foreign server(s)
:return: None
"""
# Add the server
server_utils.add_server(cls.tester)
# Connect to servers
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add databases to connected servers
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
# Add schema(s) under connected database(s)
schema_utils.add_schemas(cls.tester)
# Add extension(s) to schema(s)
extension_utils.add_extensions(cls.tester)
# Add foreign data wrapper(s) to extension(s)
fdw_utils.add_fdw(cls.tester)
# Add foreign server(s) to foreign data wrapper
fsrv_utils.add_fsrv(cls.tester)
# Add user mapping(s) to foreign server(s)
um_utils.add_um(cls.tester)
def runTest(self):
""" This function update user mapping under foreign server node. """
all_id = utils.get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
fdw_ids_dict = all_id["fid"][0]
fsrv_ids_dict = all_id["fsid"][0]
um_ids_dict = all_id["umid"][0]
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
fdw_id = fdw_ids_dict[server_id]
fsrv_id = fsrv_ids_dict[server_id]
um_id = um_ids_dict[server_id]
response = um_utils.verify_um(self.tester, utils.SERVER_GROUP,
server_id, db_id,
fdw_id, fsrv_id, um_id)
if response.status_code == 200:
data = \
{
"id": um_id,
"umoptions":
advanced_config_data['user_mapping_update_data']
['options']
}
put_response = self.tester.put(
self.url + str(utils.SERVER_GROUP) + '/' +
str(server_id) + '/' + str(db_id) +
'/' + str(fdw_id) + '/' +
str(fsrv_id) + '/' + str(um_id),
data=json.dumps(data),
follow_redirects=True)
self.assertEquals(put_response.status_code, 200)
@classmethod
def tearDownClass(cls):
"""
This function deletes the added foreign server(s) ,
foreign data wrapper(s), extension(s), schema(s), database(s),
server(s) and parent id file
:return: None
"""
um_utils.delete_um(cls.tester)
fsrv_utils.delete_fsrv(cls.tester)
fdw_utils.delete_fdw(cls.tester)
extension_utils.delete_extension(cls.tester)
schema_utils.delete_schema(cls.tester)
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,156 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import os
import pickle
import json
from regression.test_setup import advanced_config_data, pickle_path
from regression import test_utils as utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
from pgadmin.browser.server_groups.servers.databases.foreign_data_wrappers.tests \
import utils as fdw_utils
from pgadmin.browser.server_groups.servers.databases.foreign_data_wrappers.\
foreign_servers.tests import utils as fsrv_utils
UM_URL = '/browser/user_mapping/obj/'
def get_um_config_data(server_connect_data):
adv_config_data = None
db_user = server_connect_data['data']['user']['name']
# Get the config data of appropriate db user
for config_test_data in advanced_config_data['user_mapping_credentials']:
if db_user == config_test_data['owner']:
adv_config_data = config_test_data
data = {
"name": adv_config_data['name'],
"um_options": adv_config_data['option'],
"umoptions": adv_config_data['options']
}
return data
def add_um(tester):
all_id = utils.get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
fdw_ids_dict = all_id["fid"][0]
fsrv_ids_dict = all_id["fsid"][0]
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
db_con = database_utils.verify_database(tester,
utils.SERVER_GROUP,
server_id, db_id)
if db_con['data']['connected']:
server_connect_response = server_utils.verify_server(
tester, utils.SERVER_GROUP, server_id)
fdw_id = fdw_ids_dict[server_id]
fdw_response = fdw_utils.verify_fdws(tester,
utils.SERVER_GROUP,
server_id, db_id,
fdw_id)
fsrv_id = fsrv_ids_dict[server_id]
fsrv_response = fsrv_utils.verify_fsrv(tester, utils.SERVER_GROUP,
server_id, db_id,
fdw_id, fsrv_id)
if fsrv_response.status_code == 200:
data = get_um_config_data(server_connect_response)
response = tester.post(
UM_URL + str(utils.SERVER_GROUP) + '/' +
str(server_id) + '/' + str(
db_id) +
'/' + str(fdw_id) + '/' + str(fsrv_id) + '/',
data=json.dumps(data),
content_type='html/json')
assert response.status_code == 200
response_data = json.loads(response.data.decode())
write_um_info(response_data, server_id)
def write_um_info(response_data, server_id):
"""
This function writes the schema id into parent_id.pkl
:param response_data: foreign server add response data
:type response_data: dict
:param server_id: server id
:type server_id: str
:return: None
"""
um_id = response_data['node']['_id']
pickle_id_dict = utils.get_pickle_id_dict()
if os.path.isfile(pickle_path):
existing_server_id = open(pickle_path, 'rb')
tol_server_id = pickle.load(existing_server_id)
pickle_id_dict = tol_server_id
if 'umid' in pickle_id_dict:
if pickle_id_dict['umid']:
# Add the umid as value in dict
pickle_id_dict["umid"][0].update({server_id: um_id})
else:
# Create new dict with server_id and umid
pickle_id_dict["umid"].append({server_id: um_id})
fsrv_output = open(pickle_path, 'wb')
pickle.dump(pickle_id_dict, fsrv_output)
fsrv_output.close()
def verify_um(tester, server_group, server_id, db_id, fdw_id, fsrv_id, um_id):
response = tester.get(UM_URL + str(server_group) + '/' +
str(server_id) + '/' + str(db_id) +
'/' + str(fdw_id) + '/' + str(fsrv_id) + '/' + str(
um_id),
content_type='html/json')
return response
def delete_um(tester):
all_id = utils.get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
fdw_ids_dict = all_id["fid"][0]
fsrv_ids_dict = all_id["fsid"][0]
um_ids_dict = all_id["umid"][0]
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
fdw_id = fdw_ids_dict[server_id]
fsrv_id = fsrv_ids_dict[server_id]
um_id = um_ids_dict[server_id]
response = verify_um(tester, utils.SERVER_GROUP,
server_id, db_id,
fdw_id, fsrv_id, um_id)
if response.status_code == 200:
delete_response = tester.delete(
UM_URL + str(utils.SERVER_GROUP) + '/' +
str(server_id) + '/' + str(db_id) +
'/' + str(fdw_id) + '/' +
str(fsrv_id) + '/' + str(um_id),
follow_redirects=True)
delete_respdata = json.loads(delete_response.data.decode())
return delete_respdata

View File

@ -0,0 +1,16 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from pgadmin.utils.route import BaseTestGenerator
class FDWGeneratorTestCase(BaseTestGenerator):
def runTest(self):
return []

View File

@ -0,0 +1,75 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from pgadmin.browser.server_groups.servers.databases.extensions.tests import\
utils as extension_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from . import utils as fdw_utils
class FDWDAddTestCase(BaseTestGenerator):
""" This class will add foreign data wrappers under database node. """
scenarios = [
# Fetching default URL for foreign_data_wrapper node.
('Check FDW Node',
dict(url='/browser/foreign_data_wrapper/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function perform the following tasks:
1. Add and connect to the test server(s)
2. Add database(s) connected to server(s)
3. Add schema(s) to connected database(s)
4. Add extension(s) to schema(s)
:return: None
"""
# Add the server
server_utils.add_server(cls.tester)
# Connect to servers
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add databases to connected servers
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
schema_utils.add_schemas(cls.tester)
extension_utils.add_extensions(cls.tester)
def runTest(self):
""" This function will add extension under 1st server of tree node. """
fdw_utils.add_fdw(self.tester)
@classmethod
def tearDownClass(cls):
"""This function deletes the added schema, database, server and parent
id file
"""
fdw_utils.delete_fdw(cls.tester)
extension_utils.delete_extension(cls.tester)
schema_utils.delete_schema(cls.tester)
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,79 @@
#################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from pgadmin.browser.server_groups.servers.databases.extensions.tests import\
utils as extension_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from . import utils as fdw_utils
import json
class FDWDDeleteTestCase(BaseTestGenerator):
""" This class will delete foreign data wrappers under database node. """
scenarios = [
# Fetching default URL for foreign_data_wrapper node.
('Check FDW Node',
dict(url='/browser/foreign_data_wrapper/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function perform the following tasks:
1. Add and connect to the test server(s)
2. Add database(s) connected to server(s)
3. Add schemas to connected database(s)
4. Add extension(s) to schema(s)
5. Add foreign data wrapper(s) to extension(s)
:return: None
"""
# Add the server
server_utils.add_server(cls.tester)
# Connect to servers
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add databases to connected servers
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
schema_utils.add_schemas(cls.tester)
extension_utils.add_extensions(cls.tester)
fdw_utils.add_fdw(cls.tester)
def runTest(self):
""" This function will delete added FDW. """
delete_respdata = fdw_utils.delete_fdw(self.tester)
self.assertTrue(delete_respdata['success'], 1)
@classmethod
def tearDownClass(cls):
"""This function deletes the added schema, database, server and parent
id file
"""
extension_utils.delete_extension(cls.tester)
schema_utils.delete_schema(cls.tester)
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,83 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from pgadmin.browser.server_groups.servers.databases.extensions.tests import\
utils as extension_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from . import utils as fdw_utils
class FDWDGetTestCase(BaseTestGenerator):
""" This class will add foreign data wrappers under database node. """
scenarios = [
# Fetching default URL for foreign_data_wrapper node.
('Check FDW Node',
dict(url='/browser/foreign_data_wrapper/obj/'))
]
@classmethod
def setUpClass(cls):
"""This function use to add/connect the servers and create databases"""
# Add the server
server_utils.add_server(cls.tester)
# Connect to servers
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add databases to connected servers
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
schema_utils.add_schemas(cls.tester)
extension_utils.add_extensions(cls.tester)
fdw_utils.add_fdw(cls.tester)
def runTest(self):
""" This function will get added FDW. """
all_id = utils.get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
fdw_ids_dict = all_id["fid"][0]
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
fdw_id = fdw_ids_dict[server_id]
response = fdw_utils.verify_fdws(self.tester,
utils.SERVER_GROUP,
server_id, db_id,
fdw_id)
self.assertEquals(response.status_code, 200)
@classmethod
def tearDownClass(cls):
"""This function deletes the added schema, database, server and parent
id file
"""
fdw_utils.delete_fdw(cls.tester)
extension_utils.delete_extension(cls.tester)
schema_utils.delete_schema(cls.tester)
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,101 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from pgadmin.browser.server_groups.servers.databases.extensions.tests import\
utils as extension_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from . import utils as fdw_utils
from regression.test_setup import advanced_config_data
import json
class FDWDPutTestCase(BaseTestGenerator):
""" This class will add foreign data wrappers under database node. """
scenarios = [
# Fetching default URL for foreign_data_wrapper node.
('Check FDW Node',
dict(url='/browser/foreign_data_wrapper/obj/'))
]
@classmethod
def setUpClass(cls):
"""This function use to add/connect the servers and create databases"""
# Add the server
server_utils.add_server(cls.tester)
# Connect to servers
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add databases to connected servers
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
schema_utils.add_schemas(cls.tester)
extension_utils.add_extensions(cls.tester)
fdw_utils.add_fdw(cls.tester)
def runTest(self):
""" This function will update added FDW. """
all_id = utils.get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
fdw_ids_dict = all_id["fid"][0]
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
fdw_id = fdw_ids_dict[server_id]
response = fdw_utils.verify_fdws(self.tester,
utils.SERVER_GROUP,
server_id, db_id,
fdw_id)
if response.status_code == 200:
data = \
{
"description": advanced_config_data['fdw_update_data']
['comment'],
"id": fdw_id
}
put_response = self.tester.put(
self.url + str(utils.SERVER_GROUP) + '/' +
str(server_id) + '/' +
str(db_id) + '/' + str(fdw_id),
data=json.dumps(data),
follow_redirects=True)
self.assertEquals(put_response.status_code, 200)
@classmethod
def tearDownClass(cls):
"""
This function deletes the added schema, database, server and parent
id file
"""
fdw_utils.delete_fdw(cls.tester)
extension_utils.delete_extension(cls.tester)
schema_utils.delete_schema(cls.tester)
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,133 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import os
import pickle
import json
from regression.test_setup import advanced_config_data, pickle_path
from regression import test_utils as utils
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
import uuid
FDW_URL = '/browser/foreign_data_wrapper/obj/'
def get_fdw_config_data(schema_name, server_connect_data):
adv_config_data = None
db_user = server_connect_data['data']['user']['name']
# Get the config data of appropriate db user
for config_test_data in advanced_config_data['fdw_credentials']:
if db_user == config_test_data['owner']:
adv_config_data = config_test_data
data = {
"fdwacl": adv_config_data['acl'],
"fdwhan": "{0}.{1}".format(schema_name, adv_config_data['handler']),
"fdwoptions": adv_config_data['options'],
"fdwowner": adv_config_data['owner'],
"fdwvalue": "{0}.{1}".format(schema_name, adv_config_data['validator']),
"name": "fdw_{}".format(str(uuid.uuid4())[1:4])
}
return data
def add_fdw(tester):
all_id = utils.get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
schema_info_dict = all_id["scid"][0]
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
db_con = database_utils.verify_database(tester, utils.SERVER_GROUP,
server_id, db_id)
if db_con['data']["connected"]:
server_connect_response = server_utils.verify_server(
tester, utils.SERVER_GROUP, server_id)
schema_name = schema_info_dict[int(server_id)][1]
data = get_fdw_config_data(schema_name,
server_connect_response)
response = tester.post(FDW_URL + str(utils.SERVER_GROUP) + '/' +
str(server_id) + '/' + str(db_id) + '/',
data=json.dumps(data),
content_type='html/json')
assert response.status_code == 200
response_data = json.loads(response.data.decode('utf-8'))
write_fdw_info(response_data, server_id)
def write_fdw_info(response_data, server_id):
"""
This function writes the sequence id into parent_id.pkl
:param response_data: FDW add response data
:type response_data: dict
:param server_id: server id
:type server_id: str
:return: None
"""
fdw_id = response_data['node']['_id']
pickle_id_dict = utils.get_pickle_id_dict()
if os.path.isfile(pickle_path):
existing_server_id = open(pickle_path, 'rb')
tol_server_id = pickle.load(existing_server_id)
pickle_id_dict = tol_server_id
if 'fid' in pickle_id_dict:
if pickle_id_dict['fid']:
# Add the FDW_id as value in dict
pickle_id_dict["fid"][0].update({server_id: fdw_id})
else:
# Create new dict with server_id and FDW_id
pickle_id_dict["fid"].append({server_id: fdw_id})
fdw_output = open(pickle_path, 'wb')
pickle.dump(pickle_id_dict, fdw_output)
fdw_output.close()
def verify_fdws(tester, server_group, server_id, db_id, fdw_id):
response = tester.get(FDW_URL + str(server_group) + '/' +
str(server_id) + '/' + str(db_id) +
'/' + str(fdw_id),
content_type='html/json')
return response
def delete_fdw(tester):
all_id = utils.get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
fdw_ids_dict = all_id["fid"][0]
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
fdw_id = fdw_ids_dict[server_id]
response = verify_fdws(tester,
utils.SERVER_GROUP,
server_id, db_id,
fdw_id)
if response.status_code == 200:
delete_response = tester.delete(
FDW_URL + str(utils.SERVER_GROUP) + '/' +
str(server_id) + '/' +
str(db_id) + '/' + str(fdw_id),
follow_redirects=True)
delete_respdata = json.loads(delete_response.data.decode())
return delete_respdata

View File

@ -12,6 +12,5 @@ from pgadmin.utils.route import BaseTestGenerator
class TriggerFunctionTestGenerator(BaseTestGenerator):
def generate_tests(self):
def runTest(self):
return

View File

@ -51,6 +51,7 @@ class TriggerFuncDeleteTestCase(BaseTestGenerator):
cls.server_ids)
# Add schemas
schema_utils.add_schemas(cls.tester)
# Add trigger functions
trigger_funcs_utils.add_trigger_function(
cls.tester, cls.server_connect_response, cls.server_ids)
@ -74,4 +75,3 @@ class TriggerFuncDeleteTestCase(BaseTestGenerator):
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -68,12 +68,13 @@ class TriggerFuncGetTestCase(BaseTestGenerator):
db_con = database_utils.verify_database(
self.tester, utils.SERVER_GROUP, server_id, db_id)
if db_con['data']["connected"]:
schema_id = schema_ids_dict[int(server_id)]
schema_id = schema_ids_dict[int(server_id)][0]
schema_response = schema_utils.verify_schemas(
self.tester, server_id, db_id, schema_id)
if schema_response.status_code == 200:
trigger_func_ids = trigger_ids_dict[int(server_id)]
for trigger_func_id in trigger_func_ids:
trigger_func_list = trigger_ids_dict[int(server_id)]
for trigger_func in trigger_func_list:
trigger_func_id = trigger_func[0]
trigger_response = \
trigger_funcs_utils.verify_trigger_function(
self.tester, server_id, db_id, schema_id,

View File

@ -78,8 +78,9 @@ class TriggerFuncPutTestCase(BaseTestGenerator):
db_id,
schema_id)
if schema_response.status_code == 200:
trigger_func_ids = trigger_ids_dict[int(server_id)]
for trigger_func_id in trigger_func_ids:
trigger_func_list = trigger_ids_dict[int(server_id)]
for trigger_func in trigger_func_list:
trigger_func_id = trigger_func[0]
trigger_response = \
trigger_funcs_utils.verify_trigger_function(
self.tester, server_id, db_id, schema_id,

View File

@ -20,6 +20,7 @@ from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
from regression import test_utils as utils
TRIGGER_FUNCTIONS_URL = '/browser/trigger_function/obj/'
TRIGGER_FUNCTIONS_DELETE_URL = '/browser/trigger_function/delete/'
def get_trigger_func_data(server_connect_data):
@ -68,6 +69,7 @@ def write_trigger_func_id(trigger_func_ids_list, server_id):
"""
pickle_id_dict = utils.get_pickle_id_dict()
if os.path.isfile(pickle_path):
existing_server_id = open(pickle_path, 'rb')
tol_server_id = pickle.load(existing_server_id)
@ -100,7 +102,7 @@ def add_trigger_function(tester, server_connect_response, server_ids):
if db_con['data']["connected"]:
schema_id = schema_ids_dict[int(server_id)][0]
schema_utils.verify_schemas(tester, server_id, db_id,
schema_id)
schema_id)
data = get_trigger_func_data(server_connect_response)
# Get the type from config data. We are adding two types
# i.e. event_trigger and trigger.
@ -108,7 +110,7 @@ def add_trigger_function(tester, server_connect_response, server_ids):
trigger_func_ids_list = []
for func_type in trigger_func_types:
data['prorettypename'] = func_type
data["name"] = str(uuid.uuid4())[1:8]
data["name"] = "event_{}".format(str(uuid.uuid4())[1:8])
if schema_id:
data['pronamespace'] = schema_id
else:
@ -122,7 +124,9 @@ def add_trigger_function(tester, server_connect_response, server_ids):
assert response.status_code == 200
response_data = json.loads(response.data.decode('utf-8'))
trigger_func_id = response_data['node']['_id']
trigger_func_ids_list.append(trigger_func_id)
event_trigger_name = str(response_data['node']['label'])
trigger_func_ids_list.append(
(trigger_func_id, event_trigger_name, func_type))
write_trigger_func_id(trigger_func_ids_list, server_id)
@ -157,13 +161,14 @@ def delete_trigger_function(tester):
schema_response = schema_utils.verify_schemas(
tester, server_id, db_id, schema_id)
if schema_response.status_code == 200:
trigger_func_ids = trigger_ids_dict[int(server_id)]
for trigger_func_id in trigger_func_ids:
trigger_func_list = trigger_ids_dict[int(server_id)]
for trigger_func in trigger_func_list:
trigger_func_id = trigger_func[0]
trigger_response = verify_trigger_function(
tester, server_id, db_id, schema_id, trigger_func_id)
if trigger_response.status_code == 200:
del_response = tester.delete(
TRIGGER_FUNCTIONS_URL + str(
TRIGGER_FUNCTIONS_DELETE_URL + str(
utils.SERVER_GROUP) + '/' +
str(server_id) + '/' + str(db_id) + '/' +
str(schema_id) + '/' + str(trigger_func_id),

View File

@ -0,0 +1,16 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
from pgadmin.utils.route import BaseTestGenerator
class SequenceTestGenerator(BaseTestGenerator):
def runTest(self):
return []

View File

@ -0,0 +1,72 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from regression import test_utils as utils
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from . import utils as sequence_utils
class SequenceAddTestCase(BaseTestGenerator):
""" This class will add new sequence(s) under schema node. """
scenarios = [
# Fetching default URL for sequence node.
('Fetch sequence Node URL', dict(url='/browser/sequence/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function perform the three tasks
1. Add the test server(s)
2. Connect to server(s)
3. Add the database(s)
4. Add the schema(s)
:return: None
"""
# First, add the server
server_utils.add_server(cls.tester)
# Connect to server
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add database
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
# Add schemas
schema_utils.add_schemas(cls.tester)
def runTest(self):
""" This function will add sequence(s) under schema node. """
sequence_utils.add_sequences(self.tester)
@classmethod
def tearDownClass(cls):
"""This function deletes the added sequence, schema, database, server
and parent id file
:return: None
"""
sequence_utils.delete_sequence(cls.tester)
schema_utils.delete_schema(cls.tester)
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,75 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from regression import test_utils as utils
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from . import utils as sequence_utils
class SequenceDeleteTestCase(BaseTestGenerator):
""" This class will delete added sequence under schema node. """
scenarios = [
# Fetching default URL for sequence node.
('Fetch sequence Node URL', dict(url='/browser/sequence/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function perform the three tasks
1. Add the test server(s)
2. Connect to server(s)
3. Add database(s)
4. Add schema(s)
5. Add sequence(s)
:return: None
"""
# First, add the server
server_utils.add_server(cls.tester)
# Connect to server
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add database(s)
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
# Add schema(s)
schema_utils.add_schemas(cls.tester)
# Add sequence(s)
sequence_utils.add_sequences(cls.tester)
def runTest(self):
""" This function will delete added sequence under schema node. """
sequence_utils.delete_sequence(self.tester)
@classmethod
def tearDownClass(cls):
"""This function deletes the added sequence, schema, database, server
and parent id file
:return: None
"""
schema_utils.delete_schema(cls.tester)
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,93 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from regression import test_utils as utils
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from . import utils as sequence_utils
class SequenceGetTestCase(BaseTestGenerator):
""" This class will fetch added sequence under schema node. """
scenarios = [
# Fetching default URL for sequence node.
('Fetch sequence Node URL', dict(url='/browser/sequence/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function perform the three tasks
1. Add the test server(s)
2. Connect to server(s)
3. Add database(s)
4. Add schema(s)
5. Add sequence(s)
:return: None
"""
# First, add the server
server_utils.add_server(cls.tester)
# Connect to server
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add database(s)
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
# Add schema(s)
schema_utils.add_schemas(cls.tester)
# Add sequence(s)
sequence_utils.add_sequences(cls.tester)
def runTest(self):
""" This function will fetch added sequence under schema node. """
all_id = utils.get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
schema_ids_dict = all_id["scid"][0]
sequence_ids_dict = all_id["seid"][0]
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
schema_info = schema_ids_dict[int(server_id)]
schema_id = schema_info[0]
sequence_id = sequence_ids_dict[server_id]
get_response = sequence_utils.verify_sequence(self.tester,
utils.SERVER_GROUP,
server_id,
db_id,
schema_id,
sequence_id)
self.assertEquals(get_response.status_code, 200)
@classmethod
def tearDownClass(cls):
"""This function delete the added sequence, schema, database, server
and parent id file
:return: None
"""
sequence_utils.delete_sequence(cls.tester)
schema_utils.delete_schema(cls.tester)
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,112 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
from regression import test_utils as utils
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from . import utils as sequence_utils
import json
from regression.test_setup import advanced_config_data
class SequencePutTestCase(BaseTestGenerator):
""" This class will update added sequence under schema node. """
scenarios = [
# Fetching default URL for sequence node.
('Fetch sequence Node URL', dict(url='/browser/sequence/obj/'))
]
@classmethod
def setUpClass(cls):
"""
This function perform the three tasks
1. Add the test server(s)
2. Connect to server(s)
3. Add database(s)
4. Add schema(s)
5. Add sequence(s)
:return: None
"""
# First, add the server
server_utils.add_server(cls.tester)
# Connect to server
cls.server_connect_response, cls.server_group, cls.server_ids = \
server_utils.connect_server(cls.tester)
if len(cls.server_connect_response) == 0:
raise Exception("No Server(s) connected to add the database!!!")
# Add database(s)
database_utils.add_database(cls.tester, cls.server_connect_response,
cls.server_ids)
# Add schema(s)
schema_utils.add_schemas(cls.tester)
# Add sequence(s)
sequence_utils.add_sequences(cls.tester)
def runTest(self):
""" This function will update added sequence under schema node. """
all_id = utils.get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
schema_ids_dict = all_id["scid"][0]
sequence_ids_dict = all_id["seid"][0]
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
schema_info = schema_ids_dict[int(server_id)]
schema_id = schema_info[0]
sequence_id = sequence_ids_dict[server_id]
get_response = sequence_utils.verify_sequence(self.tester,
utils.SERVER_GROUP,
server_id,
db_id,
schema_id,
sequence_id)
if get_response.status_code == 200:
data = \
{
"comment":
advanced_config_data['sequnce_update_data']
['comment'],
"id": sequence_id
}
put_response = self.tester.put(
self.url + str(utils.SERVER_GROUP) + '/' +
str(server_id) + '/' +
str(db_id) + '/' +
str(schema_id) + '/' +
str(sequence_id),
data=json.dumps(data),
follow_redirects=True)
self.assertEquals(put_response.status_code, 200)
@classmethod
def tearDownClass(cls):
"""This function deletes the added sequence, schema, database, server
and parent id file
:return: None
"""
sequence_utils.delete_sequence(cls.tester)
schema_utils.delete_schema(cls.tester)
database_utils.delete_database(cls.tester)
server_utils.delete_server(cls.tester)
utils.delete_parent_id_file()

View File

@ -0,0 +1,152 @@
# #################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import json
import os
import pickle
from regression.test_setup import pickle_path, advanced_config_data
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
from regression import test_utils as utils
import uuid
SEQUENCE_URL = '/browser/sequence/obj/'
def get_sequence_config_data(schema_name, server_connect_data):
adv_config_data = None
db_user = server_connect_data['data']['user']['name']
# Get the config data of appropriate db user
for config_test_data in advanced_config_data['sequence_credentials']:
if db_user == config_test_data['owner']:
adv_config_data = config_test_data
data = \
{
"cache": adv_config_data['cache'],
"cycled": adv_config_data['cycled'],
"increment": adv_config_data['increment'],
"maximum": adv_config_data['max_value'],
"minimum": adv_config_data['min_value'],
"name": "sequence_{0}".format(str(uuid.uuid4())[1:4]),
"relacl": adv_config_data['acl'],
"schema": schema_name,
"securities": adv_config_data['security'],
"seqowner": adv_config_data['owner'],
"start": adv_config_data['start_val']
}
return data
def add_sequences(tester):
all_id = utils.get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
schema_info_dict = all_id["scid"][0]
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
db_con = database_utils.verify_database(tester, utils.SERVER_GROUP,
server_id, db_id)
if db_con['data']["connected"]:
server_connect_response = server_utils.verify_server(
tester, utils.SERVER_GROUP, server_id)
schema_name = schema_info_dict[int(server_id)][1]
data = get_sequence_config_data(schema_name,
server_connect_response)
schema_id = schema_info_dict[int(server_id)][0]
response = tester.post(
SEQUENCE_URL + str(utils.SERVER_GROUP) + '/' +
str(server_id) + '/' + str(db_id) +
'/' + str(schema_id) + '/',
data=json.dumps(data),
content_type='html/json')
assert response.status_code == 200
response_data = json.loads(response.data.decode('utf-8'))
write_sequence_info(response_data, server_id)
def write_sequence_info(response_data, server_id):
"""
This function writes the sequence id into parent_id.pkl
:param response_data: sequence add response data
:type response_data: dict
:param server_id: server id
:type server_id: str
:return: None
"""
sequence_id = response_data['node']['_id']
pickle_id_dict = utils.get_pickle_id_dict()
if os.path.isfile(pickle_path):
existing_server_id = open(pickle_path, 'rb')
tol_server_id = pickle.load(existing_server_id)
pickle_id_dict = tol_server_id
if 'fid' in pickle_id_dict:
if pickle_id_dict['seid']:
# Add the sequence_id as value in dict
pickle_id_dict["seid"][0].update({server_id: sequence_id})
else:
# Create new dict with server_id and sequence_id
pickle_id_dict["seid"].append({server_id: sequence_id})
sequence_output = open(pickle_path, 'wb')
pickle.dump(pickle_id_dict, sequence_output)
sequence_output.close()
def verify_sequence(tester, server_group, server_id, db_id, schema_id,
sequence_id):
"""This function verifies the sequence using GET API"""
get_response = tester.get(SEQUENCE_URL + str(server_group) + '/' +
str(server_id) + '/' +
str(db_id) + '/' +
str(schema_id) + '/' +
str(sequence_id),
content_type='html/json')
return get_response
def delete_sequence(tester):
all_id = utils.get_ids()
server_ids = all_id["sid"]
db_ids_dict = all_id["did"][0]
schema_ids_dict = all_id["scid"][0]
sequence_ids_dict = all_id["seid"][0]
for server_id in server_ids:
db_id = db_ids_dict[int(server_id)]
schema_info = schema_ids_dict[int(server_id)]
schema_id = schema_info[0]
sequence_id = sequence_ids_dict[server_id]
get_response = verify_sequence(tester,
utils.SERVER_GROUP,
server_id,
db_id,
schema_id,
sequence_id)
if get_response.status_code == 200:
delete_response = tester.delete(
SEQUENCE_URL + str(utils.SERVER_GROUP) + '/' +
str(server_id) + '/' +
str(db_id) + '/' +
str(schema_id) + '/' +
str(sequence_id),
follow_redirects=True)
assert delete_response.status_code == 200
del_resp_data = json.loads(delete_response.data.decode('utf-8'))
assert del_resp_data['success'] == 1

View File

@ -12,6 +12,5 @@ from pgadmin.utils.route import BaseTestGenerator
class SchemaTestGenerator(BaseTestGenerator):
def generate_tests(self):
def runTest(self):
return []

View File

@ -19,6 +19,7 @@ from pgadmin.browser.server_groups.servers.databases.tests import utils as \
from regression import test_utils as utils
SCHEMA_URL = '/browser/schema/obj/'
SCHEMA_DELETE_URL = '/browser/schema/delete/'
def get_schema_config_data(server_connect_response):
@ -39,7 +40,7 @@ def get_schema_config_data(server_connect_response):
"defseqacl": adv_config_data['seq_acl'],
"deftblacl": adv_config_data['tbl_acl'],
"deftypeacl": adv_config_data['type_acl'],
"name": "test_{0}".format(str(uuid.uuid4())[1:8]),
"name": "schema_{0}".format(str(uuid.uuid4())[1:8]),
"namespaceowner": adv_config_data['owner'],
"nspacl": adv_config_data['privilege'],
"seclabels": adv_config_data['sec_label']
@ -138,7 +139,7 @@ def delete_schema(tester):
raise Exception("No schema(s) to delete.")
del_response = tester.delete(
SCHEMA_URL + str(utils.SERVER_GROUP) + '/' +
SCHEMA_DELETE_URL + str(utils.SERVER_GROUP) + '/' +
str(server_id) + '/' +
str(db_id) + '/' +
str(schema_id),

View File

@ -299,6 +299,198 @@
"collation_update_data": {
"comment": "This is collation update comment"
},
"casts_credentials":
[{
"cast_context": "IMPLICIT",
"encoding": "UTF8",
"name": "money->bigint",
"source_type": "money",
"target_type": "bigint",
"owner": "postgres"
}],
"cast_update_data":
{
"comment": "This is cast update comment"
},
"extension_credentials":[
{
"name": "postgres_fdw",
"relocate": true,
"schema": "",
"version": "1.0",
"owner": "postgres"
}],
"extension_update_data":
{
"schema": "schema"
},
"fdw_credentials":[
{
"acl":
[
{
"grantee":"postgres",
"grantor":"postgres",
"privileges":
[
{
"privilege_type":"U",
"privilege":true,
"with_grant":true
}
]
}
],
"handler": "postgres_fdw_handler",
"options": [],
"owner": "postgres",
"validator": "postgres_fdw_validator",
"name": "fdw"
}],
"fdw_update_data":
{
"comment": "This is FDW update comment"
},
"fsrv_credentials":[
{
"fsrv_acl":
[
{
"grantee":"postgres",
"grantor":"postgres",
"privileges":
[
{
"privilege_type":"U",
"privilege":true,
"with_grant":false
}
]
}
],
"fsrv_options":
[
{
"fsrvoption":"host",
"fsrvvalue":"localhost"
},
{
"fsrvoption":"port",
"fsrvvalue":"5433"
},
{
"fsrvoption":"dbname",
"fsrvvalue":"postgres"
}
],
"owner": "postgres",
"name": "foreign_server1"
}],
"FSRV_update_data":
{
"comment": "This is foreign server update comment"
},
"user_mapping_credentials":[
{
"name": "postgres",
"option": [],
"owner": "postgres",
"options":
[
{
"umoption":"user",
"umvalue":"postgres"
},
{
"umoption":"password",
"umvalue":"edb"
}
]
}],
"user_mapping_update_data":
{
"options":
{
"changed":
[
{"umoption":"password",
"umvalue":"edb1"
}
]
}
},
"event_trigger_credentials":
[{
"enable": "O",
"event_func": "test_schema.test_abort_any_command",
"event_name": "DDL_COMMAND_END",
"owner": "postgres",
"name": "test_event_trg",
"provider": []
}],
"event_trigger_update_data":
{
"comment": "This is event trigger update comment"
},
"sequence_credentials":
[{
"cache": "1",
"cycled": true,
"increment": "1",
"max_value": "100000",
"min_value": "1",
"name": "test_empno",
"acl":
[
{
"grantee":"postgres",
"grantor":"postgres",
"privileges":
[
{
"privilege_type":"r",
"privilege":true,
"with_grant":false
},
{
"privilege_type":"w",
"privilege":true,
"with_grant":false
},
{
"privilege_type":"U",
"privilege":true,
"with_grant":false
}
]
}
],
"schema_name": "test_schema",
"security": [],
"owner": "postgres",
"start_val": "100"
}],
"sequnce_update_data":
{
"comment": "This is sequence update comment"
}
}
}

View File

@ -9,7 +9,6 @@
import os
import pickle
from test_setup import config_data, pickle_path
@ -26,7 +25,14 @@ def get_pickle_id_dict():
"tsid": [], # tablespace
"scid": [], # schema
"tfnid": [], # trigger functions
"coid": [] # collation
"coid": [], # collation
"cid": [], # casts
"etid": [], # event_trigger
"eid": [], # extension
"fid": [], # FDW
"fsid": [], # FRS
"umid": [], # user_mapping
"seid": [] # sequence
}
return pickle_id_dict