From 8cfb281b05653a32f480799cb39d4c7532d27f05 Mon Sep 17 00:00:00 2001 From: Takeshi KOMIYA Date: Wed, 9 Nov 2016 11:45:12 +0900 Subject: [PATCH] Add type-check annotations to sphinx.util --- sphinx/util/__init__.py | 42 +++++++++++++++++++++++++++++----------- sphinx/util/console.py | 10 +++++++++- sphinx/util/docfields.py | 23 ++++++++++++++++++++-- sphinx/util/docutils.py | 16 +++++++++++++-- sphinx/util/i18n.py | 30 +++++++++++++++++++++------- sphinx/util/inspect.py | 20 ++++++++++++++----- sphinx/util/jsdump.py | 19 ++++++++++++++---- sphinx/util/matching.py | 15 ++++++++++++-- sphinx/util/nodes.py | 28 ++++++++++++++++++++++++++- sphinx/util/osutil.py | 30 ++++++++++++++++++++++++---- sphinx/util/parallel.py | 26 ++++++++++++++++++------- sphinx/util/pycompat.py | 17 ++++++++++++---- 12 files changed, 226 insertions(+), 50 deletions(-) diff --git a/sphinx/util/__init__.py b/sphinx/util/__init__.py index 7ac5c62f7..f24ffb681 100644 --- a/sphinx/util/__init__.py +++ b/sphinx/util/__init__.py @@ -42,19 +42,25 @@ from sphinx.util.nodes import ( # noqa caption_ref_re) from sphinx.util.matching import patfilter # noqa +if False: + # For type annotation + from typing import Any, Callable, Iterable, Pattern, Sequence, Tuple # NOQA + # Generally useful regular expressions. -ws_re = re.compile(r'\s+') -url_re = re.compile(r'(?P.+)://.*') +ws_re = re.compile(r'\s+') # type: Pattern +url_re = re.compile(r'(?P.+)://.*') # type: Pattern # High-level utility functions. def docname_join(basedocname, docname): + # type: (unicode, unicode) -> unicode return posixpath.normpath( posixpath.join('/' + basedocname, '..', docname))[1:] def path_stabilize(filepath): + # type: (unicode) -> unicode "normalize path separater and unicode string" newpath = filepath.replace(os.path.sep, SEP) if isinstance(newpath, text_type): @@ -63,6 +69,7 @@ def path_stabilize(filepath): def get_matching_files(dirname, exclude_matchers=()): + # type: (unicode, Tuple[Callable[[unicode], bool], ...]) -> Iterable[unicode] """Get all file names in a directory, recursively. Exclude files and dirs matching some matcher in *exclude_matchers*. @@ -75,9 +82,9 @@ def get_matching_files(dirname, exclude_matchers=()): relativeroot = root[dirlen:] qdirs = enumerate(path_stabilize(path.join(relativeroot, dn)) - for dn in dirs) + for dn in dirs) # type: Iterable[Tuple[int, unicode]] qfiles = enumerate(path_stabilize(path.join(relativeroot, fn)) - for fn in files) + for fn in files) # type: Iterable[Tuple[int, unicode]] for matcher in exclude_matchers: qdirs = [entry for entry in qdirs if not matcher(entry[1])] qfiles = [entry for entry in qfiles if not matcher(entry[1])] @@ -89,6 +96,7 @@ def get_matching_files(dirname, exclude_matchers=()): def get_matching_docs(dirname, suffixes, exclude_matchers=()): + # type: (unicode, List[unicode], Tuple[Callable[[unicode], bool], ...]) -> Iterable[unicode] # NOQA """Get all file names (without suffixes) matching a suffix in a directory, recursively. @@ -97,7 +105,7 @@ def get_matching_docs(dirname, suffixes, exclude_matchers=()): suffixpatterns = ['*' + s for s in suffixes] for filename in get_matching_files(dirname, exclude_matchers): for suffixpattern in suffixpatterns: - if fnmatch.fnmatch(filename, suffixpattern): + if fnmatch.fnmatch(filename, suffixpattern): # type: ignore yield filename[:-len(suffixpattern)+1] break @@ -109,9 +117,10 @@ class FilenameUniqDict(dict): appear in. Used for images and downloadable files in the environment. """ def __init__(self): - self._existing = set() + self._existing = set() # type: Set[unicode] def add_file(self, docname, newfile): + # type: (unicode, unicode) -> unicode if newfile in self: self[newfile][0].add(docname) return self[newfile][1] @@ -126,6 +135,7 @@ class FilenameUniqDict(dict): return uniquename def purge_doc(self, docname): + # type: (unicode) -> None for filename, (docs, unique) in list(self.items()): docs.discard(docname) if not docs: @@ -133,6 +143,7 @@ class FilenameUniqDict(dict): self._existing.discard(unique) def merge_other(self, docnames, other): + # type: (List[unicode], Dict[unicode, Tuple[Set[unicode], Any]]) -> None for filename, (docs, unique) in other.items(): for doc in docs & docnames: self.add_file(doc, filename) @@ -146,6 +157,7 @@ class FilenameUniqDict(dict): def copy_static_entry(source, targetdir, builder, context={}, exclude_matchers=(), level=0): + # type: (unicode, unicode, Any, Dict, Tuple[Callable, ...], int) -> None """[DEPRECATED] Copy a HTML builder static_path entry from source to targetdir. Handles all possible cases of files, directories and subdirectories. @@ -183,6 +195,7 @@ _DEBUG_HEADER = '''\ def save_traceback(app): + # type: (Any) -> unicode """Save the current exception's traceback in a temporary file.""" import sphinx import jinja2 @@ -190,7 +203,7 @@ def save_traceback(app): import platform exc = sys.exc_info()[1] if isinstance(exc, SphinxParallelError): - exc_format = '(Error in parallel process)\n' + exc.traceback + exc_format = '(Error in parallel process)\n' + exc.traceback # type: ignore else: exc_format = traceback.format_exc() fd, path = tempfile.mkstemp('.log', 'sphinx-err-') @@ -220,6 +233,7 @@ def save_traceback(app): def get_module_source(modname): + # type: (str) -> Tuple[unicode, unicode] """Try to find the source code for a module. Can return ('file', 'filename') in which case the source is in the given @@ -259,6 +273,7 @@ def get_module_source(modname): def get_full_modname(modname, attribute): + # type: (str, unicode) -> unicode __import__(modname) module = sys.modules[modname] @@ -277,6 +292,7 @@ _coding_re = re.compile(r'coding[:=]\s*([-\w.]+)') def detect_encoding(readline): + # type: (Callable) -> unicode """Like tokenize.detect_encoding() from Py3k, but a bit simplified.""" def read_or_stop(): @@ -433,10 +449,11 @@ def split_index_msg(type, value): def format_exception_cut_frames(x=1): + # type: (int) -> unicode """Format an exception with traceback, but only the last x frames.""" typ, val, tb = sys.exc_info() # res = ['Traceback (most recent call last):\n'] - res = [] + res = [] # type: List[unicode] tbres = traceback.format_tb(tb) res += tbres[-x:] res += traceback.format_exception_only(typ, val) @@ -449,7 +466,7 @@ class PeekableIterator(object): what's the next item. """ def __init__(self, iterable): - self.remaining = deque() + self.remaining = deque() # type: deque self._iterator = iter(iterable) def __iter__(self): @@ -477,6 +494,7 @@ class PeekableIterator(object): def import_object(objname, source=None): + # type: (str, unicode) -> Any try: module, name = objname.rsplit('.', 1) except ValueError as err: @@ -496,7 +514,8 @@ def import_object(objname, source=None): def encode_uri(uri): - split = list(urlsplit(uri)) + # type: (unicode) -> unicode + split = list(urlsplit(uri)) # type: Any split[1] = split[1].encode('idna').decode('ascii') split[2] = quote_plus(split[2].encode('utf-8'), '/').decode('ascii') query = list((q, quote_plus(v.encode('utf-8'))) @@ -506,8 +525,9 @@ def encode_uri(uri): def split_docinfo(text): + # type: (unicode) -> Sequence[unicode] docinfo_re = re.compile('\A((?:\s*:\w+:.*?\n)+)', re.M) - result = docinfo_re.split(text, 1) + result = docinfo_re.split(text, 1) # type: ignore if len(result) == 1: return '', result[0] else: diff --git a/sphinx/util/console.py b/sphinx/util/console.py index 593634b11..b952d7183 100644 --- a/sphinx/util/console.py +++ b/sphinx/util/console.py @@ -20,10 +20,11 @@ except ImportError: colorama = None _ansi_re = re.compile('\x1b\\[(\\d\\d;){0,2}\\d\\dm') -codes = {} +codes = {} # type: Dict[str, str] def get_terminal_width(): + # type: () -> int """Borrowed from the py lib.""" try: import termios @@ -43,6 +44,7 @@ _tw = get_terminal_width() def term_width_line(text): + # type: (str) -> str if not codes: # if no coloring, don't output fancy backspaces return text + '\n' @@ -52,6 +54,7 @@ def term_width_line(text): def color_terminal(): + # type: () -> bool if sys.platform == 'win32' and colorama is not None: colorama.init() return True @@ -68,24 +71,29 @@ def color_terminal(): def nocolor(): + # type: () -> None if sys.platform == 'win32' and colorama is not None: colorama.deinit() codes.clear() def coloron(): + # type: () -> None codes.update(_orig_codes) def colorize(name, text): + # type: (str, str) -> str return codes.get(name, '') + text + codes.get('reset', '') def strip_colors(s): + # type: (str) -> str return re.compile('\x1b.*?m').sub('', s) def create_color_func(name): + # type: (str) -> None def inner(text): return colorize(name, text) globals()[name] = inner diff --git a/sphinx/util/docfields.py b/sphinx/util/docfields.py index d5cb4038f..6bf38ebed 100644 --- a/sphinx/util/docfields.py +++ b/sphinx/util/docfields.py @@ -15,8 +15,14 @@ from docutils import nodes from sphinx import addnodes +if False: + # For type annotation + from typing import Any, Tuple # NOQA + from sphinx.domains import Domain # NOQA + def _is_single_paragraph(node): + # type: (nodes.Node) -> bool """True if the node only contains one paragraph (and system messages).""" if len(node) == 0: return False @@ -47,6 +53,7 @@ class Field(object): def __init__(self, name, names=(), label=None, has_arg=True, rolename=None, bodyrolename=None): + # type: (unicode, Tuple[unicode, ...], unicode, bool, unicode, unicode) -> None self.name = name self.names = names self.label = label @@ -56,6 +63,7 @@ class Field(object): def make_xref(self, rolename, domain, target, innernode=addnodes.literal_emphasis, contnode=None): + # type: (unicode, unicode, unicode, nodes.Node, nodes.Node) -> nodes.Node if not rolename: return contnode or innernode(target, target) refnode = addnodes.pending_xref('', refdomain=domain, refexplicit=False, @@ -65,12 +73,15 @@ class Field(object): def make_xrefs(self, rolename, domain, target, innernode=addnodes.literal_emphasis, contnode=None): + # type: (unicode, unicode, unicode, nodes.Node, nodes.Node) -> List[nodes.Node] return [self.make_xref(rolename, domain, target, innernode, contnode)] def make_entry(self, fieldarg, content): + # type: (List, unicode) -> Tuple[List, unicode] return (fieldarg, content) def make_field(self, types, domain, item): + # type: (List, unicode, Tuple) -> nodes.field fieldarg, content = item fieldname = nodes.field_name('', self.label) if fieldarg: @@ -106,10 +117,12 @@ class GroupedField(Field): def __init__(self, name, names=(), label=None, rolename=None, can_collapse=False): + # type: (unicode, Tuple[unicode, ...], unicode, unicode, bool) -> None Field.__init__(self, name, names, label, True, rolename) self.can_collapse = can_collapse def make_field(self, types, domain, items): + # type: (List, unicode, Tuple) -> nodes.field fieldname = nodes.field_name('', self.label) listnode = self.list_type() for fieldarg, content in items: @@ -151,11 +164,13 @@ class TypedField(GroupedField): def __init__(self, name, names=(), typenames=(), label=None, rolename=None, typerolename=None, can_collapse=False): + # type: (unicode, Tuple[unicode, ...], Tuple[unicode, ...], unicode, unicode, unicode, bool) -> None # NOQA GroupedField.__init__(self, name, names, label, rolename, can_collapse) self.typenames = typenames self.typerolename = typerolename def make_field(self, types, domain, items): + # type: (List, unicode, Tuple) -> nodes.field def handle_item(fieldarg, content): par = nodes.paragraph() par.extend(self.make_xrefs(self.rolename, domain, fieldarg, @@ -196,6 +211,7 @@ class DocFieldTransformer(object): """ def __init__(self, directive): + # type: (Any) -> None self.domain = directive.domain if '_doc_field_type_map' not in directive.__class__.__dict__: directive.__class__._doc_field_type_map = \ @@ -203,6 +219,7 @@ class DocFieldTransformer(object): self.typemap = directive._doc_field_type_map def preprocess_fieldtypes(self, types): + # type: (List) -> Dict[unicode, Tuple[Any, bool]] typemap = {} for fieldtype in types: for name in fieldtype.names: @@ -213,6 +230,7 @@ class DocFieldTransformer(object): return typemap def transform_all(self, node): + # type: (nodes.Node) -> None """Transform all field list children of a node.""" # don't traverse, only handle field lists that are immediate children for child in node: @@ -220,12 +238,13 @@ class DocFieldTransformer(object): self.transform(child) def transform(self, node): + # type: (nodes.Node) -> None """Transform a single field list *node*.""" typemap = self.typemap entries = [] - groupindices = {} - types = {} + groupindices = {} # type: Dict[unicode, int] + types = {} # type: Dict[unicode, Dict] # step 1: traverse all fields and collect field types and content for field in node: diff --git a/sphinx/util/docutils.py b/sphinx/util/docutils.py index be9e2edad..a18d0b560 100644 --- a/sphinx/util/docutils.py +++ b/sphinx/util/docutils.py @@ -12,11 +12,19 @@ from __future__ import absolute_import from copy import copy from contextlib import contextmanager + from docutils.parsers.rst import directives, roles +if False: + # For type annotation + from typing import Any, Callable, Iterator, Tuple # NOQA + from docutils import nodes # NOQA + from sphinx.environment import BuildEnvironment # NOQA + @contextmanager def docutils_namespace(): + # type: () -> Iterator[None] """Create namespace for reST parsers.""" try: _directives = copy(directives._directives) @@ -37,9 +45,10 @@ class sphinx_domains(object): markup takes precedence. """ def __init__(self, env): + # type: (BuildEnvironment) -> None self.env = env - self.directive_func = None - self.roles_func = None + self.directive_func = None # type: Callable + self.roles_func = None # type: Callable def __enter__(self): self.enable() @@ -59,6 +68,7 @@ class sphinx_domains(object): roles.role = self.role_func def lookup_domain_element(self, type, name): + # type: (unicode, unicode) -> Tuple[Any, List] """Lookup a markup element (directive or role), given its name which can be a full name (with domain). """ @@ -87,12 +97,14 @@ class sphinx_domains(object): raise ElementLookupError def lookup_directive(self, name, lang_module, document): + # type: (unicode, unicode, nodes.document) -> Tuple[Any, List] try: return self.lookup_domain_element('directive', name) except ElementLookupError: return self.directive_func(name, lang_module, document) def lookup_role(self, name, lang_module, lineno, reporter): + # type: (unicode, unicode, int, Any) -> Tuple[Any, List] try: return self.lookup_domain_element('role', name) except ElementLookupError: diff --git a/sphinx/util/i18n.py b/sphinx/util/i18n.py index 112353d47..efbbb75f7 100644 --- a/sphinx/util/i18n.py +++ b/sphinx/util/i18n.py @@ -22,9 +22,12 @@ from babel.messages.pofile import read_po from babel.messages.mofile import write_mo from sphinx.errors import SphinxError -from sphinx.util.osutil import walk -from sphinx.util import SEP +from sphinx.util.osutil import SEP, walk +if False: + # For type annotation + from typing import Callable # NOQA + from sphinx.environment import BuildEnvironment # NOQA LocaleFileInfoBase = namedtuple('CatalogInfo', 'base_dir,domain,charset') @@ -33,32 +36,39 @@ class CatalogInfo(LocaleFileInfoBase): @property def po_file(self): + # type: () -> unicode return self.domain + '.po' @property def mo_file(self): + # type: () -> unicode return self.domain + '.mo' @property def po_path(self): + # type: () -> unicode return path.join(self.base_dir, self.po_file) @property def mo_path(self): + # type: () -> unicode return path.join(self.base_dir, self.mo_file) def is_outdated(self): + # type: () -> bool return ( not path.exists(self.mo_path) or path.getmtime(self.mo_path) < path.getmtime(self.po_path)) def write_mo(self, locale): + # type: (unicode) -> None with io.open(self.po_path, 'rt', encoding=self.charset) as po: with io.open(self.mo_path, 'wb') as mo: write_mo(mo, read_po(po, locale)) def find_catalog(docname, compaction): + # type: (unicode, bool) -> unicode if compaction: ret = docname.split(SEP, 1)[0] else: @@ -68,18 +78,20 @@ def find_catalog(docname, compaction): def find_catalog_files(docname, srcdir, locale_dirs, lang, compaction): + # type: (unicode, unicode, List[unicode], unicode, bool) -> List[unicode] if not(lang and locale_dirs): return [] domain = find_catalog(docname, compaction) - files = [gettext.find(domain, path.join(srcdir, dir_), [lang]) - for dir_ in locale_dirs] - files = [path.relpath(f, srcdir) for f in files if f] - return files + files = [gettext.find(domain, path.join(srcdir, dir_), [lang]) # type: ignore + for dir_ in locale_dirs] # type: ignore + files = [path.relpath(f, srcdir) for f in files if f] # type: ignore + return files # type: ignore def find_catalog_source_files(locale_dirs, locale, domains=None, gettext_compact=False, charset='utf-8', force_all=False): + # type: (List[unicode], unicode, List[unicode], bool, unicode, bool) -> Set[CatalogInfo] """ :param list locale_dirs: list of path as `['locale_dir1', 'locale_dir2', ...]` to find @@ -99,7 +111,7 @@ def find_catalog_source_files(locale_dirs, locale, domains=None, gettext_compact if not locale: return [] # locale is not specified - catalogs = set() + catalogs = set() # type: Set[CatalogInfo] for locale_dir in locale_dirs: if not locale_dir: continue # skip system locale directory @@ -158,6 +170,7 @@ date_format_mappings = { def babel_format_date(date, format, locale, warn=None, formatter=babel.dates.format_date): + # type: (datetime, unicode, unicode, Callable, Callable) -> unicode if locale is None: locale = 'en' @@ -180,6 +193,7 @@ def babel_format_date(date, format, locale, warn=None, formatter=babel.dates.for def format_date(format, date=None, language=None, warn=None): + # type: (str, datetime, unicode, Callable) -> unicode if format is None: format = 'medium' @@ -226,6 +240,7 @@ def format_date(format, date=None, language=None, warn=None): def get_image_filename_for_language(filename, env): + # type: (unicode, BuildEnvironment) -> unicode if not env.config.language: return filename @@ -245,6 +260,7 @@ def get_image_filename_for_language(filename, env): def search_image_for_language(filename, env): + # type: (unicode, BuildEnvironment) -> unicode if not env.config.language: return filename diff --git a/sphinx/util/inspect.py b/sphinx/util/inspect.py index 147d43592..4439e09f6 100644 --- a/sphinx/util/inspect.py +++ b/sphinx/util/inspect.py @@ -12,10 +12,14 @@ import re from six import PY3, binary_type -from six.moves import builtins +from six.moves import builtins # type: ignore from sphinx.util import force_decode +if False: + # For type annotation + from typing import Any, Callable, Tuple # NOQA + # this imports the standard library inspect module without resorting to # relatively import this module inspect = __import__('inspect') @@ -67,7 +71,7 @@ else: # 2.7 """Like inspect.getargspec but supports functools.partial as well.""" if inspect.ismethod(func): func = func.__func__ - parts = 0, () + parts = 0, () # type: Tuple[int, Tuple[unicode, ...]] if type(func) is partial: keywords = func.keywords if keywords is None: @@ -101,6 +105,7 @@ except ImportError: def isenumattribute(x): + # type: (Any) -> bool """Check if the object is attribute of enum.""" if enum is None: return False @@ -108,6 +113,7 @@ def isenumattribute(x): def isdescriptor(x): + # type: (Any) -> bool """Check if the object is some kind of descriptor.""" for item in '__get__', '__set__', '__delete__': if hasattr(safe_getattr(x, item, None), '__call__'): @@ -116,6 +122,7 @@ def isdescriptor(x): def safe_getattr(obj, name, *defargs): + # type: (Any, unicode, unicode) -> object """A getattr() that turns all exceptions into AttributeErrors.""" try: return getattr(obj, name, *defargs) @@ -138,8 +145,9 @@ def safe_getattr(obj, name, *defargs): def safe_getmembers(object, predicate=None, attr_getter=safe_getattr): + # type: (Any, Callable[[unicode], bool], Callable) -> List[Tuple[unicode, Any]] """A version of inspect.getmembers() that uses safe_getattr().""" - results = [] + results = [] # type: List[Tuple[unicode, Any]] for key in dir(object): try: value = attr_getter(object, key, None) @@ -152,6 +160,7 @@ def safe_getmembers(object, predicate=None, attr_getter=safe_getattr): def object_description(object): + # type: (Any) -> unicode """A repr() implementation that returns text safe to use in reST context.""" try: s = repr(object) @@ -166,6 +175,7 @@ def object_description(object): def is_builtin_class_method(obj, attr_name): + # type: (Any, unicode) -> bool """If attr_name is implemented at builtin class, return True. >>> is_builtin_class_method(int, '__init__') @@ -177,6 +187,6 @@ def is_builtin_class_method(obj, attr_name): classes = [c for c in inspect.getmro(obj) if attr_name in c.__dict__] cls = classes[0] if classes else object - if not hasattr(builtins, safe_getattr(cls, '__name__', '')): + if not hasattr(builtins, safe_getattr(cls, '__name__', '')): # type: ignore return False - return getattr(builtins, safe_getattr(cls, '__name__', '')) is cls + return getattr(builtins, safe_getattr(cls, '__name__', '')) is cls # type: ignore diff --git a/sphinx/util/jsdump.py b/sphinx/util/jsdump.py index 5a2148c5b..330b5c0ee 100644 --- a/sphinx/util/jsdump.py +++ b/sphinx/util/jsdump.py @@ -16,6 +16,10 @@ from six import iteritems, integer_types, string_types from sphinx.util.pycompat import u +if False: + # For type annotation + from typing import Any, IO, Union # NOQA + _str_re = re.compile(r'"(\\\\|\\"|[^"])*"') _int_re = re.compile(r'\d+') _name_re = re.compile(r'[a-zA-Z_]\w*') @@ -37,6 +41,7 @@ ESCAPED = re.compile(r'\\u.{4}|\\.') def encode_string(s): + # type: (str) -> str def replace(match): s = match.group(0) try: @@ -55,6 +60,7 @@ def encode_string(s): def decode_string(s): + # type: (str) -> str return ESCAPED.sub(lambda m: eval(u + '"' + m.group() + '"'), s) @@ -77,6 +83,7 @@ double in super""".split()) def dumps(obj, key=False): + # type: (Any, bool) -> str if key: if not isinstance(obj, string_types): obj = str(obj) @@ -88,7 +95,7 @@ def dumps(obj, key=False): return 'null' elif obj is True or obj is False: return obj and 'true' or 'false' - elif isinstance(obj, integer_types + (float,)): + elif isinstance(obj, integer_types + (float,)): # type: ignore return str(obj) elif isinstance(obj, dict): return '{%s}' % ','.join(sorted('%s:%s' % ( @@ -100,20 +107,22 @@ def dumps(obj, key=False): elif isinstance(obj, (tuple, list)): return '[%s]' % ','.join(dumps(x) for x in obj) elif isinstance(obj, string_types): - return encode_string(obj) + return encode_string(obj) # type: ignore raise TypeError(type(obj)) def dump(obj, f): + # type: (Any, IO) -> None f.write(dumps(obj)) def loads(x): + # type: (str) -> Any """Loader that can read the JS subset the indexer produces.""" nothing = object() i = 0 n = len(x) - stack = [] + stack = [] # type: List[Union[List, Dict]] obj = nothing key = False keys = [] @@ -164,6 +173,7 @@ def loads(x): raise ValueError("multiple values") key = False else: + y = None # type: Any m = _str_re.match(x, i) if m: y = decode_string(m.group()[1:-1]) @@ -193,11 +203,12 @@ def loads(x): obj[keys[-1]] = y key = False else: - obj.append(y) + obj.append(y) # type: ignore if obj is nothing: raise ValueError("nothing loaded from string") return obj def load(f): + # type: (IO) -> Any return loads(f.read()) diff --git a/sphinx/util/matching.py b/sphinx/util/matching.py index fc7750be9..be4bfee34 100644 --- a/sphinx/util/matching.py +++ b/sphinx/util/matching.py @@ -11,15 +11,20 @@ import re +if False: + # For type annotation + from typing import Callable, Match, Pattern # NOQA + def _translate_pattern(pat): + # type: (unicode) -> unicode """Translate a shell-style glob pattern to a regular expression. Adapted from the fnmatch module, but enhanced so that single stars don't match slashes. """ i, n = 0, len(pat) - res = '' + res = '' # type: unicode while i < n: c = pat[i] i += 1 @@ -59,6 +64,7 @@ def _translate_pattern(pat): def compile_matchers(patterns): + # type: (List[unicode]) -> List[Callable[[unicode], Match[unicode]]] return [re.compile(_translate_pattern(pat)).match for pat in patterns] @@ -70,23 +76,27 @@ class Matcher(object): """ def __init__(self, patterns): + # type: (List[unicode]) -> None expanded = [pat[3:] for pat in patterns if pat.startswith('**/')] self.patterns = compile_matchers(patterns + expanded) def __call__(self, string): + # type: (unicode) -> bool return self.match(string) def match(self, string): + # type: (unicode) -> bool return any(pat(string) for pat in self.patterns) DOTFILES = Matcher(['**/.*']) -_pat_cache = {} +_pat_cache = {} # type: Dict[unicode, Pattern] def patmatch(name, pat): + # type: (unicode, unicode) -> re.Match """Return if name matches pat. Adapted from fnmatch module.""" if pat not in _pat_cache: _pat_cache[pat] = re.compile(_translate_pattern(pat)) @@ -94,6 +104,7 @@ def patmatch(name, pat): def patfilter(names, pat): + # type: (List[unicode], unicode) -> List[unicode] """Return the subset of the list NAMES that match PAT. Adapted from fnmatch module. diff --git a/sphinx/util/nodes.py b/sphinx/util/nodes.py index fe3b0f2f9..2568ea4aa 100644 --- a/sphinx/util/nodes.py +++ b/sphinx/util/nodes.py @@ -13,19 +13,28 @@ from __future__ import absolute_import import re from six import text_type + from docutils import nodes from sphinx import addnodes from sphinx.locale import pairindextypes +if False: + # For type annotation + from typing import Any, Callable, Iterable, Tuple, Union # NOQA + from sphinx.builders import Builder # NOQA + from sphinx.utils.tags import Tags # NOQA + class WarningStream(object): def __init__(self, warnfunc): + # type: (Callable) -> None self.warnfunc = warnfunc self._re = re.compile(r'\((DEBUG|INFO|WARNING|ERROR|SEVERE)/[0-4]\)') def write(self, text): + # type: (str) -> None text = text.strip() if text: self.warnfunc(self._re.sub(r'\1:', text), None, '') @@ -37,6 +46,7 @@ caption_ref_re = explicit_title_re # b/w compat alias def apply_source_workaround(node): + # type: (nodes.Node) -> None # workaround: nodes.term have wrong rawsource if classifier is specified. # The behavior of docutils-0.11, 0.12 is: # * when ``term text : classifier1 : classifier2`` is specified, @@ -87,6 +97,7 @@ IGNORED_NODES = ( def is_pending_meta(node): + # type: (nodes.Node) -> bool if (isinstance(node, nodes.pending) and isinstance(node.details.get('nodes', [None])[0], addnodes.meta)): return True @@ -95,6 +106,7 @@ def is_pending_meta(node): def is_translatable(node): + # type: (nodes.Node) -> bool if isinstance(node, addnodes.translatable): return True @@ -137,6 +149,7 @@ META_TYPE_NODES = ( def extract_messages(doctree): + # type: (nodes.Node) -> Iterable[Tuple[nodes.Node, unicode]] """Extract translatable messages from a document tree.""" for node in doctree.traverse(is_translatable): if isinstance(node, addnodes.translatable): @@ -164,12 +177,14 @@ def extract_messages(doctree): def find_source_node(node): + # type: (nodes.Node) -> unicode for pnode in traverse_parent(node): if pnode.source: return pnode.source def traverse_parent(node, cls=None): + # type: (nodes.Node, Any) -> Iterable[nodes.Node] while node: if cls is None or isinstance(node, cls): yield node @@ -177,6 +192,7 @@ def traverse_parent(node, cls=None): def traverse_translatable_index(doctree): + # type: (nodes.Node) -> Iterable[Tuple[nodes.Node, List[unicode]]] """Traverse translatable index node from a document tree.""" def is_block_index(node): return isinstance(node, addnodes.index) and \ @@ -190,6 +206,7 @@ def traverse_translatable_index(doctree): def nested_parse_with_titles(state, content, node): + # type: (Any, List[unicode], nodes.Node) -> unicode """Version of state.nested_parse() that allows titles and does not require titles to have the same decoration as the calling document. @@ -209,6 +226,7 @@ def nested_parse_with_titles(state, content, node): def clean_astext(node): + # type: (nodes.Node) -> unicode """Like node.astext(), but ignore images.""" node = node.deepcopy() for img in node.traverse(nodes.image): @@ -217,6 +235,7 @@ def clean_astext(node): def split_explicit_title(text): + # type: (str) -> Tuple[bool, unicode, unicode] """Split role content into title and target, if given.""" match = explicit_title_re.match(text) if match: @@ -230,7 +249,8 @@ indextypes = [ def process_index_entry(entry, targetid): - indexentries = [] + # type: (unicode, unicode) -> List[Tuple[unicode, unicode, unicode, unicode, unicode]] + indexentries = [] # type: List[Tuple[unicode, unicode, unicode, unicode, unicode]] entry = entry.strip() oentry = entry main = '' @@ -266,6 +286,7 @@ def process_index_entry(entry, targetid): def inline_all_toctrees(builder, docnameset, docname, tree, colorfunc, traversed): + # type: (Builder, Set[unicode], unicode, nodes.Node, Callable, nodes.Node) -> nodes.Node """Inline all toctrees in the *tree*. Record all docnames in *docnameset*, and output docnames with *colorfunc*. @@ -299,6 +320,7 @@ def inline_all_toctrees(builder, docnameset, docname, tree, colorfunc, traversed def make_refnode(builder, fromdocname, todocname, targetid, child, title=None): + # type: (Builder, unicode, unicode, unicode, nodes.Node, unicode) -> nodes.reference """Shortcut to create a reference node.""" node = nodes.reference('', '', internal=True) if fromdocname == todocname: @@ -313,15 +335,18 @@ def make_refnode(builder, fromdocname, todocname, targetid, child, title=None): def set_source_info(directive, node): + # type: (Any, nodes.Node) -> None node.source, node.line = \ directive.state_machine.get_source_and_line(directive.lineno) def set_role_source_info(inliner, lineno, node): + # type: (Any, unicode, nodes.Node) -> None node.source, node.line = inliner.reporter.get_source_and_line(lineno) def process_only_nodes(doctree, tags, warn_node=None): + # type: (nodes.Node, Tags, Callable) -> None # A comment on the comment() nodes being inserted: replacing by [] would # result in a "Losing ids" exception if there is a target node before # the only node, so we make sure docutils can transfer the id to @@ -345,6 +370,7 @@ def process_only_nodes(doctree, tags, warn_node=None): # monkey-patch Element.copy to copy the rawsource and line def _new_copy(self): + # type: (nodes.Node) -> nodes.Node newnode = self.__class__(self.rawsource, **self.attributes) if isinstance(self, nodes.Element): newnode.source = self.source diff --git a/sphinx/util/osutil.py b/sphinx/util/osutil.py index b8fffb220..5561f0ddb 100644 --- a/sphinx/util/osutil.py +++ b/sphinx/util/osutil.py @@ -21,9 +21,12 @@ import filecmp from os import path import contextlib from io import BytesIO, StringIO - from six import PY2, text_type +if False: + # For type annotation + from typing import Any, Iterator, Tuple, Union # NOQA + # Errnos that we need. EEXIST = getattr(errno, 'EEXIST', 0) ENOENT = getattr(errno, 'ENOENT', 0) @@ -39,15 +42,18 @@ SEP = "/" def os_path(canonicalpath): + # type: (unicode) -> unicode return canonicalpath.replace(SEP, path.sep) def canon_path(nativepath): + # type: (unicode) -> unicode """Return path in OS-independent form""" return nativepath.replace(path.sep, SEP) def relative_uri(base, to): + # type: (unicode, unicode) -> unicode """Return a relative URL from ``base`` to ``to``.""" if to.startswith(SEP): return to @@ -71,6 +77,7 @@ def relative_uri(base, to): def ensuredir(path): + # type: (unicode) -> None """Ensure that a path exists.""" try: os.makedirs(path) @@ -84,6 +91,7 @@ def ensuredir(path): # that check UnicodeError. # The customization obstacle to replace the function with the os.walk. def walk(top, topdown=True, followlinks=False): + # type: (unicode, bool, bool) -> Iterator[Tuple[unicode, List[unicode], List[unicode]]] """Backport of os.walk from 2.6, where the *followlinks* argument was added. """ @@ -115,6 +123,7 @@ def walk(top, topdown=True, followlinks=False): def mtimes_of_files(dirnames, suffix): + # type: (List[unicode], unicode) -> Iterator[float] for dirname in dirnames: for root, dirs, files in os.walk(dirname): for sfile in files: @@ -126,6 +135,7 @@ def mtimes_of_files(dirnames, suffix): def movefile(source, dest): + # type: (unicode, unicode) -> None """Move a file, removing the destination if it exists.""" if os.path.exists(dest): try: @@ -136,6 +146,7 @@ def movefile(source, dest): def copytimes(source, dest): + # type: (unicode, unicode) -> None """Copy a file's modification times.""" st = os.stat(source) if hasattr(os, 'utime'): @@ -143,6 +154,7 @@ def copytimes(source, dest): def copyfile(source, dest): + # type: (unicode, unicode) -> None """Copy a file and its modification times, if possible. Note: ``copyfile`` skips copying if the file has not been changed""" @@ -159,10 +171,12 @@ no_fn_re = re.compile(r'[^a-zA-Z0-9_-]') def make_filename(string): + # type: (str) -> unicode return no_fn_re.sub('', string) or 'sphinx' def ustrftime(format, *args): + # type: (unicode, Any) -> unicode # [DEPRECATED] strftime for unicode strings # It will be removed at Sphinx-1.5 if not args: @@ -171,7 +185,7 @@ def ustrftime(format, *args): source_date_epoch = os.getenv('SOURCE_DATE_EPOCH') if source_date_epoch is not None: time_struct = time.gmtime(float(source_date_epoch)) - args = [time_struct] + args = [time_struct] # type: ignore if PY2: # if a locale is set, the time strings are encoded in the encoding # given by LC_TIME; if that is available, use it @@ -188,16 +202,18 @@ def ustrftime(format, *args): def safe_relpath(path, start=None): + # type: (unicode, unicode) -> unicode try: return os.path.relpath(path, start) except ValueError: return path -fs_encoding = sys.getfilesystemencoding() or sys.getdefaultencoding() +fs_encoding = sys.getfilesystemencoding() or sys.getdefaultencoding() # type: unicode def abspath(pathdir): + # type: (unicode) -> unicode pathdir = path.abspath(pathdir) if isinstance(pathdir, bytes): pathdir = pathdir.decode(fs_encoding) @@ -205,6 +221,7 @@ def abspath(pathdir): def getcwd(): + # type: () -> unicode if hasattr(os, 'getcwdu'): return os.getcwdu() return os.getcwd() @@ -212,6 +229,7 @@ def getcwd(): @contextlib.contextmanager def cd(target_dir): + # type: (unicode) -> Iterator[None] cwd = getcwd() try: os.chdir(target_dir) @@ -233,10 +251,12 @@ class FileAvoidWrite(object): Objects can be used as context managers. """ def __init__(self, path): + # type: (unicode) -> None self._path = path - self._io = None + self._io = None # type: Union[StringIO, BytesIO] def write(self, data): + # type: (Union[str, bytes]) -> None if not self._io: if isinstance(data, text_type): self._io = StringIO() @@ -246,6 +266,7 @@ class FileAvoidWrite(object): self._io.write(data) def close(self): + # type: () -> None """Stop accepting writes and write file, if needed.""" if not self._io: raise Exception('FileAvoidWrite does not support empty files.') @@ -288,6 +309,7 @@ class FileAvoidWrite(object): def rmtree(path): + # type: (unicode) -> None if os.path.isdir(path): shutil.rmtree(path) else: diff --git a/sphinx/util/parallel.py b/sphinx/util/parallel.py index bace0b5fd..814af09b1 100644 --- a/sphinx/util/parallel.py +++ b/sphinx/util/parallel.py @@ -13,16 +13,19 @@ import os import time import traceback from math import sqrt +from six import iteritems try: import multiprocessing except ImportError: multiprocessing = None -from six import iteritems - from sphinx.errors import SphinxParallelError +if False: + # For type annotation + from typing import Any, Callable, Sequence # NOQA + # our parallel functionality only works for the forking Process parallel_available = multiprocessing and (os.name == 'posix') @@ -31,9 +34,11 @@ class SerialTasks(object): """Has the same interface as ParallelTasks, but executes tasks directly.""" def __init__(self, nproc=1): + # type: (int) -> None pass def add_task(self, task_func, arg=None, result_func=None): + # type: (Callable, Any, Callable) -> None if arg is not None: res = task_func(arg) else: @@ -42,6 +47,7 @@ class SerialTasks(object): result_func(res) def join(self): + # type: () -> None pass @@ -49,23 +55,25 @@ class ParallelTasks(object): """Executes *nproc* tasks in parallel after forking.""" def __init__(self, nproc): + # type: (int) -> None self.nproc = nproc # (optional) function performed by each task on the result of main task - self._result_funcs = {} + self._result_funcs = {} # type: Dict[int, Callable] # task arguments - self._args = {} + self._args = {} # type: Dict[int, List[Any]] # list of subprocesses (both started and waiting) - self._procs = {} + self._procs = {} # type: Dict[int, multiprocessing.Process] # list of receiving pipe connections of running subprocesses - self._precvs = {} + self._precvs = {} # type: Dict[int, Any] # list of receiving pipe connections of waiting subprocesses - self._precvsWaiting = {} + self._precvsWaiting = {} # type: Dict[int, Any] # number of working subprocesses self._pworking = 0 # task number of each subprocess self._taskid = 0 def _process(self, pipe, func, arg): + # type: (Any, Callable, Any) -> None try: if arg is None: ret = func() @@ -76,6 +84,7 @@ class ParallelTasks(object): pipe.send((True, (err, traceback.format_exc()))) def add_task(self, task_func, arg=None, result_func=None): + # type: (Callable, Any, Callable) -> None tid = self._taskid self._taskid += 1 self._result_funcs[tid] = result_func or (lambda arg: None) @@ -88,10 +97,12 @@ class ParallelTasks(object): self._join_one() def join(self): + # type: () -> None while self._pworking: self._join_one() def _join_one(self): + # type: () -> None for tid, pipe in iteritems(self._precvs): if pipe.poll(): exc, result = pipe.recv() @@ -111,6 +122,7 @@ class ParallelTasks(object): def make_chunks(arguments, nproc, maxbatch=10): + # type: (Sequence[unicode], int, int) -> List[Any] # determine how many documents to read in one go nargs = len(arguments) chunksize = nargs // nproc diff --git a/sphinx/util/pycompat.py b/sphinx/util/pycompat.py index e3b17ef62..3d31abb1e 100644 --- a/sphinx/util/pycompat.py +++ b/sphinx/util/pycompat.py @@ -14,11 +14,13 @@ import sys import codecs import warnings -from six import class_types +from six import PY3, class_types, text_type, exec_ from six.moves import zip_longest from itertools import product -from six import PY3, text_type, exec_ +if False: + # For type annotation + from typing import Any, Callable # NOQA NoneType = type(None) @@ -33,6 +35,7 @@ if PY3: # safely encode a string for printing to the terminal def terminal_safe(s): + # type: (unicode) -> unicode return s.encode('ascii', 'backslashreplace').decode('ascii') # some kind of default system encoding; should be used with a lenient # error handler @@ -40,6 +43,7 @@ if PY3: # support for running 2to3 over config files def convert_with_2to3(filepath): + # type: (unicode) -> unicode from lib2to3.refactor import RefactoringTool, get_fixers_from_package from lib2to3.pgen2.parse import ParseError fixers = get_fixers_from_package('lib2to3.fixes') @@ -68,13 +72,15 @@ else: # Python 2 u = 'u' # no need to refactor on 2.x versions - convert_with_2to3 = None + convert_with_2to3 = None # type: ignore def TextIOWrapper(stream, encoding): + # type: (file, str) -> unicode return codecs.lookup(encoding or 'ascii')[2](stream) # safely encode a string for printing to the terminal def terminal_safe(s): + # type: (unicode) -> unicode return s.encode('ascii', 'backslashreplace') # some kind of default system encoding; should be used with a lenient # error handler @@ -91,6 +97,7 @@ else: # backport from python3 def indent(text, prefix, predicate=None): + # type: (unicode, unicode, Callable) -> unicode if predicate is None: def predicate(line): return line.strip() @@ -102,6 +109,7 @@ else: def execfile_(filepath, _globals, open=open): + # type: (unicode, Any, Callable) -> None from sphinx.util.osutil import fs_encoding # get config source -- 'b' is a no-op under 2.x, while 'U' is # ignored under 3.x (but 3.x compile() accepts \r\n newlines) @@ -132,6 +140,7 @@ def execfile_(filepath, _globals, open=open): class _DeprecationWrapper(object): def __init__(self, mod, deprecated): + # type: (Any, Dict) -> None self._mod = mod self._deprecated = deprecated @@ -145,7 +154,7 @@ class _DeprecationWrapper(object): return getattr(self._mod, attr) -sys.modules[__name__] = _DeprecationWrapper(sys.modules[__name__], dict( +sys.modules[__name__] = _DeprecationWrapper(sys.modules[__name__], dict( # type: ignore zip_longest = zip_longest, product = product, all = all,