Refactor parallel process into a base class that executes any task, and a derived class that executes a batch of the same task.

Georg Brandl 2014-09-22 17:29:52 +02:00
parent 1f23a5c369
commit 0c833c020e
4 changed files with 103 additions and 66 deletions
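In short, the old ParallelProcess is split into ParallelTasks, a base class that forks one worker per arbitrary task, and ParallelChunked, a derived class that splits a single argument list into chunks and runs the same task function on each chunk; SerialTasks offers the same interface without forking. A rough usage sketch of the two new entry points (the task and callback names below are placeholders, not taken from this commit):

    from sphinx.util.parallel import ParallelTasks, ParallelChunked

    # Base class: one arbitrary task per add_task() call, at most nproc at once.
    tasks = ParallelTasks(nproc=4)
    for path in ('a.rst', 'b.rst', 'c.rst'):          # placeholder arguments
        tasks.add_task(read_doc, path, store_result)  # store_result(arg, result) runs in the parent
    tasks.join()                                      # wait and drain remaining results

    # Derived class: one task function applied to chunks of one argument list.
    proc = ParallelChunked(write_chunk, collect_warnings, nproc=4)
    proc.set_arguments(all_docnames)                  # split into proc.nchunks chunks
    for chunk in proc.iter_chunks():                  # workers are forked by the iterator, one per chunk
        pass                                          # main process may prepare the chunk here
    proc.join()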

View File

@@ -23,7 +23,7 @@ from docutils import nodes
from sphinx.util import i18n, path_stabilize
from sphinx.util.osutil import SEP, relative_uri, find_catalog
from sphinx.util.console import bold, darkgreen
from sphinx.util.parallel import ParallelProcess, parallel_available
from sphinx.util.parallel import ParallelChunked, parallel_available
# side effect: registers roles and directives
from sphinx import roles
@@ -361,18 +361,18 @@ class Builder(object):
self.write_doc_serialized(firstname, doctree)
self.write_doc(firstname, doctree)
proc = ParallelProcess(write_process, process_warnings, nproc)
proc = ParallelChunked(write_process, process_warnings, nproc)
proc.set_arguments(docnames)
for chunk in self.app.status_iterator(proc.spawn(), 'writing output... ',
darkgreen, proc.nchunks):
for chunk in self.app.status_iterator(
proc.iter_chunks(), 'writing output... ', darkgreen, proc.nchunks):
for i, docname in enumerate(chunk):
doctree = self.env.get_and_resolve_doctree(docname, self)
self.write_doc_serialized(docname, doctree)
chunk[i] = (docname, doctree)
# make sure all threads have finished
self.info(bold('waiting for workers... '))
self.info(bold('waiting for workers...'))
proc.join()
def prepare_writing(self, docnames):
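Untangled from the diff, the new write loop reads as below; because iter_chunks() yields each chunk before its worker is forked, the main process can still resolve doctrees against the environment and rewrite the chunk entries in place before the worker for that chunk starts (a sketch using the names from the hunk above):

    proc = ParallelChunked(write_process, process_warnings, nproc)
    proc.set_arguments(docnames)
    for chunk in self.app.status_iterator(
            proc.iter_chunks(), 'writing output... ', darkgreen, proc.nchunks):
        # runs in the main process, before the worker for this chunk is forked
        for i, docname in enumerate(chunk):
            doctree = self.env.get_and_resolve_doctree(docname, self)
            self.write_doc_serialized(docname, doctree)
            chunk[i] = (docname, doctree)
    self.info(bold('waiting for workers...'))
    proc.join()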

View File

@@ -29,7 +29,7 @@ from docutils.readers.doctree import Reader as DoctreeReader
from sphinx import package_dir, __version__
from sphinx.util import jsonimpl, copy_static_entry
from sphinx.util.osutil import SEP, os_path, relative_uri, ensuredir, \
movefile, ustrftime, copyfile
movefile, ustrftime, copyfile
from sphinx.util.nodes import inline_all_toctrees
from sphinx.util.matching import patmatch, compile_matchers
from sphinx.locale import _
@@ -40,7 +40,7 @@ from sphinx.application import ENV_PICKLE_FILENAME
from sphinx.highlighting import PygmentsBridge
from sphinx.util.console import bold, darkgreen, brown
from sphinx.writers.html import HTMLWriter, HTMLTranslator, \
SmartyPantsHTMLTranslator
SmartyPantsHTMLTranslator
#: the filename for the inventory of objects
INVENTORY_FILENAME = 'objects.inv'

View File

@@ -42,7 +42,7 @@ from sphinx.util.nodes import clean_astext, make_refnode, WarningStream
from sphinx.util.osutil import SEP, find_catalog_files, getcwd, fs_encoding
from sphinx.util.console import bold, purple
from sphinx.util.matching import compile_matchers
from sphinx.util.parallel import ParallelProcess, parallel_available
from sphinx.util.parallel import ParallelChunked, parallel_available
from sphinx.util.websupport import is_commentable
from sphinx.errors import SphinxError, ExtensionError
from sphinx.locale import _
@@ -619,16 +619,16 @@ class BuildEnvironment:
warnings.extend(otherenv.warnings)
self.merge_info_from(docs, otherenv, app)
proc = ParallelProcess(read_process, merge, nproc)
proc = ParallelChunked(read_process, merge, nproc)
proc.set_arguments(docnames)
warnings = []
for chunk in app.status_iterator(proc.spawn(), 'reading sources... ',
purple, proc.nchunks):
for chunk in app.status_iterator(
proc.iter_chunks(), 'reading sources... ', purple, proc.nchunks):
pass # spawning in the iterator
# make sure all threads have finished
app.info(bold('waiting for workers... '))
app.info(bold('waiting for workers...'))
proc.join()
for warning in warnings:
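The second constructor argument is the result callback, which always runs in the main process; for reading, that is where each worker's results get merged back into the parent environment. Its shape, reconstructed from the context lines in this hunk (only the two calls shown above are taken from the commit, the rest is assumed):

    def merge(docs, otherenv):
        # called in the main process as result_func(arg, result): 'docs' is the
        # chunk of docnames, 'otherenv' whatever read_process() returned for it
        warnings.extend(otherenv.warnings)
        self.merge_info_from(docs, otherenv, app)

    proc = ParallelChunked(read_process, merge, nproc)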

View File

@@ -26,20 +26,102 @@ from sphinx.errors import SphinxParallelError
parallel_available = multiprocessing and (os.name == 'posix')
class ParallelProcess(object):
class SerialTasks(object):
"""Has the same interface as ParallelTasks, but executes tasks directly."""
def __init__(self, process_func, result_func, nproc, maxbatch=10):
self.process_func = process_func
self.result_func = result_func
def __init__(self, nproc=1):
pass
def add_task(self, task_func, arg=None, result_func=None):
if arg is not None:
res = task_func(arg)
else:
res = task_func()
if result_func:
result_func(res)
def join(self):
pass
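SerialTasks mirrors the ParallelTasks interface so callers can pick an executor once and use it unconditionally afterwards; a minimal selection sketch (build_one and source are placeholder names, not from this commit):

    if parallel_available and nproc > 1:
        tasks = ParallelTasks(nproc)
    else:
        tasks = SerialTasks()            # same add_task()/join(), runs inline
    tasks.add_task(build_one, source)    # placeholder task and argument
    tasks.join()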
class ParallelTasks(object):
"""Executes *nproc* tasks in parallel after forking."""
def __init__(self, nproc):
self.nproc = nproc
self.maxbatch = maxbatch
# list of threads to join when waiting for completion
self._threads = []
self._chunks = []
self.nchunks = 0
self._nthreads = 0
# queue of result objects to process
self.result_queue = queue.Queue()
self._nprocessed = 0
# maps tasks to result functions
self._result_funcs = {}
# allow only "nproc" worker processes at once
self._semaphore = threading.Semaphore(self.nproc)
def _process_thread(self, tid, func, arg):
def process(pipe, arg):
try:
if arg is None:
ret = func()
else:
ret = func(arg)
pipe.send((False, ret))
except BaseException as err:
pipe.send((True, (err, traceback.format_exc())))
precv, psend = multiprocessing.Pipe(False)
proc = multiprocessing.Process(target=process, args=(psend, arg))
proc.start()
result = precv.recv()
self.result_queue.put((tid, arg) + result)
proc.join()
self._semaphore.release()
def add_task(self, task_func, arg=None, result_func=None):
tid = len(self._threads)
self._semaphore.acquire()
t = threading.Thread(target=self._process_thread,
args=(tid, task_func, arg))
t.setDaemon(True)
t.start()
self._nthreads += 1
self._threads.append(t)
self._result_funcs[tid] = result_func or (lambda *x: None)
# try processing results already in parallel
try:
tid, arg, exc, result = self.result_queue.get(False)
except queue.Empty:
pass
else:
if exc:
raise SphinxParallelError(*result)
self._result_funcs.pop(tid)(arg, result)
self._nprocessed += 1
def join(self):
while self._nprocessed < self._nthreads:
tid, arg, exc, result = self.result_queue.get()
if exc:
raise SphinxParallelError(*result)
self._result_funcs.pop(tid)(arg, result)
self._nprocessed += 1
for t in self._threads:
t.join()
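A small standalone sketch of the base class (the squaring task is made up; the points are that at most nproc workers fork at once, the result callback receives (arg, result) in the main process, and a worker exception resurfaces in the parent as SphinxParallelError):

    def square(n):
        return n * n

    def report(arg, result):
        print('%d -> %d' % (arg, result))    # runs in the main process

    tasks = ParallelTasks(nproc=2)
    for n in (1, 2, 3, 4):
        tasks.add_task(square, n, report)    # forks one worker per task
    tasks.join()                             # drains remaining results, re-raises worker errors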
class ParallelChunked(ParallelTasks):
"""Executes chunks of a list of arguments in parallel."""
def __init__(self, process_func, result_func, nproc, maxbatch=10):
ParallelTasks.__init__(self, nproc)
self.process_func = process_func
self.result_func = result_func
self.maxbatch = maxbatch
self._chunks = []
self.nchunks = 0
def set_arguments(self, arguments):
# determine how many documents to read in one go
@ -54,53 +136,8 @@ class ParallelProcess(object):
self._chunks = [arguments[i*chunksize:(i+1)*chunksize] for i in range(nchunks)]
self.nchunks = len(self._chunks)
def spawn(self):
def iter_chunks(self):
assert self._chunks
def process(pipe, chunk):
try:
ret = self.process_func(chunk)
pipe.send((False, ret))
except BaseException as err:
pipe.send((True, (err, traceback.format_exc())))
def process_thread(chunk):
precv, psend = multiprocessing.Pipe(False)
proc = multiprocessing.Process(target=process, args=(psend, chunk))
proc.start()
result = precv.recv()
self.result_queue.put((chunk,) + result)
proc.join()
semaphore.release()
# allow only "nproc" worker processes at once
semaphore = threading.Semaphore(self.nproc)
for chunk in self._chunks:
yield chunk
semaphore.acquire()
t = threading.Thread(target=process_thread, args=(chunk,))
t.setDaemon(True)
t.start()
self._threads.append(t)
# try processing results already in parallel
try:
chunk, exc, result = self.result_queue.get(False)
except queue.Empty:
pass
else:
if exc:
raise SphinxParallelError(*result)
self.result_func(chunk, result)
self._nprocessed += 1
def join(self):
while self._nprocessed < self.nchunks:
chunk, exc, result = self.result_queue.get()
if exc:
raise SphinxParallelError(*result)
self.result_func(chunk, result)
self._nprocessed += 1
for t in self._threads:
t.join()
self.add_task(self.process_func, chunk, self.result_func)