intersphinx: Add InventoryFile utility

This commit is contained in:
Takeshi KOMIYA 2017-03-02 17:26:07 +09:00
parent 00e32eeeff
commit 69bb12c1e1
2 changed files with 67 additions and 65 deletions

View File

@ -126,65 +126,67 @@ class ZlibReader(object):
return iter(self) # type: ignore
def read_inventory_v1(f, uri, join):
# type: (IO, unicode, Callable) -> Inventory
f = UTF8StreamReader(f)
invdata = {} # type: Inventory
projname = f.readline().rstrip()[11:]
version = f.readline().rstrip()[11:]
for line in f:
name, type, location = line.rstrip().split(None, 2)
location = join(uri, location)
# version 1 did not add anchors to the location
if type == 'mod':
type = 'py:module'
location += '#module-' + name
class InventoryFile(object):
@classmethod
def load(cls, stream, uri, joinfunc):
# type: (IO, unicode, Callable) -> Inventory
line = stream.readline().rstrip().decode('utf-8')
if line == '# Sphinx inventory version 1':
return cls.load_v1(stream, uri, joinfunc)
elif line == '# Sphinx inventory version 2':
return cls.load_v2(stream, uri, joinfunc)
else:
type = 'py:' + type
location += '#' + name
invdata.setdefault(type, {})[name] = (projname, version, location, '-')
return invdata
raise ValueError('invalid inventory header: %s' % line)
@classmethod
def load_v1(cls, stream, uri, join):
# type: (IO, unicode, Callable) -> Inventory
stream = UTF8StreamReader(stream)
invdata = {} # type: Inventory
projname = stream.readline().rstrip()[11:]
version = stream.readline().rstrip()[11:]
for line in stream:
name, type, location = line.rstrip().split(None, 2)
location = join(uri, location)
# version 1 did not add anchors to the location
if type == 'mod':
type = 'py:module'
location += '#module-' + name
else:
type = 'py:' + type
location += '#' + name
invdata.setdefault(type, {})[name] = (projname, version, location, '-')
return invdata
def read_inventory_v2(f, uri, join):
# type: (IO, unicode, Callable) -> Inventory
invdata = {} # type: Inventory
projname = f.readline().decode('utf-8').rstrip()[11:]
version = f.readline().decode('utf-8').rstrip()[11:]
line = f.readline().decode('utf-8')
if 'zlib' not in line:
raise ValueError('invalid inventory header (not compressed): %s' % line)
@classmethod
def load_v2(cls, stream, uri, join):
# type: (IO, unicode, Callable) -> Inventory
invdata = {} # type: Inventory
projname = stream.readline().decode('utf-8').rstrip()[11:]
version = stream.readline().decode('utf-8').rstrip()[11:]
line = stream.readline().decode('utf-8')
if 'zlib' not in line:
raise ValueError('invalid inventory header (not compressed): %s' % line)
for line in ZlibReader(f).readlines():
# be careful to handle names with embedded spaces correctly
m = re.match(r'(?x)(.+?)\s+(\S*:\S*)\s+(-?\d+)\s+(\S+)\s+(.*)',
line.rstrip())
if not m:
continue
name, type, prio, location, dispname = m.groups()
if type == 'py:module' and type in invdata and \
name in invdata[type]: # due to a bug in 1.1 and below,
# two inventory entries are created
# for Python modules, and the first
# one is correct
continue
if location.endswith(u'$'):
location = location[:-1] + name
location = join(uri, location)
invdata.setdefault(type, {})[name] = (projname, version,
location, dispname)
return invdata
def read_inventory(f, uri, join):
# type: (IO, unicode, Callable, int) -> Inventory
line = f.readline().rstrip().decode('utf-8')
if line == '# Sphinx inventory version 1':
return read_inventory_v1(f, uri, join)
elif line == '# Sphinx inventory version 2':
return read_inventory_v2(f, uri, join)
else:
raise ValueError('invalid inventory header: %s' % line)
for line in ZlibReader(stream).readlines():
# be careful to handle names with embedded spaces correctly
m = re.match(r'(?x)(.+?)\s+(\S*:\S*)\s+(-?\d+)\s+(\S+)\s+(.*)',
line.rstrip())
if not m:
continue
name, type, prio, location, dispname = m.groups()
if type == 'py:module' and type in invdata and \
name in invdata[type]: # due to a bug in 1.1 and below,
# two inventory entries are created
# for Python modules, and the first
# one is correct
continue
if location.endswith(u'$'):
location = location[:-1] + name
location = join(uri, location)
invdata.setdefault(type, {})[name] = (projname, version,
location, dispname)
return invdata
def _strip_basic_auth(url):
@ -290,7 +292,7 @@ def fetch_inventory(app, uri, inv):
with f:
try:
join = localuri and path.join or posixpath.join
invdata = read_inventory(f, uri, join)
invdata = InventoryFile.load(f, uri, join)
except ValueError as exc:
raise ValueError('unknown or unsupported inventory version: %r' % exc)
except Exception as err:

View File

@ -19,7 +19,7 @@ import mock
from sphinx import addnodes
from sphinx.ext.intersphinx import setup as intersphinx_setup
from sphinx.ext.intersphinx import read_inventory, \
from sphinx.ext.intersphinx import InventoryFile, \
load_mappings, missing_reference, _strip_basic_auth, \
_get_safe_url, fetch_inventory, INVENTORY_FILENAME
@ -50,7 +50,7 @@ a term including:colon std:term -1 glossary.html#term-a-term-including-colon -
def test_read_inventory_v1():
f = BytesIO(inventory_v1)
invdata = read_inventory(f, '/util', posixpath.join)
invdata = InventoryFile.load(f, '/util', posixpath.join)
assert invdata['py:module']['module'] == \
('foo', '1.0', '/util/foo.html#module-module', '-')
assert invdata['py:class']['module.cls'] == \
@ -59,7 +59,7 @@ def test_read_inventory_v1():
def test_read_inventory_v2():
f = BytesIO(inventory_v2)
invdata = read_inventory(f, '/util', posixpath.join)
invdata = InventoryFile.load(f, '/util', posixpath.join)
assert len(invdata['py:module']) == 2
assert invdata['py:module']['module1'] == \
@ -75,9 +75,9 @@ def test_read_inventory_v2():
'/util/glossary.html#term-a-term-including-colon'
@mock.patch('sphinx.ext.intersphinx.read_inventory')
@mock.patch('sphinx.ext.intersphinx.InventoryFile')
@mock.patch('sphinx.ext.intersphinx._read_from_url')
def test_fetch_inventory_redirection(_read_from_url, read_inventory, app, status, warning):
def test_fetch_inventory_redirection(_read_from_url, InventoryFile, app, status, warning):
intersphinx_setup(app)
_read_from_url().readline.return_value = '# Sphinx inventory version 2'.encode('utf-8')
@ -85,7 +85,7 @@ def test_fetch_inventory_redirection(_read_from_url, read_inventory, app, status
_read_from_url().url = 'http://hostname/' + INVENTORY_FILENAME
fetch_inventory(app, 'http://hostname/', 'http://hostname/' + INVENTORY_FILENAME)
assert 'intersphinx inventory has moved' not in status.getvalue()
assert read_inventory.call_args[0][1] == 'http://hostname/'
assert InventoryFile.load.call_args[0][1] == 'http://hostname/'
# same uri and inv, redirected
status.seek(0)
@ -96,7 +96,7 @@ def test_fetch_inventory_redirection(_read_from_url, read_inventory, app, status
assert status.getvalue() == ('intersphinx inventory has moved: '
'http://hostname/%s -> http://hostname/new/%s\n' %
(INVENTORY_FILENAME, INVENTORY_FILENAME))
assert read_inventory.call_args[0][1] == 'http://hostname/new'
assert InventoryFile.load.call_args[0][1] == 'http://hostname/new'
# different uri and inv, not redirected
status.seek(0)
@ -105,7 +105,7 @@ def test_fetch_inventory_redirection(_read_from_url, read_inventory, app, status
fetch_inventory(app, 'http://hostname/', 'http://hostname/new/' + INVENTORY_FILENAME)
assert 'intersphinx inventory has moved' not in status.getvalue()
assert read_inventory.call_args[0][1] == 'http://hostname/'
assert InventoryFile.load.call_args[0][1] == 'http://hostname/'
# different uri and inv, redirected
status.seek(0)
@ -116,7 +116,7 @@ def test_fetch_inventory_redirection(_read_from_url, read_inventory, app, status
assert status.getvalue() == ('intersphinx inventory has moved: '
'http://hostname/new/%s -> http://hostname/other/%s\n' %
(INVENTORY_FILENAME, INVENTORY_FILENAME))
assert read_inventory.call_args[0][1] == 'http://hostname/'
assert InventoryFile.load.call_args[0][1] == 'http://hostname/'
def test_missing_reference(tempdir, app, status, warning):