pgadmin4/web/pgadmin/misc/bgprocess/processes.py
Ashesh Vashi a35288e25d Save the runtime status information in the application object for using
it other places.

globals() does not return consistent value across the application
context.

Thanks Neel Patel for reporting the issue.
2016-06-21 15:13:04 +05:30

388 lines
11 KiB
Python

# -*- coding: utf-8 -*-
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL License
#
##########################################################################
"""
Introduce a function to run the process executor in detached mode.
"""
from __future__ import print_function, unicode_literals
from abc import ABCMeta, abstractproperty, abstractmethod
import csv
from datetime import datetime
from dateutil import parser
import os
from pickle import dumps, loads
import pytz
from subprocess import Popen, PIPE
import sys
from flask import current_app as app
from flask.ext.babel import gettext as _
from flask.ext.security import current_user
import config
from pgadmin.model import Process, db
if sys.version_info < (3,):
from StringIO import StringIO
else:
from io import StringIO
def get_current_time(format='%Y-%m-%d %H:%M:%S.%f %z'):
"""
Generate the current time string in the given format.
"""
return datetime.utcnow().replace(
tzinfo=pytz.utc
).strftime(format)
class IProcessDesc(object):
__metaclass__ = ABCMeta
@abstractproperty
def message(self):
pass
@abstractmethod
def details(self, cmd, args):
pass
class BatchProcess(object):
def __init__(self, **kwargs):
self.id = self.desc = self.cmd = self.args = self.log_dir = \
self.stdout = self.stderr = self.stime = self.etime = \
self.ecode = None
if 'id' in kwargs:
self._retrieve_process(kwargs['id'])
else:
self._create_process(kwargs['desc'], kwargs['cmd'], kwargs['args'])
def _retrieve_process(self, _id):
p = Process.query.filter_by(pid=_id, user_id=current_user.id).first()
if p is None:
raise LookupError(
_("Could not find a process with the specified ID.")
)
# ID
self.id = _id
# Description
self.desc = loads(p.desc)
# Status Acknowledged time
self.atime = p.acknowledge
# Command
self.cmd = p.command
# Arguments
self.args = p.arguments
# Log Directory
self.log_dir = p.logdir
# Standard ouput log file
self.stdout = os.path.join(p.logdir, 'out')
# Standard error log file
self.stderr = os.path.join(p.logdir, 'err')
# Start time
self.stime = p.start_time
# End time
self.etime = p.end_time
# Exit code
self.ecode = p.exit_code
def _create_process(self, _desc, _cmd, _args):
ctime = get_current_time(format='%y%m%d%H%M%S%f')
log_dir = os.path.join(
config.SESSION_DB_PATH, 'process_logs'
)
def random_number(size):
import random
import string
return ''.join(
random.choice(
string.ascii_uppercase + string.digits
) for _ in range(size)
)
created = False
size = 0
id = ctime
while not created:
try:
id += random_number(size)
log_dir = os.path.join(log_dir, id)
size += 1
if not os.path.exists(log_dir):
os.makedirs(log_dir, int('700', 8))
created = True
except OSError as oe:
import errno
if oe.errno != errno.EEXIST:
raise
# ID
self.id = ctime
# Description
self.desc = _desc
# Status Acknowledged time
self.atime = None
# Command
self.cmd = _cmd
# Log Directory
self.log_dir = log_dir
# Standard ouput log file
self.stdout = os.path.join(log_dir, 'out')
# Standard error log file
self.stderr = os.path.join(log_dir, 'err')
# Start time
self.stime = None
# End time
self.etime = None
# Exit code
self.ecode = None
# Arguments
args_csv_io = StringIO()
csv_writer = csv.writer(
args_csv_io, delimiter=str(','), quoting=csv.QUOTE_MINIMAL
)
csv_writer.writerow(_args)
self.args = args_csv_io.getvalue().strip(str('\r\n'))
j = Process(
pid=int(id), command=_cmd, arguments=self.args, logdir=log_dir,
desc=dumps(self.desc), user_id=current_user.id
)
db.session.add(j)
db.session.commit()
def start(self):
if self.stime is not None:
if self.etime is None:
raise Exception(_('The process has already been started.'))
raise Exception(
_('The process has already finished and can not be restarted.')
)
executor = os.path.join(
os.path.dirname(__file__), 'process_executor.py'
)
p = None
cmd = [
(sys.executable if not app.PGADMIN_RUNTIME else
'pythonw.exe' if os.name == 'nt' else 'python'),
executor,
'-p', self.id,
'-o', self.log_dir,
'-d', config.SQLITE_PATH
]
if os.name == 'nt':
p = Popen(
cmd, stdout=None, stderr=None, stdin=None, close_fds=True,
shell=False, creationflags=0x00000008
)
else:
def preexec_function():
import signal
# Detaching from the parent process group
os.setpgrp()
# Explicitly ignoring signals in the child process
signal.signal(signal.SIGINT, signal.SIG_IGN)
p = Popen(
cmd, stdout=PIPE, stderr=None, stdin=None, close_fds=True,
shell=False, preexec_fn=preexec_function
)
self.ecode = p.poll()
if self.ecode is not None:
# TODO:: Couldn't start execution
pass
def status(self, out=0, err=0):
import codecs
ctime = get_current_time(format='%Y%m%d%H%M%S%f')
stdout = []
stderr = []
out_completed = err_completed = False
process_output = (out != -1 and err != -1)
def read_log(logfile, log, pos, ctime, check=True):
completed = True
lines = 0
if not os.path.isfile(logfile):
return 0, False
with codecs.open(logfile, 'r', 'utf-8') as stream:
stream.seek(pos)
for line in stream:
logtime = StringIO()
idx = 0
for c in line:
idx += 1
if c == ',':
break
logtime.write(c)
logtime = logtime.getvalue()
if check and logtime > ctime:
completed = False
break
if lines == 5120:
ctime = logtime
completed = False
break
lines += 1
log.append([logtime, line[idx:]])
pos = stream.tell()
return pos, completed
if process_output:
out, out_completed = read_log(
self.stdout, stdout, out, ctime, True
)
err, err_completed = read_log(
self.stderr, stderr, err, ctime, True
)
j = Process.query.filter_by(
pid=self.id, user_id=current_user.id
).first()
execution_time = None
if j is not None:
self.stime = j.start_time
self.etime = j.end_time
self.ecode = j.exit_code
if self.stime is not None:
stime = parser.parse(self.stime)
etime = parser.parse(self.etime or get_current_time())
execution_time = (etime - stime).total_seconds()
if process_output and self.ecode is not None and (
len(stdout) + len(stderr) < 3073
):
out, out_completed = read_log(
self.stdout, stdout, out, ctime, False
)
err, err_completed = read_log(
self.stderr, stderr, err, ctime, False
)
else:
out_completed = err_completed = False
if out == -1 or err == -1:
return {
'exit_code': self.ecode,
'execution_time': execution_time
}
return {
'out': {'pos': out, 'lines': stdout, 'done': out_completed},
'err': {'pos': err, 'lines': stderr, 'done': err_completed},
'exit_code': self.ecode,
'execution_time': execution_time
}
@staticmethod
def list():
processes = Process.query.filter_by(user_id=current_user.id)
res = []
for p in processes:
if p.start_time is None or p.acknowledge is not None:
continue
execution_time = None
stime = parser.parse(p.start_time)
etime = parser.parse(p.end_time or get_current_time())
execution_time = (etime - stime).total_seconds()
desc = loads(p.desc)
details = desc
if isinstance(desc, IProcessDesc):
args = []
args_csv = StringIO(p.arguments)
args_reader = csv.reader(args_csv, delimiter=str(','))
for arg in args_reader:
args = args + arg
details = desc.details(p.command, args)
desc = desc.message
res.append({
'id': p.pid,
'desc': desc,
'details': details,
'stime': stime,
'etime': p.end_time,
'exit_code': p.exit_code,
'acknowledge': p.acknowledge,
'execution_time': execution_time
})
return res
@staticmethod
def acknowledge(_pid, _release):
p = Process.query.filter_by(
user_id=current_user.id, pid=_pid
).first()
if p is None:
raise LookupError(
_("Could not find a process with the specified ID.")
)
if _release:
import shutil
shutil.rmtree(p.logdir, True)
db.session.delete(p)
else:
p.acknowledge = get_current_time()
db.session.commit()
@staticmethod
def release(pid=None):
import shutil
processes = None
if pid is not None:
processes = Process.query.filter_by(
user_id=current_user.id, pid=pid
)
else:
processes = Process.query.filter_by(
user_id=current_user.id,
acknowledge=None
)
if processes:
for p in processes:
shutil.rmtree(p.logdir, True)
db.session.delete(p)
db.session.commit()