diff --git a/CHANGES b/CHANGES
index 8c935097d..bcfb47f34 100644
--- a/CHANGES
+++ b/CHANGES
@@ -55,6 +55,7 @@ Release 1.3.3 (released Dec 2, 2015)
 Bugs fixed
 ----------
 
+* #2177: Fix parallel hangs
 * #2012: Fix exception occurred if ``numfig_format`` is invalid
 * #2142: Provide non-minified JS code in ``sphinx/search/non-minified-js/*.js``
   for source distribution on PyPI.
diff --git a/sphinx/builders/__init__.py b/sphinx/builders/__init__.py
index eebd6af64..2688f8466 100644
--- a/sphinx/builders/__init__.py
+++ b/sphinx/builders/__init__.py
@@ -14,9 +14,8 @@
 from os import path
 
 try:
     import multiprocessing
-    import threading
 except ImportError:
-    multiprocessing = threading = None
+    multiprocessing = None
 
 from docutils import nodes
diff --git a/sphinx/util/parallel.py b/sphinx/util/parallel.py
index 1d1e0a098..a6985c86b 100644
--- a/sphinx/util/parallel.py
+++ b/sphinx/util/parallel.py
@@ -14,11 +14,10 @@
 import traceback
 
 try:
     import multiprocessing
-    import threading
 except ImportError:
-    multiprocessing = threading = None
+    multiprocessing = None
 
-from six.moves import queue
+from math import sqrt
 
 from sphinx.errors import SphinxParallelError
@@ -49,17 +48,22 @@ class ParallelTasks(object):
 
     def __init__(self, nproc):
         self.nproc = nproc
-        # list of threads to join when waiting for completion
+        # function performed by each task, returning the result
+        self._task_func = None
+        # (optional) function performed on the result of each task
+        self._result_func = None
+        # task arguments
+        self._args = {}
+        # list of subprocesses (both started and waiting)
+        self._procs = {}
+        # list of receiving pipe connections of running subprocesses
+        self._precvs = {}
+        # list of receiving pipe connections of waiting subprocesses
+        self._precvsWaiting = {}
+        # number of working subprocesses
+        self._pworking = 0
+        # next task id to assign
         self._taskid = 0
-        self._threads = {}
-        self._nthreads = 0
-        # queue of result objects to process
-        self.result_queue = queue.Queue()
-        self._nprocessed = 0
-        # maps tasks to result functions
-        self._result_funcs = {}
-        # allow only "nproc" worker processes at once
-        self._semaphore = threading.Semaphore(self.nproc)
 
     def _process(self, pipe, func, arg):
         try:
@@ -71,55 +75,44 @@ class ParallelTasks(object):
         except BaseException as err:
             pipe.send((True, (err, traceback.format_exc())))
 
-    def _process_thread(self, tid, func, arg):
-        precv, psend = multiprocessing.Pipe(False)
-        proc = multiprocessing.Process(target=self._process,
-                                       args=(psend, func, arg))
-        proc.start()
-        result = precv.recv()
-        self.result_queue.put((tid, arg) + result)
-        proc.join()
-        self._semaphore.release()
+    def _result_func_wrapper(self, arg, result):
+        result_func = self._result_func(arg, result)
+        if result_func:
+            result_func(result)
 
     def add_task(self, task_func, arg=None, result_func=None):
+        self._task_func = task_func  # every task uses the same function, so this is effectively set once
+        self._result_func = result_func or (lambda *x: None)  # likewise effectively set once
         tid = self._taskid
         self._taskid += 1
-        self._semaphore.acquire()
-        thread = threading.Thread(target=self._process_thread,
-                                  args=(tid, task_func, arg))
-        thread.setDaemon(True)
-        thread.start()
-        self._nthreads += 1
-        self._threads[tid] = thread
-        self._result_funcs[tid] = result_func or (lambda *x: None)
-        # try processing results already in parallel
-        try:
-            tid, arg, exc, result = self.result_queue.get(False)
-        except queue.Empty:
-            pass
+        self._args[tid] = arg
+        precv, psend = multiprocessing.Pipe(False)
+        proc = multiprocessing.Process(target=self._process,
+                                       args=(psend, self._task_func, arg))
+        self._procs[tid] = proc
+        if self._pworking < self.nproc:
+            self._precvs[tid] = precv
+            self._pworking += 1
+            proc.start()
         else:
-            del self._threads[tid]
-            if exc:
-                raise SphinxParallelError(*result)
-            result_func = self._result_funcs.pop(tid)(arg, result)
-            if result_func:
-                result_func(result)
-            self._nprocessed += 1
+            self._precvsWaiting[tid] = precv
 
     def join(self):
-        while self._nprocessed < self._nthreads:
-            tid, arg, exc, result = self.result_queue.get()
-            del self._threads[tid]
-            if exc:
-                raise SphinxParallelError(*result)
-            result_func = self._result_funcs.pop(tid)(arg, result)
-            if result_func:
-                result_func(result)
-            self._nprocessed += 1
-
-        # there shouldn't be any threads left...
-        for t in self._threads.values():
-            t.join()
+        while self._pworking:
+            for tid, pipe in self._precvs.items():
+                if pipe.poll():
+                    exc, result = pipe.recv()
+                    if exc:
+                        raise SphinxParallelError(*result)
+                    self._result_func_wrapper(self._args[tid], result)
+                    self._procs[tid].join()
+                    if len(self._precvsWaiting) > 0:
+                        newtid, newprecv = self._precvsWaiting.popitem()
+                        self._precvs[newtid] = newprecv
+                        self._procs[newtid].start()
+                        break
+                    else:
+                        self._pworking -= 1
 
 
 def make_chunks(arguments, nproc, maxbatch=10):
@@ -128,6 +121,9 @@
     chunksize = min(nargs // nproc, maxbatch)
     if chunksize == 0:
         chunksize = 1
+    if chunksize == maxbatch:
+        # try to improve batch size vs. number of batches
+        chunksize = int(sqrt(nargs / nproc * maxbatch))
     nchunks, rest = divmod(nargs, chunksize)
     if rest:
         nchunks += 1
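A note on the ``make_chunks`` change: once ``nargs // nproc`` hits the ``maxbatch`` cap, a fixed chunk size can produce a very large number of batches, so the new branch takes the geometric mean of the per-process share and the cap. For example, with nargs=1000 and nproc=4 the old code settles on chunksize = min(250, 10) = 10, i.e. 100 batches; the new formula picks chunksize = int(sqrt(1000/4 * 10)) = int(sqrt(2500)) = 50, i.e. 20 batches.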
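The core of the fix is the rewritten ``join()``: instead of one consumer thread per task blocking in ``recv()`` behind a ``threading.Semaphore`` (where a missed release hangs the build forever), the parent now runs a single loop that polls the receive end of each running child's pipe and promotes a waiting child whenever one finishes. Below is a standalone sketch of that pattern, with hypothetical names (``run_all``, ``_worker``) that are not part of the commit; it also simplifies away exception forwarding and gives ``poll()`` a small timeout that the commit's busy loop omits.

```python
# Sketch of the poll-based scheduler introduced above (hypothetical names).
import multiprocessing


def _worker(pipe, func, arg):
    # child process: run the task, ship the result back over the pipe
    pipe.send(func(arg))


def run_all(task_func, args, nproc=2):
    procs = {}    # all subprocesses, started or not
    running = {}  # receive ends of pipes for started children
    waiting = {}  # receive ends for children not yet started
    results = {}
    for tid, arg in enumerate(args):
        precv, psend = multiprocessing.Pipe(False)
        procs[tid] = multiprocessing.Process(target=_worker,
                                             args=(psend, task_func, arg))
        if len(running) < nproc:        # at most nproc children at once
            running[tid] = precv
            procs[tid].start()
        else:
            waiting[tid] = precv
    while running:
        for tid, pipe in list(running.items()):
            if pipe.poll(0.01):         # bounded wait, never a blocking recv
                results[tid] = pipe.recv()
                procs[tid].join()
                del running[tid]
                if waiting:             # promote one waiting child
                    newtid, newprecv = waiting.popitem()
                    running[newtid] = newprecv
                    procs[newtid].start()
                break                   # running changed; restart the scan
    return results


def square(x):
    return x * x


if __name__ == '__main__':
    print(run_all(square, range(8), nproc=4))
```

Unlike the threaded version, nothing here can block indefinitely: the only blocking call, ``recv()``, happens strictly after ``poll()`` has reported that data is ready.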