# Mirror of https://github.com/sphinx-doc/sphinx.git (synced 2025-02-25 18:55:22 -06:00); 332 lines, 12 KiB, Python.
# -*- coding: utf-8 -*-
|
|
"""
|
|
sphinx.builders.linkcheck
|
|
~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
|
|
The CheckExternalLinksBuilder class.
|
|
|
|
:copyright: Copyright 2007-2018 by the Sphinx team, see AUTHORS.
|
|
:license: BSD, see LICENSE for details.
|
|
"""
|
|
|
|
import re
|
|
import socket
|
|
import codecs
|
|
import threading
|
|
from os import path
|
|
|
|
from requests.exceptions import HTTPError
|
|
from six.moves import queue, html_parser
|
|
from six.moves.urllib.parse import unquote
|
|
from docutils import nodes
|
|
|
|
# 2015-06-25 barry@python.org.  HTMLParseError was deprecated in Python 3.3
# and removed in Python 3.5.  For backward compatibility the name is kept:
# when it cannot be imported, a placeholder exception is defined instead.
# The placeholder is never raised by the parser, so the ``except`` clause in
# check_anchor() stays intact without ever catching anything.
try:
    from six.moves.html_parser import HTMLParseError  # type: ignore
except ImportError:
    class HTMLParseError(Exception):  # type: ignore
        """Stand-in used when the HTML parser module offers no HTMLParseError."""
|
|
|
|
from sphinx.builders import Builder
|
|
from sphinx.util import encode_uri, requests, logging
|
|
from sphinx.util.console import ( # type: ignore
|
|
purple, red, darkgreen, darkgray, darkred, turquoise
|
|
)
|
|
from sphinx.util.requests import is_ssl_error
|
|
|
|
if False:
|
|
# For type annotation
|
|
from typing import Any, Dict, List, Set, Tuple, Union # NOQA
|
|
from sphinx.application import Sphinx # NOQA
|
|
from sphinx.util.requests.requests import Response # NOQA
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class AnchorCheckParser(html_parser.HTMLParser):
    """HTML parser that watches the tag stream for one particular anchor.

    After feeding data, ``found`` is True once any start tag carried an
    ``id`` or ``name`` attribute equal to ``search_anchor``.
    """

    def __init__(self, search_anchor):
        # type: (unicode) -> None
        # The base initializer is invoked explicitly on the base class,
        # exactly as the rest of this module expects.
        html_parser.HTMLParser.__init__(self)

        self.found = False
        self.search_anchor = search_anchor

    def handle_starttag(self, tag, attrs):
        """Flag the anchor as found if this tag's id/name attribute matches."""
        wanted = self.search_anchor
        # ``found`` is only ever switched on; a non-matching tag never
        # resets an earlier hit.
        if any(attr in ('id', 'name') and val == wanted for attr, val in attrs):
            self.found = True
|
|
|
|
|
|
def check_anchor(response, anchor):
    # type: (Response, unicode) -> bool
    """Reads HTML data from a response object `response` searching for `anchor`.

    The body is read in 4096-byte chunks and fed to an AnchorCheckParser;
    reading stops as soon as the anchor has been seen, in the hope of not
    having to download the whole document.

    Returns True if anchor was found, False otherwise.
    """
    parser = AnchorCheckParser(anchor)
    try:
        for chunk in response.iter_content(chunk_size=4096, decode_unicode=True):
            # Even with decode_unicode=True the response may hand back raw
            # bytes (requests does so when it has no encoding to decode
            # with).  The parser needs text, so decode manually; undecodable
            # bytes are replaced rather than aborting the whole check.
            if isinstance(chunk, bytes):
                chunk = chunk.decode('utf-8', 'replace')
            parser.feed(chunk)
            if parser.found:
                break
        parser.close()
    except HTMLParseError:
        # HTMLParser is usually pretty good with sloppy HTML, but it tends to
        # choke on EOF. But we're done then anyway.
        pass
    return parser.found
|
|
|
|
|
|
class CheckExternalLinksBuilder(Builder):
    """
    Checks for broken external links.

    Every ``reference`` node that carries a ``refuri`` is queued for a pool
    of worker threads.  Each result is logged, and local, broken and
    redirected links are additionally appended to ``output.txt`` in the
    output directory.
    """
    name = 'linkcheck'

    def init(self):
        # type: () -> None
        """Compile the ignore patterns, reset the result caches, create an
        empty ``output.txt`` and start the worker threads."""
        config = self.app.config
        self.to_ignore = [re.compile(x) for x in config.linkcheck_ignore]
        self.anchors_ignore = [re.compile(x)
                               for x in config.linkcheck_anchors_ignore]
        # Results remembered per URI so a link is only fetched once.
        self.good = set()  # type: Set[unicode]
        self.broken = {}  # type: Dict[unicode, unicode]
        self.redirected = {}  # type: Dict[unicode, Tuple[unicode, int]]
        # set a timeout for non-responding servers
        socket.setdefaulttimeout(5.0)
        # create (or truncate) the output file; the context manager makes
        # sure the handle is closed even if something goes wrong
        with open(path.join(self.outdir, 'output.txt'), 'w'):
            pass

        # create queues and worker threads
        self.wqueue = queue.Queue()  # type: queue.Queue
        self.rqueue = queue.Queue()  # type: queue.Queue
        self.workers = []  # type: List[threading.Thread]
        for _ in range(config.linkcheck_workers):
            thread = threading.Thread(target=self.check_thread)
            # The ``daemon`` attribute replaces the deprecated setDaemon().
            thread.daemon = True
            thread.start()
            self.workers.append(thread)

    def check_thread(self):
        # type: () -> None
        """Worker loop: take ``(uri, docname, lineno)`` items from the work
        queue, check the URI and put the outcome on the result queue.

        The loop ends when an item whose URI is None is received.
        """
        kwargs = {}
        if self.app.config.linkcheck_timeout:
            kwargs['timeout'] = self.app.config.linkcheck_timeout

        kwargs['allow_redirects'] = True

        # Both helpers below read ``uri`` from this enclosing scope; it is
        # rebound by the ``while`` loop at the bottom for every work item.

        def check_uri():
            # type: () -> Tuple[unicode, unicode, int]
            """Fetch the current ``uri`` once and classify the outcome as a
            ``(status, info, code)`` tuple."""
            # split off anchor
            if '#' in uri:
                req_url, anchor = uri.split('#', 1)
                for rex in self.anchors_ignore:
                    if rex.match(anchor):
                        anchor = None
                        break
            else:
                req_url = uri
                anchor = None

            # handle non-ASCII URIs
            try:
                req_url.encode('ascii')
            except UnicodeError:
                req_url = encode_uri(req_url)

            try:
                if anchor and self.app.config.linkcheck_anchors:
                    # Read the whole document and see if #anchor exists
                    response = requests.get(req_url, stream=True,
                                            config=self.app.config, **kwargs)
                    # Report HTTP failures as such instead of scanning an
                    # error page for the anchor.
                    response.raise_for_status()
                    found = check_anchor(response, unquote(anchor))

                    if not found:
                        raise Exception("Anchor '%s' not found" % anchor)
                else:
                    try:
                        # try a HEAD request first, which should be easier on
                        # the server and the network
                        response = requests.head(req_url, config=self.app.config,
                                                 **kwargs)
                        response.raise_for_status()
                    except HTTPError:
                        # retry with GET request if that fails, some servers
                        # don't like HEAD requests.
                        response = requests.get(req_url, stream=True,
                                                config=self.app.config, **kwargs)
                        response.raise_for_status()
            except HTTPError as err:
                if err.response.status_code == 401:
                    # We'll take "Unauthorized" as working.
                    return 'working', ' - unauthorized', 0
                return 'broken', str(err), 0
            except Exception as err:
                if is_ssl_error(err):
                    return 'ignored', str(err), 0
                return 'broken', str(err), 0

            if response.url.rstrip('/') == req_url.rstrip('/'):
                return 'working', '', 0

            new_url = response.url
            if anchor:
                new_url += '#' + anchor
            # history contains any redirects, get last
            if response.history:
                code = response.history[-1].status_code
                return 'redirected', new_url, code
            return 'redirected', new_url, 0

        def check():
            # type: () -> Tuple[unicode, unicode, int]
            """Classify the current ``uri``, touching the network only when
            no cheaper rule or cached result applies."""
            # check for various conditions without bothering the network
            if not uri or uri.startswith(('#', 'mailto:', 'ftp:')):
                return 'unchecked', '', 0
            if not uri.startswith(('http:', 'https:')):
                return 'local', '', 0
            if uri in self.good:
                return 'working', 'old', 0
            if uri in self.broken:
                return 'broken', self.broken[uri], 0
            if uri in self.redirected:
                return 'redirected', self.redirected[uri][0], self.redirected[uri][1]
            for rex in self.to_ignore:
                if rex.match(uri):
                    return 'ignored', '', 0

            # need to actually check the URI
            # Always make at least one attempt: with a retry count below 1
            # the loop would never run and ``status`` would be unbound,
            # killing this worker and leaving write_doc() waiting forever.
            for _ in range(max(1, self.app.config.linkcheck_retries)):
                status, info, code = check_uri()
                if status != "broken":
                    break

            if status == "working":
                self.good.add(uri)
            elif status == "broken":
                self.broken[uri] = info
            elif status == "redirected":
                self.redirected[uri] = (info, code)

            return (status, info, code)

        while True:
            uri, docname, lineno = self.wqueue.get()
            if uri is None:
                break
            status, info, code = check()
            self.rqueue.put((uri, docname, lineno, status, info, code))

    def process_result(self, result):
        # type: (Tuple[unicode, unicode, int, unicode, unicode, int]) -> None
        """Log one check result and record it in ``output.txt`` when it is a
        local, broken or redirected link.

        ``result`` is the ``(uri, docname, lineno, status, info, code)``
        tuple produced by a worker thread.  Unchecked links and links already
        known to work are skipped silently.
        """
        uri, docname, lineno, status, info, code = result
        if status == 'unchecked':
            return
        if status == 'working' and info == 'old':
            return
        if lineno:
            logger.info('(line %4d) ', lineno, nonl=1)
        if status == 'ignored':
            if info:
                logger.info(darkgray('-ignored- ') + uri + ': ' + info)
            else:
                logger.info(darkgray('-ignored- ') + uri)
        elif status == 'local':
            logger.info(darkgray('-local- ') + uri)
            self.write_entry('local', docname, lineno, uri)
        elif status == 'working':
            logger.info(darkgreen('ok ') + uri + info)
        elif status == 'broken':
            self.write_entry('broken', docname, lineno, uri + ': ' + info)
            if self.app.quiet or self.app.warningiserror:
                logger.warning('broken link: %s (%s)', uri, info,
                               location=(self.env.doc2path(docname), lineno))
            else:
                logger.info(red('broken ') + uri + red(' - ' + info))
        elif status == 'redirected':
            unknown = ('with unknown code', purple)
            # Look the code up with a fallback: a redirect status that is
            # not listed here (the checker passes through whatever the last
            # history entry reported) must not crash the build.
            text, color = {
                301: ('permanently', darkred),
                302: ('with Found', purple),
                303: ('with See Other', purple),
                307: ('temporarily', turquoise),
                308: ('permanently', darkred),
                0: unknown,
            }.get(code, unknown)
            self.write_entry('redirected ' + text, docname, lineno,
                             uri + ' to ' + info)
            logger.info(color('redirect ') + uri + color(' - ' + text + ' to ' + info))

    def get_target_uri(self, docname, typ=None):
        # type: (unicode, unicode) -> unicode
        """Return an empty target URI for every document."""
        return ''

    def get_outdated_docs(self):
        # type: () -> Set[unicode]
        """Report every found document as outdated so all of them are checked."""
        return self.env.found_docs

    def prepare_writing(self, docnames):
        # type: (nodes.Node) -> None
        """Nothing to prepare for this builder."""
        return

    def write_doc(self, docname, doctree):
        # type: (unicode, nodes.Node) -> None
        """Queue every external reference of ``doctree`` for checking and
        process the results as they come back.

        Sets the application status code to 1 once any broken link is known.
        """
        logger.info('')
        n = 0
        for refnode in doctree.traverse(nodes.reference):
            if 'refuri' not in refnode:
                continue
            uri = refnode['refuri']
            # Walk up the parents until one of them knows its line number
            # (or the root is passed, leaving lineno as None).
            lineno = None
            ancestor = refnode
            while lineno is None:
                ancestor = ancestor.parent
                if ancestor is None:
                    break
                lineno = ancestor.line
            self.wqueue.put((uri, docname, lineno), False)
            n += 1

        # Exactly one result arrives per queued link.
        for _ in range(n):
            self.process_result(self.rqueue.get())

        if self.broken:
            self.app.statuscode = 1

    def write_entry(self, what, docname, line, uri):
        # type: (unicode, unicode, int, unicode) -> None
        """Append one ``<path>:<line>: [<what>] <uri>`` line to ``output.txt``."""
        with codecs.open(path.join(self.outdir, 'output.txt'), 'a', 'utf-8') as output:  # type: ignore  # NOQA
            output.write("%s:%s: [%s] %s\n" % (self.env.doc2path(docname, None),
                                               line, what, uri))

    def finish(self):
        # type: () -> None
        """Send one stop marker per worker thread so each loop terminates."""
        for _ in self.workers:
            self.wqueue.put((None, None, None), False)
|
|
|
|
|
|
def setup(app):
    # type: (Sphinx) -> Dict[unicode, Any]
    """Register the linkcheck builder and its configuration values on `app`.

    Returns the extension metadata dictionary (version and parallel-safety
    flags).
    """
    app.add_builder(CheckExternalLinksBuilder)

    # Positional arguments for app.add_config_value(), registered in order.
    # The table is rebuilt on every call so the list defaults are fresh.
    config_values = (
        ('linkcheck_ignore', [], None),
        ('linkcheck_retries', 1, None),
        ('linkcheck_timeout', None, None, [int]),
        ('linkcheck_workers', 5, None),
        ('linkcheck_anchors', True, None),
        # Anchors starting with ! are ignored since they are
        # commonly used for dynamic pages
        ('linkcheck_anchors_ignore', ["^!"], None),
    )
    for spec in config_values:
        app.add_config_value(*spec)

    metadata = dict(
        version='builtin',
        parallel_read_safe=True,
        parallel_write_safe=True,
    )
    return metadata
|