[WIP] parallel read

Georg Brandl 2014-09-22 14:51:47 +02:00
parent 905cbf853d
commit 31452fc64d
29 changed files with 311 additions and 87 deletions

View File

@@ -442,7 +442,8 @@ handlers to the events. Example:

 Emitted after the environment has determined the list of all added and
 changed files and just before it reads them. It allows extension authors to
 reorder the list of docnames (*inplace*) before processing, or add more
-docnames that Sphinx did not consider changed.
+docnames that Sphinx did not consider changed (but never add any docnames
+that are not in ``env.found_docs``).

 You can also remove document names; do this with caution since it will make
 Sphinx treat changed files as unchanged.
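
A minimal handler sketch for this event (the prioritization rule here is an
invented example, not part of this commit):

    def prioritize_index_docs(app, env, docnames):
        # reorder in place so documents ending in 'index' are read first
        docnames.sort(key=lambda d: not d.endswith('index'))

    def setup(app):
        app.connect('env-before-read-docs', prioritize_index_docs)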

View File

@@ -33,6 +33,13 @@ as metadata of the extension. Metadata keys currently recognized are:

 * ``'version'``: a string that identifies the extension version. It is used for
   extension version requirement checking (see :confval:`needs_extensions`) and
   informational purposes. If not given, ``"unknown version"`` is substituted.
+* ``'parallel_read_safe'``: a boolean that specifies whether parallel reading
+  of source files can be used when the extension is loaded. It defaults to
+  ``False``; you must explicitly mark your extension as parallel-read-safe
+  after checking that it is (see the example below).
+* ``'parallel_write_safe'``: a boolean that specifies whether parallel writing
+  of output files can be used when the extension is loaded. Since extensions
+  usually don't negatively influence the writing process, this defaults to
+  ``True``.
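
For example, an extension that has been verified to be parallel-read-safe
might declare (a sketch; the actual registrations are up to the extension):

    def setup(app):
        # ... add directives, roles and event handlers here ...
        return {
            'version': '1.0',
            'parallel_read_safe': True,   # verified: no shared read-time state
            'parallel_write_safe': True,  # the default, stated explicitly
        }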
APIs used for writing extensions
--------------------------------

View File

@@ -53,6 +53,7 @@ events = {
     'env-before-read-docs': 'env, docnames',
     'source-read': 'docname, source text',
     'doctree-read': 'the doctree before being pickled',
+    'env-merge-info': 'env, read docnames, other env instance',
     'missing-reference': 'env, node, contnode',
     'doctree-resolved': 'doctree, docname',
     'env-updated': 'env',

View File

@@ -238,18 +238,11 @@ class Builder(object):
         # while reading, collect all warnings from docutils
         warnings = []
         self.env.set_warnfunc(lambda *args: warnings.append(args))
-        self.info(bold('updating environment: '), nonl=1)
-        msg, length, iterator = self.env.update(self.config, self.srcdir,
+        updated_docnames = self.env.update(self.config, self.srcdir,
                                            self.doctreedir, self.app)
-        self.info(msg)
-        for docname in self.status_iterator(iterator, 'reading sources... ',
-                                            purple, length):
-            updated_docnames.add(docname)
+        # nothing further to do, the environment has already
+        # done the reading
-        self.env.set_warnfunc(self.warn)
         for warning in warnings:
             self.warn(*warning)
+        self.env.set_warnfunc(self.warn)

         doccount = len(updated_docnames)
         self.info(bold('looking for now-outdated files... '), nonl=1)
@@ -325,17 +318,25 @@ class Builder(object):
         # check for prerequisites to parallel build
         # (parallel only works on POSIX, because the forking impl of
         # multiprocessing is required)
-        if not (multiprocessing and
+        if (multiprocessing and
                 self.app.parallel > 1 and
                 self.allow_parallel and
                 os.name == 'posix'):
-            self._write_serial(sorted(docnames), warnings)
-        else:
+            for extname, md in self.app._extension_metadata.items():
+                par_ok = md.get('parallel_write_safe', True)
+                if not par_ok:
+                    self.app.warn('the %s extension is not safe for parallel '
+                                  'writing, doing serial write' % extname)
+                    break
+            else:  # means no break, means everything is safe
                 # number of subprocesses is parallel-1 because the main process
                 # is busy loading doctrees and doing write_doc_serialized()
                 self._write_parallel(sorted(docnames), warnings,
                                      nproc=self.app.parallel - 1)
+                self.env.set_warnfunc(self.warn)
+                return
+        self._write_serial(sorted(docnames), warnings)
         self.env.set_warnfunc(self.warn)

     def _write_serial(self, docnames, warnings):
         for docname in self.app.status_iterator(

View File

@@ -202,6 +202,14 @@ class Domain(object):
         """Remove traces of a document in the domain-specific inventories."""
         pass

+    def merge_domaindata(self, docnames, otherdata):
+        """Merge in data regarding *docnames* from a different domaindata
+        inventory.
+        """
+        raise NotImplementedError('merge_domaindata must be implemented in %s '
+                                  'to be able to do parallel builds!' %
+                                  self.__class__)
+
     def process_doc(self, env, docname, document):
         """Process a document after it is read by the environment."""
         pass
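
Domains that keep their own inventories must override this; a sketch for a
hypothetical third-party domain (``RecipeDomain`` and its data layout are
invented here, mirroring the built-in domains below):

    class RecipeDomain(Domain):
        name = 'recipe'
        initial_data = {'recipes': {}}  # fullname -> (docname, objtype)

        def clear_doc(self, docname):
            for name, (fn, objtype) in list(self.data['recipes'].items()):
                if fn == docname:
                    del self.data['recipes'][name]

        def merge_domaindata(self, docnames, otherdata):
            # adopt entries for documents that the other process has read
            for name, (fn, objtype) in otherdata['recipes'].items():
                if fn in docnames:
                    self.data['recipes'][name] = (fn, objtype)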

View File

@@ -269,6 +269,12 @@ class CDomain(Domain):
             if fn == docname:
                 del self.data['objects'][fullname]

+    def merge_domaindata(self, docnames, otherdata):
+        # XXX check duplicates
+        for fullname, (fn, objtype) in otherdata['objects'].items():
+            if fn in docnames:
+                self.data['objects'][fullname] = (fn, objtype)
+
     def resolve_xref(self, env, fromdocname, builder,
                      typ, target, node, contnode):
         # strip pointer asterisk

View File

@@ -1836,6 +1836,12 @@ class CPPDomain(Domain):
             if data[0] == docname:
                 del self.data['objects'][fullname]

+    def merge_domaindata(self, docnames, otherdata):
+        # XXX check duplicates
+        for fullname, data in otherdata['objects'].items():
+            if data[0] in docnames:
+                self.data['objects'][fullname] = data
+
     def _resolve_xref_inner(self, env, fromdocname, builder,
                             target, node, contnode, warn=True):
         def _create_refnode(nameAst):

View File

@@ -187,6 +187,12 @@ class JavaScriptDomain(Domain):
             if fn == docname:
                 del self.data['objects'][fullname]

+    def merge_domaindata(self, docnames, otherdata):
+        # XXX check duplicates
+        for fullname, (fn, objtype) in otherdata['objects'].items():
+            if fn in docnames:
+                self.data['objects'][fullname] = (fn, objtype)
+
     def find_obj(self, env, obj, name, typ, searchorder=0):
         if name[-2:] == '()':
             name = name[:-2]

View File

@@ -627,6 +627,15 @@ class PythonDomain(Domain):
             if fn == docname:
                 del self.data['modules'][modname]

+    def merge_domaindata(self, docnames, otherdata):
+        # XXX check duplicates?
+        for fullname, (fn, objtype) in otherdata['objects'].items():
+            if fn in docnames:
+                self.data['objects'][fullname] = (fn, objtype)
+        for modname, data in otherdata['modules'].items():
+            if data[0] in docnames:
+                self.data['modules'][modname] = data
+
     def find_obj(self, env, modname, classname, name, type, searchmode=0):
         """Find a Python object for "name", perhaps using the given module
         and/or classname. Returns a list of (name, object entry) tuples.

View File

@@ -123,6 +123,12 @@ class ReSTDomain(Domain):
             if doc == docname:
                 del self.data['objects'][typ, name]

+    def merge_domaindata(self, docnames, otherdata):
+        # XXX check duplicates
+        for (typ, name), doc in otherdata['objects'].items():
+            if doc in docnames:
+                self.data['objects'][typ, name] = doc
+
     def resolve_xref(self, env, fromdocname, builder, typ, target, node,
                      contnode):
         objects = self.data['objects']

View File

@@ -506,6 +506,21 @@ class StandardDomain(Domain):
             if fn == docname:
                 del self.data['anonlabels'][key]

+    def merge_domaindata(self, docnames, otherdata):
+        # XXX duplicates?
+        for key, data in otherdata['progoptions'].items():
+            if data[0] in docnames:
+                self.data['progoptions'][key] = data
+        for key, data in otherdata['objects'].items():
+            if data[0] in docnames:
+                self.data['objects'][key] = data
+        for key, data in otherdata['labels'].items():
+            if data[0] in docnames:
+                self.data['labels'][key] = data
+        for key, data in otherdata['anonlabels'].items():
+            if data[0] in docnames:
+                self.data['anonlabels'][key] = data
+
     def process_doc(self, env, docname, document):
         labels, anonlabels = self.data['labels'], self.data['anonlabels']
         for name, explicit in iteritems(document.nametypes):

View File

@@ -22,8 +22,14 @@ from os import path
 from glob import glob
 from itertools import groupby

+try:
+    import multiprocessing
+    import threading
+except ImportError:
+    multiprocessing = threading = None
+
 from six import iteritems, itervalues, text_type, class_types
-from six.moves import cPickle as pickle, zip
+from six.moves import cPickle as pickle, zip, queue
 from docutils import nodes
 from docutils.io import FileInput, NullOutput
 from docutils.core import Publisher
@@ -40,6 +46,7 @@ from sphinx.util import url_re, get_matching_docs, docname_join, split_into, \
     FilenameUniqDict
 from sphinx.util.nodes import clean_astext, make_refnode, WarningStream
 from sphinx.util.osutil import SEP, find_catalog_files, getcwd, fs_encoding
+from sphinx.util.console import bold, purple
 from sphinx.util.matching import compile_matchers
 from sphinx.util.websupport import is_commentable
 from sphinx.errors import SphinxError, ExtensionError
@@ -328,6 +335,50 @@ class BuildEnvironment:
         for domain in self.domains.values():
             domain.clear_doc(docname)

+    def merge_info_from(self, docnames, other, app):
+        """Merge global information gathered about *docnames* while reading them
+        from the *other* environment.
+
+        This possibly comes from a parallel build process.
+        """
+        docnames = set(docnames)
+        for docname in docnames:
+            self.all_docs[docname] = other.all_docs[docname]
+            if docname in other.reread_always:
+                self.reread_always.add(docname)
+            self.metadata[docname] = other.metadata[docname]
+            if docname in other.dependencies:
+                self.dependencies[docname] = other.dependencies[docname]
+            self.titles[docname] = other.titles[docname]
+            self.longtitles[docname] = other.longtitles[docname]
+            self.tocs[docname] = other.tocs[docname]
+            self.toc_num_entries[docname] = other.toc_num_entries[docname]
+            # toc_secnumbers is not assigned during read
+            if docname in other.toctree_includes:
+                self.toctree_includes[docname] = other.toctree_includes[docname]
+            self.indexentries[docname] = other.indexentries[docname]
+            if docname in other.glob_toctrees:
+                self.glob_toctrees.add(docname)
+            if docname in other.numbered_toctrees:
+                self.numbered_toctrees.add(docname)
+
+        self.images.merge_other(docnames, other.images)
+        self.dlfiles.merge_other(docnames, other.dlfiles)
+
+        for subfn, fnset in other.files_to_rebuild.items():
+            self.files_to_rebuild.setdefault(subfn, set()).update(fnset & docnames)
+        for key, data in other.citations.items():
+            # XXX duplicates?
+            if data[0] in docnames:
+                self.citations[key] = data
+        for version, changes in other.versionchanges.items():
+            self.versionchanges.setdefault(version, []).extend(
+                change for change in changes if change[1] in docnames)
+
+        for domainname, domain in self.domains.items():
+            domain.merge_domaindata(docnames, other.domaindata[domainname])
+        app.emit('env-merge-info', self, docnames, other)
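
An extension that stashes per-document data on the environment would hook the
new event like this (a sketch; ``my_entries`` is an invented attribute):

    def merge_my_entries(app, env, docnames, other):
        if not hasattr(env, 'my_entries'):
            env.my_entries = []
        env.my_entries.extend(
            entry for entry in getattr(other, 'my_entries', [])
            if entry['docname'] in docnames)

    def setup(app):
        app.connect('env-merge-info', merge_my_entries)
        return {'version': '1.0', 'parallel_read_safe': True}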

     def doc2path(self, docname, base=True, suffix=None):
         """Return the filename for the document name.
@@ -443,13 +494,11 @@
         return added, changed, removed

-    def update(self, config, srcdir, doctreedir, app=None):
+    def update(self, config, srcdir, doctreedir, app):
         """(Re-)read all files new or changed since last update.

-        Returns a summary, the total count of documents to reread and an
-        iterator that yields docnames as it processes them. Store all
-        environment docnames in the canonical format (ie using SEP as a
-        separator in place of os.path.sep).
+        Store all environment docnames in the canonical format (ie using SEP as
+        a separator in place of os.path.sep).
         """
         config_changed = False
         if self.config is None:
@@ -481,6 +530,8 @@
         # this cache also needs to be updated every time
         self._nitpick_ignore = set(self.config.nitpick_ignore)

+        app.info(bold('updating environment: '), nonl=1)
+
         added, changed, removed = self.get_outdated_files(config_changed)

         # allow user intervention as well
@@ -495,33 +546,145 @@
             msg += '%s added, %s changed, %s removed' % (len(added), len(changed),
                                                          len(removed))
         app.info(msg)

-        def update_generator():
         self.app = app

         # clear all files no longer present
         for docname in removed:
-            if app:
             app.emit('env-purge-doc', self, docname)
             self.clear_doc(docname)

         # read all new and changed files
         docnames = sorted(added | changed)
-        if app:
         # allow changing and reordering the list of docs to read
         app.emit('env-before-read-docs', self, docnames)

-        for docname in docnames:
-            yield docname
-            self.read_doc(docname, app=app)
+        # check if we should do parallel or serial read
+        par_ok = False
+        if (len(added | changed) > 5 and
+                multiprocessing and
+                app.parallel > 1 and
+                os.name == 'posix'):
+            par_ok = True
+            for extname, md in app._extension_metadata.items():
+                ext_ok = md.get('parallel_read_safe')
+                if ext_ok:
+                    continue
+                if ext_ok is None:
+                    app.warn('the %s extension does not declare if it '
+                             'is safe for parallel reading, assuming it '
+                             'isn\'t - please ask the extension author to '
+                             'check and make it explicit' % extname)
+                    app.warn('doing serial read')
+                else:
+                    app.warn('the %s extension is not safe for parallel '
+                             'reading, doing serial read' % extname)
+                par_ok = False
+                break
+        if par_ok:
+            self._read_parallel(docnames, app, nproc=app.parallel)
+        else:
+            self._read_serial(docnames, app)

         if config.master_doc not in self.all_docs:
             self.warn(None, 'master file %s not found' %
                       self.doc2path(config.master_doc))

         self.app = None
-        if app:
         app.emit('env-updated', self)
+        return docnames
-        return msg, len(added | changed), update_generator()
+
+    def _read_serial(self, docnames, app):
+        for docname in app.status_iterator(docnames, 'reading sources... ',
+                                           purple, len(docnames)):
+            # remove all inventory entries for that file
+            app.emit('env-purge-doc', self, docname)
+            self.clear_doc(docname)
+            self.read_doc(docname, app)
+
+    def _read_parallel(self, docnames, app, nproc):
+        def read_process(docs, pipe):
+            self.app = app
+            self.warnings = []
+            self.set_warnfunc(lambda *args: self.warnings.append(args))
+            try:
+                for docname in docs:
+                    self.read_doc(docname, app)
+            except KeyboardInterrupt:
+                # XXX return None?
+                pass  # do not print a traceback on Ctrl-C
+            self.set_warnfunc(None)
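+            # the env object is pickled through the pipe below; app, domains
+            # and config hold unpicklable/process-local objects, so they are
+            # dropped first (the parent env keeps its own copies)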
+            del self.app
+            del self.domains
+            del self.config.values
+            del self.config
+            pipe.send(self)
+
+        def process_thread(docs):
+            precv, psend = multiprocessing.Pipe(False)
+            p = multiprocessing.Process(target=read_process, args=(docs, psend))
+            p.start()
+            # XXX error handling
+            new_env = precv.recv()
+            merge_queue.put((docs, new_env))
+            p.join()
+            semaphore.release()
+
+        # allow only "nproc" worker processes at once
+        semaphore = threading.Semaphore(nproc)
+        # list of threads to join when waiting for completion
+        threads = []
+        # queue of other env objects to merge
+        merge_queue = queue.Queue()
+
+        # clear all outdated docs at once
+        for docname in docnames:
+            app.emit('env-purge-doc', self, docname)
+            self.clear_doc(docname)
+
+        # determine how many documents to read in one go
+        ndocs = len(docnames)
+        chunksize = min(ndocs // nproc, 10)
+        if chunksize == 0:
+            chunksize = 1
+        nchunks, rest = divmod(ndocs, chunksize)
+        if rest:
+            nchunks += 1
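+        # e.g. 25 docnames with nproc=4: chunksize = min(25 // 4, 10) = 6,
+        # and divmod(25, 6) = (4, 1), so we get 5 chunks (4 full, 1 short)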
+        # partition documents in "chunks" that will be read by one Process
+        chunks = [docnames[i*chunksize:(i+1)*chunksize] for i in range(nchunks)]
+
+        warnings = []
+        merged = 0
+        for chunk in app.status_iterator(chunks, 'reading sources... ',
+                                         purple, len(chunks)):
+            semaphore.acquire()
+            t = threading.Thread(target=process_thread, args=(chunk,))
+            t.setDaemon(True)
+            t.start()
+            threads.append(t)
+            try:
+                docs, other = merge_queue.get(False)
+            except queue.Empty:
+                pass
+            else:
+                warnings.extend(other.warnings)
+                self.merge_info_from(docs, other, app)
+                merged += 1
+        while merged < len(chunks):
+            docs, other = merge_queue.get()
+            warnings.extend(other.warnings)
+            self.merge_info_from(docs, other, app)
+            merged += 1
+        for warning in warnings:
+            self._warnfunc(*warning)
+
+        # make sure all threads have finished
+        app.info(bold('waiting for workers... '))
+        for t in threads:
+            t.join()

     def check_dependents(self, already):
         to_rewrite = self.assign_section_numbers()
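
With this in place, the existing ``-j`` option of sphinx-build (which sets
``app.parallel``) should presumably cover the read phase as well, e.g.:

    sphinx-build -j 4 sourcedir builddir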
@@ -590,19 +753,8 @@
         directives.directive = directive
         roles.role = role

-    def read_doc(self, docname, src_path=None, save_parsed=True, app=None):
-        """Parse a file and add/update inventory entries for the doctree.
-
-        If srcpath is given, read from a different source file.
-        """
-        # remove all inventory entries for that file
-        if app:
-            app.emit('env-purge-doc', self, docname)
-        self.clear_doc(docname)
-
-        if src_path is None:
-            src_path = self.doc2path(docname)
+    def read_doc(self, docname, app=None):
+        """Parse a file and add/update inventory entries for the doctree."""

         self.temp_data['docname'] = docname
         # defaults to the global default, but can be re-set in a document
@@ -639,6 +791,7 @@
                                 destination_class=NullOutput)
         pub.set_components(None, 'restructuredtext', None)
         pub.process_programmatic_settings(None, self.settings, None)
+        src_path = self.doc2path(docname)
         source = SphinxFileInput(app, self, source=None, source_path=src_path,
                                  encoding=self.config.source_encoding)
         pub.source = source
@@ -706,7 +859,6 @@
         self.ref_context.clear()
         roles._roles.pop('', None)  # if a document has set a local default role

-        if save_parsed:
         # save the parsed doctree
         doctree_filename = self.doc2path(docname, self.doctreedir,
                                          '.doctree')
@@ -718,8 +870,6 @@
             pickle.dump(doctree, f, pickle.HIGHEST_PROTOCOL)
         finally:
             f.close()
-        else:
-            return doctree

     # utilities to use while reading a document

View File

@@ -1515,7 +1515,7 @@ def setup(app):
     app.add_event('autodoc-process-signature')
     app.add_event('autodoc-skip-member')

-    return {'version': sphinx.__version__}
+    return {'version': sphinx.__version__, 'parallel_read_safe': True}
class testcls:

View File

@@ -570,4 +570,4 @@ def setup(app):
     app.connect('doctree-read', process_autosummary_toc)
     app.connect('builder-inited', process_generate_options)
     app.add_config_value('autosummary_generate', [], True)
-    return {'version': sphinx.__version__}
+    return {'version': sphinx.__version__, 'parallel_read_safe': False}

View File

@@ -265,4 +265,4 @@ def setup(app):
     app.add_config_value('coverage_ignore_c_items', {}, False)
     app.add_config_value('coverage_write_headline', True, False)
     app.add_config_value('coverage_skip_undoc_in_source', False, False)
-    return {'version': sphinx.__version__}
+    return {'version': sphinx.__version__, 'parallel_read_safe': True}

View File

@@ -443,4 +443,4 @@ def setup(app):
     app.add_config_value('doctest_test_doctest_blocks', 'default', False)
     app.add_config_value('doctest_global_setup', '', False)
     app.add_config_value('doctest_global_cleanup', '', False)
-    return {'version': sphinx.__version__}
+    return {'version': sphinx.__version__, 'parallel_read_safe': True}

View File

@@ -59,4 +59,4 @@ def setup_link_roles(app):

 def setup(app):
     app.add_config_value('extlinks', {}, 'env')
     app.connect('builder-inited', setup_link_roles)
-    return {'version': sphinx.__version__}
+    return {'version': sphinx.__version__, 'parallel_read_safe': True}

View File

@@ -323,4 +323,4 @@ def setup(app):
     app.add_config_value('graphviz_dot', 'dot', 'html')
     app.add_config_value('graphviz_dot_args', [], 'html')
     app.add_config_value('graphviz_output_format', 'png', 'html')
-    return {'version': sphinx.__version__}
+    return {'version': sphinx.__version__, 'parallel_read_safe': True}

View File

@@ -73,4 +73,4 @@ def setup(app):
     app.add_node(ifconfig)
     app.add_directive('ifconfig', IfConfig)
     app.connect('doctree-resolved', process_ifconfig_nodes)
-    return {'version': sphinx.__version__}
+    return {'version': sphinx.__version__, 'parallel_read_safe': True}

View File

@@ -408,4 +408,4 @@ def setup(app):
     app.add_config_value('inheritance_graph_attrs', {}, False),
     app.add_config_value('inheritance_node_attrs', {}, False),
     app.add_config_value('inheritance_edge_attrs', {}, False),
-    return {'version': sphinx.__version__}
+    return {'version': sphinx.__version__, 'parallel_read_safe': True}

View File

@@ -282,4 +282,4 @@ def setup(app):
     app.add_config_value('intersphinx_cache_limit', 5, False)
     app.connect('missing-reference', missing_reference)
     app.connect('builder-inited', load_mappings)
-    return {'version': sphinx.__version__}
+    return {'version': sphinx.__version__, 'parallel_read_safe': True}

View File

@@ -57,4 +57,4 @@ def setup(app):
     mathbase_setup(app, (html_visit_math, None), (html_visit_displaymath, None))
     app.add_config_value('jsmath_path', '', False)
     app.connect('builder-inited', builder_inited)
-    return {'version': sphinx.__version__}
+    return {'version': sphinx.__version__, 'parallel_read_safe': True}

View File

@@ -16,9 +16,11 @@ from sphinx import addnodes
 from sphinx.locale import _
 from sphinx.errors import SphinxError

+class LinkcodeError(SphinxError):
+    category = "linkcode error"
+
 def doctree_read(app, doctree):
     env = app.builder.env
@@ -68,7 +70,8 @@ def doctree_read(app, doctree):
                                classes=['viewcode-link'])
         signode += onlynode

 def setup(app):
     app.connect('doctree-read', doctree_read)
     app.add_config_value('linkcode_resolve', None, '')
-    return {'version': sphinx.__version__}
+    return {'version': sphinx.__version__, 'parallel_read_safe': False}

View File

@@ -69,4 +69,4 @@ def setup(app):
     app.add_config_value('mathjax_inline', [r'\(', r'\)'], 'html')
     app.add_config_value('mathjax_display', [r'\[', r'\]'], 'html')
     app.connect('builder-inited', builder_inited)
-    return {'version': sphinx.__version__}
+    return {'version': sphinx.__version__, 'parallel_read_safe': True}

View File

@@ -256,7 +256,7 @@ def setup(app):
     for name, (default, rebuild) in iteritems(Config._config_values):
         app.add_config_value(name, default, rebuild)
-    return {'version': sphinx.__version__}
+    return {'version': sphinx.__version__, 'parallel_read_safe': True}
def _process_docstring(app, what, name, obj, options, lines):

View File

@@ -246,4 +246,4 @@ def setup(app):
     app.add_config_value('pngmath_latex_preamble', '', 'html')
     app.add_config_value('pngmath_add_tooltips', True, 'html')
     app.connect('build-finished', cleanup_tempdir)
-    return {'version': sphinx.__version__}
+    return {'version': sphinx.__version__, 'parallel_read_safe': True}

View File

@@ -172,4 +172,4 @@ def setup(app):
     app.connect('doctree-read', process_todos)
     app.connect('doctree-resolved', process_todo_nodes)
     app.connect('env-purge-doc', purge_todos)
-    return {'version': sphinx.__version__}
+    return {'version': sphinx.__version__, 'parallel_read_safe': False}

View File

@@ -204,4 +204,4 @@ def setup(app):
     app.connect('missing-reference', missing_reference)
     #app.add_config_value('viewcode_include_modules', [], 'env')
     #app.add_config_value('viewcode_exclude_modules', [], 'env')
-    return {'version': sphinx.__version__}
+    return {'version': sphinx.__version__, 'parallel_read_safe': False}

View File

@@ -130,6 +130,11 @@ class FilenameUniqDict(dict):
             del self[filename]
             self._existing.discard(unique)

+    def merge_other(self, docnames, other):
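+        # re-register files recorded by another (e.g. parallel-read) process,
+        # but only those belonging to the given docnames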
+        for filename, (docs, unique) in other.items():
+            for doc in docs & docnames:
+                self.add_file(doc, filename)
+
     def __getstate__(self):
         return self._existing