jamtur01's picture
Upload folder using huggingface_hub
9c6594c verified
from __future__ import annotations
import dataclasses
import urllib.parse
import urllib.request
from .exceptions import InvalidProxy, InvalidURI
__all__ = ["parse_uri", "WebSocketURI"]
# Reserved delimiter characters (the gen-delims and sub-delims sets referenced
# by RFC 3987). They are passed as the ``safe`` set to urllib.parse.quote()
# when an IRI is converted to a URI, so that they are left unescaped.
DELIMS = (
    ":/?#[]@"  # gen-delims
    "!$&'()*+,;="  # sub-delims
)
@dataclasses.dataclass
class WebSocketURI:
    """
    Components of a parsed WebSocket URI.

    Attributes:
        secure: :obj:`True` when the scheme is ``wss``, :obj:`False` when it
            is ``ws``.
        host: Host name, normalized to lower case.
        port: Port number; always populated, even when it's the default.
        path: Path component; may be an empty string.
        query: Query component; empty when the URI doesn't include one.
        username: Set when the URI contains `User Information`_.
        password: Set when the URI contains `User Information`_.

    .. _User Information: https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.1

    """

    secure: bool
    host: str
    port: int
    path: str
    query: str
    username: str | None = None
    password: str | None = None

    @property
    def resource_name(self) -> str:
        """Path (``/`` when empty) followed by ``?query`` when a query is set."""
        target = self.path or "/"
        if not self.query:
            return target
        return target + "?" + self.query

    @property
    def user_info(self) -> tuple[str, str] | None:
        """``(username, password)`` pair, or :obj:`None` without a username."""
        username, password = self.username, self.password
        if username is None:
            return None
        # A username is expected to always come with a password.
        assert password is not None
        return (username, password)
def parse_uri(uri: str) -> WebSocketURI:
    """
    Parse and validate a WebSocket URI.

    Every validation failure is reported as :exc:`InvalidURI`, including the
    :exc:`ValueError` that :mod:`urllib.parse` raises for a malformed
    authority or port and the :exc:`UnicodeError` raised when an IRI cannot
    be converted to a URI.

    Args:
        uri: WebSocket URI.

    Returns:
        Parsed WebSocket URI.

    Raises:
        InvalidURI: If ``uri`` isn't a valid WebSocket URI.

    """
    try:
        parsed = urllib.parse.urlparse(uri)
    except ValueError as exc:
        # urlparse rejects malformed authorities, e.g. unbalanced brackets.
        raise InvalidURI(uri, "URI isn't well-formed") from exc
    if parsed.scheme not in ["ws", "wss"]:
        raise InvalidURI(uri, "scheme isn't ws or wss")
    if parsed.hostname is None:
        raise InvalidURI(uri, "hostname isn't provided")
    if parsed.fragment != "":
        raise InvalidURI(uri, "fragment identifier is meaningless")

    secure = parsed.scheme == "wss"
    host = parsed.hostname
    try:
        # Reading the port attribute raises ValueError when the port isn't
        # numeric or is outside the 0-65535 range.
        explicit_port = parsed.port
    except ValueError as exc:
        raise InvalidURI(uri, "port isn't valid") from exc
    port = explicit_port or (443 if secure else 80)
    path = parsed.path
    query = parsed.query
    username = parsed.username
    password = parsed.password
    # urllib.parse.urlparse accepts URLs with a username but without a
    # password. This doesn't make sense for HTTP Basic Auth credentials.
    if username is not None and password is None:
        raise InvalidURI(uri, "username provided without password")

    if not uri.isascii():
        # Input contains non-ASCII characters.
        # It must be an IRI. Convert it to a URI.
        try:
            host = host.encode("idna").decode()
            path = urllib.parse.quote(path, safe=DELIMS)
            query = urllib.parse.quote(query, safe=DELIMS)
            if username is not None:
                # Guaranteed by the check above; narrows the type.
                assert password is not None
                username = urllib.parse.quote(username, safe=DELIMS)
                password = urllib.parse.quote(password, safe=DELIMS)
        except UnicodeError as exc:
            # Raised by the idna codec (e.g. empty or overlong label) or by
            # quote() when a character cannot be encoded as UTF-8.
            raise InvalidURI(uri, "IRI can't be converted to a URI") from exc

    return WebSocketURI(secure, host, port, path, query, username, password)
@dataclasses.dataclass
class Proxy:
    """
    Components of a parsed proxy address.

    Attributes:
        scheme: One of ``"socks5h"``, ``"socks5"``, ``"socks4a"``,
            ``"socks4"``, ``"https"``, or ``"http"``.
        host: Host name, normalized to lower case.
        port: Port number; always populated, even when it's the default.
        username: Set when the proxy address contains `User Information`_.
        password: Set when the proxy address contains `User Information`_.

    .. _User Information: https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.1

    """

    scheme: str
    host: str
    port: int
    username: str | None = None
    password: str | None = None

    @property
    def user_info(self) -> tuple[str, str] | None:
        """``(username, password)`` pair, or :obj:`None` without a username."""
        username, password = self.username, self.password
        if username is None:
            return None
        # A username is expected to always come with a password.
        assert password is not None
        return (username, password)
def parse_proxy(proxy: str) -> Proxy:
    """
    Parse and validate a proxy.

    Every validation failure is reported as :exc:`InvalidProxy`, including
    the :exc:`ValueError` that :mod:`urllib.parse` raises for a malformed
    authority or port and the :exc:`UnicodeError` raised when a non-ASCII
    address cannot be converted to ASCII.

    Args:
        proxy: proxy.

    Returns:
        Parsed proxy.

    Raises:
        InvalidProxy: If ``proxy`` isn't a valid proxy.

    """
    try:
        parsed = urllib.parse.urlparse(proxy)
    except ValueError as exc:
        # urlparse rejects malformed authorities, e.g. unbalanced brackets.
        raise InvalidProxy(proxy, "proxy isn't well-formed") from exc
    if parsed.scheme not in ["socks5h", "socks5", "socks4a", "socks4", "https", "http"]:
        raise InvalidProxy(proxy, f"scheme {parsed.scheme} isn't supported")
    if parsed.hostname is None:
        raise InvalidProxy(proxy, "hostname isn't provided")
    if parsed.path not in ["", "/"]:
        raise InvalidProxy(proxy, "path is meaningless")
    if parsed.query != "":
        raise InvalidProxy(proxy, "query is meaningless")
    if parsed.fragment != "":
        raise InvalidProxy(proxy, "fragment is meaningless")

    scheme = parsed.scheme
    host = parsed.hostname
    try:
        # Reading the port attribute raises ValueError when the port isn't
        # numeric or is outside the 0-65535 range.
        explicit_port = parsed.port
    except ValueError as exc:
        raise InvalidProxy(proxy, "port isn't valid") from exc
    # Default to 443 for https and to 80 for every other supported scheme.
    port = explicit_port or (443 if parsed.scheme == "https" else 80)
    username = parsed.username
    password = parsed.password
    # urllib.parse.urlparse accepts URLs with a username but without a
    # password. This doesn't make sense for HTTP Basic Auth credentials.
    if username is not None and password is None:
        raise InvalidProxy(proxy, "username provided without password")

    if not proxy.isascii():
        # Input contains non-ASCII characters.
        # It must be an IRI. Convert it to a URI.
        try:
            host = host.encode("idna").decode()
            if username is not None:
                # Guaranteed by the check above; narrows the type.
                assert password is not None
                username = urllib.parse.quote(username, safe=DELIMS)
                password = urllib.parse.quote(password, safe=DELIMS)
        except UnicodeError as exc:
            # Raised by the idna codec (e.g. empty or overlong label) or by
            # quote() when a character cannot be encoded as UTF-8.
            raise InvalidProxy(proxy, "IRI can't be converted to a URI") from exc

    return Proxy(scheme, host, port, username, password)
def get_proxy(uri: WebSocketURI) -> str | None:
    """
    Return the proxy to use for connecting to the given WebSocket URI, if any.

    Args:
        uri: Parsed WebSocket URI; its ``host``, ``port`` and ``secure``
            attributes are read.

    Returns:
        The proxy address, or :obj:`None` when the destination bypasses
        proxies or when no suitable proxy is configured.

    """
    destination = f"{uri.host}:{uri.port}"
    if urllib.request.proxy_bypass(destination):
        return None

    # getproxies() may return SOCKS proxies as {"socks": "http://host:port"} or
    # as {"https": "socks5h://host:port"} depending on whether they're declared
    # in the operating system or in environment variables.
    configured = urllib.request.getproxies()

    # According to the _Proxy Usage_ section of RFC 6455, use a SOCKS5 proxy if
    # available, else favor the proxy for HTTPS connections over the proxy for
    # HTTP connections.
    # The priority of a proxy for WebSocket connections is unspecified. We give
    # it the highest priority. This makes it easy to configure a specific proxy
    # for websockets.
    candidates = (
        ["wss", "socks", "https"]
        if uri.secure
        else ["ws", "socks", "https", "http"]
    )

    for key in candidates:
        address = configured.get(key)
        if address is None:
            continue
        if key == "socks" and address.startswith("http://"):
            # Rewrite the scheme of a SOCKS proxy declared with an http:// URL.
            address = "socks5h://" + address[len("http://"):]
        return address
    return None