mirror of
https://github.com/pgadmin-org/pgadmin4.git
synced 2025-02-25 18:55:31 -06:00
1) Splits the SQL query used to retrieve the Dependents, Dependencies, and Roles SQL file into multiple versioned files.
2) Add Unit Tests for each file. 3) Add ORDER BY into Copy Selection Feature test to ensure the results are retrieved always in the same order 4) Renamed the Scenario of the xss_checks_pgadmin_debugger_test and skip it for versions less than 9.1 5) Deleted unused __init__.py files.
This commit is contained in:
parent
0e7efc0cf8
commit
3bf17d9df4
@ -1,47 +0,0 @@
|
||||
import os
|
||||
import sys
|
||||
|
||||
from pgadmin.utils.driver import DriverRegistry
|
||||
from regression.python_test_utils.template_helper import file_as_template
|
||||
|
||||
DriverRegistry.load_drivers()
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression.python_test_utils import test_utils
|
||||
|
||||
if sys.version_info[0] >= 3:
|
||||
long = int
|
||||
|
||||
|
||||
class TestColumnAcl(BaseTestGenerator):
|
||||
def runTest(self):
|
||||
""" When there are no permissions on the column, it returns an empty result """
|
||||
with test_utils.Database(self.server) as (connection, database_name):
|
||||
test_utils.create_table(self.server, database_name, "test_table")
|
||||
|
||||
cursor = connection.cursor()
|
||||
cursor.execute("SELECT pg_class.oid as table_id, "
|
||||
"pg_attribute.attnum as column_id "
|
||||
"FROM pg_class join pg_attribute on attrelid=pg_class.oid "
|
||||
"where pg_class.relname='test_table'"
|
||||
" and pg_attribute.attname = 'some_column'")
|
||||
table_id, column_id = cursor.fetchone()
|
||||
|
||||
if connection.server_version < 90100:
|
||||
self.versions_to_test = ['default']
|
||||
else:
|
||||
self.versions_to_test = ['9.1_plus']
|
||||
|
||||
for version in self.versions_to_test:
|
||||
template_file = os.path.join(os.path.dirname(__file__), "..", version, "acl.sql")
|
||||
template = file_as_template(template_file)
|
||||
|
||||
public_schema_id = 2200
|
||||
sql = template.render(scid=public_schema_id,
|
||||
tid=table_id,
|
||||
clid=column_id
|
||||
)
|
||||
|
||||
cursor = connection.cursor()
|
||||
cursor.execute(sql)
|
||||
fetch_result = cursor.fetchall()
|
||||
self.assertEqual(0, len(fetch_result))
|
@ -1,56 +0,0 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2017, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
from pgadmin.utils.driver import DriverRegistry
|
||||
from regression.python_test_utils.template_helper import file_as_template
|
||||
|
||||
DriverRegistry.load_drivers()
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression.python_test_utils import test_utils
|
||||
|
||||
if sys.version_info[0] >= 3:
|
||||
long = int
|
||||
|
||||
|
||||
class TestColumnProperties(BaseTestGenerator):
|
||||
def runTest(self):
|
||||
""" This tests that column properties are returned"""
|
||||
with test_utils.Database(self.server) as (connection, database_name):
|
||||
test_utils.create_table(self.server, database_name, "test_table")
|
||||
|
||||
cursor = connection.cursor()
|
||||
cursor.execute("SELECT oid FROM pg_class where relname='test_table'")
|
||||
table_id = cursor.fetchone()[0]
|
||||
|
||||
if connection.server_version < 90100:
|
||||
self.versions_to_test = ['default']
|
||||
else:
|
||||
self.versions_to_test = ['9.1_plus']
|
||||
|
||||
for version in self.versions_to_test:
|
||||
template_file = os.path.join(os.path.dirname(__file__), "..", version, "properties.sql")
|
||||
template = file_as_template(template_file)
|
||||
public_schema_id = 2200
|
||||
sql = template.render(scid=public_schema_id,
|
||||
tid=table_id
|
||||
)
|
||||
|
||||
cursor = connection.cursor()
|
||||
cursor.execute(sql)
|
||||
fetch_result = cursor.fetchall()
|
||||
first_row = {}
|
||||
for index, description in enumerate(cursor.description):
|
||||
first_row[description.name] = fetch_result[0][index]
|
||||
|
||||
self.assertEqual('some_column', first_row['name'])
|
||||
self.assertEqual('character varying', first_row['cltype'])
|
||||
self.assertEqual(2, len(fetch_result))
|
@ -1,66 +0,0 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2017, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
from pgadmin.utils.driver import DriverRegistry
|
||||
from regression.python_test_utils.template_helper import file_as_template
|
||||
|
||||
DriverRegistry.load_drivers()
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression.python_test_utils import test_utils
|
||||
|
||||
if sys.version_info[0] >= 3:
|
||||
long = int
|
||||
|
||||
|
||||
class TestTablesAcl(BaseTestGenerator):
|
||||
scenarios = [
|
||||
("Test query returns the permissions when there are permissions set up"
|
||||
" on the table", dict())
|
||||
]
|
||||
|
||||
def runTest(self):
|
||||
""" This tests that when there are permissions set up on the table, acl query returns the permissions"""
|
||||
with test_utils.Database(self.server) as (connection, database_name):
|
||||
test_utils.create_table(self.server, database_name, "test_table")
|
||||
|
||||
cursor = connection.cursor()
|
||||
cursor.execute("GRANT SELECT ON test_table TO PUBLIC")
|
||||
|
||||
cursor = connection.cursor()
|
||||
cursor.execute("SELECT oid FROM pg_class WHERE relname='test_table'")
|
||||
table_id = cursor.fetchone()[0]
|
||||
|
||||
if connection.server_version < 90100:
|
||||
self.versions_to_test = ['default']
|
||||
else:
|
||||
self.versions_to_test = ['9.1_plus']
|
||||
|
||||
for version in self.versions_to_test:
|
||||
template_file = os.path.join(os.path.dirname(__file__), "..", version, "acl.sql")
|
||||
template = file_as_template(template_file)
|
||||
public_schema_id = 2200
|
||||
sql = template.render(scid=public_schema_id,
|
||||
tid=table_id)
|
||||
|
||||
cursor = connection.cursor()
|
||||
cursor.execute(sql)
|
||||
fetch_result = cursor.fetchall()
|
||||
public_acls = list(filter(lambda acl: acl[1] == 'PUBLIC', fetch_result))
|
||||
self.assertEqual(len(public_acls), 1)
|
||||
|
||||
new_acl_map = dict(zip(map(lambda column: column.name, cursor.description), public_acls[0]))
|
||||
|
||||
self.assertEqual('PUBLIC', new_acl_map['grantee'])
|
||||
self.assertEqual(self.server['username'], new_acl_map['grantor'])
|
||||
self.assertEqual('relacl', new_acl_map['deftype'])
|
||||
self.assertEqual(['r'], new_acl_map['privileges'])
|
||||
self.assertEqual([False], new_acl_map['grantable'])
|
@ -1,74 +0,0 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2017, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
from jinja2 import BaseLoader
|
||||
from jinja2 import Environment
|
||||
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression.python_test_utils import test_utils
|
||||
|
||||
if sys.version_info[0] >= 3:
|
||||
long = int
|
||||
|
||||
|
||||
class TestTablesNode(BaseTestGenerator):
|
||||
scenarios = [
|
||||
("This scenario tests that all applicable sql template versions can "
|
||||
"fetch table names", dict())
|
||||
]
|
||||
|
||||
def runTest(self):
|
||||
""" This tests that all applicable sql template versions can fetch table names """
|
||||
with test_utils.Database(self.server) as (connection, database_name):
|
||||
test_utils.create_table(self.server, database_name, "test_table")
|
||||
|
||||
if connection.server_version < 91000:
|
||||
self.versions_to_test = ['default']
|
||||
else:
|
||||
self.versions_to_test = ['default', '9.1_plus']
|
||||
|
||||
for version in self.versions_to_test:
|
||||
template_file = os.path.join(os.path.dirname(__file__), "..", version, "nodes.sql")
|
||||
file_content = open(template_file, 'r').read()
|
||||
|
||||
env = Environment(loader=SimpleTemplateLoader(file_content))
|
||||
|
||||
template = env.get_template("")
|
||||
public_schema_id = 2200
|
||||
sql = template.render(scid=public_schema_id)
|
||||
|
||||
cursor = connection.cursor()
|
||||
cursor.execute(sql)
|
||||
fetch_result = cursor.fetchall()
|
||||
|
||||
first_row = {}
|
||||
for index, description in enumerate(cursor.description):
|
||||
first_row[description.name] = fetch_result[0][index]
|
||||
|
||||
oid = first_row['oid']
|
||||
name = first_row['name']
|
||||
triggercount = first_row['triggercount']
|
||||
has_enable_triggers = first_row['has_enable_triggers']
|
||||
|
||||
self.assertIsNotNone(long(oid))
|
||||
self.assertEqual('test_table', name)
|
||||
# triggercount is sometimes returned as a string for some reason
|
||||
self.assertEqual(0, long(triggercount))
|
||||
self.assertIsNotNone(long(has_enable_triggers))
|
||||
|
||||
|
||||
class SimpleTemplateLoader(BaseLoader):
|
||||
def __init__(self, file_content):
|
||||
self.file_content = file_content
|
||||
|
||||
def get_source(self, *args):
|
||||
return self.file_content, "required-return-not-a-real-file.txt", True
|
@ -1,77 +0,0 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2017, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
from pgadmin.utils.driver import DriverRegistry
|
||||
from regression.python_test_utils.template_helper import file_as_template
|
||||
|
||||
DriverRegistry.load_drivers()
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression.python_test_utils import test_utils
|
||||
|
||||
if sys.version_info[0] >= 3:
|
||||
long = int
|
||||
|
||||
|
||||
class TestTablesProperties(BaseTestGenerator):
|
||||
scenarios = [
|
||||
("This scenario tests that all applicable sql template versions can "
|
||||
"fetch some ddl", dict())
|
||||
]
|
||||
|
||||
def runTest(self):
|
||||
""" This tests that all applicable sql template versions can fetch some ddl """
|
||||
with test_utils.Database(self.server) as (connection, database_name):
|
||||
test_utils.create_table(self.server, database_name, "test_table")
|
||||
|
||||
cursor = connection.cursor()
|
||||
cursor.execute(u"""
|
||||
SELECT
|
||||
db.oid as did, datlastsysoid
|
||||
FROM
|
||||
pg_database db
|
||||
WHERE db.datname = '{0}'""".format(database_name)
|
||||
)
|
||||
database_id, last_system_oid = cursor.fetchone()
|
||||
|
||||
cursor = connection.cursor()
|
||||
cursor.execute("SELECT oid FROM pg_class where relname='test_table'")
|
||||
table_id = cursor.fetchone()[0]
|
||||
|
||||
if connection.server_version < 90100:
|
||||
self.versions_to_test = ['default']
|
||||
else:
|
||||
self.versions_to_test = ['9.1_plus']
|
||||
|
||||
for version in self.versions_to_test:
|
||||
template_file = os.path.join(os.path.dirname(__file__), "..", version, "properties.sql")
|
||||
template = file_as_template(template_file)
|
||||
|
||||
public_schema_id = 2200
|
||||
sql = template.render(scid=public_schema_id,
|
||||
did=database_id,
|
||||
datlastsysoid=last_system_oid,
|
||||
tid=table_id
|
||||
)
|
||||
|
||||
cursor = connection.cursor()
|
||||
cursor.execute(sql)
|
||||
fetch_result = cursor.fetchone()
|
||||
|
||||
first_row = {}
|
||||
for index, description in enumerate(cursor.description):
|
||||
first_row[description.name] = fetch_result[index]
|
||||
|
||||
self.assertEqual('test_table', first_row['name'])
|
||||
# triggercount is sometimes returned as a string for some reason
|
||||
self.assertEqual(0, long(first_row['triggercount']))
|
||||
self.assertEqual(None, first_row['typname'])
|
||||
self.assertEqual([], first_row['coll_inherits'])
|
@ -1,60 +0,0 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2017, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
import jinja2
|
||||
|
||||
from pgadmin.utils.driver import DriverRegistry
|
||||
from regression.python_test_utils.template_helper import file_as_template
|
||||
|
||||
DriverRegistry.load_drivers()
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression.python_test_utils import test_utils
|
||||
|
||||
if sys.version_info[0] >= 3:
|
||||
long = int
|
||||
|
||||
|
||||
class TestTriggerGetOid(BaseTestGenerator):
|
||||
def runTest(self):
|
||||
""" When there are no permissions on the column, it returns an empty result """
|
||||
with test_utils.Database(self.server) as (connection, database_name):
|
||||
test_utils.create_table(self.server, database_name, "test_table")
|
||||
|
||||
cursor = connection.cursor()
|
||||
cursor.execute("SELECT pg_class.oid as table_id, "
|
||||
"pg_attribute.attnum as column_id "
|
||||
"FROM pg_class join pg_attribute on attrelid=pg_class.oid "
|
||||
"where pg_class.relname='test_table'"
|
||||
" and pg_attribute.attname = 'some_column'")
|
||||
table_id, column_id = cursor.fetchone()
|
||||
|
||||
if connection.server_version < 90100:
|
||||
self.versions_to_test = ['default']
|
||||
else:
|
||||
self.versions_to_test = ['9.1_plus']
|
||||
|
||||
for version in self.versions_to_test:
|
||||
template_file = os.path.join(os.path.dirname(__file__), "..", version, "get_oid.sql")
|
||||
|
||||
jinja2.filters.FILTERS['qtLiteral'] = lambda value: "NULL"
|
||||
template = file_as_template(template_file)
|
||||
|
||||
sql = template.render(data={'name': None},
|
||||
tid=table_id
|
||||
)
|
||||
|
||||
cursor = connection.cursor()
|
||||
cursor.execute(sql)
|
||||
fetch_result = cursor.fetchall()
|
||||
self.assertEqual(0, len(fetch_result))
|
||||
|
||||
|
@ -0,0 +1,51 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2017, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import os
|
||||
|
||||
from regression.python_test_utils.sql_template_test_base import SQLTemplateTestBase
|
||||
from regression.python_test_utils.template_helper import file_as_template
|
||||
|
||||
|
||||
class TestColumnAclSql(SQLTemplateTestBase):
|
||||
scenarios = [
|
||||
# Fetching default URL for schema node.
|
||||
('Test Column ACL SQL file', dict())
|
||||
]
|
||||
|
||||
def __init__(self):
|
||||
super(TestColumnAclSql, self).__init__()
|
||||
self.table_id = -1
|
||||
self.column_id = -1
|
||||
|
||||
def test_setup(self, connection, cursor):
|
||||
cursor.execute("SELECT pg_class.oid AS table_id, "
|
||||
"pg_attribute.attnum AS column_id "
|
||||
"FROM pg_class JOIN pg_attribute ON attrelid=pg_class.oid "
|
||||
"WHERE pg_class.relname='test_table'"
|
||||
" AND pg_attribute.attname = 'some_column'")
|
||||
self.table_id, self.column_id = cursor.fetchone()
|
||||
|
||||
def generate_sql(self, version):
|
||||
template_file = self.get_template_file(version, "acl.sql")
|
||||
template = file_as_template(template_file)
|
||||
public_schema_id = 2200
|
||||
sql = template.render(scid=public_schema_id,
|
||||
tid=self.table_id,
|
||||
clid=self.column_id
|
||||
)
|
||||
|
||||
return sql
|
||||
|
||||
def assertions(self, fetch_result, descriptions):
|
||||
self.assertEqual(0, len(fetch_result))
|
||||
|
||||
@staticmethod
|
||||
def get_template_file(version, filename):
|
||||
return os.path.join(os.path.dirname(__file__), "..", "templates", "column", "sql", version, filename)
|
@ -0,0 +1,52 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2017, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import os
|
||||
|
||||
from regression.python_test_utils.sql_template_test_base import SQLTemplateTestBase
|
||||
from regression.python_test_utils.template_helper import file_as_template
|
||||
|
||||
|
||||
class TestColumnPropertiesSql(SQLTemplateTestBase):
|
||||
scenarios = [
|
||||
# Fetching default URL for schema node.
|
||||
('Test Column Properties SQL file', dict())
|
||||
]
|
||||
|
||||
def __init__(self):
|
||||
super(TestColumnPropertiesSql, self).__init__()
|
||||
self.table_id = -1
|
||||
|
||||
def test_setup(self, connection, cursor):
|
||||
cursor.execute("SELECT oid FROM pg_class where relname='test_table'")
|
||||
|
||||
self.table_id = cursor.fetchone()[0]
|
||||
|
||||
def generate_sql(self, version):
|
||||
template_file = self.get_template_file(version, "properties.sql")
|
||||
template = file_as_template(template_file)
|
||||
public_schema_id = 2200
|
||||
sql = template.render(scid=public_schema_id,
|
||||
tid=self.table_id
|
||||
)
|
||||
|
||||
return sql
|
||||
|
||||
def assertions(self, fetch_result, descriptions):
|
||||
first_row = {}
|
||||
for index, description in enumerate(descriptions):
|
||||
first_row[description.name] = fetch_result[0][index]
|
||||
|
||||
self.assertEqual('some_column', first_row['name'])
|
||||
self.assertEqual('character varying', first_row['cltype'])
|
||||
self.assertEqual(2, len(fetch_result))
|
||||
|
||||
@staticmethod
|
||||
def get_template_file(version, filename):
|
||||
return os.path.join(os.path.dirname(__file__), "..", "templates", "column", "sql", version, filename)
|
@ -0,0 +1,54 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2017, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import os
|
||||
from regression.python_test_utils.template_helper import file_as_template
|
||||
from regression.python_test_utils.sql_template_test_base import SQLTemplateTestBase
|
||||
|
||||
|
||||
class TestTablesAclSql(SQLTemplateTestBase):
|
||||
scenarios = [
|
||||
("Test query returns the permissions when there are permissions set up"
|
||||
" on the table", dict())
|
||||
]
|
||||
|
||||
def __init__(self):
|
||||
super(TestTablesAclSql, self).__init__()
|
||||
self.table_id = -1
|
||||
|
||||
def test_setup(self, connection, cursor):
|
||||
cursor.execute("GRANT SELECT ON test_table TO PUBLIC")
|
||||
cursor = connection.cursor()
|
||||
cursor.execute("SELECT oid FROM pg_class WHERE relname='test_table'")
|
||||
self.table_id = cursor.fetchone()[0]
|
||||
|
||||
def generate_sql(self, version):
|
||||
template_file = self.get_template_file(version, "acl.sql")
|
||||
template = file_as_template(template_file)
|
||||
public_schema_id = 2200
|
||||
sql = template.render(scid=public_schema_id,
|
||||
tid=self.table_id)
|
||||
return sql
|
||||
|
||||
def assertions(self, fetch_result, descriptions):
|
||||
public_acls = list(filter(lambda acl: acl[1] == 'PUBLIC', fetch_result))
|
||||
self.assertEqual(len(public_acls), 1)
|
||||
|
||||
new_acl_map = dict(zip(map(lambda column: column.name, descriptions), public_acls[0]))
|
||||
|
||||
self.assertEqual('PUBLIC', new_acl_map['grantee'])
|
||||
self.assertEqual(self.server['username'], new_acl_map['grantor'])
|
||||
self.assertEqual('relacl', new_acl_map['deftype'])
|
||||
self.assertEqual(['r'], new_acl_map['privileges'])
|
||||
self.assertEqual([False], new_acl_map['grantable'])
|
||||
return public_acls
|
||||
|
||||
@staticmethod
|
||||
def get_template_file(version, filename):
|
||||
return os.path.join(os.path.dirname(__file__), "..", "templates", "table", "sql", version, filename)
|
@ -0,0 +1,55 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2017, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
from regression.python_test_utils.sql_template_test_base import SQLTemplateTestBase
|
||||
from regression.python_test_utils.template_helper import file_as_template
|
||||
|
||||
if sys.version_info[0] >= 3:
|
||||
long = int
|
||||
|
||||
|
||||
class TestTablesNodeSql(SQLTemplateTestBase):
|
||||
scenarios = [
|
||||
("This scenario tests that all applicable sql template versions can "
|
||||
"fetch table names", dict())
|
||||
]
|
||||
|
||||
def test_setup(self, connection, cursor):
|
||||
pass
|
||||
|
||||
def generate_sql(self, version):
|
||||
template_file = self.get_template_file(version, "nodes.sql")
|
||||
template = file_as_template(template_file)
|
||||
public_schema_id = 2200
|
||||
sql = template.render(scid=public_schema_id)
|
||||
return sql
|
||||
|
||||
def assertions(self, fetch_result, descriptions):
|
||||
|
||||
first_row = {}
|
||||
for index, description in enumerate(descriptions):
|
||||
first_row[description.name] = fetch_result[0][index]
|
||||
|
||||
oid = first_row['oid']
|
||||
name = first_row['name']
|
||||
triggercount = first_row['triggercount']
|
||||
has_enable_triggers = first_row['has_enable_triggers']
|
||||
|
||||
self.assertIsNotNone(long(oid))
|
||||
self.assertEqual('test_table', name)
|
||||
# triggercount is sometimes returned as a string for some reason
|
||||
self.assertEqual(0, long(triggercount))
|
||||
self.assertIsNotNone(long(has_enable_triggers))
|
||||
|
||||
@staticmethod
|
||||
def get_template_file(version, filename):
|
||||
return os.path.join(os.path.dirname(__file__), "..", "templates", "table", "sql", version, filename)
|
@ -0,0 +1,72 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2017, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
from regression.python_test_utils.template_helper import file_as_template
|
||||
from regression.python_test_utils.sql_template_test_base import SQLTemplateTestBase
|
||||
|
||||
|
||||
if sys.version_info[0] >= 3:
|
||||
long = int
|
||||
|
||||
|
||||
class TestTablesPropertiesSql(SQLTemplateTestBase):
|
||||
scenarios = [
|
||||
("This scenario tests that all applicable sql template versions can "
|
||||
"fetch some ddl", dict())
|
||||
]
|
||||
|
||||
def __init__(self):
|
||||
super(TestTablesPropertiesSql, self).__init__()
|
||||
self.database_id = -1
|
||||
self.last_system_oid = -1
|
||||
self.table_id = -1
|
||||
|
||||
def assertions(self, fetch_result, descriptions):
|
||||
|
||||
first_row = {}
|
||||
for index, description in enumerate(descriptions):
|
||||
first_row[description.name] = fetch_result[0][index]
|
||||
|
||||
self.assertEqual('test_table', first_row['name'])
|
||||
# triggercount is sometimes returned as a string for some reason
|
||||
self.assertEqual(0, long(first_row['triggercount']))
|
||||
self.assertEqual(None, first_row['typname'])
|
||||
self.assertEqual([], first_row['coll_inherits'])
|
||||
|
||||
def generate_sql(self, version):
|
||||
template_file = self.get_template_file(version, "properties.sql")
|
||||
template = file_as_template(template_file)
|
||||
public_schema_id = 2200
|
||||
sql = template.render(scid=public_schema_id,
|
||||
did=self.database_id,
|
||||
datlastsysoid=self.last_system_oid,
|
||||
tid=self.table_id
|
||||
)
|
||||
return sql
|
||||
|
||||
def test_setup(self, connection, cursor):
|
||||
cursor.execute(u"""
|
||||
SELECT
|
||||
db.oid as did, datlastsysoid
|
||||
FROM
|
||||
pg_database db
|
||||
WHERE db.datname = '{0}'""".format(self.database_name)
|
||||
)
|
||||
self.database_id, self.last_system_oid = cursor.fetchone()
|
||||
|
||||
cursor.execute("SELECT oid FROM pg_class where relname='test_table'")
|
||||
self.table_id = cursor.fetchone()[0]
|
||||
|
||||
|
||||
@staticmethod
|
||||
def get_template_file(version, filename):
|
||||
return os.path.join(os.path.dirname(__file__), "..", "templates", "table", "sql", version, filename)
|
@ -0,0 +1,52 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2017, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import os
|
||||
|
||||
import jinja2
|
||||
from regression.python_test_utils.sql_template_test_base import SQLTemplateTestBase
|
||||
from regression.python_test_utils.template_helper import file_as_template
|
||||
|
||||
|
||||
class TestTriggerGetOidSql(SQLTemplateTestBase):
|
||||
scenarios = [
|
||||
('Test Trigger to retrieve OID SQL file', dict())
|
||||
]
|
||||
|
||||
def __init__(self):
|
||||
super(TestTriggerGetOidSql, self).__init__()
|
||||
self.table_id = -1
|
||||
self.column_id = -1
|
||||
|
||||
def test_setup(self, connection, cursor):
|
||||
cursor.execute("SELECT pg_class.oid AS table_id, "
|
||||
"pg_attribute.attnum AS column_id "
|
||||
"FROM pg_class JOIN pg_attribute ON attrelid=pg_class.oid "
|
||||
"WHERE pg_class.relname='test_table'"
|
||||
" AND pg_attribute.attname = 'some_column'")
|
||||
self.table_id, self.column_id = cursor.fetchone()
|
||||
|
||||
def generate_sql(self, version):
|
||||
template_file = self.get_template_file(version, "get_oid.sql")
|
||||
jinja2.filters.FILTERS['qtLiteral'] = lambda value: "NULL"
|
||||
template = file_as_template(template_file)
|
||||
|
||||
sql = template.render(data={'name': None},
|
||||
tid=self.table_id
|
||||
)
|
||||
|
||||
return sql
|
||||
|
||||
def assertions(self, fetch_result, descriptions):
|
||||
self.assertEqual(0, len(fetch_result))
|
||||
|
||||
@staticmethod
|
||||
def get_template_file(version, filename):
|
||||
return os.path.join(os.path.dirname(__file__), "..", "templates", "trigger", "sql", version, filename)
|
||||
|
@ -0,0 +1,43 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2017, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import os
|
||||
|
||||
from regression.python_test_utils.sql_template_test_base import SQLTemplateTestBase
|
||||
from regression.python_test_utils.template_helper import file_as_template
|
||||
|
||||
|
||||
class TestTriggerNodesSql(SQLTemplateTestBase):
|
||||
scenarios = [
|
||||
('Test Trigger Nodes SQL file', dict())
|
||||
]
|
||||
|
||||
def __init__(self):
|
||||
super(TestTriggerNodesSql, self).__init__()
|
||||
self.table_id = -1
|
||||
|
||||
def test_setup(self, connection, cursor):
|
||||
cursor.execute("SELECT pg_class.oid AS table_id "
|
||||
"FROM pg_class "
|
||||
"WHERE pg_class.relname='test_table'")
|
||||
self.table_id = cursor.fetchone()[0]
|
||||
|
||||
def generate_sql(self, version):
|
||||
template_file = self.get_template_file(version, "nodes.sql")
|
||||
template = file_as_template(template_file)
|
||||
sql = template.render(tid=self.table_id)
|
||||
|
||||
return sql
|
||||
|
||||
def assertions(self, fetch_result, descriptions):
|
||||
self.assertEqual(0, len(fetch_result))
|
||||
|
||||
@staticmethod
|
||||
def get_template_file(version, filename):
|
||||
return os.path.join(os.path.dirname(__file__), "..", "templates", "trigger", "sql", version, filename)
|
@ -0,0 +1,45 @@
|
||||
SELECT DISTINCT dep.deptype, dep.refclassid, cl.relkind, ad.adbin, ad.adsrc,
|
||||
CASE WHEN cl.relkind IS NOT NULL THEN cl.relkind || COALESCE(dep.refobjsubid::character varying, '')
|
||||
WHEN tg.oid IS NOT NULL THEN 'T'::text
|
||||
WHEN ty.oid IS NOT NULL AND ty.typbasetype = 0 THEN 'y'::text
|
||||
WHEN ty.oid IS NOT NULL AND ty.typbasetype != 0 THEN 'd'::text
|
||||
WHEN ns.oid IS NOT NULL THEN 'n'::text
|
||||
WHEN pr.oid IS NOT NULL THEN 'p'::text
|
||||
WHEN la.oid IS NOT NULL THEN 'l'::text
|
||||
WHEN rw.oid IS NOT NULL THEN 'R'::text
|
||||
WHEN co.oid IS NOT NULL THEN 'C'::text || contype
|
||||
WHEN ad.oid IS NOT NULL THEN 'A'::text
|
||||
WHEN fs.oid IS NOT NULL THEN 'F'::text
|
||||
WHEN fdw.oid IS NOT NULL THEN 'f'::text
|
||||
ELSE ''
|
||||
END AS type,
|
||||
COALESCE(coc.relname, clrw.relname) AS ownertable,
|
||||
CASE WHEN cl.relname IS NOT NULL OR att.attname IS NOT NULL THEN cl.relname || '.' || att.attname
|
||||
ELSE COALESCE(cl.relname, co.conname, pr.proname, tg.tgname, ty.typname, la.lanname, rw.rulename, ns.nspname, fs.srvname, fdw.fdwname)
|
||||
END AS refname,
|
||||
COALESCE(nsc.nspname, nso.nspname, nsp.nspname, nst.nspname, nsrw.nspname) AS nspname
|
||||
FROM pg_depend dep
|
||||
LEFT JOIN pg_class cl ON dep.refobjid=cl.oid
|
||||
LEFT JOIN pg_attribute att ON dep.refobjid=att.attrelid AND dep.refobjsubid=att.attnum
|
||||
LEFT JOIN pg_namespace nsc ON cl.relnamespace=nsc.oid
|
||||
LEFT JOIN pg_proc pr ON dep.refobjid=pr.oid
|
||||
LEFT JOIN pg_namespace nsp ON pr.pronamespace=nsp.oid
|
||||
LEFT JOIN pg_trigger tg ON dep.refobjid=tg.oid
|
||||
LEFT JOIN pg_type ty ON dep.refobjid=ty.oid
|
||||
LEFT JOIN pg_namespace nst ON ty.typnamespace=nst.oid
|
||||
LEFT JOIN pg_constraint co ON dep.refobjid=co.oid
|
||||
LEFT JOIN pg_class coc ON co.conrelid=coc.oid
|
||||
LEFT JOIN pg_namespace nso ON co.connamespace=nso.oid
|
||||
LEFT JOIN pg_rewrite rw ON dep.refobjid=rw.oid
|
||||
LEFT JOIN pg_class clrw ON clrw.oid=rw.ev_class
|
||||
LEFT JOIN pg_namespace nsrw ON clrw.relnamespace=nsrw.oid
|
||||
LEFT JOIN pg_language la ON dep.refobjid=la.oid
|
||||
LEFT JOIN pg_namespace ns ON dep.refobjid=ns.oid
|
||||
LEFT JOIN pg_attrdef ad ON ad.adrelid=att.attrelid AND ad.adnum=att.attnum
|
||||
LEFT JOIN pg_foreign_server fs ON fs.oid=dep.refobjid
|
||||
LEFT JOIN pg_foreign_data_wrapper fdw ON fdw.oid=dep.refobjid
|
||||
{{where_clause}} AND
|
||||
refclassid IN ( SELECT oid FROM pg_class WHERE relname IN
|
||||
('pg_class', 'pg_constraint', 'pg_conversion', 'pg_language', 'pg_proc', 'pg_rewrite', 'pg_namespace',
|
||||
'pg_trigger', 'pg_type', 'pg_attrdef', 'pg_event_trigger', 'pg_foreign_server', 'pg_foreign_data_wrapper'))
|
||||
ORDER BY refclassid, cl.relkind
|
@ -0,0 +1,44 @@
|
||||
SELECT DISTINCT dep.deptype, dep.classid, cl.relkind, ad.adbin, ad.adsrc,
|
||||
CASE WHEN cl.relkind IS NOT NULL THEN cl.relkind || COALESCE(dep.objsubid::text, '')
|
||||
WHEN tg.oid IS NOT NULL THEN 'T'::text
|
||||
WHEN ty.oid IS NOT NULL THEN 'y'::text
|
||||
WHEN ns.oid IS NOT NULL THEN 'n'::text
|
||||
WHEN pr.oid IS NOT NULL THEN 'p'::text
|
||||
WHEN la.oid IS NOT NULL THEN 'l'::text
|
||||
WHEN rw.oid IS NOT NULL THEN 'R'::text
|
||||
WHEN co.oid IS NOT NULL THEN 'C'::text || contype
|
||||
WHEN ad.oid IS NOT NULL THEN 'A'::text
|
||||
WHEN fs.oid IS NOT NULL THEN 'F'::text
|
||||
WHEN fdw.oid IS NOT NULL THEN 'f'::text
|
||||
ELSE ''
|
||||
END AS type,
|
||||
COALESCE(coc.relname, clrw.relname) AS ownertable,
|
||||
CASE WHEN cl.relname IS NOT NULL AND att.attname IS NOT NULL THEN cl.relname || '.' || att.attname
|
||||
ELSE COALESCE(cl.relname, co.conname, pr.proname, tg.tgname, ty.typname, la.lanname, rw.rulename, ns.nspname, fs.srvname, fdw.fdwname)
|
||||
END AS refname,
|
||||
COALESCE(nsc.nspname, nso.nspname, nsp.nspname, nst.nspname, nsrw.nspname) AS nspname
|
||||
FROM pg_depend dep
|
||||
LEFT JOIN pg_class cl ON dep.objid=cl.oid
|
||||
LEFT JOIN pg_attribute att ON dep.objid=att.attrelid AND dep.objsubid=att.attnum
|
||||
LEFT JOIN pg_namespace nsc ON cl.relnamespace=nsc.oid
|
||||
LEFT JOIN pg_proc pr ON dep.objid=pr.oid
|
||||
LEFT JOIN pg_namespace nsp ON pr.pronamespace=nsp.oid
|
||||
LEFT JOIN pg_trigger tg ON dep.objid=tg.oid
|
||||
LEFT JOIN pg_type ty ON dep.objid=ty.oid
|
||||
LEFT JOIN pg_namespace nst ON ty.typnamespace=nst.oid
|
||||
LEFT JOIN pg_constraint co ON dep.objid=co.oid
|
||||
LEFT JOIN pg_class coc ON co.conrelid=coc.oid
|
||||
LEFT JOIN pg_namespace nso ON co.connamespace=nso.oid
|
||||
LEFT JOIN pg_rewrite rw ON dep.objid=rw.oid
|
||||
LEFT JOIN pg_class clrw ON clrw.oid=rw.ev_class
|
||||
LEFT JOIN pg_namespace nsrw ON clrw.relnamespace=nsrw.oid
|
||||
LEFT JOIN pg_language la ON dep.objid=la.oid
|
||||
LEFT JOIN pg_namespace ns ON dep.objid=ns.oid
|
||||
LEFT JOIN pg_attrdef ad ON ad.oid=dep.objid
|
||||
LEFT JOIN pg_foreign_server fs ON fs.oid=dep.objid
|
||||
LEFT JOIN pg_foreign_data_wrapper fdw ON fdw.oid=dep.objid
|
||||
{{where_clause}} AND
|
||||
classid IN ( SELECT oid FROM pg_class WHERE relname IN
|
||||
('pg_class', 'pg_constraint', 'pg_conversion', 'pg_language', 'pg_proc', 'pg_rewrite', 'pg_namespace',
|
||||
'pg_trigger', 'pg_type', 'pg_attrdef', 'pg_event_trigger', 'pg_foreign_server', 'pg_foreign_data_wrapper'))
|
||||
ORDER BY classid, cl.relkind
|
@ -0,0 +1,42 @@
|
||||
SET LOCAL join_collapse_limit=8;
|
||||
SELECT DISTINCT dep.deptype, dep.refclassid, cl.relkind, ad.adbin, ad.adsrc,
|
||||
CASE WHEN cl.relkind IS NOT NULL THEN cl.relkind || COALESCE(dep.refobjsubid::character varying, '')
|
||||
WHEN tg.oid IS NOT NULL THEN 'T'::text
|
||||
WHEN ty.oid IS NOT NULL AND ty.typbasetype = 0 THEN 'y'::text
|
||||
WHEN ty.oid IS NOT NULL AND ty.typbasetype != 0 THEN 'd'::text
|
||||
WHEN ns.oid IS NOT NULL THEN 'n'::text
|
||||
WHEN pr.oid IS NOT NULL THEN 'p'::text
|
||||
WHEN la.oid IS NOT NULL THEN 'l'::text
|
||||
WHEN rw.oid IS NOT NULL THEN 'R'::text
|
||||
WHEN co.oid IS NOT NULL THEN 'C'::text || contype
|
||||
WHEN ad.oid IS NOT NULL THEN 'A'::text
|
||||
ELSE ''
|
||||
END AS type,
|
||||
COALESCE(coc.relname, clrw.relname) AS ownertable,
|
||||
CASE WHEN cl.relname IS NOT NULL OR att.attname IS NOT NULL THEN cl.relname || '.' || att.attname
|
||||
ELSE COALESCE(cl.relname, co.conname, pr.proname, tg.tgname, ty.typname, la.lanname, rw.rulename, ns.nspname)
|
||||
END AS refname,
|
||||
COALESCE(nsc.nspname, nso.nspname, nsp.nspname, nst.nspname, nsrw.nspname) AS nspname
|
||||
FROM pg_depend dep
|
||||
LEFT JOIN pg_class cl ON dep.refobjid=cl.oid
|
||||
LEFT JOIN pg_attribute att ON dep.refobjid=att.attrelid AND dep.refobjsubid=att.attnum
|
||||
LEFT JOIN pg_namespace nsc ON cl.relnamespace=nsc.oid
|
||||
LEFT JOIN pg_proc pr ON dep.refobjid=pr.oid
|
||||
LEFT JOIN pg_namespace nsp ON pr.pronamespace=nsp.oid
|
||||
LEFT JOIN pg_trigger tg ON dep.refobjid=tg.oid
|
||||
LEFT JOIN pg_type ty ON dep.refobjid=ty.oid
|
||||
LEFT JOIN pg_namespace nst ON ty.typnamespace=nst.oid
|
||||
LEFT JOIN pg_constraint co ON dep.refobjid=co.oid
|
||||
LEFT JOIN pg_class coc ON co.conrelid=coc.oid
|
||||
LEFT JOIN pg_namespace nso ON co.connamespace=nso.oid
|
||||
LEFT JOIN pg_rewrite rw ON dep.refobjid=rw.oid
|
||||
LEFT JOIN pg_class clrw ON clrw.oid=rw.ev_class
|
||||
LEFT JOIN pg_namespace nsrw ON clrw.relnamespace=nsrw.oid
|
||||
LEFT JOIN pg_language la ON dep.refobjid=la.oid
|
||||
LEFT JOIN pg_namespace ns ON dep.refobjid=ns.oid
|
||||
LEFT JOIN pg_attrdef ad ON ad.adrelid=att.attrelid AND ad.adnum=att.attnum
|
||||
{{where_clause}} AND
|
||||
refclassid IN ( SELECT oid FROM pg_class WHERE relname IN
|
||||
('pg_class', 'pg_constraint', 'pg_conversion', 'pg_language', 'pg_proc', 'pg_rewrite', 'pg_namespace',
|
||||
'pg_trigger', 'pg_type', 'pg_attrdef', 'pg_event_trigger', 'pg_foreign_server', 'pg_foreign_data_wrapper'))
|
||||
ORDER BY refclassid, cl.relkind
|
@ -1,59 +1,4 @@
|
||||
{% if fetch_dependencies %}
|
||||
SELECT DISTINCT dep.deptype, dep.refclassid, cl.relkind, ad.adbin, ad.adsrc,
|
||||
CASE WHEN cl.relkind IS NOT NULL THEN cl.relkind || COALESCE(dep.refobjsubid::character varying, '')
|
||||
WHEN tg.oid IS NOT NULL THEN 'T'::text
|
||||
WHEN ty.oid IS NOT NULL AND ty.typbasetype = 0 THEN 'y'::text
|
||||
WHEN ty.oid IS NOT NULL AND ty.typbasetype != 0 THEN 'd'::text
|
||||
WHEN ns.oid IS NOT NULL THEN 'n'::text
|
||||
WHEN pr.oid IS NOT NULL THEN 'p'::text
|
||||
WHEN la.oid IS NOT NULL THEN 'l'::text
|
||||
WHEN rw.oid IS NOT NULL THEN 'R'::text
|
||||
WHEN co.oid IS NOT NULL THEN 'C'::text || contype
|
||||
WHEN ad.oid IS NOT NULL THEN 'A'::text
|
||||
WHEN fs.oid IS NOT NULL THEN 'F'::text
|
||||
WHEN fdw.oid IS NOT NULL THEN 'f'::text
|
||||
ELSE ''
|
||||
END AS type,
|
||||
COALESCE(coc.relname, clrw.relname) AS ownertable,
|
||||
CASE WHEN cl.relname IS NOT NULL OR att.attname IS NOT NULL THEN cl.relname || '.' || att.attname
|
||||
ELSE COALESCE(cl.relname, co.conname, pr.proname, tg.tgname, ty.typname, la.lanname, rw.rulename, ns.nspname, fs.srvname, fdw.fdwname)
|
||||
END AS refname,
|
||||
COALESCE(nsc.nspname, nso.nspname, nsp.nspname, nst.nspname, nsrw.nspname) AS nspname
|
||||
FROM pg_depend dep
|
||||
LEFT JOIN pg_class cl ON dep.refobjid=cl.oid
|
||||
LEFT JOIN pg_attribute att ON dep.refobjid=att.attrelid AND dep.refobjsubid=att.attnum
|
||||
LEFT JOIN pg_namespace nsc ON cl.relnamespace=nsc.oid
|
||||
LEFT JOIN pg_proc pr ON dep.refobjid=pr.oid
|
||||
LEFT JOIN pg_namespace nsp ON pr.pronamespace=nsp.oid
|
||||
LEFT JOIN pg_trigger tg ON dep.refobjid=tg.oid
|
||||
LEFT JOIN pg_type ty ON dep.refobjid=ty.oid
|
||||
LEFT JOIN pg_namespace nst ON ty.typnamespace=nst.oid
|
||||
LEFT JOIN pg_constraint co ON dep.refobjid=co.oid
|
||||
LEFT JOIN pg_class coc ON co.conrelid=coc.oid
|
||||
LEFT JOIN pg_namespace nso ON co.connamespace=nso.oid
|
||||
LEFT JOIN pg_rewrite rw ON dep.refobjid=rw.oid
|
||||
LEFT JOIN pg_class clrw ON clrw.oid=rw.ev_class
|
||||
LEFT JOIN pg_namespace nsrw ON clrw.relnamespace=nsrw.oid
|
||||
LEFT JOIN pg_language la ON dep.refobjid=la.oid
|
||||
LEFT JOIN pg_namespace ns ON dep.refobjid=ns.oid
|
||||
LEFT JOIN pg_attrdef ad ON ad.adrelid=att.attrelid AND ad.adnum=att.attnum
|
||||
LEFT JOIN pg_foreign_server fs ON fs.oid=dep.refobjid
|
||||
LEFT JOIN pg_foreign_data_wrapper fdw ON fdw.oid=dep.refobjid
|
||||
{{where_clause}} AND
|
||||
refclassid IN ( SELECT oid FROM pg_class WHERE relname IN
|
||||
('pg_class', 'pg_constraint', 'pg_conversion', 'pg_language', 'pg_proc', 'pg_rewrite', 'pg_namespace',
|
||||
'pg_trigger', 'pg_type', 'pg_attrdef', 'pg_event_trigger', 'pg_foreign_server', 'pg_foreign_data_wrapper'))
|
||||
ORDER BY refclassid, cl.relkind
|
||||
{% endif %}
|
||||
|
||||
{% if fetch_role_dependencies %}
|
||||
SELECT rolname AS refname, refclassid, deptype
|
||||
FROM pg_shdepend dep
|
||||
LEFT JOIN pg_roles r ON refclassid=1260 AND refobjid=r.oid
|
||||
{{where_clause}} ORDER BY 1
|
||||
{% endif %}
|
||||
|
||||
{% if fetch_dependents %}
|
||||
SET LOCAL join_collapse_limit=8;
|
||||
SELECT DISTINCT dep.deptype, dep.classid, cl.relkind, ad.adbin, ad.adsrc,
|
||||
CASE WHEN cl.relkind IS NOT NULL THEN cl.relkind || COALESCE(dep.objsubid::text, '')
|
||||
WHEN tg.oid IS NOT NULL THEN 'T'::text
|
||||
@ -64,13 +9,11 @@ SELECT DISTINCT dep.deptype, dep.classid, cl.relkind, ad.adbin, ad.adsrc,
|
||||
WHEN rw.oid IS NOT NULL THEN 'R'::text
|
||||
WHEN co.oid IS NOT NULL THEN 'C'::text || contype
|
||||
WHEN ad.oid IS NOT NULL THEN 'A'::text
|
||||
WHEN fs.oid IS NOT NULL THEN 'F'::text
|
||||
WHEN fdw.oid IS NOT NULL THEN 'f'::text
|
||||
ELSE ''
|
||||
END AS type,
|
||||
COALESCE(coc.relname, clrw.relname) AS ownertable,
|
||||
CASE WHEN cl.relname IS NOT NULL AND att.attname IS NOT NULL THEN cl.relname || '.' || att.attname
|
||||
ELSE COALESCE(cl.relname, co.conname, pr.proname, tg.tgname, ty.typname, la.lanname, rw.rulename, ns.nspname, fs.srvname, fdw.fdwname)
|
||||
ELSE COALESCE(cl.relname, co.conname, pr.proname, tg.tgname, ty.typname, la.lanname, rw.rulename, ns.nspname)
|
||||
END AS refname,
|
||||
COALESCE(nsc.nspname, nso.nspname, nsp.nspname, nst.nspname, nsrw.nspname) AS nspname
|
||||
FROM pg_depend dep
|
||||
@ -91,11 +34,8 @@ LEFT JOIN pg_namespace nsrw ON clrw.relnamespace=nsrw.oid
|
||||
LEFT JOIN pg_language la ON dep.objid=la.oid
|
||||
LEFT JOIN pg_namespace ns ON dep.objid=ns.oid
|
||||
LEFT JOIN pg_attrdef ad ON ad.oid=dep.objid
|
||||
LEFT JOIN pg_foreign_server fs ON fs.oid=dep.objid
|
||||
LEFT JOIN pg_foreign_data_wrapper fdw ON fdw.oid=dep.objid
|
||||
{{where_clause}} AND
|
||||
classid IN ( SELECT oid FROM pg_class WHERE relname IN
|
||||
('pg_class', 'pg_constraint', 'pg_conversion', 'pg_language', 'pg_proc', 'pg_rewrite', 'pg_namespace',
|
||||
'pg_trigger', 'pg_type', 'pg_attrdef', 'pg_event_trigger', 'pg_foreign_server', 'pg_foreign_data_wrapper'))
|
||||
ORDER BY classid, cl.relkind
|
||||
{% endif %}
|
||||
ORDER BY classid, cl.relkind;
|
||||
|
@ -0,0 +1,4 @@
|
||||
SELECT rolname AS refname, refclassid, deptype
|
||||
FROM pg_shdepend dep
|
||||
LEFT JOIN pg_roles r ON refclassid=1260 AND refobjid=r.oid
|
||||
{{where_clause}} ORDER BY 1
|
@ -0,0 +1,51 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2017, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import os
|
||||
|
||||
from regression.python_test_utils.sql_template_test_base import SQLTemplateTestBase
|
||||
from regression.python_test_utils.template_helper import file_as_template
|
||||
|
||||
|
||||
class TestDependenciesSql(SQLTemplateTestBase):
|
||||
scenarios = [
|
||||
# Fetching default URL for schema node.
|
||||
('Test dependencies SQL file', dict())
|
||||
]
|
||||
|
||||
def __init__(self):
|
||||
super(TestDependenciesSql, self).__init__()
|
||||
self.table_id = -1
|
||||
|
||||
def test_setup(self, connection, cursor):
|
||||
cursor.execute("SELECT pg_class.oid AS table_id "
|
||||
"FROM pg_class "
|
||||
"WHERE pg_class.relname='test_table'")
|
||||
self.table_id = cursor.fetchone()[0]
|
||||
|
||||
def generate_sql(self, version):
|
||||
template_file = self.get_template_file(version, "dependencies.sql")
|
||||
template = file_as_template(template_file)
|
||||
sql = template.render(where_clause="WHERE dep.objid=%s::oid" % self.table_id)
|
||||
|
||||
return sql
|
||||
|
||||
def assertions(self, fetch_result, descriptions):
|
||||
self.assertEqual(1, len(fetch_result))
|
||||
|
||||
first_row = {}
|
||||
for index, description in enumerate(descriptions):
|
||||
first_row[description.name] = fetch_result[0][index]
|
||||
|
||||
self.assertEqual('n', first_row["deptype"])
|
||||
self.assertEqual('public', first_row["refname"])
|
||||
|
||||
@staticmethod
|
||||
def get_template_file(version, filename):
|
||||
return os.path.join(os.path.dirname(__file__), "..", "templates", "depends", "sql", version, filename)
|
@ -0,0 +1,51 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2017, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import os
|
||||
|
||||
from regression.python_test_utils.sql_template_test_base import SQLTemplateTestBase
|
||||
from regression.python_test_utils.template_helper import file_as_template
|
||||
|
||||
|
||||
class TestDependentsSql(SQLTemplateTestBase):
|
||||
scenarios = [
|
||||
# Fetching default URL for schema node.
|
||||
('Test dependencies SQL file', dict())
|
||||
]
|
||||
|
||||
def __init__(self):
|
||||
super(TestDependentsSql, self).__init__()
|
||||
self.table_id = -1
|
||||
|
||||
def test_setup(self, connection, cursor):
|
||||
cursor.execute("SELECT pg_class.oid AS table_id "
|
||||
"FROM pg_class "
|
||||
"WHERE pg_class.relname='test_table'")
|
||||
self.table_id = cursor.fetchone()[0]
|
||||
|
||||
def generate_sql(self, version):
|
||||
template_file = self.get_template_file(version, "dependents.sql")
|
||||
template = file_as_template(template_file)
|
||||
sql = template.render(where_clause="WHERE dep.objid=%s::oid" % self.table_id)
|
||||
|
||||
return sql
|
||||
|
||||
def assertions(self, fetch_result, descriptions):
|
||||
self.assertEqual(1, len(fetch_result))
|
||||
|
||||
first_row = {}
|
||||
for index, description in enumerate(descriptions):
|
||||
first_row[description.name] = fetch_result[0][index]
|
||||
|
||||
self.assertEqual('n', first_row["deptype"])
|
||||
self.assertEqual('test_table', first_row["refname"])
|
||||
|
||||
@staticmethod
|
||||
def get_template_file(version, filename):
|
||||
return os.path.join(os.path.dirname(__file__), "..", "templates", "depends", "sql", version, filename)
|
@ -0,0 +1,85 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2017, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
from pgadmin.browser.server_groups.servers.roles.tests.utils import create_role, delete_role
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
import os
|
||||
|
||||
from regression.python_test_utils import test_utils
|
||||
from regression.python_test_utils.sql_template_test_base import SQLTemplateTestBase
|
||||
from regression.python_test_utils.template_helper import file_as_template
|
||||
from regression.python_test_utils.test_utils import create_database
|
||||
|
||||
|
||||
class TestRoleDependenciesSql(BaseTestGenerator):
|
||||
scenarios = [
|
||||
# Fetching default URL for schema node.
|
||||
('Test Role Dependencies SQL file', dict())
|
||||
]
|
||||
|
||||
def __init__(self):
|
||||
super(TestRoleDependenciesSql, self).__init__()
|
||||
self.table_id = -1
|
||||
|
||||
def setUp(self):
|
||||
with test_utils.Database(self.server) as (connection, database_name):
|
||||
cursor = connection.cursor()
|
||||
try:
|
||||
cursor.execute(
|
||||
"CREATE ROLE testpgadmin LOGIN PASSWORD '%s'"
|
||||
% self.server['db_password'])
|
||||
except Exception as exception:
|
||||
print(exception)
|
||||
connection.commit()
|
||||
|
||||
self.server_with_modified_user = self.server.copy()
|
||||
self.server_with_modified_user['username'] = "testpgadmin"
|
||||
|
||||
def runTest(self):
|
||||
if hasattr(self, "ignore_test"):
|
||||
return
|
||||
|
||||
with test_utils.Database(self.server) as (connection, database_name):
|
||||
test_utils.create_table(self.server_with_modified_user, database_name, "test_new_role_table")
|
||||
cursor = connection.cursor()
|
||||
cursor.execute("SELECT pg_class.oid AS table_id "
|
||||
"FROM pg_class "
|
||||
"WHERE pg_class.relname='test_new_role_table'")
|
||||
self.table_id = cursor.fetchone()[0]
|
||||
|
||||
sql = self.generate_sql('default')
|
||||
cursor.execute(sql)
|
||||
|
||||
fetch_result = cursor.fetchall()
|
||||
self.assertions(fetch_result, cursor.description)
|
||||
|
||||
def tearDown(self):
|
||||
with test_utils.Database(self.server) as (connection, database_name):
|
||||
cursor = connection.cursor()
|
||||
cursor.execute("DROP ROLE testpgadmin")
|
||||
connection.commit()
|
||||
|
||||
def generate_sql(self, version):
|
||||
template_file = self.get_template_file(version, "role_dependencies.sql")
|
||||
template = file_as_template(template_file)
|
||||
sql = template.render(where_clause="WHERE dep.objid=%s::oid" % self.table_id)
|
||||
|
||||
return sql
|
||||
|
||||
def assertions(self, fetch_result, descriptions):
|
||||
self.assertEqual(1, len(fetch_result))
|
||||
|
||||
first_row = {}
|
||||
for index, description in enumerate(descriptions):
|
||||
first_row[description.name] = fetch_result[0][index]
|
||||
|
||||
self.assertEqual('o', first_row["deptype"])
|
||||
|
||||
@staticmethod
|
||||
def get_template_file(version, filename):
|
||||
return os.path.join(os.path.dirname(__file__), "..", "templates", "depends", "sql", version, filename)
|
@ -353,15 +353,15 @@ class PGChildNodeView(NodeView):
|
||||
else:
|
||||
where_clause = where
|
||||
|
||||
query = render_template("/".join([sql_path, 'dependents.sql']),
|
||||
fetch_dependencies=True, where_clause=where_clause)
|
||||
query = render_template("/".join([sql_path, 'dependencies.sql']),
|
||||
where_clause=where_clause)
|
||||
# fetch the dependency for the selected object
|
||||
dependencies = self.__fetch_dependency(conn, query)
|
||||
|
||||
# fetch role dependencies
|
||||
if where_clause.find('subid') < 0:
|
||||
sql = render_template("/".join([sql_path, 'dependents.sql']),
|
||||
fetch_role_dependencies=True, where_clause=where_clause)
|
||||
sql = render_template("/".join([sql_path, 'role_dependencies.sql']),
|
||||
where_clause=where_clause)
|
||||
|
||||
status, result = conn.execute_dict(sql)
|
||||
if not status:
|
||||
@ -402,7 +402,7 @@ class PGChildNodeView(NodeView):
|
||||
where_clause = where
|
||||
|
||||
query = render_template("/".join([sql_path, 'dependents.sql']),
|
||||
fetch_dependents=True, where_clause=where_clause)
|
||||
where_clause=where_clause)
|
||||
# fetch the dependency for the selected object
|
||||
dependents = self.__fetch_dependency(conn, query)
|
||||
|
||||
|
@ -1,3 +1,12 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2017, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import pyperclip
|
||||
import time
|
||||
|
||||
@ -37,7 +46,7 @@ class CopySelectedQueryResultsFeatureTest(BaseFeatureTest):
|
||||
self.page.find_by_partial_link_text("Query Tool").click()
|
||||
self.page.click_tab('Query-1')
|
||||
time.sleep(5)
|
||||
ActionChains(self.page.driver).send_keys("SELECT * FROM test_table").perform()
|
||||
ActionChains(self.page.driver).send_keys("SELECT * FROM test_table ORDER BY some_column").perform()
|
||||
self.page.driver.switch_to_frame(self.page.driver.find_element_by_tag_name("iframe"))
|
||||
self.page.find_by_id("btn-flash").click()
|
||||
|
||||
|
@ -16,10 +16,14 @@ class CheckDebuggerForXssFeatureTest(BaseFeatureTest):
|
||||
"""Tests to check if Debugger is vulnerable to XSS."""
|
||||
|
||||
scenarios = [
|
||||
("Test table DDL generation", dict())
|
||||
("Tests to check if Debugger is vulnerable to XSS", dict())
|
||||
]
|
||||
|
||||
def before(self):
|
||||
with test_utils.Database(self.server) as (connection, _):
|
||||
if connection.server_version < 90100:
|
||||
self.skipTest("Functions tree node is not present in pgAdmin below PG v9.1")
|
||||
|
||||
# Some test function is needed for debugger
|
||||
test_utils.create_debug_function(self.server, "postgres",
|
||||
"test_function")
|
||||
|
@ -7,45 +7,53 @@
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
from pgadmin.utils.driver import DriverRegistry
|
||||
from regression.python_test_utils.template_helper import file_as_template
|
||||
|
||||
DriverRegistry.load_drivers()
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression.python_test_utils import test_utils
|
||||
from pgadmin.utils.driver import DriverRegistry
|
||||
|
||||
if sys.version_info[0] >= 3:
|
||||
long = int
|
||||
DriverRegistry.load_drivers()
|
||||
|
||||
|
||||
class TestTriggerNodes(BaseTestGenerator):
|
||||
class SQLTemplateTestBase(BaseTestGenerator):
|
||||
scenarios = [
|
||||
("parent test class", dict(ignore_test=True))
|
||||
]
|
||||
|
||||
def __init__(self):
|
||||
super(SQLTemplateTestBase, self).__init__()
|
||||
self.database_name = -1
|
||||
self.versions_to_test = -1
|
||||
|
||||
def test_setup(self, connection, cursor):
|
||||
pass
|
||||
|
||||
def generate_sql(self, version):
|
||||
pass
|
||||
|
||||
def assertions(self, fetch_result, descriptions):
|
||||
pass
|
||||
|
||||
def runTest(self):
|
||||
""" When there are no triggers, it returns an empty result """
|
||||
if hasattr(self, "ignore_test"):
|
||||
return
|
||||
|
||||
with test_utils.Database(self.server) as (connection, database_name):
|
||||
test_utils.create_table(self.server, database_name, "test_table")
|
||||
|
||||
cursor = connection.cursor()
|
||||
cursor.execute("SELECT pg_class.oid AS table_id "
|
||||
"FROM pg_class "
|
||||
"WHERE pg_class.relname='test_table'")
|
||||
table_id = cursor.fetchone()[0]
|
||||
self.database_name = database_name
|
||||
|
||||
if connection.server_version < 90100:
|
||||
self.versions_to_test = ['default']
|
||||
else:
|
||||
self.versions_to_test = ['9.1_plus']
|
||||
|
||||
cursor = connection.cursor()
|
||||
self.test_setup(connection, cursor)
|
||||
|
||||
for version in self.versions_to_test:
|
||||
template_file = os.path.join(os.path.dirname(__file__), "..", version, "nodes.sql")
|
||||
|
||||
template = file_as_template(template_file)
|
||||
|
||||
sql = template.render(tid=table_id)
|
||||
sql = self.generate_sql(version)
|
||||
|
||||
cursor = connection.cursor()
|
||||
cursor.execute(sql)
|
||||
fetch_result = cursor.fetchall()
|
||||
self.assertEqual(0, len(fetch_result))
|
||||
|
||||
self.assertions(fetch_result, cursor.description)
|
Loading…
Reference in New Issue
Block a user