File size: 12,879 Bytes
9c6594c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 |
from __future__ import annotations
import asyncio
import contextlib
import logging
import os
import platform
import signal
import socket
import sys
import threading
import time
from collections.abc import Generator, Sequence
from email.utils import formatdate
from types import FrameType
from typing import TYPE_CHECKING, Union
import click
from uvicorn.config import Config
if TYPE_CHECKING:
from uvicorn.protocols.http.h11_impl import H11Protocol
from uvicorn.protocols.http.httptools_impl import HttpToolsProtocol
from uvicorn.protocols.websockets.websockets_impl import WebSocketProtocol
from uvicorn.protocols.websockets.wsproto_impl import WSProtocol
Protocols = Union[H11Protocol, HttpToolsProtocol, WSProtocol, WebSocketProtocol]
HANDLED_SIGNALS = (
signal.SIGINT, # Unix signal 2. Sent by Ctrl+C.
signal.SIGTERM, # Unix signal 15. Sent by `kill <pid>`.
)
if sys.platform == "win32": # pragma: py-not-win32
HANDLED_SIGNALS += (signal.SIGBREAK,) # Windows signal 21. Sent by Ctrl+Break.
logger = logging.getLogger("uvicorn.error")
class ServerState:
"""
Shared servers state that is available between all protocol instances.
"""
def __init__(self) -> None:
self.total_requests = 0
self.connections: set[Protocols] = set()
self.tasks: set[asyncio.Task[None]] = set()
self.default_headers: list[tuple[bytes, bytes]] = []
class Server:
def __init__(self, config: Config) -> None:
self.config = config
self.server_state = ServerState()
self.started = False
self.should_exit = False
self.force_exit = False
self.last_notified = 0.0
self._captured_signals: list[int] = []
def run(self, sockets: list[socket.socket] | None = None) -> None:
self.config.setup_event_loop()
return asyncio.run(self.serve(sockets=sockets))
async def serve(self, sockets: list[socket.socket] | None = None) -> None:
with self.capture_signals():
await self._serve(sockets)
async def _serve(self, sockets: list[socket.socket] | None = None) -> None:
process_id = os.getpid()
config = self.config
if not config.loaded:
config.load()
self.lifespan = config.lifespan_class(config)
message = "Started server process [%d]"
color_message = "Started server process [" + click.style("%d", fg="cyan") + "]"
logger.info(message, process_id, extra={"color_message": color_message})
await self.startup(sockets=sockets)
if self.should_exit:
return
await self.main_loop()
await self.shutdown(sockets=sockets)
message = "Finished server process [%d]"
color_message = "Finished server process [" + click.style("%d", fg="cyan") + "]"
logger.info(message, process_id, extra={"color_message": color_message})
async def startup(self, sockets: list[socket.socket] | None = None) -> None:
await self.lifespan.startup()
if self.lifespan.should_exit:
self.should_exit = True
return
config = self.config
def create_protocol(
_loop: asyncio.AbstractEventLoop | None = None,
) -> asyncio.Protocol:
return config.http_protocol_class( # type: ignore[call-arg]
config=config,
server_state=self.server_state,
app_state=self.lifespan.state,
_loop=_loop,
)
loop = asyncio.get_running_loop()
listeners: Sequence[socket.SocketType]
if sockets is not None: # pragma: full coverage
# Explicitly passed a list of open sockets.
# We use this when the server is run from a Gunicorn worker.
def _share_socket(
sock: socket.SocketType,
) -> socket.SocketType: # pragma py-not-win32
# Windows requires the socket be explicitly shared across
# multiple workers (processes).
from socket import fromshare # type: ignore[attr-defined]
sock_data = sock.share(os.getpid()) # type: ignore[attr-defined]
return fromshare(sock_data)
self.servers: list[asyncio.base_events.Server] = []
for sock in sockets:
is_windows = platform.system() == "Windows"
if config.workers > 1 and is_windows: # pragma: py-not-win32
sock = _share_socket(sock) # type: ignore[assignment]
server = await loop.create_server(create_protocol, sock=sock, ssl=config.ssl, backlog=config.backlog)
self.servers.append(server)
listeners = sockets
elif config.fd is not None: # pragma: py-win32
# Use an existing socket, from a file descriptor.
sock = socket.fromfd(config.fd, socket.AF_UNIX, socket.SOCK_STREAM)
server = await loop.create_server(create_protocol, sock=sock, ssl=config.ssl, backlog=config.backlog)
assert server.sockets is not None # mypy
listeners = server.sockets
self.servers = [server]
elif config.uds is not None: # pragma: py-win32
# Create a socket using UNIX domain socket.
uds_perms = 0o666
if os.path.exists(config.uds):
uds_perms = os.stat(config.uds).st_mode # pragma: full coverage
server = await loop.create_unix_server(
create_protocol, path=config.uds, ssl=config.ssl, backlog=config.backlog
)
os.chmod(config.uds, uds_perms)
assert server.sockets is not None # mypy
listeners = server.sockets
self.servers = [server]
else:
# Standard case. Create a socket from a host/port pair.
try:
server = await loop.create_server(
create_protocol,
host=config.host,
port=config.port,
ssl=config.ssl,
backlog=config.backlog,
)
except OSError as exc:
logger.error(exc)
await self.lifespan.shutdown()
sys.exit(1)
assert server.sockets is not None
listeners = server.sockets
self.servers = [server]
if sockets is None:
self._log_started_message(listeners)
else:
# We're most likely running multiple workers, so a message has already been
# logged by `config.bind_socket()`.
pass # pragma: full coverage
self.started = True
def _log_started_message(self, listeners: Sequence[socket.SocketType]) -> None:
config = self.config
if config.fd is not None: # pragma: py-win32
sock = listeners[0]
logger.info(
"Uvicorn running on socket %s (Press CTRL+C to quit)",
sock.getsockname(),
)
elif config.uds is not None: # pragma: py-win32
logger.info("Uvicorn running on unix socket %s (Press CTRL+C to quit)", config.uds)
else:
addr_format = "%s://%s:%d"
host = "0.0.0.0" if config.host is None else config.host
if ":" in host:
# It's an IPv6 address.
addr_format = "%s://[%s]:%d"
port = config.port
if port == 0:
port = listeners[0].getsockname()[1]
protocol_name = "https" if config.ssl else "http"
message = f"Uvicorn running on {addr_format} (Press CTRL+C to quit)"
color_message = "Uvicorn running on " + click.style(addr_format, bold=True) + " (Press CTRL+C to quit)"
logger.info(
message,
protocol_name,
host,
port,
extra={"color_message": color_message},
)
async def main_loop(self) -> None:
counter = 0
should_exit = await self.on_tick(counter)
while not should_exit:
counter += 1
counter = counter % 864000
await asyncio.sleep(0.1)
should_exit = await self.on_tick(counter)
async def on_tick(self, counter: int) -> bool:
# Update the default headers, once per second.
if counter % 10 == 0:
current_time = time.time()
current_date = formatdate(current_time, usegmt=True).encode()
if self.config.date_header:
date_header = [(b"date", current_date)]
else:
date_header = []
self.server_state.default_headers = date_header + self.config.encoded_headers
# Callback to `callback_notify` once every `timeout_notify` seconds.
if self.config.callback_notify is not None:
if current_time - self.last_notified > self.config.timeout_notify: # pragma: full coverage
self.last_notified = current_time
await self.config.callback_notify()
# Determine if we should exit.
if self.should_exit:
return True
max_requests = self.config.limit_max_requests
if max_requests is not None and self.server_state.total_requests >= max_requests:
logger.warning(f"Maximum request limit of {max_requests} exceeded. Terminating process.")
return True
return False
async def shutdown(self, sockets: list[socket.socket] | None = None) -> None:
logger.info("Shutting down")
# Stop accepting new connections.
for server in self.servers:
server.close()
for sock in sockets or []:
sock.close() # pragma: full coverage
# Request shutdown on all existing connections.
for connection in list(self.server_state.connections):
connection.shutdown()
await asyncio.sleep(0.1)
# When 3.10 is not supported anymore, use `async with asyncio.timeout(...):`.
try:
await asyncio.wait_for(
self._wait_tasks_to_complete(),
timeout=self.config.timeout_graceful_shutdown,
)
except asyncio.TimeoutError:
logger.error(
"Cancel %s running task(s), timeout graceful shutdown exceeded",
len(self.server_state.tasks),
)
for t in self.server_state.tasks:
t.cancel(msg="Task cancelled, timeout graceful shutdown exceeded")
# Send the lifespan shutdown event, and wait for application shutdown.
if not self.force_exit:
await self.lifespan.shutdown()
async def _wait_tasks_to_complete(self) -> None:
# Wait for existing connections to finish sending responses.
if self.server_state.connections and not self.force_exit:
msg = "Waiting for connections to close. (CTRL+C to force quit)"
logger.info(msg)
while self.server_state.connections and not self.force_exit:
await asyncio.sleep(0.1)
# Wait for existing tasks to complete.
if self.server_state.tasks and not self.force_exit:
msg = "Waiting for background tasks to complete. (CTRL+C to force quit)"
logger.info(msg)
while self.server_state.tasks and not self.force_exit:
await asyncio.sleep(0.1)
for server in self.servers:
await server.wait_closed()
@contextlib.contextmanager
def capture_signals(self) -> Generator[None, None, None]:
# Signals can only be listened to from the main thread.
if threading.current_thread() is not threading.main_thread():
yield
return
# always use signal.signal, even if loop.add_signal_handler is available
# this allows to restore previous signal handlers later on
original_handlers = {sig: signal.signal(sig, self.handle_exit) for sig in HANDLED_SIGNALS}
try:
yield
finally:
for sig, handler in original_handlers.items():
signal.signal(sig, handler)
# If we did gracefully shut down due to a signal, try to
# trigger the expected behaviour now; multiple signals would be
# done LIFO, see https://stackoverflow.com/questions/48434964
for captured_signal in reversed(self._captured_signals):
signal.raise_signal(captured_signal)
def handle_exit(self, sig: int, frame: FrameType | None) -> None:
self._captured_signals.append(sig)
if self.should_exit and sig == signal.SIGINT:
self.force_exit = True # pragma: full coverage
else:
self.should_exit = True
|