Re-structure the `linkcheck` builder (#11499)

Re-organise and re-structure the ``linkcheck`` builder:

- All functions defined within other functions are factored out into top-level functions or class methods
- Classes and methods have been re-arranged (Builder, PostTransform, Checker, Worker)
- TLS verification in ``sphinx.util.requests`` no longer requires passing the ``Config`` object all the way down (see the sketch after this list)
- The ``Hyperlink`` object now stores the document path
- Properties are now read from the ``BuildEnvironment`` and ``Config`` objects up front, rather than the objects being stored as class attributes
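
A minimal sketch of the new calling convention for ``sphinx.util.requests`` (the ``fetch`` helper, its ``config`` parameter and the URL are illustrative; the keyword names follow the signatures in the diff below)::

    from sphinx.util import requests

    def fetch(url, config):
        # Before: requests.get(url, config=config, ...) threaded the whole
        # Config object down so the wrapper could derive the User-Agent and
        # TLS settings itself.
        # After: the caller extracts only the values it needs.
        return requests.get(
            url,
            timeout=config.linkcheck_timeout,
            _user_agent=config.user_agent,
            _tls_info=(config.tls_verify, config.tls_cacerts),
        )
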
Adam Turner 2023-07-23 00:01:41 +01:00 committed by GitHub
parent 566e4e74a0
commit d71c781187
4 changed files with 388 additions and 394 deletions

sphinx/builders/linkcheck.py

@@ -6,63 +6,37 @@ import json
import re
import socket
import time
from copy import deepcopy
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from html.parser import HTMLParser
from os import path
from queue import PriorityQueue, Queue
from threading import Thread
from typing import Any, Callable, Generator, Iterator, NamedTuple, Tuple, Union, cast
from typing import TYPE_CHECKING, NamedTuple, cast
from urllib.parse import unquote, urlparse, urlsplit, urlunparse
from docutils import nodes
from requests import Response
from requests.exceptions import ConnectionError, HTTPError, SSLError, TooManyRedirects
from sphinx.application import Sphinx
from sphinx.builders.dummy import DummyBuilder
from sphinx.config import Config
from sphinx.environment import BuildEnvironment
from sphinx.locale import __
from sphinx.transforms.post_transforms import SphinxPostTransform
from sphinx.util import encode_uri, logging, requests
from sphinx.util.console import darkgray, darkgreen, purple, red, turquoise # type: ignore
from sphinx.util.nodes import get_node_line
if TYPE_CHECKING:
from typing import Any, Callable, Generator, Iterator
from requests import Response
from sphinx.application import Sphinx
from sphinx.config import Config
logger = logging.getLogger(__name__)
uri_re = re.compile('([a-z]+:)?//') # matches to foo:// and // (a protocol relative URL)
class Hyperlink(NamedTuple):
uri: str
docname: str
lineno: int | None
class CheckRequest(NamedTuple):
next_check: float
hyperlink: Hyperlink | None
class CheckResult(NamedTuple):
uri: str
docname: str
lineno: int
status: str
message: str
code: int
class RateLimit(NamedTuple):
delay: float
next_check: float
# Tuple is old styled CheckRequest
CheckRequestType = Union[CheckRequest, Tuple[float, str, str, int]]
DEFAULT_REQUEST_HEADERS = {
'Accept': 'text/html,application/xhtml+xml;q=0.9,*/*;q=0.8',
}
@@ -71,39 +45,6 @@ QUEUE_POLL_SECS = 1
DEFAULT_DELAY = 60.0
class AnchorCheckParser(HTMLParser):
"""Specialised HTML parser that looks for a specific anchor."""
def __init__(self, search_anchor: str) -> None:
super().__init__()
self.search_anchor = search_anchor
self.found = False
def handle_starttag(self, tag: Any, attrs: Any) -> None:
for key, value in attrs:
if key in ('id', 'name') and value == self.search_anchor:
self.found = True
break
def contains_anchor(response: Response, anchor: str) -> bool:
"""Determine if an anchor is contained within an HTTP response."""
parser = AnchorCheckParser(unquote(anchor))
# Read file in chunks. If we find a matching anchor, we break
# the loop early in hopes not to have to download the whole thing.
for chunk in response.iter_content(chunk_size=4096, decode_unicode=True):
if isinstance(chunk, bytes): # requests failed to decode
chunk = chunk.decode() # manually try to decode it
parser.feed(chunk)
if parser.found:
break
parser.close()
return parser.found
class CheckExternalLinksBuilder(DummyBuilder):
"""
Checks for broken external links.
@@ -118,12 +59,26 @@ class CheckExternalLinksBuilder(DummyBuilder):
# set a timeout for non-responding servers
socket.setdefaulttimeout(5.0)
def finish(self) -> None:
checker = HyperlinkAvailabilityChecker(self.config)
logger.info('')
output_text = path.join(self.outdir, 'output.txt')
output_json = path.join(self.outdir, 'output.json')
with open(output_text, 'w', encoding='utf-8') as self.txt_outfile,\
open(output_json, 'w', encoding='utf-8') as self.json_outfile:
for result in checker.check(self.hyperlinks):
self.process_result(result)
if self.broken_hyperlinks:
self.app.statuscode = 1
def process_result(self, result: CheckResult) -> None:
filename = self.env.doc2path(result.docname, False)
linkstat = {"filename": filename, "lineno": result.lineno,
"status": result.status, "code": result.code, "uri": result.uri,
"info": result.message}
linkstat = {'filename': filename, 'lineno': result.lineno,
'status': result.status, 'code': result.code, 'uri': result.uri,
'info': result.message}
self.write_linkstat(linkstat)
if result.status == 'unchecked':
@@ -172,54 +127,77 @@ class CheckExternalLinksBuilder(DummyBuilder):
self.write_entry('redirected ' + text, result.docname, filename,
result.lineno, result.uri + ' to ' + result.message)
else:
raise ValueError("Unknown status %s." % result.status)
def write_entry(self, what: str, docname: str, filename: str, line: int,
uri: str) -> None:
self.txt_outfile.write(f"{filename}:{line}: [{what}] {uri}\n")
raise ValueError('Unknown status %s.' % result.status)
def write_linkstat(self, data: dict) -> None:
self.json_outfile.write(json.dumps(data))
self.json_outfile.write('\n')
def finish(self) -> None:
checker = HyperlinkAvailabilityChecker(self.env, self.config)
logger.info('')
def write_entry(self, what: str, docname: str, filename: str, line: int,
uri: str) -> None:
self.txt_outfile.write(f'{filename}:{line}: [{what}] {uri}\n')
output_text = path.join(self.outdir, 'output.txt')
output_json = path.join(self.outdir, 'output.json')
with open(output_text, 'w', encoding="utf-8") as self.txt_outfile,\
open(output_json, 'w', encoding="utf-8") as self.json_outfile:
for result in checker.check(self.hyperlinks):
self.process_result(result)
if self.broken_hyperlinks:
self.app.statuscode = 1
class HyperlinkCollector(SphinxPostTransform):
builders = ('linkcheck',)
default_priority = 800
def run(self, **kwargs: Any) -> None:
builder = cast(CheckExternalLinksBuilder, self.app.builder)
hyperlinks = builder.hyperlinks
docname = self.env.docname
# reference nodes
for refnode in self.document.findall(nodes.reference):
if 'refuri' in refnode:
uri = refnode['refuri']
_add_uri(self.app, uri, refnode, hyperlinks, docname)
# image nodes
for imgnode in self.document.findall(nodes.image):
uri = imgnode['candidates'].get('?')
if uri and '://' in uri:
_add_uri(self.app, uri, imgnode, hyperlinks, docname)
# raw nodes
for rawnode in self.document.findall(nodes.raw):
uri = rawnode.get('source')
if uri and '://' in uri:
_add_uri(self.app, uri, rawnode, hyperlinks, docname)
def _add_uri(app: Sphinx, uri: str, node: nodes.Element,
hyperlinks: dict[str, Hyperlink], docname: str) -> None:
if newuri := app.emit_firstresult('linkcheck-process-uri', uri):
uri = newuri
try:
lineno = get_node_line(node)
except ValueError:
lineno = None
if uri not in hyperlinks:
hyperlinks[uri] = Hyperlink(uri, docname, app.env.doc2path(docname), lineno)
class Hyperlink(NamedTuple):
uri: str
docname: str
docpath: str
lineno: int | None
class HyperlinkAvailabilityChecker:
def __init__(self, env: BuildEnvironment, config: Config) -> None:
def __init__(self, config: Config) -> None:
self.config = config
self.env = env
self.rate_limits: dict[str, RateLimit] = {}
self.rqueue: Queue[CheckResult] = Queue()
self.workers: list[Thread] = []
self.wqueue: PriorityQueue[CheckRequest] = PriorityQueue()
self.num_workers: int = config.linkcheck_workers
self.to_ignore = [re.compile(x) for x in self.config.linkcheck_ignore]
def invoke_threads(self) -> None:
for _i in range(self.config.linkcheck_workers):
thread = HyperlinkAvailabilityCheckWorker(self.env, self.config,
self.rqueue, self.wqueue,
self.rate_limits)
thread.start()
self.workers.append(thread)
def shutdown_threads(self) -> None:
self.wqueue.join()
for _worker in self.workers:
self.wqueue.put(CheckRequest(CHECK_IMMEDIATELY, None), False)
self.to_ignore: list[re.Pattern[str]] = list(map(re.compile,
self.config.linkcheck_ignore))
def check(self, hyperlinks: dict[str, Hyperlink]) -> Generator[CheckResult, None, None]:
self.invoke_threads()
@@ -240,204 +218,79 @@ class HyperlinkAvailabilityChecker:
self.shutdown_threads()
def invoke_threads(self) -> None:
for _i in range(self.num_workers):
thread = HyperlinkAvailabilityCheckWorker(self.config,
self.rqueue, self.wqueue,
self.rate_limits)
thread.start()
self.workers.append(thread)
def shutdown_threads(self) -> None:
self.wqueue.join()
for _worker in self.workers:
self.wqueue.put(CheckRequest(CHECK_IMMEDIATELY, None), False)
def is_ignored_uri(self, uri: str) -> bool:
return any(pat.match(uri) for pat in self.to_ignore)
class CheckRequest(NamedTuple):
next_check: float
hyperlink: Hyperlink | None
class CheckResult(NamedTuple):
uri: str
docname: str
lineno: int
status: str
message: str
code: int
class HyperlinkAvailabilityCheckWorker(Thread):
"""A worker class for checking the availability of hyperlinks."""
def __init__(self, env: BuildEnvironment, config: Config, rqueue: Queue[CheckResult],
wqueue: Queue[CheckRequest], rate_limits: dict[str, RateLimit]) -> None:
self.config = config
self.env = env
def __init__(self, config: Config,
rqueue: Queue[CheckResult],
wqueue: Queue[CheckRequest],
rate_limits: dict[str, RateLimit]) -> None:
self.rate_limits = rate_limits
self.rqueue = rqueue
self.wqueue = wqueue
self.anchors_ignore = [re.compile(x)
for x in self.config.linkcheck_anchors_ignore]
self.documents_exclude = [re.compile(doc)
for doc in self.config.linkcheck_exclude_documents]
self.anchors_ignore: list[re.Pattern[str]] = list(
map(re.compile, config.linkcheck_anchors_ignore))
self.documents_exclude: list[re.Pattern[str]] = list(
map(re.compile, config.linkcheck_exclude_documents))
self.auth = [(re.compile(pattern), auth_info) for pattern, auth_info
in self.config.linkcheck_auth]
in config.linkcheck_auth]
self.timeout: int | float | None = config.linkcheck_timeout
self.request_headers: dict[str, dict[str, str]] = config.linkcheck_request_headers
self.check_anchors: bool = config.linkcheck_anchors
self.allowed_redirects: dict[re.Pattern[str], re.Pattern[str]]
self.allowed_redirects = config.linkcheck_allowed_redirects
self.retries: int = config.linkcheck_retries
self.rate_limit_timeout = config.linkcheck_rate_limit_timeout
self.user_agent = config.user_agent
self.tls_verify = config.tls_verify
self.tls_cacerts = config.tls_cacerts
super().__init__(daemon=True)
def run(self) -> None:
kwargs = {}
if self.config.linkcheck_timeout:
kwargs['timeout'] = self.config.linkcheck_timeout
def get_request_headers() -> dict[str, str]:
url = urlsplit(uri)
candidates = [f"{url.scheme}://{url.netloc}",
f"{url.scheme}://{url.netloc}/",
uri,
"*"]
for u in candidates:
if u in self.config.linkcheck_request_headers:
headers = deepcopy(DEFAULT_REQUEST_HEADERS)
headers.update(self.config.linkcheck_request_headers[u])
return headers
return {}
def check_uri() -> tuple[str, str, int]:
req_url, delimiter, anchor = uri.partition('#')
for rex in self.anchors_ignore if delimiter and anchor else []:
if rex.match(anchor):
anchor = ''
break
# handle non-ASCII URIs
try:
req_url.encode('ascii')
except UnicodeError:
req_url = encode_uri(req_url)
# Get auth info, if any
for pattern, auth_info in self.auth: # noqa: B007 (false positive)
if pattern.match(uri):
break
else:
auth_info = None
# update request headers for the URL
kwargs['headers'] = get_request_headers()
# Linkcheck HTTP request logic:
#
# - Attempt HTTP HEAD before HTTP GET unless page content is required.
# - Follow server-issued HTTP redirects.
# - Respect server-issued HTTP 429 back-offs.
error_message = None
status_code = -1
response_url = retry_after = ''
for retrieval_method, retrieval_kwargs in _retrieval_methods(
self.config.linkcheck_anchors, anchor,
):
try:
with retrieval_method(url=req_url, auth=auth_info, config=self.config,
**retrieval_kwargs, **kwargs) as response:
if response.ok and anchor and not contains_anchor(response, anchor):
raise Exception(__(f'Anchor {anchor!r} not found'))
# Copy data we need from the (closed) response
status_code = response.status_code
redirect_status_code = response.history[-1].status_code if response.history else None # NoQA: E501
retry_after = response.headers.get('Retry-After')
response_url = f'{response.url}'
response.raise_for_status()
del response
break
except SSLError as err:
# SSL failure; report that the link is broken.
return 'broken', str(err), 0
except (ConnectionError, TooManyRedirects) as err:
# Servers drop the connection on HEAD requests, causing
# ConnectionError.
error_message = str(err)
continue
except HTTPError as err:
error_message = str(err)
# Unauthorised: the reference probably exists
if status_code == 401:
return 'working', 'unauthorized', 0
# Rate limiting; back-off if allowed, or report failure otherwise
if status_code == 429:
if next_check := self.limit_rate(response_url, retry_after):
self.wqueue.put(CheckRequest(next_check, hyperlink), False)
return 'rate-limited', '', 0
return 'broken', error_message, 0
# Don't claim success/failure during server-side outages
if status_code == 503:
return 'ignored', 'service unavailable', 0
# For most HTTP failures, continue attempting alternate retrieval methods
continue
except Exception as err:
# Unhandled exception (intermittent or permanent); report that
# the link is broken.
return 'broken', str(err), 0
else:
# All available retrieval methods have been exhausted; report
# that the link is broken.
return 'broken', error_message, 0
# Success; clear rate limits for the origin
netloc = urlsplit(req_url).netloc
try:
del self.rate_limits[netloc]
except KeyError:
pass
if ((response_url.rstrip('/') == req_url.rstrip('/'))
or allowed_redirect(req_url, response_url)):
return 'working', '', 0
elif redirect_status_code is not None:
return 'redirected', response_url, redirect_status_code
else:
return 'redirected', response_url, 0
def allowed_redirect(url: str, new_url: str) -> bool:
return any(
from_url.match(url) and to_url.match(new_url)
for from_url, to_url
in self.config.linkcheck_allowed_redirects.items()
)
def check(docname: str) -> tuple[str, str, int]:
# check for various conditions without bothering the network
for doc_matcher in self.documents_exclude:
if doc_matcher.match(docname):
info = (
f'{docname} matched {doc_matcher.pattern} from '
'linkcheck_exclude_documents'
)
return 'ignored', info, 0
if len(uri) == 0 or uri.startswith(('#', 'mailto:', 'tel:')):
return 'unchecked', '', 0
elif not uri.startswith(('http:', 'https:')):
if uri_re.match(uri):
# non supported URI schemes (ex. ftp)
return 'unchecked', '', 0
else:
srcdir = path.dirname(self.env.doc2path(docname))
if path.exists(path.join(srcdir, uri)):
return 'working', '', 0
else:
return 'broken', '', 0
# need to actually check the URI
for _ in range(self.config.linkcheck_retries):
status, info, code = check_uri()
if status != "broken":
break
return (status, info, code)
while True:
check_request = self.wqueue.get()
next_check, hyperlink = check_request
next_check, hyperlink = self.wqueue.get()
if hyperlink is None:
break
uri, docname, lineno = hyperlink
uri, docname, _docpath, lineno = hyperlink
if uri is None:
break
netloc = urlsplit(uri).netloc
try:
# Refresh rate limit.
@@ -454,14 +307,153 @@ class HyperlinkAvailabilityCheckWorker(Thread):
self.wqueue.put(CheckRequest(next_check, hyperlink), False)
self.wqueue.task_done()
continue
status, info, code = check(docname)
status, info, code = self._check(docname, uri, hyperlink)
if status == 'rate-limited':
logger.info(darkgray('-rate limited- ') + uri + darkgray(' | sleeping...'))
else:
self.rqueue.put(CheckResult(uri, docname, lineno, status, info, code))
self.wqueue.task_done()
def _check(self, docname: str, uri: str, hyperlink: Hyperlink) -> tuple[str, str, int]:
# check for various conditions without bothering the network
for doc_matcher in self.documents_exclude:
if doc_matcher.match(docname):
info = (
f'{docname} matched {doc_matcher.pattern} from '
'linkcheck_exclude_documents'
)
return 'ignored', info, 0
if len(uri) == 0 or uri.startswith(('#', 'mailto:', 'tel:')):
return 'unchecked', '', 0
if not uri.startswith(('http:', 'https:')):
if uri_re.match(uri):
# Non-supported URI schemes (ex. ftp)
return 'unchecked', '', 0
src_dir = path.dirname(hyperlink.docpath)
if path.exists(path.join(src_dir, uri)):
return 'working', '', 0
return 'broken', '', 0
# need to actually check the URI
status, info, code = '', '', 0
for _ in range(self.retries):
status, info, code = self._check_uri(uri, hyperlink)
if status != 'broken':
break
return status, info, code
def _check_uri(self, uri: str, hyperlink: Hyperlink) -> tuple[str, str, int]:
req_url, delimiter, anchor = uri.partition('#')
for rex in self.anchors_ignore if delimiter and anchor else []:
if rex.match(anchor):
anchor = ''
break
# handle non-ASCII URIs
try:
req_url.encode('ascii')
except UnicodeError:
req_url = encode_uri(req_url)
# Get auth info, if any
for pattern, auth_info in self.auth: # noqa: B007 (false positive)
if pattern.match(uri):
break
else:
auth_info = None
# update request headers for the URL
headers = _get_request_headers(uri, self.request_headers)
# Linkcheck HTTP request logic:
#
# - Attempt HTTP HEAD before HTTP GET unless page content is required.
# - Follow server-issued HTTP redirects.
# - Respect server-issued HTTP 429 back-offs.
error_message = None
status_code = -1
response_url = retry_after = ''
for retrieval_method, kwargs in _retrieval_methods(self.check_anchors, anchor):
try:
with retrieval_method(
url=req_url, auth=auth_info,
headers=headers,
timeout=self.timeout,
**kwargs,
_user_agent=self.user_agent,
_tls_info=(self.tls_verify, self.tls_cacerts),
) as response:
if response.ok and anchor and not contains_anchor(response, anchor):
raise Exception(__(f'Anchor {anchor!r} not found'))
# Copy data we need from the (closed) response
status_code = response.status_code
redirect_status_code = response.history[-1].status_code if response.history else None # NoQA: E501
retry_after = response.headers.get('Retry-After')
response_url = f'{response.url}'
response.raise_for_status()
del response
break
except SSLError as err:
# SSL failure; report that the link is broken.
return 'broken', str(err), 0
except (ConnectionError, TooManyRedirects) as err:
# Servers drop the connection on HEAD requests, causing
# ConnectionError.
error_message = str(err)
continue
except HTTPError as err:
error_message = str(err)
# Unauthorised: the reference probably exists
if status_code == 401:
return 'working', 'unauthorized', 0
# Rate limiting; back-off if allowed, or report failure otherwise
if status_code == 429:
if next_check := self.limit_rate(response_url, retry_after):
self.wqueue.put(CheckRequest(next_check, hyperlink), False)
return 'rate-limited', '', 0
return 'broken', error_message, 0
# Don't claim success/failure during server-side outages
if status_code == 503:
return 'ignored', 'service unavailable', 0
# For most HTTP failures, continue attempting alternate retrieval methods
continue
except Exception as err:
# Unhandled exception (intermittent or permanent); report that
# the link is broken.
return 'broken', str(err), 0
else:
# All available retrieval methods have been exhausted; report
# that the link is broken.
return 'broken', error_message, 0
# Success; clear rate limits for the origin
self.rate_limits.pop(urlsplit(req_url).netloc, None)
if ((response_url.rstrip('/') == req_url.rstrip('/'))
or _allowed_redirect(req_url, response_url,
self.allowed_redirects)):
return 'working', '', 0
elif redirect_status_code is not None:
return 'redirected', response_url, redirect_status_code
else:
return 'redirected', response_url, 0
def limit_rate(self, response_url: str, retry_after: str) -> float | None:
delay = DEFAULT_DELAY
next_check = None
if retry_after:
try:
@@ -482,7 +474,7 @@ class HyperlinkAvailabilityCheckWorker(Thread):
next_check = time.time() + delay
netloc = urlsplit(response_url).netloc
if next_check is None:
max_delay = self.config.linkcheck_rate_limit_timeout
max_delay = self.rate_limit_timeout
try:
rate_limit = self.rate_limits[netloc]
except KeyError:
@@ -490,7 +482,7 @@ class HyperlinkAvailabilityCheckWorker(Thread):
else:
last_wait_time = rate_limit.delay
delay = 2.0 * last_wait_time
if delay > max_delay and last_wait_time < max_delay:
if delay > max_delay > last_wait_time:
delay = max_delay
if delay > max_delay:
return None
@@ -499,54 +491,74 @@ class HyperlinkAvailabilityCheckWorker(Thread):
return next_check
def _retrieval_methods(
linkcheck_anchors: bool,
anchor: str,
) -> Iterator[tuple[Callable, dict[str, bool]]]:
if not linkcheck_anchors or not anchor:
def _get_request_headers(
uri: str,
request_headers: dict[str, dict[str, str]],
) -> dict[str, str]:
url = urlsplit(uri)
candidates = (f'{url.scheme}://{url.netloc}',
f'{url.scheme}://{url.netloc}/',
uri,
'*')
for u in candidates:
if u in request_headers:
headers = {**DEFAULT_REQUEST_HEADERS, **request_headers[u]}
return headers
return {}
def _retrieval_methods(check_anchors: bool, anchor: str) -> Iterator[tuple[Callable, dict]]:
if not check_anchors or not anchor:
yield requests.head, {'allow_redirects': True}
yield requests.get, {'stream': True}
class HyperlinkCollector(SphinxPostTransform):
builders = ('linkcheck',)
default_priority = 800
def contains_anchor(response: Response, anchor: str) -> bool:
"""Determine if an anchor is contained within an HTTP response."""
def run(self, **kwargs: Any) -> None:
builder = cast(CheckExternalLinksBuilder, self.app.builder)
hyperlinks = builder.hyperlinks
parser = AnchorCheckParser(unquote(anchor))
# Read file in chunks. If we find a matching anchor, we break
# the loop early in hopes not to have to download the whole thing.
for chunk in response.iter_content(chunk_size=4096, decode_unicode=True):
if isinstance(chunk, bytes): # requests failed to decode
chunk = chunk.decode() # manually try to decode it
def add_uri(uri: str, node: nodes.Element) -> None:
newuri = self.app.emit_firstresult('linkcheck-process-uri', uri)
if newuri:
uri = newuri
parser.feed(chunk)
if parser.found:
break
parser.close()
return parser.found
try:
lineno = get_node_line(node)
except ValueError:
lineno = None
uri_info = Hyperlink(uri, self.env.docname, lineno)
if uri not in hyperlinks:
hyperlinks[uri] = uri_info
# reference nodes
for refnode in self.document.findall(nodes.reference):
if 'refuri' not in refnode:
continue
uri = refnode['refuri']
add_uri(uri, refnode)
class AnchorCheckParser(HTMLParser):
"""Specialised HTML parser that looks for a specific anchor."""
# image nodes
for imgnode in self.document.findall(nodes.image):
uri = imgnode['candidates'].get('?')
if uri and '://' in uri:
add_uri(uri, imgnode)
def __init__(self, search_anchor: str) -> None:
super().__init__()
# raw nodes
for rawnode in self.document.findall(nodes.raw):
uri = rawnode.get('source')
if uri and '://' in uri:
add_uri(uri, rawnode)
self.search_anchor = search_anchor
self.found = False
def handle_starttag(self, tag: Any, attrs: Any) -> None:
for key, value in attrs:
if key in ('id', 'name') and value == self.search_anchor:
self.found = True
break
def _allowed_redirect(url: str, new_url: str,
allowed_redirects: dict[re.Pattern[str], re.Pattern[str]]) -> bool:
return any(
from_url.match(url) and to_url.match(new_url)
for from_url, to_url
in allowed_redirects.items()
)
class RateLimit(NamedTuple):
delay: float
next_check: float
def rewrite_github_anchor(app: Sphinx, uri: str) -> str | None:
@@ -556,7 +568,7 @@ def rewrite_github_anchor(app: Sphinx, uri: str) -> str | None:
them before checking and makes them comparable.
"""
parsed = urlparse(uri)
if parsed.hostname == "github.com" and parsed.fragment:
if parsed.hostname == 'github.com' and parsed.fragment:
prefixed = parsed.fragment.startswith('user-content-')
if not prefixed:
fragment = f'user-content-{parsed.fragment}'
@@ -592,7 +604,7 @@ def setup(app: Sphinx) -> dict[str, Any]:
app.add_config_value('linkcheck_anchors', True, False)
# Anchors starting with ! are ignored since they are
# commonly used for dynamic pages
app.add_config_value('linkcheck_anchors_ignore', ["^!"], False)
app.add_config_value('linkcheck_anchors_ignore', ['^!'], False)
app.add_config_value('linkcheck_rate_limit_timeout', 300.0, False)
app.add_event('linkcheck-process-uri')
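
For orientation, a hedged ``conf.py`` sketch of the ``linkcheck_*`` options that the re-worked worker copies into instance attributes (all values below are illustrative, not recommendations)::

    # conf.py -- illustrative values only
    linkcheck_timeout = 5
    linkcheck_retries = 2
    linkcheck_anchors = True
    linkcheck_anchors_ignore = ['^!']
    linkcheck_request_headers = {
        'https://example.org': {'Accept': 'text/html'},
        '*': {'Accept-Encoding': 'gzip'},
    }
    linkcheck_allowed_redirects = {
        r'https://example\.org/.*': r'https://example\.org/en/latest/.*',
    }
    linkcheck_rate_limit_timeout = 300.0
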

sphinx/ext/intersphinx.py

@@ -134,11 +134,13 @@ def _read_from_url(url: str, config: Config | None = None) -> IO:
:return: data read from resource described by *url*
:rtype: ``file``-like object
"""
r = requests.get(url, stream=True, config=config, timeout=config.intersphinx_timeout)
r = requests.get(url, stream=True, timeout=config.intersphinx_timeout,
_user_agent=config.user_agent,
_tls_info=(config.tls_verify, config.tls_cacerts))
r.raise_for_status()
r.raw.url = r.url
# decode content-body based on the header.
# ref: https://github.com/kennethreitz/requests/issues/2155
# ref: https://github.com/psf/requests/issues/2155
r.raw.read = functools.partial(r.raw.read, decode_content=True)
return r.raw
@@ -694,6 +696,7 @@ def inspect_main(argv: list[str]) -> None:
class MockConfig:
intersphinx_timeout: int | None = None
tls_verify = False
tls_cacerts = None
user_agent = None
class MockApp:

sphinx/util/requests.py

@@ -2,91 +2,78 @@
from __future__ import annotations
import sys
import warnings
from contextlib import contextmanager
from typing import Any, Generator
from typing import Any, Iterator
from urllib.parse import urlsplit
import requests
from urllib3.exceptions import InsecureRequestWarning
import sphinx
from sphinx.config import Config
useragent_header = [('User-Agent',
'Mozilla/5.0 (X11; Linux x86_64; rv:25.0) Gecko/20100101 Firefox/25.0')]
_USER_AGENT = (f'Mozilla/5.0 (X11; Linux x86_64; rv:100.0) Gecko/20100101 Firefox/100.0 '
f'Sphinx/{sphinx.__version__}')
@contextmanager
def ignore_insecure_warning(**kwargs: Any) -> Generator[None, None, None]:
def ignore_insecure_warning(verify: bool) -> Iterator[None]:
with warnings.catch_warnings():
if not kwargs.get('verify'):
if not verify:
# ignore InsecureRequestWarning if verify=False
warnings.filterwarnings("ignore", category=InsecureRequestWarning)
yield
def _get_tls_cacert(url: str, config: Config) -> str | bool:
"""Get additional CA cert for a specific URL.
This also returns ``False`` if verification is disabled.
And returns ``True`` if additional CA cert not found.
"""
if not config.tls_verify:
return False
certs = getattr(config, 'tls_cacerts', None)
def _get_tls_cacert(url: str, certs: str | dict[str, str] | None) -> str | bool:
"""Get additional CA cert for a specific URL."""
if not certs:
return True
elif isinstance(certs, (str, tuple)):
return certs # type: ignore
return certs
else:
hostname = urlsplit(url)[1]
hostname = urlsplit(url).netloc
if '@' in hostname:
hostname = hostname.split('@')[1]
_, hostname = hostname.split('@', 1)
return certs.get(hostname, True)
def _get_user_agent(config: Config) -> str:
if config.user_agent:
return config.user_agent
else:
return ' '.join([
'Sphinx/%s' % sphinx.__version__,
'requests/%s' % requests.__version__,
'python/%s' % '.'.join(map(str, sys.version_info[:3])),
])
def get(url: str, **kwargs: Any) -> requests.Response:
"""Sends a GET request like requests.get().
This sets up User-Agent header and TLS verification automatically."""
headers = kwargs.setdefault('headers', {})
config = kwargs.pop('config', None)
if config:
kwargs.setdefault('verify', _get_tls_cacert(url, config))
headers.setdefault('User-Agent', _get_user_agent(config))
else:
headers.setdefault('User-Agent', useragent_header[0][1])
with ignore_insecure_warning(**kwargs):
return requests.get(url, **kwargs)
def head(url: str, **kwargs: Any) -> requests.Response:
def get(url: str,
_user_agent: str = '',
_tls_info: tuple[bool, str | dict[str, str] | None] = (), # type: ignore[assignment]
**kwargs: Any) -> requests.Response:
"""Sends a HEAD request like requests.head().
This sets up User-Agent header and TLS verification automatically."""
headers = kwargs.setdefault('headers', {})
config = kwargs.pop('config', None)
if config:
kwargs.setdefault('verify', _get_tls_cacert(url, config))
headers.setdefault('User-Agent', _get_user_agent(config))
headers.setdefault('User-Agent', _user_agent or _USER_AGENT)
if _tls_info:
tls_verify, tls_cacerts = _tls_info
verify = bool(kwargs.get('verify', tls_verify))
kwargs.setdefault('verify', verify and _get_tls_cacert(url, tls_cacerts))
else:
headers.setdefault('User-Agent', useragent_header[0][1])
verify = kwargs.get('verify', True)
with ignore_insecure_warning(**kwargs):
with ignore_insecure_warning(verify):
return requests.get(url, **kwargs)
def head(url: str,
_user_agent: str = '',
_tls_info: tuple[bool, str | dict[str, str] | None] = (), # type: ignore[assignment]
**kwargs: Any) -> requests.Response:
"""Sends a HEAD request like requests.head().
This sets up User-Agent header and TLS verification automatically."""
headers = kwargs.setdefault('headers', {})
headers.setdefault('User-Agent', _user_agent or _USER_AGENT)
if _tls_info:
tls_verify, tls_cacerts = _tls_info
verify = bool(kwargs.get('verify', tls_verify))
kwargs.setdefault('verify', verify and _get_tls_cacert(url, tls_cacerts))
else:
verify = kwargs.get('verify', True)
with ignore_insecure_warning(verify):
return requests.head(url, **kwargs)
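
A short usage sketch of the per-host CA-certificate lookup above (hostnames and paths are made up; assumes the new ``_get_tls_cacert`` signature)::

    from sphinx.util.requests import _get_tls_cacert

    certs = {'docs.example.org': '/etc/ssl/certs/example-ca.pem'}

    # Mapping form: the bundle is chosen by hostname, with any userinfo stripped.
    _get_tls_cacert('https://user@docs.example.org/page.html', certs)
    # -> '/etc/ssl/certs/example-ca.pem'

    # Unknown host, or no certificates configured: fall back to default verification.
    _get_tls_cacert('https://other.example.net/', certs)  # -> True
    _get_tls_cacert('https://docs.example.org/', None)    # -> True
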

tests/test_build_linkcheck.py

@@ -31,11 +31,7 @@ class DefaultsHandler(http.server.BaseHTTPRequestHandler):
protocol_version = "HTTP/1.1"
def do_HEAD(self):
if self.path[1:].rstrip() == "":
self.send_response(200, "OK")
self.send_header("Content-Length", "0")
self.end_headers()
elif self.path[1:].rstrip() == "anchor.html":
if self.path[1:].rstrip() in {"", "anchor.html"}:
self.send_response(200, "OK")
self.send_header("Content-Length", "0")
self.end_headers()
@@ -230,9 +226,8 @@ def custom_handler(valid_credentials=(), success_criteria=lambda _: True):
def authenticated(method):
def method_if_authenticated(self):
if expected_token is None:
return method(self)
elif self.headers["Authorization"] == f"Basic {expected_token}":
if (expected_token is None
or self.headers["Authorization"] == f"Basic {expected_token}"):
return method(self)
else:
self.send_response(403, "Forbidden")
@@ -731,7 +726,7 @@ class FakeResponse:
def test_limit_rate_default_sleep(app):
worker = HyperlinkAvailabilityCheckWorker(app.env, app.config, Queue(), Queue(), {})
worker = HyperlinkAvailabilityCheckWorker(app.config, Queue(), Queue(), {})
with mock.patch('time.time', return_value=0.0):
next_check = worker.limit_rate(FakeResponse.url, FakeResponse.headers.get("Retry-After"))
assert next_check == 60.0
@@ -739,15 +734,14 @@ def test_limit_rate_default_sleep(app):
def test_limit_rate_user_max_delay(app):
app.config.linkcheck_rate_limit_timeout = 0.0
worker = HyperlinkAvailabilityCheckWorker(app.env, app.config, Queue(), Queue(), {})
worker = HyperlinkAvailabilityCheckWorker(app.config, Queue(), Queue(), {})
next_check = worker.limit_rate(FakeResponse.url, FakeResponse.headers.get("Retry-After"))
assert next_check is None
def test_limit_rate_doubles_previous_wait_time(app):
rate_limits = {"localhost": RateLimit(60.0, 0.0)}
worker = HyperlinkAvailabilityCheckWorker(app.env, app.config, Queue(), Queue(),
rate_limits)
worker = HyperlinkAvailabilityCheckWorker(app.config, Queue(), Queue(), rate_limits)
with mock.patch('time.time', return_value=0.0):
next_check = worker.limit_rate(FakeResponse.url, FakeResponse.headers.get("Retry-After"))
assert next_check == 120.0
@@ -756,8 +750,7 @@ def test_limit_rate_doubles_previous_wait_time(app):
def test_limit_rate_clips_wait_time_to_max_time(app):
app.config.linkcheck_rate_limit_timeout = 90.0
rate_limits = {"localhost": RateLimit(60.0, 0.0)}
worker = HyperlinkAvailabilityCheckWorker(app.env, app.config, Queue(), Queue(),
rate_limits)
worker = HyperlinkAvailabilityCheckWorker(app.config, Queue(), Queue(), rate_limits)
with mock.patch('time.time', return_value=0.0):
next_check = worker.limit_rate(FakeResponse.url, FakeResponse.headers.get("Retry-After"))
assert next_check == 90.0
@@ -766,8 +759,7 @@ def test_limit_rate_clips_wait_time_to_max_time(app):
def test_limit_rate_bails_out_after_waiting_max_time(app):
app.config.linkcheck_rate_limit_timeout = 90.0
rate_limits = {"localhost": RateLimit(90.0, 0.0)}
worker = HyperlinkAvailabilityCheckWorker(app.env, app.config, Queue(), Queue(),
rate_limits)
worker = HyperlinkAvailabilityCheckWorker(app.config, Queue(), Queue(), rate_limits)
next_check = worker.limit_rate(FakeResponse.url, FakeResponse.headers.get("Retry-After"))
assert next_check is None
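
The tests above pin down the 429 back-off behaviour; a standalone sketch of the same doubling/clipping rule (a distilled re-statement, not the ``limit_rate`` implementation itself)::

    from __future__ import annotations

    DEFAULT_DELAY = 60.0  # matches the module-level constant in the builder

    def next_delay(last_wait: float | None, max_delay: float) -> float | None:
        if last_wait is None:
            delay = DEFAULT_DELAY            # no previous 429 from this host
        else:
            delay = 2.0 * last_wait          # double the previous wait ...
            if delay > max_delay > last_wait:
                delay = max_delay            # ... clipping it to the maximum once
        if delay > max_delay:
            return None                      # already waited the maximum: give up
        return delay

    assert next_delay(None, 300.0) == 60.0   # test_limit_rate_default_sleep
    assert next_delay(None, 0.0) is None     # test_limit_rate_user_max_delay
    assert next_delay(60.0, 300.0) == 120.0  # doubles the previous wait time
    assert next_delay(60.0, 90.0) == 90.0    # clipped to linkcheck_rate_limit_timeout
    assert next_delay(90.0, 90.0) is None    # bails out after waiting the max time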