mirror of
https://github.com/sphinx-doc/sphinx.git
synced 2025-02-25 18:55:22 -06:00
Fix #2177: Remove parallel hangs
This commit is contained in:
parent
d044268d60
commit
8d373b9ab1
1
CHANGES
1
CHANGES
@ -55,6 +55,7 @@ Release 1.3.3 (released Dec 2, 2015)
|
|||||||
Bugs fixed
|
Bugs fixed
|
||||||
----------
|
----------
|
||||||
|
|
||||||
|
* #2177: Fix parallel hangs
|
||||||
* #2012: Fix exception occurred if ``numfig_format`` is invalid
|
* #2012: Fix exception occurred if ``numfig_format`` is invalid
|
||||||
* #2142: Provide non-minified JS code in ``sphinx/search/non-minified-js/*.js`` for
|
* #2142: Provide non-minified JS code in ``sphinx/search/non-minified-js/*.js`` for
|
||||||
source distribution on PyPI.
|
source distribution on PyPI.
|
||||||
|
@ -14,9 +14,8 @@ from os import path
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
import multiprocessing
|
import multiprocessing
|
||||||
import threading
|
|
||||||
except ImportError:
|
except ImportError:
|
||||||
multiprocessing = threading = None
|
multiprocessing = None
|
||||||
|
|
||||||
from docutils import nodes
|
from docutils import nodes
|
||||||
|
|
||||||
|
@ -14,11 +14,10 @@ import traceback
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
import multiprocessing
|
import multiprocessing
|
||||||
import threading
|
|
||||||
except ImportError:
|
except ImportError:
|
||||||
multiprocessing = threading = None
|
multiprocessing = None
|
||||||
|
|
||||||
from six.moves import queue
|
from math import sqrt
|
||||||
|
|
||||||
from sphinx.errors import SphinxParallelError
|
from sphinx.errors import SphinxParallelError
|
||||||
|
|
||||||
@ -49,17 +48,22 @@ class ParallelTasks(object):
|
|||||||
|
|
||||||
def __init__(self, nproc):
|
def __init__(self, nproc):
|
||||||
self.nproc = nproc
|
self.nproc = nproc
|
||||||
# list of threads to join when waiting for completion
|
# main task performed by each task, returning result
|
||||||
|
self._task_func = 0
|
||||||
|
# (optional) function performed by each task on the result of main task
|
||||||
|
self._result_func = 0
|
||||||
|
# task arguments
|
||||||
|
self._args = {}
|
||||||
|
# list of subprocesses (both started and waiting)
|
||||||
|
self._procs = {}
|
||||||
|
# list of receiving pipe connections of running subprocesses
|
||||||
|
self._precvs = {}
|
||||||
|
# list of receiving pipe connections of waiting subprocesses
|
||||||
|
self._precvsWaiting = {}
|
||||||
|
# number of working subprocesses
|
||||||
|
self._pworking = 0
|
||||||
|
# task number of each subprocess
|
||||||
self._taskid = 0
|
self._taskid = 0
|
||||||
self._threads = {}
|
|
||||||
self._nthreads = 0
|
|
||||||
# queue of result objects to process
|
|
||||||
self.result_queue = queue.Queue()
|
|
||||||
self._nprocessed = 0
|
|
||||||
# maps tasks to result functions
|
|
||||||
self._result_funcs = {}
|
|
||||||
# allow only "nproc" worker processes at once
|
|
||||||
self._semaphore = threading.Semaphore(self.nproc)
|
|
||||||
|
|
||||||
def _process(self, pipe, func, arg):
|
def _process(self, pipe, func, arg):
|
||||||
try:
|
try:
|
||||||
@ -71,55 +75,44 @@ class ParallelTasks(object):
|
|||||||
except BaseException as err:
|
except BaseException as err:
|
||||||
pipe.send((True, (err, traceback.format_exc())))
|
pipe.send((True, (err, traceback.format_exc())))
|
||||||
|
|
||||||
def _process_thread(self, tid, func, arg):
|
def _result_func_wrapper(self, arg, result):
|
||||||
precv, psend = multiprocessing.Pipe(False)
|
result_func = self._result_func(arg, result)
|
||||||
proc = multiprocessing.Process(target=self._process,
|
if result_func:
|
||||||
args=(psend, func, arg))
|
result_func(result)
|
||||||
proc.start()
|
|
||||||
result = precv.recv()
|
|
||||||
self.result_queue.put((tid, arg) + result)
|
|
||||||
proc.join()
|
|
||||||
self._semaphore.release()
|
|
||||||
|
|
||||||
def add_task(self, task_func, arg=None, result_func=None):
|
def add_task(self, task_func, arg=None, result_func=None):
|
||||||
|
self._task_func = task_func # dummy code after first call
|
||||||
|
self._result_func = result_func or (lambda *x: None) # dummy code after first call
|
||||||
tid = self._taskid
|
tid = self._taskid
|
||||||
self._taskid += 1
|
self._taskid += 1
|
||||||
self._semaphore.acquire()
|
self._args[tid] = arg
|
||||||
thread = threading.Thread(target=self._process_thread,
|
precv, psend = multiprocessing.Pipe(False)
|
||||||
args=(tid, task_func, arg))
|
proc = multiprocessing.Process(target=self._process,
|
||||||
thread.setDaemon(True)
|
args=(psend, self._task_func, arg))
|
||||||
thread.start()
|
self._procs[tid] = proc
|
||||||
self._nthreads += 1
|
if self._pworking < self.nproc:
|
||||||
self._threads[tid] = thread
|
self._precvs[tid] = precv
|
||||||
self._result_funcs[tid] = result_func or (lambda *x: None)
|
self._pworking += 1
|
||||||
# try processing results already in parallel
|
proc.start()
|
||||||
try:
|
|
||||||
tid, arg, exc, result = self.result_queue.get(False)
|
|
||||||
except queue.Empty:
|
|
||||||
pass
|
|
||||||
else:
|
else:
|
||||||
del self._threads[tid]
|
self._precvsWaiting[tid] = precv
|
||||||
if exc:
|
|
||||||
raise SphinxParallelError(*result)
|
|
||||||
result_func = self._result_funcs.pop(tid)(arg, result)
|
|
||||||
if result_func:
|
|
||||||
result_func(result)
|
|
||||||
self._nprocessed += 1
|
|
||||||
|
|
||||||
def join(self):
|
def join(self):
|
||||||
while self._nprocessed < self._nthreads:
|
while self._pworking:
|
||||||
tid, arg, exc, result = self.result_queue.get()
|
for tid, pipe in self._precvs.items():
|
||||||
del self._threads[tid]
|
if pipe.poll():
|
||||||
|
exc, result = pipe.recv()
|
||||||
if exc:
|
if exc:
|
||||||
raise SphinxParallelError(*result)
|
raise SphinxParallelError(*result)
|
||||||
result_func = self._result_funcs.pop(tid)(arg, result)
|
self._result_func_wrapper(self._args[tid], result)
|
||||||
if result_func:
|
self._procs[tid].join()
|
||||||
result_func(result)
|
if len(self._precvsWaiting) > 0:
|
||||||
self._nprocessed += 1
|
newtid, newprecv = self._precvsWaiting.popitem()
|
||||||
|
self._precvs[newtid] = newprecv
|
||||||
# there shouldn't be any threads left...
|
self._procs[newtid].start()
|
||||||
for t in self._threads.values():
|
break
|
||||||
t.join()
|
else:
|
||||||
|
self._pworking -= 1
|
||||||
|
|
||||||
|
|
||||||
def make_chunks(arguments, nproc, maxbatch=10):
|
def make_chunks(arguments, nproc, maxbatch=10):
|
||||||
@ -128,6 +121,9 @@ def make_chunks(arguments, nproc, maxbatch=10):
|
|||||||
chunksize = min(nargs // nproc, maxbatch)
|
chunksize = min(nargs // nproc, maxbatch)
|
||||||
if chunksize == 0:
|
if chunksize == 0:
|
||||||
chunksize = 1
|
chunksize = 1
|
||||||
|
if chunksize == maxbatch:
|
||||||
|
# try to improve batch size vs. number of batches
|
||||||
|
chunksize = int(sqrt(nargs/nproc * maxbatch))
|
||||||
nchunks, rest = divmod(nargs, chunksize)
|
nchunks, rest = divmod(nargs, chunksize)
|
||||||
if rest:
|
if rest:
|
||||||
nchunks += 1
|
nchunks += 1
|
||||||
|
Loading…
Reference in New Issue
Block a user