Fix a memory leak in the parallel impl. ParallelChunked is now unnecessary.

Georg Brandl 2014-09-22 19:53:19 +02:00
parent 339c6f1335
commit 4197144f96
3 changed files with 51 additions and 61 deletions
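
Not part of the commit itself, but for orientation: one bookkeeping change visible in sphinx/util/parallel.py below is that worker threads are now tracked in a dict keyed by a task id and dropped as soon as their result has been handled, instead of accumulating in a list for the whole build. A rough, self-contained sketch of that pattern (the names here are illustrative, not Sphinx API):

import threading
import time

class TaskBookkeepingSketch(object):
    # Illustrative stand-in for the dict-based bookkeeping that
    # ParallelTasks switches to in this commit.
    def __init__(self):
        self._taskid = 0
        self._threads = {}

    def add_task(self, func, arg):
        tid = self._taskid
        self._taskid += 1
        thread = threading.Thread(target=func, args=(arg,))
        thread.setDaemon(True)
        thread.start()
        self._threads[tid] = thread
        return tid

    def task_done(self, tid):
        # forget the thread once its result has been consumed, so it and
        # everything it references can be garbage collected
        self._threads.pop(tid).join()

if __name__ == '__main__':
    book = TaskBookkeepingSketch()
    tid = book.add_task(time.sleep, 0.01)
    book.task_done(tid)
    assert not book._threads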

sphinx/builders/__init__.py

@@ -23,7 +23,7 @@ from docutils import nodes
 from sphinx.util import i18n, path_stabilize
 from sphinx.util.osutil import SEP, relative_uri, find_catalog
 from sphinx.util.console import bold, darkgreen
-from sphinx.util.parallel import ParallelChunked, ParallelTasks, SerialTasks, \
+from sphinx.util.parallel import ParallelTasks, SerialTasks, make_chunks, \
     parallel_available
 
 # side effect: registers roles and directives
@@ -358,7 +358,7 @@ class Builder(object):
                 self.write_doc(docname, doctree)
             return warnings
 
-        def process_warnings(docs, wlist):
+        def add_warnings(docs, wlist):
             warnings.extend(wlist)
 
         # warm up caches/compile templates using the first document
@@ -367,19 +367,21 @@ class Builder(object):
         self.write_doc_serialized(firstname, doctree)
         self.write_doc(firstname, doctree)
 
-        proc = ParallelChunked(write_process, process_warnings, nproc)
-        proc.set_arguments(docnames)
+        tasks = ParallelTasks(nproc)
+        chunks = make_chunks(docnames, nproc)
 
         for chunk in self.app.status_iterator(
-                proc.iter_chunks(), 'writing output... ', darkgreen, proc.nchunks):
+                chunks, 'writing output... ', darkgreen, len(chunks)):
+            arg = []
             for i, docname in enumerate(chunk):
                 doctree = self.env.get_and_resolve_doctree(docname, self)
                 self.write_doc_serialized(docname, doctree)
-                chunk[i] = (docname, doctree)
+                arg.append((docname, doctree))
+            tasks.add_task(write_process, arg, add_warnings)
 
         # make sure all threads have finished
         self.info(bold('waiting for workers...'))
-        proc.join()
+        tasks.join()
 
     def prepare_writing(self, docnames):
         """A place where you can add logic before :meth:`write_doc` is run"""

sphinx/environment.py

@@ -42,7 +42,7 @@ from sphinx.util.nodes import clean_astext, make_refnode, WarningStream
 from sphinx.util.osutil import SEP, find_catalog_files, getcwd, fs_encoding
 from sphinx.util.console import bold, purple
 from sphinx.util.matching import compile_matchers
-from sphinx.util.parallel import ParallelChunked, parallel_available
+from sphinx.util.parallel import ParallelTasks, parallel_available, make_chunks
 from sphinx.util.websupport import is_commentable
 from sphinx.errors import SphinxError, ExtensionError
 from sphinx.locale import _
@@ -619,17 +619,17 @@ class BuildEnvironment:
             warnings.extend(otherenv.warnings)
             self.merge_info_from(docs, otherenv, app)
 
-        proc = ParallelChunked(read_process, merge, nproc)
-        proc.set_arguments(docnames)
+        tasks = ParallelTasks(nproc)
+        chunks = make_chunks(docnames, nproc)
 
         warnings = []
         for chunk in app.status_iterator(
-                proc.iter_chunks(), 'reading sources... ', purple, proc.nchunks):
-            pass  # spawning in the iterator
+                chunks, 'reading sources... ', purple, len(chunks)):
+            tasks.add_task(read_process, chunk, merge)
 
         # make sure all threads have finished
         app.info(bold('waiting for workers...'))
-        proc.join()
+        tasks.join()
 
         for warning in warnings:
             self._warnfunc(*warning)

sphinx/util/parallel.py

@@ -50,7 +50,8 @@ class ParallelTasks(object):
     def __init__(self, nproc):
         self.nproc = nproc
         # list of threads to join when waiting for completion
-        self._threads = []
+        self._taskid = 0
+        self._threads = {}
         self._nthreads = 0
         # queue of result objects to process
         self.result_queue = queue.Queue()
@@ -60,19 +61,20 @@ class ParallelTasks(object):
         # allow only "nproc" worker processes at once
         self._semaphore = threading.Semaphore(self.nproc)
 
-    def _process_thread(self, tid, func, arg):
-        def process(pipe, arg):
-            try:
-                if arg is None:
-                    ret = func()
-                else:
-                    ret = func(arg)
-                pipe.send((False, ret))
-            except BaseException as err:
-                pipe.send((True, (err, traceback.format_exc())))
+    def _process(self, pipe, func, arg):
+        try:
+            if arg is None:
+                ret = func()
+            else:
+                ret = func(arg)
+            pipe.send((False, ret))
+        except BaseException as err:
+            pipe.send((True, (err, traceback.format_exc())))
 
+    def _process_thread(self, tid, func, arg):
         precv, psend = multiprocessing.Pipe(False)
-        proc = multiprocessing.Process(target=process, args=(psend, arg))
+        proc = multiprocessing.Process(target=self._process,
+                                       args=(psend, func, arg))
         proc.start()
         result = precv.recv()
         self.result_queue.put((tid, arg) + result)
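
A side note, not part of the commit: the worker pattern used by _process()/_process_thread() above, where each task runs in its own Process and reports a (failed, payload) pair back through a one-way Pipe, looks like this in a minimal standalone form (run_in_subprocess and _child are invented names for the sketch):

import multiprocessing
import traceback

def _child(pipe, func, arg):
    # child process: run the task and report (failed, payload) over the pipe
    try:
        pipe.send((False, func(arg)))
    except BaseException as err:
        pipe.send((True, (err, traceback.format_exc())))

def run_in_subprocess(func, arg):
    precv, psend = multiprocessing.Pipe(False)   # one-way: child -> parent
    proc = multiprocessing.Process(target=_child, args=(psend, func, arg))
    proc.start()
    failed, payload = precv.recv()               # blocks until the child sends
    proc.join()
    if failed:
        raise RuntimeError(payload[1])
    return payload

if __name__ == '__main__':
    print(run_in_subprocess(len, 'abc'))         # -> 3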
@@ -80,14 +82,15 @@ class ParallelTasks(object):
         self._semaphore.release()
 
     def add_task(self, task_func, arg=None, result_func=None):
-        tid = len(self._threads)
+        tid = self._taskid
+        self._taskid += 1
         self._semaphore.acquire()
-        t = threading.Thread(target=self._process_thread,
-                             args=(tid, task_func, arg))
-        t.setDaemon(True)
-        t.start()
+        thread = threading.Thread(target=self._process_thread,
+                                  args=(tid, task_func, arg))
+        thread.setDaemon(True)
+        thread.start()
         self._nthreads += 1
-        self._threads.append(t)
+        self._threads[tid] = thread
         self._result_funcs[tid] = result_func or (lambda *x: None)
         # try processing results already in parallel
         try:
@@ -95,6 +98,7 @@ class ParallelTasks(object):
         except queue.Empty:
             pass
         else:
+            del self._threads[tid]
             if exc:
                 raise SphinxParallelError(*result)
             self._result_funcs.pop(tid)(arg, result)
@@ -103,41 +107,25 @@ class ParallelTasks(object):
 
     def join(self):
         while self._nprocessed < self._nthreads:
             tid, arg, exc, result = self.result_queue.get()
+            del self._threads[tid]
             if exc:
                 raise SphinxParallelError(*result)
             self._result_funcs.pop(tid)(arg, result)
             self._nprocessed += 1
-        for t in self._threads:
+        # there shouldn't be any threads left...
+        for t in self._threads.values():
             t.join()
 
 
-class ParallelChunked(ParallelTasks):
-    """Executes chunks of a list of arguments in parallel."""
-
-    def __init__(self, process_func, result_func, nproc, maxbatch=10):
-        ParallelTasks.__init__(self, nproc)
-        self.process_func = process_func
-        self.result_func = result_func
-        self.maxbatch = maxbatch
-        self._chunks = []
-        self.nchunks = 0
-
-    def set_arguments(self, arguments):
-        # determine how many documents to read in one go
-        nargs = len(arguments)
-        chunksize = min(nargs // self.nproc, self.maxbatch)
-        if chunksize == 0:
-            chunksize = 1
-        nchunks, rest = divmod(nargs, chunksize)
-        if rest:
-            nchunks += 1
-        # partition documents in "chunks" that will be written by one Process
-        self._chunks = [arguments[i*chunksize:(i+1)*chunksize] for i in range(nchunks)]
-        self.nchunks = len(self._chunks)
-
-    def iter_chunks(self):
-        assert self._chunks
-        for chunk in self._chunks:
-            yield chunk
-            self.add_task(self.process_func, chunk, self.result_func)
+def make_chunks(arguments, nproc, maxbatch=10):
+    # determine how many documents to read in one go
+    nargs = len(arguments)
+    chunksize = min(nargs // nproc, maxbatch)
+    if chunksize == 0:
+        chunksize = 1
+    nchunks, rest = divmod(nargs, chunksize)
+    if rest:
+        nchunks += 1
+    # partition documents in "chunks" that will be written by one Process
+    return [arguments[i*chunksize:(i+1)*chunksize] for i in range(nchunks)]
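
To make the chunking arithmetic concrete (an illustration, not part of the diff): with 25 arguments, nproc=4 and the default maxbatch=10, chunksize = min(25 // 4, 10) = 6 and divmod(25, 6) = (4, 1), so the remainder adds a fifth chunk holding the single leftover argument.

from sphinx.util.parallel import make_chunks   # available after this commit

# 25 arguments, 4 worker processes, default maxbatch=10
chunks = make_chunks(list(range(25)), 4)
assert [len(c) for c in chunks] == [6, 6, 6, 6, 1]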