mirror of
https://github.com/pgadmin-org/pgadmin4.git
synced 2025-01-24 15:26:46 -06:00
684 lines
22 KiB
Python
684 lines
22 KiB
Python
# -*- coding: utf-8 -*-
|
|
##########################################################################
|
|
#
|
|
# pgAdmin 4 - PostgreSQL Tools
|
|
#
|
|
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
|
# This software is released under the PostgreSQL License
|
|
#
|
|
##########################################################################
|
|
|
|
"""
|
|
Introduce a function to run the process executor in detached mode.
|
|
"""
|
|
import csv
|
|
import os
|
|
import sys
|
|
import psutil
|
|
from abc import ABCMeta, abstractproperty, abstractmethod
|
|
from datetime import datetime
|
|
from pickle import dumps, loads
|
|
from subprocess import Popen, PIPE
|
|
import logging
|
|
|
|
from pgadmin.utils import u_encode, file_quote, fs_encoding, \
|
|
get_complete_file_path
|
|
|
|
import pytz
|
|
from dateutil import parser
|
|
from flask import current_app
|
|
from flask_babelex import gettext as _
|
|
from flask_security import current_user
|
|
|
|
import config
|
|
from pgadmin.model import Process, db
|
|
from io import StringIO
|
|
|
|
PROCESS_NOT_STARTED = 0
|
|
PROCESS_STARTED = 1
|
|
PROCESS_FINISHED = 2
|
|
PROCESS_TERMINATED = 3
|
|
|
|
|
|
def get_current_time(format='%Y-%m-%d %H:%M:%S.%f %z'):
|
|
"""
|
|
Generate the current time string in the given format.
|
|
"""
|
|
return datetime.utcnow().replace(
|
|
tzinfo=pytz.utc
|
|
).strftime(format)
|
|
|
|
|
|
class IProcessDesc(object):
|
|
__metaclass__ = ABCMeta
|
|
|
|
@abstractproperty
|
|
def message(self):
|
|
pass
|
|
|
|
@abstractmethod
|
|
def details(self, cmd, args):
|
|
pass
|
|
|
|
|
|
class BatchProcess(object):
|
|
def __init__(self, **kwargs):
|
|
|
|
self.id = self.desc = self.cmd = self.args = self.log_dir = \
|
|
self.stdout = self.stderr = self.stime = self.etime = \
|
|
self.ecode = None
|
|
self.env = dict()
|
|
|
|
if 'id' in kwargs:
|
|
self._retrieve_process(kwargs['id'])
|
|
else:
|
|
self._create_process(
|
|
kwargs['desc'], kwargs['cmd'], kwargs['args']
|
|
)
|
|
|
|
def _retrieve_process(self, _id):
|
|
p = Process.query.filter_by(pid=_id, user_id=current_user.id).first()
|
|
|
|
if p is None:
|
|
raise LookupError(
|
|
_("Could not find a process with the specified ID.")
|
|
)
|
|
|
|
tmp_desc = loads(p.desc)
|
|
|
|
# ID
|
|
self.id = _id
|
|
# Description
|
|
self.desc = tmp_desc
|
|
# Status Acknowledged time
|
|
self.atime = p.acknowledge
|
|
# Command
|
|
self.cmd = p.command
|
|
# Arguments
|
|
self.args = p.arguments
|
|
# Log Directory
|
|
self.log_dir = p.logdir
|
|
# Standard ouput log file
|
|
self.stdout = os.path.join(p.logdir, 'out')
|
|
# Standard error log file
|
|
self.stderr = os.path.join(p.logdir, 'err')
|
|
# Start time
|
|
self.stime = p.start_time
|
|
# End time
|
|
self.etime = p.end_time
|
|
# Exit code
|
|
self.ecode = p.exit_code
|
|
# Process State
|
|
self.process_state = p.process_state
|
|
|
|
def _create_process(self, _desc, _cmd, _args):
|
|
ctime = get_current_time(format='%y%m%d%H%M%S%f')
|
|
log_dir = os.path.join(
|
|
config.SESSION_DB_PATH, 'process_logs'
|
|
)
|
|
|
|
def random_number(size):
|
|
import random
|
|
import string
|
|
|
|
return ''.join(
|
|
random.choice(
|
|
string.ascii_uppercase + string.digits
|
|
) for _ in range(size)
|
|
)
|
|
|
|
created = False
|
|
size = 0
|
|
id = ctime
|
|
while not created:
|
|
try:
|
|
id += random_number(size)
|
|
log_dir = os.path.join(log_dir, id)
|
|
size += 1
|
|
if not os.path.exists(log_dir):
|
|
os.makedirs(log_dir, int('700', 8))
|
|
created = True
|
|
except OSError as oe:
|
|
import errno
|
|
if oe.errno != errno.EEXIST:
|
|
raise
|
|
|
|
# ID
|
|
self.id = ctime
|
|
# Description
|
|
self.desc = _desc
|
|
# Status Acknowledged time
|
|
self.atime = None
|
|
# Command
|
|
self.cmd = _cmd
|
|
# Log Directory
|
|
self.log_dir = log_dir
|
|
# Standard ouput log file
|
|
self.stdout = os.path.join(log_dir, 'out')
|
|
# Standard error log file
|
|
self.stderr = os.path.join(log_dir, 'err')
|
|
# Start time
|
|
self.stime = None
|
|
# End time
|
|
self.etime = None
|
|
# Exit code
|
|
self.ecode = None
|
|
# Process State
|
|
self.process_state = PROCESS_NOT_STARTED
|
|
|
|
# Arguments
|
|
self.args = _args
|
|
args_csv_io = StringIO()
|
|
csv_writer = csv.writer(
|
|
args_csv_io, delimiter=str(','), quoting=csv.QUOTE_MINIMAL
|
|
)
|
|
csv_writer.writerow(_args)
|
|
|
|
args_val = args_csv_io.getvalue().strip(str('\r\n'))
|
|
tmp_desc = dumps(self.desc)
|
|
|
|
j = Process(
|
|
pid=int(id),
|
|
command=_cmd,
|
|
arguments=args_val,
|
|
logdir=log_dir,
|
|
desc=tmp_desc,
|
|
user_id=current_user.id
|
|
)
|
|
db.session.add(j)
|
|
db.session.commit()
|
|
|
|
def start(self, cb=None):
|
|
|
|
def which(program, paths):
|
|
def is_exe(fpath):
|
|
return os.path.exists(fpath) and os.access(fpath, os.X_OK)
|
|
|
|
for path in paths:
|
|
if not os.path.isdir(path):
|
|
continue
|
|
exe_file = os.path.join(u_encode(path, fs_encoding), program)
|
|
if is_exe(exe_file):
|
|
return file_quote(exe_file)
|
|
return None
|
|
|
|
def convert_environment_variables(env):
|
|
"""
|
|
This function is use to convert environment variable to string
|
|
because environment variable must be string in popen
|
|
:param env: Dict of environment variable
|
|
:return: Encoded environment variable as string
|
|
"""
|
|
encoding = sys.getdefaultencoding()
|
|
if encoding is None or encoding == 'ascii':
|
|
encoding = 'utf-8'
|
|
temp_env = dict()
|
|
for key, value in env.items():
|
|
if not isinstance(key, str):
|
|
key = key.encode(encoding)
|
|
if not isinstance(value, str):
|
|
value = value.encode(encoding)
|
|
temp_env[key] = value
|
|
return temp_env
|
|
|
|
if self.stime is not None:
|
|
if self.etime is None:
|
|
raise Exception(_('The process has already been started.'))
|
|
raise Exception(
|
|
_('The process has already finished and cannot be restarted.')
|
|
)
|
|
|
|
executor = file_quote(os.path.join(
|
|
os.path.dirname(u_encode(__file__)), u'process_executor.py'
|
|
))
|
|
paths = os.environ['PATH'].split(os.pathsep)
|
|
interpreter = None
|
|
|
|
current_app.logger.info(
|
|
u"Process Executor: Operating System Path %s",
|
|
str(paths)
|
|
)
|
|
|
|
if os.name == 'nt':
|
|
paths.insert(0, os.path.join(u_encode(sys.prefix), u'Scripts'))
|
|
paths.insert(0, u_encode(sys.prefix))
|
|
|
|
interpreter = which(u'pythonw.exe', paths)
|
|
if interpreter is None:
|
|
interpreter = which(u'python.exe', paths)
|
|
|
|
current_app.logger.info(
|
|
u"Process Executor: Interpreter value in path: %s",
|
|
str(interpreter)
|
|
)
|
|
if interpreter is None and current_app.PGADMIN_RUNTIME:
|
|
# We've faced an issue with Windows 2008 R2 (x86) regarding,
|
|
# not honouring the environment variables set under the Qt
|
|
# (e.g. runtime), and also setting PYTHONHOME same as
|
|
# sys.executable (i.e. pgAdmin4.exe).
|
|
#
|
|
# As we know, we're running it under the runtime, we can assume
|
|
# that 'venv' directory will be available outside of 'bin'
|
|
# directory.
|
|
#
|
|
# We would try out luck to find python executable based on that
|
|
# assumptions.
|
|
bin_path = os.path.dirname(sys.executable)
|
|
|
|
venv = os.path.realpath(
|
|
os.path.join(bin_path, u'..\\venv')
|
|
)
|
|
|
|
interpreter = which(u'pythonw.exe', [venv])
|
|
if interpreter is None:
|
|
interpreter = which(u'python.exe', [venv])
|
|
|
|
current_app.logger.info(
|
|
u"Process Executor: Interpreter value in virtual "
|
|
u"environment: %s", str(interpreter)
|
|
)
|
|
|
|
if interpreter is not None:
|
|
# Our assumptions are proven right.
|
|
# Let's append the 'bin' directory to the PATH environment
|
|
# variable. And, also set PYTHONHOME environment variable
|
|
# to 'venv' directory.
|
|
os.environ['PATH'] = bin_path + ';' + os.environ['PATH']
|
|
os.environ['PYTHONHOME'] = venv
|
|
else:
|
|
# Let's not use sys.prefix in runtime.
|
|
# 'sys.prefix' is not identified on *nix systems for some unknown
|
|
# reason, while running under the runtime.
|
|
# We're already adding '<installation path>/pgAdmin 4/venv/bin'
|
|
# directory in the PATH environment variable. Hence - it will
|
|
# anyway be the redundant value in paths.
|
|
if not current_app.PGADMIN_RUNTIME:
|
|
paths.insert(0, os.path.join(u_encode(sys.prefix), u'bin'))
|
|
python_binary_name = 'python{0}'.format(sys.version_info[0])
|
|
interpreter = which(u_encode(python_binary_name), paths)
|
|
|
|
p = None
|
|
cmd = [
|
|
interpreter if interpreter is not None else 'python',
|
|
executor, self.cmd
|
|
]
|
|
cmd.extend(self.args)
|
|
|
|
current_app.logger.info(
|
|
u"Executing the process executor with the arguments: %s",
|
|
str(cmd)
|
|
)
|
|
|
|
# Make a copy of environment, and add new variables to support
|
|
env = os.environ.copy()
|
|
env['PROCID'] = self.id
|
|
env['OUTDIR'] = self.log_dir
|
|
env['PGA_BGP_FOREGROUND'] = "1"
|
|
|
|
if self.env:
|
|
env.update(self.env)
|
|
|
|
if cb is not None:
|
|
cb(env)
|
|
|
|
if os.name == 'nt':
|
|
DETACHED_PROCESS = 0x00000008
|
|
from subprocess import CREATE_NEW_PROCESS_GROUP
|
|
|
|
# We need to redirect the standard input, standard output, and
|
|
# standard error to devnull in order to allow it start in detached
|
|
# mode on
|
|
stdout = os.devnull
|
|
stderr = stdout
|
|
stdin = open(os.devnull, "r")
|
|
stdout = open(stdout, "a")
|
|
stderr = open(stderr, "a")
|
|
|
|
p = Popen(
|
|
cmd,
|
|
close_fds=False,
|
|
env=env,
|
|
stdout=stdout.fileno(),
|
|
stderr=stderr.fileno(),
|
|
stdin=stdin.fileno(),
|
|
creationflags=(CREATE_NEW_PROCESS_GROUP | DETACHED_PROCESS)
|
|
)
|
|
else:
|
|
def preexec_function():
|
|
import signal
|
|
# Detaching from the parent process group
|
|
os.setpgrp()
|
|
# Explicitly ignoring signals in the child process
|
|
signal.signal(signal.SIGINT, signal.SIG_IGN)
|
|
|
|
# if in debug mode, wait for process to complete and
|
|
# get the stdout and stderr of popen.
|
|
if config.CONSOLE_LOG_LEVEL <= logging.DEBUG:
|
|
p = Popen(
|
|
cmd, close_fds=True, stdout=PIPE, stderr=PIPE, stdin=None,
|
|
preexec_fn=preexec_function, env=env
|
|
)
|
|
|
|
output, errors = p.communicate()
|
|
output = output.decode() \
|
|
if hasattr(output, 'decode') else output
|
|
errors = errors.decode() \
|
|
if hasattr(errors, 'decode') else errors
|
|
current_app.logger.debug(
|
|
'Process Watcher Out:{0}'.format(output))
|
|
current_app.logger.debug(
|
|
'Process Watcher Err:{0}'.format(errors))
|
|
else:
|
|
p = Popen(
|
|
cmd, close_fds=True, stdout=None, stderr=None, stdin=None,
|
|
preexec_fn=preexec_function, env=env
|
|
)
|
|
|
|
self.ecode = p.poll()
|
|
|
|
# Execution completed immediately.
|
|
# Process executor cannot update the status, if it was not able to
|
|
# start properly.
|
|
if self.ecode is not None and self.ecode != 0:
|
|
# There is no way to find out the error message from this process
|
|
# as standard output, and standard error were redirected to
|
|
# devnull.
|
|
p = Process.query.filter_by(
|
|
pid=self.id, user_id=current_user.id
|
|
).first()
|
|
p.start_time = p.end_time = get_current_time()
|
|
if not p.exit_code:
|
|
p.exit_code = self.ecode
|
|
p.process_state = PROCESS_FINISHED
|
|
db.session.commit()
|
|
else:
|
|
# Update the process state to "Started"
|
|
p = Process.query.filter_by(
|
|
pid=self.id, user_id=current_user.id
|
|
).first()
|
|
p.process_state = PROCESS_STARTED
|
|
db.session.commit()
|
|
|
|
def status(self, out=0, err=0):
|
|
import re
|
|
|
|
ctime = get_current_time(format='%Y%m%d%H%M%S%f')
|
|
|
|
stdout = []
|
|
stderr = []
|
|
out_completed = err_completed = False
|
|
process_output = (out != -1 and err != -1)
|
|
enc = sys.getdefaultencoding()
|
|
if enc is None or enc == 'ascii':
|
|
enc = 'utf-8'
|
|
|
|
def read_log(logfile, log, pos, ctime, ecode=None):
|
|
completed = True
|
|
idx = 0
|
|
c = re.compile(r"(\d+),(.*$)")
|
|
|
|
if not os.path.isfile(logfile):
|
|
return 0, False
|
|
|
|
with open(logfile, 'rb') as f:
|
|
eofs = os.fstat(f.fileno()).st_size
|
|
f.seek(pos, 0)
|
|
if pos == eofs and ecode is None:
|
|
completed = False
|
|
|
|
while pos < eofs:
|
|
idx += 1
|
|
line = f.readline()
|
|
line = line.decode(enc, 'replace')
|
|
r = c.split(line)
|
|
if len(r) < 3:
|
|
# ignore this line
|
|
pos = f.tell()
|
|
continue
|
|
if r[1] > ctime:
|
|
completed = False
|
|
break
|
|
log.append([r[1], r[2]])
|
|
pos = f.tell()
|
|
if idx >= 1024:
|
|
completed = False
|
|
break
|
|
if pos == eofs:
|
|
if ecode is None:
|
|
completed = False
|
|
break
|
|
|
|
return pos, completed
|
|
|
|
j = Process.query.filter_by(
|
|
pid=self.id, user_id=current_user.id
|
|
).first()
|
|
|
|
execution_time = None
|
|
|
|
if j is not None:
|
|
status, updated = BatchProcess.update_process_info(j)
|
|
if updated:
|
|
db.session.commit()
|
|
self.stime = j.start_time
|
|
self.etime = j.end_time
|
|
self.ecode = j.exit_code
|
|
|
|
if self.stime is not None:
|
|
stime = parser.parse(self.stime)
|
|
etime = parser.parse(self.etime or get_current_time())
|
|
|
|
execution_time = BatchProcess.total_seconds(etime - stime)
|
|
|
|
if process_output:
|
|
out, out_completed = read_log(
|
|
self.stdout, stdout, out, ctime, self.ecode
|
|
)
|
|
err, err_completed = read_log(
|
|
self.stderr, stderr, err, ctime, self.ecode
|
|
)
|
|
else:
|
|
out_completed = err_completed = False
|
|
|
|
if out == -1 or err == -1:
|
|
return {
|
|
'start_time': self.stime,
|
|
'exit_code': self.ecode,
|
|
'execution_time': execution_time,
|
|
'process_state': self.process_state
|
|
}
|
|
|
|
return {
|
|
'out': {
|
|
'pos': out,
|
|
'lines': stdout,
|
|
'done': out_completed
|
|
},
|
|
'err': {
|
|
'pos': err,
|
|
'lines': stderr,
|
|
'done': err_completed
|
|
},
|
|
'start_time': self.stime,
|
|
'exit_code': self.ecode,
|
|
'execution_time': execution_time,
|
|
'process_state': self.process_state
|
|
}
|
|
|
|
@staticmethod
|
|
def update_process_info(p):
|
|
if p.start_time is None or p.end_time is None:
|
|
status = os.path.join(p.logdir, 'status')
|
|
if not os.path.isfile(status):
|
|
return False, False
|
|
|
|
with open(status, 'r') as fp:
|
|
import json
|
|
try:
|
|
data = json.load(fp)
|
|
|
|
# First - check for the existance of 'start_time'.
|
|
if 'start_time' in data and data['start_time']:
|
|
p.start_time = data['start_time']
|
|
|
|
# We can't have 'exit_code' without the 'start_time'
|
|
if 'exit_code' in data and \
|
|
data['exit_code'] is not None:
|
|
p.exit_code = data['exit_code']
|
|
|
|
# We can't have 'end_time' without the 'exit_code'.
|
|
if 'end_time' in data and data['end_time']:
|
|
p.end_time = data['end_time']
|
|
|
|
# get the pid of the utility.
|
|
if 'pid' in data:
|
|
p.utility_pid = data['pid']
|
|
|
|
return True, True
|
|
|
|
except ValueError as e:
|
|
current_app.logger.warning(
|
|
_("Status for the background process '{0}' could "
|
|
"not be loaded.").format(p.pid)
|
|
)
|
|
current_app.logger.exception(e)
|
|
return False, False
|
|
return True, False
|
|
|
|
@staticmethod
|
|
def list():
|
|
processes = Process.query.filter_by(user_id=current_user.id)
|
|
changed = False
|
|
|
|
res = []
|
|
for p in processes:
|
|
status, updated = BatchProcess.update_process_info(p)
|
|
if not status:
|
|
continue
|
|
|
|
if not changed:
|
|
changed = updated
|
|
|
|
if p.start_time is None or (
|
|
p.acknowledge is not None and p.end_time is None
|
|
):
|
|
continue
|
|
|
|
execution_time = None
|
|
|
|
stime = parser.parse(p.start_time)
|
|
etime = parser.parse(p.end_time or get_current_time())
|
|
|
|
execution_time = BatchProcess.total_seconds(etime - stime)
|
|
desc = loads(p.desc)
|
|
details = desc
|
|
|
|
if isinstance(desc, IProcessDesc):
|
|
args = []
|
|
args_csv = StringIO(
|
|
p.arguments.encode('utf-8')
|
|
if hasattr(p.arguments, 'decode') else p.arguments
|
|
)
|
|
args_reader = csv.reader(args_csv, delimiter=str(','))
|
|
for arg in args_reader:
|
|
args = args + arg
|
|
details = desc.details(p.command, args)
|
|
type_desc = desc.type_desc
|
|
desc = desc.message
|
|
|
|
res.append({
|
|
'id': p.pid,
|
|
'desc': desc,
|
|
'type_desc': type_desc,
|
|
'details': details,
|
|
'stime': stime,
|
|
'etime': p.end_time,
|
|
'exit_code': p.exit_code,
|
|
'acknowledge': p.acknowledge,
|
|
'execution_time': execution_time,
|
|
'process_state': p.process_state
|
|
})
|
|
|
|
if changed:
|
|
db.session.commit()
|
|
|
|
return res
|
|
|
|
@staticmethod
|
|
def total_seconds(dt):
|
|
return round(dt.total_seconds(), 2)
|
|
|
|
@staticmethod
|
|
def acknowledge(_pid):
|
|
"""
|
|
Acknowledge from the user, he/she has alredy watched the status.
|
|
|
|
Update the acknowledgement status, if the process is still running.
|
|
And, delete the process information from the configuration, and the log
|
|
files related to the process, if it has already been completed.
|
|
"""
|
|
p = Process.query.filter_by(
|
|
user_id=current_user.id, pid=_pid
|
|
).first()
|
|
|
|
if p is None:
|
|
raise LookupError(
|
|
_("Could not find a process with the specified ID.")
|
|
)
|
|
|
|
if p.end_time is not None:
|
|
logdir = p.logdir
|
|
db.session.delete(p)
|
|
import shutil
|
|
shutil.rmtree(logdir, True)
|
|
else:
|
|
p.acknowledge = get_current_time()
|
|
|
|
db.session.commit()
|
|
|
|
def set_env_variables(self, server, **kwargs):
|
|
"""Set environment variables"""
|
|
if server:
|
|
# Set SSL related ENV variables
|
|
if server.sslcert and server.sslkey and server.sslrootcert:
|
|
# SSL environment variables
|
|
self.env['PGSSLMODE'] = server.ssl_mode
|
|
self.env['PGSSLCERT'] = get_complete_file_path(server.sslcert)
|
|
self.env['PGSSLKEY'] = get_complete_file_path(server.sslkey)
|
|
self.env['PGSSLROOTCERT'] = get_complete_file_path(
|
|
server.sslrootcert
|
|
)
|
|
|
|
# Set service name related ENV variable
|
|
if server.service:
|
|
self.env['PGSERVICE'] = server.service
|
|
|
|
if 'env' in kwargs:
|
|
self.env.update(kwargs['env'])
|
|
|
|
@staticmethod
|
|
def stop_process(_pid):
|
|
"""
|
|
"""
|
|
p = Process.query.filter_by(
|
|
user_id=current_user.id, pid=_pid
|
|
).first()
|
|
|
|
if p is None:
|
|
raise LookupError(
|
|
_("Could not find a process with the specified ID.")
|
|
)
|
|
|
|
try:
|
|
process = psutil.Process(p.utility_pid)
|
|
process.terminate()
|
|
# Update the process state to "Terminated"
|
|
p.process_state = PROCESS_TERMINATED
|
|
db.session.commit()
|
|
except psutil.Error as e:
|
|
current_app.logger.warning(
|
|
_("Unable to kill the background process '{0}'").format(
|
|
p.utility_pid)
|
|
)
|
|
current_app.logger.exception(e)
|