|
from langchain.tools import tool |
|
import requests |
|
import math |
|
import re |
|
from datetime import datetime, timedelta |
|
|
|
|
|
@tool |
|
def calculator(expression: str) -> str: |
|
""" |
|
执行数学计算,支持加减乘除、指数、对数、三角函数等。 |
|
示例: |
|
- "2 + 3 * 4" -> 14 |
|
- "sqrt(16)" -> 4.0 |
|
- "log10(100)" -> 2.0 |
|
""" |
|
try: |
|
|
|
if re.search(r"[a-zA-Z_`'\"]", expression): |
|
return "Error: Expression contains invalid characters" |
|
|
|
|
|
expression = expression.replace("^", "**") |
|
expression = expression.replace("sqrt", "math.sqrt") |
|
expression = expression.replace("log", "math.log10") |
|
expression = expression.replace("ln", "math.log") |
|
expression = expression.replace("sin", "math.sin") |
|
expression = expression.replace("cos", "math.cos") |
|
expression = expression.replace("tan", "math.tan") |
|
|
|
|
|
if "pi" in expression: |
|
expression = expression.replace("pi", str(math.pi)) |
|
if "e" in expression: |
|
expression = expression.replace("e", str(math.e)) |
|
|
|
|
|
result = eval(expression, {"__builtins__": None}, {"math": math}) |
|
return str(round(result, 6)) if isinstance(result, float) else str(result) |
|
except Exception as e: |
|
return f"Calculation error: {str(e)}" |
|
|
|
|
|
@tool |
|
def scientific_calculator(operation: str, values: list) -> str: |
|
""" |
|
执行高级科学计算: |
|
- 统计: 'mean', 'median', 'mode', 'std' |
|
- 金融: 'compound_interest' |
|
- 几何: 'area_circle', 'volume_sphere' |
|
- 三角: 'hypotenuse' |
|
""" |
|
try: |
|
operation = operation.lower() |
|
|
|
if operation == "mean": |
|
return str(sum(values) / len(values)) |
|
|
|
elif operation == "median": |
|
sorted_vals = sorted(values) |
|
n = len(sorted_vals) |
|
mid = n // 2 |
|
return str(sorted_vals[mid] if n % 2 else (sorted_vals[mid-1] + sorted_vals[mid]) / 2) |
|
|
|
elif operation == "mode": |
|
from collections import Counter |
|
count = Counter(values) |
|
max_count = max(count.values()) |
|
modes = [k for k, v in count.items() if v == max_count] |
|
return str(modes[0] if len(modes) == 1 else modes) |
|
|
|
elif operation == "std": |
|
mean = sum(values) / len(values) |
|
variance = sum((x - mean) ** 2 for x in values) / len(values) |
|
return str(math.sqrt(variance)) |
|
|
|
elif operation == "compound_interest": |
|
principal, rate, time, periods = values |
|
amount = principal * (1 + rate/(periods*100)) ** (periods*time) |
|
return str(round(amount, 2)) |
|
|
|
elif operation == "area_circle": |
|
return str(math.pi * values[0] ** 2) |
|
|
|
elif operation == "volume_sphere": |
|
return str((4/3) * math.pi * values[0] ** 3) |
|
|
|
elif operation == "hypotenuse": |
|
return str(math.sqrt(values[0]**2 + values[1]**2)) |
|
|
|
else: |
|
return f"Unknown operation: {operation}" |
|
|
|
except Exception as e: |
|
return f"Scientific calculation error: {str(e)}" |
|
|
|
|
|
@tool |
|
def unit_converter(value: float, from_unit: str, to_unit: str) -> str: |
|
""" |
|
执行单位转换,支持: |
|
- 长度: m, km, cm, mm, inch, foot, mile |
|
- 重量: g, kg, mg, lb, oz |
|
- 温度: C, F, K |
|
- 体积: L, mL, gallon, pint |
|
""" |
|
try: |
|
|
|
length_factors = { |
|
'm': 1, 'km': 1000, 'cm': 0.01, 'mm': 0.001, |
|
'inch': 0.0254, 'foot': 0.3048, 'mile': 1609.34 |
|
} |
|
|
|
|
|
weight_factors = { |
|
'kg': 1, 'g': 0.001, 'mg': 0.000001, |
|
'lb': 0.453592, 'oz': 0.0283495 |
|
} |
|
|
|
|
|
volume_factors = { |
|
'L': 1, 'mL': 0.001, 'gallon': 3.78541, 'pint': 0.473176 |
|
} |
|
|
|
|
|
if from_unit.upper() in ['C', 'F', 'K'] and to_unit.upper() in ['C', 'F', 'K']: |
|
temp = value |
|
if from_unit.upper() == 'C': |
|
if to_unit.upper() == 'F': |
|
return str(temp * 9/5 + 32) |
|
elif to_unit.upper() == 'K': |
|
return str(temp + 273.15) |
|
elif from_unit.upper() == 'F': |
|
if to_unit.upper() == 'C': |
|
return str((temp - 32) * 5/9) |
|
elif to_unit.upper() == 'K': |
|
return str((temp - 32) * 5/9 + 273.15) |
|
elif from_unit.upper() == 'K': |
|
if to_unit.upper() == 'C': |
|
return str(temp - 273.15) |
|
elif to_unit.upper() == 'F': |
|
return str((temp - 273.15) * 9/5 + 32) |
|
return str(value) |
|
|
|
|
|
if from_unit in length_factors and to_unit in length_factors: |
|
return str(value * length_factors[from_unit] / length_factors[to_unit]) |
|
|
|
|
|
if from_unit in weight_factors and to_unit in weight_factors: |
|
return str(value * weight_factors[from_unit] / weight_factors[to_unit]) |
|
|
|
|
|
if from_unit in volume_factors and to_unit in volume_factors: |
|
return str(value * volume_factors[from_unit] / volume_factors[to_unit]) |
|
|
|
return f"Unsupported conversion: {from_unit} to {to_unit}" |
|
|
|
except Exception as e: |
|
return f"Unit conversion error: {str(e)}" |
|
|
|
|
|
@tool |
|
def time_calculator(operation: str, base_time: str = None, duration: str = None, timezone: str = None) -> str: |
|
""" |
|
执行时间计算: |
|
- 'current_time': 获取当前时间 |
|
- 'add_duration': 添加时间间隔 |
|
- 'subtract_duration': 减去时间间隔 |
|
- 'time_diff': 计算两个时间的差值 |
|
|
|
时间格式: YYYY-MM-DD HH:MM:SS |
|
持续时间格式: Xd Yh Zm (例如: 2d 3h 30m) |
|
""" |
|
try: |
|
now = datetime.utcnow() |
|
|
|
if operation == "current_time": |
|
return now.strftime("%Y-%m-%d %H:%M:%S UTC") |
|
|
|
elif operation == "add_duration" and base_time and duration: |
|
base = datetime.strptime(base_time, "%Y-%m-%d %H:%M:%S") |
|
return add_duration(base, duration) |
|
|
|
elif operation == "subtract_duration" and base_time and duration: |
|
base = datetime.strptime(base_time, "%Y-%m-%d %H:%M:%S") |
|
return subtract_duration(base, duration) |
|
|
|
elif operation == "time_diff" and duration: |
|
parts = duration.split() |
|
time1 = datetime.strptime(parts[0], "%Y-%m-%d") |
|
time2 = datetime.strptime(parts[1], "%Y-%m-%d") |
|
diff = abs((time2 - time1).days) |
|
return f"{diff} days" |
|
|
|
return "Invalid operation or missing parameters" |
|
|
|
except Exception as e: |
|
return f"Time calculation error: {str(e)}" |
|
|
|
def add_duration(base: datetime, duration: str) -> str: |
|
"""添加持续时间到基础时间""" |
|
parts = duration.split() |
|
delta = timedelta() |
|
for part in parts: |
|
if part.endswith('d'): |
|
delta += timedelta(days=int(part[:-1])) |
|
elif part.endswith('h'): |
|
delta += timedelta(hours=int(part[:-1])) |
|
elif part.endswith('m'): |
|
delta += timedelta(minutes=int(part[:-1])) |
|
return (base + delta).strftime("%Y-%m-%d %H:%M:%S") |
|
|
|
def subtract_duration(base: datetime, duration: str) -> str: |
|
"""从基础时间减去持续时间""" |
|
parts = duration.split() |
|
delta = timedelta() |
|
for part in parts: |
|
if part.endswith('d'): |
|
delta += timedelta(days=int(part[:-1])) |
|
elif part.endswith('h'): |
|
delta += timedelta(hours=int(part[:-1])) |
|
elif part.endswith('m'): |
|
delta += timedelta(minutes=int(part[:-1])) |
|
return (base - delta).strftime("%Y-%m-%d %H:%M:%S") |
|
|
|
|
|
@tool |
|
def web_search(query: str, max_results: int = 3) -> str: |
|
""" |
|
执行网络搜索并返回相关结果。使用DuckDuckGo作为搜索引擎。 |
|
|
|
参数: |
|
query: 搜索查询 |
|
max_results: 返回的最大结果数 (默认3) |
|
|
|
返回: 格式化的搜索结果 |
|
""" |
|
try: |
|
url = "https://api.duckduckgo.com/" |
|
params = { |
|
"q": query, |
|
"format": "json", |
|
"no_redirect": 1, |
|
"no_html": 1, |
|
"skip_disambig": 1 |
|
} |
|
|
|
response = requests.get(url, params=params, timeout=10) |
|
data = response.json() |
|
|
|
|
|
results = [] |
|
|
|
|
|
if data.get("AbstractText"): |
|
results.append(f"摘要: {data['AbstractText']}") |
|
|
|
|
|
if data.get("RelatedTopics"): |
|
for i, topic in enumerate(data["RelatedTopics"]): |
|
if i >= max_results: |
|
break |
|
if "Text" in topic: |
|
results.append(f"{i+1}. {topic['Text']} [来源: {topic.get('FirstURL', '未知')}]") |
|
|
|
|
|
if not results: |
|
return "未找到相关信息" |
|
|
|
return "\n\n".join(results) |
|
|
|
except Exception as e: |
|
return f"搜索错误: {str(e)}" |
|
|
|
|
|
@tool |
|
def currency_converter(amount: float, from_currency: str, to_currency: str) -> str: |
|
""" |
|
使用实时汇率转换货币 |
|
|
|
支持的货币: USD, EUR, GBP, JPY, CNY, CAD, AUD, CHF, INR, RUB |
|
""" |
|
try: |
|
|
|
url = f"https://api.exchangerate-api.com/v4/latest/{from_currency.upper()}" |
|
response = requests.get(url, timeout=5) |
|
data = response.json() |
|
|
|
if "error" in data: |
|
return f"货币错误: {data['error']}" |
|
|
|
rates = data.get("rates", {}) |
|
rate = rates.get(to_currency.upper()) |
|
|
|
if not rate: |
|
return f"不支持的目标货币: {to_currency}" |
|
|
|
converted = amount * rate |
|
return f"{amount} {from_currency.upper()} = {converted:.2f} {to_currency.upper()}" |
|
|
|
except Exception as e: |
|
return f"货币转换错误: {str(e)}" |
|
|
|
|
|
def get_tools(): |
|
return [ |
|
calculator, |
|
scientific_calculator, |
|
unit_converter, |
|
time_calculator, |
|
web_search, |
|
currency_converter |
|
] |
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
print("计算器测试:") |
|
print(calculator("2 + 3 * 4")) |
|
print(calculator("sqrt(16)")) |
|
print(calculator("sin(pi/2)")) |
|
|
|
|
|
print("\n科学计算器测试:") |
|
print(scientific_calculator("mean", [1, 2, 3, 4, 5])) |
|
print(scientific_calculator("compound_interest", [1000, 5, 10, 1])) |
|
|
|
|
|
print("\n单位转换测试:") |
|
print(unit_converter(10, "km", "m")) |
|
print(unit_converter(32, "F", "C")) |
|
|
|
|
|
print("\n时间计算测试:") |
|
print(time_calculator("current_time")) |
|
print(time_calculator("add_duration", "2023-01-01 00:00:00", "2d 3h")) |
|
|
|
|
|
print("\n网络搜索测试:") |
|
print(web_search("Python programming", max_results=2)) |
|
|
|
|
|
print("\n货币转换测试:") |
|
print(currency_converter(100, "USD", "EUR")) |