# Neo4j importer service (FastAPI). (Hugging Face Spaces status banner
# removed from the top of this extracted source.)
import os | |
import requests | |
from contextlib import asynccontextmanager | |
from bs4 import BeautifulSoup | |
from fastapi import FastAPI, HTTPException | |
from neo4j import GraphDatabase, basic_auth | |
import google.generativeai as genai | |
import logging # Import logging module | |
# --- Logging Configuration ---
# Show INFO and above with a timestamped, module-tagged format.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)  # Module-level logger instance

# --- Environment Variable Configuration ---
NEO4J_URI = os.getenv("NEO4J_URI")
NEO4J_USER = os.getenv("NEO4J_USER")
NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD")

# Warn loudly (without aborting import) when connection settings are absent;
# each endpoint re-checks these and answers HTTP 500 at request time.
if not all((NEO4J_URI, NEO4J_USER, NEO4J_PASSWORD)):
    logger.critical("CRITICAL ERROR: NEO4J_URI, NEO4J_USER, and NEO4J_PASSWORD environment variables must be set.")
# --- Application Lifecycle (Startup/Shutdown) ---
@asynccontextmanager  # Required: FastAPI's `lifespan=` expects an async context-manager factory, not a bare async generator function
async def lifespan(app: FastAPI):
    """Handle application startup and shutdown.

    On startup, configures the Gemini client from the GEMINI_API_KEY
    environment variable (with an optional fallback to a module-level
    ``settings`` object, when one exists). On shutdown, only logs.

    Args:
        app: The FastAPI application instance (required by the lifespan
            protocol; unused here).
    """
    # --- Startup ---
    logger.info("Initializing Gemini client...")
    if genai:
        try:
            # Prefer the environment. The original referenced a bare
            # `settings` name that is never defined in this module (a
            # guaranteed NameError, masked by the broad except below);
            # globals().get() keeps the fallback without crashing.
            api_key = os.getenv("GEMINI_API_KEY") or getattr(
                globals().get("settings"), "GEMINI_API_KEY", None
            )
            if not api_key:
                raise ValueError("GEMINI_API_KEY not found in environment or settings.")
            genai.configure(api_key=api_key)
            logger.info("Gemini client configured successfully.")
        except Exception as e:
            logger.error(f"Failed to configure Gemini client: {e}", exc_info=True)
    else:
        logger.warning("Gemini library not imported. Endpoints requiring Gemini will not work.")
    yield  # API serves requests while suspended here
    # --- Shutdown ---
    logger.info("API shutting down...")
    logger.info("API shutdown complete.")
# Build the FastAPI application, wiring in the lifespan handler above.
app = FastAPI(
    lifespan=lifespan,
    title="Neo4j Importer",
    version="1.0.0",
    description="API to fetch documents, summarize it with Gemini, and add it to Neo4j.",
)
# --- Utility Functions (Adapted from your script) ---
def get_content(number: str, node_type: str) -> str:
    """Fetch raw HTML content for a document from its public source.

    Args:
        number: Document identifier (patent number or Arxiv ID).
        node_type: "Patent" (Google Patents) or "ResearchPaper" (Arxiv).

    Returns:
        The UTF-8-decoded page content with newlines stripped, or an empty
        string on any failure (unknown node type, network error, bad HTTP
        status).
    """
    redirect_links = {
        "Patent": f"https://patents.google.com/patent/{number}/en",
        "ResearchPaper": f"https://arxiv.org/abs/{number}"
    }
    url = redirect_links.get(node_type)
    if not url:
        logger.warning(f"Unknown node type: {node_type} for number {number}")
        return ""
    try:
        # Timeout added: without one, a stalled remote server would hang
        # this call (and the calling endpoint) indefinitely. A timeout
        # raises requests.exceptions.Timeout, a RequestException, so it is
        # handled by the branch below.
        response = requests.get(url, timeout=30)
        response.raise_for_status()  # Raises HTTPError for 4XX/5XX responses
        return response.content.decode('utf-8', errors='replace').replace("\n", "")
    except requests.exceptions.RequestException as e:
        logger.error(f"Request error for {node_type} number: {number} at URL {url}: {e}")
        return ""
    except Exception as e:
        logger.error(f"An unexpected error occurred in get_content for {number}: {e}")
        return ""
def extract_arxiv(rp_number: str, node_type: str = "ResearchPaper") -> dict:
    """Extracts information from an Arxiv research paper and generates a summary.

    Fetches the Arxiv abs/ page for ``rp_number`` via get_content, scrapes
    the title and abstract with BeautifulSoup, then asks Gemini to summarize
    the abstract.

    Args:
        rp_number: Arxiv identifier used to build the page URL.
        node_type: Forwarded to get_content; defaults to "ResearchPaper".

    Returns:
        dict with keys "document", "title", "abstract", "summary". On
        failure, the values hold the sentinel error strings set below.
    """
    rp_data = {
        "document": f"Arxiv {rp_number}",  # ID for the paper
        "title": "Error fetching content or content not found",
        "abstract": "Error fetching content or content not found",
        "summary": "Summary not yet generated"  # Default summary
    }
    raw_content = get_content(rp_number, node_type)
    if not raw_content:
        logger.warning(f"No content fetched for Arxiv ID: {rp_number}")
        return rp_data  # Returns default error data
    try:
        soup = BeautifulSoup(raw_content, 'html.parser')
        # Extract Title. This assumes Arxiv markup of the form
        # <h1 class="title"><span class="descriptor">Title:</span> text</h1>,
        # so the wanted text is the node right after the descriptor span
        # — TODO confirm against current Arxiv HTML.
        title_tag = soup.find('h1', class_='title')
        if title_tag and title_tag.find('span', class_='descriptor'):
            title_text = title_tag.find('span', class_='descriptor').next_sibling
            if title_text and isinstance(title_text, str):
                rp_data["title"] = title_text.strip()
            else:
                # Sibling was not a plain string: fall back to the whole h1
                # text with any "Title:" prefix removed.
                rp_data["title"] = title_tag.get_text(separator=" ", strip=True).replace("Title:", "").strip()
        elif title_tag:  # Fallback if the span descriptor is not there but h1.title exists
            rp_data["title"] = title_tag.get_text(separator=" ", strip=True).replace("Title:", "").strip()
        # Extract Abstract
        abstract_tag = soup.find('blockquote', class_='abstract')
        if abstract_tag:
            abstract_text = abstract_tag.get_text(strip=True)
            if abstract_text.lower().startswith('abstract'):  # Check if "abstract" (case-insensitive) is at the beginning
                # Find the first occurrence of ':' after "abstract" or just remove "abstract" prefix
                prefix_end = abstract_text.lower().find('abstract') + len('abstract')
                if prefix_end < len(abstract_text) and abstract_text[prefix_end] == ':':
                    prefix_end += 1  # Include the colon in removal
                abstract_text = abstract_text[prefix_end:].strip()
            rp_data["abstract"] = abstract_text
        # Mark if title or abstract are still not found
        if rp_data["title"] == "Error fetching content or content not found" and not title_tag:
            rp_data["title"] = "Title not found on page"
        if rp_data["abstract"] == "Error fetching content or content not found" and not abstract_tag:
            rp_data["abstract"] = "Abstract not found on page"
    except Exception as e:
        logger.error(f"Failed to parse content for Arxiv ID {rp_number}: {e}")
    # Generate summary with Gemini API if available and abstract exists
    if rp_data["abstract"] and \
       not rp_data["abstract"].startswith("Error fetching content") and \
       not rp_data["abstract"].startswith("Abstract not found"):
        # NOTE(review): the closing tag below is "<document>" rather than
        # "</document>" — presumably a typo in the prompt; confirm intent
        # before changing, since it is runtime text sent to the model.
        prompt = f"""You are a 3GPP standardization expert. Summarize the key information in the provided document in simple technical English relevant to identifying potential Key Issues.
Focus on challenges, gaps, or novel aspects.
Here is the document: <document>{rp_data['abstract']}<document>"""
        try:
            model = genai.GenerativeModel("gemini-2.5-flash-preview-05-20")
            response = model.generate_content(prompt)
            rp_data["summary"] = response.text
            logger.info(f"Summary generated for Arxiv ID: {rp_number}")
        except Exception as e:
            logger.error(f"Error generating summary with Gemini for Arxiv ID {rp_number}: {e}")
            rp_data["summary"] = "Error generating summary (API failure)"
    else:
        rp_data["summary"] = "Summary not generated (Abstract unavailable or problematic)"
    return rp_data
def extract_google_patents(patent_number: str, node_type: str = "Patent") -> dict:
    """Extract information from a Google Patents page with robust error handling.

    Fetches the patent page via get_content, scrapes title, description and
    claims with BeautifulSoup, then asks Gemini to summarize the description.

    Args:
        patent_number: Patent identifier used to build the page URL.
        node_type: Forwarded to get_content; defaults to "Patent".

    Returns:
        dict with keys "number", "title", "description", "claim", "summary".
        On failure, the values hold the sentinel error strings set below.
    """
    # Initialize a dictionary with default error messages for consistency.
    patent_data = {
        "number": f"{patent_number}",
        "title": "Error fetching content or content not found",
        "description": "Error fetching content or content not found",
        "claim": "Error fetching content or content not found",
        "summary": "Summary not yet generated"  # Default summary
    }
    # Use the generic get_content function to fetch the raw page content.
    raw_content = get_content(patent_number, node_type)
    if not raw_content:
        logger.warning(f"No content fetched for Patent ID: {patent_number}")
        return patent_data  # Return the dictionary with default error messages.
    try:
        soup = BeautifulSoup(raw_content, 'html.parser')
        # --- Extract Title ---
        title_tag = soup.find('meta', attrs={'name': 'DC.title'})
        if title_tag and title_tag.get('content'):
            patent_data["title"] = title_tag['content'].strip()
        else:
            # Fallback to finding the title in an <h1> tag.
            title_h1 = soup.find('h1', id='title')
            if title_h1:
                patent_data["title"] = title_h1.get_text(strip=True)
        # --- Extract Description ---
        description_section = soup.find('section', itemprop='description')
        if description_section:
            # Remove machine-translation source spans to clean the output.
            for src_text in description_section.find_all('span', class_='google-src-text'):
                src_text.decompose()
            patent_data["description"] = description_section.get_text(separator=' ', strip=True)
        # --- Extract Claims ---
        claims_section = soup.find('section', itemprop='claims')
        if claims_section:
            # Remove unnecessary nested spans here as well.
            for src_text in claims_section.find_all('span', class_='google-src-text'):
                src_text.decompose()
            patent_data["claim"] = claims_section.get_text(separator=' ', strip=True)
        # Update status message if specific sections were not found on the page.
        if patent_data["title"] == "Error fetching content or content not found":
            patent_data["title"] = "Title not found on page"
        if patent_data["description"] == "Error fetching content or content not found":
            patent_data["description"] = "Description not found on page"
        if patent_data["claim"] == "Error fetching content or content not found":
            patent_data["claim"] = "Claim not found on page"
    except Exception as e:
        # Catch any unexpected errors during the parsing process.
        logger.error(f"Failed to parse content for Patent ID {patent_number}: {e}")
    # Generate a summary with Gemini only when a usable description exists.
    if patent_data["description"] and \
       not patent_data["description"].startswith("Error fetching content") and \
       not patent_data["description"].startswith("Description not found"):
        prompt = f"""You are a 3GPP standardization expert. Summarize the key information in the provided document in simple technical English relevant to identifying potential Key Issues.
Focus on challenges, gaps, or novel aspects.
Here is the document: <document>{patent_data['description']}<document>"""
        try:
            model = genai.GenerativeModel("gemini-2.5-flash-preview-05-20")
            response = model.generate_content(prompt)
            patent_data["summary"] = response.text
            logger.info(f"Summary generated for Patent ID: {patent_number}")
        except Exception as e:
            logger.error(f"Error generating summary with Gemini for Patent ID {patent_number}: {e}")
            patent_data["summary"] = "Error generating summary (API failure)"
    else:
        # BUG FIX: the original assigned to rp_data (a variable from
        # extract_arxiv, undefined here), raising NameError whenever the
        # description was unavailable. It must be patent_data.
        patent_data["summary"] = "Summary not generated (Description unavailable or problematic)"
    return patent_data
def add_nodes_to_neo4j(driver, data_list: list, node_type: str) -> int:
    """Create nodes of the given label in Neo4j in a single write transaction.

    Args:
        driver: An open neo4j.Driver instance.
        data_list: List of property dicts; each becomes one node.
        node_type: Label for the created nodes (must be a plain identifier).

    Returns:
        The number of nodes actually created (0 when data_list is empty).

    Raises:
        ValueError: If node_type is not a safe identifier.
        HTTPException: 500 when the Neo4j write fails.
    """
    if not data_list:
        logger.warning("No data provided to add_nodes_to_neo4j.")
        return 0
    # Cypher cannot parameterize labels, so node_type is interpolated into
    # the query text. Restrict it to a plain identifier so a crafted label
    # cannot inject Cypher.
    if not node_type.isidentifier():
        raise ValueError(f"Invalid node type for Cypher label: {node_type!r}")
    query = (
        "UNWIND $data as properties "
        f"CREATE (n:{node_type}) "
        "SET n = properties"
    )
    # NOTE(review): CREATE always inserts duplicates on re-import; a MERGE on
    # a natural key (e.g. arxiv_id) would make this idempotent — confirm the
    # desired semantics before switching.
    try:
        with driver.session(database="neo4j") as session:  # Specify database if not default
            result = session.execute_write(lambda tx: tx.run(query, data=data_list).consume())
            nodes_created = result.counters.nodes_created
            if nodes_created > 0:
                logger.info(f"{nodes_created} new {node_type} node(s) added successfully.")
            return nodes_created  # Number of nodes actually created
    except Exception as e:
        logger.error(f"Neo4j Error - Failed to add/update {node_type} nodes: {e}")
        raise HTTPException(status_code=500, detail=f"Neo4j database error: {e}")
# --- FastAPI Endpoint ---
# NOTE(review): the route decorator was lost in the source extraction (the
# comment below labels this the "API state check route" but nothing registered
# it). "@app.get('/')" is reconstructed — confirm the intended path.
@app.get("/")
def read_root():
    """Health-check endpoint: confirms the API process is up."""
    return {"status": "ok"}
# NOTE(review): route decorator reconstructed — the original "201 Created for
# successful creation" comment and the dead `status_code_response = 201` local
# show this was registered with status_code=201; confirm the intended path.
@app.post("/research_paper/{arxiv_id}", status_code=201)
async def add_single_research_paper(arxiv_id: str):
    """
    Fetches a research paper from Arxiv by its ID, extracts information,
    generates a summary, and adds/updates it as a 'ResearchPaper' node in Neo4j.

    Args:
        arxiv_id: The Arxiv identifier (path parameter).

    Returns:
        {"data": <paper property dict>} on success (HTTP 201).

    Raises:
        HTTPException: 500 when Neo4j is unconfigured or errors out,
            404 when the paper cannot be fetched or parsed.
    """
    node_type = "ResearchPaper"
    logger.info(f"Processing request for Arxiv ID: {arxiv_id}")
    if not NEO4J_URI or not NEO4J_USER or not NEO4J_PASSWORD:
        logger.error("Neo4j database connection details are not configured on the server.")
        raise HTTPException(status_code=500, detail="Neo4j database connection details are not configured on the server.")
    # Step 1: Extract paper data
    paper_data = extract_arxiv(arxiv_id, node_type)
    if paper_data["title"].startswith("Error fetching content") or paper_data["title"] == "Title not found on page":
        logger.warning(f"Could not fetch or parse content for Arxiv ID {arxiv_id}. Title: {paper_data['title']}")
        raise HTTPException(status_code=404, detail=f"Could not fetch or parse content for Arxiv ID {arxiv_id}. Title: {paper_data['title']}")
    # Step 2: Add/Update in Neo4j
    driver_instance = None  # Initialized so the finally block can test it
    try:
        auth_token = basic_auth(NEO4J_USER, NEO4J_PASSWORD)
        driver_instance = GraphDatabase.driver(NEO4J_URI, auth=auth_token)
        driver_instance.verify_connectivity()
        logger.info("Successfully connected to Neo4j.")
        nodes_created_count = add_nodes_to_neo4j(driver_instance, [paper_data], node_type)
        if nodes_created_count > 0:
            logger.info(f"Research paper {arxiv_id} was successfully added to Neo4j.")
        # The 201 status comes from the route decorator; the original's unused
        # `status_code_response` local has been dropped.
        return {"data": paper_data}
    except HTTPException as e:  # Re-raise HTTPExceptions untouched
        logger.error(f"HTTPException during Neo4j operation for {arxiv_id}: {e.detail}")
        raise e
    except Exception as e:
        logger.error(f"An unexpected error occurred during Neo4j operation for {arxiv_id}: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"An unexpected server error occurred: {e}")
    finally:
        # Always release the driver, even on failure paths.
        if driver_instance:
            driver_instance.close()
            logger.info("Neo4j connection closed.")
# NOTE(review): route decorator reconstructed — the original "201 Created for
# successful creation" comment and the dead `status_code_response = 201` local
# show this was registered with status_code=201; confirm the intended path.
@app.post("/patent/{patent_id}", status_code=201)
async def add_single_patent(patent_id: str):
    """
    Fetches a patent from Google Patents by its ID, extracts information,
    generates a summary, and adds/updates it as a 'Patent' node in Neo4j.

    Args:
        patent_id: The patent identifier (path parameter).

    Returns:
        {"data": <patent property dict>} on success (HTTP 201).

    Raises:
        HTTPException: 500 when Neo4j is unconfigured or errors out,
            404 when the patent cannot be fetched or parsed.
    """
    node_type = "Patent"
    logger.info(f"Processing request for Patent ID: {patent_id}")
    if not NEO4J_URI or not NEO4J_USER or not NEO4J_PASSWORD:
        logger.error("Neo4j database connection details are not configured on the server.")
        raise HTTPException(status_code=500, detail="Neo4j database connection details are not configured on the server.")
    # Step 1: Extract patent data
    patent_data = extract_google_patents(patent_id, node_type)
    if patent_data["title"].startswith("Error fetching content") or patent_data["title"] == "Title not found on page":
        logger.warning(f"Could not fetch or parse content for Patent ID {patent_id}. Title: {patent_data['title']}")
        raise HTTPException(status_code=404, detail=f"Could not fetch or parse content for Patent ID {patent_id}. Title: {patent_data['title']}")
    # Step 2: Add/Update in Neo4j
    driver_instance = None  # Initialized so the finally block can test it
    try:
        auth_token = basic_auth(NEO4J_USER, NEO4J_PASSWORD)
        driver_instance = GraphDatabase.driver(NEO4J_URI, auth=auth_token)
        driver_instance.verify_connectivity()
        logger.info("Successfully connected to Neo4j.")
        nodes_created_count = add_nodes_to_neo4j(driver_instance, [patent_data], node_type)
        if nodes_created_count > 0:
            logger.info(f"Patent {patent_id} was successfully added to Neo4j.")
        # The 201 status comes from the route decorator; the original's unused
        # `status_code_response` local has been dropped.
        return {"data": patent_data}
    except HTTPException as e:  # Re-raise HTTPExceptions untouched
        logger.error(f"HTTPException during Neo4j operation for {patent_id}: {e.detail}")
        raise e
    except Exception as e:
        logger.error(f"An unexpected error occurred during Neo4j operation for {patent_id}: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"An unexpected server error occurred: {e}")
    finally:
        # Always release the driver, even on failure paths.
        if driver_instance:
            driver_instance.close()
            logger.info("Neo4j connection closed.")