Merge pull request #6431 from tk0miya/refactor_type_annotation3

Migrate to py3 style type annotation: sphinx.util (part2)
This commit is contained in:
Takeshi KOMIYA 2019-06-03 22:21:01 +09:00 committed by GitHub
commit 127d4db0d4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 210 additions and 396 deletions

View File

@ -10,23 +10,22 @@
import sys import sys
import warnings import warnings
from typing import Any, Dict
from docutils.utils import get_source_line from docutils.utils import get_source_line
from sphinx import addnodes from sphinx import addnodes
from sphinx.config import Config
from sphinx.deprecation import RemovedInSphinx30Warning, RemovedInSphinx40Warning from sphinx.deprecation import RemovedInSphinx30Warning, RemovedInSphinx40Warning
from sphinx.transforms import SphinxTransform from sphinx.transforms import SphinxTransform
from sphinx.util import import_object from sphinx.util import import_object
if False: if False:
# For type annotation # For type annotation
from typing import Any, Dict # NOQA from sphinx.application import Sphinx
from sphinx.application import Sphinx # NOQA
from sphinx.config import Config # NOQA
def deprecate_source_parsers(app, config): def deprecate_source_parsers(app: "Sphinx", config: Config) -> None:
# type: (Sphinx, Config) -> None
if config.source_parsers: if config.source_parsers:
warnings.warn('The config variable "source_parsers" is deprecated. ' warnings.warn('The config variable "source_parsers" is deprecated. '
'Please update your extension for the parser and remove the setting.', 'Please update your extension for the parser and remove the setting.',
@ -37,8 +36,7 @@ def deprecate_source_parsers(app, config):
app.add_source_parser(suffix, parser) app.add_source_parser(suffix, parser)
def register_application_for_autosummary(app): def register_application_for_autosummary(app: "Sphinx") -> None:
# type: (Sphinx) -> None
"""Register application object to autosummary module. """Register application object to autosummary module.
Since Sphinx-1.7, documenters and attrgetters are registered into Since Sphinx-1.7, documenters and attrgetters are registered into
@ -55,8 +53,7 @@ class IndexEntriesMigrator(SphinxTransform):
"""Migrating indexentries from old style (4columns) to new style (5columns).""" """Migrating indexentries from old style (4columns) to new style (5columns)."""
default_priority = 700 default_priority = 700
def apply(self, **kwargs): def apply(self, **kwargs) -> None:
# type: (Any) -> None
for node in self.document.traverse(addnodes.index): for node in self.document.traverse(addnodes.index):
for i, entries in enumerate(node['entries']): for i, entries in enumerate(node['entries']):
if len(entries) == 4: if len(entries) == 4:
@ -66,8 +63,7 @@ class IndexEntriesMigrator(SphinxTransform):
node['entries'][i] = entries + (None,) node['entries'][i] = entries + (None,)
def setup(app): def setup(app: "Sphinx") -> Dict[str, Any]:
# type: (Sphinx) -> Dict[str, Any]
app.add_transform(IndexEntriesMigrator) app.add_transform(IndexEntriesMigrator)
app.connect('config-inited', deprecate_source_parsers) app.connect('config-inited', deprecate_source_parsers)
app.connect('builder-inited', register_application_for_autosummary) app.connect('builder-inited', register_application_for_autosummary)

View File

@ -11,6 +11,7 @@
import os import os
import re import re
import sys import sys
from typing import Dict
try: try:
# check if colorama is installed to support color on Windows # check if colorama is installed to support color on Windows
@ -18,23 +19,17 @@ try:
except ImportError: except ImportError:
colorama = None colorama = None
if False:
# For type annotation
from typing import Dict # NOQA
_ansi_re = re.compile('\x1b\\[(\\d\\d;){0,2}\\d\\dm') _ansi_re = re.compile('\x1b\\[(\\d\\d;){0,2}\\d\\dm')
codes = {} # type: Dict[str, str] codes = {} # type: Dict[str, str]
def terminal_safe(s): def terminal_safe(s: str) -> str:
# type: (str) -> str
"""safely encode a string for printing to the terminal.""" """safely encode a string for printing to the terminal."""
return s.encode('ascii', 'backslashreplace').decode('ascii') return s.encode('ascii', 'backslashreplace').decode('ascii')
def get_terminal_width(): def get_terminal_width() -> int:
# type: () -> int
"""Borrowed from the py lib.""" """Borrowed from the py lib."""
try: try:
import termios import termios
@ -53,8 +48,7 @@ def get_terminal_width():
_tw = get_terminal_width() _tw = get_terminal_width()
def term_width_line(text): def term_width_line(text: str) -> str:
# type: (str) -> str
if not codes: if not codes:
# if no coloring, don't output fancy backspaces # if no coloring, don't output fancy backspaces
return text + '\n' return text + '\n'
@ -63,8 +57,7 @@ def term_width_line(text):
return text.ljust(_tw + len(text) - len(_ansi_re.sub('', text))) + '\r' return text.ljust(_tw + len(text) - len(_ansi_re.sub('', text))) + '\r'
def color_terminal(): def color_terminal() -> bool:
# type: () -> bool
if sys.platform == 'win32' and colorama is not None: if sys.platform == 'win32' and colorama is not None:
colorama.init() colorama.init()
return True return True
@ -80,21 +73,18 @@ def color_terminal():
return False return False
def nocolor(): def nocolor() -> None:
# type: () -> None
if sys.platform == 'win32' and colorama is not None: if sys.platform == 'win32' and colorama is not None:
colorama.deinit() colorama.deinit()
codes.clear() codes.clear()
def coloron(): def coloron() -> None:
# type: () -> None
codes.update(_orig_codes) codes.update(_orig_codes)
def colorize(name, text, input_mode=False): def colorize(name: str, text: str, input_mode: bool = False) -> str:
# type: (str, str, bool) -> str def escseq(name: str) -> str:
def escseq(name):
# Wrap escape sequence with ``\1`` and ``\2`` to let readline know # Wrap escape sequence with ``\1`` and ``\2`` to let readline know
# it is non-printable characters # it is non-printable characters
# ref: https://tiswww.case.edu/php/chet/readline/readline.html # ref: https://tiswww.case.edu/php/chet/readline/readline.html
@ -109,15 +99,12 @@ def colorize(name, text, input_mode=False):
return escseq(name) + text + escseq('reset') return escseq(name) + text + escseq('reset')
def strip_colors(s): def strip_colors(s: str) -> str:
# type: (str) -> str
return re.compile('\x1b.*?m').sub('', s) return re.compile('\x1b.*?m').sub('', s)
def create_color_func(name): def create_color_func(name: str) -> None:
# type: (str) -> None def inner(text: str) -> str:
def inner(text):
# type: (str) -> str
return colorize(name, text) return colorize(name, text)
globals()[name] = inner globals()[name] = inner

View File

@ -10,23 +10,23 @@
""" """
import warnings import warnings
from typing import List, Tuple, cast from typing import Any, Dict, List, Tuple, Type, Union
from typing import cast
from docutils import nodes from docutils import nodes
from docutils.nodes import Node
from sphinx import addnodes from sphinx import addnodes
from sphinx.deprecation import RemovedInSphinx40Warning from sphinx.deprecation import RemovedInSphinx40Warning
from sphinx.util.typing import TextlikeNode
if False: if False:
# For type annotation # For type annotation
from typing import Any, Dict, Type, Union # NOQA from sphinx.environment import BuildEnvironment
from sphinx.directive import ObjectDescription # NOQA from sphinx.directive import ObjectDescription
from sphinx.environment import BuildEnvironment # NOQA
from sphinx.util.typing import TextlikeNode # NOQA
def _is_single_paragraph(node): def _is_single_paragraph(node: nodes.field_body) -> bool:
# type: (nodes.field_body) -> bool
"""True if the node only contains one paragraph (and system messages).""" """True if the node only contains one paragraph (and system messages)."""
if len(node) == 0: if len(node) == 0:
return False return False
@ -55,9 +55,8 @@ class Field:
is_grouped = False is_grouped = False
is_typed = False is_typed = False
def __init__(self, name, names=(), label=None, has_arg=True, rolename=None, def __init__(self, name: str, names: Tuple[str, ...] = (), label: str = None,
bodyrolename=None): has_arg: bool = True, rolename: str = None, bodyrolename: str = None) -> None:
# type: (str, Tuple[str, ...], str, bool, str, str) -> None
self.name = name self.name = name
self.names = names self.names = names
self.label = label self.label = label
@ -65,15 +64,9 @@ class Field:
self.rolename = rolename self.rolename = rolename
self.bodyrolename = bodyrolename self.bodyrolename = bodyrolename
def make_xref(self, def make_xref(self, rolename: str, domain: str, target: str,
rolename, # type: str innernode: Type[TextlikeNode] = addnodes.literal_emphasis,
domain, # type: str contnode: Node = None, env: "BuildEnvironment" = None) -> Node:
target, # type: str
innernode=addnodes.literal_emphasis, # type: Type[TextlikeNode]
contnode=None, # type: nodes.Node
env=None, # type: BuildEnvironment
):
# type: (...) -> nodes.Node
if not rolename: if not rolename:
return contnode or innernode(target, target) return contnode or innernode(target, target)
refnode = addnodes.pending_xref('', refdomain=domain, refexplicit=False, refnode = addnodes.pending_xref('', refdomain=domain, refexplicit=False,
@ -83,28 +76,16 @@ class Field:
env.get_domain(domain).process_field_xref(refnode) env.get_domain(domain).process_field_xref(refnode)
return refnode return refnode
def make_xrefs(self, def make_xrefs(self, rolename: str, domain: str, target: str,
rolename, # type: str innernode: Type[TextlikeNode] = addnodes.literal_emphasis,
domain, # type: str contnode: Node = None, env: "BuildEnvironment" = None) -> List[Node]:
target, # type: str
innernode=addnodes.literal_emphasis, # type: Type[TextlikeNode]
contnode=None, # type: nodes.Node
env=None, # type: BuildEnvironment
):
# type: (...) -> List[nodes.Node]
return [self.make_xref(rolename, domain, target, innernode, contnode, env)] return [self.make_xref(rolename, domain, target, innernode, contnode, env)]
def make_entry(self, fieldarg, content): def make_entry(self, fieldarg: str, content: List[Node]) -> Tuple[str, List[Node]]:
# type: (str, List[nodes.Node]) -> Tuple[str, List[nodes.Node]]
return (fieldarg, content) return (fieldarg, content)
def make_field(self, def make_field(self, types: Dict[str, List[Node]], domain: str,
types, # type: Dict[str, List[nodes.Node]] item: Tuple, env: "BuildEnvironment" = None) -> nodes.field:
domain, # type: str
item, # type: Tuple
env=None, # type: BuildEnvironment
):
# type: (...) -> nodes.field
fieldarg, content = item fieldarg, content = item
fieldname = nodes.field_name('', self.label) fieldname = nodes.field_name('', self.label)
if fieldarg: if fieldarg:
@ -138,19 +119,13 @@ class GroupedField(Field):
is_grouped = True is_grouped = True
list_type = nodes.bullet_list list_type = nodes.bullet_list
def __init__(self, name, names=(), label=None, rolename=None, def __init__(self, name: str, names: Tuple[str, ...] = (), label: str = None,
can_collapse=False): rolename: str = None, can_collapse: bool = False) -> None:
# type: (str, Tuple[str, ...], str, str, bool) -> None
super().__init__(name, names, label, True, rolename) super().__init__(name, names, label, True, rolename)
self.can_collapse = can_collapse self.can_collapse = can_collapse
def make_field(self, def make_field(self, types: Dict[str, List[Node]], domain: str,
types, # type: Dict[str, List[nodes.Node]] items: Tuple, env: "BuildEnvironment" = None) -> nodes.field:
domain, # type: str
items, # type: Tuple
env=None, # type: BuildEnvironment
):
# type: (...) -> nodes.field
fieldname = nodes.field_name('', self.label) fieldname = nodes.field_name('', self.label)
listnode = self.list_type() listnode = self.list_type()
for fieldarg, content in items: for fieldarg, content in items:
@ -191,22 +166,16 @@ class TypedField(GroupedField):
""" """
is_typed = True is_typed = True
def __init__(self, name, names=(), typenames=(), label=None, def __init__(self, name: str, names: Tuple[str, ...] = (), typenames: Tuple[str, ...] = (),
rolename=None, typerolename=None, can_collapse=False): label: str = None, rolename: str = None, typerolename: str = None,
# type: (str, Tuple[str, ...], Tuple[str, ...], str, str, str, bool) -> None can_collapse: bool = False) -> None:
super().__init__(name, names, label, rolename, can_collapse) super().__init__(name, names, label, rolename, can_collapse)
self.typenames = typenames self.typenames = typenames
self.typerolename = typerolename self.typerolename = typerolename
def make_field(self, def make_field(self, types: Dict[str, List[Node]], domain: str,
types, # type: Dict[str, List[nodes.Node]] items: Tuple, env: "BuildEnvironment" = None) -> nodes.field:
domain, # type: str def handle_item(fieldarg: str, content: str) -> nodes.paragraph:
items, # type: Tuple
env=None, # type: BuildEnvironment
):
# type: (...) -> nodes.field
def handle_item(fieldarg, content):
# type: (str, str) -> nodes.paragraph
par = nodes.paragraph() par = nodes.paragraph()
par.extend(self.make_xrefs(self.rolename, domain, fieldarg, par.extend(self.make_xrefs(self.rolename, domain, fieldarg,
addnodes.literal_strong, env=env)) addnodes.literal_strong, env=env))
@ -246,13 +215,11 @@ class DocFieldTransformer:
""" """
typemap = None # type: Dict[str, Tuple[Field, bool]] typemap = None # type: Dict[str, Tuple[Field, bool]]
def __init__(self, directive): def __init__(self, directive: "ObjectDescription") -> None:
# type: (ObjectDescription) -> None
self.directive = directive self.directive = directive
self.typemap = directive.get_field_type_map() self.typemap = directive.get_field_type_map()
def preprocess_fieldtypes(self, types): def preprocess_fieldtypes(self, types: List[Field]) -> Dict[str, Tuple[Field, bool]]:
# type: (List[Field]) -> Dict[str, Tuple[Field, bool]]
warnings.warn('DocFieldTransformer.preprocess_fieldtypes() is deprecated.', warnings.warn('DocFieldTransformer.preprocess_fieldtypes() is deprecated.',
RemovedInSphinx40Warning) RemovedInSphinx40Warning)
typemap = {} typemap = {}
@ -265,16 +232,14 @@ class DocFieldTransformer:
typemap[name] = typed_field, True typemap[name] = typed_field, True
return typemap return typemap
def transform_all(self, node): def transform_all(self, node: addnodes.desc_content) -> None:
# type: (addnodes.desc_content) -> None
"""Transform all field list children of a node.""" """Transform all field list children of a node."""
# don't traverse, only handle field lists that are immediate children # don't traverse, only handle field lists that are immediate children
for child in node: for child in node:
if isinstance(child, nodes.field_list): if isinstance(child, nodes.field_list):
self.transform(child) self.transform(child)
def transform(self, node): def transform(self, node: nodes.field_list) -> None:
# type: (nodes.field_list) -> None
"""Transform a single field list *node*.""" """Transform a single field list *node*."""
typemap = self.typemap typemap = self.typemap

View File

@ -16,33 +16,33 @@ from contextlib import contextmanager
from copy import copy from copy import copy
from distutils.version import LooseVersion from distutils.version import LooseVersion
from os import path from os import path
from typing import IO, cast from types import ModuleType
from typing import Any, Callable, Dict, Generator, IO, List, Set, Tuple, Type
from typing import cast
import docutils import docutils
from docutils import nodes from docutils import nodes
from docutils.io import FileOutput from docutils.io import FileOutput
from docutils.nodes import Element, Node, system_message
from docutils.parsers.rst import Directive, directives, roles, convert_directive_function from docutils.parsers.rst import Directive, directives, roles, convert_directive_function
from docutils.statemachine import StateMachine from docutils.parsers.rst.states import Inliner
from docutils.statemachine import StateMachine, State, StringList
from docutils.utils import Reporter, unescape from docutils.utils import Reporter, unescape
from sphinx.deprecation import RemovedInSphinx30Warning from sphinx.deprecation import RemovedInSphinx30Warning
from sphinx.errors import ExtensionError, SphinxError from sphinx.errors import ExtensionError, SphinxError
from sphinx.locale import __ from sphinx.locale import __
from sphinx.util import logging from sphinx.util import logging
from sphinx.util.typing import RoleFunction
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
report_re = re.compile('^(.+?:(?:\\d+)?): \\((DEBUG|INFO|WARNING|ERROR|SEVERE)/(\\d+)?\\) ') report_re = re.compile('^(.+?:(?:\\d+)?): \\((DEBUG|INFO|WARNING|ERROR|SEVERE)/(\\d+)?\\) ')
if False: if False:
# For type annotation # For type annotation
from types import ModuleType # NOQA from sphinx.builders import Builder
from typing import Any, Callable, Dict, Generator, List, Set, Tuple, Type # NOQA from sphinx.config import Config
from docutils.parsers.rst.states import Inliner # NOQA from sphinx.environment import BuildEnvironment
from docutils.statemachine import State, StringList # NOQA
from sphinx.builders import Builder # NOQA
from sphinx.config import Config # NOQA
from sphinx.environment import BuildEnvironment # NOQA
from sphinx.util.typing import RoleFunction # NOQA
__version_info__ = tuple(LooseVersion(docutils.__version__).version) __version_info__ = tuple(LooseVersion(docutils.__version__).version)
@ -50,8 +50,7 @@ additional_nodes = set() # type: Set[Type[nodes.Element]]
@contextmanager @contextmanager
def docutils_namespace(): def docutils_namespace() -> Generator[None, None, None]:
# type: () -> Generator[None, None, None]
"""Create namespace for reST parsers.""" """Create namespace for reST parsers."""
try: try:
_directives = copy(directives._directives) # type: ignore _directives = copy(directives._directives) # type: ignore
@ -67,14 +66,12 @@ def docutils_namespace():
additional_nodes.discard(node) additional_nodes.discard(node)
def is_directive_registered(name): def is_directive_registered(name: str) -> bool:
# type: (str) -> bool
"""Check the *name* directive is already registered.""" """Check the *name* directive is already registered."""
return name in directives._directives # type: ignore return name in directives._directives # type: ignore
def register_directive(name, directive): def register_directive(name: str, directive: Type[Directive]) -> None:
# type: (str, Type[Directive]) -> None
"""Register a directive to docutils. """Register a directive to docutils.
This modifies global state of docutils. So it is better to use this This modifies global state of docutils. So it is better to use this
@ -83,14 +80,12 @@ def register_directive(name, directive):
directives.register_directive(name, directive) directives.register_directive(name, directive)
def is_role_registered(name): def is_role_registered(name: str) -> bool:
# type: (str) -> bool
"""Check the *name* role is already registered.""" """Check the *name* role is already registered."""
return name in roles._roles # type: ignore return name in roles._roles # type: ignore
def register_role(name, role): def register_role(name: str, role: RoleFunction) -> None:
# type: (str, RoleFunction) -> None
"""Register a role to docutils. """Register a role to docutils.
This modifies global state of docutils. So it is better to use this This modifies global state of docutils. So it is better to use this
@ -99,20 +94,17 @@ def register_role(name, role):
roles.register_local_role(name, role) roles.register_local_role(name, role)
def unregister_role(name): def unregister_role(name: str) -> None:
# type: (str) -> None
"""Unregister a role from docutils.""" """Unregister a role from docutils."""
roles._roles.pop(name, None) # type: ignore roles._roles.pop(name, None) # type: ignore
def is_node_registered(node): def is_node_registered(node: Type[Element]) -> bool:
# type: (Type[nodes.Element]) -> bool
"""Check the *node* is already registered.""" """Check the *node* is already registered."""
return hasattr(nodes.GenericNodeVisitor, 'visit_' + node.__name__) return hasattr(nodes.GenericNodeVisitor, 'visit_' + node.__name__)
def register_node(node): def register_node(node: Type[Element]) -> None:
# type: (Type[nodes.Element]) -> None
"""Register a node to docutils. """Register a node to docutils.
This modifies global state of some visitors. So it is better to use this This modifies global state of some visitors. So it is better to use this
@ -123,8 +115,7 @@ def register_node(node):
additional_nodes.add(node) additional_nodes.add(node)
def unregister_node(node): def unregister_node(node: Type[Element]) -> None:
# type: (Type[nodes.Element]) -> None
"""Unregister a node from docutils. """Unregister a node from docutils.
This is inverse of ``nodes._add_nodes_class_names()``. This is inverse of ``nodes._add_nodes_class_names()``.
@ -137,8 +128,7 @@ def unregister_node(node):
@contextmanager @contextmanager
def patched_get_language(): def patched_get_language() -> Generator[None, None, None]:
# type: () -> Generator[None, None, None]
"""Patch docutils.languages.get_language() temporarily. """Patch docutils.languages.get_language() temporarily.
This ignores the second argument ``reporter`` to suppress warnings. This ignores the second argument ``reporter`` to suppress warnings.
@ -146,8 +136,7 @@ def patched_get_language():
""" """
from docutils.languages import get_language from docutils.languages import get_language
def patched_get_language(language_code, reporter=None): def patched_get_language(language_code: str, reporter: Reporter = None) -> Any:
# type: (str, Reporter) -> Any
return get_language(language_code) return get_language(language_code)
try: try:
@ -159,8 +148,7 @@ def patched_get_language():
@contextmanager @contextmanager
def using_user_docutils_conf(confdir): def using_user_docutils_conf(confdir: str) -> Generator[None, None, None]:
# type: (str) -> Generator[None, None, None]
"""Let docutils know the location of ``docutils.conf`` for Sphinx.""" """Let docutils know the location of ``docutils.conf`` for Sphinx."""
try: try:
docutilsconfig = os.environ.get('DOCUTILSCONFIG', None) docutilsconfig = os.environ.get('DOCUTILSCONFIG', None)
@ -176,8 +164,7 @@ def using_user_docutils_conf(confdir):
@contextmanager @contextmanager
def patch_docutils(confdir=None): def patch_docutils(confdir: str = None) -> Generator[None, None, None]:
# type: (str) -> Generator[None, None, None]
"""Patch to docutils temporarily.""" """Patch to docutils temporarily."""
with patched_get_language(), using_user_docutils_conf(confdir): with patched_get_language(), using_user_docutils_conf(confdir):
yield yield
@ -191,35 +178,30 @@ class sphinx_domains:
"""Monkey-patch directive and role dispatch, so that domain-specific """Monkey-patch directive and role dispatch, so that domain-specific
markup takes precedence. markup takes precedence.
""" """
def __init__(self, env): def __init__(self, env: "BuildEnvironment") -> None:
# type: (BuildEnvironment) -> None
self.env = env self.env = env
self.directive_func = None # type: Callable self.directive_func = None # type: Callable
self.roles_func = None # type: Callable self.roles_func = None # type: Callable
def __enter__(self): def __enter__(self) -> None:
# type: () -> None
self.enable() self.enable()
def __exit__(self, type, value, traceback): def __exit__(self, exc_type: Type[Exception], exc_value: Exception, traceback: Any) -> bool: # NOQA
# type: (str, str, str) -> None
self.disable() self.disable()
return True
def enable(self): def enable(self) -> None:
# type: () -> None
self.directive_func = directives.directive self.directive_func = directives.directive
self.role_func = roles.role self.role_func = roles.role
directives.directive = self.lookup_directive # type: ignore directives.directive = self.lookup_directive # type: ignore
roles.role = self.lookup_role # type: ignore roles.role = self.lookup_role # type: ignore
def disable(self): def disable(self) -> None:
# type: () -> None
directives.directive = self.directive_func directives.directive = self.directive_func
roles.role = self.role_func roles.role = self.role_func
def lookup_domain_element(self, type, name): def lookup_domain_element(self, type: str, name: str) -> Any:
# type: (str, str) -> Any
"""Lookup a markup element (directive or role), given its name which can """Lookup a markup element (directive or role), given its name which can
be a full name (with domain). be a full name (with domain).
""" """
@ -247,15 +229,13 @@ class sphinx_domains:
raise ElementLookupError raise ElementLookupError
def lookup_directive(self, name, lang_module, document): def lookup_directive(self, name: str, lang_module: ModuleType, document: nodes.document) -> Tuple[Type[Directive], List[system_message]]: # NOQA
# type: (str, ModuleType, nodes.document) -> Tuple[Type[Directive], List[nodes.system_message]] # NOQA
try: try:
return self.lookup_domain_element('directive', name) return self.lookup_domain_element('directive', name)
except ElementLookupError: except ElementLookupError:
return self.directive_func(name, lang_module, document) return self.directive_func(name, lang_module, document)
def lookup_role(self, name, lang_module, lineno, reporter): def lookup_role(self, name: str, lang_module: ModuleType, lineno: int, reporter: Reporter) -> Tuple[RoleFunction, List[system_message]]: # NOQA
# type: (str, ModuleType, int, Reporter) -> Tuple[RoleFunction, List[nodes.system_message]] # NOQA
try: try:
return self.lookup_domain_element('role', name) return self.lookup_domain_element('role', name)
except ElementLookupError: except ElementLookupError:
@ -263,8 +243,7 @@ class sphinx_domains:
class WarningStream: class WarningStream:
def write(self, text): def write(self, text: str) -> None:
# type: (str) -> None
matched = report_re.search(text) matched = report_re.search(text)
if not matched: if not matched:
logger.warning(text.rstrip("\r\n")) logger.warning(text.rstrip("\r\n"))
@ -276,16 +255,14 @@ class WarningStream:
class LoggingReporter(Reporter): class LoggingReporter(Reporter):
@classmethod @classmethod
def from_reporter(cls, reporter): def from_reporter(cls, reporter: Reporter) -> "LoggingReporter":
# type: (Reporter) -> LoggingReporter
"""Create an instance of LoggingReporter from other reporter object.""" """Create an instance of LoggingReporter from other reporter object."""
return cls(reporter.source, reporter.report_level, reporter.halt_level, return cls(reporter.source, reporter.report_level, reporter.halt_level,
reporter.debug_flag, reporter.error_handler) reporter.debug_flag, reporter.error_handler)
def __init__(self, source, report_level=Reporter.WARNING_LEVEL, def __init__(self, source: str, report_level: int = Reporter.WARNING_LEVEL,
halt_level=Reporter.SEVERE_LEVEL, debug=False, halt_level: int = Reporter.SEVERE_LEVEL, debug: bool = False,
error_handler='backslashreplace'): error_handler: str = 'backslashreplace') -> None:
# type: (str, int, int, bool, str) -> None
stream = cast(IO, WarningStream()) stream = cast(IO, WarningStream())
super().__init__(source, report_level, halt_level, super().__init__(source, report_level, halt_level,
stream, debug, error_handler=error_handler) stream, debug, error_handler=error_handler)
@ -294,18 +271,15 @@ class LoggingReporter(Reporter):
class NullReporter(Reporter): class NullReporter(Reporter):
"""A dummy reporter; write nothing.""" """A dummy reporter; write nothing."""
def __init__(self): def __init__(self) -> None:
# type: () -> None
super().__init__('', 999, 4) super().__init__('', 999, 4)
def is_html5_writer_available(): def is_html5_writer_available() -> bool:
# type: () -> bool
return __version_info__ > (0, 13, 0) return __version_info__ > (0, 13, 0)
def directive_helper(obj, has_content=None, argument_spec=None, **option_spec): def directive_helper(obj: Any, has_content: bool = None, argument_spec: Tuple[int, int, bool] = None, **option_spec) -> Any: # NOQA
# type: (Any, bool, Tuple[int, int, bool], Any) -> Any
warnings.warn('function based directive support is now deprecated. ' warnings.warn('function based directive support is now deprecated. '
'Use class based directive instead.', 'Use class based directive instead.',
RemovedInSphinx30Warning) RemovedInSphinx30Warning)
@ -323,8 +297,7 @@ def directive_helper(obj, has_content=None, argument_spec=None, **option_spec):
@contextmanager @contextmanager
def switch_source_input(state, content): def switch_source_input(state: State, content: StringList) -> Generator[None, None, None]:
# type: (State, StringList) -> Generator[None, None, None]
"""Switch current source input of state temporarily.""" """Switch current source input of state temporarily."""
try: try:
# remember the original ``get_source_and_line()`` method # remember the original ``get_source_and_line()`` method
@ -344,13 +317,11 @@ def switch_source_input(state, content):
class SphinxFileOutput(FileOutput): class SphinxFileOutput(FileOutput):
"""Better FileOutput class for Sphinx.""" """Better FileOutput class for Sphinx."""
def __init__(self, **kwargs): def __init__(self, **kwargs) -> None:
# type: (Any) -> None
self.overwrite_if_changed = kwargs.pop('overwrite_if_changed', False) self.overwrite_if_changed = kwargs.pop('overwrite_if_changed', False)
super().__init__(**kwargs) super().__init__(**kwargs)
def write(self, data): def write(self, data: str) -> str:
# type: (str) -> str
if (self.destination_path and self.autoclose and 'b' not in self.mode and if (self.destination_path and self.autoclose and 'b' not in self.mode and
self.overwrite_if_changed and os.path.exists(self.destination_path)): self.overwrite_if_changed and os.path.exists(self.destination_path)):
with open(self.destination_path, encoding=self.encoding) as f: with open(self.destination_path, encoding=self.encoding) as f:
@ -371,19 +342,16 @@ class SphinxDirective(Directive):
""" """
@property @property
def env(self): def env(self) -> "BuildEnvironment":
# type: () -> BuildEnvironment
"""Reference to the :class:`.BuildEnvironment` object.""" """Reference to the :class:`.BuildEnvironment` object."""
return self.state.document.settings.env return self.state.document.settings.env
@property @property
def config(self): def config(self) -> "Config":
# type: () -> Config
"""Reference to the :class:`.Config` object.""" """Reference to the :class:`.Config` object."""
return self.env.config return self.env.config
def set_source_info(self, node): def set_source_info(self, node: Node) -> None:
# type: (nodes.Node) -> None
"""Set source and line number to the node.""" """Set source and line number to the node."""
node.source, node.line = self.state_machine.get_source_and_line(self.lineno) node.source, node.line = self.state_machine.get_source_and_line(self.lineno)
@ -406,8 +374,9 @@ class SphinxRole:
content = None #: A list of strings, the directive content for customization content = None #: A list of strings, the directive content for customization
#: (from the "role" directive). #: (from the "role" directive).
def __call__(self, name, rawtext, text, lineno, inliner, options={}, content=[]): def __call__(self, name: str, rawtext: str, text: str, lineno: int,
# type: (str, str, str, int, Inliner, Dict, List[str]) -> Tuple[List[nodes.Node], List[nodes.system_message]] # NOQA inliner: Inliner, options: Dict = {}, content: List[str] = []
) -> Tuple[List[Node], List[system_message]]:
self.rawtext = rawtext self.rawtext = rawtext
self.text = unescape(text) self.text = unescape(text)
self.lineno = lineno self.lineno = lineno
@ -427,24 +396,20 @@ class SphinxRole:
return self.run() return self.run()
def run(self): def run(self) -> Tuple[List[Node], List[system_message]]:
# type: () -> Tuple[List[nodes.Node], List[nodes.system_message]]
raise NotImplementedError raise NotImplementedError
@property @property
def env(self): def env(self) -> "BuildEnvironment":
# type: () -> BuildEnvironment
"""Reference to the :class:`.BuildEnvironment` object.""" """Reference to the :class:`.BuildEnvironment` object."""
return self.inliner.document.settings.env return self.inliner.document.settings.env
@property @property
def config(self): def config(self) -> "Config":
# type: () -> Config
"""Reference to the :class:`.Config` object.""" """Reference to the :class:`.Config` object."""
return self.env.config return self.env.config
def set_source_info(self, node, lineno=None): def set_source_info(self, node: Node, lineno: int = None) -> None:
# type: (nodes.Node, int) -> None
if lineno is None: if lineno is None:
lineno = self.lineno lineno = self.lineno
@ -466,8 +431,9 @@ class ReferenceRole(SphinxRole):
# \x00 means the "<" was backslash-escaped # \x00 means the "<" was backslash-escaped
explicit_title_re = re.compile(r'^(.+?)\s*(?<!\x00)<(.*?)>$', re.DOTALL) explicit_title_re = re.compile(r'^(.+?)\s*(?<!\x00)<(.*?)>$', re.DOTALL)
def __call__(self, name, rawtext, text, lineno, inliner, options={}, content=[]): def __call__(self, name: str, rawtext: str, text: str, lineno: int,
# type: (str, str, str, int, Inliner, Dict, List[str]) -> Tuple[List[nodes.Node], List[nodes.system_message]] # NOQA inliner: Inliner, options: Dict = {}, content: List[str] = []
) -> Tuple[List[Node], List[system_message]]:
matched = self.explicit_title_re.match(text) matched = self.explicit_title_re.match(text)
if matched: if matched:
self.has_explicit_title = True self.has_explicit_title = True
@ -490,8 +456,7 @@ class SphinxTranslator(nodes.NodeVisitor):
This class is strongly coupled with Sphinx. This class is strongly coupled with Sphinx.
""" """
def __init__(self, document, builder): def __init__(self, document: nodes.document, builder: "Builder") -> None:
# type: (nodes.document, Builder) -> None
super().__init__(document) super().__init__(document)
self.builder = builder self.builder = builder
self.config = builder.config self.config = builder.config
@ -503,8 +468,7 @@ class SphinxTranslator(nodes.NodeVisitor):
__document_cache__ = None # type: nodes.document __document_cache__ = None # type: nodes.document
def new_document(source_path, settings=None): def new_document(source_path: str, settings: Any = None) -> nodes.document:
# type: (str, Any) -> nodes.document
"""Return a new empty document object. This is an alternative of docutils'. """Return a new empty document object. This is an alternative of docutils'.
This is a simple wrapper for ``docutils.utils.new_document()``. It This is a simple wrapper for ``docutils.utils.new_document()``. It

View File

@ -10,20 +10,20 @@
import os import os
import re import re
import zlib import zlib
from typing import Callable, IO, Iterator
from sphinx.util import logging from sphinx.util import logging
from sphinx.util.typing import Inventory
if False:
# For type annotation
from typing import Callable, IO, Iterator # NOQA
from sphinx.builders import Builder # NOQA
from sphinx.environment import BuildEnvironment # NOQA
from sphinx.util.typing import Inventory # NOQA
BUFSIZE = 16 * 1024 BUFSIZE = 16 * 1024
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
if False:
# For type annotation
from sphinx.builders import Builder
from sphinx.environment import BuildEnvironment
class InventoryFileReader: class InventoryFileReader:
"""A file reader for inventory file. """A file reader for inventory file.
@ -31,21 +31,18 @@ class InventoryFileReader:
This reader supports mixture of texts and compressed texts. This reader supports mixture of texts and compressed texts.
""" """
def __init__(self, stream): def __init__(self, stream: IO) -> None:
# type: (IO) -> None
self.stream = stream self.stream = stream
self.buffer = b'' self.buffer = b''
self.eof = False self.eof = False
def read_buffer(self): def read_buffer(self) -> None:
# type: () -> None
chunk = self.stream.read(BUFSIZE) chunk = self.stream.read(BUFSIZE)
if chunk == b'': if chunk == b'':
self.eof = True self.eof = True
self.buffer += chunk self.buffer += chunk
def readline(self): def readline(self) -> str:
# type: () -> str
pos = self.buffer.find(b'\n') pos = self.buffer.find(b'\n')
if pos != -1: if pos != -1:
line = self.buffer[:pos].decode() line = self.buffer[:pos].decode()
@ -59,15 +56,13 @@ class InventoryFileReader:
return line return line
def readlines(self): def readlines(self) -> Iterator[str]:
# type: () -> Iterator[str]
while not self.eof: while not self.eof:
line = self.readline() line = self.readline()
if line: if line:
yield line yield line
def read_compressed_chunks(self): def read_compressed_chunks(self) -> Iterator[bytes]:
# type: () -> Iterator[bytes]
decompressor = zlib.decompressobj() decompressor = zlib.decompressobj()
while not self.eof: while not self.eof:
self.read_buffer() self.read_buffer()
@ -75,8 +70,7 @@ class InventoryFileReader:
self.buffer = b'' self.buffer = b''
yield decompressor.flush() yield decompressor.flush()
def read_compressed_lines(self): def read_compressed_lines(self) -> Iterator[str]:
# type: () -> Iterator[str]
buf = b'' buf = b''
for chunk in self.read_compressed_chunks(): for chunk in self.read_compressed_chunks():
buf += chunk buf += chunk
@ -89,8 +83,7 @@ class InventoryFileReader:
class InventoryFile: class InventoryFile:
@classmethod @classmethod
def load(cls, stream, uri, joinfunc): def load(cls, stream: IO, uri: str, joinfunc: Callable) -> Inventory:
# type: (IO, str, Callable) -> Inventory
reader = InventoryFileReader(stream) reader = InventoryFileReader(stream)
line = reader.readline().rstrip() line = reader.readline().rstrip()
if line == '# Sphinx inventory version 1': if line == '# Sphinx inventory version 1':
@ -101,8 +94,7 @@ class InventoryFile:
raise ValueError('invalid inventory header: %s' % line) raise ValueError('invalid inventory header: %s' % line)
@classmethod @classmethod
def load_v1(cls, stream, uri, join): def load_v1(cls, stream: InventoryFileReader, uri: str, join: Callable) -> Inventory:
# type: (InventoryFileReader, str, Callable) -> Inventory
invdata = {} # type: Inventory invdata = {} # type: Inventory
projname = stream.readline().rstrip()[11:] projname = stream.readline().rstrip()[11:]
version = stream.readline().rstrip()[11:] version = stream.readline().rstrip()[11:]
@ -120,8 +112,7 @@ class InventoryFile:
return invdata return invdata
@classmethod @classmethod
def load_v2(cls, stream, uri, join): def load_v2(cls, stream: InventoryFileReader, uri: str, join: Callable) -> Inventory:
# type: (InventoryFileReader, str, Callable) -> Inventory
invdata = {} # type: Inventory invdata = {} # type: Inventory
projname = stream.readline().rstrip()[11:] projname = stream.readline().rstrip()[11:]
version = stream.readline().rstrip()[11:] version = stream.readline().rstrip()[11:]
@ -150,10 +141,8 @@ class InventoryFile:
return invdata return invdata
@classmethod @classmethod
def dump(cls, filename, env, builder): def dump(cls, filename: str, env: "BuildEnvironment", builder: "Builder") -> None:
# type: (str, BuildEnvironment, Builder) -> None def escape(string: str) -> str:
def escape(string):
# type: (str) -> str
return re.sub("\\s+", " ", string) return re.sub("\\s+", " ", string)
with open(os.path.join(filename), 'wb') as f: with open(os.path.join(filename), 'wb') as f:

View File

@ -10,10 +10,7 @@
""" """
import re import re
from typing import Any, Dict, IO, List, Match, Union
if False:
# For type annotation
from typing import Any, Dict, IO, List, Match, Union # NOQA
_str_re = re.compile(r'"(\\\\|\\"|[^"])*"') _str_re = re.compile(r'"(\\\\|\\"|[^"])*"')
_int_re = re.compile(r'\d+') _int_re = re.compile(r'\d+')
@ -35,10 +32,8 @@ ESCAPE_DICT = {
ESCAPED = re.compile(r'\\u.{4}|\\.') ESCAPED = re.compile(r'\\u.{4}|\\.')
def encode_string(s): def encode_string(s: str) -> str:
# type: (str) -> str def replace(match: Match) -> str:
def replace(match):
# type: (Match) -> str
s = match.group(0) s = match.group(0)
try: try:
return ESCAPE_DICT[s] return ESCAPE_DICT[s]
@ -55,8 +50,7 @@ def encode_string(s):
return '"' + str(ESCAPE_ASCII.sub(replace, s)) + '"' return '"' + str(ESCAPE_ASCII.sub(replace, s)) + '"'
def decode_string(s): def decode_string(s: str) -> str:
# type: (str) -> str
return ESCAPED.sub(lambda m: eval('"' + m.group() + '"'), s) return ESCAPED.sub(lambda m: eval('"' + m.group() + '"'), s)
@ -78,8 +72,7 @@ do import static with
double in super""".split()) double in super""".split())
def dumps(obj, key=False): def dumps(obj: Any, key: bool = False) -> str:
# type: (Any, bool) -> str
if key: if key:
if not isinstance(obj, str): if not isinstance(obj, str):
obj = str(obj) obj = str(obj)
@ -107,13 +100,11 @@ def dumps(obj, key=False):
raise TypeError(type(obj)) raise TypeError(type(obj))
def dump(obj, f): def dump(obj: Any, f: IO) -> None:
# type: (Any, IO) -> None
f.write(dumps(obj)) f.write(dumps(obj))
def loads(x): def loads(x: str) -> Any:
# type: (str) -> Any
"""Loader that can read the JS subset the indexer produces.""" """Loader that can read the JS subset the indexer produces."""
nothing = object() nothing = object()
i = 0 i = 0
@ -205,6 +196,5 @@ def loads(x):
return obj return obj
def load(f): def load(f: IO) -> Any:
# type: (IO) -> Any
return loads(f.read()) return loads(f.read())

View File

@ -11,13 +11,10 @@
import json import json
import warnings import warnings
from collections import UserString from collections import UserString
from typing import Any, IO
from sphinx.deprecation import RemovedInSphinx40Warning from sphinx.deprecation import RemovedInSphinx40Warning
if False:
# For type annotation
from typing import Any, IO # NOQA
warnings.warn('sphinx.util.jsonimpl is deprecated', warnings.warn('sphinx.util.jsonimpl is deprecated',
RemovedInSphinx40Warning, stacklevel=2) RemovedInSphinx40Warning, stacklevel=2)
@ -25,30 +22,25 @@ warnings.warn('sphinx.util.jsonimpl is deprecated',
class SphinxJSONEncoder(json.JSONEncoder): class SphinxJSONEncoder(json.JSONEncoder):
"""JSONEncoder subclass that forces translation proxies.""" """JSONEncoder subclass that forces translation proxies."""
def default(self, obj): def default(self, obj: Any) -> str:
# type: (Any) -> str
if isinstance(obj, UserString): if isinstance(obj, UserString):
return str(obj) return str(obj)
return super().default(obj) return super().default(obj)
def dump(obj, fp, *args, **kwds): def dump(obj: Any, fp: IO, *args, **kwds) -> None:
# type: (Any, IO, Any, Any) -> None
kwds['cls'] = SphinxJSONEncoder kwds['cls'] = SphinxJSONEncoder
json.dump(obj, fp, *args, **kwds) json.dump(obj, fp, *args, **kwds)
def dumps(obj, *args, **kwds): def dumps(obj: Any, *args, **kwds) -> str:
# type: (Any, Any, Any) -> str
kwds['cls'] = SphinxJSONEncoder kwds['cls'] = SphinxJSONEncoder
return json.dumps(obj, *args, **kwds) return json.dumps(obj, *args, **kwds)
def load(*args, **kwds): def load(*args, **kwds) -> Any:
# type: (Any, Any) -> Any
return json.load(*args, **kwds) return json.load(*args, **kwds)
def loads(*args, **kwds): def loads(*args, **kwds) -> Any:
# type: (Any, Any) -> Any
return json.loads(*args, **kwds) return json.loads(*args, **kwds)

View File

@ -12,8 +12,10 @@ import logging
import logging.handlers import logging.handlers
from collections import defaultdict from collections import defaultdict
from contextlib import contextmanager from contextlib import contextmanager
from typing import Any, Dict, Generator, IO, List, Tuple, Type, Union
from docutils import nodes from docutils import nodes
from docutils.nodes import Node
from docutils.utils import get_source_line from docutils.utils import get_source_line
from sphinx.errors import SphinxWarning from sphinx.errors import SphinxWarning
@ -21,8 +23,7 @@ from sphinx.util.console import colorize
if False: if False:
# For type annotation # For type annotation
from typing import Any, Dict, Generator, IO, List, Tuple, Type, Union # NOQA from sphinx.application import Sphinx
from sphinx.application import Sphinx # NOQA
NAMESPACE = 'sphinx' NAMESPACE = 'sphinx'
@ -54,8 +55,7 @@ COLOR_MAP = defaultdict(lambda: 'blue',
}) })
def getLogger(name): def getLogger(name: str) -> "SphinxLoggerAdapter":
# type: (str) -> SphinxLoggerAdapter
"""Get logger wrapped by :class:`sphinx.util.logging.SphinxLoggerAdapter`. """Get logger wrapped by :class:`sphinx.util.logging.SphinxLoggerAdapter`.
Sphinx logger always uses ``sphinx.*`` namespace to be independent from Sphinx logger always uses ``sphinx.*`` namespace to be independent from
@ -77,8 +77,7 @@ def getLogger(name):
return SphinxLoggerAdapter(logger, {}) return SphinxLoggerAdapter(logger, {})
def convert_serializable(records): def convert_serializable(records: List[logging.LogRecord]) -> None:
# type: (List[logging.LogRecord]) -> None
"""Convert LogRecord serializable.""" """Convert LogRecord serializable."""
for r in records: for r in records:
# extract arguments to a message and clear them # extract arguments to a message and clear them
@ -95,8 +94,7 @@ class SphinxLogRecord(logging.LogRecord):
prefix = '' prefix = ''
location = None # type: Any location = None # type: Any
def getMessage(self): def getMessage(self) -> str:
# type: () -> str
message = super().getMessage() message = super().getMessage()
location = getattr(self, 'location', None) location = getattr(self, 'location', None)
if location: if location:
@ -120,20 +118,17 @@ class SphinxWarningLogRecord(SphinxLogRecord):
class SphinxLoggerAdapter(logging.LoggerAdapter): class SphinxLoggerAdapter(logging.LoggerAdapter):
"""LoggerAdapter allowing ``type`` and ``subtype`` keywords.""" """LoggerAdapter allowing ``type`` and ``subtype`` keywords."""
def log(self, level, msg, *args, **kwargs): def log(self, level: Union[int, str], msg: str, *args, **kwargs) -> None:
# type: (Union[int, str], str, Any, Any) -> None
if isinstance(level, int): if isinstance(level, int):
super().log(level, msg, *args, **kwargs) super().log(level, msg, *args, **kwargs)
else: else:
levelno = LEVEL_NAMES[level] levelno = LEVEL_NAMES[level]
super().log(levelno, msg, *args, **kwargs) super().log(levelno, msg, *args, **kwargs)
def verbose(self, msg, *args, **kwargs): def verbose(self, msg: str, *args, **kwargs) -> None:
# type: (str, Any, Any) -> None
self.log(VERBOSE, msg, *args, **kwargs) self.log(VERBOSE, msg, *args, **kwargs)
def process(self, msg, kwargs): # type: ignore def process(self, msg: str, kwargs: Dict) -> Tuple[str, Dict]: # type: ignore
# type: (str, Dict) -> Tuple[str, Dict]
extra = kwargs.setdefault('extra', {}) extra = kwargs.setdefault('extra', {})
if 'type' in kwargs: if 'type' in kwargs:
extra['type'] = kwargs.pop('type') extra['type'] = kwargs.pop('type')
@ -148,8 +143,7 @@ class SphinxLoggerAdapter(logging.LoggerAdapter):
return msg, kwargs return msg, kwargs
def handle(self, record): def handle(self, record: logging.LogRecord) -> None:
# type: (logging.LogRecord) -> None
self.logger.handle(record) self.logger.handle(record)
@ -161,8 +155,7 @@ class WarningStreamHandler(logging.StreamHandler):
class NewLineStreamHandler(logging.StreamHandler): class NewLineStreamHandler(logging.StreamHandler):
"""StreamHandler which switches line terminator by record.nonl flag.""" """StreamHandler which switches line terminator by record.nonl flag."""
def emit(self, record): def emit(self, record: logging.LogRecord) -> None:
# type: (logging.LogRecord) -> None
try: try:
self.acquire() self.acquire()
if getattr(record, 'nonl', False): if getattr(record, 'nonl', False):
@ -177,16 +170,13 @@ class NewLineStreamHandler(logging.StreamHandler):
class MemoryHandler(logging.handlers.BufferingHandler): class MemoryHandler(logging.handlers.BufferingHandler):
"""Handler buffering all logs.""" """Handler buffering all logs."""
def __init__(self): def __init__(self) -> None:
# type: () -> None
super().__init__(-1) super().__init__(-1)
def shouldFlush(self, record): def shouldFlush(self, record: logging.LogRecord) -> bool:
# type: (logging.LogRecord) -> bool
return False # never flush return False # never flush
def flushTo(self, logger): def flushTo(self, logger: logging.Logger) -> None:
# type: (logging.Logger) -> None
self.acquire() self.acquire()
try: try:
for record in self.buffer: for record in self.buffer:
@ -195,15 +185,13 @@ class MemoryHandler(logging.handlers.BufferingHandler):
finally: finally:
self.release() self.release()
def clear(self): def clear(self) -> List[logging.LogRecord]:
# type: () -> List[logging.LogRecord]
buffer, self.buffer = self.buffer, [] buffer, self.buffer = self.buffer, []
return buffer return buffer
@contextmanager @contextmanager
def pending_warnings(): def pending_warnings() -> Generator[logging.Handler, None, None]:
# type: () -> Generator
"""Contextmanager to pend logging warnings temporary. """Contextmanager to pend logging warnings temporary.
Similar to :func:`pending_logging`. Similar to :func:`pending_logging`.
@ -231,8 +219,7 @@ def pending_warnings():
@contextmanager @contextmanager
def pending_logging(): def pending_logging() -> Generator[MemoryHandler, None, None]:
# type: () -> Generator
"""Contextmanager to pend logging all logs temporary. """Contextmanager to pend logging all logs temporary.
For example:: For example::
@ -264,8 +251,7 @@ def pending_logging():
@contextmanager @contextmanager
def skip_warningiserror(skip=True): def skip_warningiserror(skip: bool = True) -> Generator[None, None, None]:
# type: (bool) -> Generator
"""contextmanager to skip WarningIsErrorFilter for a while.""" """contextmanager to skip WarningIsErrorFilter for a while."""
logger = logging.getLogger(NAMESPACE) logger = logging.getLogger(NAMESPACE)
@ -285,8 +271,7 @@ def skip_warningiserror(skip=True):
@contextmanager @contextmanager
def prefixed_warnings(prefix): def prefixed_warnings(prefix: str) -> Generator[None, None, None]:
# type: (str) -> Generator
"""Prepend prefix to all records for a while. """Prepend prefix to all records for a while.
For example:: For example::
@ -332,13 +317,11 @@ def prefixed_warnings(prefix):
class LogCollector: class LogCollector:
def __init__(self): def __init__(self) -> None:
# type: () -> None
self.logs = [] # type: List[logging.LogRecord] self.logs = [] # type: List[logging.LogRecord]
@contextmanager @contextmanager
def collect(self): def collect(self) -> Generator[None, None, None]:
# type: () -> Generator
with pending_logging() as memhandler: with pending_logging() as memhandler:
yield yield
@ -348,16 +331,14 @@ class LogCollector:
class InfoFilter(logging.Filter): class InfoFilter(logging.Filter):
"""Filter error and warning messages.""" """Filter error and warning messages."""
def filter(self, record): def filter(self, record: logging.LogRecord) -> bool:
# type: (logging.LogRecord) -> bool
if record.levelno < logging.WARNING: if record.levelno < logging.WARNING:
return True return True
else: else:
return False return False
def is_suppressed_warning(type, subtype, suppress_warnings): def is_suppressed_warning(type: str, subtype: str, suppress_warnings: List[str]) -> bool:
# type: (str, str, List[str]) -> bool
"""Check the warning is suppressed or not.""" """Check the warning is suppressed or not."""
if type is None: if type is None:
return False return False
@ -379,13 +360,11 @@ def is_suppressed_warning(type, subtype, suppress_warnings):
class WarningSuppressor(logging.Filter): class WarningSuppressor(logging.Filter):
"""Filter logs by `suppress_warnings`.""" """Filter logs by `suppress_warnings`."""
def __init__(self, app): def __init__(self, app: "Sphinx") -> None:
# type: (Sphinx) -> None
self.app = app self.app = app
super().__init__() super().__init__()
def filter(self, record): def filter(self, record: logging.LogRecord) -> bool:
# type: (logging.LogRecord) -> bool
type = getattr(record, 'type', None) type = getattr(record, 'type', None)
subtype = getattr(record, 'subtype', None) subtype = getattr(record, 'subtype', None)
@ -405,13 +384,11 @@ class WarningSuppressor(logging.Filter):
class WarningIsErrorFilter(logging.Filter): class WarningIsErrorFilter(logging.Filter):
"""Raise exception if warning emitted.""" """Raise exception if warning emitted."""
def __init__(self, app): def __init__(self, app: "Sphinx") -> None:
# type: (Sphinx) -> None
self.app = app self.app = app
super().__init__() super().__init__()
def filter(self, record): def filter(self, record: logging.LogRecord) -> bool:
# type: (logging.LogRecord) -> bool
if getattr(record, 'skip_warningsiserror', False): if getattr(record, 'skip_warningsiserror', False):
# disabled by DisableWarningIsErrorFilter # disabled by DisableWarningIsErrorFilter
return True return True
@ -433,8 +410,7 @@ class WarningIsErrorFilter(logging.Filter):
class DisableWarningIsErrorFilter(logging.Filter): class DisableWarningIsErrorFilter(logging.Filter):
"""Disable WarningIsErrorFilter if this filter installed.""" """Disable WarningIsErrorFilter if this filter installed."""
def filter(self, record): def filter(self, record: logging.LogRecord) -> bool:
# type: (logging.LogRecord) -> bool
record.skip_warningsiserror = True # type: ignore record.skip_warningsiserror = True # type: ignore
return True return True
@ -442,13 +418,11 @@ class DisableWarningIsErrorFilter(logging.Filter):
class MessagePrefixFilter(logging.Filter): class MessagePrefixFilter(logging.Filter):
"""Prepend prefix to all records.""" """Prepend prefix to all records."""
def __init__(self, prefix): def __init__(self, prefix: str) -> None:
# type: (str) -> None
self.prefix = prefix self.prefix = prefix
super().__init__() super().__init__()
def filter(self, record): def filter(self, record: logging.LogRecord) -> bool:
# type: (logging.LogRecord) -> bool
if self.prefix: if self.prefix:
record.msg = self.prefix + ' ' + record.msg record.msg = self.prefix + ' ' + record.msg
return True return True
@ -462,13 +436,11 @@ class SphinxLogRecordTranslator(logging.Filter):
""" """
LogRecordClass = None # type: Type[logging.LogRecord] LogRecordClass = None # type: Type[logging.LogRecord]
def __init__(self, app): def __init__(self, app: "Sphinx") -> None:
# type: (Sphinx) -> None
self.app = app self.app = app
super().__init__() super().__init__()
def filter(self, record): # type: ignore def filter(self, record: SphinxWarningLogRecord) -> bool: # type: ignore
# type: (SphinxWarningLogRecord) -> bool
if isinstance(record, logging.LogRecord): if isinstance(record, logging.LogRecord):
# force subclassing to handle location # force subclassing to handle location
record.__class__ = self.LogRecordClass # type: ignore record.__class__ = self.LogRecordClass # type: ignore
@ -500,8 +472,7 @@ class WarningLogRecordTranslator(SphinxLogRecordTranslator):
LogRecordClass = SphinxWarningLogRecord LogRecordClass = SphinxWarningLogRecord
def get_node_location(node): def get_node_location(node: Node) -> str:
# type: (nodes.Node) -> str
(source, line) = get_source_line(node) (source, line) = get_source_line(node)
if source and line: if source and line:
return "%s:%s" % (source, line) return "%s:%s" % (source, line)
@ -514,8 +485,7 @@ def get_node_location(node):
class ColorizeFormatter(logging.Formatter): class ColorizeFormatter(logging.Formatter):
def format(self, record): def format(self, record: logging.LogRecord) -> str:
# type: (logging.LogRecord) -> str
message = super().format(record) message = super().format(record)
color = getattr(record, 'color', None) color = getattr(record, 'color', None)
if color is None: if color is None:
@ -529,13 +499,11 @@ class ColorizeFormatter(logging.Formatter):
class SafeEncodingWriter: class SafeEncodingWriter:
"""Stream writer which ignores UnicodeEncodeError silently""" """Stream writer which ignores UnicodeEncodeError silently"""
def __init__(self, stream): def __init__(self, stream: IO) -> None:
# type: (IO) -> None
self.stream = stream self.stream = stream
self.encoding = getattr(stream, 'encoding', 'ascii') or 'ascii' self.encoding = getattr(stream, 'encoding', 'ascii') or 'ascii'
def write(self, data): def write(self, data: str) -> None:
# type: (str) -> None
try: try:
self.stream.write(data) self.stream.write(data)
except UnicodeEncodeError: except UnicodeEncodeError:
@ -543,25 +511,21 @@ class SafeEncodingWriter:
# non-encodable characters, then decode them. # non-encodable characters, then decode them.
self.stream.write(data.encode(self.encoding, 'replace').decode(self.encoding)) self.stream.write(data.encode(self.encoding, 'replace').decode(self.encoding))
def flush(self): def flush(self) -> None:
# type: () -> None
if hasattr(self.stream, 'flush'): if hasattr(self.stream, 'flush'):
self.stream.flush() self.stream.flush()
class LastMessagesWriter: class LastMessagesWriter:
"""Stream writer which memories last 10 messages to save trackback""" """Stream writer which memories last 10 messages to save trackback"""
def __init__(self, app, stream): def __init__(self, app: "Sphinx", stream: IO) -> None:
# type: (Sphinx, IO) -> None
self.app = app self.app = app
def write(self, data): def write(self, data: str) -> None:
# type: (str) -> None
self.app.messagelog.append(data) self.app.messagelog.append(data)
def setup(app, status, warning): def setup(app: "Sphinx", status: IO, warning: IO) -> None:
# type: (Sphinx, IO, IO) -> None
"""Setup root logger for Sphinx""" """Setup root logger for Sphinx"""
logger = logging.getLogger(NAMESPACE) logger = logging.getLogger(NAMESPACE)
logger.setLevel(logging.DEBUG) logger.setLevel(logging.DEBUG)

View File

@ -9,16 +9,12 @@
""" """
import re import re
from typing import Callable, Dict, List, Match, Pattern
from sphinx.util.osutil import canon_path from sphinx.util.osutil import canon_path
if False:
# For type annotation
from typing import Callable, Dict, List, Match, Pattern # NOQA
def _translate_pattern(pat: str) -> str:
def _translate_pattern(pat):
# type: (str) -> str
"""Translate a shell-style glob pattern to a regular expression. """Translate a shell-style glob pattern to a regular expression.
Adapted from the fnmatch module, but enhanced so that single stars don't Adapted from the fnmatch module, but enhanced so that single stars don't
@ -64,8 +60,7 @@ def _translate_pattern(pat):
return res + '$' return res + '$'
def compile_matchers(patterns): def compile_matchers(patterns: List[str]) -> List[Callable[[str], Match[str]]]:
# type: (List[str]) -> List[Callable[[str], Match[str]]]
return [re.compile(_translate_pattern(pat)).match for pat in patterns] return [re.compile(_translate_pattern(pat)).match for pat in patterns]
@ -76,17 +71,14 @@ class Matcher:
For example, "**/index.rst" matches with "index.rst" For example, "**/index.rst" matches with "index.rst"
""" """
def __init__(self, patterns): def __init__(self, patterns: List[str]) -> None:
# type: (List[str]) -> None
expanded = [pat[3:] for pat in patterns if pat.startswith('**/')] expanded = [pat[3:] for pat in patterns if pat.startswith('**/')]
self.patterns = compile_matchers(patterns + expanded) self.patterns = compile_matchers(patterns + expanded)
def __call__(self, string): def __call__(self, string: str) -> bool:
# type: (str) -> bool
return self.match(string) return self.match(string)
def match(self, string): def match(self, string: str) -> bool:
# type: (str) -> bool
string = canon_path(string) string = canon_path(string)
return any(pat(string) for pat in self.patterns) return any(pat(string) for pat in self.patterns)
@ -97,16 +89,14 @@ DOTFILES = Matcher(['**/.*'])
_pat_cache = {} # type: Dict[str, Pattern] _pat_cache = {} # type: Dict[str, Pattern]
def patmatch(name, pat): def patmatch(name: str, pat: str) -> Match[str]:
# type: (str, str) -> Match[str]
"""Return if name matches pat. Adapted from fnmatch module.""" """Return if name matches pat. Adapted from fnmatch module."""
if pat not in _pat_cache: if pat not in _pat_cache:
_pat_cache[pat] = re.compile(_translate_pattern(pat)) _pat_cache[pat] = re.compile(_translate_pattern(pat))
return _pat_cache[pat].match(name) return _pat_cache[pat].match(name)
def patfilter(names, pat): def patfilter(names: List[str], pat: str) -> List[str]:
# type: (List[str], str) -> List[str]
"""Return the subset of the list NAMES that match PAT. """Return the subset of the list NAMES that match PAT.
Adapted from fnmatch module. Adapted from fnmatch module.

View File

@ -20,8 +20,7 @@ DEPTH_CHUNK_START = b'tEXtDepth\x00'
IEND_CHUNK = b'\x00\x00\x00\x00IEND\xAE\x42\x60\x82' IEND_CHUNK = b'\x00\x00\x00\x00IEND\xAE\x42\x60\x82'
def read_png_depth(filename): def read_png_depth(filename: str) -> int:
# type: (str) -> int
"""Read the special tEXt chunk indicating the depth from a PNG file.""" """Read the special tEXt chunk indicating the depth from a PNG file."""
with open(filename, 'rb') as f: with open(filename, 'rb') as f:
f.seek(- (LEN_IEND + LEN_DEPTH), 2) f.seek(- (LEN_IEND + LEN_DEPTH), 2)
@ -33,8 +32,7 @@ def read_png_depth(filename):
return struct.unpack('!i', depthchunk[14:18])[0] return struct.unpack('!i', depthchunk[14:18])[0]
def write_png_depth(filename, depth): def write_png_depth(filename: str, depth: int) -> None:
# type: (str, int) -> None
"""Write the special tEXt chunk indicating the depth to a PNG file. """Write the special tEXt chunk indicating the depth to a PNG file.
The chunk is placed immediately before the special IEND chunk. The chunk is placed immediately before the special IEND chunk.

View File

@ -13,6 +13,7 @@ import io
import sys import sys
import textwrap import textwrap
import warnings import warnings
from typing import Any, Callable
from sphinx.deprecation import RemovedInSphinx40Warning, deprecated_alias from sphinx.deprecation import RemovedInSphinx40Warning, deprecated_alias
from sphinx.locale import __ from sphinx.locale import __
@ -20,10 +21,6 @@ from sphinx.util import logging
from sphinx.util.console import terminal_safe from sphinx.util.console import terminal_safe
from sphinx.util.typing import NoneType from sphinx.util.typing import NoneType
if False:
# For type annotation
from typing import Any, Callable # NOQA
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -33,8 +30,7 @@ logger = logging.getLogger(__name__)
# convert_with_2to3(): # convert_with_2to3():
# support for running 2to3 over config files # support for running 2to3 over config files
def convert_with_2to3(filepath): def convert_with_2to3(filepath: str) -> str:
# type: (str) -> str
from lib2to3.refactor import RefactoringTool, get_fixers_from_package from lib2to3.refactor import RefactoringTool, get_fixers_from_package
from lib2to3.pgen2.parse import ParseError from lib2to3.pgen2.parse import ParseError
fixers = get_fixers_from_package('lib2to3.fixes') fixers = get_fixers_from_package('lib2to3.fixes')
@ -62,8 +58,7 @@ class UnicodeMixin:
return self.__unicode__() return self.__unicode__()
def execfile_(filepath, _globals, open=open): def execfile_(filepath: str, _globals: Any, open: Callable = open) -> None:
# type: (str, Any, Callable) -> None
from sphinx.util.osutil import fs_encoding from sphinx.util.osutil import fs_encoding
with open(filepath, 'rb') as f: with open(filepath, 'rb') as f:
source = f.read() source = f.read()

View File

@ -6,26 +6,23 @@
:license: BSD, see LICENSE for details. :license: BSD, see LICENSE for details.
""" """
# (ab)use the Jinja parser for parsing our boolean expressions from typing import Iterator, List
from jinja2 import nodes from jinja2 import nodes
from jinja2.environment import Environment from jinja2.environment import Environment
from jinja2.nodes import Node
from jinja2.parser import Parser from jinja2.parser import Parser
env = Environment() env = Environment()
if False:
# For type annotation
from typing import Iterator, List # NOQA
class BooleanParser(Parser): class BooleanParser(Parser):
""" """
Only allow condition exprs and/or/not operations. Only allow condition exprs and/or/not operations.
""" """
def parse_compare(self): def parse_compare(self) -> Node:
# type: () -> nodes.Node node = None # type: Node
node = None # type: nodes.Node
token = self.stream.current token = self.stream.current
if token.type == 'name': if token.type == 'name':
if token.value in ('true', 'false', 'True', 'False'): if token.value in ('true', 'false', 'True', 'False'):
@ -46,38 +43,31 @@ class BooleanParser(Parser):
class Tags: class Tags:
def __init__(self, tags=None): def __init__(self, tags: List[str] = None) -> None:
# type: (List[str]) -> None
self.tags = dict.fromkeys(tags or [], True) self.tags = dict.fromkeys(tags or [], True)
def has(self, tag): def has(self, tag: str) -> bool:
# type: (str) -> bool
return tag in self.tags return tag in self.tags
__contains__ = has __contains__ = has
def __iter__(self): def __iter__(self) -> Iterator[str]:
# type: () -> Iterator[str]
return iter(self.tags) return iter(self.tags)
def add(self, tag): def add(self, tag: str) -> None:
# type: (str) -> None
self.tags[tag] = True self.tags[tag] = True
def remove(self, tag): def remove(self, tag: str) -> None:
# type: (str) -> None
self.tags.pop(tag, None) self.tags.pop(tag, None)
def eval_condition(self, condition): def eval_condition(self, condition: str) -> bool:
# type: (str) -> bool
# exceptions are handled by the caller # exceptions are handled by the caller
parser = BooleanParser(env, condition, state='variable') parser = BooleanParser(env, condition, state='variable')
expr = parser.parse_expression() expr = parser.parse_expression()
if not parser.stream.eos: if not parser.stream.eos:
raise ValueError('chunk after expression') raise ValueError('chunk after expression')
def eval_node(node): def eval_node(node: Node) -> bool:
# type: (nodes.Node) -> bool
if isinstance(node, nodes.CondExpr): if isinstance(node, nodes.CondExpr):
if eval_node(node.test): # type: ignore if eval_node(node.test): # type: ignore
return eval_node(node.expr1) # type: ignore return eval_node(node.expr1) # type: ignore

View File

@ -9,10 +9,7 @@
""" """
import re import re
from typing import Dict
if False:
# For type annotation
from typing import Dict # NOQA
tex_replacements = [ tex_replacements = [
# map TeX special chars # map TeX special chars
@ -78,20 +75,17 @@ tex_replace_map = {}
tex_hl_escape_map_new = {} tex_hl_escape_map_new = {}
def escape(s): def escape(s: str) -> str:
# type: (str) -> str
"""Escape text for LaTeX output.""" """Escape text for LaTeX output."""
return s.translate(tex_escape_map) return s.translate(tex_escape_map)
def escape_abbr(text): def escape_abbr(text: str) -> str:
# type: (str) -> str
"""Adjust spacing after abbreviations. Works with @ letter or other.""" """Adjust spacing after abbreviations. Works with @ letter or other."""
return re.sub(r'\.(?=\s|$)', r'.\@{}', text) return re.sub(r'\.(?=\s|$)', r'.\@{}', text)
def init(): def init() -> None:
# type: () -> None
for a, b in tex_replacements: for a, b in tex_replacements:
tex_escape_map[ord(a)] = b tex_escape_map[ord(a)] = b
tex_replace_map[ord(a)] = '_' tex_replace_map[ord(a)] = '_'