Remove psycopg2 completely.

This commit is contained in:
Khushboo Vashi 2023-03-20 16:57:30 +05:30 committed by GitHub
parent f43b3b0ce5
commit 366dd3a973
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 48 additions and 4630 deletions

View File

@ -74,7 +74,7 @@ simple - adapt as required for your distribution:
```
4. Ensure that a PostgreSQL installation's bin/ directory is in the path (so
pg_config can be found for building psycopg2), and install the required
pg_config can be found for building psycopg3), and install the required
packages:
```bash

View File

@ -303,10 +303,7 @@ LOG_ROTATION_MAX_LOG_FILES = 90 # Maximum number of backups to retain
##########################################################################
# The default driver used for making connection with PostgreSQL
if sys.version_info < (3, 7):
PG_DEFAULT_DRIVER = 'psycopg2'
else:
PG_DEFAULT_DRIVER = 'psycopg3'
PG_DEFAULT_DRIVER = 'psycopg3'
# Maximum allowed idle time in minutes before which releasing the connection
# for the particular session. (in minutes)

View File

@ -1,8 +0,0 @@
SELECT setval('public."Seq1_$%{}[]()&*^!@""''`\/#"', 7, true);
ALTER SEQUENCE IF EXISTS public."Seq1_$%{}[]()&*^!@""'`\/#"
INCREMENT 12
MINVALUE 2
MAXVALUE 9992
CACHE 2
CYCLE;

View File

@ -70,25 +70,14 @@
"current_value": "7", "increment": "12", "minimum": "2", "maximum": "9992", "cache": "2", "cycled": true
},
"expected_sql_file": "alter_seq_props.sql",
"expected_msql_file": "alter_seq_props_msql_psycopg2.sql",
"pg_driver": "psycopg2"
},{
"type": "alter",
"name": "Alter Sequence properties",
"endpoint": "NODE-sequence.obj_id",
"sql_endpoint": "NODE-sequence.sql_id",
"msql_endpoint": "NODE-sequence.msql_id",
"data": {
"current_value": "7", "increment": "12", "minimum": "2", "maximum": "9992", "cache": "2", "cycled": true
},
"expected_sql_file": "alter_seq_props.sql",
"expected_msql_file": "alter_seq_props_msql.sql",
"pg_driver": "psycopg3"
"expected_msql_file": "alter_seq_props_msql.sql"
},{
"type": "alter",
"name": "Alter Sequence add privileges",
"endpoint": "NODE-sequence.obj_id",
"sql_endpoint": "NODE-sequence.sql_id",
"sql_endpoint": "NODE-sequence.sql_id",
"sql_endpoint": "NODE-sequence.sql_id",
"msql_endpoint": "NODE-sequence.msql_id",
"data": {
"relacl": {

View File

@ -1,8 +0,0 @@
SELECT setval('public."Seq1_$%{}[]()&*^!@""''`\/#"', 7, true);
ALTER SEQUENCE IF EXISTS public."Seq1_$%{}[]()&*^!@""'`\/#"
INCREMENT 12
MINVALUE 2
MAXVALUE 9992
CACHE 2
CYCLE;

View File

@ -36,7 +36,6 @@ from pgadmin.browser.server_groups.servers.databases.extensions.utils \
from pgadmin.utils.constants import PREF_LABEL_KEYBOARD_SHORTCUTS, \
SERVER_CONNECTION_CLOSED
from pgadmin.preferences import preferences
from pgadmin.utils.constants import PSYCOPG2
MODULE_NAME = 'debugger'
@ -1318,11 +1317,6 @@ def messages(trans_id):
if conn.connected():
status = 'Busy'
if PG_DEFAULT_DRIVER == PSYCOPG2:
# psycopg3 doesn't require polling to get the
# messages as debugger connection is already open
# Remove this block while removing psucopg2 completely
_, result = conn.poll()
notify = conn.messages()
if notify:
# In notice message we need to find "PLDBGBREAK" string to find

View File

@ -10,7 +10,6 @@
from unittest.mock import patch
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.utils.constants import PSYCOPG2
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from regression.python_test_utils import test_utils
@ -153,13 +152,8 @@ class TestDownloadCSV(BaseTestGenerator):
# Disable the console logging from Flask logger
self.app.logger.disabled = True
if not self.is_valid and self.is_valid_tx:
if config.PG_DEFAULT_DRIVER == PSYCOPG2:
# When user enters wrong query, poll will throw 500,
# so expecting 500, as poll is never called for a wrong query.
self.assertEqual(res.status_code, 500)
else:
# The result will be null but status code will be 200
self.assertEqual(res.status_code, 200)
# The result will be null but status code will be 200
self.assertEqual(res.status_code, 200)
elif self.filename is None:
if self.download_as_txt:
with patch('pgadmin.tools.sqleditor.blueprint.'

View File

@ -2,17 +2,19 @@
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2023, The pgAdmin Development Team
# Copyright (C) 2013 - 2022, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import secrets
import json
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.utils.constants import PSYCOPG3
from regression.python_test_utils import test_utils
from pgadmin.utils import server_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
import config
@ -66,9 +68,6 @@ class TestSQLASCIIEncoding(BaseTestGenerator):
]
def setUp(self):
if config.PG_DEFAULT_DRIVER == PSYCOPG3:
self.skipTest('SQL_ASCII encoding: skipping for psycopg3.')
self.encode_db_name = 'test_encoding_' + self.db_encoding + \
str(secrets.choice(range(1000, 65535)))
self.encode_sid = self.server_information['server_id']
@ -84,38 +83,43 @@ class TestSQLASCIIEncoding(BaseTestGenerator):
self.server, self.encode_db_name,
(self.db_encoding, self.lc_collate))
test_utils.create_table_with_query(
self.server,
self.encode_db_name,
"""CREATE TABLE {0}(
name character varying(200) COLLATE pg_catalog."default")
""".format(self.table_name))
def runTest(self):
db_con = test_utils.get_db_connection(
self.encode_db_name,
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'],
self.server['sslmode']
)
db_con = database_utils.connect_database(self,
test_utils.SERVER_GROUP,
self.encode_sid,
self.encode_did)
if not db_con["info"] == "Database connected.":
raise Exception("Could not connect to the database.")
old_isolation_level = db_con.isolation_level
test_utils.set_isolation_level(db_con, 0)
pg_cursor = db_con.cursor()
pg_cursor.execute("SET client_encoding='{0}'".format(self.db_encoding))
query = """INSERT INTO {0} VALUES('{1}')""".format(
self.table_name, self.test_str)
pg_cursor.execute(query)
test_utils.set_isolation_level(db_con, old_isolation_level)
db_con.commit()
# Initialize query tool
self.trans_id = str(secrets.choice(range(1, 9999999)))
url = '/sqleditor/initialize/sqleditor/{0}/{1}/{2}/{3}'\
.format(self.trans_id, test_utils.SERVER_GROUP, self.encode_sid,
self.encode_did)
response = self.tester.post(url)
self.assertEqual(response.status_code, 200)
query = """SELECT * FROM {0}""".format(self.table_name)
pg_cursor.execute(query)
resp = pg_cursor.fetchone()
# Check character
url = "/sqleditor/query_tool/start/{0}".format(self.trans_id)
sql = "select '{0}';".format(self.test_str)
response = self.tester.post(url, data=json.dumps({"sql": sql}),
content_type='html/json')
self.assertEqual(response.status_code, 200)
url = '/sqleditor/poll/{0}'.format(self.trans_id)
response = self.tester.get(url)
self.assertEqual(response.status_code, 200)
response_data = json.loads(response.data)
self.assertEqual(response_data['data']['rows_fetched_to'], 1)
result = response_data['data']['result'][0][0]
self.assertEqual(result, self.test_str)
self.assertEqual(resp[0], self.test_str)
# Close query tool
url = '/sqleditor/close/{0}'.format(self.trans_id)
response = self.tester.delete(url)
self.assertEqual(response.status_code, 200)
database_utils.disconnect_database(self, self.encode_sid,
self.encode_did)
def tearDown(self):
main_conn = test_utils.get_db_connection(

View File

@ -1,137 +0,0 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2022, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import secrets
import json
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.utils.constants import PSYCOPG2
from regression.python_test_utils import test_utils
from pgadmin.utils import server_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
import config
class TestSQLASCIIEncodingPsycopg3(BaseTestGenerator):
"""
This class validates character support in pgAdmin4 for
SQL_ASCII encodings
"""
scenarios = [
(
'Test SQL_ASCII data with multiple backslashes',
dict(
table_name='test_sql_ascii',
db_encoding='SQL_ASCII',
lc_collate='C',
test_str='\\\\Four\\\Three\\Two\One'
)),
(
'Test SQL_ASCII data with file path',
dict(
table_name='test_sql_ascii',
db_encoding='SQL_ASCII',
lc_collate='C',
test_str='\\test\Documents\2017\12\19\AD93E646-'
'E5FE-11E7-85AE-EB2E217F96F0.tif'
)),
(
'Test SQL_ASCII data with multiple forward slashes',
dict(
table_name='test_sql_ascii',
db_encoding='SQL_ASCII',
lc_collate='C',
test_str='////4///3//2/1'
)),
(
'Test SQL_ASCII data with blob string',
dict(
table_name='test_sql_ascii',
db_encoding='SQL_ASCII',
lc_collate='C',
test_str='Blob: \xf4\xa5\xa3\xa5'
)),
(
'Test SQL_ASCII data with blob string & ascii table name',
dict(
table_name='ü',
db_encoding='SQL_ASCII',
lc_collate='C',
test_str='Blob: \xf4\xa5\xa3\xa5'
)),
]
def setUp(self):
if config.PG_DEFAULT_DRIVER == PSYCOPG2:
self.skipTest('SQL_ASCII encoding: skipping for psycopg2.')
self.encode_db_name = 'test_encoding_' + self.db_encoding + \
str(secrets.choice(range(1000, 65535)))
self.encode_sid = self.server_information['server_id']
server_con = server_utils.connect_server(self, self.encode_sid)
if hasattr(self, 'skip_on_database'):
if 'data' in server_con and 'type' in server_con['data']:
if server_con['data']['type'] in self.skip_on_database:
self.skipTest('cannot run in: %s' %
server_con['data']['type'])
self.encode_did = test_utils.create_database(
self.server, self.encode_db_name,
(self.db_encoding, self.lc_collate))
def runTest(self):
db_con = database_utils.connect_database(self,
test_utils.SERVER_GROUP,
self.encode_sid,
self.encode_did)
if not db_con["info"] == "Database connected.":
raise Exception("Could not connect to the database.")
# Initialize query tool
self.trans_id = str(secrets.choice(range(1, 9999999)))
url = '/sqleditor/initialize/sqleditor/{0}/{1}/{2}/{3}'\
.format(self.trans_id, test_utils.SERVER_GROUP, self.encode_sid,
self.encode_did)
response = self.tester.post(url)
self.assertEqual(response.status_code, 200)
# Check character
url = "/sqleditor/query_tool/start/{0}".format(self.trans_id)
sql = "select '{0}';".format(self.test_str)
response = self.tester.post(url, data=json.dumps({"sql": sql}),
content_type='html/json')
self.assertEqual(response.status_code, 200)
url = '/sqleditor/poll/{0}'.format(self.trans_id)
response = self.tester.get(url)
self.assertEqual(response.status_code, 200)
response_data = json.loads(response.data)
self.assertEqual(response_data['data']['rows_fetched_to'], 1)
result = response_data['data']['result'][0][0]
self.assertEqual(result, self.test_str)
# Close query tool
url = '/sqleditor/close/{0}'.format(self.trans_id)
response = self.tester.delete(url)
self.assertEqual(response.status_code, 200)
database_utils.disconnect_database(self, self.encode_sid,
self.encode_did)
def tearDown(self):
main_conn = test_utils.get_db_connection(
self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'],
self.server['sslmode']
)
test_utils.drop_database(main_conn, self.encode_db_name)

View File

@ -118,7 +118,6 @@ USER_NOT_FOUND = gettext("The specified user ID (%s) could not be found.")
DATABASE_LAST_SYSTEM_OID = 16383
# Drivers
PSYCOPG2 = 'psycopg2'
PSYCOPG3 = 'psycopg3'
# Shared storage

View File

@ -1,415 +0,0 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2023, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
"""
Implementation of Driver class
It is a wrapper around the actual psycopg2 driver, and connection
object.
"""
import datetime
import re
from flask import session
from flask_login import current_user
from werkzeug.exceptions import InternalServerError
import psycopg2
from psycopg2.extensions import adapt
from threading import Lock
import config
from pgadmin.model import Server
from .keywords import scan_keyword
from ..abstract import BaseDriver
from .connection import Connection
from .server_manager import ServerManager
connection_restore_lock = Lock()
class Driver(BaseDriver):
"""
class Driver(BaseDriver):
This driver acts as a wrapper around psycopg2 connection driver
implementation. We will be using psycopg2 for makeing connection with
the PostgreSQL/EDB Postgres Advanced Server (EnterpriseDB).
Properties:
----------
* Version (string):
Version of psycopg2 driver
Methods:
-------
* get_connection(sid, database, conn_id, auto_reconnect)
- It returns a Connection class object, which may/may not be connected
to the database server for this sesssion
* release_connection(seid, database, conn_id)
- It releases the connection object for the given conn_id/database for this
session.
* connection_manager(sid, reset)
- It returns the server connection manager for this session.
"""
def __init__(self, **kwargs):
self.managers = dict()
super().__init__()
def _restore_connections_from_session(self):
"""
Used internally by connection_manager to restore connections
from sessions.
"""
if session.sid not in self.managers:
self.managers[session.sid] = managers = dict()
if '__pgsql_server_managers' in session:
session_managers = \
session['__pgsql_server_managers'].copy()
for server in \
Server.query.filter_by(
user_id=current_user.id):
manager = managers[str(server.id)] = \
ServerManager(server)
if server.id in session_managers:
manager._restore(session_managers[server.id])
manager.update_session()
return managers
return {}
def connection_manager(self, sid=None):
"""
connection_manager(...)
Returns the ServerManager object for the current session. It will
create new ServerManager object (if necessary).
Parameters:
sid
- Server ID
"""
assert (sid is not None and isinstance(sid, int))
managers = None
server_data = Server.query.filter_by(id=sid).first()
if server_data is None:
return None
if session.sid not in self.managers:
with connection_restore_lock:
# The wait is over but the object might have been loaded
# by some other thread check again
managers = self._restore_connections_from_session()
else:
managers = self.managers[session.sid]
if str(sid) in managers:
manager = managers[str(sid)]
with connection_restore_lock:
manager._restore_connections()
manager.update_session()
managers['pinged'] = datetime.datetime.now()
if str(sid) not in managers:
s = Server.query.filter_by(id=sid).first()
if not s:
return None
managers[str(sid)] = ServerManager(s)
return managers[str(sid)]
return managers[str(sid)]
def version(self):
"""
version(...)
Returns the current version of psycopg2 driver
"""
_version = getattr(psycopg2, '__version__', None)
if _version:
return _version
raise InternalServerError(
"Driver Version information for psycopg2 is not available!"
)
def libpq_version(self):
"""
Returns the loaded libpq version
"""
version = getattr(psycopg2, '__libpq_version__', None)
if version:
return version
raise InternalServerError(
"libpq version information is not available!"
)
def get_connection(
self, sid, database=None, conn_id=None, auto_reconnect=True
):
"""
get_connection(...)
Returns the connection object for the certain connection-id/database
for the specific server, identified by sid. Create a new Connection
object (if require).
Parameters:
sid
- Server ID
database
- Database, on which the connection needs to be made
If provided none, maintenance_db for the server will be used,
while generating new Connection object.
conn_id
- Identification String for the Connection This will be used by
certain tools, which will require a dedicated connection for it.
i.e. Debugger, Query Tool, etc.
auto_reconnect
- This parameters define, if we should attempt to reconnect the
database server automatically, when connection has been lost for
any reason. Certain tools like Debugger will require a permenant
connection, and it stops working on disconnection.
"""
manager = self.connection_manager(sid)
return manager.connection(database=database, conn_id=conn_id,
auto_reconnect=auto_reconnect)
def release_connection(self, sid, database=None, conn_id=None):
"""
Release the connection for the given connection-id/database in this
session.
"""
return self.connection_manager(sid).release(database, conn_id)
def delete_manager(self, sid):
"""
Delete manager for given server id.
"""
manager = self.connection_manager(sid)
if manager is not None:
manager.release()
if session.sid in self.managers and \
str(sid) in self.managers[session.sid]:
del self.managers[session.sid][str(sid)]
def gc_timeout(self):
"""
Release the connections for the sessions, which have not pinged the
server for more than config.MAX_SESSION_IDLE_TIME.
"""
# Minimum session idle is 20 minutes
max_idle_time = max(config.MAX_SESSION_IDLE_TIME or 60, 20)
session_idle_timeout = datetime.timedelta(minutes=max_idle_time)
curr_time = datetime.datetime.now()
for sess in self.managers:
sess_mgr = self.managers[sess]
if sess == session.sid:
sess_mgr['pinged'] = curr_time
continue
if curr_time - sess_mgr['pinged'] >= session_idle_timeout:
for mgr in [
m for m in sess_mgr.values() if isinstance(m,
ServerManager)
]:
mgr.release()
def gc_own(self):
"""
Release the connections for current session
This is useful when (eg. logout) we want to release all
connections (except dedicated connections created by utilities
like backup, restore etc) of all servers for current user.
"""
sess_mgr = self.managers.get(session.sid, None)
if sess_mgr:
for mgr in (
m for m in sess_mgr.values() if isinstance(m, ServerManager)
):
mgr.release()
@staticmethod
def qtLiteral(value, force_quote=False):
adapted = adapt(value)
# Not all adapted objects have encoding
# e.g.
# psycopg2.extensions.BOOLEAN
# psycopg2.extensions.FLOAT
# psycopg2.extensions.INTEGER
# etc...
if hasattr(adapted, 'encoding'):
adapted.encoding = 'utf8'
res = adapted.getquoted()
if isinstance(res, bytes):
res = res.decode('utf-8')
if force_quote is True:
# Convert the input to the string to use the startsWith(...)
res = str(res)
if not res.startswith("'"):
return "'" + res + "'"
return res
@staticmethod
def ScanKeywordExtraLookup(key):
# UNRESERVED_KEYWORD 0
# COL_NAME_KEYWORD 1
# TYPE_FUNC_NAME_KEYWORD 2
# RESERVED_KEYWORD 3
extra_keywords = {
'connect': 3,
'convert': 3,
'distributed': 0,
'exec': 3,
'log': 0,
'long': 3,
'minus': 3,
'nocache': 3,
'number': 3,
'package': 3,
'pls_integer': 3,
'raw': 3,
'return': 3,
'smalldatetime': 3,
'smallfloat': 3,
'smallmoney': 3,
'sysdate': 3,
'systimestap': 3,
'tinyint': 3,
'tinytext': 3,
'varchar2': 3
}
return extra_keywords.get(key, None) or scan_keyword(key)
@staticmethod
def needsQuoting(key, for_types):
value = key
val_noarray = value
# check if the string is number or not
if isinstance(value, int):
return True
# certain types should not be quoted even though it contains a space.
# Evilness.
elif for_types and value[-2:] == "[]":
val_noarray = value[:-2]
if for_types and val_noarray.lower() in [
'bit varying',
'"char"',
'character varying',
'double precision',
'timestamp without time zone',
'timestamp with time zone',
'time without time zone',
'time with time zone',
'"trigger"',
'"unknown"'
]:
return False
# If already quoted?, If yes then do not quote again
if for_types and val_noarray and \
(val_noarray.startswith('"') or val_noarray.endswith('"')):
return False
if '0' <= val_noarray[0] <= '9':
return True
if re.search('[^a-z_0-9]+', val_noarray):
return True
# check string is keywaord or not
category = Driver.ScanKeywordExtraLookup(value)
if category is None:
return False
# UNRESERVED_KEYWORD
if category == 0:
return False
# COL_NAME_KEYWORD
if for_types and category == 1:
return False
return True
@staticmethod
def qtTypeIdent(conn, *args):
# We're not using the conn object at the moment, but - we will
# modify the
# logic to use the server version specific keywords later.
res = None
value = None
for val in args:
# DataType doesn't have len function then convert it to string
if not hasattr(val, '__len__'):
val = str(val)
if len(val) == 0:
continue
value = val
if Driver.needsQuoting(val, True):
value = value.replace("\"", "\"\"")
value = "\"" + value + "\""
res = ((res and res + '.') or '') + value
return res
@staticmethod
def qtIdent(conn, *args):
# We're not using the conn object at the moment, but - we will
# modify the logic to use the server version specific keywords later.
res = None
value = None
for val in args:
if isinstance(val, list):
return map(lambda w: Driver.qtIdent(conn, w), val)
# DataType doesn't have len function then convert it to string
if not hasattr(val, '__len__'):
val = str(val)
if len(val) == 0:
continue
value = val
if Driver.needsQuoting(val, False):
value = value.replace("\"", "\"\"")
value = "\"" + value + "\""
res = ((res and res + '.') or '') + value
return res

File diff suppressed because it is too large Load Diff

View File

@ -1,235 +0,0 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2023, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
"""
Implementation of an extended cursor, which returns ordered dictionary when
fetching results from it, and also takes care of the duplicate column name in
result.
"""
from collections import OrderedDict
import psycopg2
from psycopg2.extensions import cursor as _cursor, encodings
from .encoding import configure_driver_encodings
configure_driver_encodings(encodings)
class _WrapperColumn():
"""
class _WrapperColumn()
A wrapper class, which wraps the individual description column object,
to allow identify the duplicate column name, created by PostgreSQL database
server implicitly during query execution.
Methods:
-------
* __init__(_col, _name)
- Initialize the wrapper around the description column object, which will
present the dummy name when available instead of the duplicate name.
* __getattribute__(name)
- Get attributes from the original column description (which is a named
tuple) except for few of the attributes of this object (i.e. orig_col,
dummy_name, __class__, to_dict) are part of this object.
* __getitem__(idx)
- Get the item from the original object except for the 0th index item,
which is for 'name'.
* __setitem__(idx, value)
* __delitem__(idx)
- Override them to make the operations on original object.
* to_dict()
- Converts original objects data as OrderedDict (except the name will same
as dummy name (if available), and one more parameter as 'display_name'.
"""
def __init__(self, _col, _name):
"""Initializer for _WrapperColumn"""
self.orig_col = _col
self.dummy_name = _name
def __getattribute__(self, name):
"""Getting the attributes from the original object. (except few)"""
if (name == 'orig_col' or name == 'dummy_name' or
name == '__class__' or name == 'to_dict'):
return object.__getattribute__(self, name)
elif name == 'name':
res = object.__getattribute__(self, 'dummy_name')
if res is not None:
return res
return self.orig_col.__getattribute__(name)
def __getitem__(self, idx):
"""Overrides __getitem__ to fetch item from original object"""
if idx == 0 and self.dummy_name is not None:
return self.dummy_name
return self.orig_col.__getitem__(idx)
def __setitem__(self, *args, **kwargs):
"""Orverrides __setitem__ to do the operations on original object."""
return self.orig_col.__setitem__(*args, **kwargs)
def __delitem__(self, *args, **kwargs):
"""Orverrides __delitem__ to do the operations on original object."""
return self.orig_col.__delitem__(*args, **kwargs)
def to_dict(self):
"""
Generates an OrderedDict from the fields of the original objects
with avoiding the duplicate name.
"""
# In psycopg2 2.8, the description of one result column,
# exposed as items of the cursor.description sequence.
# Before psycopg2 2.8 the description attribute was a sequence
# of simple tuples or namedtuples.
ores = OrderedDict()
ores['name'] = self.orig_col.name
ores['type_code'] = self.orig_col.type_code
ores['display_size'] = self.orig_col.display_size
ores['internal_size'] = self.orig_col.internal_size
ores['precision'] = self.orig_col.precision
ores['scale'] = self.orig_col.scale
ores['null_ok'] = self.orig_col.null_ok
ores['table_oid'] = self.orig_col.table_oid
ores['table_column'] = self.orig_col.table_column
name = ores['name']
if self.dummy_name:
ores['name'] = self.dummy_name
ores['display_name'] = name
return ores
class DictCursor(_cursor):
"""
DictCursor
A class to generate the dictionary from the tuple, and also takes care of
the duplicate column name in result description.
Methods:
-------
* __init__()
- Initialize the cursor object
* _dict_tuple(tuple)
- Generate a dictionary object from a tuple, based on the column
description.
* _ordered_description()
- Generates the _WrapperColumn object from the description column, and
identifies duplicate column name
"""
def __init__(self, *args, **kwargs):
"""
Initialize the cursor object.
"""
self._odt_desc = None
_cursor.__init__(self, *args, **kwargs)
def _dict_tuple(self, tup):
"""
Transform the tuple into a dictionary object.
"""
if self._odt_desc is None:
self._ordered_description()
return dict((k[0], v) for k, v in zip(self._odt_desc, tup))
def _ordered_description(self):
"""
Transform the regular description to wrapper object, which handles
duplicate column name.
"""
self._odt_desc = _cursor.__getattribute__(self, 'description')
desc = self._odt_desc
if desc is None or len(desc) == 0:
return
res = list()
od = dict((d[0], 0) for d in desc)
for d in desc:
dummy = None
idx = od[d.name]
if idx == 0:
od[d.name] = 1
else:
name = d.name
while name in od:
idx += 1
name = ("%s-%s" % (d.name, idx))
od[d.name] = idx
dummy = name
res.append(_WrapperColumn(d, dummy))
self._odt_desc = tuple(res)
def ordered_description(self):
"""
Use this to fetch the description
"""
if self._odt_desc is None:
self._ordered_description()
return self._odt_desc
def execute(self, query, params=None):
"""
Execute function
"""
self._odt_desc = None
if params is not None and len(params) == 0:
params = None
return _cursor.execute(self, query, params)
def executemany(self, query, params=None):
"""
Execute many function of regular cursor.
"""
self._odt_desc = None
return _cursor.executemany(self, query, params)
def callproc(self, proname, params=None):
"""
Call a procedure by a name.
"""
self._odt_desc = None
return _cursor.callproc(self, proname, params)
def fetchmany(self, size=None):
"""
Fetch many tuples as ordered dictionary list.
"""
tuples = _cursor.fetchmany(self, size)
if tuples is not None:
return [self._dict_tuple(t) for t in tuples]
return None
def fetchall(self):
"""
Fetch all tuples as ordered dictionary list.
"""
tuples = _cursor.fetchall(self)
if tuples is not None:
return [self._dict_tuple(t) for t in tuples]
def __iter__(self):
it = _cursor.__iter__(self)
try:
yield self._dict_tuple(next(it))
while True:
yield self._dict_tuple(next(it))
except StopIteration:
pass

View File

@ -1,83 +0,0 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2023, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
# Get Postgres and Python encoding
encode_dict = {
'SQL_ASCII': ['SQL_ASCII', 'raw_unicode_escape', 'unicode_escape'],
'SQLASCII': ['SQL_ASCII', 'raw_unicode_escape', 'unicode_escape'],
'MULE_INTERNAL': ['MULE_INTERNAL', 'raw_unicode_escape', 'unicode_escape'],
'MULEINTERNAL': ['MULEINTERNAL', 'raw_unicode_escape', 'unicode_escape'],
'LATIN1': ['LATIN1', 'latin1', 'latin1'],
'LATIN2': ['LATIN2', 'latin2', 'latin2'],
'LATIN3': ['LATIN3', 'latin3', 'latin3'],
'LATIN4': ['LATIN4', 'latin4', 'latin4'],
'LATIN5': ['LATIN5', 'latin5', 'latin5'],
'LATIN6': ['LATIN6', 'latin6', 'latin6'],
'LATIN7': ['LATIN7', 'latin7', 'latin7'],
'LATIN8': ['LATIN8', 'latin8', 'latin8'],
'LATIN9': ['LATIN9', 'latin9', 'latin9'],
'LATIN10': ['LATIN10', 'latin10', 'latin10'],
'WIN866': ['WIN866', 'cp866', 'cp866'],
'WIN874': ['WIN874', 'cp874', 'cp874'],
'WIN1250': ['WIN1250', 'cp1250', 'cp1250'],
'WIN1251': ['WIN1251', 'cp1251', 'cp1251'],
'WIN1252': ['WIN1252', 'cp1252', 'cp1252'],
'WIN1253': ['WIN1253', 'cp1253', 'cp1253'],
'WIN1254': ['WIN1254', 'cp1254', 'cp1254'],
'WIN1255': ['WIN1255', 'cp1255', 'cp1255'],
'WIN1256': ['WIN1256', 'cp1256', 'cp1256'],
'WIN1257': ['WIN1257', 'cp1257', 'cp1257'],
'WIN1258': ['WIN1258', 'cp1258', 'cp1258'],
'EUC_JIS_2004': ['EUC_JIS_2004', 'eucjis2004', 'eucjis2004'],
'EUCJIS2004': ['EUCJIS2004', 'eucjis2004', 'eucjis2004'],
'EUC_CN': ['EUC_CN', 'euc-cn', 'euc-cn'],
'EUCCN': ['EUCCN', 'euc-cn', 'euc-cn'],
'EUC_JP': ['EUC_JP', 'euc_jp', 'euc_jp'],
'EUCJP': ['EUCJP', 'euc_jp', 'euc_jp'],
'EUC_KR': ['EUC_KR', 'euc_kr', 'euc_kr'],
'EUCKR': ['EUCKR', 'euc_kr', 'euc_kr'],
'EUC_TW': ['BIG5', 'big5', 'big5'],
'EUCTW': ['BIG5', 'big5', 'big5'],
'ISO_8859_5': ['ISO_8859_5', 'iso8859_5', 'iso8859_5'],
'ISO88595': ['ISO88595', 'iso8859_5', 'iso8859_5'],
'ISO_8859_6': ['ISO_8859_6', 'iso8859_6', 'iso8859_6'],
'ISO88596': ['ISO88596', 'iso8859_6', 'iso8859_6'],
'ISO_8859_7': ['ISO_8859_7', 'iso8859_7', 'iso8859_7'],
'ISO88597': ['ISO88597', 'iso8859_7', 'iso8859_7'],
'ISO_8859_8': ['ISO_8859_8', 'iso8859_8', 'iso8859_8'],
'ISO88598': ['ISO88598', 'iso8859_8', 'iso8859_8'],
'KOI8R': ['KOI8R', 'koi8_r', 'koi8_r'],
'KOI8U': ['KOI8U', 'koi8_u', 'koi8_u'],
}
def get_encoding(key):
"""
:param key: Database Encoding
:return:
[Postgres_encoding, Python_encoding, typecast_encoding] -
Postgres encoding, Python encoding, type cast encoding
"""
#
# Reference: https://www.postgresql.org/docs/11/multibyte.html
return encode_dict.get(key, ['UNICODE', 'utf-8', 'utf-8'])
def configure_driver_encodings(encodings):
# Replace the python encoding for original name and renamed encodings
# psycopg2 removes the underscore in conn.encoding
# Setting the encodings dict value will only help for select statements
# because for parameterized DML, param values are converted based on
# python encoding of pyscopg2s internal encodings dict.
for key, val in encode_dict.items():
postgres_encoding, python_encoding, typecast_encoding = val
encodings[key] = python_encoding

View File

@ -1,63 +0,0 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2023, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
# This allows us to generate to keywords.py for PostgreSQL for used by
# qtIdent and qtTypeIdent functions for scanning the keywords type.
#
# In order to generate keywords.py for specific version of PostgreSQL, put
# pg_config executable in the PATH.
#
##########################################################################
import os
import re
if __name__ == '__main__':
include_dir = os.popen('pg_config --includedir').read().rstrip()
version = os.popen('pg_config --version').read().rstrip()
keywords_file = open('keywords.py', 'w')
keywords_file.write("""##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2023, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
""")
keywords_file.write('# ScanKeyword function for ' + version)
keywords_file.write('\n\ndef ScanKeyword(key):')
keywords_file.write('\n keywordDict = {\n')
idx = 0
with open(include_dir + "/postgresql/server/parser/kwlist.h", "rb") as ins:
pattern = re.compile(r'"([^"]+)",\s*[^,]*\s*,\s*(.*)$')
keyword_types = [
'UNRESERVED_KEYWORD', 'COL_NAME_KEYWORD',
'TYPE_FUNC_NAME_KEYWORD', 'RESERVED_KEYWORD'
]
for line in ins:
line = line.decode().rstrip()
if line[0:11] == 'PG_KEYWORD(' and line[-1] == ')':
match = pattern.match(line[11:-1])
if idx != 0:
keywords_file.write(", ")
else:
keywords_file.write(" ")
keywords_file.write(
'"' + match.group(1) + '": ' +
str(keyword_types.index(match.group(2)))
)
idx += 1
keywords_file.write('\n }\n')
keywords_file.write(
' return (key in keywordDict and keywordDict[key]) or None')

View File

@ -1,432 +0,0 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2023, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
# ScanKeyword function for PostgreSQL 9.5rc1
def scan_keyword(key):
keywords = {
'abort': 0,
'absolute': 0,
'access': 0,
'action': 0,
'add': 0,
'admin': 0,
'after': 0,
'aggregate': 0,
'all': 3,
'also': 0,
'alter': 0,
'always': 0,
'analyze': 3,
'and': 3,
'any': 3,
'array': 3,
'as': 3,
'asc': 3,
'assertion': 0,
'assignment': 0,
'asymmetric': 3,
'at': 0,
'attribute': 0,
'authorization': 2,
'backward': 0,
'before': 0,
'begin': 0,
'between': 1,
'bigint': 1,
'binary': 2,
'bit': 1,
'boolean': 1,
'both': 3,
'by': 0,
'cache': 0,
'called': 0,
'cascade': 0,
'cascaded': 0,
'case': 3,
'cast': 3,
'catalog': 0,
'chain': 0,
'char': 1,
'character': 1,
'characteristics': 0,
'check': 3,
'checkpoint': 0,
'class': 0,
'close': 0,
'cluster': 0,
'coalesce': 1,
'collate': 3,
'collation': 2,
'column': 3,
'comment': 0,
'comments': 0,
'commit': 0,
'committed': 0,
'concurrently': 2,
'configuration': 0,
'conflict': 0,
'connection': 0,
'constraint': 3,
'constraints': 0,
'content': 0,
'continue': 0,
'conversion': 0,
'copy': 0,
'cost': 0,
'create': 3,
'cross': 2,
'csv': 0,
'cube': 0,
'current': 0,
'current_catalog': 3,
'current_date': 3,
'current_role': 3,
'current_schema': 2,
'current_time': 3,
'current_timestamp': 3,
'current_user': 3,
'cursor': 0,
'cycle': 0,
'data': 0,
'database': 0,
'day': 0,
'deallocate': 0,
'dec': 1,
'decimal': 1,
'declare': 0,
'default': 3,
'defaults': 0,
'deferrable': 3,
'deferred': 0,
'definer': 0,
'delete': 0,
'delimiter': 0,
'delimiters': 0,
'desc': 3,
'dictionary': 0,
'disable': 0,
'discard': 0,
'distinct': 3,
'do': 3,
'document': 0,
'domain': 0,
'double': 0,
'drop': 0,
'each': 0,
'else': 3,
'enable': 0,
'encoding': 0,
'encrypted': 0,
'end': 3,
'enum': 0,
'escape': 0,
'event': 0,
'except': 3,
'exclude': 0,
'excluding': 0,
'exclusive': 0,
'execute': 0,
'exists': 1,
'explain': 0,
'extension': 0,
'external': 0,
'extract': 1,
'false': 3,
'family': 0,
'fetch': 3,
'filter': 0,
'first': 0,
'float': 1,
'following': 0,
'for': 3,
'force': 0,
'foreign': 3,
'forward': 0,
'freeze': 2,
'from': 3,
'full': 2,
'function': 0,
'functions': 0,
'global': 0,
'grant': 3,
'granted': 0,
'greatest': 1,
'group': 3,
'grouping': 1,
'handler': 0,
'having': 3,
'header': 0,
'hold': 0,
'hour': 0,
'identity': 0,
'if': 0,
'ilike': 2,
'immediate': 0,
'immutable': 0,
'implicit': 0,
'import': 0,
'in': 3,
'including': 0,
'increment': 0,
'index': 0,
'indexes': 0,
'inherit': 0,
'inherits': 0,
'initially': 3,
'inline': 0,
'inner': 2,
'inout': 1,
'input': 0,
'insensitive': 0,
'insert': 0,
'instead': 0,
'int': 1,
'integer': 1,
'intersect': 3,
'interval': 1,
'into': 3,
'invoker': 0,
'is': 2,
'isnull': 2,
'isolation': 0,
'join': 2,
'key': 0,
'label': 0,
'language': 0,
'large': 0,
'last': 0,
'lateral': 3,
'leading': 3,
'leakproof': 0,
'least': 1,
'left': 2,
'level': 0,
'like': 2,
'limit': 3,
'listen': 0,
'load': 0,
'local': 0,
'localtime': 3,
'localtimestamp': 3,
'location': 0,
'lock': 0,
'locked': 0,
'logged': 0,
'mapping': 0,
'match': 0,
'materialized': 0,
'maxvalue': 0,
'minute': 0,
'minvalue': 0,
'mode': 0,
'month': 0,
'move': 0,
'name': 0,
'names': 0,
'national': 1,
'natural': 2,
'nchar': 1,
'next': 0,
'no': 0,
'none': 1,
'not': 3,
'nothing': 0,
'notify': 0,
'notnull': 2,
'nowait': 0,
'null': 3,
'nullif': 1,
'nulls': 0,
'numeric': 1,
'object': 0,
'of': 0,
'off': 0,
'offset': 3,
'oids': 0,
'on': 3,
'only': 3,
'operator': 0,
'option': 0,
'options': 0,
'or': 3,
'order': 3,
'ordinality': 0,
'out': 1,
'outer': 2,
'over': 0,
'overlaps': 2,
'overlay': 1,
'owned': 0,
'owner': 0,
'parser': 0,
'partial': 0,
'partition': 0,
'passing': 0,
'password': 0,
'placing': 3,
'plans': 0,
'policy': 0,
'position': 1,
'preceding': 0,
'precision': 1,
'prepare': 0,
'prepared': 0,
'preserve': 0,
'primary': 3,
'prior': 0,
'privileges': 0,
'procedural': 0,
'procedure': 0,
'program': 0,
'quote': 0,
'range': 0,
'read': 0,
'real': 1,
'reassign': 0,
'recheck': 0,
'recursive': 0,
'ref': 0,
'references': 3,
'refresh': 0,
'reindex': 0,
'relative': 0,
'release': 0,
'rename': 0,
'repeatable': 0,
'replace': 0,
'replica': 0,
'reset': 0,
'restart': 0,
'restrict': 0,
'returning': 3,
'returns': 0,
'revoke': 0,
'right': 2,
'role': 0,
'rollback': 0,
'rollup': 0,
'row': 1,
'rows': 0,
'rule': 0,
'savepoint': 0,
'schema': 0,
'scroll': 0,
'search': 0,
'second': 0,
'security': 0,
'select': 3,
'sequence': 0,
'sequences': 0,
'serializable': 0,
'server': 0,
'session': 0,
'session_user': 3,
'set': 0,
'setof': 1,
'sets': 0,
'share': 0,
'show': 0,
'similar': 2,
'simple': 0,
'skip': 0,
'smallint': 1,
'snapshot': 0,
'some': 3,
'sql': 0,
'stable': 0,
'standalone': 0,
'start': 0,
'statement': 0,
'statistics': 0,
'stdin': 0,
'stdout': 0,
'storage': 0,
'strict': 0,
'strip': 0,
'substring': 1,
'symmetric': 3,
'sysid': 0,
'system': 0,
'table': 3,
'tables': 0,
'tablesample': 2,
'tablespace': 0,
'temp': 0,
'template': 0,
'temporary': 0,
'text': 0,
'then': 3,
'time': 1,
'timestamp': 1,
'to': 3,
'trailing': 3,
'transaction': 0,
'transform': 0,
'treat': 1,
'trigger': 0,
'trim': 1,
'true': 3,
'truncate': 0,
'trusted': 0,
'type': 0,
'types': 0,
'unbounded': 0,
'uncommitted': 0,
'unencrypted': 0,
'union': 3,
'unique': 3,
'unknown': 0,
'unlisten': 0,
'unlogged': 0,
'until': 0,
'update': 0,
'user': 3,
'using': 3,
'vacuum': 0,
'valid': 0,
'validate': 0,
'validator': 0,
'value': 0,
'values': 1,
'varchar': 1,
'variadic': 3,
'varying': 0,
'verbose': 2,
'version': 0,
'view': 0,
'views': 0,
'volatile': 0,
'when': 3,
'where': 3,
'whitespace': 0,
'window': 3,
'with': 3,
'within': 0,
'without': 0,
'work': 0,
'wrapper': 0,
'write': 0,
'xml': 0,
'xmlattributes': 1,
'xmlconcat': 1,
'xmlelement': 1,
'xmlexists': 1,
'xmlforest': 1,
'xmlparse': 1,
'xmlpi': 1,
'xmlroot': 1,
'xmlserialize': 1,
'year': 0,
'yes': 0,
'zone': 0,
}
return keywords.get(key, None)

View File

@ -1,677 +0,0 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2023, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
"""
Implementation of ServerManager
"""
import os
import datetime
import config
import logging
from flask import current_app, session
from flask_security import current_user
from flask_babel import gettext
from werkzeug.exceptions import InternalServerError
from pgadmin.utils import get_complete_file_path
from pgadmin.utils.crypto import decrypt
from pgadmin.utils.master_password import process_masterpass_disabled
from .connection import Connection
from pgadmin.model import Server, User
from pgadmin.utils.exception import ConnectionLost, SSHTunnelConnectionLost,\
CryptKeyMissing
from pgadmin.utils.master_password import get_crypt_key
from pgadmin.utils.exception import ObjectGone
from pgadmin.utils.passexec import PasswordExec
from psycopg2.extensions import make_dsn
if config.SUPPORT_SSH_TUNNEL:
from sshtunnel import SSHTunnelForwarder, BaseSSHTunnelForwarderError
class ServerManager():
"""
class ServerManager
This class contains the information about the given server.
And, acts as connection manager for that particular session.
"""
_INFORMATION_MSG = gettext("Information is not available.")
def __init__(self, server):
self.connections = dict()
self.local_bind_host = '127.0.0.1'
self.local_bind_port = None
self.tunnel_object = None
self.tunnel_created = False
self.display_connection_string = ''
self.update(server)
def update(self, server):
assert server is not None
assert isinstance(server, Server)
self.ver = None
self.sversion = None
self.server_type = None
self.server_cls = None
self.password = None
self.tunnel_password = None
self.sid = server.id
self.host = server.host
self.port = server.port
self.db = server.maintenance_db
self.shared = server.shared
self.did = None
self.user = server.username
self.password = server.password
self.role = server.role
self.pinged = datetime.datetime.now()
self.db_info = dict()
self.server_types = None
self.db_res = server.db_res
self.passexec = \
PasswordExec(server.passexec_cmd, server.passexec_expiration) \
if server.passexec_cmd else None
self.service = server.service
if config.SUPPORT_SSH_TUNNEL:
self.use_ssh_tunnel = server.use_ssh_tunnel
self.tunnel_host = server.tunnel_host
self.tunnel_port = \
22 if server.tunnel_port is None else server.tunnel_port
self.tunnel_username = server.tunnel_username
self.tunnel_authentication = 0 \
if server.tunnel_authentication is None \
else server.tunnel_authentication
self.tunnel_identity_file = server.tunnel_identity_file
self.tunnel_password = server.tunnel_password
else:
self.use_ssh_tunnel = 0
self.tunnel_host = None
self.tunnel_port = 22
self.tunnel_username = None
self.tunnel_authentication = None
self.tunnel_identity_file = None
self.tunnel_password = None
self.kerberos_conn = server.kerberos_conn
self.gss_authenticated = False
self.gss_encrypted = False
self.connection_params = server.connection_params
self.create_connection_string(self.db, self.user)
for con in self.connections:
self.connections[con]._release()
self.update_session()
self.connections = dict()
def _set_password(self, res):
"""
Set password for server manager object.
:param res: response dict.
:return:
"""
if hasattr(self, 'password') and self.password:
if hasattr(self.password, 'decode'):
res['password'] = self.password.decode('utf-8')
else:
res['password'] = str(self.password)
else:
res['password'] = self.password
def as_dict(self):
"""
Returns a dictionary object representing the server manager.
"""
if self.ver is None or len(self.connections) == 0:
return None
res = dict()
res['sid'] = self.sid
res['ver'] = self.ver
res['sversion'] = self.sversion
self._set_password(res)
if self.use_ssh_tunnel:
if hasattr(self, 'tunnel_password') and self.tunnel_password:
if hasattr(self.tunnel_password, 'decode'):
res['tunnel_password'] = \
self.tunnel_password.decode('utf-8')
else:
res['tunnel_password'] = str(self.tunnel_password)
else:
res['tunnel_password'] = self.tunnel_password
connections = res['connections'] = dict()
for conn_id in self.connections:
conn = self.connections[conn_id].as_dict()
if conn is not None:
connections[conn_id] = conn
return res
def server_version(self):
return self.ver
@property
def version(self):
return self.sversion
def major_version(self):
if self.sversion is not None:
return int(self.sversion / 10000)
raise InternalServerError(self._INFORMATION_MSG)
def minor_version(self):
if self.sversion:
return int(int(self.sversion / 100) % 100)
raise InternalServerError(self._INFORMATION_MSG)
def patch_version(self):
if self.sversion:
return int(int(self.sversion / 100) / 100)
raise InternalServerError(self._INFORMATION_MSG)
def connection(self, **kwargs):
database = kwargs.get('database', None)
conn_id = kwargs.get('conn_id', None)
auto_reconnect = kwargs.get('auto_reconnect', True)
did = kwargs.get('did', None)
async_ = kwargs.get('async_', None)
use_binary_placeholder = kwargs.get('use_binary_placeholder', False)
array_to_string = kwargs.get('array_to_string', False)
if database is not None:
if did is not None and did in self.db_info:
self.db_info[did]['datname'] = database
else:
if did is None:
database = self.db
elif did in self.db_info:
database = self.db_info[did]['datname']
else:
maintenance_db_id = 'DB:{0}'.format(self.db)
if maintenance_db_id in self.connections:
conn = self.connections[maintenance_db_id]
# try to connect maintenance db if not connected
if not conn.connected():
conn.connect()
if conn.connected():
status, res = conn.execute_dict("""
SELECT
db.oid as did, db.datname, db.datallowconn,
pg_catalog.pg_encoding_to_char(db.encoding) AS serverencoding,
pg_catalog.has_database_privilege(db.oid, 'CREATE') as cancreate,
datistemplate
FROM
pg_catalog.pg_database db
WHERE db.oid = {0}""".format(did))
if status and len(res['rows']) > 0:
for row in res['rows']:
self.db_info[did] = row
database = self.db_info[did]['datname']
if did not in self.db_info:
raise ObjectGone(gettext(
"Could not find the specified database."
))
if not get_crypt_key()[0]:
# the reason its not connected might be missing key
raise CryptKeyMissing()
if database is None:
# Check SSH Tunnel is alive or not.
if self.use_ssh_tunnel == 1:
self.check_ssh_tunnel_alive()
else:
raise ConnectionLost(self.sid, None, None)
my_id = ('CONN:{0}'.format(conn_id)) if conn_id is not None else \
('DB:{0}'.format(database))
self.pinged = datetime.datetime.now()
if my_id in self.connections:
return self.connections[my_id]
else:
if async_ is None:
async_ = 1 if conn_id is not None else 0
else:
async_ = 1 if async_ is True else 0
self.connections[my_id] = Connection(
self, my_id, database, auto_reconnect=auto_reconnect,
async_=async_,
use_binary_placeholder=use_binary_placeholder,
array_to_string=array_to_string
)
return self.connections[my_id]
@staticmethod
def _get_password_to_conn(data, masterpass_processed):
"""
Get password for connect to server with simple and ssh connection.
:param data: Data.
:param masterpass_processed:
:return:
"""
# The data variable is a copy so is not automatically synced
# update here
if masterpass_processed and 'password' in data:
data['password'] = None
if masterpass_processed and 'tunnel_password' in data:
data['tunnel_password'] = None
def _get_server_type(self):
"""
Get server type and server cls.
:return:
"""
from pgadmin.browser.server_groups.servers.types import ServerType
if self.ver and not self.server_type:
for st in ServerType.types():
if st.instance_of(self.ver):
self.server_type = st.stype
self.server_cls = st
break
def _check_and_reconnect_server(self, conn, conn_info, data):
"""
Check and try to reconnect the server if server previously connected
and auto_reconnect is true.
:param conn:
:type conn:
:param conn_info:
:type conn_info:
:param data:
:type data:
:return:
:rtype:
"""
from pgadmin.browser.server_groups.servers.types import ServerType
if conn_info['wasConnected'] and conn_info['auto_reconnect']:
try:
# Check SSH Tunnel needs to be created
if self.use_ssh_tunnel == 1 and \
not self.tunnel_created:
status, error = self.create_ssh_tunnel(
data['tunnel_password'])
# Check SSH Tunnel is alive or not.
self.check_ssh_tunnel_alive()
conn.connect(
password=data['password'],
server_types=ServerType.types()
)
# This will also update wasConnected flag in
# connection so no need to update the flag manually.
except CryptKeyMissing:
# maintain the status as this will help to restore once
# the key is available
conn.wasConnected = conn_info['wasConnected']
conn.auto_reconnect = conn_info['auto_reconnect']
except Exception as e:
current_app.logger.exception(e)
self.connections.pop(conn_info['conn_id'])
raise
def _restore(self, data):
"""
Helps restoring to reconnect the auto-connect connections smoothly on
reload/restart of the app server..
"""
# restore server version from flask session if flask server was
# restarted. As we need server version to resolve sql template paths.
masterpass_processed = process_masterpass_disabled()
ServerManager._get_password_to_conn(data, masterpass_processed)
# Get server type.
self._get_server_type()
# We need to know about the existing server variant supports during
# first connection for identifications.
self.pinged = datetime.datetime.now()
try:
if 'password' in data and data['password'] and \
hasattr(data['password'], 'encode'):
data['password'] = data['password'].encode('utf-8')
if 'tunnel_password' in data and data['tunnel_password']:
data['tunnel_password'] = \
data['tunnel_password'].encode('utf-8')
except Exception as e:
current_app.logger.exception(e)
connections = data['connections']
for conn_id in connections:
conn_info = connections[conn_id]
if conn_info['conn_id'] in self.connections:
conn = self.connections[conn_info['conn_id']]
else:
conn = self.connections[conn_info['conn_id']] = Connection(
self, conn_info['conn_id'], conn_info['database'],
auto_reconnect=conn_info['auto_reconnect'],
async_=conn_info['async_'],
use_binary_placeholder=conn_info[
'use_binary_placeholder'],
array_to_string=conn_info['array_to_string']
)
# only try to reconnect
self._check_and_reconnect_server(conn, conn_info, data)
def _restore_connections(self):
for conn_id in self.connections:
conn = self.connections[conn_id]
# only try to reconnect if connection was connected previously
# and auto_reconnect is true.
wasConnected = conn.wasConnected
auto_reconnect = conn.auto_reconnect
if conn.wasConnected and conn.auto_reconnect:
try:
# Check SSH Tunnel needs to be created
if self.use_ssh_tunnel == 1 and \
not self.tunnel_created:
status, error = self.create_ssh_tunnel(
self.tunnel_password
)
# Check SSH Tunnel is alive or not.
self.check_ssh_tunnel_alive()
conn.connect()
# This will also update wasConnected flag in
# connection so no need to update the flag manually.
except CryptKeyMissing:
# maintain the status as this will help to restore once
# the key is available
conn.wasConnected = wasConnected
conn.auto_reconnect = auto_reconnect
except Exception as e:
self.connections.pop(conn_id)
current_app.logger.exception(e)
raise
def _stop_ssh_tunnel(self, did, database, conn_id):
"""
Stop ssh tunnel connection if function call without any parameter.
:param did: Database Id.
:param database: Database.
:param conn_id: COnnection Id.
:return:
"""
if database is None and conn_id is None and did is None:
self.stop_ssh_tunnel()
def _check_db_info(self, did, conn_id, database):
"""
Check did is not none and it is resent in db_info.
:param did: Database Id.
:param conn_id: Connection Id.
:return:
"""
if database is None and conn_id is None and did is None:
self.stop_ssh_tunnel()
my_id = None
if did is not None:
if did in self.db_info and 'datname' in self.db_info[did]:
database = self.db_info[did]['datname']
if database is None:
return True, False, my_id
else:
return True, False, my_id
if conn_id is not None:
my_id = 'CONN:{0}'.format(conn_id)
elif database is not None:
my_id = 'DB:{0}'.format(database)
return False, True, my_id
def release(self, database=None, conn_id=None, did=None):
# Stop the SSH tunnel if release() function calls without
# any parameter.
is_return, return_value, my_id = self._check_db_info(did, conn_id,
database)
if is_return:
return return_value
if my_id is not None:
if my_id in self.connections:
self.connections[my_id]._release()
del self.connections[my_id]
if did is not None:
del self.db_info[did]
if len(self.connections) == 0:
self.ver = None
self.sversion = None
self.server_type = None
self.server_cls = None
self.password = None
self.update_session()
return True
else:
return False
for con_key in list(self.connections.keys()):
conn = self.connections[con_key]
# Cancel the ongoing transaction before closing the connection
# as it may hang forever
if conn.connected() and conn.conn_id is not None and \
conn.conn_id.startswith('CONN:'):
conn.cancel_transaction(conn.conn_id[5:])
conn._release()
self.connections = dict()
self.ver = None
self.sversion = None
self.server_type = None
self.server_cls = None
self.password = None
self.update_session()
return True
def _update_password(self, passwd):
self.password = passwd
for conn_id in self.connections:
conn = self.connections[conn_id]
if conn.conn is not None or conn.wasConnected is True:
conn.password = passwd
def update_session(self):
managers = session['__pgsql_server_managers'] \
if '__pgsql_server_managers' in session else dict()
updated_mgr = self.as_dict()
if not updated_mgr:
if self.sid in managers:
managers.pop(self.sid)
else:
managers[self.sid] = updated_mgr
session['__pgsql_server_managers'] = managers
session.force_write = True
def utility(self, operation):
"""
utility(operation)
Returns: name of the utility which used for the operation
"""
if self.server_cls is not None:
return self.server_cls.utility(operation, self.sversion)
return None
def export_password_env(self, env):
if self.password:
crypt_key_present, crypt_key = get_crypt_key()
if not crypt_key_present:
return False, crypt_key
password = decrypt(self.password, crypt_key).decode()
os.environ[str(env)] = password
def create_ssh_tunnel(self, tunnel_password):
"""
This method is used to create ssh tunnel and update the IP Address and
IP Address and port to localhost and the local bind port return by the
SSHTunnelForwarder class.
:return: True if tunnel is successfully created else error message.
"""
# Fetch Logged in User Details.
user = User.query.filter_by(id=current_user.id).first()
if user is None:
return False, gettext("Unauthorized request.")
if tunnel_password is not None and tunnel_password != '':
crypt_key_present, crypt_key = get_crypt_key()
if not crypt_key_present:
raise CryptKeyMissing()
try:
tunnel_password = decrypt(tunnel_password, crypt_key)
# password is in bytes, for python3 we need it in string
if isinstance(tunnel_password, bytes):
tunnel_password = tunnel_password.decode()
except Exception as e:
current_app.logger.exception(e)
return False, gettext("Failed to decrypt the SSH tunnel "
"password.\nError: {0}").format(str(e))
try:
# If authentication method is 1 then it uses identity file
# and password
ssh_logger = None
if current_app.debug:
ssh_logger = logging.getLogger('sshtunnel')
ssh_logger.setLevel(logging.DEBUG)
for h in current_app.logger.handlers:
ssh_logger.addHandler(h)
if self.tunnel_authentication == 1:
self.tunnel_object = SSHTunnelForwarder(
(self.tunnel_host, int(self.tunnel_port)),
ssh_username=self.tunnel_username,
ssh_pkey=get_complete_file_path(self.tunnel_identity_file),
ssh_private_key_password=tunnel_password,
remote_bind_address=(self.host, self.port),
logger=ssh_logger
)
else:
self.tunnel_object = SSHTunnelForwarder(
(self.tunnel_host, int(self.tunnel_port)),
ssh_username=self.tunnel_username,
ssh_password=tunnel_password,
remote_bind_address=(self.host, self.port),
logger=ssh_logger
)
# flag tunnel threads in daemon mode to fix hang issue.
self.tunnel_object.daemon_forward_servers = True
self.tunnel_object.start()
self.tunnel_created = True
except BaseSSHTunnelForwarderError as e:
current_app.logger.exception(e)
return False, gettext("Failed to create the SSH tunnel.\n"
"Error: {0}").format(str(e))
# Update the port to communicate locally
self.local_bind_port = self.tunnel_object.local_bind_port
return True, None
def check_ssh_tunnel_alive(self):
# Check SSH Tunnel is alive or not. if it is not then
# raise the ConnectionLost exception.
if self.tunnel_object is None or not self.tunnel_object.is_active:
self.tunnel_created = False
raise SSHTunnelConnectionLost(self.tunnel_host)
def stop_ssh_tunnel(self):
# Stop the SSH tunnel if created.
if self.tunnel_object and self.tunnel_object.is_active:
self.tunnel_object.stop()
self.local_bind_port = None
self.tunnel_object = None
self.tunnel_created = False
def get_connection_param_value(self, param_name):
"""
This function return the value of param_name if found in the
connection parameter.
"""
value = None
if self.connection_params and param_name in self.connection_params:
value = self.connection_params[param_name]
return value
def create_connection_string(self, database, user, password=None):
"""
This function is used to create connection string based on the
parameters.
"""
dsn_args = dict()
dsn_args['host'] = \
self.local_bind_host if self.use_ssh_tunnel else self.host
dsn_args['port'] = \
self.local_bind_port if self.use_ssh_tunnel else self.port
dsn_args['dbname'] = database
dsn_args['user'] = user
if self.service is not None:
dsn_args['service'] = self.service
# Make a copy to display the connection string on GUI.
display_dsn_args = dsn_args.copy()
# Password should not be visible into the connection string, so
# setting the value with password to 'xxxxxxx'.
if password:
display_dsn_args['password'] = 'xxxxxxx'
dsn_args['password'] = password
# Loop through all the connection parameters set in the server dialog.
if self.connection_params and isinstance(self.connection_params, dict):
for key, value in self.connection_params.items():
with_complete_path = False
orig_value = value
# Getting complete file path if the key is one of the below.
if key in ['passfile', 'sslcert', 'sslkey', 'sslrootcert',
'sslcrl', 'sslcrldir']:
with_complete_path = True
value = get_complete_file_path(value)
# In case of host address need to check ssh tunnel flag.
if key == 'hostaddr':
value = self.local_bind_host if self.use_ssh_tunnel else \
value
dsn_args[key] = value
display_dsn_args[key] = orig_value if with_complete_path else \
value
self.display_connection_string = make_dsn(**display_dsn_args)
return make_dsn(**dsn_args)

View File

@ -1,265 +0,0 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2023, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
"""
Typecast various data types so that they can be compatible with Javascript
data types.
"""
from psycopg2 import STRING as _STRING
from psycopg2.extensions import FLOAT as _FLOAT
from psycopg2.extensions import DECIMAL as _DECIMAL, encodings
import psycopg2
from psycopg2.extras import Json as psycopg2_json
from .encoding import configure_driver_encodings, get_encoding
configure_driver_encodings(encodings)
# OIDs of data types which need to typecast as string to avoid JavaScript
# compatibility issues.
# e.g JavaScript does not support 64 bit integers. It has 64-bit double
# giving only 53 bits of integer range (IEEE 754)
# So to avoid loss of remaining 11 bits (64-53) we need to typecast bigint to
# string.
TO_STRING_DATATYPES = (
# To cast bytea, interval type
17, 1186,
# date, timestamp, timestamp with zone, time without time zone
1082, 1114, 1184, 1083
)
TO_STRING_NUMERIC_DATATYPES = (
# Real, double precision, numeric, bigint
700, 701, 1700, 20
)
# OIDs of array data types which need to typecast to array of string.
# This list may contain:
# OIDs of data types from PSYCOPG_SUPPORTED_ARRAY_DATATYPES as they need to be
# typecast to array of string.
# Also OIDs of data types which psycopg2 does not typecast array of that
# data type. e.g: uuid, bit, varbit, etc.
TO_ARRAY_OF_STRING_DATATYPES = (
# To cast bytea[] type
1001,
# bigint[]
1016,
# double precision[], real[]
1022, 1021,
# bit[], varbit[]
1561, 1563,
)
# OID of record array data type
RECORD_ARRAY = (2287,)
# OIDs of builtin array datatypes supported by psycopg2
# OID reference psycopg2/psycopg/typecast_builtins.c
#
# For these array data types psycopg2 returns result in list.
# For all other array data types psycopg2 returns result as string (string
# representing array literal)
# e.g:
#
# For below two sql psycopg2 returns result in different formats.
# SELECT '{foo,bar}'::text[];
# print('type of {} ==> {}'.format(res[0], type(res[0])))
# SELECT '{<a>foo</a>,<b>bar</b>}'::xml[];
# print('type of {} ==> {}'.format(res[0], type(res[0])))
#
# Output:
# type of ['foo', 'bar'] ==> <type 'list'>
# type of {<a>foo</a>,<b>bar</b>} ==> <type 'str'>
PSYCOPG_SUPPORTED_BUILTIN_ARRAY_DATATYPES = (
1016, 1005, 1006, 1007, 1021, 1022, 1231,
1002, 1003, 1009, 1014, 1015, 1009, 1014,
1015, 1000, 1115, 1185, 1183, 1270, 1182,
1187, 1001, 1028, 1013, 1041, 651, 1040
)
# json, jsonb
# OID reference psycopg2/lib/_json.py
PSYCOPG_SUPPORTED_JSON_TYPES = (114, 3802)
# json[], jsonb[]
PSYCOPG_SUPPORTED_JSON_ARRAY_TYPES = (199, 3807)
ALL_JSON_TYPES = PSYCOPG_SUPPORTED_JSON_TYPES +\
PSYCOPG_SUPPORTED_JSON_ARRAY_TYPES
# INET[], CIDR[]
# OID reference psycopg2/lib/_ipaddress.py
PSYCOPG_SUPPORTED_IPADDRESS_ARRAY_TYPES = (1041, 651)
# uuid[]
# OID reference psycopg2/lib/extras.py
PSYCOPG_SUPPORTED_IPADDRESS_ARRAY_TYPES = (2951,)
# int4range, int8range, numrange, daterange tsrange, tstzrange[]
# OID reference psycopg2/lib/_range.py
PSYCOPG_SUPPORTED_RANGE_TYPES = (3904, 3926, 3906, 3912, 3908, 3910)
# int4range[], int8range[], numrange[], daterange[] tsrange[], tstzrange[]
# OID reference psycopg2/lib/_range.py
PSYCOPG_SUPPORTED_RANGE_ARRAY_TYPES = (3905, 3927, 3907, 3913, 3909, 3911)
def register_global_typecasters():
unicode_type_for_record = psycopg2.extensions.new_type(
(2249,),
"RECORD",
psycopg2.extensions.UNICODE
)
unicode_array_type_for_record_array = psycopg2.extensions.new_array_type(
RECORD_ARRAY,
"ARRAY_RECORD",
unicode_type_for_record
)
# This registers a unicode type caster for datatype 'RECORD'.
psycopg2.extensions.register_type(unicode_type_for_record)
# This registers a array unicode type caster for datatype 'ARRAY_RECORD'.
psycopg2.extensions.register_type(unicode_array_type_for_record_array)
# define type caster to convert various pg types into string type
pg_types_to_string_type = psycopg2.extensions.new_type(
TO_STRING_DATATYPES + TO_STRING_NUMERIC_DATATYPES +
PSYCOPG_SUPPORTED_RANGE_TYPES, 'TYPECAST_TO_STRING', _STRING
)
# define type caster to convert pg array types of above types into
# array of string type
pg_array_types_to_array_of_string_type = \
psycopg2.extensions.new_array_type(
TO_ARRAY_OF_STRING_DATATYPES,
'TYPECAST_TO_ARRAY_OF_STRING', pg_types_to_string_type
)
# This registers a type caster to convert various pg types into string type
psycopg2.extensions.register_type(pg_types_to_string_type)
# This registers a type caster to convert various pg array types into
# array of string type
psycopg2.extensions.register_type(pg_array_types_to_array_of_string_type)
# Treat JSON data as text because converting it to dict alters the data
# which should not happen as per postgres docs
psycopg2.extras.register_default_json(loads=lambda x: x)
psycopg2.extras.register_default_jsonb(loads=lambda x: x)
# pysycopg2 adapt does not support dict by default. Need to register
# Used http://initd.org/psycopg/docs/extras.html#json-adaptation
psycopg2.extensions.register_adapter(dict, psycopg2_json)
def register_string_typecasters(connection):
# raw_unicode_escape used for SQL ASCII will escape the
# characters. Here we unescape them using unicode_escape
# and send ahead. When insert update is done, the characters
# are escaped again and sent to the DB.
postgres_encoding, python_encoding, typecast_encoding = \
get_encoding(connection.encoding)
if postgres_encoding != 'UNICODE':
def non_ascii_escape(value, cursor):
if value is None:
return None
return bytes(
value, encodings[cursor.connection.encoding]
).decode(typecast_encoding, errors='replace')
unicode_type = psycopg2.extensions.new_type(
# "char", name, text, character, character varying
(19, 18, 25, 1042, 1043, 0),
'UNICODE', non_ascii_escape)
unicode_array_type = psycopg2.extensions.new_array_type(
# "char"[], name[], text[], character[], character varying[]
(1002, 1003, 1009, 1014, 1015, 0
), 'UNICODEARRAY', unicode_type)
psycopg2.extensions.register_type(unicode_type, connection)
psycopg2.extensions.register_type(unicode_array_type, connection)
def numeric_typecasters(results, conn_obj):
# This function is to convert pg types to numeic type caster
numeric_cols = []
for obj_type in conn_obj.column_info:
if obj_type['type_code'] in TO_STRING_NUMERIC_DATATYPES:
numeric_cols.append(obj_type['name'])
for result in results:
for key, value in result.items():
if isinstance(result[key],
str) and key in numeric_cols and not value.isdigit():
result[key] = float(result[key])
elif isinstance(result[key], str) and key in numeric_cols:
result[key] = int(result[key])
return results
def register_binary_typecasters(connection):
psycopg2.extensions.register_type(
psycopg2.extensions.new_type(
(
# To cast bytea type
17,
),
'BYTEA_PLACEHOLDER',
# Only show placeholder if data actually exists.
lambda value, cursor: 'binary data'
if value is not None else None),
connection
)
psycopg2.extensions.register_type(
psycopg2.extensions.new_type(
(
# To cast bytea[] type
1001,
),
'BYTEA_ARRAY_PLACEHOLDER',
# Only show placeholder if data actually exists.
lambda value, cursor: 'binary data[]'
if value is not None else None),
connection
)
def register_array_to_string_typecasters(connection):
psycopg2.extensions.register_type(
psycopg2.extensions.new_type(
PSYCOPG_SUPPORTED_BUILTIN_ARRAY_DATATYPES +
PSYCOPG_SUPPORTED_JSON_ARRAY_TYPES +
PSYCOPG_SUPPORTED_IPADDRESS_ARRAY_TYPES +
PSYCOPG_SUPPORTED_RANGE_ARRAY_TYPES +
TO_ARRAY_OF_STRING_DATATYPES,
'ARRAY_TO_STRING',
_STRING),
connection
)
def unregister_numeric_typecasters(connection):
string_type_to_decimal = \
psycopg2.extensions.new_type(TO_STRING_NUMERIC_DATATYPES,
'TYPECAST_TO_DECIMAL', _DECIMAL)
psycopg2.extensions.register_type(string_type_to_decimal, connection)

View File

@ -9,7 +9,6 @@
from abc import ABCMeta
from pgadmin.utils.constants import PSYCOPG2
from pgadmin.utils.dynamic_registry import create_registry_metaclass
import config
@ -18,12 +17,8 @@ import config
def load_modules(cls, app=None):
submodules = []
if config.PG_DEFAULT_DRIVER == PSYCOPG2:
from . import psycopg2 as module
submodules.append(module)
else:
from . import psycopg3 as module
submodules.append(module)
from . import psycopg3 as module
submodules.append(module)
from . import abstract as module
submodules.append(module)

View File

@ -14,7 +14,7 @@ from importlib import import_module
from werkzeug.utils import find_modules
from pgadmin.utils import server_utils
from pgadmin.utils.constants import PSYCOPG2, PSYCOPG3
from pgadmin.utils.constants import PSYCOPG3
from .. import socketio
import unittest
@ -69,9 +69,6 @@ class TestsGeneratorRegistry(ABCMeta):
try:
for module_name in find_modules(pkg_root, False, True):
if module_name.find(PSYCOPG2) != -1:
print("Skipping ", module_name)
continue
all_modules.append(module_name)
except Exception:
pass

View File

@ -1,227 +0,0 @@
#######################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2023, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import config
from pgadmin.utils.driver.psycopg2.encoding import get_encoding
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.utils.constants import PSYCOPG3
class TestEncoding(BaseTestGenerator):
scenarios = [
(
'When the database encoding is SQL_ASCII',
dict(
db_encoding='raw_unicode_escape',
expected_return_value=['SQL_ASCII', 'raw-unicode-escape',
'unicode-escape']
)
), (
'When the database encoding is LATIN1',
dict(
db_encoding='latin1',
expected_return_value=['LATIN1', 'iso8859-1', 'iso8859-1']
)
), (
'When the database encoding is LATIN2',
dict(
db_encoding='latin2',
expected_return_value=['LATIN2', 'iso8859-2', 'iso8859-2']
)
), (
'When the database encoding is LATIN3',
dict(
db_encoding='latin3',
expected_return_value=['LATIN3', 'iso8859-3', 'iso8859-3']
)
), (
'When the database encoding is LATIN4',
dict(
db_encoding='latin4',
expected_return_value=['LATIN4', 'iso8859-4', 'iso8859-4']
)
), (
'When the database encoding is LATIN5',
dict(
db_encoding='latin5',
expected_return_value=['LATIN5', 'iso8859-9', 'iso8859-9']
)
), (
'When the database encoding is LATIN6',
dict(
db_encoding='latin6',
expected_return_value=['LATIN6', 'iso8859-10', 'iso8859-10']
)
), (
'When the database encoding is LATIN7',
dict(
db_encoding='latin7',
expected_return_value=['LATIN7', 'iso8859-13', 'iso8859-13']
)
), (
'When the database encoding is LATIN8',
dict(
db_encoding='latin8',
expected_return_value=['LATIN8', 'iso8859-14', 'iso8859-14']
)
), (
'When the database encoding is LATIN9',
dict(
db_encoding='latin9',
expected_return_value=['LATIN9', 'iso8859-15', 'iso8859-15']
)
), (
'When the database encoding is LATIN10',
dict(
db_encoding='latin10',
expected_return_value=['LATIN10', 'iso8859-16', 'iso8859-16']
)
), (
'When the database encoding is WIN1250',
dict(
db_encoding='cp1250',
expected_return_value=['WIN1250', 'cp1250', 'cp1250']
)
), (
'When the database encoding is WIN1251',
dict(
db_encoding='cp1251',
expected_return_value=['WIN1251', 'cp1251', 'cp1251']
)
), (
'When the database encoding is WIN1252',
dict(
db_encoding='cp1252',
expected_return_value=['WIN1252', 'cp1252', 'cp1252']
)
), (
'When the database encoding is WIN1253',
dict(
db_encoding='cp1253',
expected_return_value=['WIN1253', 'cp1253', 'cp1253']
)
), (
'When the database encoding is WIN1254',
dict(
db_encoding='cp1254',
expected_return_value=['WIN1254', 'cp1254', 'cp1254']
)
), (
'When the database encoding is WIN1255',
dict(
db_encoding='cp1255',
expected_return_value=['WIN1255', 'cp1255', 'cp1255']
)
), (
'When the database encoding is WIN1256',
dict(
db_encoding='cp1256',
expected_return_value=['WIN1256', 'cp1256', 'cp1256']
)
), (
'When the database encoding is WIN1257',
dict(
db_encoding='cp1257',
expected_return_value=['WIN1257', 'cp1257', 'cp1257']
)
), (
'When the database encoding is WIN1258',
dict(
db_encoding='cp1258',
expected_return_value=['WIN1258', 'cp1258', 'cp1258']
)
), (
'When the database encoding is EUC_JIS_2004',
dict(
db_encoding='eucjis2004',
expected_return_value=['EUC_JIS_2004', 'euc_jis_2004',
'euc_jis_2004']
)
), (
'When the database encoding is EUC_CN',
dict(
db_encoding='euc-cn',
expected_return_value=['EUC_CN', 'gb2312', 'gb2312']
)
), (
'When the database encoding is EUC_JP',
dict(
db_encoding='euc_jp',
expected_return_value=['EUC_JP', 'euc_jp', 'euc_jp']
)
), (
'When the database encoding is EUC_KR',
dict(
db_encoding='euc_kr',
expected_return_value=['EUC_KR', 'euc_kr', 'euc_kr']
)
), (
'When the database encoding is EUC_TW',
dict(
db_encoding='big5',
expected_return_value=['BIG5', 'big5', 'big5']
)
), (
'When the database encoding is ISO_8859_5',
dict(
db_encoding='iso8859_5',
expected_return_value=['ISO_8859_5', 'iso8859-5', 'iso8859-5']
)
), (
'When the database encoding is ISO_8859_6',
dict(
db_encoding='iso8859_6',
expected_return_value=['ISO_8859_6', 'iso8859-6', 'iso8859-6']
)
), (
'When the database encoding is ISO_8859_7',
dict(
db_encoding='iso8859_7',
expected_return_value=['ISO_8859_7', 'iso8859-7', 'iso8859-7']
)
), (
'When the database encoding is ISO_8859_8',
dict(
db_encoding='iso8859_8',
expected_return_value=['ISO_8859_8', 'iso8859-8', 'iso8859-8']
)
), (
'When the database encoding is KOI8R',
dict(
db_encoding='koi8_r',
expected_return_value=['KOI8R', 'koi8-r', 'koi8-r']
)
), (
'When the database encoding is KOI8U',
dict(
db_encoding='koi8_u',
expected_return_value=['KOI8U', 'koi8-u', 'koi8-u']
)
), (
'When the database encoding is WIN866',
dict(
db_encoding='cp866',
expected_return_value=['WIN866', 'cp866', 'cp866']
)
), (
'When the database encoding is WIN874',
dict(
db_encoding='cp874',
expected_return_value=['WIN874', 'cp874', 'cp874']
)
),
]
def setUp(self):
if config.PG_DEFAULT_DRIVER == PSYCOPG3:
self.skipTest('Skipping for psycopg3 '
'as we get the mapping from the driver itself.')
def runTest(self):
result = get_encoding(self.db_encoding)
self.assertEqual(result, self.expected_return_value)

View File

@ -39,12 +39,8 @@ from pgadmin.utils.constants import BINARY_PATHS, PSYCOPG3
from pgadmin.utils import set_binary_path
from functools import wraps
import psycopg
# Remove this condition, once psycopg2 will be removed completely
if config.PG_DEFAULT_DRIVER == PSYCOPG3:
import psycopg
else:
import psycopg2 as psycopg
CURRENT_PATH = os.path.abspath(os.path.join(os.path.dirname(
os.path.realpath(__file__)), "../"))

View File

@ -236,9 +236,6 @@ class ReverseEngineeredSQLTestCases(BaseTestGenerator):
elif self.check_precondition(
scenario['precondition_sql'], False):
skip_test_case = False
elif 'pg_driver' in scenario and\
scenario['pg_driver'] != PG_DEFAULT_DRIVER:
skip_test_case = True
else:
skip_test_case = False