Further Python 2.6 fixes.

This commit is contained in:
Khushboo Vashi 2018-06-15 15:03:53 +01:00 committed by Dave Page
parent ab54a6d39a
commit ba8829b64f
4 changed files with 24 additions and 5 deletions

View File

@ -463,7 +463,7 @@ class BatchProcess(object):
stime = parser.parse(self.stime)
etime = parser.parse(self.etime or get_current_time())
execution_time = (etime - stime).total_seconds()
execution_time = BatchProcess.total_seconds(etime - stime)
if process_output:
out, out_completed = read_log(
@ -558,7 +558,7 @@ class BatchProcess(object):
stime = parser.parse(p.start_time)
etime = parser.parse(p.end_time or get_current_time())
execution_time = (etime - stime).total_seconds()
execution_time = BatchProcess.total_seconds(etime - stime)
desc = ""
try:
desc = loads(p.desc.encode('latin-1')) if \
@ -600,6 +600,16 @@ class BatchProcess(object):
return res
@staticmethod
def total_seconds(dt):
# Keep backward compatibility with Python 2.6 which doesn't have
# this method
if hasattr(dt, 'total_seconds'):
return dt.total_seconds()
else:
return (dt.microseconds + (dt.seconds + dt.days * 24 * 3600) *
10**6) / 10**6
@staticmethod
def acknowledge(_pid):
"""

View File

@ -119,7 +119,10 @@ class BatchProcessTest(BaseTestGenerator):
current_app_mock.PGADMIN_RUNTIME = False
def db_session_add_mock(j):
cmd_obj = loads(j.desc)
if sys.version_info < (2, 7):
cmd_obj = loads(str(j.desc))
else:
cmd_obj = loads(j.desc)
self.assertTrue(isinstance(cmd_obj, IProcessDesc))
self.assertEqual(cmd_obj.backup_type, self.class_params['type'])
self.assertEqual(cmd_obj.bfile, self.class_params['bfile'])

View File

@ -73,7 +73,10 @@ class BatchProcessTest(BaseTestGenerator):
self.port = port
def db_session_add_mock(j):
cmd_obj = loads(j.desc)
if sys.version_info < (2, 7):
cmd_obj = loads(str(j.desc))
else:
cmd_obj = loads(j.desc)
self.assertTrue(isinstance(cmd_obj, IProcessDesc))
self.assertEqual(cmd_obj.query, self.class_params['cmd'])
self.assertEqual(cmd_obj.message, self.expected_msg)

View File

@ -64,7 +64,10 @@ class BatchProcessTest(BaseTestGenerator):
current_app_mock.PGADMIN_RUNTIME = False
def db_session_add_mock(j):
cmd_obj = loads(j.desc)
if sys.version_info < (2, 7):
cmd_obj = loads(str(j.desc))
else:
cmd_obj = loads(j.desc)
self.assertTrue(isinstance(cmd_obj, IProcessDesc))
self.assertEqual(cmd_obj.bfile, self.class_params['bfile'])
self.assertEqual(cmd_obj.cmd,