Switch to using requests for better charset detection

The Python requests library does a better job of detecting the charset
of web pages, decoding the content automatically when the text is
requested. This avoids having to perform charset detection ourselves.
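
As a rough illustration of the behaviour being relied on here (a minimal
sketch, not part of the change itself): requests takes the encoding from
the Content-Type header when present and otherwise guesses it from the
body, so response.text is already decoded.

    import requests

    # requests reads the charset from the Content-Type header when present,
    # otherwise it guesses one from the body, so .text is already a str.
    response = requests.get('http://www.yaml.org/spec/1.2/spec.html')
    print(response.encoding)           # charset reported by the server (if any)
    print(response.apparent_encoding)  # charset guessed from the body
    html = response.text               # decoded text, no manual handling needed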

This allows checking the following urls & anchors correctly:

    http://www.yaml.org/spec/1.2/spec.html#id2761803
    http://www.yaml.org/spec/1.2/spec.html#id2765878
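
A small self-contained sketch of how an anchor can then be checked against
a streamed response; the helper name check_anchor_in_url is hypothetical
and only mirrors the approach taken in the diff below (which uses
six.moves.html_parser and the builder's own check_anchor).

    import requests
    from html.parser import HTMLParser


    class AnchorCheckParser(HTMLParser):
        """Remembers whether a given id/name anchor was seen."""

        def __init__(self, anchor):
            super().__init__()
            self.anchor = anchor
            self.found = False

        def handle_starttag(self, tag, attrs):
            for key, value in attrs:
                if key in ('id', 'name') and value == self.anchor:
                    self.found = True


    def check_anchor_in_url(url, anchor):
        """Hypothetical helper: stream the page, stop once the anchor is found."""
        response = requests.get(url, stream=True, timeout=10)
        response.raise_for_status()
        # Make sure iter_content can decode chunks to str for the parser.
        response.encoding = response.encoding or response.apparent_encoding
        parser = AnchorCheckParser(anchor)
        for chunk in response.iter_content(chunk_size=8192, decode_unicode=True):
            parser.feed(chunk)
            if parser.found:
                break
        parser.close()
        return parser.found


    print(check_anchor_in_url('http://www.yaml.org/spec/1.2/spec.html', 'id2761803'))
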
Darragh Bailey 2016-03-23 15:34:40 +00:00
parent 4959a75c6f
commit 0b9ee8d451
3 changed files with 48 additions and 61 deletions


@@ -49,6 +49,7 @@ requires = [
'babel>=1.3,!=2.0',
'alabaster>=0.7,<0.8',
'imagesize',
'requests',
]
extras_require = {
# Environment Marker works for wheel 0.24 or later


@@ -14,11 +14,13 @@ import socket
import codecs
import threading
from os import path
import warnings
import pkg_resources
import requests
from requests.exceptions import HTTPError
from six.moves import queue
from six.moves.urllib.request import build_opener, Request, HTTPRedirectHandler
from six.moves.urllib.parse import unquote
from six.moves.urllib.error import HTTPError
from six.moves.html_parser import HTMLParser
from docutils import nodes
@@ -36,28 +38,25 @@ from sphinx.builders import Builder
from sphinx.util import encode_uri
from sphinx.util.console import purple, red, darkgreen, darkgray, \
darkred, turquoise
from sphinx.util.pycompat import TextIOWrapper
try:
pkg_resources.require(['requests[security]'])
except pkg_resources.DistributionNotFound:
import ssl
if not getattr(ssl, 'HAS_SNI', False):
# don't complain on each url processed about the SSL issue
requests.packages.urllib3.disable_warnings(
requests.packages.urllib3.exceptions.InsecurePlatformWarning)
warnings.warn(
'Some links may return broken results due to being unable to '
'check the Server Name Indication (SNI) in the returned SSL cert '
'against the hostname in the url requested. Recommended to '
'install "requests[security]" as a dependency or upgrade to '
'a python version with SNI support (Python 3 and Python 2.7.9+).'
)
class RedirectHandler(HTTPRedirectHandler):
"""A RedirectHandler that records the redirect code we got."""
def redirect_request(self, req, fp, code, msg, headers, newurl):
new_req = HTTPRedirectHandler.redirect_request(self, req, fp, code,
msg, headers, newurl)
req.redirect_code = code
return new_req
# create an opener that will simulate a browser user-agent
opener = build_opener(RedirectHandler)
opener.addheaders = [('User-agent', 'Mozilla/5.0 (X11; Linux x86_64; rv:25.0) '
'Gecko/20100101 Firefox/25.0')]
class HeadRequest(Request):
"""Subclass of urllib2.Request that sends a HEAD request."""
def get_method(self):
return 'HEAD'
requests_user_agent = [('User-agent', 'Mozilla/5.0 (X11; Linux x86_64; rv:25.0) '
'Gecko/20100101 Firefox/25.0')]
class AnchorCheckParser(HTMLParser):
@@ -75,18 +74,18 @@ class AnchorCheckParser(HTMLParser):
self.found = True
def check_anchor(f, anchor):
"""Reads HTML data from a filelike object 'f' searching for *anchor*.
def check_anchor(response, anchor):
"""Reads HTML data from a response object `response` searching for `anchor`.
Returns True if anchor was found, False otherwise.
"""
parser = AnchorCheckParser(anchor)
try:
# Read file in chunks of 8192 bytes. If we find a matching anchor, we
# break the loop early in hopes not to have to download the whole thing.
chunk = f.read(8192)
while chunk and not parser.found:
# Read file in chunks. If we find a matching anchor, we break
# the loop early in hopes not to have to download the whole thing.
for chunk in response.iter_content():
parser.feed(chunk)
chunk = f.read(8192)
if parser.found:
break
parser.close()
except HTMLParseError:
# HTMLParser is usually pretty good with sloppy HTML, but it tends to
@@ -95,17 +94,6 @@ def check_anchor(f, anchor):
return parser.found
def get_content_charset(f):
content_type = f.headers.get('content-type')
if content_type:
params = (p.strip() for p in content_type.split(';')[1:])
for param in params:
if param.startswith('charset='):
return param[8:]
return None
class CheckExternalLinksBuilder(Builder):
"""
Checks for broken external links.
@@ -122,6 +110,9 @@ class CheckExternalLinksBuilder(Builder):
# create output file
open(path.join(self.outdir, 'output.txt'), 'w').close()
self.session = requests.Session()
self.session.headers = dict(requests_user_agent)
# create queues and worker threads
self.wqueue = queue.Queue()
self.rqueue = queue.Queue()
@@ -137,6 +128,8 @@ class CheckExternalLinksBuilder(Builder):
if self.app.config.linkcheck_timeout:
kwargs['timeout'] = self.app.config.linkcheck_timeout
kwargs['allow_redirects'] = True
def check_uri():
# split off anchor
if '#' in uri:
@@ -157,16 +150,8 @@ class CheckExternalLinksBuilder(Builder):
# Read the whole document and see if #anchor exists
# (Anchors starting with ! are ignored since they are
# commonly used for dynamic pages)
req = Request(req_url)
f = opener.open(req, **kwargs)
encoding = 'utf-8'
if hasattr(f.headers, 'get_content_charset'):
encoding = f.headers.get_content_charset() or encoding
else:
encoding = get_content_charset(f) or encoding
found = check_anchor(TextIOWrapper(f, encoding),
unquote(anchor))
f.close()
response = requests.get(req_url, stream=True, **kwargs)
found = check_anchor(response, unquote(anchor))
if not found:
raise Exception("Anchor '%s' not found" % anchor)
@@ -174,32 +159,32 @@ class CheckExternalLinksBuilder(Builder):
try:
# try a HEAD request, which should be easier on
# the server and the network
req = HeadRequest(req_url)
f = opener.open(req, **kwargs)
f.close()
response = requests.head(req_url, **kwargs)
response.raise_for_status()
except HTTPError as err:
if err.code != 405:
if err.response.status_code != 405:
raise
# retry with GET if that fails, some servers
# don't like HEAD requests and reply with 405
req = Request(req_url)
f = opener.open(req, **kwargs)
f.close()
response = requests.get(req_url, stream=True, **kwargs)
response.raise_for_status()
except HTTPError as err:
if err.code == 401:
if err.response.status_code == 401:
# We'll take "Unauthorized" as working.
return 'working', ' - unauthorized', 0
else:
return 'broken', str(err), 0
except Exception as err:
return 'broken', str(err), 0
if f.url.rstrip('/') == req_url.rstrip('/'):
if response.url.rstrip('/') == req_url.rstrip('/'):
return 'working', '', 0
else:
new_url = f.url
new_url = response.url
if anchor:
new_url += '#' + anchor
code = getattr(req, 'redirect_code', 0)
# history contains any redirects, get last
if response.history:
code = response.history[-1].status_code
return 'redirected', new_url, code
def check():


@@ -12,3 +12,4 @@ whoosh>=2.0
alabaster
sphinx_rtd_theme
imagesize
requests