mirror of
https://github.com/sphinx-doc/sphinx.git
synced 2025-02-25 18:55:22 -06:00
intersphinx: Add ZlibReader utility
This commit is contained in:
parent
05a9fd9cc9
commit
00e32eeeff
@ -60,6 +60,7 @@ if False:
|
|||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
UTF8StreamReader = codecs.lookup('utf-8')[2]
|
UTF8StreamReader = codecs.lookup('utf-8')[2]
|
||||||
|
BUFSIZE = 16 * 1024
|
||||||
|
|
||||||
|
|
||||||
class InventoryAdapter(object):
|
class InventoryAdapter(object):
|
||||||
@ -93,14 +94,44 @@ class InventoryAdapter(object):
|
|||||||
self.env.intersphinx_named_inventory.clear()
|
self.env.intersphinx_named_inventory.clear()
|
||||||
|
|
||||||
|
|
||||||
|
class ZlibReader(object):
    """Line iterator over a zlib-compressed byte stream.

    Wraps a binary file-like object whose remaining content is
    zlib-compressed UTF-8 text, and yields the decompressed content one
    decoded line at a time (newlines stripped).
    """

    def __init__(self, stream, bufsize=16 * 1024):
        # type: (IO, int) -> None
        # *bufsize* is the number of compressed bytes read per chunk.  It is
        # a parameter (rather than a hard-coded module constant) so callers
        # and tests can exercise the chunking logic with small buffers; the
        # default preserves the previous behavior.
        self.stream = stream
        self.bufsize = bufsize

    def read_chunks(self):
        # type: () -> Iterator[bytes]
        """Yield successive decompressed chunks of the stream."""
        decompressor = zlib.decompressobj()
        for chunk in iter(lambda: self.stream.read(self.bufsize), b''):
            yield decompressor.decompress(chunk)
        # flush() returns whatever data is still buffered inside the
        # decompressor after the last input chunk.
        yield decompressor.flush()

    def __iter__(self):
        # type: () -> Iterator[unicode]
        """Yield decompressed lines decoded as UTF-8, without the newline."""
        buf = b''
        for chunk in self.read_chunks():
            buf += chunk
            pos = buf.find(b'\n')
            while pos != -1:
                yield buf[:pos].decode('utf-8')
                buf = buf[pos + 1:]
                pos = buf.find(b'\n')
        # Previously this was `assert not buf`, which raised AssertionError
        # for streams not ending in a newline (and under -O silently dropped
        # that data).  Yield the trailing partial line instead.
        if buf:
            yield buf.decode('utf-8')

    def readlines(self):
        # type: () -> Iterator[unicode]
        """Compatibility alias so the reader can be used like a text file."""
        return iter(self)  # type: ignore
||||||
def read_inventory_v1(f, uri, join):
|
def read_inventory_v1(f, uri, join):
|
||||||
# type: (IO, unicode, Callable) -> Inventory
|
# type: (IO, unicode, Callable) -> Inventory
|
||||||
f = UTF8StreamReader(f)
|
f = UTF8StreamReader(f)
|
||||||
invdata = {} # type: Inventory
|
invdata = {} # type: Inventory
|
||||||
line = next(f)
|
projname = f.readline().rstrip()[11:]
|
||||||
projname = line.rstrip()[11:]
|
version = f.readline().rstrip()[11:]
|
||||||
line = next(f)
|
|
||||||
version = line.rstrip()[11:]
|
|
||||||
for line in f:
|
for line in f:
|
||||||
name, type, location = line.rstrip().split(None, 2)
|
name, type, location = line.rstrip().split(None, 2)
|
||||||
location = join(uri, location)
|
location = join(uri, location)
|
||||||
@ -115,37 +146,16 @@ def read_inventory_v1(f, uri, join):
|
|||||||
return invdata
|
return invdata
|
||||||
|
|
||||||
|
|
||||||
def read_inventory_v2(f, uri, join, bufsize=16 * 1024):
|
def read_inventory_v2(f, uri, join):
|
||||||
# type: (IO, unicode, Callable, int) -> Inventory
|
# type: (IO, unicode, Callable) -> Inventory
|
||||||
invdata = {} # type: Inventory
|
invdata = {} # type: Inventory
|
||||||
line = f.readline()
|
projname = f.readline().decode('utf-8').rstrip()[11:]
|
||||||
projname = line.rstrip()[11:].decode('utf-8')
|
version = f.readline().decode('utf-8').rstrip()[11:]
|
||||||
line = f.readline()
|
|
||||||
version = line.rstrip()[11:].decode('utf-8')
|
|
||||||
line = f.readline().decode('utf-8')
|
line = f.readline().decode('utf-8')
|
||||||
if 'zlib' not in line:
|
if 'zlib' not in line:
|
||||||
raise ValueError('invalid inventory header (not compressed): %s' % line)
|
raise ValueError('invalid inventory header (not compressed): %s' % line)
|
||||||
|
|
||||||
def read_chunks():
|
for line in ZlibReader(f).readlines():
|
||||||
# type: () -> Iterator[bytes]
|
|
||||||
decompressor = zlib.decompressobj()
|
|
||||||
for chunk in iter(lambda: f.read(bufsize), b''):
|
|
||||||
yield decompressor.decompress(chunk)
|
|
||||||
yield decompressor.flush()
|
|
||||||
|
|
||||||
def split_lines(iter):
|
|
||||||
# type: (Iterator[bytes]) -> Iterator[unicode]
|
|
||||||
buf = b''
|
|
||||||
for chunk in iter:
|
|
||||||
buf += chunk
|
|
||||||
lineend = buf.find(b'\n')
|
|
||||||
while lineend != -1:
|
|
||||||
yield buf[:lineend].decode('utf-8')
|
|
||||||
buf = buf[lineend + 1:]
|
|
||||||
lineend = buf.find(b'\n')
|
|
||||||
assert not buf
|
|
||||||
|
|
||||||
for line in split_lines(read_chunks()):
|
|
||||||
# be careful to handle names with embedded spaces correctly
|
# be careful to handle names with embedded spaces correctly
|
||||||
m = re.match(r'(?x)(.+?)\s+(\S*:\S*)\s+(-?\d+)\s+(\S+)\s+(.*)',
|
m = re.match(r'(?x)(.+?)\s+(\S*:\S*)\s+(-?\d+)\s+(\S+)\s+(.*)',
|
||||||
line.rstrip())
|
line.rstrip())
|
||||||
@ -166,13 +176,13 @@ def read_inventory_v2(f, uri, join, bufsize=16 * 1024):
|
|||||||
return invdata
|
return invdata
|
||||||
|
|
||||||
|
|
||||||
def read_inventory(f, uri, join):
    # type: (IO, unicode, Callable) -> Inventory
    """Read an intersphinx inventory, dispatching on the header line.

    :param f: binary stream positioned at the start of an ``objects.inv``
    :param uri: base URI joined onto each entry's relative location
    :param join: URI-join callable (e.g. ``posixpath.join``)
    :raises ValueError: if the first line is not a known inventory header

    Note: the stale ``int`` in the old type comment referred to the removed
    ``bufsize`` parameter; the signature no longer takes one.
    """
    line = f.readline().rstrip().decode('utf-8')
    if line == '# Sphinx inventory version 1':
        return read_inventory_v1(f, uri, join)
    elif line == '# Sphinx inventory version 2':
        return read_inventory_v2(f, uri, join)
    else:
        raise ValueError('invalid inventory header: %s' % line)
||||||
|
@ -59,25 +59,19 @@ def test_read_inventory_v1():
|
|||||||
|
|
||||||
def test_read_inventory_v2():
    # Parse the shared v2 fixture against a base URI and spot-check entries.
    invdata = read_inventory(BytesIO(inventory_v2), '/util', posixpath.join)

    # Both modules from the fixture are present.
    assert len(invdata['py:module']) == 2
    expected_module1 = ('foo', '2.0', '/util/foo.html#module-module1',
                        'Long Module desc')
    assert invdata['py:module']['module1'] == expected_module1
    expected_module2 = ('foo', '2.0', '/util/foo.html#module-module2', '-')
    assert invdata['py:module']['module2'] == expected_module2
    # Locations (index 2 of each entry tuple) are joined onto the base URI.
    assert invdata['py:function']['module1.func'][2] == \
        '/util/sub/foo.html#module1.func'
    assert invdata['c:function']['CFunc'][2] == '/util/cfunc.html#CFunc'
    assert invdata['std:term']['a term'][2] == \
        '/util/glossary.html#term-a-term'
    assert invdata['std:term']['a term including:colon'][2] == \
        '/util/glossary.html#term-a-term-including-colon'
||||||
|
Loading…
Reference in New Issue
Block a user