|
import socketio |
|
import redis |
|
from redis import asyncio as aioredis |
|
from urllib.parse import urlparse |
|
|
|
|
|
def parse_redis_sentinel_url(redis_url): |
|
parsed_url = urlparse(redis_url) |
|
if parsed_url.scheme != "redis": |
|
raise ValueError("Invalid Redis URL scheme. Must be 'redis'.") |
|
|
|
return { |
|
"username": parsed_url.username or None, |
|
"password": parsed_url.password or None, |
|
"service": parsed_url.hostname or "mymaster", |
|
"port": parsed_url.port or 6379, |
|
"db": int(parsed_url.path.lstrip("/") or 0), |
|
} |
|
|
|
|
|
def get_redis_connection(redis_url, redis_sentinels, decode_responses=True): |
|
if redis_sentinels: |
|
redis_config = parse_redis_sentinel_url(redis_url) |
|
sentinel = redis.sentinel.Sentinel( |
|
redis_sentinels, |
|
port=redis_config["port"], |
|
db=redis_config["db"], |
|
username=redis_config["username"], |
|
password=redis_config["password"], |
|
decode_responses=decode_responses, |
|
) |
|
|
|
|
|
return sentinel.master_for(redis_config["service"]) |
|
else: |
|
|
|
return redis.Redis.from_url(redis_url, decode_responses=decode_responses) |
|
|
|
|
|
def get_sentinels_from_env(sentinel_hosts_env, sentinel_port_env): |
|
if sentinel_hosts_env: |
|
sentinel_hosts = sentinel_hosts_env.split(",") |
|
sentinel_port = int(sentinel_port_env) |
|
return [(host, sentinel_port) for host in sentinel_hosts] |
|
return [] |
|
|
|
|
|
class AsyncRedisSentinelManager(socketio.AsyncRedisManager): |
|
def __init__( |
|
self, |
|
sentinel_hosts, |
|
sentinel_port=26379, |
|
redis_port=6379, |
|
service="mymaster", |
|
db=0, |
|
username=None, |
|
password=None, |
|
channel="socketio", |
|
write_only=False, |
|
logger=None, |
|
redis_options=None, |
|
): |
|
""" |
|
Initialize the Redis Sentinel Manager. |
|
This implementation mostly replicates the __init__ of AsyncRedisManager and |
|
overrides _redis_connect() with a version that uses Redis Sentinel |
|
|
|
:param sentinel_hosts: List of Sentinel hosts |
|
:param sentinel_port: Sentinel Port |
|
:param redis_port: Redis Port (currently unsupported by aioredis!) |
|
:param service: Master service name in Sentinel |
|
:param db: Redis database to use |
|
:param username: Redis username (if any) (currently unsupported by aioredis!) |
|
:param password: Redis password (if any) |
|
:param channel: The channel name on which the server sends and receives |
|
notifications. Must be the same in all the servers. |
|
:param write_only: If set to ``True``, only initialize to emit events. The |
|
default of ``False`` initializes the class for emitting |
|
and receiving. |
|
:param redis_options: additional keyword arguments to be passed to |
|
``aioredis.from_url()``. |
|
""" |
|
self._sentinels = [(host, sentinel_port) for host in sentinel_hosts] |
|
self._redis_port = redis_port |
|
self._service = service |
|
self._db = db |
|
self._username = username |
|
self._password = password |
|
self._channel = channel |
|
self.redis_options = redis_options or {} |
|
|
|
|
|
self._redis_connect() |
|
super(socketio.AsyncRedisManager, self).__init__( |
|
channel=channel, write_only=write_only, logger=logger |
|
) |
|
|
|
def _redis_connect(self): |
|
"""Establish connections to Redis through Sentinel.""" |
|
sentinel = aioredis.sentinel.Sentinel( |
|
self._sentinels, |
|
port=self._redis_port, |
|
db=self._db, |
|
password=self._password, |
|
**self.redis_options, |
|
) |
|
|
|
self.redis = sentinel.master_for(self._service) |
|
self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True) |
|
|