Improve static typing in `intersphinx` et al

This commit is contained in:
Adam Turner 2023-03-17 17:39:44 +00:00
parent ae9008b128
commit 3d2114214b
4 changed files with 53 additions and 24 deletions

View File

@ -29,26 +29,32 @@ from typing import IO, TYPE_CHECKING, Any, cast
from urllib.parse import urlsplit, urlunsplit from urllib.parse import urlsplit, urlunsplit
from docutils import nodes from docutils import nodes
from docutils.nodes import Element, Node, TextElement, system_message from docutils.utils import relative_path
from docutils.utils import Reporter, relative_path
import sphinx import sphinx
from sphinx.addnodes import pending_xref from sphinx.addnodes import pending_xref
from sphinx.application import Sphinx
from sphinx.builders.html import INVENTORY_FILENAME from sphinx.builders.html import INVENTORY_FILENAME
from sphinx.config import Config
from sphinx.domains import Domain
from sphinx.environment import BuildEnvironment
from sphinx.errors import ExtensionError from sphinx.errors import ExtensionError
from sphinx.locale import _, __ from sphinx.locale import _, __
from sphinx.transforms.post_transforms import ReferencesResolver from sphinx.transforms.post_transforms import ReferencesResolver
from sphinx.util import logging, requests from sphinx.util import logging, requests
from sphinx.util.docutils import CustomReSTDispatcher, SphinxRole from sphinx.util.docutils import CustomReSTDispatcher, SphinxRole
from sphinx.util.inventory import InventoryFile from sphinx.util.inventory import InventoryFile
from sphinx.util.typing import Inventory, InventoryItem, RoleFunction
if TYPE_CHECKING: if TYPE_CHECKING:
from types import ModuleType from types import ModuleType
from typing import Tuple, Union
from docutils.nodes import Node, TextElement, system_message
from docutils.utils import Reporter
from sphinx.application import Sphinx
from sphinx.config import Config
from sphinx.domains import Domain
from sphinx.environment import BuildEnvironment
from sphinx.util.typing import Inventory, InventoryItem, RoleFunction
InventoryCacheEntry = Tuple[Union[str, None], int, Inventory]
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -60,12 +66,22 @@ class InventoryAdapter:
self.env = env self.env = env
if not hasattr(env, 'intersphinx_cache'): if not hasattr(env, 'intersphinx_cache'):
# initial storage when fetching inventories before processing
self.env.intersphinx_cache = {} # type: ignore self.env.intersphinx_cache = {} # type: ignore
self.env.intersphinx_inventory = {} # type: ignore self.env.intersphinx_inventory = {} # type: ignore
self.env.intersphinx_named_inventory = {} # type: ignore self.env.intersphinx_named_inventory = {} # type: ignore
@property @property
def cache(self) -> dict[str, tuple[str, int, Inventory]]: def cache(self) -> dict[str, InventoryCacheEntry]:
"""Intersphinx cache.
- Key is the URI of the remote inventory
- Element one is the key given in the Sphinx intersphinx_mapping
configuration value
- Element two is a time value for cache invalidation, a float
- Element three is the loaded remote inventory, type Inventory
"""
return self.env.intersphinx_cache # type: ignore return self.env.intersphinx_cache # type: ignore
@property @property
@ -152,7 +168,7 @@ def _get_safe_url(url: str) -> str:
return urlunsplit(frags) return urlunsplit(frags)
def fetch_inventory(app: Sphinx, uri: str, inv: Any) -> Any: def fetch_inventory(app: Sphinx, uri: str, inv: str) -> Inventory:
"""Fetch, parse and return an intersphinx inventory file.""" """Fetch, parse and return an intersphinx inventory file."""
# both *uri* (base URI of the links to generate) and *inv* (actual # both *uri* (base URI of the links to generate) and *inv* (actual
# location of the inventory file) can be local or remote URIs # location of the inventory file) can be local or remote URIs
@ -192,7 +208,12 @@ def fetch_inventory(app: Sphinx, uri: str, inv: Any) -> Any:
def fetch_inventory_group( def fetch_inventory_group(
name: str, uri: str, invs: Any, cache: Any, app: Any, now: float, name: str | None,
uri: str,
invs: tuple[str | None, ...],
cache: dict[str, InventoryCacheEntry],
app: Sphinx,
now: int,
) -> bool: ) -> bool:
cache_time = now - app.config.intersphinx_cache_limit * 86400 cache_time = now - app.config.intersphinx_cache_limit * 86400
failures = [] failures = []
@ -211,7 +232,7 @@ def fetch_inventory_group(
failures.append(err.args) failures.append(err.args)
continue continue
if invdata: if invdata:
cache[uri] = (name, now, invdata) cache[uri] = name, now, invdata
return True return True
return False return False
finally: finally:
@ -235,6 +256,9 @@ def load_mappings(app: Sphinx) -> None:
with concurrent.futures.ThreadPoolExecutor() as pool: with concurrent.futures.ThreadPoolExecutor() as pool:
futures = [] futures = []
name: str | None
uri: str
invs: tuple[str | None, ...]
for name, (uri, invs) in app.config.intersphinx_mapping.values(): for name, (uri, invs) in app.config.intersphinx_mapping.values():
futures.append(pool.submit( futures.append(pool.submit(
fetch_inventory_group, name, uri, invs, inventories.cache, app, now, fetch_inventory_group, name, uri, invs, inventories.cache, app, now,
@ -263,7 +287,7 @@ def load_mappings(app: Sphinx) -> None:
def _create_element_from_result(domain: Domain, inv_name: str | None, def _create_element_from_result(domain: Domain, inv_name: str | None,
data: InventoryItem, data: InventoryItem,
node: pending_xref, contnode: TextElement) -> Element: node: pending_xref, contnode: TextElement) -> nodes.reference:
proj, version, uri, dispname = data proj, version, uri, dispname = data
if '://' not in uri and node.get('refdoc'): if '://' not in uri and node.get('refdoc'):
# get correct path in case of subdirectories # get correct path in case of subdirectories
@ -295,7 +319,7 @@ def _resolve_reference_in_domain_by_target(
inv_name: str | None, inventory: Inventory, inv_name: str | None, inventory: Inventory,
domain: Domain, objtypes: list[str], domain: Domain, objtypes: list[str],
target: str, target: str,
node: pending_xref, contnode: TextElement) -> Element | None: node: pending_xref, contnode: TextElement) -> nodes.reference | None:
for objtype in objtypes: for objtype in objtypes:
if objtype not in inventory: if objtype not in inventory:
# Continue if there's nothing of this kind in the inventory # Continue if there's nothing of this kind in the inventory
@ -328,7 +352,7 @@ def _resolve_reference_in_domain(env: BuildEnvironment,
honor_disabled_refs: bool, honor_disabled_refs: bool,
domain: Domain, objtypes: list[str], domain: Domain, objtypes: list[str],
node: pending_xref, contnode: TextElement, node: pending_xref, contnode: TextElement,
) -> Element | None: ) -> nodes.reference | None:
# we adjust the object types for backwards compatibility # we adjust the object types for backwards compatibility
if domain.name == 'std' and 'cmdoption' in objtypes: if domain.name == 'std' and 'cmdoption' in objtypes:
# until Sphinx-1.6, cmdoptions are stored as std:option # until Sphinx-1.6, cmdoptions are stored as std:option
@ -361,7 +385,7 @@ def _resolve_reference_in_domain(env: BuildEnvironment,
def _resolve_reference(env: BuildEnvironment, inv_name: str | None, inventory: Inventory, def _resolve_reference(env: BuildEnvironment, inv_name: str | None, inventory: Inventory,
honor_disabled_refs: bool, honor_disabled_refs: bool,
node: pending_xref, contnode: TextElement) -> Element | None: node: pending_xref, contnode: TextElement) -> nodes.reference | None:
# disabling should only be done if no inventory is given # disabling should only be done if no inventory is given
honor_disabled_refs = honor_disabled_refs and inv_name is None honor_disabled_refs = honor_disabled_refs and inv_name is None
@ -407,7 +431,7 @@ def inventory_exists(env: BuildEnvironment, inv_name: str) -> bool:
def resolve_reference_in_inventory(env: BuildEnvironment, def resolve_reference_in_inventory(env: BuildEnvironment,
inv_name: str, inv_name: str,
node: pending_xref, contnode: TextElement, node: pending_xref, contnode: TextElement,
) -> Element | None: ) -> nodes.reference | None:
"""Attempt to resolve a missing reference via intersphinx references. """Attempt to resolve a missing reference via intersphinx references.
Resolution is tried in the given inventory with the target as is. Resolution is tried in the given inventory with the target as is.
@ -422,7 +446,7 @@ def resolve_reference_in_inventory(env: BuildEnvironment,
def resolve_reference_any_inventory(env: BuildEnvironment, def resolve_reference_any_inventory(env: BuildEnvironment,
honor_disabled_refs: bool, honor_disabled_refs: bool,
node: pending_xref, contnode: TextElement, node: pending_xref, contnode: TextElement,
) -> Element | None: ) -> nodes.reference | None:
"""Attempt to resolve a missing reference via intersphinx references. """Attempt to resolve a missing reference via intersphinx references.
Resolution is tried with the target as is in any inventory. Resolution is tried with the target as is in any inventory.
@ -434,7 +458,7 @@ def resolve_reference_any_inventory(env: BuildEnvironment,
def resolve_reference_detect_inventory(env: BuildEnvironment, def resolve_reference_detect_inventory(env: BuildEnvironment,
node: pending_xref, contnode: TextElement, node: pending_xref, contnode: TextElement,
) -> Element | None: ) -> nodes.reference | None:
"""Attempt to resolve a missing reference via intersphinx references. """Attempt to resolve a missing reference via intersphinx references.
Resolution is tried first with the target as is in any inventory. Resolution is tried first with the target as is in any inventory.
@ -462,7 +486,7 @@ def resolve_reference_detect_inventory(env: BuildEnvironment,
def missing_reference(app: Sphinx, env: BuildEnvironment, node: pending_xref, def missing_reference(app: Sphinx, env: BuildEnvironment, node: pending_xref,
contnode: TextElement) -> Element | None: contnode: TextElement) -> nodes.reference | None:
"""Attempt to resolve a missing reference via intersphinx references.""" """Attempt to resolve a missing reference via intersphinx references."""
return resolve_reference_detect_inventory(env, node, contnode) return resolve_reference_detect_inventory(env, node, contnode)

View File

@ -7,7 +7,7 @@ import zlib
from typing import IO, TYPE_CHECKING, Callable, Iterator from typing import IO, TYPE_CHECKING, Callable, Iterator
from sphinx.util import logging from sphinx.util import logging
from sphinx.util.typing import Inventory from sphinx.util.typing import Inventory, InventoryItem
BUFSIZE = 16 * 1024 BUFSIZE = 16 * 1024
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -133,8 +133,8 @@ class InventoryFile:
if location.endswith('$'): if location.endswith('$'):
location = location[:-1] + name location = location[:-1] + name
location = join(uri, location) location = join(uri, location)
invdata.setdefault(type, {})[name] = (projname, version, inv_item: InventoryItem = projname, version, location, dispname
location, dispname) invdata.setdefault(type, {})[name] = inv_item
return invdata return invdata
@classmethod @classmethod

View File

@ -51,7 +51,12 @@ OptionSpec = Dict[str, Callable[[str], Any]]
TitleGetter = Callable[[nodes.Node], str] TitleGetter = Callable[[nodes.Node], str]
# inventory data on memory # inventory data on memory
InventoryItem = Tuple[str, str, str, str] InventoryItem = Tuple[
str, # project name
str, # project version
str, # URL
str, # display name
]
Inventory = Dict[str, Dict[str, InventoryItem]] Inventory = Dict[str, Dict[str, InventoryItem]]

View File

@ -4,7 +4,7 @@ import posixpath
import zlib import zlib
from io import BytesIO from io import BytesIO
from sphinx.ext.intersphinx import InventoryFile from sphinx.util.inventory import InventoryFile
inventory_v1 = b'''\ inventory_v1 = b'''\
# Sphinx inventory version 1 # Sphinx inventory version 1