mirror of
https://github.com/sphinx-doc/sphinx.git
synced 2025-02-25 18:55:22 -06:00
More parallel code optimizations.
This commit is contained in:
parent
9cdaf3406f
commit
580405c714
@ -10,14 +10,16 @@
|
||||
"""
|
||||
|
||||
import os
|
||||
import time
|
||||
import traceback
|
||||
from math import sqrt
|
||||
|
||||
try:
|
||||
import multiprocessing
|
||||
except ImportError:
|
||||
multiprocessing = None
|
||||
|
||||
from math import sqrt
|
||||
from six import iteritems
|
||||
|
||||
from sphinx.errors import SphinxParallelError
|
||||
|
||||
@ -48,10 +50,8 @@ class ParallelTasks(object):
|
||||
|
||||
def __init__(self, nproc):
|
||||
self.nproc = nproc
|
||||
# main task performed by each task, returning result
|
||||
self._task_func = 0
|
||||
# (optional) function performed by each task on the result of main task
|
||||
self._result_func = 0
|
||||
self._result_funcs = {}
|
||||
# task arguments
|
||||
self._args = {}
|
||||
# list of subprocesses (both started and waiting)
|
||||
@ -75,55 +75,50 @@ class ParallelTasks(object):
|
||||
except BaseException as err:
|
||||
pipe.send((True, (err, traceback.format_exc())))
|
||||
|
||||
def _result_func_wrapper(self, arg, result):
|
||||
result_func = self._result_func(arg, result)
|
||||
if result_func:
|
||||
result_func(result)
|
||||
|
||||
def add_task(self, task_func, arg=None, result_func=None):
|
||||
self._task_func = task_func # dummy code after first call
|
||||
self._result_func = result_func or (lambda *x: None) # dummy code after first call
|
||||
tid = self._taskid
|
||||
self._taskid += 1
|
||||
self._result_funcs[tid] = result_func or (lambda arg: None)
|
||||
self._args[tid] = arg
|
||||
precv, psend = multiprocessing.Pipe(False)
|
||||
proc = multiprocessing.Process(target=self._process,
|
||||
args=(psend, self._task_func, arg))
|
||||
args=(psend, task_func, arg))
|
||||
self._procs[tid] = proc
|
||||
if self._pworking < self.nproc:
|
||||
self._precvs[tid] = precv
|
||||
self._pworking += 1
|
||||
proc.start()
|
||||
else:
|
||||
self._precvsWaiting[tid] = precv
|
||||
self._precvsWaiting[tid] = precv
|
||||
self._join_one()
|
||||
|
||||
def join(self):
|
||||
while self._pworking:
|
||||
for tid, pipe in self._precvs.items():
|
||||
if pipe.poll():
|
||||
exc, result = pipe.recv()
|
||||
if exc:
|
||||
raise SphinxParallelError(*result)
|
||||
self._result_func_wrapper(self._args[tid], result)
|
||||
self._procs[tid].join()
|
||||
if len(self._precvsWaiting) > 0:
|
||||
newtid, newprecv = self._precvsWaiting.popitem()
|
||||
self._precvs[newtid] = newprecv
|
||||
self._procs[newtid].start()
|
||||
break
|
||||
else:
|
||||
self._pworking -= 1
|
||||
self._join_one()
|
||||
|
||||
def _join_one(self):
|
||||
for tid, pipe in iteritems(self._precvs):
|
||||
if pipe.poll():
|
||||
exc, result = pipe.recv()
|
||||
if exc:
|
||||
raise SphinxParallelError(*result)
|
||||
self._result_funcs.pop(tid)(self._args.pop(tid), result)
|
||||
self._procs[tid].join()
|
||||
self._pworking -= 1
|
||||
break
|
||||
else:
|
||||
time.sleep(0.02)
|
||||
while self._precvsWaiting and self._pworking < self.nproc:
|
||||
newtid, newprecv = self._precvsWaiting.popitem()
|
||||
self._precvs[newtid] = newprecv
|
||||
self._procs[newtid].start()
|
||||
self._pworking += 1
|
||||
|
||||
|
||||
def make_chunks(arguments, nproc, maxbatch=10):
|
||||
# determine how many documents to read in one go
|
||||
nargs = len(arguments)
|
||||
chunksize = min(nargs // nproc, maxbatch)
|
||||
if chunksize == 0:
|
||||
chunksize = 1
|
||||
if chunksize == maxbatch:
|
||||
chunksize = nargs // nproc
|
||||
if chunksize >= maxbatch:
|
||||
# try to improve batch size vs. number of batches
|
||||
chunksize = int(sqrt(nargs/nproc * maxbatch))
|
||||
if chunksize == 0:
|
||||
chunksize = 1
|
||||
nchunks, rest = divmod(nargs, chunksize)
|
||||
if rest:
|
||||
nchunks += 1
|
||||
|
Loading…
Reference in New Issue
Block a user