builder: implement parallel writing based on multiprocessing

Does not work completely yet: globals such as the search index and
images for HTML are not updated properly; this needs a new API.
Author: Georg Brandl
Date:   2013-01-13 17:27:09 +01:00
parent  2e4608039c
commit  53d1d79ff8
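The caveat in the message is inherent to the fork model: each
multiprocessing.Process works on a copy-on-write snapshot of the parent's
memory, so whatever a worker appends to a builder-global (the search index,
the set of images to copy) vanishes when the worker exits. A minimal sketch,
independent of Sphinx, of the effect:

    import multiprocessing

    index = []   # stands in for a builder-global such as the search index

    def worker(docname):
        # runs in a forked child: this mutates the child's copy only
        index.append(docname)

    if __name__ == '__main__':
        p = multiprocessing.Process(target=worker, args=('intro',))
        p.start()
        p.join()
        print(index)   # prints [] -- the child's append never reaches the parent

Fixing this presumably means the "new API" the message mentions: workers
would have to serialize their results back to the parent (e.g. over a pipe
or queue) instead of mutating shared globals.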


@@ -12,6 +12,12 @@
 import os
 from os import path
+try:
+    import multiprocessing
+    import threading
+except ImportError:
+    multiprocessing = threading = None
+
 from docutils import nodes
 
 from sphinx.util.osutil import SEP, relative_uri
@@ -33,6 +39,8 @@ class Builder(object):
     format = ''
     # doctree versioning method
     versioning_method = 'none'
+    # allow parallel write_doc calls
+    allow_parallel = True
 
     def __init__(self, app):
         self.env = app.env
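Since allow_parallel is an ordinary class attribute, a builder whose
write_doc() is not safe to run in forked workers can override it to force
the serial path. A hypothetical subclass (not part of this commit):

    class StatefulBuilder(Builder):
        """Hypothetical builder that accumulates global state in
        write_doc(), so it must opt out of parallel writing."""
        allow_parallel = False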
@@ -289,16 +297,75 @@ class Builder(object):
         self.prepare_writing(docnames)
         self.info('done')
 
         # write target files
         warnings = []
         self.env.set_warnfunc(lambda *args: warnings.append(args))
+        # check for prerequisites to parallel build
+        # (parallel only works on POSIX, because the forking impl of
+        # multiprocessing is required)
+        if not (multiprocessing and
+                self.app.parallel and
+                self.allow_parallel and
+                os.name == 'posix'):
+            self._write_serial(sorted(docnames), warnings)
+        else:
+            self._write_parallel(sorted(docnames), warnings, self.app.parallel)
+        self.env.set_warnfunc(self.warn)
+
+    def _write_serial(self, docnames, warnings):
         for docname in self.status_iterator(
-            sorted(docnames), 'writing output... ', darkgreen, len(docnames)):
+                docnames, 'writing output... ', darkgreen, len(docnames)):
             doctree = self.env.get_and_resolve_doctree(docname, self)
             self.write_doc(docname, doctree)
         for warning in warnings:
             self.warn(*warning)
         self.env.set_warnfunc(self.warn)
+
+    def _write_parallel(self, docnames, warnings, nproc):
+        def write_process(docnames):
+            try:
+                for docname in docnames:
+                    doctree = self.env.get_and_resolve_doctree(docname, self)
+                    self.write_doc(docname, doctree)
+                for warning in warnings:
+                    self.warn(*warning)
+            except KeyboardInterrupt:
+                pass  # do not print a traceback on Ctrl-C
+
+        def process_thread(docnames):
+            p = multiprocessing.Process(target=write_process, args=(docnames,))
+            p.start()
+            p.join()
+            semaphore.release()
+
+        # allow only "nproc" worker processes at once
+        semaphore = threading.Semaphore(nproc)
+        # list of threads to join when waiting for completion
+        threads = []
+
+        # warm up caches/compile templates using the first docname
+        write_process([docnames[0]])
+        docnames = docnames[1:]
+        ndocs = len(docnames)
+        # determine how many documents to write in one go
+        chunksize = min(ndocs // nproc, 10)
+        nchunks, rest = divmod(ndocs, chunksize)
+        if rest:
+            nchunks += 1
+        # partition documents in "chunks" that will be written by one Process
+        chunks = [docnames[i*chunksize:(i+1)*chunksize] for i in range(nchunks)]
+        for docnames in self.status_iterator(
+                chunks, 'writing output... ', darkgreen, len(chunks),
+                lambda chk: '%s .. %s' % (chk[0], chk[-1])):
+            semaphore.acquire()
+            # start a new thread to oversee the completion of this chunk
+            t = threading.Thread(target=process_thread, args=(docnames,))
+            t.setDaemon(True)
+            t.start()
+            threads.append(t)
+
+        # make sure all threads have finished
+        self.info(bold('waiting for workers... '))  #, nonl=True)
+        for t in threads:
+            t.join()
 
     def prepare_writing(self, docnames):
         raise NotImplementedError
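The mechanics of _write_parallel generalize beyond Sphinx: partition the
work into chunks of at most ten documents, let a threading.Semaphore cap
the number of live worker processes at nproc, and dedicate one daemon
thread per process whose only job is to block in join() and release a
semaphore slot when its worker exits. A self-contained sketch of the same
pattern (names are illustrative, not Sphinx API); unlike the code above it
also guards against the zero chunk size that ndocs // nproc produces when
there are fewer documents than processes:

    import multiprocessing
    import threading

    def run_parallel(items, work, nproc):
        # chunks of at most 10 items, as in _write_parallel above;
        # max(..., 1) avoids chunksize == 0 when len(items) < nproc
        chunksize = max(min(len(items) // nproc, 10), 1)
        nchunks, rest = divmod(len(items), chunksize)
        if rest:
            nchunks += 1
        chunks = [items[i*chunksize:(i+1)*chunksize] for i in range(nchunks)]

        semaphore = threading.Semaphore(nproc)  # at most nproc live processes
        threads = []

        def oversee(chunk):
            # overseer thread: fork a worker, wait for it, free a slot
            p = multiprocessing.Process(target=work, args=(chunk,))
            p.start()
            p.join()
            semaphore.release()

        for chunk in chunks:
            semaphore.acquire()      # blocks while nproc workers are running
            t = threading.Thread(target=oversee, args=(chunk,))
            t.daemon = True
            t.start()
            threads.append(t)

        for t in threads:            # wait for all workers to finish
            t.join()

    def demo(chunk):
        # side effects are visible only inside the worker -- exactly the
        # limitation the commit message describes for shared globals
        print('wrote %s' % (chunk,))

    if __name__ == '__main__':
        run_parallel(['doc%d' % i for i in range(25)], demo, nproc=4)

The threads never do real work; the writing happens in the forked children,
which is why the check in write() insists on os.name == 'posix', where
multiprocessing uses fork.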