Add Python API tests for execution of external utilities such as pg_dump.

This commit is contained in:
Khushboo Vashi 2018-06-15 11:36:07 +01:00 committed by Dave Page
parent 766b389001
commit d1902cd639
21 changed files with 2455 additions and 40 deletions

View File

@ -109,8 +109,7 @@ class BackupMessage(IProcessDesc):
else: else:
self.cmd += cmdArg(arg) self.cmd += cmdArg(arg)
@property def get_server_details(self):
def message(self):
# Fetch the server details like hostname, port, roles etc # Fetch the server details like hostname, port, roles etc
s = Server.query.filter_by( s = Server.query.filter_by(
id=self.sid, user_id=current_user.id id=self.sid, user_id=current_user.id
@ -123,13 +122,19 @@ class BackupMessage(IProcessDesc):
host = manager.local_bind_host if manager.use_ssh_tunnel else s.host host = manager.local_bind_host if manager.use_ssh_tunnel else s.host
port = manager.local_bind_port if manager.use_ssh_tunnel else s.port port = manager.local_bind_port if manager.use_ssh_tunnel else s.port
return s.name, host, port
@property
def message(self):
name, host, port = self.get_server_details()
if self.backup_type == BACKUP.OBJECT: if self.backup_type == BACKUP.OBJECT:
return _( return _(
"Backing up an object on the server '{0}' " "Backing up an object on the server '{0}' "
"from database '{1}'..." "from database '{1}'..."
).format( ).format(
"{0} ({1}:{2})".format( "{0} ({1}:{2})".format(
s.name, host, port name, host, port
), ),
self.database self.database
) )
@ -137,13 +142,13 @@ class BackupMessage(IProcessDesc):
return _("Backing up the global objects on " return _("Backing up the global objects on "
"the server '{0}'...").format( "the server '{0}'...").format(
"{0} ({1}:{2})".format( "{0} ({1}:{2})".format(
s.name, host, port name, host, port
) )
) )
elif self.backup_type == BACKUP.SERVER: elif self.backup_type == BACKUP.SERVER:
return _("Backing up the server '{0}'...").format( return _("Backing up the server '{0}'...").format(
"{0} ({1}:{2})".format( "{0} ({1}:{2})".format(
s.name, host, port name, host, port
) )
) )
else: else:
@ -151,17 +156,7 @@ class BackupMessage(IProcessDesc):
return "Unknown Backup" return "Unknown Backup"
def details(self, cmd, args): def details(self, cmd, args):
# Fetch the server details like hostname, port, roles etc name, host, port = self.get_server_details()
s = Server.query.filter_by(
id=self.sid, user_id=current_user.id
).first()
from pgadmin.utils.driver import get_driver
driver = get_driver(PG_DEFAULT_DRIVER)
manager = driver.connection_manager(self.sid)
host = manager.local_bind_host if manager.use_ssh_tunnel else s.host
port = manager.local_bind_port if manager.use_ssh_tunnel else s.port
res = '<div class="h5">' res = '<div class="h5">'
@ -171,7 +166,7 @@ class BackupMessage(IProcessDesc):
"from database '{1}'..." "from database '{1}'..."
).format( ).format(
"{0} ({1}:{2})".format( "{0} ({1}:{2})".format(
html.safe_str(s.name), html.safe_str(name),
html.safe_str(host), html.safe_str(host),
html.safe_str(port), html.safe_str(port),
), ),
@ -181,7 +176,7 @@ class BackupMessage(IProcessDesc):
res += _("Backing up the global objects on " res += _("Backing up the global objects on "
"the server '{0}'...").format( "the server '{0}'...").format(
"{0} ({1}:{2})".format( "{0} ({1}:{2})".format(
html.safe_str(s.name), html.safe_str(name),
html.safe_str(host), html.safe_str(host),
html.safe_str(port) html.safe_str(port)
) )
@ -189,7 +184,7 @@ class BackupMessage(IProcessDesc):
elif self.backup_type == BACKUP.SERVER: elif self.backup_type == BACKUP.SERVER:
res += _("Backing up the server '{0}'...").format( res += _("Backing up the server '{0}'...").format(
"{0} ({1}:{2})".format( "{0} ({1}:{2})".format(
html.safe_str(s.name), html.safe_str(name),
html.safe_str(host), html.safe_str(host),
html.safe_str(port) html.safe_str(port)
) )

View File

@ -0,0 +1,454 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2018, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import sys
import simplejson as json
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from pgadmin.utils import server_utils as server_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
if sys.version_info < (3, 3):
from mock import patch, MagicMock
else:
from unittest.mock import patch, MagicMock
class BackupCreateJobTest(BaseTestGenerator):
"""Test the BackupCreateJob class"""
scenarios = [
('When backup object with default options',
dict(
class_params=dict(
sid=1,
name='test_backup_server',
port=5444,
host='localhost',
database='postgres',
bfile='test_backup',
username='postgres'
),
params=dict(
file='test_backup_file',
format='custom',
verbose=True,
blobs=True,
schemas=[],
tables=[],
database='postgres'
),
url='/backup/job/{0}/object',
expected_cmd_opts=['--verbose', '--format=c', '--blobs'],
not_expected_cmd_opts=[],
expected_exit_code=[0, None]
)),
('When backup the object with option sections to all data',
dict(
class_params=dict(
sid=1,
name='test_backup_server',
port=5444,
host='localhost',
database='postgres',
bfile='test_backup',
username='postgres'
),
params=dict(
file='test_backup_file',
format='custom',
verbose=True,
schemas=[],
tables=[],
database='postgres',
data=True,
pre_data=True,
post_data=True
),
url='/backup/job/{0}/object',
expected_cmd_opts=['--verbose', '--format=c',
'--section=pre-data', '--section=data',
'--section=post-data'],
not_expected_cmd_opts=[],
expected_exit_code=[0, None]
)),
('When backup the object with option only_data',
dict(
class_params=dict(
sid=1,
name='test_backup_server',
port=5444,
host='localhost',
database='postgres',
bfile='test_backup',
username='postgres'
),
params=dict(
file='test_backup_file',
format='plain',
verbose=True,
schemas=[],
tables=[],
database='postgres',
only_data=True,
only_schema=False
),
url='/backup/job/{0}/object',
expected_cmd_opts=['--verbose', '--format=p', '--data-only'],
not_expected_cmd_opts=[],
expected_exit_code=[0, None]
)),
('When backup the object with option only_data',
dict(
class_params=dict(
sid=1,
name='test_backup_server',
port=5444,
host='localhost',
database='postgres',
bfile='test_backup',
username='postgres'
),
params=dict(
file='test_backup_file',
format='plain',
verbose=True,
schemas=[],
tables=[],
database='postgres',
only_data=True,
only_schema=True,
dns_owner=True
),
url='/backup/job/{0}/object',
expected_cmd_opts=['--verbose', '--format=p', '--data-only'],
not_expected_cmd_opts=['--schema-only', '--no-owner'],
expected_exit_code=[0, None]
)),
('When backup the object with option only_schema',
dict(
class_params=dict(
sid=1,
name='test_backup_server',
port=5444,
host='localhost',
database='postgres',
bfile='test_backup',
username='postgres'
),
params=dict(
file='test_backup_file',
format='plain',
verbose=True,
schemas=[],
tables=[],
database='postgres',
only_data=False,
only_schema=True
),
url='/backup/job/{0}/object',
expected_cmd_opts=['--verbose', '--format=p', '--schema-only'],
not_expected_cmd_opts=[],
expected_exit_code=[0, None]
)),
('When backup the object with option - format plain and dns_owner',
dict(
class_params=dict(
sid=1,
name='test_backup_server',
port=5444,
host='localhost',
database='postgres',
bfile='test_backup',
username='postgres'
),
params=dict(
file='test_backup_file',
format='plain',
verbose=True,
schemas=[],
tables=[],
database='postgres',
dns_owner=True
),
url='/backup/job/{0}/object',
expected_cmd_opts=['--verbose', '--format=p', '--no-owner'],
not_expected_cmd_opts=[],
expected_exit_code=[0, None]
)),
('When backup the object with option - Do not save privilege,'
' tablespace, unlogged table data',
dict(
class_params=dict(
sid=1,
name='test_backup_server',
port=5444,
host='localhost',
database='postgres',
bfile='test_backup',
username='postgres'
),
params=dict(
file='test_backup_file',
format='custom',
verbose=True,
schemas=[],
tables=[],
database='postgres',
dns_privilege=True,
dns_unlogged_tbl_data=True,
dns_tablespace=True
),
url='/backup/job/{0}/object',
expected_cmd_opts=['--no-privileges',
'--no-tablespaces',
'--no-unlogged-table-data'],
not_expected_cmd_opts=[],
expected_exit_code=[0, None]
)),
('When backup the object with option - all queries',
dict(
class_params=dict(
sid=1,
name='test_backup_server',
port=5444,
host='localhost',
database='postgres',
bfile='test_backup',
username='postgres'
),
params=dict(
file='test_backup_file',
format='plain',
verbose=True,
schemas=[],
tables=[],
database='postgres',
use_column_inserts=True,
include_create_database=True,
use_insert_commands=True,
include_drop_database=True
),
url='/backup/job/{0}/object',
expected_cmd_opts=['--create', '--clean', '--inserts',
'--column-inserts'],
not_expected_cmd_opts=[],
expected_exit_code=[0, None]
)),
('When backup the object with option - all queries and format custom',
dict(
class_params=dict(
sid=1,
name='test_backup_server',
port=5444,
host='localhost',
database='postgres',
bfile='test_backup',
username='postgres'
),
params=dict(
file='test_backup_file',
format='custom',
verbose=True,
schemas=[],
tables=[],
database='postgres',
use_column_inserts=True,
include_create_database=True,
use_insert_commands=True,
include_drop_database=True
),
url='/backup/job/{0}/object',
expected_cmd_opts=['--inserts',
'--column-inserts'],
not_expected_cmd_opts=['--create', '--clean'],
expected_exit_code=[0, None]
)),
('When backup the object with option - miscellaneous',
dict(
class_params=dict(
sid=1,
name='test_backup_server',
port=5444,
host='localhost',
database='postgres',
bfile='test_backup',
username='postgres'
),
params=dict(
file='test_backup_file',
format='custom',
verbose=True,
schemas=[],
tables=[],
database='postgres',
disable_quoting=True,
use_set_session_auth=True,
with_oids=True,
dqoute=True
),
url='/backup/job/{0}/object',
expected_cmd_opts=['--verbose', '--quote-all-identifiers',
'--disable-dollar-quoting', '--oids',
'--use-set-session-authorization'],
not_expected_cmd_opts=[],
expected_exit_code=[0, None]
)),
('When backup the object with format tar',
dict(
class_params=dict(
sid=1,
name='test_backup_server',
port=5444,
host='localhost',
database='postgres',
bfile='test_backup',
username='postgres'
),
params=dict(
file='test_backup_file',
format='tar',
verbose=True,
schemas=[],
tables=[],
database='postgres',
blobs=True,
),
url='/backup/job/{0}/object',
expected_cmd_opts=['--verbose',
'--blobs',
'--format=t'],
not_expected_cmd_opts=[],
expected_exit_code=[0, None]
)),
('When backup the server',
dict(
class_params=dict(
sid=1,
name='test_backup_server',
port=5444,
host='localhost',
database='postgres',
bfile='test_backup',
username='postgres'
),
params=dict(
file='test_backup_server_file',
dqoute=False,
verbose=True,
type='server'
),
url='/backup/job/{0}',
expected_cmd_opts=['--verbose'],
not_expected_cmd_opts=[],
expected_exit_code=[0, None]
)),
('When backup globals',
dict(
class_params=dict(
sid=1,
name='test_backup_server',
port=5444,
host='localhost',
database='postgres',
bfile='test_backup',
username='postgres'
),
params=dict(
file='test_backup_global_file',
dqoute=False,
verbose=True,
type='globals'
),
url='/backup/job/{0}',
expected_cmd_opts=['--verbose'],
not_expected_cmd_opts=[],
expected_exit_code=[0, None]
))
]
def setUp(self):
if self.server['default_binary_paths'] is None:
self.skipTest(
"default_binary_paths is not set for the server {0}".format(
self.server['name']
)
)
@patch('pgadmin.tools.backup.Server')
@patch('pgadmin.tools.backup.BackupMessage')
@patch('pgadmin.tools.backup.filename_with_file_manager_path')
@patch('pgadmin.tools.backup.BatchProcess')
@patch('pgadmin.utils.driver.psycopg2.server_manager.ServerManager.'
'export_password_env')
def runTest(self, export_password_env_mock, batch_process_mock,
filename_mock, backup_message_mock, server_mock):
class TestMockServer():
def __init__(self, name, host, port, id, username,
maintenance_db):
self.name = name
self.host = host
self.port = port
self.id = id
self.username = username
self.maintenance_db = maintenance_db
self.server_id = parent_node_dict["server"][-1]["server_id"]
mock_obj = TestMockServer(self.class_params['name'],
self.class_params['host'],
self.class_params['port'],
self.server_id,
self.class_params['username'],
self.class_params['database']
)
mock_result = server_mock.query.filter_by.return_value
mock_result.first.return_value = mock_obj
filename_mock.return_value = self.params['file']
batch_process_mock.set_env_variables = MagicMock(
return_value=True
)
batch_process_mock.start = MagicMock(
return_value=True
)
export_password_env_mock.return_value = True
server_response = server_utils.connect_server(self, self.server_id)
if server_response["info"] == "Server connected.":
db_owner = server_response['data']['user']['name']
self.data = database_utils.get_db_data(db_owner)
url = self.url.format(self.server_id)
# Create the backup job
response = self.tester.post(url,
data=json.dumps(self.params),
content_type='html/json')
self.assertEqual(response.status_code, 200)
self.assertTrue(backup_message_mock.called)
self.assertTrue(batch_process_mock.called)
if self.expected_cmd_opts:
for opt in self.expected_cmd_opts:
self.assertIn(
opt,
batch_process_mock.call_args_list[0][1]['args']
)
if self.not_expected_cmd_opts:
for opt in self.not_expected_cmd_opts:
self.assertNotIn(
opt,
batch_process_mock.call_args_list[0][1]['args']
)

View File

@ -0,0 +1,147 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2018, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import sys
from pgadmin.tools.backup import BackupMessage, BACKUP
from pgadmin.utils.route import BaseTestGenerator
if sys.version_info < (3, 3):
from mock import patch
else:
from unittest.mock import patch
class BackupMessageTest(BaseTestGenerator):
"""Test the BackupMessage class"""
scenarios = [
('When Backup server',
dict(
class_params=dict(
type=BACKUP.SERVER,
sid=1,
name='test_backup_server',
port=5444,
host='localhost',
database='postgres',
bfile='test_restore',
args=[
'--file',
"backup_file",
'--host',
"localhost",
'--port',
"5444",
'--username',
"postgres",
'--no-password',
'--database',
"postgres"
],
cmd="/test_path/pg_dump"
),
extected_msg="Backing up the server"
" 'test_backup_server (localhost:5444)'...",
expetced_details_cmd='/test_path/pg_dump --file '
'"backup_file" --host "localhost" '
'--port "5444" --username "postgres" '
'--no-password --database "postgres"'
)),
('When Backup global',
dict(
class_params=dict(
type=BACKUP.GLOBALS,
sid=1,
name='test_backup_server',
port=5444,
host='localhost',
database='postgres',
bfile='test_backup',
args=[
'--file',
'backup_file',
'--host',
'localhost',
'--port',
'5444',
'--username',
'postgres',
'--no-password',
'--database',
'postgres'
],
cmd="/test_path/pg_dump"
),
extected_msg="Backing up the global objects on the server "
"'test_backup_server (localhost:5444)'...",
expetced_details_cmd='/test_path/pg_dump --file "backup_file" '
'--host "localhost"'
' --port "5444" --username "postgres" '
'--no-password --database "postgres"'
)),
('When backup object',
dict(
class_params=dict(
type=BACKUP.OBJECT,
sid=1,
name='test_backup_server',
port=5444,
host='localhost',
database='postgres',
bfile='test_backup',
args=[
'--file',
'backup_file',
'--host',
'localhost',
'--port',
'5444',
'--username',
'postgres',
'--no-password',
'--database',
'postgres'
],
cmd="/test_path/pg_dump"
),
extected_msg="Backing up an object on the server "
"'test_backup_server (localhost:5444)'"
" from database 'postgres'...",
expetced_details_cmd='/test_path/pg_dump --file "backup_file" '
'--host "localhost" '
'--port "5444" --username "postgres" '
'--no-password --database "postgres"'
))
]
@patch('pgadmin.tools.backup.BackupMessage.get_server_details')
def runTest(self, get_server_details_mock):
get_server_details_mock.return_value = \
self.class_params['name'],\
self.class_params['host'],\
self.class_params['port']
backup_obj = BackupMessage(
self.class_params['type'],
self.class_params['sid'],
self.class_params['bfile'],
*self.class_params['args'],
**{'database': self.class_params['database']}
)
# Check the expected message returned
self.assertEqual(backup_obj.message, self.extected_msg)
# Check the command
obj_details = backup_obj.details(self.class_params['cmd'],
self.class_params['args'])
self.assertIn(self.expetced_details_cmd, obj_details)

View File

@ -0,0 +1,119 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2018, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import time
import random
import simplejson as json
def create_backup_job(tester, url, params, assert_equal):
# Create the backup job
response = tester.post(url,
data=json.dumps(params),
content_type='html/json')
assert_equal(response.status_code, 200)
response_data = json.loads(response.data.decode('utf-8'))
job_id = response_data['data']['job_id']
return job_id
def run_backup_job(tester, job_id, expected_params, assert_in, assert_not_in,
assert_equal):
cnt = 0
while 1:
if cnt >= 5:
break
# Check the process list
response1 = tester.get('/misc/bgprocess/?_='.format(
random.randint(1, 9999999)))
assert_equal(response1.status_code, 200)
process_list = json.loads(response1.data.decode('utf-8'))
if len(process_list) > 0 and 'execution_time' in process_list[0]:
break
time.sleep(0.5)
cnt += 1
assert_equal('execution_time' in process_list[0], True)
assert_equal('stime' in process_list[0], True)
assert_equal('exit_code' in process_list[0], True)
assert_equal(process_list[0]['exit_code'] in expected_params[
'expected_exit_code'
], True)
backup_file = None
if 'details' in process_list[0]:
backup_det = process_list[0]['details']
backup_file = backup_det[int(backup_det.find('--file')) +
8:int(backup_det.find('--host')) - 2]
if expected_params['expected_cmd_opts']:
for opt in expected_params['expected_cmd_opts']:
assert_in(opt, process_list[0]['details'])
if expected_params['not_expected_cmd_opts']:
for opt in expected_params['not_expected_cmd_opts']:
assert_not_in(opt, process_list[0]['details'])
# Check the process details
p_details = tester.get('/misc/bgprocess/{0}?_='.format(
job_id, random.randint(1, 9999999))
)
assert_equal(p_details.status_code, 200)
p_details = tester.get('/misc/bgprocess/{0}/{1}/{2}/?_='.format(
job_id, 0, 0, random.randint(1, 9999999))
)
assert_equal(p_details.status_code, 200)
p_details_data = json.loads(p_details.data.decode('utf-8'))
cnt = 0
# Retrieve the backup job process logs
while 1:
out, err, status = get_params(p_details_data)
if status or cnt >= 5:
break
p_details = tester.get(
'/misc/bgprocess/{0}/{1}/{2}/?_={3}'.format(
job_id, out, err, random.randint(1, 9999999))
)
assert_equal(p_details.status_code, 200)
p_details_data = json.loads(p_details.data.decode('utf-8'))
cnt += 1
time.sleep(1)
# Check the job is complete.
backup_ack = tester.put('/misc/bgprocess/{0}'.format(job_id))
assert_equal(backup_ack.status_code, 200)
backup_ack_res = json.loads(backup_ack.data.decode('utf-8'))
assert_equal(backup_ack_res['success'], 1)
return backup_file
def get_params(data):
out = 0
out_done = False
err = 0
err_done = False
if 'out' in data:
out = data['out'] and data['out']['pos']
if 'done' in data['out']:
out_done = data['out']['done']
if 'err' in data:
err = data['err'] and data['err']['pos']
if 'done' in data['err']:
err_done = data['err']['done']
return out, err, (out_done and err_done)

View File

@ -0,0 +1,213 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2018, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import sys
from pgadmin.misc.bgprocess.processes import BatchProcess, IProcessDesc
from pgadmin.tools.backup import BackupMessage, BACKUP
from pgadmin.utils.route import BaseTestGenerator
from pickle import dumps, loads
if sys.version_info < (3, 3):
from mock import patch
else:
from unittest.mock import patch
class BatchProcessTest(BaseTestGenerator):
"""Test the BatchProcess class"""
scenarios = [
('When backup server',
dict(
class_params=dict(
type=BACKUP.SERVER,
sid=1,
name='test_backup_server',
port=5444,
host='localhost',
database='postgres',
username='postgres',
bfile='test_backup',
args=[
'--file',
"backup_file",
'--host',
"localhost",
'--port',
"5444",
'--username',
"postgres",
'--no-password',
'--database',
"postgres"
],
cmd='backup_server'
)
)),
('When backup globals',
dict(
class_params=dict(
type=BACKUP.GLOBALS,
sid=1,
name='test_backup_server',
port=5444,
host='localhost',
database='postgres',
username='postgres',
bfile='test_backup',
args=[
'--file',
"backup_file",
'--host',
"localhost",
'--port',
"5444",
'--username',
"postgres",
'--no-password',
'--database',
"postgres"
],
cmd='backup'
)
)),
('When backup object',
dict(
class_params=dict(
type=BACKUP.OBJECT,
sid=1,
name='test_backup_server',
port=5444,
host='localhost',
database='postgres',
username='postgres',
bfile='test_backup',
args=[
'--file',
"backup_file",
'--host',
"localhost",
'--port',
"5444",
'--username',
"postgres",
'--no-password',
'--database',
"postgres"
],
cmd='backup'
)
))
]
@patch('pgadmin.tools.backup.BackupMessage.get_server_details')
@patch('pgadmin.misc.bgprocess.processes.Popen')
@patch('pgadmin.misc.bgprocess.processes.current_app')
@patch('pgadmin.misc.bgprocess.processes.db')
@patch('pgadmin.tools.backup.current_user')
@patch('pgadmin.misc.bgprocess.processes.current_user')
def runTest(self, current_user_mock, current_user, db_mock,
current_app_mock, popen_mock, get_server_details_mock):
current_user.id = 1
current_user_mock.id = 1
current_app_mock.PGADMIN_RUNTIME = False
def db_session_add_mock(j):
cmd_obj = loads(j.desc)
self.assertTrue(isinstance(cmd_obj, IProcessDesc))
self.assertEqual(cmd_obj.backup_type, self.class_params['type'])
self.assertEqual(cmd_obj.bfile, self.class_params['bfile'])
self.assertEqual(cmd_obj.database, self.class_params['database'])
self.assertEqual(cmd_obj.cmd,
' --file "backup_file" '
'--host "{0}" '
'--port "{1}" '
'--username "{2}" '
'--no-password '
'--database "{3}"'.format(
self.class_params['host'],
self.class_params['port'],
self.class_params['username'],
self.class_params['database']
))
db_mock.session.add.side_effect = db_session_add_mock
get_server_details_mock.return_value = \
self.class_params['name'],\
self.class_params['host'],\
self.class_params['port']
backup_obj = BackupMessage(
self.class_params['type'],
self.class_params['sid'],
self.class_params['bfile'],
*self.class_params['args'],
**{'database': self.class_params['database']}
)
p = BatchProcess(
desc=backup_obj,
cmd=self.class_params['cmd'],
args=self.class_params['args']
)
# Check that _create_process has been called
self.assertTrue(db_mock.session.add.called)
# Check start method
self._check_start(popen_mock, p)
# Check list method
self._check_list(p, backup_obj)
def _check_start(self, popen_mock, p):
cmd_test = self.class_params['cmd']
assert_true = self.assertTrue
class popenMockSideEffect():
def __init__(self, cmd, **kwargs):
assert_true(cmd_test in cmd)
assert_true('env' in kwargs)
def poll(self):
pass
popen_mock.side_effect = popenMockSideEffect
p.start()
self.assertTrue(popen_mock.called)
@patch('pgadmin.misc.bgprocess.processes.Process')
@patch('pgadmin.misc.bgprocess.processes.BatchProcess.'
'update_process_info')
def _check_list(self, p, backup_obj, update_process_info_mock,
process_mock):
class TestMockProcess():
def __init__(self, desc, args, cmd):
self.pid = 1
self.exit_code = 1
self.start_time = '2018-04-17 06:18:56.315445 +0000'
self.end_time = None
self.desc = dumps(desc)
self.arguments = " ".join(args)
self.command = cmd
self.acknowledge = None
process_mock.query.filter_by.return_value = [
TestMockProcess(backup_obj,
self.class_params['args'],
self.class_params['cmd'])]
update_process_info_mock.return_value = [True, True]
ret_value = p.list()
self.assertEqual(1, len(ret_value))
self.assertTrue('details' in ret_value[0])
self.assertTrue('desc' in ret_value[0])

View File

@ -0,0 +1,65 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2018, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import os
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
import pgadmin.tools.backup.tests.test_backup_utils as backup_utils
class BackupJobTest(BaseTestGenerator):
"""Backup api test cases"""
scenarios = [
('When backup the object with the default options',
dict(
params=dict(
file='test_backup',
format='custom',
verbose=True,
blobs=True,
schemas=[],
tables=[],
database='postgres'
),
url='/backup/job/{0}/object',
expected_params=dict(
expected_cmd_opts=['--verbose', '--format=c', '--blobs'],
not_expected_cmd_opts=[],
expected_exit_code=[0, None]
)
))
]
def setUp(self):
if self.server['default_binary_paths'] is None:
self.skipTest(
"default_binary_paths is not set for the server {0}".format(
self.server['name']
)
)
def runTest(self):
self.server_id = parent_node_dict["server"][-1]["server_id"]
url = self.url.format(self.server_id)
# Create the backup job
job_id = backup_utils.create_backup_job(self.tester, url, self.params,
self.assertEqual)
backup_file = backup_utils.run_backup_job(self.tester,
job_id,
self.expected_params,
self.assertIn,
self.assertNotIn,
self.assertEqual
)
if backup_file is not None:
if os.path.isfile(backup_file):
os.remove(backup_file)

View File

@ -0,0 +1,155 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2018, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import sys
from pgadmin.misc.bgprocess.processes import BatchProcess, IProcessDesc
from pgadmin.tools.maintenance import Message
from pgadmin.utils.route import BaseTestGenerator
from pickle import dumps, loads
if sys.version_info < (3, 3):
from mock import patch
else:
from unittest.mock import patch
class BatchProcessTest(BaseTestGenerator):
"""Test the BatchProcess class"""
scenarios = [
('When maintained server',
dict(
class_params=dict(
sid=1,
host='localhost',
port=5444,
username='postgres',
args=[
'--host',
"localhost",
'--port',
"5444",
'--username',
'--dbname',
"postgres",
'--command',
"VACUUM VERBOSE;\n"
],
data={
'database': 'postgres',
'op': 'VACUUM',
'vacuum_analyze': False,
'vacuum_freeze': False,
'vacuum_full': False,
'verbose': True
},
cmd="VACUUM VERBOSE;\n"
),
expected_msg="Maintenance (Vacuum)",
expetced_details_cmd='VACUUM VERBOSE;'
))
]
@patch('pgadmin.misc.bgprocess.processes.Popen')
@patch('pgadmin.misc.bgprocess.processes.current_app')
@patch('pgadmin.misc.bgprocess.processes.db')
@patch('pgadmin.tools.maintenance.Server')
@patch('pgadmin.misc.bgprocess.processes.current_user')
def runTest(self, current_user_mock, server_mock, db_mock,
current_app_mock, popen_mock):
current_user_mock.id = 1
current_app_mock.PGADMIN_RUNTIME = False
class TestMockServer():
def __init__(self, name, host, port):
self.name = name
self.host = host
self.port = port
def db_session_add_mock(j):
cmd_obj = loads(j.desc)
self.assertTrue(isinstance(cmd_obj, IProcessDesc))
self.assertEqual(cmd_obj.query, self.class_params['cmd'])
self.assertEqual(cmd_obj.message, self.expected_msg)
self.assertEqual(cmd_obj.data, self.class_params['data'])
mock_obj = TestMockServer(self.class_params['username'],
self.class_params['host'],
self.class_params['port'])
mock_result = server_mock.query.filter_by.return_value
mock_result.first.return_value = mock_obj
db_mock.session.add.side_effect = db_session_add_mock
maintenance_obj = Message(
self.class_params['sid'],
self.class_params['data'],
self.class_params['cmd']
)
p = BatchProcess(
desc=maintenance_obj,
cmd=self.class_params['cmd'],
args=self.class_params['args']
)
# Check that _create_process has been called
self.assertTrue(db_mock.session.add.called)
# Check start method
self._check_start(popen_mock, p)
# Check list method
self._check_list(p, maintenance_obj)
def _check_start(self, popen_mock, p):
cmd_test = self.class_params['cmd']
assert_true = self.assertTrue
class popenMockSideEffect():
def __init__(self, cmd, **kwargs):
assert_true(cmd_test in cmd)
assert_true('env' in kwargs)
def poll(self):
pass
popen_mock.side_effect = popenMockSideEffect
p.start()
self.assertTrue(popen_mock.called)
@patch('pgadmin.misc.bgprocess.processes.Process')
@patch('pgadmin.misc.bgprocess.processes.BatchProcess.'
'update_process_info')
def _check_list(self, p, maintenance_obj, update_process_info_mock,
process_mock):
class TestMockProcess():
def __init__(self, desc, args, cmd):
self.pid = 1
self.exit_code = 1
self.start_time = '2018-04-17 06:18:56.315445 +0000'
self.end_time = None
self.desc = dumps(desc)
self.arguments = " ".join(args)
self.command = cmd
self.acknowledge = None
process_mock.query.filter_by.return_value = [
TestMockProcess(maintenance_obj,
self.class_params['args'],
self.class_params['cmd'])
]
update_process_info_mock.return_value = [True, True]
ret_value = p.list()
self.assertEqual(1, len(ret_value))
self.assertTrue('details' in ret_value[0])
self.assertTrue('desc' in ret_value[0])

View File

@ -0,0 +1,136 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2018, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import time
import random
import simplejson as json
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
class MaintenanceJobTest(BaseTestGenerator):
"""Maintenance api test cases"""
scenarios = [
('When maintenance the object with the default options',
dict(
params=dict(
data={
'database': 'postgres',
'op': 'VACUUM',
'vacuum_analyze': False,
'vacuum_freeze': False,
'vacuum_full': False,
'verbose': True
},
cmd="VACUUM VERBOSE;\n"
),
url='/maintenance/job/{0}/{1}',
expected_cmd='VACUUM VERBOSE',
expected_exit_code=[0, None]
))
]
def setUp(self):
if self.server['default_binary_paths'] is None:
self.skipTest(
"default_binary_paths is not set for the server {0}".format(
self.server['name']
)
)
def runTest(self):
self.server_id = parent_node_dict["database"][-1]["server_id"]
self.db_id = parent_node_dict["database"][-1]["db_id"]
url = self.url.format(self.server_id, self.db_id)
# Create the backup job
response = self.tester.post(url,
data=json.dumps(self.params['data']),
content_type='html/json')
self.assertEqual(response.status_code, 200)
response_data = json.loads(response.data.decode('utf-8'))
job_id = response_data['data']['job_id']
cnt = 0
while 1:
if cnt >= 10:
break
# Check the process list
response1 = self.tester.get('/misc/bgprocess/?_='.format(
random.randint(1, 9999999)))
self.assertEqual(response1.status_code, 200)
process_list = json.loads(response1.data.decode('utf-8'))
if len(process_list) > 0 and 'execution_time' in process_list[0]:
break
time.sleep(0.5)
cnt += 1
self.assertTrue('execution_time' in process_list[0])
self.assertTrue('stime' in process_list[0])
self.assertTrue('exit_code' in process_list[0])
self.assertTrue(process_list[0]['exit_code'] in
self.expected_exit_code)
self.assertIn(self.expected_cmd, process_list[0]['details'])
# Check the process details
p_details = self.tester.get('/misc/bgprocess/{0}?_='.format(
job_id, random.randint(1, 9999999))
)
self.assertEqual(p_details.status_code, 200)
p_details = self.tester.get('/misc/bgprocess/{0}/{1}/{2}/?_='.format(
job_id, 0, 0, random.randint(1, 9999999))
)
self.assertEqual(p_details.status_code, 200)
p_details_data = json.loads(p_details.data.decode('utf-8'))
# Retrieve the backup job process logs
while 1:
out, err, status = MaintenanceJobTest.get_params(p_details_data)
if status:
break
p_details = self.tester.get(
'/misc/bgprocess/{0}/{1}/{2}/?_={3}'.format(
job_id, out, err, random.randint(1, 9999999))
)
self.assertEqual(p_details.status_code, 200)
p_details_data = json.loads(p_details.data.decode('utf-8'))
time.sleep(1)
# Check the job is complete.
backup_ack = self.tester.put('/misc/bgprocess/{0}'.format(job_id))
self.assertEqual(backup_ack.status_code, 200)
backup_ack_res = json.loads(backup_ack.data.decode('utf-8'))
self.assertEqual(backup_ack_res['success'], 1)
@staticmethod
def get_params(data):
out = 0
out_done = False
err = 0
err_done = False
if 'out' in data:
out = data['out'] and data['out']['pos']
if 'done' in data['out']:
out_done = data['out']['done']
if 'err' in data:
err = data['err'] and data['err']['pos']
if 'done' in data['err']:
err_done = data['err']['done']
return out, err, (out_done and err_done)

View File

@ -0,0 +1,190 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2018, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import sys
import simplejson as json
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from pgadmin.utils import server_utils as server_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
if sys.version_info < (3, 3):
from mock import patch, MagicMock
else:
from unittest.mock import patch, MagicMock
class MaintenanceCreateJobTest(BaseTestGenerator):
"""Test the BackupCreateJob class"""
scenarios = [
('When maintenance object with default options',
dict(
class_params=dict(
sid=1,
name='test_maintenance_server',
port=5444,
host='localhost',
username='postgres'
),
params=dict(
database='postgres',
op='VACUUM',
vacuum_analyze=False,
vacuum_freeze=False,
vacuum_full=False,
verbose=True
),
url='/maintenance/job/{0}/{1}',
expected_cmd_opts=['VACUUM VERBOSE;\n'],
)),
('When maintenance object with VACUUM FULL',
dict(
class_params=dict(
sid=1,
name='test_maintenance_server',
port=5444,
host='localhost',
username='postgres'
),
params=dict(
database='postgres',
op='VACUUM',
vacuum_analyze=False,
vacuum_freeze=False,
vacuum_full=True,
verbose=True
),
url='/maintenance/job/{0}/{1}',
expected_cmd_opts=['VACUUM FULL VERBOSE;\n'],
)),
('When maintenance object with the ANALYZE',
dict(
class_params=dict(
sid=1,
name='test_maintenance_server',
port=5444,
host='localhost',
username='postgres'
),
params=dict(
database='postgres',
op='ANALYZE',
vacuum_analyze=True,
vacuum_freeze=False,
vacuum_full=False,
verbose=True
),
url='/maintenance/job/{0}/{1}',
expected_cmd_opts=['ANALYZE VERBOSE;\n'],
)),
('When maintenance the object with the REINDEX',
dict(
class_params=dict(
sid=1,
name='test_maintenance_server',
port=5444,
host='localhost',
username='postgres'
),
params=dict(
database='postgres',
op='REINDEX',
vacuum_analyze=False,
vacuum_freeze=False,
vacuum_full=False,
verbose=False
),
url='/maintenance/job/{0}/{1}',
expected_cmd_opts=['REINDEX DATABASE postgres;\n'],
)),
('When maintenance the object with the CLUSTER',
dict(
class_params=dict(
sid=1,
name='test_maintenance_server',
port=5444,
host='localhost',
username='postgres'
),
params=dict(
database='postgres',
op='CLUSTER',
vacuum_analyze=False,
vacuum_freeze=False,
vacuum_full=False,
verbose=False
),
url='/maintenance/job/{0}/{1}',
expected_cmd_opts=['CLUSTER;\n'],
))
]
def setUp(self):
if self.server['default_binary_paths'] is None:
self.skipTest(
"default_binary_paths is not set for the server {0}".format(
self.server['name']
)
)
@patch('pgadmin.tools.maintenance.Server')
@patch('pgadmin.tools.maintenance.Message')
@patch('pgadmin.tools.maintenance.BatchProcess')
@patch('pgadmin.utils.driver.psycopg2.server_manager.ServerManager.'
'export_password_env')
def runTest(self, export_password_env_mock,
batch_process_mock, message_mock, server_mock):
self.server_id = parent_node_dict["database"][-1]["server_id"]
self.db_id = parent_node_dict["database"][-1]["db_id"]
url = self.url.format(self.server_id, self.db_id)
class TestMockServer():
def __init__(self, host, port, id, username):
self.host = host
self.port = port
self.id = id
self.username = username
mock_obj = TestMockServer(self.class_params['host'],
self.class_params['port'],
self.server_id,
self.class_params['username']
)
mock_result = server_mock.query.filter_by.return_value
mock_result.first.return_value = mock_obj
batch_process_mock.set_env_variables = MagicMock(
return_value=True
)
batch_process_mock.start = MagicMock(
return_value=True
)
export_password_env_mock.return_value = True
server_response = server_utils.connect_server(self, self.server_id)
if server_response["info"] == "Server connected.":
db_owner = server_response['data']['user']['name']
self.data = database_utils.get_db_data(db_owner)
self.db_name = self.data['name']
# Create the backup job
response = self.tester.post(url,
data=json.dumps(self.params),
content_type='html/json')
self.assertEqual(response.status_code, 200)
self.assertTrue(message_mock.called)
self.assertTrue(batch_process_mock.called)
if self.expected_cmd_opts:
for opt in self.expected_cmd_opts:
self.assertIn(opt,
batch_process_mock.call_args_list[0][1]['args'])

View File

@ -0,0 +1,121 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2018, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
from pgadmin.tools.maintenance import Message
from pgadmin.utils.route import BaseTestGenerator
class MaintenanceMessageTest(BaseTestGenerator):
"""Test the Maintenance Message class"""
scenarios = [
('When maintained the server',
dict(
class_params=dict(
sid=1,
data={
'database': 'postgres',
'op': 'VACUUM',
'vacuum_analyze': False,
'vacuum_freeze': False,
'vacuum_full': False,
'verbose': True
},
cmd="VACUUM VERBOSE;\n"
),
extected_msg="Maintenance (Vacuum)",
expetced_details_cmd='VACUUM VERBOSE;'
)),
('When maintained the server with FULL VERBOSE options',
dict(
class_params=dict(
sid=1,
data={
'database': 'postgres',
'op': 'VACUUM',
'vacuum_analyze': False,
'vacuum_freeze': False,
'vacuum_full': True,
'verbose': True
},
cmd="VACUUM FULL VERBOSE;\n"
),
extected_msg="Maintenance (Vacuum)",
expetced_details_cmd='VACUUM FULL VERBOSE;'
)),
('When maintained the server with ANALYZE',
dict(
class_params=dict(
sid=1,
data={
'database': 'postgres',
'op': 'ANALYZE',
'vacuum_analyze': False,
'vacuum_freeze': False,
'vacuum_full': False,
'verbose': True
},
cmd="ANALYZE VERBOSE;\n"
),
extected_msg="Maintenance (Analyze)",
expetced_details_cmd='ANALYZE VERBOSE;'
)),
('When maintained the server with REINDEX',
dict(
class_params=dict(
sid=1,
data={
'database': 'postgres',
'op': 'REINDEX',
'vacuum_analyze': False,
'vacuum_freeze': False,
'vacuum_full': False,
'verbose': False
},
cmd="REINDEX;\n"
),
extected_msg="Maintenance (Reindex)",
expetced_details_cmd='REINDEX;'
)),
('When maintained the server with CLUSTER',
dict(
class_params=dict(
sid=1,
data={
'database': 'postgres',
'op': 'CLUSTER',
'vacuum_analyze': False,
'vacuum_freeze': False,
'vacuum_full': False,
'verbose': True
},
cmd="CLUSTER VERBOSE;\n"
),
extected_msg="Maintenance (Cluster)",
expetced_details_cmd='CLUSTER VERBOSE;'
)),
]
def runTest(self):
maintenance_obj = Message(
self.class_params['sid'],
self.class_params['data'],
self.class_params['cmd']
)
# Check the expected message returned
self.assertEqual(maintenance_obj.message, self.extected_msg)
# Check the command
obj_details = maintenance_obj.details(self.class_params['cmd'], None)
self.assertIn(self.expetced_details_cmd, obj_details)

View File

@ -86,8 +86,7 @@ class RestoreMessage(IProcessDesc):
else: else:
self.cmd += cmdArg(arg) self.cmd += cmdArg(arg)
@property def get_server_details(self):
def message(self):
# Fetch the server details like hostname, port, roles etc # Fetch the server details like hostname, port, roles etc
s = Server.query.filter_by( s = Server.query.filter_by(
id=self.sid, user_id=current_user.id id=self.sid, user_id=current_user.id
@ -100,30 +99,25 @@ class RestoreMessage(IProcessDesc):
host = manager.local_bind_host if manager.use_ssh_tunnel else s.host host = manager.local_bind_host if manager.use_ssh_tunnel else s.host
port = manager.local_bind_port if manager.use_ssh_tunnel else s.port port = manager.local_bind_port if manager.use_ssh_tunnel else s.port
return s.name, host, port
@property
def message(self):
name, host, port = self.get_server_details()
return _("Restoring backup on the server '{0}'...").format( return _("Restoring backup on the server '{0}'...").format(
"{0} ({1}:{2})".format(s.name, host, port), "{0} ({1}:{2})".format(name, host, port),
) )
def details(self, cmd, args): def details(self, cmd, args):
# Fetch the server details like hostname, port, roles etc name, host, port = self.get_server_details()
s = Server.query.filter_by(
id=self.sid, user_id=current_user.id
).first()
from pgadmin.utils.driver import get_driver
driver = get_driver(PG_DEFAULT_DRIVER)
manager = driver.connection_manager(self.sid)
host = manager.local_bind_host if manager.use_ssh_tunnel else s.host
port = manager.local_bind_port if manager.use_ssh_tunnel else s.port
res = '<div class="h5">' res = '<div class="h5">'
res += html.safe_str( res += html.safe_str(
_( _(
"Restoring backup on the server '{0}'..." "Restoring backup on the server '{0}'..."
).format( ).format(
"{0} ({1}:{2})".format(s.name, host, port) "{0} ({1}:{2})".format(name, host, port)
) )
) )
@ -206,6 +200,7 @@ def create_restore_job(sid):
if _file is None: if _file is None:
return make_json_response( return make_json_response(
status=410,
success=0, success=0,
errormsg=_("File could not be found.") errormsg=_("File could not be found.")
) )

View File

@ -0,0 +1,155 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2018, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import sys
from pgadmin.misc.bgprocess.processes import BatchProcess, IProcessDesc
from pgadmin.tools.restore import RestoreMessage
from pgadmin.utils.route import BaseTestGenerator
from pickle import dumps, loads
if sys.version_info < (3, 3):
from mock import patch
else:
from unittest.mock import patch
class BatchProcessTest(BaseTestGenerator):
"""Test the BatchProcess class"""
scenarios = [
('When restore server',
dict(
class_params=dict(
sid=1,
name='test_restore_server',
port=5444,
host='localhost',
database='postgres',
username='postgres',
bfile='test_restore',
args=[
'--file',
"restore_file",
'--host',
"localhost",
'--port',
"5444",
'--username',
"postgres",
'--no-password',
'--database',
"postgres"
],
cmd='restore_server'
)
))
]
@patch('pgadmin.tools.restore.RestoreMessage.get_server_details')
@patch('pgadmin.misc.bgprocess.processes.Popen')
@patch('pgadmin.misc.bgprocess.processes.current_app')
@patch('pgadmin.misc.bgprocess.processes.db')
@patch('pgadmin.tools.restore.current_user')
@patch('pgadmin.misc.bgprocess.processes.current_user')
def runTest(self, current_user_mock, current_user, db_mock,
current_app_mock, popen_mock, get_server_details_mock):
current_user.id = 1
current_user_mock.id = 1
current_app_mock.PGADMIN_RUNTIME = False
def db_session_add_mock(j):
cmd_obj = loads(j.desc)
self.assertTrue(isinstance(cmd_obj, IProcessDesc))
self.assertEqual(cmd_obj.bfile, self.class_params['bfile'])
self.assertEqual(cmd_obj.cmd,
' --file "restore_file" '
'--host "{0}" '
'--port "{1}" '
'--username "{2}" '
'--no-password '
'--database "{3}"'.format(
self.class_params['host'],
self.class_params['port'],
self.class_params['username'],
self.class_params['database']
))
get_server_details_mock.return_value = \
self.class_params['name'],\
self.class_params['host'],\
self.class_params['port']
db_mock.session.add.side_effect = db_session_add_mock
restore_obj = RestoreMessage(
self.class_params['sid'],
self.class_params['bfile'],
*self.class_params['args']
)
p = BatchProcess(
desc=restore_obj,
cmd=self.class_params['cmd'],
args=self.class_params['args']
)
# Check that _create_process has been called
self.assertTrue(db_mock.session.add.called)
# Check start method
self._check_start(popen_mock, p)
# Check list method
self._check_list(p, restore_obj)
def _check_start(self, popen_mock, p):
cmd_test = self.class_params['cmd']
assert_true = self.assertTrue
class popenMockSideEffect():
def __init__(self, cmd, **kwargs):
assert_true(cmd_test in cmd)
assert_true('env' in kwargs)
def poll(self):
pass
popen_mock.side_effect = popenMockSideEffect
p.start()
self.assertTrue(popen_mock.called)
@patch('pgadmin.misc.bgprocess.processes.Process')
@patch('pgadmin.misc.bgprocess.processes.BatchProcess.'
'update_process_info')
def _check_list(self, p, restore_obj, update_process_info_mock,
process_mock):
class TestMockProcess():
def __init__(self, desc, args, cmd):
self.pid = 1
self.exit_code = 1
self.start_time = '2018-04-17 06:18:56.315445 +0000'
self.end_time = None
self.desc = dumps(desc)
self.arguments = " ".join(args)
self.command = cmd
self.acknowledge = None
process_mock.query.filter_by.return_value = [
TestMockProcess(restore_obj,
self.class_params['args'],
self.class_params['cmd'])
]
update_process_info_mock.return_value = [True, True]
ret_value = p.list()
self.assertEqual(1, len(ret_value))
self.assertTrue('details' in ret_value[0])
self.assertTrue('desc' in ret_value[0])

View File

@ -0,0 +1,201 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2018, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import sys
import time
import random
import os
import simplejson as json
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from pgadmin.utils import server_utils as server_utils
import pgadmin.tools.backup.tests.test_backup_utils as backup_utils
class RestoreJobTest(BaseTestGenerator):
"""Backup api test cases"""
scenarios = [
('When restore the object with the default options',
dict(
params=dict(
file='test_restore_file',
format='custom',
custom=False,
verbose=True,
blobs=True,
schemas=[],
tables=[],
database='test_restore_database'
),
url='/restore/job/{0}',
expected_cmd_opts=['--verbose'],
not_expected_cmd_opts=[],
expected_exit_code=[0, None],
backup_options=dict(
params=dict(
file='test_restore_file',
format='custom',
verbose=True,
blobs=True,
schemas=[],
tables=[],
database='test_restore_database'
),
url='/backup/job/{0}/object',
expected_params=dict(
expected_cmd_opts=['--verbose', '--format=c', '--blobs'],
not_expected_cmd_opts=[],
expected_exit_code=[0, None]
)
)
))
]
def setUp(self):
if self.server['default_binary_paths'] is None:
self.skipTest(
"default_binary_paths is not set for the server {0}".format(
self.server['name']
)
)
def create_backup(self):
url = self.backup_options['url'].format(self.server_id)
job_id = backup_utils.create_backup_job(self.tester, url,
self.backup_options['params'],
self.assertEqual)
self.backup_file = backup_utils.run_backup_job(
self.tester,
job_id,
self.backup_options['expected_params'],
self.assertIn,
self.assertNotIn,
self.assertEqual
)
def runTest(self):
self.db_name = ''
self.server_id = parent_node_dict["server"][-1]["server_id"]
server_utils.connect_server(self, self.server_id)
utils.create_database(self.server, self.params['database'])
self.create_backup()
url = self.url.format(self.server_id)
# Create the restore job
response = self.tester.post(url,
data=json.dumps(self.params),
content_type='html/json')
self.assertEqual(response.status_code, 200)
response_data = json.loads(response.data.decode('utf-8'))
job_id = response_data['data']['job_id']
cnt = 0
while 1:
if cnt >= 5:
break
# Check the process list
response1 = self.tester.get('/misc/bgprocess/?_='.format(
random.randint(1, 9999999)))
self.assertEqual(response1.status_code, 200)
process_list = json.loads(response1.data.decode('utf-8'))
if len(process_list) > 0 and 'execution_time' in process_list[0]:
break
time.sleep(0.5)
cnt += 1
self.assertTrue('execution_time' in process_list[0])
self.assertTrue('stime' in process_list[0])
self.assertTrue('exit_code' in process_list[0])
self.assertTrue(process_list[0]['exit_code'] in
self.expected_exit_code)
if self.expected_cmd_opts:
for opt in self.expected_cmd_opts:
self.assertIn(opt, process_list[0]['details'])
if self.not_expected_cmd_opts:
for opt in self.not_expected_cmd_opts:
self.assertNotIn(opt, process_list[0]['details'])
# Check the process details
p_details = self.tester.get('/misc/bgprocess/{0}?_='.format(
job_id, random.randint(1, 9999999))
)
self.assertEqual(p_details.status_code, 200)
json.loads(p_details.data.decode('utf-8'))
p_details = self.tester.get('/misc/bgprocess/{0}/{1}/{2}/?_='.format(
job_id, 0, 0, random.randint(1, 9999999))
)
self.assertEqual(p_details.status_code, 200)
p_details_data = json.loads(p_details.data.decode('utf-8'))
# Retrieve the restore job process logs
cnt = 0
while 1:
out, err, status = RestoreJobTest.get_params(p_details_data)
if status or cnt >= 5:
break
p_details = self.tester.get(
'/misc/bgprocess/{0}/{1}/{2}/?_={3}'.format(
job_id, out, err, random.randint(1, 9999999))
)
self.assertEqual(p_details.status_code, 200)
p_details_data = json.loads(p_details.data.decode('utf-8'))
cnt += 1
time.sleep(1)
# Check the job is complete.
restore_ack = self.tester.put('/misc/bgprocess/{0}'.format(job_id))
self.assertEqual(restore_ack.status_code, 200)
restore_ack_res = json.loads(restore_ack.data.decode('utf-8'))
self.assertEqual(restore_ack_res['success'], 1)
if self.backup_file is not None:
if os.path.isfile(self.backup_file):
os.remove(self.backup_file)
@staticmethod
def get_params(data):
out = 0
out_done = False
err = 0
err_done = False
if 'out' in data:
out = data['out'] and data['out']['pos']
if 'done' in data['out']:
out_done = data['out']['done']
if 'err' in data:
err = data['err'] and data['err']['pos']
if 'done' in data['err']:
err_done = data['err']['done']
return out, err, (out_done and err_done)
def tearDown(self):
connection = utils.get_db_connection(
self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'],
self.server['sslmode']
)
utils.drop_database(connection, self.params['database'])

View File

@ -0,0 +1,314 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2018, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import sys
import simplejson as json
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from pgadmin.utils import server_utils as server_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
if sys.version_info < (3, 3):
from mock import patch, MagicMock
else:
from unittest.mock import patch, MagicMock
class RestoreCreateJobTest(BaseTestGenerator):
"""Test the RestoreCreateJob class"""
scenarios = [
('When restore object with default options',
dict(
class_params=dict(
sid=1,
name='test_restore_server',
port=5444,
host='localhost',
database='postgres',
bfile='test_restore',
username='postgres'
),
params=dict(
file='test_restore_file',
format='custom',
custom=False,
verbose=True,
blobs=True,
schemas=[],
tables=[],
database='postgres'
),
url='/restore/job/{0}',
expected_cmd_opts=['--verbose'],
not_expected_cmd_opts=[],
expected_exit_code=[0, None]
)),
('When restore object with the sections options',
dict(
class_params=dict(
sid=1,
name='test_restore_server',
port=5444,
host='localhost',
database='postgres',
bfile='test_restore',
username='postgres'
),
params=dict(
file='test_restore_file',
format='custom',
no_of_jobs='2',
custom=False,
verbose=True,
schemas=[],
tables=[],
database='postgres',
data=True,
pre_data=True,
post_data=True,
only_data=True,
only_schema=True
),
url='/restore/job/{0}',
# Please include sections data here, right now this is a bug
expected_cmd_opts=['--verbose', '--jobs', '2'],
not_expected_cmd_opts=[],
# Below options should be enabled once we fix the issue #3368
# not_expected_cmd_opts=['--data-only', '--schema-only'],
expected_exit_code=[0, None],
)),
('When restore the object with Type of objects',
dict(
class_params=dict(
sid=1,
name='test_restore_server',
port=5444,
host='localhost',
database='postgres',
bfile='test_restore',
username='postgres'
),
params=dict(
file='test_restore_file',
format='custom',
no_of_jobs='2',
custom=False,
verbose=True,
schemas=[],
tables=[],
database='postgres',
only_data=True,
only_schema=True,
dns_owner=True
),
url='/restore/job/{0}',
expected_cmd_opts=['--verbose', '--data-only'],
not_expected_cmd_opts=[],
# Below options should be enabled once we fix the issue #3368
# not_expected_cmd_opts=['--schema-only', '--no-owner'],
expected_exit_code=[0, None],
)),
('When restore object with option - Do not save',
dict(
class_params=dict(
sid=1,
name='test_restore_server',
port=5444,
host='localhost',
database='postgres',
bfile='test_restore',
username='postgres'
),
params=dict(
file='test_restore_file',
format='custom',
verbose=True,
custom=False,
schemas=[],
tables=[],
database='postgres',
dns_owner=True,
dns_privilege=True,
dns_tablespace=True,
only_data=False
),
url='/restore/job/{0}',
# Add '--no-privileges' to the expected_cmd once #3363 fixed
expected_cmd_opts=['--no-owner',
'--no-tablespaces'],
not_expected_cmd_opts=[],
expected_exit_code=[0, None]
)),
('When restore object with option - Queries',
dict(
class_params=dict(
sid=1,
name='test_restore_file',
port=5444,
host='localhost',
database='postgres',
bfile='test_restore',
username='postgres'
),
params=dict(
file='test_backup_file',
format='custom',
verbose=True,
schemas=[],
tables=[],
database='postgres',
clean=True,
include_create_database=True,
single_transaction=True,
),
url='/restore/job/{0}',
expected_cmd_opts=['--create', '--clean',
'--single-transaction'],
not_expected_cmd_opts=[],
expected_exit_code=[0, None]
)),
('When restore object with option - Disbale',
dict(
class_params=dict(
sid=1,
name='test_restore_file',
port=5444,
host='localhost',
database='postgres',
bfile='test_restore',
username='postgres'
),
params=dict(
file='test_backup_file',
format='custom',
verbose=True,
schemas=[],
tables=[],
database='postgres',
disable_trigger=True,
no_data_fail_table=True,
only_schema=False
),
url='/restore/job/{0}',
# Add '--no-data-for-failed-tables' into
# expected_cmd_opts once #3363 fixed
expected_cmd_opts=['--disable-triggers'],
not_expected_cmd_opts=[],
expected_exit_code=[0, None]
)),
('When restore object with option - Miscellaneous',
dict(
class_params=dict(
sid=1,
name='test_restore_file',
port=5444,
host='localhost',
database='postgres',
bfile='test_restore',
username='postgres'
),
params=dict(
file='test_backup_file',
format='custom',
verbose=True,
schemas=[],
tables=[],
database='postgres',
use_set_session_auth=True,
exit_on_error=True,
),
url='/restore/job/{0}',
# Add '--use_set_session_auth' into
# expected_cmd_opts once #3363 fixed
expected_cmd_opts=['--exit-on-error'],
not_expected_cmd_opts=[],
expected_exit_code=[0, None]
)),
]
def setUp(self):
if self.server['default_binary_paths'] is None:
self.skipTest(
"default_binary_paths is not set for the server {0}".format(
self.server['name']
)
)
@patch('pgadmin.tools.restore.Server')
@patch('pgadmin.tools.restore.current_user')
@patch('pgadmin.tools.restore.RestoreMessage')
@patch('pgadmin.tools.restore.filename_with_file_manager_path')
@patch('pgadmin.tools.restore.BatchProcess')
@patch('pgadmin.utils.driver.psycopg2.server_manager.ServerManager.'
'export_password_env')
def runTest(self, export_password_env_mock, batch_process_mock,
filename_mock, restore_message_mock,
current_user_mock, server_mock):
class TestMockServer():
def __init__(self, name, host, port, id, username):
self.name = name
self.host = host
self.port = port
self.id = id
self.username = username
self.db_name = ''
self.server_id = parent_node_dict["server"][-1]["server_id"]
mock_obj = TestMockServer(self.class_params['name'],
self.class_params['host'],
self.class_params['port'],
self.server_id,
self.class_params['username']
)
mock_result = server_mock.query.filter_by.return_value
mock_result.first.return_value = mock_obj
filename_mock.return_value = self.params['file']
batch_process_mock.set_env_variables = MagicMock(
return_value=True
)
batch_process_mock.start = MagicMock(
return_value=True
)
export_password_env_mock.return_value = True
server_response = server_utils.connect_server(self, self.server_id)
if server_response["info"] == "Server connected.":
db_owner = server_response['data']['user']['name']
self.data = database_utils.get_db_data(db_owner)
self.db_name = self.data['name']
url = self.url.format(self.server_id)
# Create the restore job
response = self.tester.post(url,
data=json.dumps(self.params),
content_type='html/json')
self.assertEqual(response.status_code, 200)
self.assertTrue(restore_message_mock.called)
self.assertTrue(batch_process_mock.called)
if self.expected_cmd_opts:
for opt in self.expected_cmd_opts:
self.assertIn(
opt,
batch_process_mock.call_args_list[0][1]['args']
)
if self.not_expected_cmd_opts:
for opt in self.not_expected_cmd_opts:
self.assertNotIn(
opt,
batch_process_mock.call_args_list[0][1]['args']
)

View File

@ -0,0 +1,77 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2018, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import sys
from pgadmin.tools.restore import RestoreMessage
from pgadmin.utils.route import BaseTestGenerator
if sys.version_info < (3, 3):
from mock import patch
else:
from unittest.mock import patch
class RestoreMessageTest(BaseTestGenerator):
"""Test the RestoreMessage class"""
scenarios = [
('When restore object',
dict(
class_params=dict(
sid=1,
name='test_restore_server',
port=5444,
host='localhost',
database='postgres',
bfile='test_restore',
args=[
'--file',
'restore_file',
'--host',
'localhost',
'--port',
'5444',
'--username',
'postgres',
'--no-password',
'--database',
'postgres'
],
cmd="/test_path/pg_restore"
),
extected_msg="Restoring backup on the server "
"'test_restore_server (localhost:5444)'...",
expetced_details_cmd='/test_path/pg_restore --file '
'"restore_file" --host "localhost"'
' --port "5444" --username "postgres" '
'--no-password --database "postgres"'
))
]
@patch('pgadmin.tools.restore.RestoreMessage.get_server_details')
def runTest(self, get_server_details_mock):
get_server_details_mock.return_value = \
self.class_params['name'],\
self.class_params['host'],\
self.class_params['port']
restore_obj = RestoreMessage(
self.class_params['sid'],
self.class_params['bfile'],
*self.class_params['args']
)
# Check the expected message returned
self.assertEqual(restore_obj.message, self.extected_msg)
# Check the command
obj_details = restore_obj.details(self.class_params['cmd'],
self.class_params['args'])
self.assertIn(self.expetced_details_cmd, obj_details)

View File

@ -21,6 +21,8 @@ import config
import regression import regression
from regression import test_setup from regression import test_setup
from pgadmin.utils.preferences import Preferences
SERVER_GROUP = test_setup.config_data['server_group'] SERVER_GROUP = test_setup.config_data['server_group']
file_name = os.path.realpath(__file__) file_name = os.path.realpath(__file__)
@ -68,7 +70,7 @@ def logout_tester_account(tester):
:return: None :return: None
""" """
response = tester.get('/logout') tester.get('/logout')
def get_config_data(): def get_config_data():
@ -86,7 +88,8 @@ def get_config_data():
"db_password": srv['db_password'], "db_password": srv['db_password'],
"role": "", "role": "",
"sslmode": srv['sslmode'], "sslmode": srv['sslmode'],
"tablespace_path": srv.get('tablespace_path', None) "tablespace_path": srv.get('tablespace_path', None),
"default_binary_paths": srv.get('default_binary_paths', None)
} }
if 'db_type' in srv: if 'db_type' in srv:
data['db_type'] = srv['db_type'] data['db_type'] = srv['db_type']
@ -444,7 +447,14 @@ def delete_server_with_api(tester, sid):
try: try:
url = '/browser/server/obj/' + str(SERVER_GROUP) + "/" url = '/browser/server/obj/' + str(SERVER_GROUP) + "/"
# Call API to delete the server # Call API to delete the server
response = tester.delete(url + str(sid)) tester.delete(url + str(sid))
cnt = 0
for s in regression.parent_node_dict["server"]:
if s['server_id'] == int(sid):
del regression.parent_node_dict["server"][cnt]
cnt += 1
except Exception: except Exception:
traceback.print_exc(file=sys.stderr) traceback.print_exc(file=sys.stderr)
@ -596,6 +606,64 @@ def get_db_server(sid):
return connection return connection
def set_preference(default_binary_path):
conn = sqlite3.connect(config.TEST_SQLITE_PATH)
cur = conn.cursor()
perf = Preferences.module('paths')
pg_path_pref = perf.preference('pg_bin_dir')
user_pref = cur.execute(
'SELECT pid, uid FROM user_preferences where pid=%s' % pg_path_pref.pid
)
user_pref = user_pref.fetchone()
if user_pref:
cur.execute('UPDATE user_preferences SET value = ? WHERE pid = ?',
(default_binary_path['pg'], pg_path_pref.pid))
else:
pg_pref_details = (pg_path_pref.pid, 1,
default_binary_path['pg'])
cur.execute('INSERT INTO user_preferences(pid, uid, value)'
' VALUES (?,?,?)', pg_pref_details)
ppas_path_pref = perf.preference('ppas_bin_dir')
user_pref = cur.execute(
'SELECT pid, uid FROM user_preferences where pid=%s' %
ppas_path_pref.pid
)
user_pref = user_pref.fetchone()
if user_pref:
cur.execute('UPDATE user_preferences SET value = ? WHERE pid = ? ',
(default_binary_path['ppas'], ppas_path_pref.pid))
else:
ppas_pref_details = (ppas_path_pref.pid, 1,
default_binary_path['ppas'])
cur.execute('INSERT INTO user_preferences(pid, uid, value)'
' VALUES (?,?,?)', ppas_pref_details)
gpdb_path_pref = perf.preference('gpdb_bin_dir')
user_pref = cur.execute(
'SELECT pid, uid FROM user_preferences where pid=%s' %
gpdb_path_pref.pid
)
user_pref = user_pref.fetchone()
if user_pref:
cur.execute('UPDATE user_preferences SET value = ? WHERE pid = ? ',
(default_binary_path['gpdb'], gpdb_path_pref.pid))
else:
gpdb_pref_details = (gpdb_path_pref.pid, 1,
default_binary_path['gpdb'])
cur.execute('INSERT INTO user_preferences(pid, uid, value)'
' VALUES (?,?,?)', gpdb_pref_details)
conn.commit()
def remove_db_file(): def remove_db_file():
"""This function use to remove SQLite DB file""" """This function use to remove SQLite DB file"""
if os.path.isfile(config.TEST_SQLITE_PATH): if os.path.isfile(config.TEST_SQLITE_PATH):

View File

@ -114,6 +114,9 @@ test_client = app.test_client()
driver = None driver = None
app_starter = None app_starter = None
handle_cleanup = None handle_cleanup = None
app.PGADMIN_RUNTIME = True
if config.SERVER_MODE is True:
app.PGADMIN_RUNTIME = False
setattr(unit_test.result.TestResult, "passed", []) setattr(unit_test.result.TestResult, "passed", [])
@ -234,7 +237,6 @@ def get_test_modules(arguments):
# Sort module list so that test suite executes the test cases sequentially # Sort module list so that test suite executes the test cases sequentially
module_list = TestsGeneratorRegistry.registry.items() module_list = TestsGeneratorRegistry.registry.items()
module_list = sorted(module_list, key=lambda module_tuple: module_tuple[0]) module_list = sorted(module_list, key=lambda module_tuple: module_tuple[0])
return module_list return module_list
@ -393,6 +395,9 @@ if __name__ == '__main__':
# Create test server # Create test server
server_information = test_utils.create_parent_server_node(server) server_information = test_utils.create_parent_server_node(server)
if server['default_binary_paths'] is not None:
test_utils.set_preference(server['default_binary_paths'])
suite = get_suite(test_module_list, suite = get_suite(test_module_list,
server, server,
test_client, test_client,

View File

@ -23,7 +23,12 @@
"maintenance_db": "postgres", "maintenance_db": "postgres",
"sslmode": "prefer", "sslmode": "prefer",
"tablespace_path": "", "tablespace_path": "",
"enabled": true "enabled": true,
"default_binary_paths": {
"pg": "/opt/PostgreSQL/9.4/bin/",
"ppas": "/opt/edb/as10/bin/",
"gpdb": ""
}
} }
], ],
"server_update_data": [ "server_update_data": [