|
from __future__ import annotations |
|
|
|
import dataclasses |
|
import urllib.parse |
|
import urllib.request |
|
|
|
from .exceptions import InvalidProxy, InvalidURI |
|
|
|
|
|
__all__ = ["parse_uri", "WebSocketURI"] |
|
|
|
|
|
|
|
DELIMS = ":/?#[]@!$&'()*+,;=" |
|
|
|
|
|
@dataclasses.dataclass |
|
class WebSocketURI: |
|
""" |
|
WebSocket URI. |
|
|
|
Attributes: |
|
secure: :obj:`True` for a ``wss`` URI, :obj:`False` for a ``ws`` URI. |
|
host: Normalized to lower case. |
|
port: Always set even if it's the default. |
|
path: May be empty. |
|
query: May be empty if the URI doesn't include a query component. |
|
username: Available when the URI contains `User Information`_. |
|
password: Available when the URI contains `User Information`_. |
|
|
|
.. _User Information: https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.1 |
|
|
|
""" |
|
|
|
secure: bool |
|
host: str |
|
port: int |
|
path: str |
|
query: str |
|
username: str | None = None |
|
password: str | None = None |
|
|
|
@property |
|
def resource_name(self) -> str: |
|
if self.path: |
|
resource_name = self.path |
|
else: |
|
resource_name = "/" |
|
if self.query: |
|
resource_name += "?" + self.query |
|
return resource_name |
|
|
|
@property |
|
def user_info(self) -> tuple[str, str] | None: |
|
if self.username is None: |
|
return None |
|
assert self.password is not None |
|
return (self.username, self.password) |
|
|
|
|
|
def parse_uri(uri: str) -> WebSocketURI: |
|
""" |
|
Parse and validate a WebSocket URI. |
|
|
|
Args: |
|
uri: WebSocket URI. |
|
|
|
Returns: |
|
Parsed WebSocket URI. |
|
|
|
Raises: |
|
InvalidURI: If ``uri`` isn't a valid WebSocket URI. |
|
|
|
""" |
|
parsed = urllib.parse.urlparse(uri) |
|
if parsed.scheme not in ["ws", "wss"]: |
|
raise InvalidURI(uri, "scheme isn't ws or wss") |
|
if parsed.hostname is None: |
|
raise InvalidURI(uri, "hostname isn't provided") |
|
if parsed.fragment != "": |
|
raise InvalidURI(uri, "fragment identifier is meaningless") |
|
|
|
secure = parsed.scheme == "wss" |
|
host = parsed.hostname |
|
port = parsed.port or (443 if secure else 80) |
|
path = parsed.path |
|
query = parsed.query |
|
username = parsed.username |
|
password = parsed.password |
|
|
|
|
|
if username is not None and password is None: |
|
raise InvalidURI(uri, "username provided without password") |
|
|
|
try: |
|
uri.encode("ascii") |
|
except UnicodeEncodeError: |
|
|
|
|
|
host = host.encode("idna").decode() |
|
path = urllib.parse.quote(path, safe=DELIMS) |
|
query = urllib.parse.quote(query, safe=DELIMS) |
|
if username is not None: |
|
assert password is not None |
|
username = urllib.parse.quote(username, safe=DELIMS) |
|
password = urllib.parse.quote(password, safe=DELIMS) |
|
|
|
return WebSocketURI(secure, host, port, path, query, username, password) |
|
|
|
|
|
@dataclasses.dataclass |
|
class Proxy: |
|
""" |
|
Proxy. |
|
|
|
Attributes: |
|
scheme: ``"socks5h"``, ``"socks5"``, ``"socks4a"``, ``"socks4"``, |
|
``"https"``, or ``"http"``. |
|
host: Normalized to lower case. |
|
port: Always set even if it's the default. |
|
username: Available when the proxy address contains `User Information`_. |
|
password: Available when the proxy address contains `User Information`_. |
|
|
|
.. _User Information: https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.1 |
|
|
|
""" |
|
|
|
scheme: str |
|
host: str |
|
port: int |
|
username: str | None = None |
|
password: str | None = None |
|
|
|
@property |
|
def user_info(self) -> tuple[str, str] | None: |
|
if self.username is None: |
|
return None |
|
assert self.password is not None |
|
return (self.username, self.password) |
|
|
|
|
|
def parse_proxy(proxy: str) -> Proxy: |
|
""" |
|
Parse and validate a proxy. |
|
|
|
Args: |
|
proxy: proxy. |
|
|
|
Returns: |
|
Parsed proxy. |
|
|
|
Raises: |
|
InvalidProxy: If ``proxy`` isn't a valid proxy. |
|
|
|
""" |
|
parsed = urllib.parse.urlparse(proxy) |
|
if parsed.scheme not in ["socks5h", "socks5", "socks4a", "socks4", "https", "http"]: |
|
raise InvalidProxy(proxy, f"scheme {parsed.scheme} isn't supported") |
|
if parsed.hostname is None: |
|
raise InvalidProxy(proxy, "hostname isn't provided") |
|
if parsed.path not in ["", "/"]: |
|
raise InvalidProxy(proxy, "path is meaningless") |
|
if parsed.query != "": |
|
raise InvalidProxy(proxy, "query is meaningless") |
|
if parsed.fragment != "": |
|
raise InvalidProxy(proxy, "fragment is meaningless") |
|
|
|
scheme = parsed.scheme |
|
host = parsed.hostname |
|
port = parsed.port or (443 if parsed.scheme == "https" else 80) |
|
username = parsed.username |
|
password = parsed.password |
|
|
|
|
|
if username is not None and password is None: |
|
raise InvalidProxy(proxy, "username provided without password") |
|
|
|
try: |
|
proxy.encode("ascii") |
|
except UnicodeEncodeError: |
|
|
|
|
|
host = host.encode("idna").decode() |
|
if username is not None: |
|
assert password is not None |
|
username = urllib.parse.quote(username, safe=DELIMS) |
|
password = urllib.parse.quote(password, safe=DELIMS) |
|
|
|
return Proxy(scheme, host, port, username, password) |
|
|
|
|
|
def get_proxy(uri: WebSocketURI) -> str | None: |
|
""" |
|
Return the proxy to use for connecting to the given WebSocket URI, if any. |
|
|
|
""" |
|
if urllib.request.proxy_bypass(f"{uri.host}:{uri.port}"): |
|
return None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
proxies = urllib.request.getproxies() |
|
if uri.secure: |
|
schemes = ["wss", "socks", "https"] |
|
else: |
|
schemes = ["ws", "socks", "https", "http"] |
|
|
|
for scheme in schemes: |
|
proxy = proxies.get(scheme) |
|
if proxy is not None: |
|
if scheme == "socks" and proxy.startswith("http://"): |
|
proxy = "socks5h://" + proxy[7:] |
|
return proxy |
|
else: |
|
return None |
|
|