mirror of https://github.com/sphinx-doc/sphinx.git
Factor out parallel building into a utility class. Better error handling
with traceback of the parallel process saved in the error log.
This commit is contained in:
parent 7bbaa4c73f
commit 1f23a5c369
@@ -22,7 +22,8 @@ from docutils import nodes
 from sphinx.util import i18n, path_stabilize
 from sphinx.util.osutil import SEP, relative_uri, find_catalog
-from sphinx.util.console import bold, purple, darkgreen
+from sphinx.util.console import bold, darkgreen
+from sphinx.util.parallel import ParallelProcess, parallel_available

 # side effect: registers roles and directives
 from sphinx import roles
@@ -318,10 +319,8 @@ class Builder(object):
         # check for prerequisites to parallel build
-        # (parallel only works on POSIX, because the forking impl of
-        #  multiprocessing is required)
-        if (multiprocessing and
-                self.app.parallel > 1 and
-                self.allow_parallel and
-                os.name == 'posix'):
+        if parallel_available and len(docnames) > 5 and self.app.parallel > 1 \
+           and self.allow_parallel:
             for extname, md in self.app._extension_metadata.items():
                 par_ok = md.get('parallel_write_safe', True)
                 if not par_ok:
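The new gate folds the old four-clause check (multiprocessing importable, POSIX, worker count, builder opt-in) into the `parallel_available` flag plus size and opt-in tests. A condensed restatement as a standalone predicate, for illustration only; the helper name `should_write_parallel` is not part of the commit:

    def should_write_parallel(app, builder, docnames):
        return (parallel_available           # POSIX with a forking Process, see sphinx/util/parallel.py
                and len(docnames) > 5        # for a handful of docs, forking overhead dominates
                and app.parallel > 1         # the user asked for workers (sphinx-build -j N)
                and builder.allow_parallel)  # the builder class opted in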
@@ -349,59 +348,32 @@ class Builder(object):

     def _write_parallel(self, docnames, warnings, nproc):
         def write_process(docs):
-            try:
-                for docname, doctree in docs:
-                    self.write_doc(docname, doctree)
-            except KeyboardInterrupt:
-                pass  # do not print a traceback on Ctrl-C
-            finally:
-                for warning in warnings:
-                    self.warn(*warning)
+            for docname, doctree in docs:
+                self.write_doc(docname, doctree)
+            return warnings

-        def process_thread(docs):
-            p = multiprocessing.Process(target=write_process, args=(docs,))
-            p.start()
-            p.join()
-            semaphore.release()
-
-        # allow only "nproc" worker processes at once
-        semaphore = threading.Semaphore(nproc)
-        # list of threads to join when waiting for completion
-        threads = []
+        def process_warnings(docs, wlist):
+            warnings.extend(wlist)

         # warm up caches/compile templates using the first document
         firstname, docnames = docnames[0], docnames[1:]
         doctree = self.env.get_and_resolve_doctree(firstname, self)
         self.write_doc_serialized(firstname, doctree)
         self.write_doc(firstname, doctree)
-        # for the rest, determine how many documents to write in one go
-        ndocs = len(docnames)
-        chunksize = min(ndocs // nproc, 10)
-        if chunksize == 0:
-            chunksize = 1
-        nchunks, rest = divmod(ndocs, chunksize)
-        if rest:
-            nchunks += 1
-        # partition documents in "chunks" that will be written by one Process
-        chunks = [docnames[i*chunksize:(i+1)*chunksize] for i in range(nchunks)]
-        for docnames in self.app.status_iterator(
-                chunks, 'writing output... ', darkgreen, len(chunks)):
-            docs = []
-            for docname in docnames:
+
+        proc = ParallelProcess(write_process, process_warnings, nproc)
+        proc.set_arguments(docnames)
+
+        for chunk in self.app.status_iterator(proc.spawn(), 'writing output... ',
+                                              darkgreen, proc.nchunks):
+            for i, docname in enumerate(chunk):
                 doctree = self.env.get_and_resolve_doctree(docname, self)
                 self.write_doc_serialized(docname, doctree)
-                docs.append((docname, doctree))
-            # start a new thread to oversee the completion of this chunk
-            semaphore.acquire()
-            t = threading.Thread(target=process_thread, args=(docs,))
-            t.setDaemon(True)
-            t.start()
-            threads.append(t)
+                chunk[i] = (docname, doctree)

         # make sure all threads have finished
         self.info(bold('waiting for workers... '))
-        for t in threads:
-            t.join()
+        proc.join()

     def prepare_writing(self, docnames):
         """A place where you can add logic before :meth:`write_doc` is run"""
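Note on the rewritten loop: spawn() yields each chunk before it forks the worker for it, so the loop body above can rewrite the chunk in place, turning docnames into (docname, doctree) pairs; the worker for a chunk only forks once the caller advances the iterator. Doctree resolution thus stays in the parent process, and only write_doc() runs in the workers. A sketch of that interaction, assuming the ParallelProcess class added by this commit (`resolve` stands in for get_and_resolve_doctree):

    for chunk in proc.spawn():                       # chunk is yielded first
        for i, docname in enumerate(chunk):
            chunk[i] = (docname, resolve(docname))   # mutate before the fork
        # advancing the loop resumes spawn(), which now forks a worker
        # that receives the mutated chunk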
@@ -22,14 +22,8 @@ from os import path
 from glob import glob
 from itertools import groupby

-try:
-    import multiprocessing
-    import threading
-except ImportError:
-    multiprocessing = threading = None
-
 from six import iteritems, itervalues, text_type, class_types
-from six.moves import cPickle as pickle, zip, queue
+from six.moves import cPickle as pickle, zip
 from docutils import nodes
 from docutils.io import FileInput, NullOutput
 from docutils.core import Publisher
@@ -48,6 +42,7 @@ from sphinx.util.nodes import clean_astext, make_refnode, WarningStream
 from sphinx.util.osutil import SEP, find_catalog_files, getcwd, fs_encoding
 from sphinx.util.console import bold, purple
 from sphinx.util.matching import compile_matchers
+from sphinx.util.parallel import ParallelProcess, parallel_available
 from sphinx.util.websupport import is_commentable
 from sphinx.errors import SphinxError, ExtensionError
 from sphinx.locale import _
@@ -562,10 +557,7 @@ class BuildEnvironment:

         # check if we should do parallel or serial read
         par_ok = False
-        if (len(added | changed) > 5 and
-                multiprocessing and
-                app.parallel > 1 and
-                os.name == 'posix'):
+        if parallel_available and len(docnames) > 5 and app.parallel > 1:
             par_ok = True
             for extname, md in app._extension_metadata.items():
                 ext_ok = md.get('parallel_read_safe')
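The par_ok gate defers to each extension's self-declared metadata. For reference, an extension opts in through the dict returned from its setup() function; this is the standard Sphinx extension-metadata convention, and the snippet below is illustrative rather than part of the diff (`purge_my_cache` is a placeholder handler):

    def setup(app):
        app.connect('env-purge-doc', purge_my_cache)
        return {'version': '1.0', 'parallel_read_safe': True}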
@@ -604,87 +596,43 @@ class BuildEnvironment:
             self.read_doc(docname, app)

     def _read_parallel(self, docnames, app, nproc):
-        def read_process(docs, pipe):
-            self.app = app
-            self.warnings = []
-            self.set_warnfunc(lambda *args: self.warnings.append(args))
-            try:
-                for docname in docs:
-                    self.read_doc(docname, app)
-            except KeyboardInterrupt:
-                # XXX return None?
-                pass  # do not print a traceback on Ctrl-C
-            self.set_warnfunc(None)
-            del self.app
-            del self.domains
-            del self.config.values
-            del self.config
-            pipe.send(self)
-
-        def process_thread(docs):
-            precv, psend = multiprocessing.Pipe(False)
-            p = multiprocessing.Process(target=read_process, args=(docs, psend))
-            p.start()
-            # XXX error handling
-            new_env = precv.recv()
-            merge_queue.put((docs, new_env))
-            p.join()
-            semaphore.release()
-
-        # allow only "nproc" worker processes at once
-        semaphore = threading.Semaphore(nproc)
-        # list of threads to join when waiting for completion
-        threads = []
-        # queue of other env objects to merge
-        merge_queue = queue.Queue()
-
         # clear all outdated docs at once
         for docname in docnames:
             app.emit('env-purge-doc', self, docname)
             self.clear_doc(docname)

-        # determine how many documents to read in one go
-        ndocs = len(docnames)
-        chunksize = min(ndocs // nproc, 10)
-        if chunksize == 0:
-            chunksize = 1
-        nchunks, rest = divmod(ndocs, chunksize)
-        if rest:
-            nchunks += 1
-        # partition documents in "chunks" that will be written by one Process
-        chunks = [docnames[i*chunksize:(i+1)*chunksize] for i in range(nchunks)]
+        def read_process(docs):
+            self.app = app
+            self.warnings = []
+            self.set_warnfunc(lambda *args: self.warnings.append(args))
+            for docname in docs:
+                self.read_doc(docname, app)
+            # allow pickling self to send it back
+            self.set_warnfunc(None)
+            del self.app
+            del self.domains
+            del self.config.values
+            del self.config
+            return self
+
+        def merge(docs, otherenv):
+            warnings.extend(otherenv.warnings)
+            self.merge_info_from(docs, otherenv, app)
+
+        proc = ParallelProcess(read_process, merge, nproc)
+        proc.set_arguments(docnames)

         warnings = []
-        merged = 0
-        for chunk in app.status_iterator(chunks, 'reading sources... ',
-                                         purple, len(chunks)):
-            semaphore.acquire()
-            t = threading.Thread(target=process_thread, args=(chunk,))
-            t.setDaemon(True)
-            t.start()
-            threads.append(t)
-            try:
-                docs, other = merge_queue.get(False)
-            except queue.Empty:
-                pass
-            else:
-                warnings.extend(other.warnings)
-                self.merge_info_from(docs, other, app)
-                merged += 1
-
-        while merged < len(chunks):
-            docs, other = merge_queue.get()
-            warnings.extend(other.warnings)
-            self.merge_info_from(docs, other, app)
-            merged += 1
-
-        for warning in warnings:
-            self._warnfunc(*warning)
+        for chunk in app.status_iterator(proc.spawn(), 'reading sources... ',
+                                         purple, proc.nchunks):
+            pass  # spawning in the iterator

         # make sure all threads have finished
         app.info(bold('waiting for workers... '))
-        for t in threads:
-            t.join()
+        proc.join()
+
+        for warning in warnings:
+            self._warnfunc(*warning)

     def check_dependents(self, already):
         to_rewrite = self.assign_section_numbers()
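The attribute deletions before `return self` exist because the worker's return value travels back to the parent over a multiprocessing.Pipe, which pickles it: the application object, the domain objects and the config values hold references that do not survive pickling, so they are stripped in the worker and re-attached in the parent after merging. A minimal illustration of the same pattern, not Sphinx code:

    import pickle

    class Holder(object):
        def __init__(self):
            self.data = {'docs': []}
            self.callback = lambda *args: None   # lambdas cannot be pickled

    h = Holder()
    del h.callback               # drop the unpicklable attribute
    payload = pickle.dumps(h)    # succeeds once only plain data remains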
@@ -10,6 +10,9 @@
     :license: BSD, see LICENSE for details.
 """

+import traceback
+

 class SphinxError(Exception):
     """
     Base class for Sphinx errors that are shown to the user in a nicer
@@ -62,3 +65,13 @@ class PycodeError(Exception):
         if len(self.args) > 1:
             res += ' (exception was: %r)' % self.args[1]
         return res
+
+
+class SphinxParallelError(Exception):
+    def __init__(self, orig_exc, traceback):
+        self.orig_exc = orig_exc
+        self.traceback = traceback
+
+    def __str__(self):
+        return traceback.format_exception_only(
+            self.orig_exc.__class__, self.orig_exc)[0].strip()
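SphinxParallelError pairs the original exception object with the traceback string that the worker formatted on its side of the pipe; str() then reduces it to the usual one-line summary. A quick demonstration of the class added above (illustrative usage, not from the commit):

    import traceback

    try:
        raise ValueError('boom in worker')
    except ValueError as err:
        perr = SphinxParallelError(err, traceback.format_exc())

    print(perr)            # -> ValueError: boom in worker
    print(perr.traceback)  # the full stack captured where the error occurred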
@@ -29,7 +29,7 @@ from docutils.utils import relative_path
 import jinja2

 import sphinx
-from sphinx.errors import PycodeError
+from sphinx.errors import PycodeError, SphinxParallelError
 from sphinx.util.console import strip_colors
 from sphinx.util.osutil import fs_encoding

@@ -191,7 +191,11 @@ _DEBUG_HEADER = '''\
 def save_traceback(app):
     """Save the current exception's traceback in a temporary file."""
     import platform
-    exc = traceback.format_exc()
+    exc = sys.exc_info()[1]
+    if isinstance(exc, SphinxParallelError):
+        exc_format = '(Error in parallel process)\n' + exc.traceback
+    else:
+        exc_format = traceback.format_exc()
     fd, path = tempfile.mkstemp('.log', 'sphinx-err-')
     last_msgs = ''
     if app is not None:
@@ -212,7 +216,7 @@ def save_traceback(app):
         os.write(fd, ('# %s (%s) from %s\n' % (
             extname, app._extension_metadata[extname]['version'],
             modfile)).encode('utf-8'))
-    os.write(fd, exc.encode('utf-8'))
+    os.write(fd, exc_format.encode('utf-8'))
     os.close(fd)
     return path
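save_traceback now inspects the live exception object rather than only its formatted text, so a SphinxParallelError can substitute the traceback captured inside the worker process; traceback.format_exc() in the parent would only show the re-raise site in ParallelProcess.join(). Note that sys.exc_info() carries the exception only while it is being handled, so the function is expected to run inside an except block. A minimal illustration, not the Sphinx call site:

    import sys

    try:
        raise RuntimeError('build failed')
    except RuntimeError:
        exc = sys.exc_info()[1]        # the in-flight exception object
        assert isinstance(exc, RuntimeError)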
sphinx/util/parallel.py (new file, 106 lines)
@@ -0,0 +1,106 @@
# -*- coding: utf-8 -*-
"""
    sphinx.util.parallel
    ~~~~~~~~~~~~~~~~~~~~

    Parallel building utilities.

    :copyright: Copyright 2007-2014 by the Sphinx team, see AUTHORS.
    :license: BSD, see LICENSE for details.
"""

import os
import traceback

try:
    import multiprocessing
    import threading
except ImportError:
    multiprocessing = threading = None

from six.moves import queue

from sphinx.errors import SphinxParallelError

# our parallel functionality only works for the forking Process
parallel_available = multiprocessing and (os.name == 'posix')


class ParallelProcess(object):

    def __init__(self, process_func, result_func, nproc, maxbatch=10):
        self.process_func = process_func
        self.result_func = result_func
        self.nproc = nproc
        self.maxbatch = maxbatch
        # list of threads to join when waiting for completion
        self._threads = []
        self._chunks = []
        self.nchunks = 0
        # queue of result objects to process
        self.result_queue = queue.Queue()
        self._nprocessed = 0

    def set_arguments(self, arguments):
        # determine how many documents to read in one go
        nargs = len(arguments)
        chunksize = min(nargs // self.nproc, self.maxbatch)
        if chunksize == 0:
            chunksize = 1
        nchunks, rest = divmod(nargs, chunksize)
        if rest:
            nchunks += 1
        # partition documents in "chunks" that will be written by one Process
        self._chunks = [arguments[i*chunksize:(i+1)*chunksize] for i in range(nchunks)]
        self.nchunks = len(self._chunks)

    def spawn(self):
        assert self._chunks

        def process(pipe, chunk):
            try:
                ret = self.process_func(chunk)
                pipe.send((False, ret))
            except BaseException as err:
                pipe.send((True, (err, traceback.format_exc())))

        def process_thread(chunk):
            precv, psend = multiprocessing.Pipe(False)
            proc = multiprocessing.Process(target=process, args=(psend, chunk))
            proc.start()
            result = precv.recv()
            self.result_queue.put((chunk,) + result)
            proc.join()
            semaphore.release()

        # allow only "nproc" worker processes at once
        semaphore = threading.Semaphore(self.nproc)

        for chunk in self._chunks:
            yield chunk
            semaphore.acquire()
            t = threading.Thread(target=process_thread, args=(chunk,))
            t.setDaemon(True)
            t.start()
            self._threads.append(t)
            # try processing results already in parallel
            try:
                chunk, exc, result = self.result_queue.get(False)
            except queue.Empty:
                pass
            else:
                if exc:
                    raise SphinxParallelError(*result)
                self.result_func(chunk, result)
                self._nprocessed += 1

    def join(self):
        while self._nprocessed < self.nchunks:
            chunk, exc, result = self.result_queue.get()
            if exc:
                raise SphinxParallelError(*result)
            self.result_func(chunk, result)
            self._nprocessed += 1

        for t in self._threads:
            t.join()
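For context, the spawn()/join() protocol the two rewritten call sites rely on, shown end to end as a toy driver; a sketch under assumed names (`square_all` and `collect` are illustrative, not part of the commit), and note that results arrive in completion order, not submission order:

    from sphinx.util.parallel import ParallelProcess

    results = []

    def square_all(chunk):        # process_func: runs in a forked worker
        return [n * n for n in chunk]

    def collect(chunk, res):      # result_func: runs in the parent per chunk
        results.extend(res)

    proc = ParallelProcess(square_all, collect, nproc=4)
    proc.set_arguments(list(range(23)))  # 23 args, nproc=4: chunksize = min(23 // 4, 10) = 5,
                                         # divmod(23, 5) = (4, 3), so 5 chunks sized 5, 5, 5, 5, 3
    for chunk in proc.spawn():    # each chunk is yielded before its worker forks
        pass                      # callers may prepare the chunk here
    proc.join()                   # drain remaining results; worker errors
                                  # re-raise here as SphinxParallelError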