|
import os |
|
import requests |
|
import shutil |
|
import logging |
|
|
|
# Module-wide logger, looked up by the fixed name "eval_logger" rather than
# __name__; no handlers are configured in the code visible in this module.
# NOTE(review): presumably the same name is configured by the application
# entry point -- confirm before renaming.
logger = logging.getLogger("eval_logger")
|
|
|
|
|
def fetch_all_questions(api_url: str) -> list[dict] | None:
    """
    Fetches all questions from the API.

    Issues a GET request to ``{api_url}/questions`` with a 15 second timeout
    and returns the decoded JSON payload.

    Args:
        api_url: The base URL of the scoring API.

    Returns:
        The decoded JSON payload (expected to be a list of question
        dictionaries), or None if the payload is empty or any error occurs.
        Errors are logged, never raised.
    """
    questions_url = f"{api_url}/questions"
    logger.info(f"Fetching all questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            logger.warning("Fetched questions list is empty.")
            return None
        logger.info(f"Fetched {len(questions_data)} questions successfully.")
        return questions_data
    except requests.exceptions.JSONDecodeError as e:
        # This handler must come before RequestException: requests'
        # JSONDecodeError is a RequestException subclass, so listing it second
        # would make it unreachable and the response dump would never be logged.
        logger.error(f"Error decoding JSON response from questions endpoint: {e}", exc_info=True)
        # Only response.json() raises this, so `response` is always bound here.
        logger.error(f"Response text: {response.text[:500]}")
        return None
    except requests.exceptions.RequestException as e:
        logger.error(f"Error fetching all questions: {e}", exc_info=True)
        return None
    except Exception as e:
        # Boundary catch-all (e.g. a payload type that has no len()).
        logger.error(f"An unexpected error occurred fetching all questions: {e}", exc_info=True)
        return None
|
|
|
def fetch_random_question(api_url: str) -> dict | None:
    """
    Fetches a single random question from the API.

    Issues a GET request to ``{api_url}/random-question`` with a 15 second
    timeout and returns the decoded JSON payload.

    Args:
        api_url: The base URL of the scoring API.

    Returns:
        A dictionary representing a single question, or None if the payload
        is empty or any error occurs. Errors are logged, never raised.
    """
    random_question_url = f"{api_url}/random-question"
    logger.info(f"Fetching random question from: {random_question_url}")
    try:
        response = requests.get(random_question_url, timeout=15)
        response.raise_for_status()
        question_data = response.json()
        if not question_data:
            logger.warning("Fetched random question is empty.")
            return None
        logger.info(f"Fetched random question successfully: {question_data.get('task_id')}")
        return question_data
    except requests.exceptions.JSONDecodeError as e:
        # This handler must come before RequestException: requests'
        # JSONDecodeError is a RequestException subclass, so listing it second
        # would make it unreachable and the response dump would never be logged.
        logger.error(f"Error decoding JSON response from random question endpoint: {e}", exc_info=True)
        # Only response.json() raises this, so `response` is always bound here.
        logger.error(f"Response text: {response.text[:500]}")
        return None
    except requests.exceptions.RequestException as e:
        logger.error(f"Error fetching random question: {e}", exc_info=True)
        return None
    except Exception as e:
        # Boundary catch-all (e.g. the payload is not a dict and has no .get()).
        logger.error(f"An unexpected error occurred fetching random question: {e}", exc_info=True)
        return None
|
|
|
def download_file(api_url: str, task_id: str, file_name: str, download_dir: str = "downloads") -> str | None:
    """
    Downloads a specific file associated with a given task ID.

    The file is fetched from ``{api_url}/files/{task_id}`` and stored in
    ``download_dir`` under the final path component of ``file_name``. Data is
    streamed into a temporary ``.part`` file and renamed into place only after
    the transfer completes, so an interrupted download never leaves a
    truncated file that a later call would mistake for a finished one.

    Args:
        api_url: The base URL of the scoring API.
        task_id: The ID of the task for which to download the file.
        file_name: The name of the file to be saved. Any directory components
            are discarded so the file always lands inside ``download_dir``.
        download_dir: The directory where the file should be saved. Defaults to "downloads".

    Returns:
        The local path to the downloaded (or already present) file, or None if
        no usable file name was given or an error occurs during the download.
    """
    if not file_name:
        logger.info(f"No file_name provided for task_id {task_id}. Skipping download.")
        return None

    # file_name originates from the remote API: keep only its last component
    # so values such as "../../x" or "/etc/x" cannot escape download_dir.
    safe_name = os.path.basename(file_name.replace("\\", "/"))
    if safe_name in ("", ".", ".."):
        logger.error(f"Invalid file_name {file_name!r} for task_id {task_id}. Skipping download.")
        return None

    file_url = f"{api_url}/files/{task_id}"

    os.makedirs(download_dir, exist_ok=True)

    local_file_path = os.path.join(download_dir, safe_name)
    partial_path = f"{local_file_path}.part"

    if os.path.exists(local_file_path):
        logger.info(f"File already exists at {local_file_path}. Skipping download.")
        return local_file_path

    logger.info(f"Downloading file for task_id {task_id} from: {file_url} to {local_file_path}")
    try:
        with requests.get(file_url, stream=True, timeout=30) as r:
            r.raise_for_status()
            with open(partial_path, 'wb') as f:
                # iter_content applies gzip/deflate transfer decoding, which
                # copying from r.raw would skip.
                for chunk in r.iter_content(chunk_size=64 * 1024):
                    f.write(chunk)
        # Atomic rename: the final path only ever holds a complete file.
        os.replace(partial_path, local_file_path)
        logger.info(f"File downloaded successfully: {local_file_path}")
        return local_file_path
    except requests.exceptions.RequestException as e:
        logger.error(f"Error downloading file for task_id {task_id}: {e}", exc_info=True)
    except Exception as e:
        logger.error(f"An unexpected error occurred downloading file for task_id {task_id}: {e}", exc_info=True)

    # Failure path shared by both handlers: discard whatever was partially written.
    try:
        os.remove(partial_path)
    except FileNotFoundError:
        pass
    return None
|
|
|
if __name__ == '__main__':
    # Manual smoke test against the live scoring API; results are reported
    # with print() because this module configures no logging handlers.
    print("--- Testing dataset_helper.py directly ---")
    print("NOTE: For full logging, run through app.py. This direct test uses print statements.")

    test_api_url = "https://agents-course-unit4-scoring.hf.space"

    print("\n--- Testing fetch_all_questions ---")
    questions = fetch_all_questions(test_api_url)
    if questions:
        print(f"Successfully fetched {len(questions)} questions. First question task_id: {questions[0].get('task_id')}")
    else:
        print("Failed to fetch all questions.")

    print("\n--- Testing fetch_random_question ---")
    random_q = fetch_random_question(test_api_url)
    if random_q:
        # `or ''` guards against a missing/null "question" field, which would
        # otherwise raise TypeError when sliced.
        print(f"Successfully fetched random question: {(random_q.get('question') or '')[:50]}...")
    else:
        print("Failed to fetch random question.")

    print("\n--- Testing download_file (example with a known task_id and file_name if available) ---")
    if questions:
        # Pick the first question that advertises an attached file.
        test_task_with_file = None
        test_file_name = None
        for q_item in questions:
            if q_item.get("file_name"):
                test_task_with_file = q_item.get("task_id")
                test_file_name = q_item.get("file_name")
                break

        if test_task_with_file and test_file_name:
            print(f"Attempting to download file for task_id: {test_task_with_file}, file_name: {test_file_name}")
            downloaded_path = download_file(test_api_url, test_task_with_file, test_file_name)
            if downloaded_path:
                print(f"File downloaded to: {downloaded_path}")
            else:
                print(f"Failed to download file for task_id: {test_task_with_file}")
        else:
            print("No question with an associated file found in the first batch of questions to test download.")
    else:
        print("Skipping download_file test as fetching questions failed.")

    print("\n--- Testing download_file (with a task_id that might not have a file or invalid file_name) ---")
    # Only exercised when the first question explicitly carries an empty file_name.
    if questions and questions[0].get("file_name") == "":
        task_id_no_file = questions[0].get("task_id")
        file_name_empty = questions[0].get("file_name")
        print(f"Attempting to download file for task_id: {task_id_no_file} (expected to skip due to empty file_name)")
        path_no_file = download_file(test_api_url, task_id_no_file, file_name_empty)
        if path_no_file is None:
            print("Correctly skipped download or failed as expected for task with no file_name.")
        else:
            print(f"Unexpectedly downloaded something to {path_no_file} for a task with no file_name.")
    else:
        print("Skipping test for task with no file_name (either no questions or first question has a file).")