Fix process execution. Fixes #1679. Fixes #2144.

Re-engineer the background process executor, to avoid using sqlite as some builds of
components it relies on do not support working in forked children.
This commit is contained in:
Ashesh Vashi 2017-02-04 15:26:57 +01:00 committed by Dave Page
parent 426ee40a71
commit b7c5039416
5 changed files with 625 additions and 377 deletions

View File

@ -15,7 +15,7 @@ from flask import url_for
from flask_babel import gettext as _
from flask_security import login_required
from pgadmin.utils import PgAdminModule
from pgadmin.utils.ajax import make_response, gone, bad_request, success_return
from pgadmin.utils.ajax import make_response, gone, success_return
from .processes import BatchProcess
@ -47,7 +47,6 @@ class BGProcessModule(PgAdminModule):
"""
return {
'bgprocess.index': url_for("bgprocess.index"),
'bgprocess.list': url_for("bgprocess.list"),
'seconds': _('seconds'),
'started': _('Started'),
'START_TIME': _('Start time'),
@ -55,7 +54,8 @@ class BGProcessModule(PgAdminModule):
'EXECUTION_TIME': _('Execution time'),
'running': _('Running...'),
'successfully_finished': _("Successfully completed."),
'failed_with_exit_code': _("Failed (exit code: %s).")
'failed_with_exit_code': _("Failed (exit code: %s)."),
'BG_TOO_MANY_LOGS': _("Too many logs generated!")
}
@ -65,14 +65,14 @@ blueprint = BGProcessModule(
)
@blueprint.route('/')
@blueprint.route('/', methods=['GET'])
@login_required
def index():
return bad_request(errormsg=_('This URL can not be called directly.'))
return make_response(response=BatchProcess.list())
@blueprint.route('/status/<pid>/', methods=['GET'])
@blueprint.route('/status/<pid>/<int:out>/<int:err>/', methods=['GET'])
@blueprint.route('/<pid>', methods=['GET'])
@blueprint.route('/<pid>/<int:out>/<int:err>/', methods=['GET'])
@login_required
def status(pid, out=-1, err=-1):
"""
@ -96,12 +96,7 @@ def status(pid, out=-1, err=-1):
return gone(errormsg=str(lerr))
@blueprint.route('/list/', methods=['GET'])
def list():
return make_response(response=BatchProcess.list())
@blueprint.route('/acknowledge/<pid>/', methods=['PUT'])
@blueprint.route('/<pid>', methods=['PUT'])
@login_required
def acknowledge(pid):
"""
@ -114,7 +109,7 @@ def acknowledge(pid):
Positive status
"""
try:
BatchProcess.acknowledge(pid, True)
BatchProcess.acknowledge(pid)
return success_return()
except LookupError as lerr:

View File

@ -23,66 +23,108 @@ This script will:
database.
Args:
process_id -- Process id
db_file -- Database file which holds list of processes to be executed
output_directory -- output directory
list of program and arguments passed to it.
It also depends on the following environment variable for proper execution.
PROCID - Process-id
OUTDIR - Output directory
"""
from __future__ import print_function, unicode_literals
# To make print function compatible with python2 & python3
import sys
import os
import argparse
import sqlite3
from datetime import datetime
from datetime import datetime, timedelta, tzinfo
from subprocess import Popen, PIPE
from threading import Thread
import csv
import pytz
import codecs
import signal
# SQLite3 needs all string as UTF-8
# We need to make string for Python2/3 compatible
if sys.version_info < (3,):
from cStringIO import StringIO
def log(msg):
if 'OUTDIR' not in os.environ:
return
with open(
os.path.join(os.environ['OUTDIR'], ('log_%s' % os.getpid())), 'a'
) as fp:
fp.write(('INFO:: %s\n' % str(msg)))
def u(x):
return x
else:
from io import StringIO
def log_exception():
if 'OUTDIR' not in os.environ:
return
type_, value_, traceback_ = info=sys.exc_info()
with open(
os.path.join(os.environ['OUTDIR'], ('log_%s' % os.getpid())), 'a'
) as fp:
from traceback import format_exception
res = ''.join(
format_exception(type_, value_, traceback_)
)
fp.write('EXCEPTION::\n{0}'.format(res))
return res
def u(x):
if hasattr(x, 'decode'):
return x.decode()
return x
IS_WIN = (os.name == 'nt')
ZERO = timedelta(0)
default_encoding = sys.getdefaultencoding() or "utf-8"
def usage():
# Copied the 'UTC' class from the 'pytz' package to allow to run this script
# without any external dependent library, and can be used with any python
# version.
class UTC(tzinfo):
"""UTC
Optimized UTC implementation. It unpickles using the single module global
instance defined beneath this class declaration.
"""
This function will display usage message.
zone = "UTC"
Args:
None
_utcoffset = ZERO
_dst = ZERO
_tzname = zone
Returns:
Displays help message
"""
def fromutc(self, dt):
if dt.tzinfo is None:
return self.localize(dt)
return super(UTC.__class__, self).fromutc(dt)
help_msg = """
Usage:
def utcoffset(self, dt):
return ZERO
executer.py [-h|--help]
[-p|--process] Process ID
[-d|--db_file] SQLite3 database file path
"""
print(help_msg)
def tzname(self, dt):
return "UTC"
def dst(self, dt):
return ZERO
def localize(self, dt, is_dst=False):
'''Convert naive time to local time'''
if dt.tzinfo is not None:
raise ValueError('Not naive datetime (tzinfo is already set)')
return dt.replace(tzinfo=self)
def normalize(self, dt, is_dst=False):
'''Correct the timezone information on the given datetime'''
if dt.tzinfo is self:
return dt
if dt.tzinfo is None:
raise ValueError('Naive time - no tzinfo set')
return dt.astimezone(self)
def __repr__(self):
return "<UTC>"
def __str__(self):
return "UTC"
def get_current_time(format='%Y-%m-%d %H:%M:%S.%f %z'):
return datetime.utcnow().replace(
tzinfo=pytz.utc
tzinfo=UTC()
).strftime(format)
@ -93,35 +135,34 @@ class ProcessLogger(Thread):
Methods:
--------
* __init__(stream_type, configs)
* __init__(stream_type)
- This method is use to initlize the ProcessLogger class object
* logging(msg)
- This method is use to log messages in sqlite3 database
* log(msg)
- Log message in the orderly manner.
* run()
- Reads the stdout/stderr for messages and sent them to logger
"""
def __init__(self, stream_type, configs):
def __init__(self, stream_type):
"""
This method is use to initialize the ProcessLogger class object
Args:
stream_type: Type of STD (std)
configs: Process details dict
Returns:
None
"""
Thread.__init__(self)
self.configs = configs
self.process = None
self.stream = None
self.encoding = default_encoding
self.logger = codecs.open(
os.path.join(
configs['output_directory'], stream_type
), 'w', "utf-8"
os.environ['OUTDIR'], stream_type
), 'w', self.encoding, "ignore"
)
def attach_process_stream(self, process, stream):
@ -153,7 +194,8 @@ class ProcessLogger(Thread):
if msg:
self.logger.write(
str('{0},{1}').format(
get_current_time(format='%Y%m%d%H%M%S%f'), u(msg)
get_current_time(format='%y%m%d%H%M%S%f'),
msg.decode(self.encoding, 'replace')
)
)
return True
@ -176,44 +218,7 @@ class ProcessLogger(Thread):
self.logger = None
def read_configs(data):
"""
This reads SQLite3 database and fetches process details
Args:
data - configuration details
Returns:
Process details fetched from database as a dict
"""
if data.db_file is not None and data.process_id is not None:
conn = sqlite3.connect(data.db_file)
c = conn.cursor()
t = (data.process_id,)
c.execute('SELECT command, arguments FROM process WHERE \
exit_code is NULL \
AND pid=?', t)
row = c.fetchone()
conn.close()
if row and len(row) > 1:
configs = {
'pid': data.process_id,
'cmd': row[0],
'args': row[1],
'output_directory': data.output_directory,
'db_file': data.db_file
}
return configs
else:
return None
else:
raise ValueError("Please verify pid and db_file arguments.")
def update_configs(kwargs):
def update_status(**kw):
"""
This function will updates process stats
@ -223,166 +228,268 @@ def update_configs(kwargs):
Returns:
None
"""
if 'db_file' in kwargs and 'pid' in kwargs:
conn = sqlite3.connect(kwargs['db_file'])
sql = 'UPDATE process SET '
params = list()
import json
for param in ['start_time', 'end_time', 'exit_code']:
if param in kwargs:
sql += (',' if len(params) else '') + param + '=? '
params.append(kwargs[param])
if len(params) == 0:
return
sql += 'WHERE pid=?'
params.append(kwargs['pid'])
with conn:
c = conn.cursor()
c.execute(sql, params)
conn.commit()
# Commit & close cursor
conn.close()
if os.environ['OUTDIR']:
status = {
k: v for k, v in kw.items() if k in [
'start_time', 'end_time', 'exit_code', 'pid'
]
}
log('Updating the status:\n{0}'.format(json.dumps(status)))
with open(os.path.join(os.environ['OUTDIR'], 'status'), 'w') as fp:
json.dump(status, fp)
else:
raise ValueError("Please verify pid and db_file arguments.")
def execute(configs):
def execute():
"""
This function will execute the background process
Args:
configs: Process configuration details
Returns:
None
"""
if configs is not None:
command = [configs['cmd']]
args_csv = StringIO(configs['args'])
args_reader = csv.reader(args_csv, delimiter=str(','))
for args in args_reader:
command = command + args
args = {
'pid': configs['pid'],
'db_file': configs['db_file']
}
command = sys.argv[1:]
args = dict()
log('Initialize the process execution: {0}'.format(command))
# Create seprate thread for stdout and stderr
process_stdout = ProcessLogger('out')
process_stderr = ProcessLogger('err')
process = None
try:
# update start_time
args.update({
'start_time': get_current_time(),
'stdout': process_stdout.log,
'stderr': process_stderr.log,
'pid': os.getpid()
})
# Update start time
update_status(**args)
log('Status updated...')
if 'PROCID' in os.environ and os.environ['PROCID'] in os.environ:
os.environ['PGPASSWORD'] = os.environ[os.environ['PROCID']]
kwargs = dict()
kwargs['close_fds'] = False
kwargs['shell'] = True if IS_WIN else False
# We need environment variables & values in string
log('Converting the environment variable in the bytes format...')
kwargs['env'] = convert_environment_variables(os.environ.copy())
log('Starting the command execution...')
process = Popen(
command, stdout=PIPE, stderr=PIPE, stdin=None, **kwargs
)
log('Attaching the loggers to stdout, and stderr...')
# Attach the stream to the process logger, and start logging.
process_stdout.attach_process_stream(process, process.stdout)
process_stdout.start()
process_stderr.attach_process_stream(process, process.stderr)
process_stderr.start()
# Join both threads together
process_stdout.join()
process_stderr.join()
log('Waiting for the process to finish...')
# Child process return code
exitCode = process.wait()
if exitCode is None:
exitCode = process.poll()
log('Process exited with code: {0}'.format(exitCode))
args.update({'exit_code': exitCode})
# Add end_time
args.update({'end_time': get_current_time()})
# Fetch last output, and error from process if it has missed.
data = process.communicate()
if data:
if data[0]:
process_stdout.log(data[0])
if data[1]:
process_stderr.log(data[1])
# If executable not found or invalid arguments passed
except OSError:
info = log_exception()
args.update({'exit_code': 500})
if process_stderr:
process_stderr.log(info)
else:
print("WARNING: ", e.strerror, file=sys.stderr)
args.update({'end_time': get_current_time()})
args.update({'exit_code': e.errno})
# Unknown errors
except Exception:
info = log_exception()
args.update({'exit_code': 501})
if process_stderr:
process_stderr.log(info)
else:
print("WARNING: ", str(e), file=sys.stderr)
args.update({'end_time': get_current_time()})
args.update({'exit_code': -1})
finally:
# Update the execution end_time, and exit-code.
update_status(**args)
log('Exiting the process executor...')
if process_stderr:
process_stderr.release()
if process_stdout:
process_stdout.release()
log('Bye!')
# Let's ignore all the signal comming to us.
def signal_handler(signal, msg):
pass
def convert_environment_variables(env):
"""
This function is use to convert environment variable to string
because environment variable must be string in popen
:param env: Dict of environment variable
:return: Encoded environment variable as string
"""
temp_env = dict()
for key, value in env.items():
try:
reload(sys)
sys.setdefaultencoding('utf8')
except:
pass
# Create seprate thread for stdout and stderr
process_stdout = ProcessLogger('out', configs)
process_stderr = ProcessLogger('err', configs)
try:
# update start_time
args.update({
'start_time': get_current_time(),
'stdout': process_stdout.log,
'stderr': process_stderr.log
})
# Update start time
update_configs(args)
if args['pid'] in os.environ:
os.environ['PGPASSWORD'] = os.environ[args['pid']]
process = Popen(
command, stdout=PIPE, stderr=PIPE, stdin=PIPE,
shell=(os.name == 'nt'), close_fds=(os.name != 'nt')
)
try:
del (os.environ['PGPASSWORD'])
except:
pass
# Attach the stream to the process logger, and start logging.
process_stdout.attach_process_stream(process, process.stdout)
process_stdout.start()
process_stderr.attach_process_stream(process, process.stderr)
process_stderr.start()
# Join both threads together
process_stdout.join()
process_stderr.join()
# Child process return code
exitCode = process.wait()
if exitCode is None:
exitCode = process.poll()
args.update({'exit_code': exitCode})
# Add end_time
args.update({'end_time': get_current_time()})
# Fetch last output, and error from process if it has missed.
data = process.communicate()
if data:
if data[0]:
process_stdout.log(data[0])
if data[1]:
process_stderr.log(data[1])
# If executable not found or invalid arguments passed
except OSError as e:
if process_stderr:
process_stderr.log(e.strerror)
else:
print("WARNING: ", e.strerror, file=sys.stderr)
args.update({'end_time': get_current_time()})
args.update({'exit_code': e.errno})
# Unknown errors
if not isinstance(key, str):
key = key.encode(default_encoding)
if not isinstance(value, str):
value = value.encode(default_encoding)
temp_env[key] = value
except Exception as e:
if process_stderr:
process_stderr.log(str(e))
else:
print("WARNING: ", str(e), file=sys.stderr)
args.update({'end_time': get_current_time()})
args.update({'exit_code': -1})
finally:
# Update the execution end_time, and exit-code.
update_configs(args)
if process_stderr:
process_stderr.release()
process_stderr = None
if process_stdout:
process_stdout.release()
process_stdout = None
else:
raise ValueError("Please verify process configs.")
log_exception()
return temp_env
if __name__ == '__main__':
# Read command line arguments
parser = argparse.ArgumentParser(
description='Process executor for pgAdmin 4'
)
parser.add_argument(
'-p', '--process_id', help='Process ID', required=True
)
parser.add_argument(
'-d', '--db_file', help='Configuration Database', required=True
)
parser.add_argument(
'-o', '--output_directory',
help='Location where the logs will be created', required=True
)
args = parser.parse_args()
log('Starting the process executor...')
# Fetch bakcground process details from SQLite3 database file
configs = read_configs(args)
# Ignore any signals
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
log('Disabled the SIGINT, SIGTERM signals...')
# Execute the background process
execute(configs)
if IS_WIN:
log('Disable the SIGBREAKM signal (windows)...')
signal.signal(signal.SIGBREAK, signal_handler)
log('Disabled the SIGBREAKM signal (windows)...')
# For windows:
# We would run the process_executor in the detached mode again to make
# the child process to run as a daemon. And, it would run without
# depending on the status of the web-server.
if 'PGA_BGP_FOREGROUND' in os.environ and \
os.environ['PGA_BGP_FOREGROUND'] == "1":
log('[CHILD] Start process execution...')
log('Executing the command now from the detached child...')
# This is a child process running as the daemon process.
# Let's do the job assing to it.
execute()
else:
from subprocess import CREATE_NEW_PROCESS_GROUP
DETACHED_PROCESS = 0x00000008
# Forward the standard input, output, and error stream to the
# 'devnull'.
stdin = open(os.devnull, "r")
stdout = open(os.devnull, "a")
stderr = open(os.devnull, "a")
env = os.environ.copy()
env['PGA_BGP_FOREGROUND'] = "1"
# We need environment variables & values in string
log('[PARENT] Converting the environment variable in the bytes format...')
try:
env = convert_environment_variables(env)
except Exception as e:
log_exception()
kwargs = {
'stdin': stdin.fileno(),
'stdout': stdout.fileno(),
'stderr': stderr.fileno(),
'creationflags': CREATE_NEW_PROCESS_GROUP | DETACHED_PROCESS,
'close_fds': False,
'cwd': os.environ['OUTDIR'],
'env': env
}
cmd = [sys.executable]
cmd.extend(sys.argv)
log('[PARENT] Command executings: {0}'.format(cmd))
p = Popen(cmd, **kwargs)
exitCode = p.poll()
if exitCode is not None:
log(
'[PARENT] Child exited with exit-code#{0}...'.format(
exitCode
)
)
else:
log('[PARENT] Started the child with PID#{0}'.format(p.pid))
# Question: Should we wait for sometime?
# Answer: Looks the case...
from time import sleep
sleep(2)
log('[PARENT] Exiting...')
sys.exit(0)
else:
r, w = os.pipe()
# For POSIX:
# We will fork the process, and run the child process as daemon, and
# let it do the job.
if os.fork() == 0:
log('[CHILD] Forked the child process...')
# Hmm... So - I need to do the job now...
try:
os.close(r)
log('[CHILD] Make the child process leader...')
# Let me be the process leader first.
os.setsid()
os.umask(0)
log('[CHILD] Make the child process leader...')
w = os.fdopen(w, 'w')
# Let me inform my parent - I will do the job, do not worry
# now, and die peacefully.
log('[CHILD] Inform parent about successful child forking...')
w.write('1')
w.close()
log('[CHILD] Start executing the background process...')
execute()
except Exception:
sys.exit(1)
else:
os.close(w)
r = os.fdopen(r)
# I do not care, what the child send.
r.read()
log('[PARENT] Got message from the child...')
r.close()
log('[PARENT] Exiting...')
sys.exit(0)

View File

@ -19,11 +19,11 @@ import sys
from abc import ABCMeta, abstractproperty, abstractmethod
from datetime import datetime
from pickle import dumps, loads
from subprocess import Popen, PIPE
from subprocess import Popen
import pytz
from dateutil import parser
from flask import current_app as app
from flask import current_app
from flask_babel import gettext as _
from flask_security import current_user
@ -154,21 +154,52 @@ class BatchProcess(object):
self.ecode = None
# Arguments
self.args = _args
args_csv_io = StringIO()
csv_writer = csv.writer(
args_csv_io, delimiter=str(','), quoting=csv.QUOTE_MINIMAL
)
csv_writer.writerow(_args)
self.args = args_csv_io.getvalue().strip(str('\r\n'))
j = Process(
pid=int(id), command=_cmd, arguments=self.args, logdir=log_dir,
desc=dumps(self.desc), user_id=current_user.id
pid=int(id), command=_cmd,
arguments=args_csv_io.getvalue().strip(str('\r\n')),
logdir=log_dir, desc=dumps(self.desc), user_id=current_user.id
)
db.session.add(j)
db.session.commit()
def start(self):
def which(program, paths):
def is_exe(fpath):
return os.path.exists(fpath) and os.access(fpath, os.X_OK)
for path in paths:
if not os.path.isdir(path):
continue
exe_file = os.path.join(path, program)
if is_exe(exe_file):
return exe_file
return None
def convert_environment_variables(env):
"""
This function is use to convert environment variable to string
because environment variable must be string in popen
:param env: Dict of environment variable
:return: Encoded environment variable as string
"""
encoding = sys.getdefaultencoding()
temp_env = dict()
for key, value in env.items():
if not isinstance(key, str):
key = key.encode(encoding)
if not isinstance(value, str):
value = value.encode(encoding)
temp_env[key] = value
return temp_env
if self.stime is not None:
if self.etime is None:
raise Exception(_('The process has already been started.'))
@ -179,21 +210,63 @@ class BatchProcess(object):
executor = os.path.join(
os.path.dirname(__file__), 'process_executor.py'
)
paths = sys.path[:]
interpreter = None
if os.name == 'nt':
paths.insert(0, os.path.join(sys.prefix, 'Scripts'))
paths.insert(0, os.path.join(sys.prefix))
interpreter = which('pythonw.exe', paths)
if interpreter is None:
interpreter = which('python.exe', paths)
else:
paths.insert(0, os.path.join(sys.prefix, 'bin'))
interpreter = which('python', paths)
p = None
cmd = [
(sys.executable if not app.PGADMIN_RUNTIME else
'pythonw.exe' if os.name == 'nt' else 'python'),
executor,
'-p', self.id,
'-o', self.log_dir,
'-d', config.SQLITE_PATH
interpreter if interpreter is not None else 'python',
executor, self.cmd
]
cmd.extend(self.args)
command = []
for c in cmd:
command.append(str(c))
current_app.logger.info(
"Executing the process executor with the arguments: %s",
' '.join(command)
)
cmd = command
# Make a copy of environment, and add new variables to support
env = os.environ.copy()
env['PROCID'] = self.id
env['OUTDIR'] = self.log_dir
env['PGA_BGP_FOREGROUND'] = "1"
# We need environment variables & values in string
env = convert_environment_variables(env)
if os.name == 'nt':
DETACHED_PROCESS = 0x00000008
from subprocess import CREATE_NEW_PROCESS_GROUP
# We need to redirect the standard input, standard output, and
# standard error to devnull in order to allow it start in detached
# mode on
stdout = os.devnull
stderr = stdout
stdin = open(os.devnull, "r")
stdout = open(stdout, "a")
stderr = open(stderr, "a")
p = Popen(
cmd, stdout=None, stderr=None, stdin=None, close_fds=True,
shell=False, creationflags=0x00000008
cmd, close_fds=False, env=env, stdout=stdout.fileno(),
stderr=stderr.fileno(), stdin=stdin.fileno(),
creationflags=(CREATE_NEW_PROCESS_GROUP | DETACHED_PROCESS)
)
else:
def preexec_function():
@ -204,15 +277,19 @@ class BatchProcess(object):
signal.signal(signal.SIGINT, signal.SIG_IGN)
p = Popen(
cmd, stdout=PIPE, stderr=None, stdin=None, close_fds=True,
shell=False, preexec_fn=preexec_function
cmd, close_fds=True, stdout=None, stderr=None, stdin=None,
preexec_fn=preexec_function, env=env
)
self.ecode = p.poll()
if self.ecode is not None and self.ecode != 0:
# TODO:// Find a way to read error from detached failed process
# Couldn't start execution
# Execution completed immediately.
# Process executor can not update the status, if it was not able to
# start properly.
if self.ecode is not None and self.ecode != 0:
# There is no way to find out the error message from this process
# as standard output, and standard error were redirected to
# devnull.
p = Process.query.filter_by(
pid=self.id, user_id=current_user.id
).first()
@ -222,54 +299,49 @@ class BatchProcess(object):
db.session.commit()
def status(self, out=0, err=0):
import codecs
import re
ctime = get_current_time(format='%Y%m%d%H%M%S%f')
stdout = []
stderr = []
out_completed = err_completed = False
process_output = (out != -1 and err != -1)
enc = sys.getdefaultencoding()
def read_log(logfile, log, pos, ctime, check=True):
def read_log(logfile, log, pos, ctime):
completed = True
lines = 0
idx = 0
c = re.compile(r"(\d+),(.*$)")
if not os.path.isfile(logfile):
return 0, False
with codecs.open(logfile, 'r', 'utf-8') as stream:
stream.seek(pos)
for line in stream:
logtime = StringIO()
idx = 0
for c in line:
idx += 1
if c == ',':
break
logtime.write(c)
logtime = logtime.getvalue()
if check and logtime > ctime:
with open(logfile, 'rb') as f:
eofs = os.fstat(f.fileno()).st_size
f.seek(pos, 0)
while pos < eofs:
idx += 1
line = f.readline()
line = line.decode(enc, 'replace')
r = c.split(line)
if r[1] > ctime:
completed = False
break
if lines == 5120:
ctime = logtime
log.append([r[1], r[2]])
pos = f.tell()
if idx == 1024:
completed = False
break
lines += 1
log.append([logtime, line[idx:]])
pos = stream.tell()
if pos == eofs:
completed = True
break
return pos, completed
if process_output:
out, out_completed = read_log(
self.stdout, stdout, out, ctime, True
)
err, err_completed = read_log(
self.stderr, stderr, err, ctime, True
)
out, out_completed = read_log(self.stdout, stdout, out, ctime)
err, err_completed = read_log(self.stderr, stderr, err, ctime)
j = Process.query.filter_by(
pid=self.id, user_id=current_user.id
@ -278,6 +350,9 @@ class BatchProcess(object):
execution_time = None
if j is not None:
status, updated = BatchProcess.update_process_info(j)
if updated:
db.session.commit()
self.stime = j.start_time
self.etime = j.end_time
self.ecode = j.exit_code
@ -289,19 +364,16 @@ class BatchProcess(object):
execution_time = (etime - stime).total_seconds()
if process_output and self.ecode is not None and (
len(stdout) + len(stderr) < 3073
len(stdout) + len(stderr) < 1024
):
out, out_completed = read_log(
self.stdout, stdout, out, ctime, False
)
err, err_completed = read_log(
self.stderr, stderr, err, ctime, False
)
out, out_completed = read_log(self.stdout, stdout, out, ctime)
err, err_completed = read_log(self.stderr, stderr, err, ctime)
else:
out_completed = err_completed = False
if out == -1 or err == -1:
return {
'start_time': self.stime,
'exit_code': self.ecode,
'execution_time': execution_time
}
@ -309,18 +381,67 @@ class BatchProcess(object):
return {
'out': {'pos': out, 'lines': stdout, 'done': out_completed},
'err': {'pos': err, 'lines': stderr, 'done': err_completed},
'start_time': self.stime,
'exit_code': self.ecode,
'execution_time': execution_time
}
@staticmethod
def update_process_info(p):
if p.start_time is None or p.end_time is None:
status = os.path.join(p.logdir, 'status')
if not os.path.isfile(status):
return False, False
with open(status, 'r') as fp:
import json
try:
data = json.load(fp)
# First - check for the existance of 'start_time'.
if 'start_time' in data and data['start_time']:
p.start_time = data['start_time']
# We can't have 'exit_code' without the 'start_time'
if 'exit_code' in data and \
data['exit_code'] is not None:
p.exit_code = data['exit_code']
# We can't have 'end_time' without the 'exit_code'.
if 'end_time' in data and data['end_time']:
p.end_time = data['end_time']
return True, True
except ValueError as e:
current_app.logger.warning(
_("Status for the background process '{0}' couldn't be loaded!").format(
p.pid
)
)
current_app.logger.exception(e)
return False, False
return True, False
@staticmethod
def list():
processes = Process.query.filter_by(user_id=current_user.id)
changed = False
res = []
for p in processes:
if p.start_time is None or p.acknowledge is not None:
status, updated = BatchProcess.update_process_info(p)
if not status:
continue
if not changed:
changed = updated
if p.start_time is None or (
p.acknowledge is not None and p.end_time is None
):
continue
execution_time = None
stime = parser.parse(p.start_time)
@ -350,10 +471,20 @@ class BatchProcess(object):
'execution_time': execution_time
})
if changed:
db.session.commit()
return res
@staticmethod
def acknowledge(_pid, _release):
def acknowledge(_pid):
"""
Acknowledge from the user, he/she has alredy watched the status.
Update the acknowledgement status, if the process is still running.
And, delete the process information from the configuration, and the log
files related to the process, if it has already been completed.
"""
p = Process.query.filter_by(
user_id=current_user.id, pid=_pid
).first()
@ -363,33 +494,12 @@ class BatchProcess(object):
_("Could not find a process with the specified ID.")
)
if _release:
import shutil
shutil.rmtree(p.logdir, True)
if p.end_time is not None:
logdir = p.logdir
db.session.delete(p)
import shutil
shutil.rmtree(logdir, True)
else:
p.acknowledge = get_current_time()
db.session.commit()
@staticmethod
def release(pid=None):
import shutil
processes = None
if pid is not None:
processes = Process.query.filter_by(
user_id=current_user.id, pid=pid
)
else:
processes = Process.query.filter_by(
user_id=current_user.id,
acknowledge=None
)
if processes:
for p in processes:
shutil.rmtree(p.logdir, True)
db.session.delete(p)
db.session.commit()

View File

@ -20,6 +20,7 @@
margin-top: 0px;
margin-bottom: 5px;
padding: 5px;
padding-right: 20px;
white-space: pre-wrap;
text-align: center;
border-top-left-radius: 5px;
@ -165,3 +166,28 @@ ol.pg-bg-process-logs {
.bg-process-footer .bg-process-exec-time {
padding-right: 0;
}
.pg-bg-bgprocess .ajs-commands {
right: -13px;
top: 2px;
opacity: 0.5;
}
.pg-bg-bgprocess .bg-close {
display: inline-block;
position: absolute;
height: 25px;
width: 25px;
right: -12px;
top: 3px;
padding: 2px;
border: 2px solid #1f5fa6;
border-radius: 4px;
opacity: 0.5;
background-color: white;
color: red;
}
.pg-bg-bgprocess:hover .bg-close {
opacity: 0.95;
}

View File

@ -1,6 +1,7 @@
define(
['underscore', 'underscore.string', 'jquery', 'pgadmin.browser', 'alertify', 'pgadmin.browser.messages'],
function(_, S, $, pgBrowser, alertify, pgMessages) {
define([
'pgadmin', 'underscore', 'underscore.string', 'jquery', 'pgadmin.browser',
'alertify', 'pgadmin.browser.messages'
], function(pgAdmin, _, S, $, pgBrowser, alertify, pgMessages) {
pgBrowser.BackgroundProcessObsorver = pgBrowser.BackgroundProcessObsorver || {};
@ -34,8 +35,9 @@ function(_, S, $, pgBrowser, alertify, pgMessages) {
exit_code: null,
acknowledge: info['acknowledge'],
execution_time: null,
out: null,
err: null,
out: -1,
err: -1,
lot_more: false,
notifier: null,
container: null,
@ -69,23 +71,16 @@ function(_, S, $, pgBrowser, alertify, pgMessages) {
},
url: function(type) {
var base_url = pgMessages['bgprocess.index'],
url = base_url;
var url = S('%s%s').sprintf(pgMessages['bgprocess.index'], this.id).value();
switch (type) {
case 'status':
url = S('%sstatus/%s/').sprintf(base_url, this.id).value();
if (this.details) {
url = S('%s%s/%s/').sprintf(
url, (this.out && this.out.pos) || 0,
(this.err && this.err.pos) || 0
if (this.details && this.out != -1 && this.err != -1) {
url = S('%s/%s/%s/').sprintf(
url, this.out, this.err
).value();
}
break;
case 'info':
case 'acknowledge':
url = S('%s%s/%s/').sprintf(base_url, type, this.id).value();
break;
}
return url;
@ -114,53 +109,49 @@ function(_, S, $, pgBrowser, alertify, pgMessages) {
if ('out' in data) {
self.out = data.out && data.out.pos;
self.completed = data.out.done;
if (data.out && data.out.lines) {
data.out.lines.sort(function(a, b) { return a[0] < b[0]; });
out = data.out.lines;
}
}
if ('err' in data) {
self.err = data.err && data.err.pos;
self.completed = (self.completed && data.err.done);
if (data.err && data.err.lines) {
data.err.lines.sort(function(a, b) { return a[0] < b[0]; });
err = data.err.lines;
}
}
self.completed = self.completed || (
'err' in data && 'out' in data && data.err.done && data.out.done
) || (
!self.details && !_.isNull(self.exit_code)
);
var io = ie = 0;
var io = ie = 0, res = [],
escapeEl = document.createElement('textarea'),
escapeHTML = function(html) {
escapeEl.textContent = html;
return escapeEl.innerHTML;
};
while (io < out.length && ie < err.length &&
self.logs[0].children.length < 5120) {
if (out[io][0] < err[ie][0]){
self.logs.append(
$('<li></li>', {class: 'pg-bg-res-out'}).text(out[io++][1])
);
while (io < out.length && ie < err.length) {
if (pgAdmin.natural_sort(out[io][0], err[ie][0]) <= 0){
res.push('<li class="pg-bg-res-out">' + escapeHTML(out[io++][1]) + '</li>');
} else {
self.logs.append(
$('<li></li>', {class: 'pg-bg-res-err'}).text(err[ie++][1])
);
res.push('<li class="pg-bg-res-err">' + escapeHTML(err[ie++][1]) + '</li>');
}
}
while (io < out.length && self.logs[0].children.length < 5120) {
self.logs.append(
$('<li></li>', {class: 'pg-bg-res-out'}).text(out[io++][1])
);
while (io < out.length) {
res.push('<li class="pg-bg-res-out">' + escapeHTML(out[io++][1]) + '</li>');
}
while (ie < err.length && self.logs[0].children.length < 5120) {
self.logs.append(
$('<li></li>', {class: 'pg-bg-res-err'}).text(err[ie++][1])
);
while (ie < err.length) {
res.push('<li class="pg-bg-res-err">' + escapeHTML(err[ie++][1]) + '</li>');
}
if (self.logs[0].children.length >= 5120) {
self.completed = true;
if (res.length) {
self.logs.append(res.join(''));
}
if (self.stime) {
@ -197,7 +188,7 @@ function(_, S, $, pgBrowser, alertify, pgMessages) {
setTimeout(function() {self.show.apply(self)}, 10);
}
if (self.state != 2 || (self.details && !self.completed)) {
if (!self.completed) {
setTimeout(
function() {
self.status.apply(self);
@ -232,12 +223,11 @@ function(_, S, $, pgBrowser, alertify, pgMessages) {
if (self.notify && !self.details) {
if (!self.notifier) {
var content = $('<div class="pg-bg-bgprocess row"></div>').append(
$('<div></div>', {
class: "h5 pg-bg-notify-header"
}).text(
self.desc
)
var header = $('<div></div>', {
class: "h5 pg-bg-notify-header"
}).append($('<span></span>').text(self.desc)),
content = $('<div class="pg-bg-bgprocess row"></div>').append(
header
).append(
$('<div></div>', {class: 'pg-bg-notify-body h6' }).append(
$('<div></div>', {class: 'pg-bg-start col-xs-12' }).append(
@ -249,12 +239,17 @@ function(_, S, $, pgBrowser, alertify, pgMessages) {
),
for_details = $('<div></div>', {
class: "col-xs-12 text-center pg-bg-click h6"
}).text(pgMessages.CLICK_FOR_DETAILED_MSG).appendTo(content),
}).append(
$('<span></span>').text(pgMessages.CLICK_FOR_DETAILED_MSG)
).appendTo(content),
status = $('<div></div>', {
class: "pg-bg-status col-xs-12 h5 " + ((self.exit_code === 0) ?
'bg-success': (self.exit_code == 1) ?
'bg-failed' : '')
}).appendTo(content);
}).appendTo(content),
close_me = $(
'<div class="bg-close"><i class="fa fa-close"></i></div>'
).appendTo(header);
self.container = content;
self.notifier = alertify.notify(
@ -268,10 +263,17 @@ function(_, S, $, pgBrowser, alertify, pgMessages) {
this.notifier.dismiss();
this.notifier = null;
this.completed = false;
this.show_detailed_view.apply(this);
}.bind(self));
close_me.on('click', function(ev) {
this.notifier.dismiss();
this.notifier = null;
this.acknowledge_server.apply(this);
}.bind(this));
// Do not close the notifier, when clicked on the container, which
// is a default behaviour.
content.on('click', function(ev) {
@ -351,6 +353,8 @@ function(_, S, $, pgBrowser, alertify, pgMessages) {
if (is_new) {
self.details = true;
self.err = 0;
self.out = 0;
setTimeout(
function() {
self.status.apply(self);
@ -419,28 +423,26 @@ function(_, S, $, pgBrowser, alertify, pgMessages) {
function() {
setTimeout(
function() {
pgBrowser.BackgroundProcessObsorver.update_process_list();
pgBrowser.BackgroundProcessObsorver.update_process_list(true);
}, 1000
);
}
)
},
update_process_list: function() {
update_process_list: function(recheck) {
var observer = this;
$.ajax({
typs: 'GET',
timeout: 30000,
url: pgMessages['bgprocess.list'],
url: pgMessages['bgprocess.index'],
cache: false,
async: true,
contentType: "application/json",
success: function(res) {
if (!res) {
// FIXME::
// Do you think - we should call the list agains after some
// interval?
var cnt = 0;
if (!res || !_.isArray(res)) {
return;
}
for (idx in res) {
@ -451,6 +453,14 @@ function(_, S, $, pgBrowser, alertify, pgMessages) {
}
}
}
if (recheck && res.length == 0) {
// Recheck after some more time
setTimeout(
function() {
observer.update_process_list(false);
}, 3000
);
}
},
error: function(res) {
// FIXME:: What to do now?