"""
Script to test whether an API provider really supports parallel requests.
"""

import os
import time
import asyncio
import json
from datetime import datetime

from dotenv import load_dotenv

load_dotenv()
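
# Test configuration: target model, Hugging Face inference provider, and the
# number of requests to send (capped by the number of prompts below).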
MODEL_NAME = "Qwen/QwQ-32B"
PROVIDER = "novita"
REQUEST_COUNT = 5

PROMPTS = [
    "Explain in detail how parallel computing has transformed modern data processing.",
    "Describe the fundamental differences between CPU and GPU architectures.",
    "Analyze the key challenges in distributed systems design.",
    "Discuss the evolution of natural language processing from rule-based systems to modern transformer architectures.",
    "Explain the concept of quantum computing and how it differs from classical computing paradigms.",
]


async def send_request(prompt, request_id=None, show_logs=True):
    """Send a request to the model and measure its execution time."""
    if show_logs and request_id is not None:
        print(f"Starting request {request_id} at {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")

    start_time = time.time()
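
    # The HTTP call goes through `curl` in a subprocess rather than a Python HTTP
    # client (presumably to avoid an extra async HTTP dependency); awaiting the
    # subprocess keeps it from blocking the event loop.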
    cmd_args = [
        "curl", "-s",
        "-X", "POST",
        f"https://api-inference.huggingface.co/models/{MODEL_NAME}",
        "-H", f"Authorization: Bearer {os.environ.get('HF_TOKEN')}",
        "-H", "Content-Type: application/json",
        "-d", json.dumps({
            "inputs": prompt,
            "parameters": {
                "provider": PROVIDER,
                "max_new_tokens": 20
            }
        })
    ]

    process = await asyncio.create_subprocess_exec(
        *cmd_args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )

    stdout, stderr = await process.communicate()

    end_time = time.time()
    duration = end_time - start_time

    response = stdout.decode("utf-8")
    stderr_output = stderr.decode("utf-8")
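
    # On success the Inference API returns a JSON list whose first element
    # contains "generated_text"; errors usually come back as a JSON object with
    # an "error" key, so both shapes are checked below.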
    is_success = False
    try:
        response_json = json.loads(response)
        is_success = (
            process.returncode == 0
            and isinstance(response_json, list)
            and len(response_json) > 0
            and "generated_text" in response_json[0]
        )
    except json.JSONDecodeError:
        is_success = process.returncode == 0 and "error" not in response.lower()
    except Exception:
        is_success = process.returncode == 0

    error_message = None
    if not is_success:
        try:
            if "error" in response.lower():
                try:
                    response_json = json.loads(response)
                    if "error" in response_json:
                        error_message = response_json["error"]
                except json.JSONDecodeError:
                    error_message = f"Non-JSON error: {response}"
            elif stderr_output:
                error_message = stderr_output
            else:
                error_message = f"Response: {response}"
        except Exception:
            error_message = f"Unknown error. Return code: {process.returncode}"

    if show_logs and request_id is not None:
        print(f"Finished request {request_id} at {datetime.now().strftime('%H:%M:%S.%f')[:-3]} (duration: {duration:.2f}s)")
        if not is_success:
            truncated = f"{error_message[:100]}..." if error_message and len(error_message) > 100 else error_message
            print(f"ERROR in request {request_id}: {truncated}")

    return {
        "request_id": request_id,
        "prompt": prompt,
        "start_time": start_time,
        "end_time": end_time,
        "duration": duration,
        "success": is_success,
        "response": response,
        "error_message": error_message
    }
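

# Optional alternative (a minimal sketch, not used by the test above): the same
# request could be sent with a native async HTTP client instead of spawning curl.
# This assumes the extra `aiohttp` dependency is installed; the endpoint and
# payload mirror the curl call in send_request().
#
# async def send_request_aiohttp(prompt):
#     import aiohttp  # extra dependency, not required by the rest of this script
#     url = f"https://api-inference.huggingface.co/models/{MODEL_NAME}"
#     headers = {"Authorization": f"Bearer {os.environ.get('HF_TOKEN')}"}
#     payload = {"inputs": prompt, "parameters": {"provider": PROVIDER, "max_new_tokens": 20}}
#     async with aiohttp.ClientSession() as session:
#         # json= sets the Content-Type: application/json header automatically
#         async with session.post(url, headers=headers, json=payload) as resp:
#             return await resp.text()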


async def run_parallel_requests(prompts):
    """Run the requests in parallel."""
    print(f"\n=== Parallel test: {len(prompts)} requests for {MODEL_NAME} ({PROVIDER}) ===")
    print(f"Start time: {datetime.now().strftime('%H:%M:%S')}")
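
    # All tasks are created first and then block on a shared asyncio.Event, so
    # every request is released at (almost) the same instant; the short sleep
    # below gives the event loop time to schedule the tasks before the gate opens.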
    start_event = asyncio.Event()

    async def synchronized_request(prompt, req_id):
        await start_event.wait()
        return await send_request(prompt, req_id)

    tasks = [asyncio.create_task(synchronized_request(prompt, i)) for i, prompt in enumerate(prompts)]

    await asyncio.sleep(1)

    parallel_start_time = time.time()
    print(f"Synchronized start at {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
    start_event.set()

    results = await asyncio.gather(*tasks)
    parallel_end_time = time.time()
    parallel_duration = parallel_end_time - parallel_start_time

    print(f"Parallel test finished in {parallel_duration:.2f}s\n")
    return results, parallel_duration


async def run_sequential_requests(prompts):
    """Run the same requests sequentially."""
    print(f"\n=== Sequential test: {len(prompts)} requests for {MODEL_NAME} ({PROVIDER}) ===")
    print(f"Start time: {datetime.now().strftime('%H:%M:%S')}")

    sequential_start_time = time.time()
    results = []

    for i, prompt in enumerate(prompts):
        print(f"Sequential request {i}...")
        result = await send_request(prompt, i)
        results.append(result)

    sequential_end_time = time.time()
    sequential_duration = sequential_end_time - sequential_start_time

    print(f"Sequential test finished in {sequential_duration:.2f}s\n")
    return results, sequential_duration


async def run_tests():
    """Run the parallel test, then the sequential test, and compare the results."""
    global_start = time.time()
    prompts = PROMPTS[:REQUEST_COUNT]

    parallel_results, parallel_duration = await run_parallel_requests(prompts)

    sequential_results, sequential_duration = await run_sequential_requests(prompts)

    global_end = time.time()
    total_duration = global_end - global_start

    parallel_success = sum(1 for r in parallel_results if r["success"])
    sequential_success = sum(1 for r in sequential_results if r["success"])
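
    # Parallelism factor = sequential time / parallel time. With N requests that
    # truly run in parallel it should approach N, while a provider that
    # serializes requests keeps it close to 1.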
    if parallel_duration > 0:
        parallelism_factor = sequential_duration / parallel_duration
    else:
        parallelism_factor = 0

    improvement_percent = (1 - (parallel_duration / sequential_duration)) * 100 if sequential_duration > 0 else 0

    print("\n====== TEST SUMMARY ======")
    print(f"Model: {MODEL_NAME}, Provider: {PROVIDER}, Requests: {len(prompts)}")
    print(f"\nParallel test duration: {parallel_duration:.2f}s ({parallel_success}/{len(prompts)} succeeded)")
    print(f"Sequential test duration: {sequential_duration:.2f}s ({sequential_success}/{len(prompts)} succeeded)")
    print(f"Parallelism factor: {parallelism_factor:.2f}x")
    print(f"Improvement: {improvement_percent:.1f}%")
    print(f"Total test duration: {total_duration:.2f}s")
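
    # Rough verdict thresholds: >= 80% of the theoretical maximum (N requests)
    # is rated excellent, >= 2x good, >= 1.3x moderate, and anything lower as
    # weak or no parallelism.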
    if parallelism_factor >= len(prompts) * 0.8:
        conclusion = "EXCELLENT parallelism (close to the theoretical maximum)"
    elif parallelism_factor >= 2:
        conclusion = "GOOD parallelism (significantly better than sequential)"
    elif parallelism_factor >= 1.3:
        conclusion = "MODERATE parallelism (slightly better than sequential)"
    else:
        conclusion = "WEAK or NO parallelism (no significant advantage)"

    print(f"\nConclusion: {conclusion}")

    output_file = f"parallel_test_{PROVIDER}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    with open(output_file, "w") as f:
        json.dump({
            "model": MODEL_NAME,
            "provider": PROVIDER,
            "request_count": len(prompts),
            "parallel_duration": parallel_duration,
            "sequential_duration": sequential_duration,
            "parallelism_factor": parallelism_factor,
            "improvement_percent": improvement_percent,
            "conclusion": conclusion,
            "parallel_results": parallel_results,
            "sequential_results": sequential_results
        }, f, indent=2)

    print(f"\nDetailed results saved to {output_file}")


if __name__ == "__main__":
    asyncio.run(run_tests())