|
import json |
|
import pandas as pd |
|
import gradio as gr |
|
from typing import Dict, Any, Type |
|
from web2json.preprocessor import BasicPreprocessor |
|
from web2json.ai_extractor import AIExtractor, RAGExtractor, GeminiLLMClient |
|
from web2json.postprocessor import PostProcessor |
|
from web2json.pipeline import Pipeline |
|
from pydantic import BaseModel, Field, create_model |
|
import os |
|
import dotenv |
|
|
|
# Load variables from a .env file into the process environment at import time,
# so that os.getenv('GEMINI_API_KEY') in webpage_to_json can pick up the key.
dotenv.load_dotenv()
|
|
|
def parse_schema_input(schema_input: str) -> Type[BaseModel]:
    """
    Convert user schema input to a Pydantic BaseModel.

    Supports multiple input formats:
    1. JSON schema format (input starts with '{')
    2. Python class definition (input mentions 'class ' and 'BaseModel')
    3. Simple field definitions (anything else)

    Parameters:
        schema_input (str): Raw schema text typed by the user. Empty,
            whitespace-only or None input selects a default schema with
            `title` and `content` string fields.

    Returns:
        Type[BaseModel]: A dynamically created (or user-defined) model class.

    Raises:
        ValueError: If the text cannot be parsed by the selected format. The
            original exception is attached as `__cause__`.
    """
    # Tolerate None so a caller that passes no schema gets the default model
    # instead of an AttributeError from .strip().
    schema_input = (schema_input or "").strip()

    if not schema_input:
        return create_model(
            'DefaultSchema',
            title=(str, Field(description="Title of the content")),
            content=(str, Field(description="Main content")),
        )

    try:
        # Format detection is purely textual and checked in this order.
        if schema_input.startswith('{'):
            schema_dict = json.loads(schema_input)
            return json_schema_to_basemodel(schema_dict)

        if 'class ' in schema_input and 'BaseModel' in schema_input:
            return python_class_to_basemodel(schema_input)

        return simple_fields_to_basemodel(schema_input)

    except Exception as e:
        # Chain the cause so the real traceback is not lost behind the
        # user-facing message.
        raise ValueError(
            f"Could not parse schema: {str(e)}. Please check your schema format."
        ) from e
|
|
|
def json_schema_to_basemodel(schema_dict: Dict) -> Type[BaseModel]:
    """
    Convert a JSON-schema-like dict to a Pydantic model named 'DynamicSchema'.

    Only the top-level 'properties' and 'required' keys are read. For each
    property, 'type' (default 'string') is mapped through get_python_type and
    'description' (default '') is attached to the field.

    Parameters:
        schema_dict (Dict): Parsed JSON schema object.

    Returns:
        Type[BaseModel]: Model whose required properties have no default and
        whose remaining properties are Optional with a default of None.
    """
    # Function-scope import: Optional is not among the file-level typing imports.
    from typing import Optional

    # `or` guards against explicit nulls ("properties": null / "required": null).
    properties = schema_dict.get('properties') or {}
    required = set(schema_dict.get('required') or [])

    fields = {}
    for field_name, field_info in properties.items():
        field_type = get_python_type(field_info.get('type', 'string'))
        field_description = field_info.get('description', '')

        if field_name in required:
            fields[field_name] = (field_type, Field(description=field_description))
        else:
            # The annotation must admit None, otherwise an explicit null for an
            # optional field would be rejected during validation.
            fields[field_name] = (
                Optional[field_type],
                Field(default=None, description=field_description),
            )

    return create_model('DynamicSchema', **fields)
|
|
|
def python_class_to_basemodel(class_definition: str) -> Type[BaseModel]:
    """
    Convert Python class definition text to a Pydantic model class.

    The text is executed in a small namespace and the first BaseModel subclass
    found in that namespace afterwards is returned.

    Parameters:
        class_definition (str): Python source defining at least one class that
            derives from BaseModel.

    Returns:
        Type[BaseModel]: The first BaseModel subclass defined by the source.

    Raises:
        ValueError: If the source fails to execute or defines no BaseModel
            subclass. The original exception is attached as `__cause__`.
    """
    # Function-scope import: the file header only imports Dict, Any and Type.
    import typing

    try:
        namespace = {
            'BaseModel': BaseModel, 'Field': Field, 'str': str, 'int': int,
            'float': float, 'bool': bool, 'list': list, 'dict': dict,
            # Common annotation helpers, so definitions such as
            # `tags: Optional[List[str]]` work without an explicit import.
            'Optional': typing.Optional, 'List': typing.List,
            'Dict': typing.Dict, 'Any': typing.Any, 'Union': typing.Union,
        }

        # SECURITY: exec() runs arbitrary code supplied by the user. The
        # namespace above is NOT a sandbox (builtins and imports remain
        # available). Only expose this path to trusted users, or replace it
        # with a restricted parser before deploying publicly.
        exec(class_definition, namespace)

        # Dicts preserve insertion order, so this yields the first model class
        # the source defined (the pre-seeded BaseModel itself is skipped).
        for obj in namespace.values():
            if (isinstance(obj, type)
                    and issubclass(obj, BaseModel)
                    and obj is not BaseModel):
                return obj

        # Raised inside the try on purpose: it is re-wrapped below with the
        # same "Invalid Python class definition" prefix as other failures.
        raise ValueError("No BaseModel class found in definition")
    except Exception as e:
        raise ValueError(f"Invalid Python class definition: {str(e)}") from e
|
|
|
def simple_fields_to_basemodel(fields_text: str) -> Type[BaseModel]:
    """
    Build a 'DynamicSchema' model from line-oriented field definitions.

    Each non-empty line that does not start with '#' declares one field:
        name: type = description   -> typed field with a description
        name: type                 -> typed field, empty description
        name                       -> string field, empty description
    Type names are resolved with get_python_type; surrounding quotes on the
    description are removed.

    Raises:
        ValueError: If no field definitions are found.
    """
    collected = {}

    for raw_line in fields_text.strip().split('\n'):
        entry = raw_line.strip()

        # Skip blanks and comment lines.
        if not entry or entry.startswith('#'):
            continue

        # A bare name declares a plain string field.
        if ':' not in entry:
            collected[entry] = (str, Field(description=""))
            continue

        name_text, _, remainder = entry.partition(':')
        type_text, equals, desc_text = remainder.strip().partition('=')

        resolved_type = get_python_type(type_text.strip())
        description = desc_text.strip().strip('"\'') if equals else ""

        collected[name_text.strip()] = (resolved_type, Field(description=description))

    if not collected:
        raise ValueError("No valid fields found in schema definition")

    return create_model('DynamicSchema', **collected)
|
|
|
# Lower-cased JSON-schema and Python type names mapped to the Python type used
# for the generated model field. Built once instead of on every call.
_TYPE_MAPPING = {
    'string': str, 'str': str,
    'integer': int, 'int': int,
    'number': float, 'float': float,
    'boolean': bool, 'bool': bool,
    'array': list, 'list': list,
    'object': dict, 'dict': dict,
}


def get_python_type(type_str: str):
    """
    Convert a type name to a Python type.

    Parameters:
        type_str: A JSON-schema or Python type name, matched
            case-insensitively and ignoring surrounding whitespace. A list or
            tuple of names (JSON Schema union form such as
            ["string", "null"]) is also accepted; its first non-"null" entry
            is used. A parametrised spelling such as "list[str]" resolves to
            its container type.

    Returns:
        The matching Python type, or `str` for unknown, empty or non-string
        input.
    """
    # JSON Schema allows "type" to be an array of names; pick the first
    # concrete one so nullable declarations do not crash on .lower().
    if isinstance(type_str, (list, tuple)):
        type_str = next(
            (t for t in type_str
             if isinstance(t, str) and t.strip().lower() != 'null'),
            '',
        )

    if not isinstance(type_str, str):
        return str

    normalized = type_str.lower().strip()
    # Drop any generic parameters: "list[str]" -> "list".
    base_name = normalized.split('[', 1)[0].strip()
    return _TYPE_MAPPING.get(base_name, str)
|
|
|
def webpage_to_json_wrapper(content: str, is_url: bool, schema_input: str) -> Dict[str, Any]:
    """
    Parse the textual schema into a model, then run the extraction.

    Parameters:
        content (str): URL or raw text/HTML to extract from.
        is_url (bool): True when `content` is a URL, False for raw text.
        schema_input (str): Schema text in any format accepted by
            parse_schema_input.

    Returns:
        Dict[str, Any]: The result of webpage_to_json, or a dict with a single
        "error" key describing which stage failed.
    """
    # The two stages are guarded separately so that an extraction failure is
    # not reported as a schema problem.
    try:
        schema_model = parse_schema_input(schema_input)
    except Exception as e:
        return {"error": f"Schema parsing error: {str(e)}"}

    try:
        return webpage_to_json(content, is_url, schema_model)
    except Exception as e:
        return {"error": f"Processing error: {str(e)}"}
|
|
|
def webpage_to_json(content: str, is_url: bool, schema: Type[BaseModel]) -> Dict[str, Any]:
    """
    Extracts structured JSON information from a given content based on a specified schema.

    This function sets up a processing pipeline that includes:
    - Preprocessing the input content.
    - Utilizing an AI language model to extract information according to the provided schema.
    - Postprocessing the extracted output to match the exact schema requirements.

    Parameters:
        content (str): The input content to be analyzed. This can be direct text or a URL.
        is_url (bool): A flag indicating whether the provided content is a URL (True) or raw text (False).
        schema (Type[BaseModel]): A Pydantic BaseModel subclass defining the expected structure
            and data types for the output.

    Returns:
        Dict[str, Any]: The value returned by the pipeline run. In case of errors during
        initialization or processing, a dictionary with an "error" key and a descriptive
        message is returned instead.
    """
    prompt_template = """Extract the following information from the provided content according to the specified schema.

Content to analyze:
{content}

Schema requirements:
{schema}

Instructions:
- Extract only information that is explicitly present in the content
- Follow the exact structure and data types specified in the schema
- If a required field cannot be found, indicate this clearly
- Preserve the original formatting and context where relevant
- Return the extracted data in the format specified by the schema"""

    # The API key is read from the environment; it is None when unset.
    try:
        llm = GeminiLLMClient(config={'api_key': os.getenv('GEMINI_API_KEY')})
    except Exception as e:
        return {"error": f"Failed to initialize LLM client: {str(e)}"}

    # Building the remaining stages is guarded too, so every initialization
    # failure yields the documented error dict instead of an exception.
    try:
        preprocessor = BasicPreprocessor(config={'keep_tags': False})
        ai_extractor = AIExtractor(llm_client=llm, prompt_template=prompt_template)
        postprocessor = PostProcessor()
        pipeline = Pipeline(preprocessor, ai_extractor, postprocessor)
    except Exception as e:
        return {"error": f"Failed to initialize pipeline: {str(e)}"}

    try:
        result = pipeline.run(content, is_url, schema)
        # Console trace of each processed result.
        print("-"*80)
        print(f"Processed result: {result}")
        return result
    except Exception as e:
        return {"error": f"Processing error: {str(e)}"}
|
|
|
|
|
# Markdown help text illustrating the three schema formats that
# parse_schema_input accepts. It is passed as the `info` text of the
# "Schema Definition" textbox in the interface defined further down.
example_schemas = """
**Example Schema Formats:**

1. **Simple field definitions:**
```
title: str = Page title
price: float = Product price
description: str = Product description
available: bool = Is available
```

2. **JSON Schema:**
```json
{
    "properties": {
        "title": {"type": "string", "description": "Page title"},
        "price": {"type": "number", "description": "Product price"},
        "description": {"type": "string", "description": "Product description"}
    },
    "required": ["title"]
}
```

3. **Python Class Definition:**
```python
class ProductSchema(BaseModel):
    title: str = Field(description="Product title")
    price: float = Field(description="Product price")
    description: str = Field(description="Product description")
    available: bool = Field(default=False, description="Availability status")
```
"""
|
|
|
|
|
# Gradio UI definition. The three inputs map positionally onto the parameters
# of webpage_to_json_wrapper (content, is_url, schema_input); the returned
# dict is rendered by the JSON output component.
demo = gr.Interface(
    fn=webpage_to_json_wrapper,
    inputs=[
        gr.Textbox(
            label="Content (URL or Raw Text)",
            lines=10,
            placeholder="Enter URL or paste raw HTML/text here."
        ),
        gr.Checkbox(label="Content is URL?", value=False),
        gr.Textbox(
            label="Schema Definition",
            lines=15,
            placeholder="Define your extraction schema (see examples below)",
            info=example_schemas
        )
    ],
    outputs=gr.JSON(label="Output JSON"),
    title="Webpage to JSON Converter",
    description="Convert web pages or raw text into structured JSON using customizable schemas. Define your schema using simple field definitions, JSON schema, or Python class syntax.",
    # Each example row supplies [content, is_url, schema_input].
    examples=[
        # URL input with a simple-field-definition schema.
        [
            "https://example.com",
            True,
            "title: str = Page title\nprice: float = Product price\ndescription: str = Description"
        ],
        # Raw HTML input with a JSON-schema definition (starts with '{', so
        # parse_schema_input routes it to the JSON-schema path).
        [
            "<h1>Sample Product</h1><p>Price: $29.99</p><p>Great quality item</p>",
            False,
            '''{
    "type": "object",
    "properties": {
        "title": {
            "type": "string",
            "description": "Name of the product"
        },
        "price": {
            "type": "number",
            "description": "Price of the product"
        },
        "description": {
            "type": "string",
            "description": "Detailed description of the product"
        },
        "availability": {
            "type": "boolean",
            "description": "Whether the product is in stock (true) or not (false)"
        }
    },
    "required": ["title", "price"]
}'''
        ]
    ]
)
|
|
|
# Launch the interface only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    # mcp_server=True is forwarded to Gradio's launch(); presumably this also
    # exposes the interface as an MCP server — confirm against the installed
    # Gradio version.
    demo.launch(mcp_server=True)