import os import requests import shutil import logging logger = logging.getLogger("eval_logger") def fetch_all_questions(api_url: str) -> list[dict] | None: """ Fetches all questions from the API. Args: api_url: The base URL of the scoring API. Returns: A list of question dictionaries, or None if an error occurs. """ questions_url = f"{api_url}/questions" logger.info(f"Fetching all questions from: {questions_url}") try: response = requests.get(questions_url, timeout=15) response.raise_for_status() questions_data = response.json() if not questions_data: logger.warning("Fetched questions list is empty.") return None logger.info(f"Fetched {len(questions_data)} questions successfully.") return questions_data except requests.exceptions.RequestException as e: logger.error(f"Error fetching all questions: {e}", exc_info=True) return None except requests.exceptions.JSONDecodeError as e: logger.error(f"Error decoding JSON response from questions endpoint: {e}", exc_info=True) logger.error(f"Response text: {response.text[:500] if response else 'No response'}") return None except Exception as e: logger.error(f"An unexpected error occurred fetching all questions: {e}", exc_info=True) return None def fetch_random_question(api_url: str) -> dict | None: """ Fetches a single random question from the API. Args: api_url: The base URL of the scoring API. Returns: A dictionary representing a single question, or None if an error occurs. """ random_question_url = f"{api_url}/random-question" logger.info(f"Fetching random question from: {random_question_url}") try: response = requests.get(random_question_url, timeout=15) response.raise_for_status() question_data = response.json() if not question_data: logger.warning("Fetched random question is empty.") return None logger.info(f"Fetched random question successfully: {question_data.get('task_id')}") return question_data except requests.exceptions.RequestException as e: logger.error(f"Error fetching random question: {e}", exc_info=True) return None except requests.exceptions.JSONDecodeError as e: logger.error(f"Error decoding JSON response from random question endpoint: {e}", exc_info=True) logger.error(f"Response text: {response.text[:500] if response else 'No response'}") return None except Exception as e: logger.error(f"An unexpected error occurred fetching random question: {e}", exc_info=True) return None def download_file(api_url: str, task_id: str, file_name: str, download_dir: str = "downloads") -> str | None: """ Downloads a specific file associated with a given task ID. Args: api_url: The base URL of the scoring API. task_id: The ID of the task for which to download the file. file_name: The name of the file to be saved. download_dir: The directory where the file should be saved. Defaults to "downloads". Returns: The local path to the downloaded file, or None if an error occurs. """ if not file_name: logger.info(f"No file_name provided for task_id {task_id}. Skipping download.") return None file_url = f"{api_url}/files/{task_id}" os.makedirs(download_dir, exist_ok=True) local_file_path = os.path.join(download_dir, file_name) if os.path.exists(local_file_path): logger.info(f"File already exists at {local_file_path}. Skipping download.") return local_file_path logger.info(f"Downloading file for task_id {task_id} from: {file_url} to {local_file_path}") try: with requests.get(file_url, stream=True, timeout=30) as r: r.raise_for_status() with open(local_file_path, 'wb') as f: shutil.copyfileobj(r.raw, f) logger.info(f"File downloaded successfully: {local_file_path}") return local_file_path except requests.exceptions.RequestException as e: logger.error(f"Error downloading file for task_id {task_id}: {e}", exc_info=True) if os.path.exists(local_file_path): os.remove(local_file_path) return None except Exception as e: logger.error(f"An unexpected error occurred downloading file for task_id {task_id}: {e}", exc_info=True) if os.path.exists(local_file_path): os.remove(local_file_path) return None if __name__ == '__main__': print("--- Testing dataset_helper.py directly ---") print("NOTE: For full logging, run through app.py. This direct test uses print statements.") test_api_url = "https://agents-course-unit4-scoring.hf.space" print("\n--- Testing fetch_all_questions ---") questions = fetch_all_questions(test_api_url) if questions: print(f"Successfully fetched {len(questions)} questions. First question task_id: {questions[0].get('task_id')}") else: print("Failed to fetch all questions.") print("\n--- Testing fetch_random_question ---") random_q = fetch_random_question(test_api_url) if random_q: print(f"Successfully fetched random question: {random_q.get('question')[:50]}...") else: print("Failed to fetch random question.") print("\n--- Testing download_file (example with a known task_id and file_name if available) ---") if questions: test_task_with_file = None test_file_name = None for q_item in questions: if q_item.get("file_name"): test_task_with_file = q_item.get("task_id") test_file_name = q_item.get("file_name") break if test_task_with_file and test_file_name: print(f"Attempting to download file for task_id: {test_task_with_file}, file_name: {test_file_name}") downloaded_path = download_file(test_api_url, test_task_with_file, test_file_name) if downloaded_path: print(f"File downloaded to: {downloaded_path}") else: print(f"Failed to download file for task_id: {test_task_with_file}") else: print("No question with an associated file found in the first batch of questions to test download.") else: print("Skipping download_file test as fetching questions failed.") print("\n--- Testing download_file (with a task_id that might not have a file or invalid file_name) ---") if questions and questions[0].get("file_name") == "": task_id_no_file = questions[0].get("task_id") file_name_empty = questions[0].get("file_name") print(f"Attempting to download file for task_id: {task_id_no_file} (expected to skip due to empty file_name)") path_no_file = download_file(test_api_url, task_id_no_file, file_name_empty) if path_no_file is None: print("Correctly skipped download or failed as expected for task with no file_name.") else: print(f"Unexpectedly downloaded something to {path_no_file} for a task with no file_name.") else: print("Skipping test for task with no file_name (either no questions or first question has a file).")