pgadmin4/web/pgadmin/misc/bgprocess/process_executor.py
Ashesh Vashi f682f06c94 Adding a background process executor, and observer.
We will be using external utilities like pg_dump, pg_dumpall, and
pg_restore in the background. pgAdmin 4 can be run as a CGI script; hence,
it is not a good idea to run those utilities in a controlled environment.
The process executor will run them in the background, and we will execute
the process executor in detached mode.

Now that the process executor runs in detached mode, we need an
observer, which will look at the status of the processes. It also reads
the output and error logs on demand.

Thanks - Surinder for helping in some of the UI changes.
2016-05-13 08:49:51 +05:30

376 lines
9.8 KiB
Python

# -*- coding: utf-8 -*-
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL License
#
##########################################################################
"""
This python script is responsible for executing a process, and logs its output,
and error in the given output directory.
We will create a detached process, which executes this script.
This script will:
* Fetch the configuration from the given database.
* Run the given executable specified in the configuration with the arguments.
* Create log files for both stdout and stderr.
* Update the start time, end time, exit code, etc in the configuration
database.
Args:
process_id -- Process id
db_file -- Database file which holds list of processes to be executed
output_directory -- output directory
"""
from __future__ import print_function, unicode_literals
# To make print function compatible with python2 & python3
import sys
import os
import argparse
import sqlite3
from datetime import datetime
from subprocess import Popen, PIPE
from threading import Thread
import csv
import pytz
import codecs
# SQLite3 needs all string as UTF-8
# We need to make string for Python2/3 compatible
#
# The in-memory text buffer handed to csv.reader lives in a different module
# on each major version, so both it and the `u` helper are selected here.
if sys.version_info < (3,):
    # Python 2
    from cStringIO import StringIO

    def u(x):
        """Decode the escape sequences in `x` and return it as unicode."""
        return codecs.unicode_escape_decode(x)[0]
else:
    # Python 3: the cStringIO module does not exist any more
    from io import StringIO

    def u(x):
        """Return `x` unchanged."""
        return x
def usage():
    """
    Print the command line usage message on stdout.

    The options listed match the ones declared with argparse in the
    ``__main__`` section of this script.

    Args:
        None

    Returns:
        None
    """
    help_msg = """
Usage:

process_executor.py [-h|--help]
    [-p|--process_id] Process ID
    [-d|--db_file] SQLite3 database file path
    [-o|--output_directory] Location where the logs will be created
"""
    print(help_msg)
def get_current_time(format='%Y-%m-%d %H:%M:%S.%f %z'):
    """
    Return the current UTC time as a formatted string.

    Args:
        format: strftime pattern applied to the timezone-aware timestamp

    Returns:
        The formatted timestamp
    """
    # Tag the naive UTC timestamp with the UTC zone before formatting it
    utc_now = datetime.utcnow().replace(tzinfo=pytz.utc)
    return utc_now.strftime(format)
class ProcessLogger(Thread):
    """
    This class definition is responsible for capturing & logging
    stdout & stderr messages from subprocess

    Each instance handles one stream, and writes what it reads to its own
    log file in the output directory.

    Methods:
    --------
    * __init__(stream_type, configs)
      - Opens the log file for the given stream type
    * attach_process_stream(process, stream)
      - Sets the process, and the stream this thread reads from
    * log(msg)
      - Writes a timestamp-prefixed message to the log file
    * run()
      - Reads the attached stream line by line, and logs every line
    * release()
      - Closes the log file
    """

    def __init__(self, stream_type, configs):
        """
        This method is use to initialize the ProcessLogger class object

        Args:
            stream_type: Name of the log file created inside the output
                         directory
            configs: Process details dict; only 'output_directory' is read
                     here

        Returns:
            None
        """
        Thread.__init__(self)
        self.configs = configs
        self.process = None
        self.stream = None
        # Opened in 'w' mode, so an existing log file is truncated
        self.logger = codecs.open(
            os.path.join(
                configs['output_directory'], stream_type
            ), 'w', "utf-8"
        )

    def attach_process_stream(self, process, stream):
        """
        This function will attach a process and its stream with this thread.

        Args:
            process: Process
            stream: Stream attached with the process

        Returns:
            None
        """
        self.process = process
        self.stream = stream

    def log(self, msg):
        """
        This function will update log file

        A non-empty message is written as '<timestamp>,<msg>'; no line
        terminator is added, so `msg` is expected to carry its own.

        Args:
            msg: message (text, or bytes as read from a process pipe)

        Returns:
            True when the log file is open (even if `msg` was empty and
            nothing was written), False when it has been released
        """
        if not self.logger:
            return False

        if msg:
            # Pipes created by Popen hand back bytes on Python 3; formatting
            # them directly would log their repr (b'...') instead of the
            # text. Undecodable bytes are replaced rather than raising.
            if isinstance(msg, bytes):
                msg = msg.decode('utf-8', 'replace')

            # Write into log file
            self.logger.write(
                '{0},{1}'.format(
                    get_current_time(format='%Y%m%d%H%M%S%f'), msg
                )
            )
        return True

    def run(self):
        """
        Thread body: log every line read from the attached stream.

        Does nothing unless both a process and a stream were attached.
        The loop ends once a read returns nothing and the process has
        exited; while the process is still alive after an empty read, the
        loop keeps polling it.

        Returns:
            None
        """
        if not (self.process and self.stream):
            return

        while True:
            nextline = self.stream.readline()
            if nextline:
                self.log(nextline)
            elif self.process.poll() is not None:
                break

    def release(self):
        """
        Close the log file; further log() calls return False.

        Calling it again after the file has been closed is a no-op.

        Returns:
            None
        """
        if self.logger:
            self.logger.close()
            self.logger = None
def read_configs(data):
    """
    This reads SQLite3 database and fetches process details

    Only a process whose exit_code is still NULL is returned.

    Args:
        data - configuration details; the attributes read are process_id,
               db_file and output_directory

    Returns:
        Process details fetched from database as a dict with the keys
        'pid', 'cmd', 'args', 'output_directory' and 'db_file', or None
        when no matching row exists

    Raises:
        ValueError: when db_file or process_id is None
    """
    if data.db_file is None or data.process_id is None:
        raise ValueError("Please verify process id and db_file arguments")

    conn = sqlite3.connect(data.db_file)
    try:
        c = conn.cursor()
        c.execute(
            'SELECT command, arguments FROM process WHERE '
            'exit_code is NULL '
            'AND pid=?',
            (data.process_id,)
        )
        row = c.fetchone()
    finally:
        # Close the connection even when the query fails
        conn.close()

    if not row or len(row) < 2:
        return None

    return {
        'pid': data.process_id,
        'cmd': row[0],
        'args': row[1],
        'output_directory': data.output_directory,
        'db_file': data.db_file
    }
def update_configs(kwargs):
    """
    This function will updates process stats

    Of the keys in `kwargs`, only 'start_time', 'end_time' and 'exit_code'
    are written to the row of the `process` table whose pid matches; any
    other key is ignored. When none of the three is present, nothing is
    done and the database is not opened.

    Args:
        kwargs - Process configuration details; must contain 'db_file'
                 and 'pid'

    Returns:
        None

    Raises:
        ValueError: when 'db_file' or 'pid' is missing from kwargs
    """
    if 'db_file' not in kwargs or 'pid' not in kwargs:
        raise ValueError("Please verify pid and db_file arguments")

    # Column names come from this fixed tuple only, so building the SET
    # clause as text is safe; the values are passed as bound parameters.
    columns = [
        name for name in ('start_time', 'end_time', 'exit_code')
        if name in kwargs
    ]
    if not columns:
        return

    sql = 'UPDATE process SET {0} WHERE pid=?'.format(
        ', '.join(name + '=?' for name in columns)
    )
    params = [kwargs[name] for name in columns]
    params.append(kwargs['pid'])

    conn = sqlite3.connect(kwargs['db_file'])
    try:
        # The connection context manager commits on success, and rolls
        # back on error; it does not close the connection.
        with conn:
            conn.execute(sql, params)
    finally:
        conn.close()
def execute(configs):
    """
    This function will execute the background process

    The command line is built from configs['cmd'] plus the CSV encoded
    configs['args']. The start time is stored before the process is
    launched; its stdout and stderr are logged by one ProcessLogger thread
    each, and the end time and exit code are stored when it finishes or
    fails to start.

    Args:
        configs: Process configuration details (as returned by
                 read_configs)

    Returns:
        None

    Raises:
        ValueError: when configs is None
    """
    if configs is None:
        raise ValueError("Please verify configs")

    # Every CSV row found in the stored arguments is appended to the command
    command = [configs['cmd']]
    args_csv = StringIO(configs['args'])
    for row in csv.reader(args_csv, delimiter=str(',')):
        command = command + row

    args = {
        'pid': configs['pid'],
        'db_file': configs['db_file']
    }

    if sys.version_info < (3,):
        # Python 2 only: neither the reload() builtin nor
        # sys.setdefaultencoding() exists on Python 3, where calling them
        # would raise before the process is even started.
        reload(sys)  # noqa: F821
        sys.setdefaultencoding('utf8')

    # Create seprate thread for stdout and stderr
    process_stdout = ProcessLogger('out', configs)
    process_stderr = ProcessLogger('err', configs)

    try:
        # update start_time
        # ('stdout' and 'stderr' are carried along in the dict, but are not
        # among the keys update_configs writes to the database)
        args.update({
            'start_time': get_current_time(),
            'stdout': process_stdout.log,
            'stderr': process_stderr.log
        })

        # Update start time
        update_configs(args)

        # NOTE(review): stdin is a pipe that nothing writes to, and that is
        # only closed by communicate() below; presumably a command reading
        # from stdin would block - confirm.
        process = Popen(
            command, stdout=PIPE, stderr=PIPE, stdin=PIPE,
            shell=(os.name == 'nt'), close_fds=(os.name != 'nt')
        )

        # Attach the stream to the process logger, and start logging.
        process_stdout.attach_process_stream(process, process.stdout)
        process_stdout.start()
        process_stderr.attach_process_stream(process, process.stderr)
        process_stderr.start()

        # Join both threads together
        process_stdout.join()
        process_stderr.join()

        # Child process return code
        exit_code = process.wait()
        if exit_code is None:
            exit_code = process.poll()

        args.update({'exit_code': exit_code})

        # Add end_time
        args.update({'end_time': get_current_time()})

        # Fetch last output, and error from process if it has missed.
        data = process.communicate()
        if data:
            if data[0]:
                process_stdout.log(data[0])
            if data[1]:
                process_stderr.log(data[1])

    # If executable not found or invalid arguments passed
    except OSError as e:
        if process_stderr:
            process_stderr.log(e.strerror)
        else:
            print("WARNING: ", e.strerror, file=sys.stderr)
        args.update({'end_time': get_current_time()})
        args.update({'exit_code': e.errno})

    # Unknown errors
    except Exception as e:
        if process_stderr:
            process_stderr.log(str(e))
        else:
            print("WARNING: ", str(e), file=sys.stderr)
        args.update({'end_time': get_current_time()})
        args.update({'exit_code': -1})

    finally:
        # Update the execution end_time, and exit-code.
        update_configs(args)

        # Close both log files
        if process_stderr:
            process_stderr.release()
            process_stderr = None
        if process_stdout:
            process_stdout.release()
            process_stdout = None
if __name__ == '__main__':
    # Declare the command line options; all three are mandatory
    parser = argparse.ArgumentParser(
        description='Process executor for pgAdmin4'
    )
    for short_flag, long_flag, help_text in (
        ('-p', '--process_id', 'Process ID'),
        ('-d', '--db_file', 'Configuration Database'),
        ('-o', '--output_directory',
         'Location where the logs will be created'),
    ):
        parser.add_argument(
            short_flag, long_flag, help=help_text, required=True
        )
    args = parser.parse_args()

    # Fetch the background process details from the SQLite3 database file,
    # then execute the background process
    configs = read_configs(args)
    execute(configs)