from __future__ import annotations __all__ = ("http_server",) from contextlib import contextmanager from http.server import ThreadingHTTPServer from pathlib import Path from ssl import PROTOCOL_TLS_SERVER, SSLContext from threading import Thread from typing import TYPE_CHECKING import filelock if TYPE_CHECKING: from collections.abc import Iterator from socketserver import BaseRequestHandler from typing import Any, Final # Generated with: # $ openssl req -new -x509 -days 3650 -nodes -out cert.pem \ # -keyout cert.pem -addext "subjectAltName = DNS:localhost" TESTS_ROOT: Final[Path] = Path(__file__).parent CERT_FILE: Final[str] = str(TESTS_ROOT / "certs" / "cert.pem") # File lock for tests LOCK_PATH: Final[str] = str(TESTS_ROOT / 'test-server.lock') class HttpServerThread(Thread): def __init__(self, handler: type[BaseRequestHandler], /, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) self.server = ThreadingHTTPServer(("localhost", 7777), handler) def run(self) -> None: self.server.serve_forever(poll_interval=0.001) def terminate(self) -> None: self.server.shutdown() self.server.server_close() self.join() class HttpsServerThread(HttpServerThread): def __init__( self, handler: type[BaseRequestHandler], /, *args: Any, **kwargs: Any, ) -> None: super().__init__(handler, *args, **kwargs) sslcontext = SSLContext(PROTOCOL_TLS_SERVER) sslcontext.load_cert_chain(CERT_FILE) self.server.socket = sslcontext.wrap_socket(self.server.socket, server_side=True) @contextmanager def http_server(handler: type[BaseRequestHandler], *, tls_enabled: bool = False) -> Iterator[HttpServerThread]: server_cls = HttpsServerThread if tls_enabled else HttpServerThread with filelock.FileLock(LOCK_PATH): server = server_cls(handler, daemon=True) server.start() try: yield server finally: server.terminate()