pgadmin4/web/pgadmin/tools/sqleditor/tests/test_encoding_charset.py
2024-01-01 14:13:48 +05:30

308 lines
9.5 KiB
Python

# -*- coding: utf-8 -*-
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2024, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import json
import secrets
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from regression.python_test_utils import test_utils
from pgadmin.utils import server_utils
from pgadmin.tools.sqleditor.tests.execute_query_test_utils \
import async_poll
class TestEncodingCharset(BaseTestGenerator):
"""
This class validates character support in pgAdmin4 for
different PostgresDB encodings
"""
scenarios = [
(
'With Encoding UTF8',
dict(
db_encoding='UTF8',
lc_collate='C',
test_str='A'
)),
(
'With Encoding EUC_CN',
dict(
db_encoding='EUC_CN',
lc_collate='C',
test_str='A'
)),
(
'With Encoding SQL_ASCII',
dict(
db_encoding='SQL_ASCII',
lc_collate='C',
test_str='Tif'
)),
(
'With Encoding SQL_ASCII (additional test)',
dict(
db_encoding='SQL_ASCII',
lc_collate='C',
test_str='ü'
)),
(
'With Encoding LATIN1',
dict(
db_encoding='LATIN1',
lc_collate='C',
test_str='äöüßÑ'
)),
(
'With Encoding LATIN2',
dict(
db_encoding='LATIN2',
lc_collate='C',
test_str='§'
)),
(
'With Encoding LATIN9',
dict(
db_encoding='LATIN9',
lc_collate='C',
test_str='äöüß'
)),
(
'With Encoding EUC_JIS_2004',
dict(
db_encoding='EUC_JIS_2004',
lc_collate='C',
test_str='じんぼはりんごをたべる'
)),
(
'With Encoding WIN1256',
dict(
db_encoding='WIN1256',
lc_collate='C',
test_str='صباح الخير'
)),
(
'With Encoding WIN866',
dict(
db_encoding='WIN866',
lc_collate='C',
test_str='Альтернативная'
)),
(
'With Encoding WIN874',
dict(
db_encoding='WIN874',
lc_collate='C',
test_str='กลิ่นหอม'
)),
(
'With Encoding WIN1250',
dict(
db_encoding='WIN1250',
lc_collate='C',
test_str='ŔÁÄÇ'
)),
(
'With Encoding WIN1251',
dict(
db_encoding='WIN1251',
lc_collate='C',
test_str='ЖИЙЮ'
)),
(
'With Encoding WIN1252',
dict(
db_encoding='WIN1252',
lc_collate='C',
test_str='ÆØÙü'
)),
(
'With Encoding WIN1253',
dict(
db_encoding='WIN1253',
lc_collate='C',
test_str='ΨΪμΫ'
)),
(
'With Encoding WIN1254',
dict(
db_encoding='WIN1254',
lc_collate='C',
test_str='ĞğØŠ'
)),
(
'With Encoding WIN1255',
dict(
db_encoding='WIN1255',
lc_collate='C',
test_str='₪¥©¾'
)),
(
'With Encoding WIN1256',
dict(
db_encoding='WIN1256',
lc_collate='C',
test_str='بؤغق'
)),
(
'With Encoding WIN1257',
dict(
db_encoding='WIN1257',
lc_collate='C',
test_str='‰ķģž'
)),
(
'With Encoding WIN1258',
dict(
db_encoding='WIN1258',
lc_collate='C',
test_str='₫SHYÑđ'
)),
(
'With Encoding EUC_CN',
dict(
db_encoding='EUC_CN',
lc_collate='C',
test_str='汉字不灭'
)),
(
'With Encoding EUC_JP',
dict(
db_encoding='EUC_JP',
lc_collate='C',
test_str='での日本'
)),
(
'With Encoding EUC_KR',
dict(
db_encoding='EUC_KR',
lc_collate='C',
test_str='ㄱㄲㄴㄷ'
)),
(
'With Encoding EUC_TW',
dict(
db_encoding='EUC_TW',
lc_collate='C',
test_str='中文'
)),
(
'With Encoding ISO_8859_5',
dict(
db_encoding='ISO_8859_5',
lc_collate='C',
test_str='ЁЎФЮ'
)),
(
'With Encoding ISO_8859_6',
dict(
db_encoding='ISO_8859_6',
lc_collate='C',
test_str='العَرَبِيَّة'
)),
(
'With Encoding ISO_8859_7',
dict(
db_encoding='ISO_8859_7',
lc_collate='C',
test_str='ελληνικά'
)),
(
'With Encoding ISO_8859_8',
dict(
db_encoding='ISO_8859_8',
lc_collate='C',
test_str='דבא'
)),
(
'With Encoding KOI8R',
dict(
db_encoding='KOI8R',
lc_collate='C',
test_str='Альтернативная'
)),
(
'With Encoding KOI8U',
dict(
db_encoding='KOI8U',
lc_collate='C',
test_str='українська'
)),
]
def setUp(self):
self.encode_db_name = 'encoding_' + self.db_encoding + \
str(secrets.choice(range(10000, 65535)))
self.encode_sid = self.server_information['server_id']
server_con = server_utils.connect_server(self, self.encode_sid)
if hasattr(self, 'skip_on_database'):
if 'data' in server_con and 'type' in server_con['data']:
if server_con['data']['type'] in self.skip_on_database:
self.skipTest('cannot run in: %s' %
server_con['data']['type'])
self.encode_did = test_utils.create_database(
self.server, self.encode_db_name,
(self.db_encoding, self.lc_collate))
def runTest(self):
db_con = database_utils.connect_database(self,
test_utils.SERVER_GROUP,
self.encode_sid,
self.encode_did)
if not db_con["info"] == "Database connected.":
raise Exception("Could not connect to the database.")
# Initialize query tool
self.trans_id = str(secrets.choice(range(1, 9999999)))
url = '/sqleditor/initialize/sqleditor/{0}/{1}/{2}/{3}'\
.format(self.trans_id, test_utils.SERVER_GROUP, self.encode_sid,
self.encode_did)
response = self.tester.post(url, data=json.dumps({
"dbname": self.encode_db_name
}))
self.assertEqual(response.status_code, 200)
# Check character
url = "/sqleditor/query_tool/start/{0}".format(self.trans_id)
sql = "select E'{0}';".format(self.test_str)
response = (self.tester.post(url, data=json.dumps({"sql": sql}),
content_type='html/json'))
self.assertEqual(response.status_code, 200)
response = async_poll(tester=self.tester,
poll_url='/sqleditor/poll/{0}'.format(
self.trans_id))
self.assertEqual(response.status_code, 200)
response_data = json.loads(response.data.decode('utf-8'))
self.assertEqual(response_data['data']['rows_fetched_to'], 1)
result = response_data['data']['result'][0][0]
self.assertEqual(result, self.test_str)
# Close query tool
url = '/sqleditor/close/{0}'.format(self.trans_id)
response = self.tester.delete(url)
self.assertEqual(response.status_code, 200)
database_utils.disconnect_database(self, self.encode_sid,
self.encode_did)
def tearDown(self):
main_conn = test_utils.get_db_connection(
self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'],
self.server['sslmode']
)
test_utils.drop_database(main_conn, self.encode_db_name)