diff --git a/sphinx/ext/intersphinx/_load.py b/sphinx/ext/intersphinx/_load.py index c7ea85a0d..a81c6bbb2 100644 --- a/sphinx/ext/intersphinx/_load.py +++ b/sphinx/ext/intersphinx/_load.py @@ -4,7 +4,6 @@ from __future__ import annotations import concurrent.futures import dataclasses -import io import os.path import posixpath import time @@ -327,9 +326,8 @@ def _fetch_inventory( else: raw_data = _fetch_inventory_file(inv_location=inv_location, srcdir=srcdir) - stream = io.BytesIO(raw_data) try: - invdata = InventoryFile.load(stream, target_uri, posixpath.join) + invdata = InventoryFile.loads(raw_data, uri=target_uri) except ValueError as exc: msg = f'unknown or unsupported inventory version: {exc!r}' raise ValueError(msg) from exc diff --git a/sphinx/util/_inventory_file_reader.py b/sphinx/util/_inventory_file_reader.py new file mode 100644 index 000000000..d19faa87e --- /dev/null +++ b/sphinx/util/_inventory_file_reader.py @@ -0,0 +1,76 @@ +from __future__ import annotations + +import zlib +from typing import TYPE_CHECKING + +from sphinx.util import logging + +BUFSIZE = 16 * 1024 +logger = logging.getLogger(__name__) + +if TYPE_CHECKING: + from collections.abc import Iterator + from typing import Protocol + + # Readable file stream for inventory loading + class _SupportsRead(Protocol): + def read(self, size: int = ...) -> bytes: ... + + +__all__ = ('InventoryFileReader',) + + +class InventoryFileReader: + """A file reader for an inventory file. + + This reader supports mixture of texts and compressed texts. + """ + + def __init__(self, stream: _SupportsRead) -> None: + self.stream = stream + self.buffer = b'' + self.eof = False + + def read_buffer(self) -> None: + chunk = self.stream.read(BUFSIZE) + if chunk == b'': + self.eof = True + self.buffer += chunk + + def readline(self) -> str: + pos = self.buffer.find(b'\n') + if pos != -1: + line = self.buffer[:pos].decode() + self.buffer = self.buffer[pos + 1 :] + elif self.eof: + line = self.buffer.decode() + self.buffer = b'' + else: + self.read_buffer() + line = self.readline() + + return line + + def readlines(self) -> Iterator[str]: + while not self.eof: + line = self.readline() + if line: + yield line + + def read_compressed_chunks(self) -> Iterator[bytes]: + decompressor = zlib.decompressobj() + while not self.eof: + self.read_buffer() + yield decompressor.decompress(self.buffer) + self.buffer = b'' + yield decompressor.flush() + + def read_compressed_lines(self) -> Iterator[str]: + buf = b'' + for chunk in self.read_compressed_chunks(): + buf += chunk + pos = buf.find(b'\n') + while pos != -1: + yield buf[:pos].decode() + buf = buf[pos + 1 :] + pos = buf.find(b'\n') diff --git a/sphinx/util/inventory.py b/sphinx/util/inventory.py index 507d7a1d8..9da94f85d 100644 --- a/sphinx/util/inventory.py +++ b/sphinx/util/inventory.py @@ -2,6 +2,7 @@ from __future__ import annotations +import posixpath import re import zlib from typing import TYPE_CHECKING @@ -14,127 +15,96 @@ logger = logging.getLogger(__name__) if TYPE_CHECKING: import os - from collections.abc import Callable, Iterator + from collections.abc import Callable, Sequence + from typing import Protocol from sphinx.builders import Builder from sphinx.environment import BuildEnvironment - from sphinx.util.typing import Inventory, InventoryItem, _ReadableStream + from sphinx.util.typing import Inventory, InventoryItem + + # Readable file stream for inventory loading + class _SupportsRead(Protocol): + def read(self, size: int = ...) -> bytes: ... + + _JoinFunc = Callable[[str, str], str] -class InventoryFileReader: - """A file reader for an inventory file. +def __getattr__(name: str) -> object: + if name == 'InventoryFileReader': + from sphinx.util._inventory_file_reader import InventoryFileReader - This reader supports mixture of texts and compressed texts. - """ - - def __init__(self, stream: _ReadableStream[bytes]) -> None: - self.stream = stream - self.buffer = b'' - self.eof = False - - def read_buffer(self) -> None: - chunk = self.stream.read(BUFSIZE) - if chunk == b'': - self.eof = True - self.buffer += chunk - - def readline(self) -> str: - pos = self.buffer.find(b'\n') - if pos != -1: - line = self.buffer[:pos].decode() - self.buffer = self.buffer[pos + 1 :] - elif self.eof: - line = self.buffer.decode() - self.buffer = b'' - else: - self.read_buffer() - line = self.readline() - - return line - - def readlines(self) -> Iterator[str]: - while not self.eof: - line = self.readline() - if line: - yield line - - def read_compressed_chunks(self) -> Iterator[bytes]: - decompressor = zlib.decompressobj() - while not self.eof: - self.read_buffer() - yield decompressor.decompress(self.buffer) - self.buffer = b'' - yield decompressor.flush() - - def read_compressed_lines(self) -> Iterator[str]: - buf = b'' - for chunk in self.read_compressed_chunks(): - buf += chunk - pos = buf.find(b'\n') - while pos != -1: - yield buf[:pos].decode() - buf = buf[pos + 1 :] - pos = buf.find(b'\n') + return InventoryFileReader + msg = f'module {__name__!r} has no attribute {name!r}' + raise AttributeError(msg) class InventoryFile: @classmethod - def load( - cls: type[InventoryFile], - stream: _ReadableStream[bytes], + def loads( + cls, + content: bytes, + *, uri: str, - joinfunc: Callable[[str, str], str], ) -> Inventory: - reader = InventoryFileReader(stream) - line = reader.readline().rstrip() - if line == '# Sphinx inventory version 1': - return cls.load_v1(reader, uri, joinfunc) - elif line == '# Sphinx inventory version 2': - return cls.load_v2(reader, uri, joinfunc) - else: - raise ValueError('invalid inventory header: %s' % line) + format_line, _, content = content.partition(b'\n') + format_line = format_line.rstrip() # remove trailing \r or spaces + if format_line == b'# Sphinx inventory version 2': + return cls._loads_v2(content, uri=uri) + if format_line == b'# Sphinx inventory version 1': + lines = content.decode().splitlines() + return cls._loads_v1(lines, uri=uri) + if format_line.startswith(b'# Sphinx inventory version '): + unknown_version = format_line[27:].decode() + msg = f'unknown or unsupported inventory version: {unknown_version!r}' + raise ValueError(msg) + msg = f'invalid inventory header: {format_line.decode()}' + raise ValueError(msg) @classmethod - def load_v1( - cls: type[InventoryFile], - stream: InventoryFileReader, - uri: str, - join: Callable[[str, str], str], - ) -> Inventory: + def load(cls, stream: _SupportsRead, uri: str, joinfunc: _JoinFunc) -> Inventory: + return cls.loads(stream.read(), uri=uri) + + @classmethod + def _loads_v1(cls, lines: Sequence[str], *, uri: str) -> Inventory: + if len(lines) < 2: + msg = 'invalid inventory header: missing project name or version' + raise ValueError(msg) invdata: Inventory = {} - projname = stream.readline().rstrip()[11:] - version = stream.readline().rstrip()[11:] - for line in stream.readlines(): - name, type, location = line.rstrip().split(None, 2) - location = join(uri, location) + projname = lines[0].rstrip()[11:] # Project name + version = lines[1].rstrip()[11:] # Project version + for line in lines[2:]: + name, item_type, location = line.rstrip().split(None, 2) + location = posixpath.join(uri, location) # version 1 did not add anchors to the location - if type == 'mod': - type = 'py:module' - location += '#module-' + name + if item_type == 'mod': + item_type = 'py:module' + location += f'#module-{name}' else: - type = 'py:' + type - location += '#' + name - invdata.setdefault(type, {})[name] = (projname, version, location, '-') + item_type = f'py:{item_type}' + location += f'#{name}' + inv_item: InventoryItem = projname, version, location, '-' + invdata.setdefault(item_type, {})[name] = inv_item return invdata @classmethod - def load_v2( - cls: type[InventoryFile], - stream: InventoryFileReader, - uri: str, - join: Callable[[str, str], str], - ) -> Inventory: + def _loads_v2(cls, inv_data: bytes, *, uri: str) -> Inventory: + try: + line_1, line_2, check_line, compressed = inv_data.split(b'\n', maxsplit=3) + except ValueError: + msg = 'invalid inventory header: missing project name or version' + raise ValueError(msg) from None invdata: Inventory = {} - projname = stream.readline().rstrip()[11:] - version = stream.readline().rstrip()[11:] + projname = line_1.rstrip()[11:].decode() # Project name + version = line_2.rstrip()[11:].decode() # Project version # definition -> priority, location, display name potential_ambiguities: dict[str, tuple[str, str, str]] = {} actual_ambiguities = set() - line = stream.readline() - if 'zlib' not in line: - raise ValueError('invalid inventory header (not compressed): %s' % line) + if b'zlib' not in check_line: # '... compressed using zlib' + msg = f'invalid inventory header (not compressed): {check_line.decode()}' + raise ValueError(msg) - for line in stream.read_compressed_lines(): + decompressed_content = zlib.decompress(compressed) + for line in decompressed_content.decode().splitlines(): # be careful to handle names with embedded spaces correctly m = re.match( r'(.+?)\s+(\S+)\s+(-?\d+)\s+?(\S*)\s+(.*)', @@ -177,7 +147,7 @@ class InventoryFile: potential_ambiguities[lowercase_definition] = content if location.endswith('$'): location = location[:-1] + name - location = join(uri, location) + location = posixpath.join(uri, location) inv_item: InventoryItem = projname, version, location, dispname invdata.setdefault(type, {})[name] = inv_item for ambiguity in actual_ambiguities: @@ -192,10 +162,7 @@ class InventoryFile: @classmethod def dump( - cls: type[InventoryFile], - filename: str | os.PathLike[str], - env: BuildEnvironment, - builder: Builder, + cls, filename: str | os.PathLike[str], env: BuildEnvironment, builder: Builder ) -> None: def escape(string: str) -> str: return re.sub('\\s+', ' ', string) diff --git a/sphinx/util/typing.py b/sphinx/util/typing.py index 458c8076a..6b72ad9e8 100644 --- a/sphinx/util/typing.py +++ b/sphinx/util/typing.py @@ -119,26 +119,6 @@ OptionSpec: TypeAlias = dict[str, Callable[[str], Any]] # title getter functions for enumerable nodes (see sphinx.domains.std) TitleGetter: TypeAlias = Callable[[nodes.Node], str] -# Readable file stream for inventory loading -if TYPE_CHECKING: - from types import TracebackType - from typing import Self - - _T_co = TypeVar('_T_co', str, bytes, covariant=True) - - class _ReadableStream(Protocol[_T_co]): # NoQA: PYI046 (false positive) - def read(self, size: int = ...) -> _T_co: ... - - def __enter__(self) -> Self: ... - - def __exit__( - self, - exc_type: type[BaseException] | None, - exc_val: BaseException | None, - exc_tb: TracebackType | None, - ) -> None: ... - - # inventory data on memory InventoryItem: TypeAlias = tuple[ str, # project name diff --git a/tests/test_extensions/test_ext_intersphinx.py b/tests/test_extensions/test_ext_intersphinx.py index 70262cdd0..090510400 100644 --- a/tests/test_extensions/test_ext_intersphinx.py +++ b/tests/test_extensions/test_ext_intersphinx.py @@ -88,7 +88,7 @@ def test_fetch_inventory_redirection(get_request, InventoryFile, app): srcdir=app.srcdir, ) assert 'intersphinx inventory has moved' not in app.status.getvalue() - assert InventoryFile.load.call_args[0][1] == 'https://hostname/' + assert InventoryFile.loads.call_args[1]['uri'] == 'https://hostname/' # same uri and inv, redirected app.status.seek(0) @@ -106,7 +106,7 @@ def test_fetch_inventory_redirection(get_request, InventoryFile, app): 'https://hostname/%s -> https://hostname/new/%s\n' % (INVENTORY_FILENAME, INVENTORY_FILENAME) ) - assert InventoryFile.load.call_args[0][1] == 'https://hostname/new' + assert InventoryFile.loads.call_args[1]['uri'] == 'https://hostname/new' # different uri and inv, not redirected app.status.seek(0) @@ -120,7 +120,7 @@ def test_fetch_inventory_redirection(get_request, InventoryFile, app): srcdir=app.srcdir, ) assert 'intersphinx inventory has moved' not in app.status.getvalue() - assert InventoryFile.load.call_args[0][1] == 'https://hostname/' + assert InventoryFile.loads.call_args[1]['uri'] == 'https://hostname/' # different uri and inv, redirected app.status.seek(0) @@ -138,7 +138,7 @@ def test_fetch_inventory_redirection(get_request, InventoryFile, app): 'https://hostname/new/%s -> https://hostname/other/%s\n' % (INVENTORY_FILENAME, INVENTORY_FILENAME) ) - assert InventoryFile.load.call_args[0][1] == 'https://hostname/' + assert InventoryFile.loads.call_args[1]['uri'] == 'https://hostname/' @pytest.mark.sphinx('html', testroot='root') diff --git a/tests/test_util/test_util_inventory.py b/tests/test_util/test_util_inventory.py index b09c2ec2d..77267496c 100644 --- a/tests/test_util/test_util_inventory.py +++ b/tests/test_util/test_util_inventory.py @@ -3,8 +3,6 @@ from __future__ import annotations import os -import posixpath -from io import BytesIO from typing import TYPE_CHECKING import pytest @@ -25,8 +23,7 @@ if TYPE_CHECKING: def test_read_inventory_v1(): - f = BytesIO(INVENTORY_V1) - invdata = InventoryFile.load(f, '/util', posixpath.join) + invdata = InventoryFile.loads(INVENTORY_V1, uri='/util') assert invdata['py:module']['module'] == ( 'foo', '1.0', @@ -42,8 +39,7 @@ def test_read_inventory_v1(): def test_read_inventory_v2(): - f = BytesIO(INVENTORY_V2) - invdata = InventoryFile.load(f, '/util', posixpath.join) + invdata = InventoryFile.loads(INVENTORY_V2, uri='/util') assert len(invdata['py:module']) == 2 assert invdata['py:module']['module1'] == ( @@ -69,8 +65,7 @@ def test_read_inventory_v2(): def test_read_inventory_v2_not_having_version(): - f = BytesIO(INVENTORY_V2_NO_VERSION) - invdata = InventoryFile.load(f, '/util', posixpath.join) + invdata = InventoryFile.loads(INVENTORY_V2_NO_VERSION, uri='/util') assert invdata['py:module']['module1'] == ( 'foo', '', @@ -81,8 +76,7 @@ def test_read_inventory_v2_not_having_version(): @pytest.mark.sphinx('html', testroot='root') def test_ambiguous_definition_warning(app): - f = BytesIO(INVENTORY_V2_AMBIGUOUS_TERMS) - InventoryFile.load(f, '/util', posixpath.join) + InventoryFile.loads(INVENTORY_V2_AMBIGUOUS_TERMS, uri='/util') def _multiple_defs_notice_for(entity: str) -> str: return f'contains multiple definitions for {entity}'