# What is this?
## Tests slack alerting on proxy logging object

import asyncio
import io
import json
import os
import random
import sys
import time
import uuid
from datetime import datetime, timedelta
from typing import Optional

import httpx

from litellm.types.integrations.slack_alerting import AlertType

# import logging
# logging.basicConfig(level=logging.DEBUG)
sys.path.insert(0, os.path.abspath("../.."))

import asyncio
import os
import unittest.mock
from unittest.mock import AsyncMock, MagicMock, patch

import pytest
from openai import APIError

import litellm
from litellm.caching.caching import DualCache, RedisCache
from litellm.integrations.SlackAlerting.slack_alerting import (
    DeploymentMetrics,
    SlackAlerting,
)
from litellm.proxy._types import CallInfo, Litellm_EntityType, WebhookEvent
from litellm.proxy.utils import ProxyLogging
from litellm.router import AlertingConfig, Router
from litellm.utils import get_api_base
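

# The unit test below takes (model, optional_params, expected_api_base). Its
# parametrize matrix is not included in this extract; the single case below is
# an illustrative assumption, not from the original suite.
@pytest.mark.parametrize(
    "model, optional_params, expected_api_base",
    [
        (
            "openai/my-fake-model",  # hypothetical model name
            {"api_base": "https://my-fake-api-base.example.com"},
            "https://my-fake-api-base.example.com",
        ),
    ],
)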
def test_get_api_base_unit_test(model, optional_params, expected_api_base):
    api_base = get_api_base(model=model, optional_params=optional_params)

    assert api_base == expected_api_base

@pytest.mark.asyncio
async def test_get_api_base():
    _pl = ProxyLogging(user_api_key_cache=DualCache())
    _pl.update_values(alerting=["slack"], alerting_threshold=100, redis_cache=None)
    model = "chatgpt-v-3"
    messages = [{"role": "user", "content": "Hey how's it going?"}]
    litellm_params = {
        "acompletion": True,
        "api_key": None,
        "api_base": "https://openai-gpt-4-test-v-1.openai.azure.com/",
        "force_timeout": 600,
        "logger_fn": None,
        "verbose": False,
        "custom_llm_provider": "azure",
        "litellm_call_id": "68f46d2d-714d-4ad8-8137-69600ec8755c",
        "model_alias_map": {},
        "completion_call_id": None,
        "metadata": None,
        "model_info": None,
        "proxy_server_request": None,
        "preset_cache_key": None,
        "no-log": False,
        "stream_response": {},
    }
    start_time = datetime.now()
    end_time = datetime.now()

    time_difference_float, model, api_base, messages = (
        _pl.slack_alerting_instance._response_taking_too_long_callback_helper(
            kwargs={
                "model": model,
                "messages": messages,
                "litellm_params": litellm_params,
            },
            start_time=start_time,
            end_time=end_time,
        )
    )

    assert api_base is not None
    assert isinstance(api_base, str)
    assert len(api_base) > 0
    request_info = (
        f"\nRequest Model: `{model}`\nAPI Base: `{api_base}`\nMessages: `{messages}`"
    )
    slow_message = f"`Responses are slow - {round(time_difference_float, 2)}s response time > Alerting threshold: {100}s`"
    await _pl.alerting_handler(
        message=slow_message + request_info,
        level="Low",
        alert_type=AlertType.llm_too_slow,
    )
    print("passed test_get_api_base")

# Create a mock environment for testing
@pytest.fixture
def mock_env(monkeypatch):
    monkeypatch.setenv("SLACK_WEBHOOK_URL", "https://example.com/webhook")
    monkeypatch.setenv("LANGFUSE_HOST", "https://cloud.langfuse.com")
    monkeypatch.setenv("LANGFUSE_PROJECT_ID", "test-project-id")


# Test the __init__ method
def test_init():
    slack_alerting = SlackAlerting(
        alerting_threshold=32,
        alerting=["slack"],
        alert_types=[AlertType.llm_exceptions],
        internal_usage_cache=DualCache(),
    )
    assert slack_alerting.alerting_threshold == 32
    assert slack_alerting.alerting == ["slack"]
    assert slack_alerting.alert_types == ["llm_exceptions"]

    slack_no_alerting = SlackAlerting()
    assert slack_no_alerting.alerting == []

    print("passed testing slack alerting init")

from datetime import datetime, timedelta
from unittest.mock import AsyncMock, patch


@pytest.fixture
def slack_alerting():
    return SlackAlerting(
        alerting_threshold=1, internal_usage_cache=DualCache(), alerting=["slack"]
    )

# Test for slow LLM responses
@pytest.mark.asyncio
async def test_response_taking_too_long_callback(slack_alerting):
    start_time = datetime.now()
    end_time = start_time + timedelta(seconds=301)
    kwargs = {"model": "test_model", "messages": "test_messages", "litellm_params": {}}
    with patch.object(slack_alerting, "send_alert", new=AsyncMock()) as mock_send_alert:
        await slack_alerting.response_taking_too_long_callback(
            kwargs, None, start_time, end_time
        )
        mock_send_alert.assert_awaited_once()

@pytest.mark.asyncio
async def test_alerting_metadata(slack_alerting):
    """
    Test that alerting_metadata is propagated correctly for responses taking too long.
    """
    start_time = datetime.now()
    end_time = start_time + timedelta(seconds=301)
    kwargs = {
        "model": "test_model",
        "messages": "test_messages",
        "litellm_params": {"metadata": {"alerting_metadata": {"hello": "world"}}},
    }
    with patch.object(slack_alerting, "send_alert", new=AsyncMock()) as mock_send_alert:
        ## RESPONSE TAKING TOO LONG
        await slack_alerting.response_taking_too_long_callback(
            kwargs, None, start_time, end_time
        )
        mock_send_alert.assert_awaited_once()
        assert "hello" in mock_send_alert.call_args[1]["alerting_metadata"]

# Test for budget crossed
@pytest.mark.asyncio
async def test_budget_alerts_crossed(slack_alerting):
    user_max_budget = 100
    user_current_spend = 101
    with patch.object(slack_alerting, "send_alert", new=AsyncMock()) as mock_send_alert:
        await slack_alerting.budget_alerts(
            "user_budget",
            user_info=CallInfo(
                token="",
                spend=user_current_spend,
                max_budget=user_max_budget,
                event_group=Litellm_EntityType.USER,
            ),
        )
        mock_send_alert.assert_awaited_once()


# Test for budget crossed again (should not fire the alert a 2nd time)
@pytest.mark.asyncio
async def test_budget_alerts_crossed_again(slack_alerting):
    user_max_budget = 100
    user_current_spend = 101
    with patch.object(slack_alerting, "send_alert", new=AsyncMock()) as mock_send_alert:
        await slack_alerting.budget_alerts(
            "user_budget",
            user_info=CallInfo(
                token="",
                spend=user_current_spend,
                max_budget=user_max_budget,
                event_group=Litellm_EntityType.USER,
            ),
        )
        mock_send_alert.assert_awaited_once()
        mock_send_alert.reset_mock()
        await slack_alerting.budget_alerts(
            "user_budget",
            user_info=CallInfo(
                token="",
                spend=user_current_spend,
                max_budget=user_max_budget,
                event_group=Litellm_EntityType.USER,
            ),
        )
        mock_send_alert.assert_not_awaited()

# Test for send_alert - should be called once
@pytest.mark.asyncio
async def test_send_alert(slack_alerting):
    import logging

    from litellm._logging import verbose_logger

    asyncio.create_task(slack_alerting.periodic_flush())
    verbose_logger.setLevel(level=logging.DEBUG)
    with patch.object(
        slack_alerting.async_http_handler, "post", new=AsyncMock()
    ) as mock_post:
        mock_post.return_value.status_code = 200
        await slack_alerting.send_alert(
            "Test message", "Low", "budget_alerts", alerting_metadata={}
        )
        await asyncio.sleep(6)
        mock_post.assert_awaited_once()

@pytest.mark.asyncio
async def test_daily_reports_unit_test(slack_alerting):
    with patch.object(slack_alerting, "send_alert", new=AsyncMock()) as mock_send_alert:
        router = litellm.Router(
            model_list=[
                {
                    "model_name": "test-gpt",
                    "litellm_params": {"model": "gpt-3.5-turbo"},
                    "model_info": {"id": "1234"},
                }
            ]
        )
        deployment_metrics = DeploymentMetrics(
            id="1234",
            failed_request=False,
            latency_per_output_token=20.3,
            updated_at=litellm.utils.get_utc_datetime(),
        )

        updated_val = await slack_alerting.async_update_daily_reports(
            deployment_metrics=deployment_metrics
        )
        assert updated_val == 1

        await slack_alerting.send_daily_reports(router=router)
        mock_send_alert.assert_awaited_once()


@pytest.mark.asyncio
async def test_daily_reports_completion(slack_alerting):
    with patch.object(slack_alerting, "send_alert", new=AsyncMock()) as mock_send_alert:
        litellm.callbacks = [slack_alerting]

        # on async success
        router = litellm.Router(
            model_list=[
                {
                    "model_name": "gpt-5",
                    "litellm_params": {
                        "model": "gpt-3.5-turbo",
                    },
                }
            ]
        )

        await router.acompletion(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": "Hey, how's it going?"}],
        )

        await asyncio.sleep(3)
        response_val = await slack_alerting.send_daily_reports(router=router)

        assert response_val is True

        mock_send_alert.assert_awaited_once()

        # on async failure
        router = litellm.Router(
            model_list=[
                {
                    "model_name": "gpt-5",
                    "litellm_params": {"model": "gpt-3.5-turbo", "api_key": "bad_key"},
                }
            ]
        )

        try:
            await router.acompletion(
                model="gpt-3.5-turbo",
                messages=[{"role": "user", "content": "Hey, how's it going?"}],
            )
        except Exception:
            pass

        await asyncio.sleep(3)
        response_val = await slack_alerting.send_daily_reports(router=router)

        assert response_val is True

        mock_send_alert.assert_awaited()

@pytest.mark.asyncio
async def test_daily_reports_redis_cache_scheduler():
    redis_cache = RedisCache()
    slack_alerting = SlackAlerting(
        internal_usage_cache=DualCache(redis_cache=redis_cache)
    )

    # we need this to be 0 so it actually sends the report
    slack_alerting.alerting_args.daily_report_frequency = 0

    from litellm.router import AlertingConfig

    router = litellm.Router(
        model_list=[
            {
                "model_name": "gpt-5",
                "litellm_params": {
                    "model": "gpt-3.5-turbo",
                },
            }
        ]
    )

    with patch.object(
        slack_alerting, "send_alert", new=AsyncMock()
    ) as mock_send_alert, patch.object(
        redis_cache, "async_set_cache", new=AsyncMock()
    ) as mock_redis_set_cache:
        # initial call - expect empty
        await slack_alerting._run_scheduler_helper(llm_router=router)

        try:
            json.dumps(mock_redis_set_cache.call_args[0][1])
        except Exception:
            pytest.fail(
                "Cache value can't be json dumped - {}".format(
                    mock_redis_set_cache.call_args[0][1]
                )
            )

        mock_redis_set_cache.assert_awaited_once()

        # second call - expect empty
        await slack_alerting._run_scheduler_helper(llm_router=router)

@pytest.mark.asyncio
async def test_send_llm_exception_to_slack():
    from litellm.router import AlertingConfig

    # on async success
    router = litellm.Router(
        model_list=[
            {
                "model_name": "gpt-3.5-turbo",
                "litellm_params": {
                    "model": "gpt-3.5-turbo",
                    "api_key": "bad_key",
                },
            },
            {
                "model_name": "gpt-5-good",
                "litellm_params": {
                    "model": "gpt-3.5-turbo",
                },
            },
        ],
        alerting_config=AlertingConfig(
            alerting_threshold=0.5, webhook_url=os.getenv("SLACK_WEBHOOK_URL")
        ),
    )
    try:
        await router.acompletion(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": "Hey, how's it going?"}],
        )
    except Exception:
        pass

    await router.acompletion(
        model="gpt-5-good",
        messages=[{"role": "user", "content": "Hey, how's it going?"}],
    )

    await asyncio.sleep(3)

# test that models with 0 metrics are ignored
@pytest.mark.asyncio
async def test_send_daily_reports_ignores_zero_values():
    router = MagicMock()
    router.get_model_ids.return_value = ["model1", "model2", "model3"]

    slack_alerting = SlackAlerting(internal_usage_cache=MagicMock())
    # model1: failed=None, model2: failed=0, model3: failed=10
    # model1: latency=0, model2: latency=0, model3: latency=None
    slack_alerting.internal_usage_cache.async_batch_get_cache = AsyncMock(
        return_value=[None, 0, 10, 0, 0, None]
    )
    slack_alerting.internal_usage_cache.async_set_cache_pipeline = AsyncMock()

    router.get_model_info.side_effect = lambda x: {"litellm_params": {"model": x}}

    with patch.object(slack_alerting, "send_alert", new=AsyncMock()) as mock_send_alert:
        result = await slack_alerting.send_daily_reports(router)

        # Check that the send_alert method was called
        mock_send_alert.assert_called_once()
        message = mock_send_alert.call_args[1]["message"]

        # Ensure the message includes only the non-zero, non-None metrics
        assert "model3" in message
        assert "model2" not in message
        assert "model1" not in message

        assert result == True


# test that no alert is sent if all metrics are None or 0
@pytest.mark.asyncio
async def test_send_daily_reports_all_zero_or_none():
    router = MagicMock()
    router.get_model_ids.return_value = ["model1", "model2", "model3"]

    slack_alerting = SlackAlerting(internal_usage_cache=MagicMock())
    slack_alerting.internal_usage_cache.async_batch_get_cache = AsyncMock(
        return_value=[None, 0, None, 0, None, 0]
    )

    with patch.object(slack_alerting, "send_alert", new=AsyncMock()) as mock_send_alert:
        result = await slack_alerting.send_daily_reports(router)

        # Check that the send_alert method was not called
        mock_send_alert.assert_not_called()

        assert result == False


# test that a user budget crossed alert is sent only once, even if the user makes multiple calls
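# Presumably parametrized over the budget alert types accepted by
# `budget_alerts`; the exact matrix is not in this extract, so the values
# below are illustrative assumptions.
@pytest.mark.parametrize(
    "alerting_type",
    ["token_budget", "user_budget", "projected_limit_exceeded"],
)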
@pytest.mark.asyncio
async def test_send_token_budget_crossed_alerts(alerting_type):
    slack_alerting = SlackAlerting()

    with patch.object(slack_alerting, "send_alert", new=AsyncMock()) as mock_send_alert:
        user_info = {
            "token": "50e55ca5bfbd0759697538e8d23c0cd5031f52d9e19e176d7233b20c7c4d3403",
            "spend": 86,
            "max_budget": 100,
            "user_id": "ishaan@berri.ai",
            "user_email": "ishaan@berri.ai",
            "key_alias": "my-test-key",
            "projected_exceeded_date": "10/20/2024",
            "projected_spend": 200,
            "event_group": Litellm_EntityType.KEY,
        }

        user_info = CallInfo(**user_info)

        for _ in range(50):
            await slack_alerting.budget_alerts(
                type=alerting_type,
                user_info=user_info,
            )
        mock_send_alert.assert_awaited_once()
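

# Same assumed parametrization as above (values illustrative, not from the source).
@pytest.mark.parametrize(
    "alerting_type",
    ["token_budget", "user_budget", "projected_limit_exceeded"],
)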
@pytest.mark.asyncio
async def test_webhook_alerting(alerting_type):
    slack_alerting = SlackAlerting(alerting=["webhook"])

    with patch.object(
        slack_alerting, "send_webhook_alert", new=AsyncMock()
    ) as mock_send_alert:
        user_info = {
            "token": "50e55ca5bfbd0759697538e8d23c0cd5031f52d9e19e176d7233b20c7c4d3403",
            "spend": 1,
            "max_budget": 0,
            "user_id": "ishaan@berri.ai",
            "user_email": "ishaan@berri.ai",
            "key_alias": "my-test-key",
            "projected_exceeded_date": "10/20/2024",
            "projected_spend": 200,
            "event_group": Litellm_EntityType.KEY,
        }

        user_info = CallInfo(**user_info)
        for _ in range(50):
            await slack_alerting.budget_alerts(
                type=alerting_type,
                user_info=user_info,
            )
        mock_send_alert.assert_awaited_once()

# @pytest.mark.asyncio
# async def test_webhook_customer_spend_event():
#     """
#     Test if customer spend is working as expected
#     """
#     slack_alerting = SlackAlerting(alerting=["webhook"])
#     with patch.object(
#         slack_alerting, "send_webhook_alert", new=AsyncMock()
#     ) as mock_send_alert:
#         user_info = {
#             "token": "50e55ca5bfbd0759697538e8d23c0cd5031f52d9e19e176d7233b20c7c4d3403",
#             "spend": 1,
#             "max_budget": 0,
#             "user_id": "ishaan@berri.ai",
#             "user_email": "ishaan@berri.ai",
#             "key_alias": "my-test-key",
#             "projected_exceeded_date": "10/20/2024",
#             "projected_spend": 200,
#         }
#         user_info = CallInfo(**user_info)
#         for _ in range(50):
#             await slack_alerting.budget_alerts(
#                 type=alerting_type,
#                 user_info=user_info,
#             )
#         mock_send_alert.assert_awaited_once()
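

# The outage test below takes (model, api_base, llm_provider, vertex_project,
# vertex_location, error_code). The original parametrize matrix is not shown
# in this extract; the cases below are illustrative assumptions only.
@pytest.mark.parametrize(
    "model, api_base, llm_provider, vertex_project, vertex_location, error_code",
    [
        ("azure/chatgpt-v-2", os.getenv("AZURE_API_BASE"), "azure", None, None, 400),
        ("azure/chatgpt-v-2", os.getenv("AZURE_API_BASE"), "azure", None, None, 408),
        ("azure/chatgpt-v-2", os.getenv("AZURE_API_BASE"), "azure", None, None, 500),
    ],
)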
@pytest.mark.asyncio
async def test_outage_alerting_called(
    model, api_base, llm_provider, vertex_project, vertex_location, error_code
):
    """
    If a call fails, the outage alert handler is invoked.

    If multiple calls fail, an outage alert is sent.
    """
    slack_alerting = SlackAlerting(alerting=["webhook"])

    litellm.callbacks = [slack_alerting]

    error_to_raise: Optional[APIError] = None

    if error_code == 400:
        print("RAISING 400 ERROR CODE")
        error_to_raise = litellm.BadRequestError(
            message="this is a bad request",
            model=model,
            llm_provider=llm_provider,
        )
    elif error_code == 408:
        print("RAISING 408 ERROR CODE")
        error_to_raise = litellm.Timeout(
            message="A timeout occurred", model=model, llm_provider=llm_provider
        )
    elif error_code == 500:
        print("RAISING 500 ERROR CODE")
        error_to_raise = litellm.ServiceUnavailableError(
            message="API is unavailable",
            model=model,
            llm_provider=llm_provider,
            response=httpx.Response(
                status_code=503,
                request=httpx.Request(
                    method="completion",
                    url="https://github.com/BerriAI/litellm",
                ),
            ),
        )

    router = Router(
        model_list=[
            {
                "model_name": model,
                "litellm_params": {
                    "model": model,
                    "api_key": os.getenv("AZURE_API_KEY"),
                    "api_base": api_base,
                    "vertex_location": vertex_location,
                    "vertex_project": vertex_project,
                },
            }
        ],
        num_retries=0,
        allowed_fails=100,
    )

    slack_alerting.update_values(llm_router=router)
    with patch.object(
        slack_alerting, "outage_alerts", new=AsyncMock()
    ) as mock_outage_alert:
        try:
            await router.acompletion(
                model=model,
                messages=[{"role": "user", "content": "Hey!"}],
                mock_response=error_to_raise,
            )
        except Exception:
            pass

        mock_outage_alert.assert_called_once()

    with patch.object(slack_alerting, "send_alert", new=AsyncMock()) as mock_send_alert:
        for _ in range(6):
            try:
                await router.acompletion(
                    model=model,
                    messages=[{"role": "user", "content": "Hey!"}],
                    mock_response=error_to_raise,
                )
            except Exception:
                pass
        await asyncio.sleep(3)
        if error_code == 500 or error_code == 408:
            mock_send_alert.assert_called_once()
        else:
            mock_send_alert.assert_not_called()
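

# Same assumed signature as the outage test above. The final check keys on
# "gemini-pro", so a Vertex AI case is presumably in the original matrix; the
# values below are illustrative assumptions only.
@pytest.mark.parametrize(
    "model, api_base, llm_provider, vertex_project, vertex_location, error_code",
    [
        ("gemini-pro", None, "vertex_ai", "my-vertex-project", "us-central1", 500),
        ("gemini-pro", None, "vertex_ai", "my-vertex-project", "us-central1", 408),
        ("gemini-pro", None, "vertex_ai", "my-vertex-project", "us-central1", 400),
    ],
)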
@pytest.mark.asyncio
async def test_region_outage_alerting_called(
    model, api_base, llm_provider, vertex_project, vertex_location, error_code
):
    """
    If a call fails, the region outage alert handler is invoked.

    If multiple calls fail, a region outage alert is sent.
    """
    slack_alerting = SlackAlerting(
        alerting=["webhook"], alert_types=[AlertType.region_outage_alerts]
    )

    litellm.callbacks = [slack_alerting]

    error_to_raise: Optional[APIError] = None

    if error_code == 400:
        print("RAISING 400 ERROR CODE")
        error_to_raise = litellm.BadRequestError(
            message="this is a bad request",
            model=model,
            llm_provider=llm_provider,
        )
    elif error_code == 408:
        print("RAISING 408 ERROR CODE")
        error_to_raise = litellm.Timeout(
            message="A timeout occurred", model=model, llm_provider=llm_provider
        )
    elif error_code == 500:
        print("RAISING 500 ERROR CODE")
        error_to_raise = litellm.ServiceUnavailableError(
            message="API is unavailable",
            model=model,
            llm_provider=llm_provider,
            response=httpx.Response(
                status_code=503,
                request=httpx.Request(
                    method="completion",
                    url="https://github.com/BerriAI/litellm",
                ),
            ),
        )

    router = Router(
        model_list=[
            {
                "model_name": model,
                "litellm_params": {
                    "model": model,
                    "api_key": os.getenv("AZURE_API_KEY"),
                    "api_base": api_base,
                    "vertex_location": vertex_location,
                    "vertex_project": vertex_project,
                },
                "model_info": {"id": "1"},
            },
            {
                "model_name": model,
                "litellm_params": {
                    "model": model,
                    "api_key": os.getenv("AZURE_API_KEY"),
                    "api_base": api_base,
                    "vertex_location": vertex_location,
                    "vertex_project": "vertex_project-2",
                },
                "model_info": {"id": "2"},
            },
        ],
        num_retries=0,
        allowed_fails=100,
    )

    slack_alerting.update_values(llm_router=router)
    with patch.object(slack_alerting, "send_alert", new=AsyncMock()) as mock_send_alert:
        for idx in range(6):
            if idx % 2 == 0:
                deployment_id = "1"
            else:
                deployment_id = "2"
            await slack_alerting.region_outage_alerts(
                exception=error_to_raise, deployment_id=deployment_id  # type: ignore
            )
        if model == "gemini-pro" and (error_code == 500 or error_code == 408):
            mock_send_alert.assert_called_once()
        else:
            mock_send_alert.assert_not_called()

@pytest.mark.asyncio
async def test_alerting():
    router = litellm.Router(
        model_list=[
            {
                "model_name": "gpt-3.5-turbo",
                "litellm_params": {
                    "model": "gpt-3.5-turbo",
                    "api_key": "bad_key",
                },
            }
        ],
        debug_level="DEBUG",
        set_verbose=True,
        alerting_config=AlertingConfig(
            alerting_threshold=10,  # threshold for slow / hanging llm responses (in seconds). Defaults to 300 seconds
            webhook_url=os.getenv(
                "SLACK_WEBHOOK_URL"
            ),  # webhook you want to send alerts to
        ),
    )
    try:
        await router.acompletion(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": "Hey, how's it going?"}],
        )
    except Exception:
        pass
    finally:
        await asyncio.sleep(3)

@pytest.mark.asyncio
async def test_langfuse_trace_id():
    """
    Unit test for the `_add_langfuse_trace_id_to_alert` helper in slack_alerting.py.
    """
    from litellm.litellm_core_utils.litellm_logging import Logging
    from litellm.integrations.SlackAlerting.utils import _add_langfuse_trace_id_to_alert

    litellm.success_callback = ["langfuse"]

    litellm_logging_obj = Logging(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "hi"}],
        stream=False,
        call_type="acompletion",
        litellm_call_id="1234",
        start_time=datetime.now(),
        function_id="1234",
    )

    litellm.completion(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Hey how's it going?"}],
        mock_response="Hey!",
        litellm_logging_obj=litellm_logging_obj,
    )

    await asyncio.sleep(3)

    assert litellm_logging_obj._get_trace_id(service_name="langfuse") is not None

    slack_alerting = SlackAlerting(
        alerting_threshold=32,
        alerting=["slack"],
        alert_types=[AlertType.llm_exceptions],
        internal_usage_cache=DualCache(),
    )

    trace_url = await _add_langfuse_trace_id_to_alert(
        request_data={"litellm_logging_obj": litellm_logging_obj}
    )

    assert trace_url is not None

    returned_trace_id = int(trace_url.split("/")[-1])

    assert returned_trace_id == int(
        litellm_logging_obj._get_trace_id(service_name="langfuse")
    )

@pytest.mark.asyncio
async def test_print_alerting_payload_warning():
    """
    Test that alert payloads are printed to the verbose logger when log_to_console=True.
    """
    litellm.set_verbose = True
    from litellm._logging import verbose_proxy_logger
    from litellm.integrations.SlackAlerting.batching_handler import send_to_webhook
    import logging

    # Create a string buffer to capture log output
    log_stream = io.StringIO()
    handler = logging.StreamHandler(log_stream)
    verbose_proxy_logger.addHandler(handler)
    verbose_proxy_logger.setLevel(logging.WARNING)

    # Create SlackAlerting instance with log_to_console=True
    slack_alerting = SlackAlerting(
        alerting_threshold=0.0000001,
        alerting=["slack"],
        alert_types=[AlertType.llm_exceptions],
        internal_usage_cache=DualCache(),
    )
    slack_alerting.alerting_args.log_to_console = True

    test_payload = {"text": "Test alert message"}

    # Send an alert
    with patch.object(
        slack_alerting.async_http_handler, "post", new=AsyncMock()
    ) as mock_post:
        await send_to_webhook(
            slackAlertingInstance=slack_alerting,
            item={
                "url": "https://example.com",
                "headers": {"Content-Type": "application/json"},
                "payload": {"text": "Test alert message"},
            },
            count=1,
        )

    # Check if the payload was logged
    log_output = log_stream.getvalue()
    print(log_output)
    assert "Test alert message" in log_output

    # Clean up
    verbose_proxy_logger.removeHandler(handler)
    log_stream.close()
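

# The spend-report test below branches on report_type in {"weekly", "monthly"};
# the parametrize decorator here is reconstructed from that branching and is an
# assumption, since the original decorator is missing from this extract.
@pytest.mark.parametrize("report_type", ["weekly", "monthly"])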
@pytest.mark.asyncio
async def test_spend_report_cache(report_type):
    """
    Test that spend reports are only sent once within their reporting period.
    """
    # Mock prisma client response
    mock_spend_data = [
        {"team_alias": "team1", "total_spend": 100.0},
        {"team_alias": "team2", "total_spend": 200.0},
    ]
    mock_tag_data = [
        {"individual_request_tag": "tag1", "total_spend": 150.0},
        {"individual_request_tag": "tag2", "total_spend": 150.0},
    ]

    with patch("litellm.proxy.proxy_server.prisma_client") as mock_prisma:
        # Setup mock for database query
        mock_prisma.db.query_raw = AsyncMock(
            side_effect=[mock_spend_data, mock_tag_data]
        )

        slack_alerting = SlackAlerting(
            alerting=["webhook"], internal_usage_cache=DualCache()
        )

        user_info = CallInfo(
            token="test_token",
            spend=100,
            max_budget=1000,
            user_id="test@test.com",
            user_email="test@test.com",
            key_alias="test-key",
            event_group=Litellm_EntityType.KEY,
        )

        with patch.object(
            slack_alerting, "send_alert", new=AsyncMock()
        ) as mock_send_alert:
            # First call should send an alert
            if report_type == "weekly":
                await slack_alerting.send_weekly_spend_report()
            else:
                await slack_alerting.send_monthly_spend_report()
            mock_send_alert.assert_called_once()
            mock_send_alert.reset_mock()

            # Second call should not send an alert (cached)
            if report_type == "weekly":
                await slack_alerting.send_weekly_spend_report()
            else:
                await slack_alerting.send_monthly_spend_report()
            mock_send_alert.assert_not_called()

@pytest.mark.asyncio
async def test_soft_budget_alerts():
    """
    Test that soft budget alerts (warnings when approaching the budget limit) work correctly:
    - an alert is sent when spend reaches the soft budget.
    """
    slack_alerting = SlackAlerting(alerting=["webhook"])

    with patch.object(slack_alerting, "send_alert", new=AsyncMock()) as mock_send_alert:
        # Test spend hitting the soft budget
        user_info = CallInfo(
            token="test_token",
            spend=80,  # $80 spent
            soft_budget=80,
            user_id="test@test.com",
            user_email="test@test.com",
            key_alias="test-key",
            event_group=Litellm_EntityType.KEY,
        )

        await slack_alerting.budget_alerts(
            type="soft_budget",
            user_info=user_info,
        )
        mock_send_alert.assert_called_once()

        # Verify the alert message contents
        alert_message = mock_send_alert.call_args[1]["message"]
        print("GOT MESSAGE\n\n", alert_message)
        expected_message = (
            "Soft Budget Crossed: Total Soft Budget:`80.0`\n"
            "\n"
            "*spend:* `80.0`\n"
            "*soft_budget:* `80.0`\n"
            "*user_id:* `test@test.com`\n"
            "*user_email:* `test@test.com`\n"
            "*key_alias:* `test-key`\n"
            "*event_group:* `key`\n"
        )
        assert alert_message == expected_message

key_info = CallInfo(
    token="test_token",
    spend=81,
    soft_budget=80,
    max_budget=100,
    user_id="test@test.com",
    user_email="test@test.com",
    key_alias="test-key",
    event_group=Litellm_EntityType.KEY,
)

team_info = CallInfo(
    token="test_token",
    spend=160,
    soft_budget=150,
    max_budget=200,
    team_id="team-123",
    team_alias="engineering-team",
    event_group=Litellm_EntityType.TEAM,
)

user_info = CallInfo(
    token="test_token",
    spend=45,
    soft_budget=40,
    max_budget=50,
    user_id="user123",
    event_group=Litellm_EntityType.USER,
)

key_no_max_budget_info = CallInfo(
    token="test_token",
    spend=90,
    soft_budget=85,
    user_id="dev@test.com",
    user_email="dev@test.com",
    key_alias="dev-key",
    event_group=Litellm_EntityType.KEY,
)
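

# The four CallInfo objects above appear to be the inputs for the webhook test
# below; the decorator is reconstructed here since it is missing from this
# extract, and the pairing is an assumption based on the surrounding code.
@pytest.mark.parametrize(
    "entity_info",
    [key_info, team_info, user_info, key_no_max_budget_info],
)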
@pytest.mark.asyncio
async def test_soft_budget_alerts_webhook(entity_info):
    """
    Tests that soft budget alerts are triggered for different entity types:
    - Key with max budget
    - Team
    - User
    - Key without max budget
    """
    slack_alerting = SlackAlerting(alerting=["webhook"])

    with patch.object(slack_alerting, "send_alert", new=AsyncMock()) as mock_send_alert:
        # Test entity hitting its soft budget limit
        await slack_alerting.budget_alerts(
            type="soft_budget",
            user_info=entity_info,
        )
        mock_send_alert.assert_called_once()

        # Verify the webhook event
        call_args = mock_send_alert.call_args[1]
        logged_webhook_event: WebhookEvent = call_args["user_info"]

        # Validate that the webhook event has all expected fields
        assert logged_webhook_event.spend == entity_info.spend
        assert logged_webhook_event.soft_budget == entity_info.soft_budget
        assert logged_webhook_event.max_budget == entity_info.max_budget
        assert logged_webhook_event.user_id == entity_info.user_id
        assert logged_webhook_event.user_email == entity_info.user_email
        assert logged_webhook_event.key_alias == entity_info.key_alias
        assert logged_webhook_event.event_group == entity_info.event_group