Move InventoryFile class to sphinx.util.inventory

This commit is contained in:
Takeshi KOMIYA 2017-03-02 22:51:42 +09:00
parent 69bb12c1e1
commit 29254c15f1
4 changed files with 196 additions and 158 deletions

View File

@ -27,12 +27,9 @@
from __future__ import print_function
import time
import zlib
import codecs
import functools
import posixpath
from os import path
import re
from six import PY3, iteritems, string_types
from six.moves.urllib.parse import urlsplit, urlunsplit
@ -44,10 +41,11 @@ import sphinx
from sphinx.locale import _
from sphinx.builders.html import INVENTORY_FILENAME
from sphinx.util import requests, logging
from sphinx.util.inventory import InventoryFile
if False:
# For type annotation
from typing import Any, Callable, Dict, IO, Iterator, Tuple, Union # NOQA
from typing import Any, Dict, IO, Tuple, Union # NOQA
from sphinx.application import Sphinx # NOQA
from sphinx.config import Config # NOQA
from sphinx.environment import BuildEnvironment # NOQA
@ -59,9 +57,6 @@ if False:
logger = logging.getLogger(__name__)
UTF8StreamReader = codecs.lookup('utf-8')[2]
BUFSIZE = 16 * 1024
class InventoryAdapter(object):
"""Inventory adapter for environment"""
@ -94,101 +89,6 @@ class InventoryAdapter(object):
self.env.intersphinx_named_inventory.clear()
class ZlibReader(object):
"""Compressed file reader."""
def __init__(self, stream):
# type: (IO) -> None
self.stream = stream
def read_chunks(self):
# type: () -> Iterator[bytes]
decompressor = zlib.decompressobj()
for chunk in iter(lambda: self.stream.read(BUFSIZE), b''):
yield decompressor.decompress(chunk)
yield decompressor.flush()
def __iter__(self):
# type: () -> Iterator[unicode]
buf = b''
for chunk in self.read_chunks():
buf += chunk
pos = buf.find(b'\n')
while pos != -1:
yield buf[:pos].decode('utf-8')
buf = buf[pos + 1:]
pos = buf.find(b'\n')
assert not buf
def readlines(self):
# type: () -> Iterator[unicode]
return iter(self) # type: ignore
class InventoryFile(object):
@classmethod
def load(cls, stream, uri, joinfunc):
# type: (IO, unicode, Callable) -> Inventory
line = stream.readline().rstrip().decode('utf-8')
if line == '# Sphinx inventory version 1':
return cls.load_v1(stream, uri, joinfunc)
elif line == '# Sphinx inventory version 2':
return cls.load_v2(stream, uri, joinfunc)
else:
raise ValueError('invalid inventory header: %s' % line)
@classmethod
def load_v1(cls, stream, uri, join):
# type: (IO, unicode, Callable) -> Inventory
stream = UTF8StreamReader(stream)
invdata = {} # type: Inventory
projname = stream.readline().rstrip()[11:]
version = stream.readline().rstrip()[11:]
for line in stream:
name, type, location = line.rstrip().split(None, 2)
location = join(uri, location)
# version 1 did not add anchors to the location
if type == 'mod':
type = 'py:module'
location += '#module-' + name
else:
type = 'py:' + type
location += '#' + name
invdata.setdefault(type, {})[name] = (projname, version, location, '-')
return invdata
@classmethod
def load_v2(cls, stream, uri, join):
# type: (IO, unicode, Callable) -> Inventory
invdata = {} # type: Inventory
projname = stream.readline().decode('utf-8').rstrip()[11:]
version = stream.readline().decode('utf-8').rstrip()[11:]
line = stream.readline().decode('utf-8')
if 'zlib' not in line:
raise ValueError('invalid inventory header (not compressed): %s' % line)
for line in ZlibReader(stream).readlines():
# be careful to handle names with embedded spaces correctly
m = re.match(r'(?x)(.+?)\s+(\S*:\S*)\s+(-?\d+)\s+(\S+)\s+(.*)',
line.rstrip())
if not m:
continue
name, type, prio, location, dispname = m.groups()
if type == 'py:module' and type in invdata and \
name in invdata[type]: # due to a bug in 1.1 and below,
# two inventory entries are created
# for Python modules, and the first
# one is correct
continue
if location.endswith(u'$'):
location = location[:-1] + name
location = join(uri, location)
invdata.setdefault(type, {})[name] = (projname, version,
location, dispname)
return invdata
def _strip_basic_auth(url):
# type: (unicode) -> unicode
"""Returns *url* with basic auth credentials removed. Also returns the

123
sphinx/util/inventory.py Normal file
View File

@ -0,0 +1,123 @@
# -*- coding: utf-8 -*-
"""
sphinx.util.inventory
~~~~~~~~~~~~~~~~~~~~~
Inventory utility functions for Sphinx.
:copyright: Copyright 2007-2016 by the Sphinx team, see AUTHORS.
:license: BSD, see LICENSE for details.
"""
import re
import zlib
import codecs
from six import PY3
if False:
# For type annotation
from typing import Callable, Dict, IO, Iterator, Tuple # NOQA
if PY3:
unicode = str
Inventory = Dict[unicode, Dict[unicode, Tuple[unicode, unicode, unicode, unicode]]]
BUFSIZE = 16 * 1024
UTF8StreamReader = codecs.lookup('utf-8')[2]
class ZlibReader(object):
"""Compressed file reader."""
def __init__(self, stream):
# type: (IO) -> None
self.stream = stream
def read_chunks(self):
# type: () -> Iterator[bytes]
decompressor = zlib.decompressobj()
for chunk in iter(lambda: self.stream.read(BUFSIZE), b''):
yield decompressor.decompress(chunk)
yield decompressor.flush()
def __iter__(self):
# type: () -> Iterator[unicode]
buf = b''
for chunk in self.read_chunks():
buf += chunk
pos = buf.find(b'\n')
while pos != -1:
yield buf[:pos].decode('utf-8')
buf = buf[pos + 1:]
pos = buf.find(b'\n')
assert not buf
def readlines(self):
# type: () -> Iterator[unicode]
return iter(self) # type: ignore
class InventoryFile(object):
@classmethod
def load(cls, stream, uri, joinfunc):
# type: (IO, unicode, Callable) -> Inventory
line = stream.readline().rstrip().decode('utf-8')
if line == '# Sphinx inventory version 1':
return cls.load_v1(stream, uri, joinfunc)
elif line == '# Sphinx inventory version 2':
return cls.load_v2(stream, uri, joinfunc)
else:
raise ValueError('invalid inventory header: %s' % line)
@classmethod
def load_v1(cls, stream, uri, join):
# type: (IO, unicode, Callable) -> Inventory
stream = UTF8StreamReader(stream)
invdata = {} # type: Inventory
projname = stream.readline().rstrip()[11:]
version = stream.readline().rstrip()[11:]
for line in stream:
name, type, location = line.rstrip().split(None, 2)
location = join(uri, location)
# version 1 did not add anchors to the location
if type == 'mod':
type = 'py:module'
location += '#module-' + name
else:
type = 'py:' + type
location += '#' + name
invdata.setdefault(type, {})[name] = (projname, version, location, '-')
return invdata
@classmethod
def load_v2(cls, stream, uri, join):
# type: (IO, unicode, Callable) -> Inventory
invdata = {} # type: Inventory
projname = stream.readline().decode('utf-8').rstrip()[11:]
version = stream.readline().decode('utf-8').rstrip()[11:]
line = stream.readline().decode('utf-8')
if 'zlib' not in line:
raise ValueError('invalid inventory header (not compressed): %s' % line)
for line in ZlibReader(stream).readlines():
# be careful to handle names with embedded spaces correctly
m = re.match(r'(?x)(.+?)\s+(\S*:\S*)\s+(-?\d+)\s+(\S+)\s+(.*)',
line.rstrip())
if not m:
continue
name, type, prio, location, dispname = m.groups()
if type == 'py:module' and type in invdata and \
name in invdata[type]: # due to a bug in 1.1 and below,
# two inventory entries are created
# for Python modules, and the first
# one is correct
continue
if location.endswith(u'$'):
location = location[:-1] + name
location = join(uri, location)
invdata.setdefault(type, {})[name] = (projname, version,
location, dispname)
return invdata

View File

@ -9,70 +9,18 @@
:license: BSD, see LICENSE for details.
"""
import posixpath
import unittest
import zlib
from six import BytesIO
from docutils import nodes
import mock
from sphinx import addnodes
from sphinx.ext.intersphinx import setup as intersphinx_setup
from sphinx.ext.intersphinx import InventoryFile, \
load_mappings, missing_reference, _strip_basic_auth, \
from sphinx.ext.intersphinx import (
load_mappings, missing_reference, _strip_basic_auth,
_get_safe_url, fetch_inventory, INVENTORY_FILENAME
inventory_v1 = '''\
# Sphinx inventory version 1
# Project: foo
# Version: 1.0
module mod foo.html
module.cls class foo.html
'''.encode('utf-8')
inventory_v2 = '''\
# Sphinx inventory version 2
# Project: foo
# Version: 2.0
# The remainder of this file is compressed with zlib.
'''.encode('utf-8') + zlib.compress('''\
module1 py:module 0 foo.html#module-module1 Long Module desc
module2 py:module 0 foo.html#module-$ -
module1.func py:function 1 sub/foo.html#$ -
CFunc c:function 2 cfunc.html#CFunc -
a term std:term -1 glossary.html#term-a-term -
docname std:doc -1 docname.html -
a term including:colon std:term -1 glossary.html#term-a-term-including-colon -
'''.encode('utf-8'))
def test_read_inventory_v1():
f = BytesIO(inventory_v1)
invdata = InventoryFile.load(f, '/util', posixpath.join)
assert invdata['py:module']['module'] == \
('foo', '1.0', '/util/foo.html#module-module', '-')
assert invdata['py:class']['module.cls'] == \
('foo', '1.0', '/util/foo.html#module.cls', '-')
def test_read_inventory_v2():
f = BytesIO(inventory_v2)
invdata = InventoryFile.load(f, '/util', posixpath.join)
assert len(invdata['py:module']) == 2
assert invdata['py:module']['module1'] == \
('foo', '2.0', '/util/foo.html#module-module1', 'Long Module desc')
assert invdata['py:module']['module2'] == \
('foo', '2.0', '/util/foo.html#module-module2', '-')
assert invdata['py:function']['module1.func'][2] == \
'/util/sub/foo.html#module1.func'
assert invdata['c:function']['CFunc'][2] == '/util/cfunc.html#CFunc'
assert invdata['std:term']['a term'][2] == \
'/util/glossary.html#term-a-term'
assert invdata['std:term']['a term including:colon'][2] == \
'/util/glossary.html#term-a-term-including-colon'
)
from test_util_inventory import inventory_v2
@mock.patch('sphinx.ext.intersphinx.InventoryFile')

View File

@ -0,0 +1,67 @@
# -*- coding: utf-8 -*-
"""
test_util_inventory
~~~~~~~~~~~~~~~~~~~
Test inventory util functions.
:copyright: Copyright 2007-2016 by the Sphinx team, see AUTHORS.
:license: BSD, see LICENSE for details.
"""
import zlib
import posixpath
from six import BytesIO
from sphinx.ext.intersphinx import InventoryFile
inventory_v1 = '''\
# Sphinx inventory version 1
# Project: foo
# Version: 1.0
module mod foo.html
module.cls class foo.html
'''.encode('utf-8')
inventory_v2 = '''\
# Sphinx inventory version 2
# Project: foo
# Version: 2.0
# The remainder of this file is compressed with zlib.
'''.encode('utf-8') + zlib.compress('''\
module1 py:module 0 foo.html#module-module1 Long Module desc
module2 py:module 0 foo.html#module-$ -
module1.func py:function 1 sub/foo.html#$ -
CFunc c:function 2 cfunc.html#CFunc -
a term std:term -1 glossary.html#term-a-term -
docname std:doc -1 docname.html -
a term including:colon std:term -1 glossary.html#term-a-term-including-colon -
'''.encode('utf-8'))
def test_read_inventory_v1():
f = BytesIO(inventory_v1)
invdata = InventoryFile.load(f, '/util', posixpath.join)
assert invdata['py:module']['module'] == \
('foo', '1.0', '/util/foo.html#module-module', '-')
assert invdata['py:class']['module.cls'] == \
('foo', '1.0', '/util/foo.html#module.cls', '-')
def test_read_inventory_v2():
f = BytesIO(inventory_v2)
invdata = InventoryFile.load(f, '/util', posixpath.join)
assert len(invdata['py:module']) == 2
assert invdata['py:module']['module1'] == \
('foo', '2.0', '/util/foo.html#module-module1', 'Long Module desc')
assert invdata['py:module']['module2'] == \
('foo', '2.0', '/util/foo.html#module-module2', '-')
assert invdata['py:function']['module1.func'][2] == \
'/util/sub/foo.html#module1.func'
assert invdata['c:function']['CFunc'][2] == '/util/cfunc.html#CFunc'
assert invdata['std:term']['a term'][2] == \
'/util/glossary.html#term-a-term'
assert invdata['std:term']['a term including:colon'][2] == \
'/util/glossary.html#term-a-term-including-colon'