Begin using session-based HTTP requests in the linkcheck builder (#11503)

Co-authored-by: Adam Turner <9087854+aa-turner@users.noreply.github.com>
This commit is contained in:
James Addison 2023-07-23 22:23:08 +01:00 committed by GitHub
parent 1cb52d5664
commit 450ad637ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 51 additions and 52 deletions

View File

@ -279,12 +279,16 @@ class HyperlinkAvailabilityCheckWorker(Thread):
self.tls_verify = config.tls_verify
self.tls_cacerts = config.tls_cacerts
self._session = requests._Session()
super().__init__(daemon=True)
def run(self) -> None:
while True:
next_check, hyperlink = self.wqueue.get()
if hyperlink is None:
# An empty hyperlink is a signal to shut down the worker; clean up resources here
self._session.close()
break
uri, docname, _docpath, lineno = hyperlink
@ -346,6 +350,13 @@ class HyperlinkAvailabilityCheckWorker(Thread):
return status, info, code
# Yield the request callables to try for a URI, each paired with the keyword
# arguments to pass to it. Both callables are bound methods of the worker's
# own ``self._session`` object.
def _retrieval_methods(self,
check_anchors: bool,
anchor: str) -> Iterator[tuple[Callable, dict]]:
# The HEAD entry is guarded: it is produced when ``check_anchors`` is false
# or ``anchor`` is empty.
if not check_anchors or not anchor:
yield self._session.head, {'allow_redirects': True}
# NOTE(review): indentation is not visible in this rendering, so it cannot be
# confirmed from here whether the GET entry below is yielded unconditionally
# or only inside the branch above -- verify against the real file.
yield self._session.get, {'stream': True}
def _check_uri(self, uri: str, hyperlink: Hyperlink) -> tuple[str, str, int]:
req_url, delimiter, anchor = uri.partition('#')
for rex in self.anchors_ignore if delimiter and anchor else []:
@ -377,7 +388,7 @@ class HyperlinkAvailabilityCheckWorker(Thread):
error_message = ''
status_code = -1
response_url = retry_after = ''
for retrieval_method, kwargs in _retrieval_methods(self.check_anchors, anchor):
for retrieval_method, kwargs in self._retrieval_methods(self.check_anchors, anchor):
try:
with retrieval_method(
url=req_url, auth=auth_info,
@ -508,12 +519,6 @@ def _get_request_headers(
return {}
# Module-level generator of (request callable, keyword arguments) pairs. It
# yields ``requests.head`` / ``requests.get`` looked up on the ``requests``
# name in scope, rather than methods of a session object.
# NOTE(review): the enclosing hunk header (-508,12 +519,6) suggests these
# lines are the pre-change version of this helper -- confirm against the diff.
def _retrieval_methods(check_anchors: bool, anchor: str) -> Iterator[tuple[Callable, dict]]:
# The HEAD entry is produced when ``check_anchors`` is false or ``anchor``
# is empty.
if not check_anchors or not anchor:
yield requests.head, {'allow_redirects': True}
# NOTE(review): with indentation stripped it is not possible to tell here
# whether this GET entry is unconditional -- verify against the real file.
yield requests.get, {'stream': True}
def contains_anchor(response: Response, anchor: str) -> bool:
"""Determine if an anchor is contained within an HTTP response."""

View File

@ -3,8 +3,7 @@
from __future__ import annotations
import warnings
from contextlib import contextmanager
from typing import Any, Iterator
from typing import Any
from urllib.parse import urlsplit
import requests
@ -16,15 +15,6 @@ _USER_AGENT = (f'Mozilla/5.0 (X11; Linux x86_64; rv:100.0) Gecko/20100101 Firefo
f'Sphinx/{sphinx.__version__}')
# Context manager built with ``contextlib.contextmanager``: it enters
# ``warnings.catch_warnings()`` and, when ``verify`` is false, installs an
# "ignore" filter for ``InsecureRequestWarning`` before yielding to the caller.
@contextmanager
def ignore_insecure_warning(verify: bool) -> Iterator[None]:
with warnings.catch_warnings():
if not verify:
# ignore InsecureRequestWarning if verify=False
warnings.filterwarnings("ignore", category=InsecureRequestWarning)
# NOTE(review): indentation is not visible in this rendering; presumably the
# ``yield`` sits inside the ``with`` block but outside the ``if`` -- confirm.
yield
def _get_tls_cacert(url: str, certs: str | dict[str, str] | None) -> str | bool:
"""Get additional CA cert for a specific URL."""
if not certs:
@ -39,41 +29,45 @@ def _get_tls_cacert(url: str, certs: str | dict[str, str] | None) -> str | bool:
return certs.get(hostname, True)
def get(url: str,
def get(url: str, **kwargs: Any) -> requests.Response:
    """Send a GET request, mirroring ``requests.get()``.

    A ``_Session`` is opened for this single call and used as a context
    manager; *kwargs* are forwarded unchanged to its ``get`` method. The
    session sets up the User-Agent header and TLS verification automatically.
    """
    with _Session() as http:
        response = http.get(url, **kwargs)
    return response
def head(url: str, **kwargs: Any) -> requests.Response:
    """Send a HEAD request, mirroring ``requests.head()``.

    A ``_Session`` is opened for this single call and used as a context
    manager; *kwargs* are forwarded unchanged to its ``head`` method. The
    session sets up the User-Agent header and TLS verification automatically.
    """
    with _Session() as http:
        response = http.head(url, **kwargs)
    return response
class _Session(requests.Session):
def request( # type: ignore[override]
self, method: str, url: str,
_user_agent: str = '',
_tls_info: tuple[bool, str | dict[str, str] | None] = (), # type: ignore[assignment]
**kwargs: Any) -> requests.Response:
"""Sends a HEAD request like requests.head().
**kwargs: Any,
) -> requests.Response:
"""Sends a request with an HTTP verb and url.
This sets up User-Agent header and TLS verification automatically."""
headers = kwargs.setdefault('headers', {})
headers.setdefault('User-Agent', _user_agent or _USER_AGENT)
if _tls_info:
tls_verify, tls_cacerts = _tls_info
verify = bool(kwargs.get('verify', tls_verify))
kwargs.setdefault('verify', verify and _get_tls_cacert(url, tls_cacerts))
else:
verify = kwargs.get('verify', True)
This sets up User-Agent header and TLS verification automatically."""
headers = kwargs.setdefault('headers', {})
headers.setdefault('User-Agent', _user_agent or _USER_AGENT)
if _tls_info:
tls_verify, tls_cacerts = _tls_info
verify = bool(kwargs.get('verify', tls_verify))
kwargs.setdefault('verify', verify and _get_tls_cacert(url, tls_cacerts))
else:
verify = kwargs.get('verify', True)
with ignore_insecure_warning(verify):
return requests.get(url, **kwargs)
if verify:
return super().request(method, url, **kwargs)
def head(url: str,
_user_agent: str = '',
_tls_info: tuple[bool, str | dict[str, str] | None] = (), # type: ignore[assignment]
**kwargs: Any) -> requests.Response:
"""Sends a HEAD request like requests.head().
This sets up User-Agent header and TLS verification automatically."""
headers = kwargs.setdefault('headers', {})
headers.setdefault('User-Agent', _user_agent or _USER_AGENT)
if _tls_info:
tls_verify, tls_cacerts = _tls_info
verify = bool(kwargs.get('verify', tls_verify))
kwargs.setdefault('verify', verify and _get_tls_cacert(url, tls_cacerts))
else:
verify = kwargs.get('verify', True)
with ignore_insecure_warning(verify):
return requests.head(url, **kwargs)
with warnings.catch_warnings():
# ignore InsecureRequestWarning if verify=False
warnings.filterwarnings("ignore", category=InsecureRequestWarning)
return super().request(method, url, **kwargs)

View File

@ -104,7 +104,7 @@ def test_defaults(app):
with http_server(DefaultsHandler):
with ConnectionMeasurement() as m:
app.build()
assert m.connection_count <= 10
assert m.connection_count <= 5
# Text output
assert (app.outdir / 'output.txt').exists()