sphinx/sphinx/ext/intersphinx.py

670 lines
25 KiB
Python
Raw Normal View History

2008-08-04 12:31:25 -05:00
"""
    sphinx.ext.intersphinx
    ~~~~~~~~~~~~~~~~~~~~~~

    Insert links to objects documented in remote Sphinx documentation.

    This works as follows:

    * Each Sphinx HTML build creates a file named "objects.inv" that contains a
      mapping from object names to URIs relative to the HTML set's root.

    * Projects using the Intersphinx extension can specify links to such mapping
      files in the `intersphinx_mapping` config value.  The mapping will then be
      used to resolve otherwise missing references to objects into links to the
      other documentation.

    * By default, the mapping file is assumed to be at the same location as the
      rest of the documentation; however, the location of the mapping file can
      also be specified individually, e.g. if the docs should be buildable
      without Internet access.

    :copyright: Copyright 2007-2022 by the Sphinx team, see AUTHORS.
    :license: BSD, see LICENSE for details.
"""
2019-11-14 21:27:30 -06:00
import concurrent.futures
import functools
2008-08-04 12:31:25 -05:00
import posixpath
2018-01-27 10:52:16 -06:00
import sys
import time
2008-08-04 12:31:25 -05:00
from os import path
from types import ModuleType
from typing import IO, Any, Dict, List, Optional, Tuple, cast
2018-11-11 10:02:14 -06:00
from urllib.parse import urlsplit, urlunsplit
2008-08-04 12:31:25 -05:00
from docutils import nodes
from docutils.nodes import Element, Node, TextElement, system_message
from docutils.utils import Reporter, relative_path
2008-08-04 12:31:25 -05:00
import sphinx
from sphinx.addnodes import pending_xref
from sphinx.application import Sphinx
2008-12-05 05:27:08 -06:00
from sphinx.builders.html import INVENTORY_FILENAME
from sphinx.config import Config
from sphinx.domains import Domain
from sphinx.environment import BuildEnvironment
from sphinx.errors import ExtensionError
from sphinx.locale import _, __
from sphinx.transforms.post_transforms import ReferencesResolver
from sphinx.util import logging, requests
from sphinx.util.docutils import CustomReSTDispatcher, SphinxRole
from sphinx.util.inventory import InventoryFile
from sphinx.util.typing import Inventory, InventoryItem, RoleFunction
2008-08-04 12:31:25 -05:00
# Module-level logger; ``logging`` here is the wrapper imported from
# ``sphinx.util``, not the standard-library module.
logger = logging.getLogger(__name__)
2009-03-19 09:49:03 -05:00
class InventoryAdapter:
    """Thin accessor for the intersphinx state kept on a build environment.

    The state lives in three attributes of the environment:
    ``intersphinx_cache``, ``intersphinx_inventory`` and
    ``intersphinx_named_inventory``.  They are created as empty dicts the
    first time an adapter is built for an environment that lacks them.
    """

    def __init__(self, env: BuildEnvironment) -> None:
        self.env = env

        # The presence of the cache attribute is used as the marker for all
        # three attributes; nothing is touched when it already exists.
        if hasattr(env, 'intersphinx_cache'):
            return
        for attr in ('intersphinx_cache',
                     'intersphinx_inventory',
                     'intersphinx_named_inventory'):
            setattr(env, attr, {})

    @property
    def cache(self) -> Dict[str, Tuple[str, int, Inventory]]:
        """The ``intersphinx_cache`` dict stored on the environment."""
        return self.env.intersphinx_cache  # type: ignore

    @property
    def main_inventory(self) -> Inventory:
        """The ``intersphinx_inventory`` dict stored on the environment."""
        return self.env.intersphinx_inventory  # type: ignore

    @property
    def named_inventory(self) -> Dict[str, Inventory]:
        """The ``intersphinx_named_inventory`` dict stored on the environment."""
        return self.env.intersphinx_named_inventory  # type: ignore

    def clear(self) -> None:
        """Empty the main and the named inventories; the cache is kept."""
        for store in (self.main_inventory, self.named_inventory):
            store.clear()
2017-01-29 10:55:39 -06:00
def _strip_basic_auth(url: str) -> str:
"""Returns *url* with basic auth credentials removed. Also returns the
basic auth username and password if they're present in *url*.
E.g.: https://user:pass@example.com => https://example.com
*url* need not include basic auth credentials.
:param url: url which may or may not contain basic auth credentials
:type url: ``str``
:return: *url* with any basic auth creds removed
:rtype: ``str``
"""
frags = list(urlsplit(url))
# swap out "user[:pass]@hostname" for "hostname"
if '@' in frags[1]:
frags[1] = frags[1].split('@')[1]
return urlunsplit(frags)
def _read_from_url(url: str, config: Optional[Config] = None) -> IO:
    """Reads data from *url* with an HTTP *GET*.

    This function supports fetching from resources which use basic HTTP auth as
    laid out by RFC1738 § 3.1. See § 5 for grammar definitions for URLs.

    .. seealso::

       https://www.ietf.org/rfc/rfc1738.txt

    :param url: URL of an HTTP resource
    :type url: ``str``
    :param config: configuration object; its ``intersphinx_timeout`` value is
                   used as the request timeout.  When omitted, no timeout is
                   passed on.

    :return: data read from resource described by *url*
    :rtype: ``file``-like object
    """
    # *config* is optional, so it must not be dereferenced unconditionally
    timeout = None if config is None else config.intersphinx_timeout
    r = requests.get(url, stream=True, config=config, timeout=timeout)
    r.raise_for_status()
    # remember the final URL on the raw stream so callers can compare it
    # with the URL they asked for
    r.raw.url = r.url
    # decode content-body based on the header.
    # ref: https://github.com/kennethreitz/requests/issues/2155
    r.raw.read = functools.partial(r.raw.read, decode_content=True)
    return r.raw
def _get_safe_url(url: str) -> str:
"""Gets version of *url* with basic auth passwords obscured. This function
returns results suitable for printing and logging.
2016-08-17 10:58:17 -05:00
E.g.: https://user:12345@example.com => https://user@example.com
:param url: a url
:type url: ``str``
2016-08-17 10:58:17 -05:00
:return: *url* with password removed
:rtype: ``str``
"""
2016-08-17 11:25:29 -05:00
parts = urlsplit(url)
if parts.username is None:
return url
else:
frags = list(parts)
if parts.port:
frags[1] = '{}@{}:{}'.format(parts.username, parts.hostname, parts.port)
2016-08-17 11:25:29 -05:00
else:
frags[1] = '{}@{}'.format(parts.username, parts.hostname)
2016-08-17 11:25:29 -05:00
return urlunsplit(frags)
def fetch_inventory(app: Sphinx, uri: str, inv: Any) -> Any:
    """Fetch, parse and return an intersphinx inventory file.

    :param app: application object; its ``config`` and ``srcdir`` are used
    :param uri: base URI of the links to generate
    :param inv: actual location of the inventory file
    :return: the data returned by ``InventoryFile.load``
    :raises Exception: any error is re-raised with its ``args`` replaced by a
                       ``(format, inv, exception class name, message)`` tuple
    """
    # both *uri* (base URI of the links to generate) and *inv* (actual
    # location of the inventory file) can be local or remote URIs
    localuri = '://' not in uri
    if not localuri:
        # case: inv URI points to remote resource; strip any existing auth
        uri = _strip_basic_auth(uri)
    try:
        if '://' in inv:
            f = _read_from_url(inv, config=app.config)
        else:
            f = open(path.join(app.srcdir, inv), 'rb')
    except Exception as err:
        # report the bare class name, like the "not readable" handler below
        err.args = ('intersphinx inventory %r not fetchable due to %s: %s',
                    inv, err.__class__.__name__, str(err))
        raise
    try:
        if hasattr(f, 'url'):
            # the stream carries the URL that was finally read
            newinv = f.url  # type: ignore
            if inv != newinv:
                logger.info(__('intersphinx inventory has moved: %s -> %s'), inv, newinv)

                # follow the move with the base URI when it was derived
                # from the inventory location
                if uri in (inv, path.dirname(inv), path.dirname(inv) + '/'):
                    uri = path.dirname(newinv)
        with f:
            try:
                join = path.join if localuri else posixpath.join
                invdata = InventoryFile.load(f, uri, join)
            except ValueError as exc:
                raise ValueError('unknown or unsupported inventory version: %r' % exc) from exc
    except Exception as err:
        err.args = ('intersphinx inventory %r not readable due to %s: %s',
                    inv, err.__class__.__name__, str(err))
        raise
    else:
        return invdata
2019-11-14 21:27:30 -06:00
def fetch_inventory_group(
    name: str, uri: str, invs: Any, cache: Any, app: Any, now: float
) -> bool:
    """Try the inventory locations of one mapping entry in order.

    The first location that yields inventory data is stored in *cache* under
    *uri* as ``(name, now, invdata)``.

    :return: ``True`` if the cache entry was updated, ``False`` otherwise
    """
    oldest_allowed = now - app.config.intersphinx_cache_limit * 86400
    errors = []
    try:
        for location in invs:
            if not location:
                location = posixpath.join(uri, INVENTORY_FILENAME)

            # decide whether the inventory must be read: always read local
            # files; remote ones only if the cache time is expired
            is_remote = '://' in location
            if is_remote and uri in cache and not cache[uri][1] < oldest_allowed:
                continue

            printable = _get_safe_url(location)
            logger.info(__('loading intersphinx inventory from %s...'), printable)
            try:
                invdata = fetch_inventory(app, uri, location)
            except Exception as err:
                # remember the problem and move on to the next alternative
                errors.append(err.args)
                continue
            if invdata:
                cache[uri] = (name, now, invdata)
                return True
        return False
    finally:
        # runs on every exit path, including the early ``return True``
        if errors:
            if len(errors) < len(invs):
                logger.info(__("encountered some issues with some of the inventories,"
                               " but they had working alternatives:"))
                for args in errors:
                    logger.info(*args)
            else:
                formatted = [args[0] % args[1:] for args in errors]
                issues = '\n'.join(formatted)
                logger.warning(__("failed to reach any of the inventories "
                                  "with the following issues:") + "\n" + issues)
2019-11-14 21:27:30 -06:00
def load_mappings(app: Sphinx) -> None:
    """Load all intersphinx mappings into the environment.

    Every entry of ``intersphinx_mapping`` is handed to
    :func:`fetch_inventory_group` on a thread pool.  If at least one cache
    entry was updated, the main and named inventories are rebuilt from the
    whole cache.
    """
    now = int(time.time())
    inventories = InventoryAdapter(app.builder.env)

    with concurrent.futures.ThreadPoolExecutor() as pool:
        futures = []
        for name, (uri, invs) in app.config.intersphinx_mapping.values():
            futures.append(pool.submit(
                fetch_inventory_group, name, uri, invs, inventories.cache, app, now
            ))
        updated = [f.result() for f in concurrent.futures.as_completed(futures)]

    if any(updated):
        inventories.clear()

        # Duplicate values in different inventories will shadow each
        # other; which one will override which can vary between builds
        # since they are specified using an unordered dict.  To make
        # it more consistent, we sort the named inventories and then
        # add the unnamed inventories last.  This means that the
        # unnamed inventories will shadow the named ones but the named
        # ones can still be accessed when the name is specified.
        cached_vals = list(inventories.cache.values())
        # Sort on (name, timestamp) only: comparing whole tuples would fall
        # through to the inventory dicts on a tie, and dicts are not orderable.
        named_vals = sorted((v for v in cached_vals if v[0]),
                            key=lambda v: (v[0], v[1]))
        unnamed_vals = [v for v in cached_vals if not v[0]]
        for name, _x, invdata in named_vals + unnamed_vals:
            if name:
                inventories.named_inventory[name] = invdata
            for objtype, objects in invdata.items():
                inventories.main_inventory.setdefault(objtype, {}).update(objects)
2008-08-04 12:31:25 -05:00
def _create_element_from_result(domain: Domain, inv_name: Optional[str],
                                data: InventoryItem,
                                node: pending_xref, contnode: TextElement) -> Element:
    """Build the external ``reference`` node for one inventory hit.

    *data* is unpacked as ``(project, version, uri, display name)``.
    """
    proj, version, uri, dispname = data
    if '://' not in uri and node.get('refdoc'):
        # get correct path in case of subdirectories
        uri = path.join(relative_path(node['refdoc'], '.'), uri)

    reftitle = _('(in %s v%s)') % (proj, version) if version else _('(in %s)') % (proj,)
    newnode = nodes.reference('', '', internal=False, refuri=uri, reftitle=reftitle)

    if node.get('refexplicit'):
        # use whatever title was given
        child = contnode
    elif dispname == '-' or (domain.name == 'std' and node['reftype'] == 'keyword'):
        # use whatever title was given, but strip prefix
        child = contnode
        title = contnode.astext()
        if inv_name is not None:
            prefix = inv_name + ':'
            if title.startswith(prefix):
                stripped = title[len(prefix):]
                child = contnode.__class__(stripped, stripped)
    else:
        # else use the given display name (used for :ref:)
        child = contnode.__class__(dispname, dispname)

    newnode.append(child)
    return newnode
def _resolve_reference_in_domain_by_target(
        inv_name: Optional[str], inventory: Inventory,
        domain: Domain, objtypes: List[str],
        target: str,
        node: pending_xref, contnode: TextElement) -> Optional[Element]:
    """Look *target* up under each of *objtypes* and build a node for the first hit.

    Lookups are case sensitive, except that ``std:term`` falls back to a
    case-insensitive comparison.  Returns ``None`` when nothing matches.
    """
    for objtype in objtypes:
        if objtype not in inventory:
            # Nothing of this kind in the inventory
            continue
        entries = inventory[objtype]

        if target in entries:
            # Case sensitive match, use it
            data = entries[target]
        elif objtype != 'std:term':
            # Only terms get the case insensitive fallback; potentially
            # this should apply to other types as well.
            continue
        else:
            wanted = target.lower()
            candidates = [key for key in entries.keys() if key.lower() == wanted]
            if not candidates:
                # No case insensitive match either, try the next candidate
                continue
            data = entries[candidates[0]]

        return _create_element_from_result(domain, inv_name, data, node, contnode)
    return None
def _resolve_reference_in_domain(env: BuildEnvironment,
                                 inv_name: Optional[str], inventory: Inventory,
                                 honor_disabled_refs: bool,
                                 domain: Domain, objtypes: List[str],
                                 node: pending_xref, contnode: TextElement
                                 ) -> Optional[Element]:
    """Resolve *node* against *inventory* for the given domain object types.

    The target is tried as is first and then, if that fails, as the fully
    qualified name reported by the domain.

    :param honor_disabled_refs: when true, object types listed in
        ``intersphinx_disabled_reftypes`` are skipped
    :param objtypes: candidate object types (without domain prefix); the
        list passed in is not modified
    :return: the new reference node, or ``None`` if nothing matched
    """
    # Work on a copy: the list belongs to the caller (it may be the very
    # object returned by ``domain.objtypes_for_role()``), so appending to it
    # in place would leak the compatibility entries below to other users.
    objtypes = list(objtypes)

    # we adjust the object types for backwards compatibility
    if domain.name == 'std' and 'cmdoption' in objtypes:
        # until Sphinx-1.6, cmdoptions are stored as std:option
        objtypes.append('option')
    if domain.name == 'py' and 'attribute' in objtypes:
        # Since Sphinx-2.1, properties are stored as py:method
        objtypes.append('method')

    # the inventory contains domain:type as objtype
    objtypes = ["{}:{}".format(domain.name, t) for t in objtypes]

    # now that the objtypes list is complete we can remove the disabled ones
    if honor_disabled_refs:
        disabled = env.config.intersphinx_disabled_reftypes
        objtypes = [o for o in objtypes if o not in disabled]

    # without qualification
    res = _resolve_reference_in_domain_by_target(inv_name, inventory, domain, objtypes,
                                                 node['reftarget'], node, contnode)
    if res is not None:
        return res

    # try with qualification of the current scope instead
    full_qualified_name = domain.get_full_qualified_name(node)
    if full_qualified_name is None:
        return None
    return _resolve_reference_in_domain_by_target(inv_name, inventory, domain, objtypes,
                                                  full_qualified_name, node, contnode)
def _resolve_reference(env: BuildEnvironment, inv_name: Optional[str], inventory: Inventory,
                       honor_disabled_refs: bool,
                       node: pending_xref, contnode: TextElement) -> Optional[Element]:
    """Resolve *node* in *inventory*, dispatching on the reference type.

    An ``any`` reference is tried in every domain of the environment; all
    other references are tried in the domain named by ``refdomain`` only.
    Returns ``None`` when the reference cannot be resolved.
    """
    # disabling should only be done if no inventory is given
    honor = honor_disabled_refs and inv_name is None

    def is_disabled(pattern: str) -> bool:
        # the config value is only consulted when disabling is in effect
        return bool(honor and pattern in env.config.intersphinx_disabled_reftypes)

    if is_disabled('*'):
        return None

    typ = node['reftype']
    if typ != 'any':
        domain_name = node.get('refdomain')
        if not domain_name:
            # only objects in domains are in the inventory
            return None
        if is_disabled(domain_name + ":*"):
            return None
        domain = env.get_domain(domain_name)
        objtypes = domain.objtypes_for_role(typ)
        if not objtypes:
            return None
        return _resolve_reference_in_domain(env, inv_name, inventory,
                                            honor,
                                            domain, objtypes,
                                            node, contnode)

    for domain_name, domain in env.domains.items():
        if is_disabled(domain_name + ":*"):
            continue
        found = _resolve_reference_in_domain(env, inv_name, inventory,
                                             honor,
                                             domain, list(domain.object_types),
                                             node, contnode)
        if found is not None:
            return found
    return None
2008-08-04 12:31:25 -05:00
def inventory_exists(env: BuildEnvironment, inv_name: str) -> bool:
    """Tell whether a named inventory called *inv_name* is known to *env*."""
    named = InventoryAdapter(env).named_inventory
    return inv_name in named
def resolve_reference_in_inventory(env: BuildEnvironment,
                                   inv_name: str,
                                   node: pending_xref, contnode: TextElement
                                   ) -> Optional[Element]:
    """Attempt to resolve a missing reference via intersphinx references.

    Resolution is tried in the given inventory with the target as is.

    Requires ``inventory_exists(env, inv_name)``.
    """
    assert inventory_exists(env, inv_name)
    inventory = InventoryAdapter(env).named_inventory[inv_name]
    # disabled reference types are never honoured for an explicit inventory
    return _resolve_reference(env, inv_name, inventory, False, node, contnode)
def resolve_reference_any_inventory(env: BuildEnvironment,
                                    honor_disabled_refs: bool,
                                    node: pending_xref, contnode: TextElement
                                    ) -> Optional[Element]:
    """Attempt to resolve a missing reference via intersphinx references.

    Resolution is tried with the target as is in any inventory.
    """
    merged = InventoryAdapter(env).main_inventory
    # no inventory name is given, so the merged main inventory is searched
    return _resolve_reference(env, None, merged, honor_disabled_refs, node, contnode)
def resolve_reference_detect_inventory(env: BuildEnvironment,
                                       node: pending_xref, contnode: TextElement
                                       ) -> Optional[Element]:
    """Attempt to resolve a missing reference via intersphinx references.

    Resolution is tried first with the target as is in any inventory.
    If this does not succeed, then the target is split by the first ``:``,
    to form ``inv_name:newtarget``. If ``inv_name`` is a named inventory, then resolution
    is tried in that inventory with the new target.

    ``node['reftarget']`` is left unchanged on return, also when the lookup
    in the named inventory raises.
    """
    # ordinary direct lookup, use data as is
    res = resolve_reference_any_inventory(env, True, node, contnode)
    if res is not None:
        return res

    # try splitting the target into 'inv_name:target'
    target = node['reftarget']
    if ':' not in target:
        return None
    inv_name, newtarget = target.split(':', 1)
    if not inventory_exists(env, inv_name):
        return None

    # the target is swapped only for the duration of the lookup; restore it
    # in ``finally`` so that an exception cannot leave the node modified
    node['reftarget'] = newtarget
    try:
        return resolve_reference_in_inventory(env, inv_name, node, contnode)
    finally:
        node['reftarget'] = target
def missing_reference(app: Sphinx, env: BuildEnvironment, node: pending_xref,
                      contnode: TextElement) -> Optional[Element]:
    """Attempt to resolve a missing reference via intersphinx references.

    *app* is accepted but not used; the work is delegated to
    :func:`resolve_reference_detect_inventory`.
    """
    resolved = resolve_reference_detect_inventory(env, node, contnode)
    return resolved
2018-11-30 08:51:16 -06:00
2008-08-04 12:31:25 -05:00
class IntersphinxDispatcher(CustomReSTDispatcher):
    """Custom dispatcher for external role.

    This enables :external:***: roles on parsing reST document.
    """

    def __init__(self) -> None:
        super().__init__()

    def role(self, role_name: str, language_module: ModuleType, lineno: int, reporter: Reporter
             ) -> Tuple[RoleFunction, List[system_message]]:
        """Return the role implementation for *role_name*.

        Names whose first ``:``-separated component is ``external`` are
        served by :class:`IntersphinxRole`; everything else is passed on to
        the parent dispatcher.
        """
        prefix = role_name.partition(':')[0]
        if prefix != 'external':
            return super().role(role_name, language_module, lineno, reporter)
        return IntersphinxRole(), []
class IntersphinxRole(SphinxRole):
    """Role behind ``:external:role:`` and ``:external:domain:role:``.

    It runs the wrapped role and marks the resulting ``pending_xref`` nodes
    with ``intersphinx`` and ``inventory`` attributes.
    """

    def run(self) -> Tuple[List[Node], List[system_message]]:
        """Invoke the wrapped role and tag its ``pending_xref`` results.

        A role text of the form ``:inv:target`` selects the inventory
        ``inv``; a leading ``\\:`` escapes the colon.
        """
        role_name = self.get_role_name(self.name)
        if role_name is None:
            logger.warning(__('role not found: %s'), self.name,
                           location=(self.env.docname, self.lineno))
            return [], []

        # extract inventory specification
        inventory = None
        if self.text.startswith('\\:'):
            # escaped :, so not a real inventory specification
            self.text = self.text[1:]
        elif self.text.startswith(':'):  # format: :inv:normalRoleArg
            # ``startswith`` (unlike indexing) is also safe for empty text
            inventory = self.text.split(':')[1]
            # drop the leading ":", the inventory name and the closing ":"
            self.text = self.text[(len(inventory) + 2):]

        result, messages = self.invoke_role(role_name)
        for node in result:
            if isinstance(node, pending_xref):
                node['intersphinx'] = True
                node['inventory'] = inventory

        return result, messages

    def get_role_name(self, name: str) -> Optional[Tuple[str, str]]:
        """Split the role name into ``(domain, role)``.

        Returns ``None`` if *name* has an unexpected number of parts or the
        role exists neither in the chosen domain nor in ``std``.
        """
        names = name.split(':')
        if len(names) == 2:
            # :external:role:
            domain = self.env.temp_data.get('default_domain')
            role = names[1]
        elif len(names) == 3:
            # :external:domain:role:
            domain = names[1]
            role = names[2]
        else:
            return None

        if domain and self.is_existent_role(domain, role):
            return (domain, role)
        elif self.is_existent_role('std', role):
            # fall back to the standard domain
            return ('std', role)
        else:
            return None

    def is_existent_role(self, domain_name: str, role_name: str) -> bool:
        """Tell whether domain *domain_name* exists and has role *role_name*."""
        try:
            domain = self.env.get_domain(domain_name)
        except ExtensionError:
            return False
        return role_name in domain.roles

    def invoke_role(self, role: Tuple[str, str]) -> Tuple[List[Node], List[system_message]]:
        """Call the role function for the ``(domain, role)`` pair *role*."""
        domain = self.env.get_domain(role[0])
        if domain:
            role_func = domain.role(role[1])

            return role_func(':'.join(role), self.rawtext, self.text, self.lineno,
                             self.inliner, self.options, self.content)
        else:
            return [], []
class IntersphinxRoleResolver(ReferencesResolver):
    """pending_xref node resolver for intersphinx role.

    This resolves pending_xref nodes generated by :external:***: role.
    """

    # one less than the priority of ``ReferencesResolver``
    default_priority = ReferencesResolver.default_priority - 1

    def run(self, **kwargs: Any) -> None:
        """Resolve every ``pending_xref`` node that carries the ``intersphinx`` mark."""
        for xref in self.document.traverse(pending_xref):
            if 'intersphinx' not in xref:
                continue

            contnode = cast(nodes.TextElement, xref[0].deepcopy())
            # temporary hax to glue on inventory info again
            inv_name = xref['inventory']
            if inv_name is not None:
                xref['reftarget'] = inv_name + ":" + xref['reftarget']

            replacement = missing_reference(self.app, self.env, xref, contnode)
            # no warning on failure, the normal missing_reference handler
            # will do that
            if replacement is not None:
                xref.replace_self(replacement)
def install_dispatcher(app: Sphinx, docname: str, source: List[str]) -> None:
    """Enable IntersphinxDispatcher.

    .. note:: The installed dispatcher will be uninstalled on disabling sphinx_domain
              automatically.
    """
    IntersphinxDispatcher().enable()
def normalize_intersphinx_mapping(app: Sphinx, config: Config) -> None:
    """Rewrite ``intersphinx_mapping`` in place to ``key: (name, (uri, invs))``.

    *invs* is always a tuple afterwards.  Entries that cannot be interpreted
    are dropped with a warning.
    """
    mapping = config.intersphinx_mapping
    # iterate over a copy, entries are replaced and removed on the way
    for key, value in mapping.copy().items():
        try:
            if not isinstance(value, (list, tuple)):
                # old format, no name
                name, uri, inv = None, key, value
            else:
                # new format
                name, (uri, inv) = key, value
                if not isinstance(name, str):
                    logger.warning(__('intersphinx identifier %r is not string. Ignored'),
                                   name)
                    mapping.pop(key)
                    continue

            # a single location is wrapped into a one-element tuple
            invs = inv if isinstance(inv, tuple) else (inv,)
            mapping[key] = (name, (uri, invs))
        except Exception as exc:
            logger.warning(__('Failed to read intersphinx_mapping[%s], ignored: %r'), key, exc)
            mapping.pop(key)
def setup(app: Sphinx) -> Dict[str, Any]:
    """Register the extension's config values, event handlers and transform.

    :return: the extension metadata dict
    """
    config_values = (
        ('intersphinx_mapping', {}, True),
        ('intersphinx_cache_limit', 5, False),
        ('intersphinx_timeout', None, False),
        ('intersphinx_disabled_reftypes', [], True),
    )
    for option, default, rebuild in config_values:
        app.add_config_value(option, default, rebuild)

    app.connect('config-inited', normalize_intersphinx_mapping, priority=800)
    app.connect('builder-inited', load_mappings)
    app.connect('source-read', install_dispatcher)
    app.connect('missing-reference', missing_reference)
    app.add_post_transform(IntersphinxRoleResolver)

    metadata = {
        'version': sphinx.__display_version__,
        'env_version': 1,
        'parallel_read_safe': True
    }
    return metadata
def inspect_main(argv: List[str]) -> None:
    """Debug functionality to print out an inventory.

    ``argv[0]`` is the local path or URL of the inventory file.  Without
    arguments, a usage message is written to stderr and the process exits
    with status 1.
    """
    if not argv:
        print("Print out an inventory file.\n"
              "Error: must specify local path or URL to an inventory file.",
              file=sys.stderr)
        sys.exit(1)

    # minimal stand-ins providing the attributes defined below in place of
    # a real application and its configuration
    class MockConfig:
        intersphinx_timeout: int = None
        tls_verify = False
        user_agent = None

    class MockApp:
        srcdir = ''
        config = MockConfig()

        def warn(self, msg: str) -> None:
            print(msg, file=sys.stderr)

    try:
        filename = argv[0]
        invdata = fetch_inventory(MockApp(), '', filename)  # type: ignore
        for key in sorted(invdata or {}):
            print(key)
            for entry, einfo in sorted(invdata[key].items()):
                # the display name is omitted when it is the "-" placeholder
                display = '%-40s: ' % einfo[3] if einfo[3] != '-' else ''
                print('\t%-40s %s%s' % (entry, display, einfo[2]))
    except ValueError as exc:
        # ``args`` holds a format string followed by its arguments
        print(exc.args[0] % exc.args[1:])
    except Exception as exc:
        print('Unknown error: %r' % exc)
if __name__ == '__main__':
    # Script entry point: configure the standard-library logging (imported
    # under an alias, since ``logging`` is already bound at module level)
    # and print the inventory named on the command line.
    import logging as _logging
    _logging.basicConfig()

    inspect_main(argv=sys.argv[1:])