File size: 10,663 Bytes
f2a2588
 
 
2856ca3
f2a2588
0a66039
f2a2588
 
2856ca3
f2a2588
 
 
 
 
2856ca3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f2a2588
2856ca3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f2a2588
2856ca3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f2a2588
2856ca3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f2a2588
2856ca3
 
 
 
 
 
 
 
 
 
 
 
f2a2588
2856ca3
 
 
 
 
 
 
 
 
 
 
f2a2588
2856ca3
0e70948
 
 
 
 
 
 
 
 
 
 
 
 
 
44fb3b3
2856ca3
44fb3b3
 
2856ca3
44fb3b3
 
2856ca3
44fb3b3
 
 
 
 
 
2856ca3
f2a2588
716c07f
9c9669b
f2a2588
 
 
 
2856ca3
9c9669b
 
f2a2588
 
2856ca3
f2a2588
 
df8c52d
 
f2a2588
 
 
 
2856ca3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f2a2588
 
2856ca3
f2a2588
2856ca3
 
 
 
 
f2a2588
2856ca3
 
 
 
 
 
f2a2588
 
 
2856ca3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f2a2588
 
 
2856ca3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
import json
import pandas as pd
import gradio as gr
from typing import Dict, Any, Type
from web2json.preprocessor import BasicPreprocessor
from web2json.ai_extractor import AIExtractor, RAGExtractor, GeminiLLMClient
from web2json.postprocessor import PostProcessor
from web2json.pipeline import Pipeline
from pydantic import BaseModel, Field, create_model
import os
import dotenv

dotenv.load_dotenv()

def parse_schema_input(schema_input: str) -> Type[BaseModel]:
    """
    Convert user schema input to a Pydantic BaseModel.
    Supports multiple input formats:
    1. JSON schema format
    2. Python class definition
    3. Simple field definitions
    """
    schema_input = schema_input.strip()
    
    if not schema_input:
        # Default schema if none provided
        return create_model('DefaultSchema', 
                          title=(str, Field(description="Title of the content")),
                          content=(str, Field(description="Main content")))
    
    try:
        # Try parsing as JSON schema
        if schema_input.startswith('{'):
            schema_dict = json.loads(schema_input)
            return json_schema_to_basemodel(schema_dict)
        
        # Try parsing as Python class definition
        elif 'class ' in schema_input and 'BaseModel' in schema_input:
            return python_class_to_basemodel(schema_input)
        
        # Try parsing as simple field definitions
        else:
            return simple_fields_to_basemodel(schema_input)
            
    except Exception as e:
        raise ValueError(f"Could not parse schema: {str(e)}. Please check your schema format.")

def json_schema_to_basemodel(schema_dict: Dict) -> Type[BaseModel]:
    """Convert JSON schema to BaseModel"""
    fields = {}
    properties = schema_dict.get('properties', {})
    required = schema_dict.get('required', [])
    
    for field_name, field_info in properties.items():
        field_type = get_python_type(field_info.get('type', 'string'))
        field_description = field_info.get('description', '')
        
        if field_name in required:
            fields[field_name] = (field_type, Field(description=field_description))
        else:
            fields[field_name] = (field_type, Field(default=None, description=field_description))
    
    return create_model('DynamicSchema', **fields)

def python_class_to_basemodel(class_definition: str) -> Type[BaseModel]:
    """Convert Python class definition to BaseModel"""
    try:
        # Execute the class definition in a safe namespace
        namespace = {'BaseModel': BaseModel, 'Field': Field, 'str': str, 'int': int, 
                    'float': float, 'bool': bool, 'list': list, 'dict': dict}
        exec(class_definition, namespace)
        
        # Find the class that inherits from BaseModel
        for name, obj in namespace.items():
            if (isinstance(obj, type) and 
                issubclass(obj, BaseModel) and 
                obj != BaseModel):
                return obj
        
        raise ValueError("No BaseModel class found in definition")
    except Exception as e:
        raise ValueError(f"Invalid Python class definition: {str(e)}")

def simple_fields_to_basemodel(fields_text: str) -> Type[BaseModel]:
    """Convert simple field definitions to BaseModel"""
    fields = {}
    
    for line in fields_text.strip().split('\n'):
        line = line.strip()
        if not line or line.startswith('#'):
            continue
            
        # Parse field definition (e.g., "name: str = description")
        if ':' in line:
            parts = line.split(':', 1)
            field_name = parts[0].strip()
            
            type_and_desc = parts[1].strip()
            if '=' in type_and_desc:
                type_part, desc_part = type_and_desc.split('=', 1)
                field_type = get_python_type(type_part.strip())
                description = desc_part.strip().strip('"\'')
            else:
                field_type = get_python_type(type_and_desc.strip())
                description = ""
            
            fields[field_name] = (field_type, Field(description=description))
        else:
            # Simple field name only
            field_name = line.strip()
            fields[field_name] = (str, Field(description=""))
    
    if not fields:
        raise ValueError("No valid fields found in schema definition")
    
    return create_model('DynamicSchema', **fields)

def get_python_type(type_str: str):
    """Convert type string to Python type"""
    type_str = type_str.lower().strip()
    type_mapping = {
        'string': str, 'str': str,
        'integer': int, 'int': int,
        'number': float, 'float': float,
        'boolean': bool, 'bool': bool,
        'array': list, 'list': list,
        'object': dict, 'dict': dict
    }
    return type_mapping.get(type_str, str)

def webpage_to_json_wrapper(content: str, is_url: bool, schema_input: str) -> Dict[str, Any]:
    """Wrapper function that converts schema input to BaseModel"""
    try:
        # Parse the schema input into a BaseModel
        schema_model = parse_schema_input(schema_input)
        
        # Call the original function
        return webpage_to_json(content, is_url, schema_model)
        
    except Exception as e:
        return {"error": f"Schema parsing error: {str(e)}"}

def webpage_to_json(content: str, is_url: bool, schema: BaseModel) -> Dict[str, Any]:
    """
    Extracts structured JSON information from a given content based on a specified schema.
    This function sets up a processing pipeline that includes:
    - Preprocessing the input content.
    - Utilizing an AI language model to extract information according to the provided schema.
    - Postprocessing the extracted output to match the exact schema requirements.
    Parameters:
        content (str): The input content to be analyzed. This can be direct text or a URL content.
        is_url (bool): A flag indicating whether the provided content is a URL (True) or raw text (False).
        schema (BaseModel): A Pydantic BaseModel defining the expected structure and data types for the output.
    Returns:
        Dict[str, Any]: A dictionary containing the extracted data matching the schema. In case of errors during initialization
                        or processing, the dictionary will include an "error" key with a descriptive message.
    """
    prompt_template = """Extract the following information from the provided content according to the specified schema.
    
    Content to analyze:
    {content}
    
    Schema requirements:
    {schema}
    
    Instructions:
    - Extract only information that is explicitly present in the content
    - Follow the exact structure and data types specified in the schema
    - If a required field cannot be found, indicate this clearly
    - Preserve the original formatting and context where relevant
    - Return the extracted data in the format specified by the schema"""
    
    # Initialize pipeline components
    # TODO: improve the RAG system and optimize (don't instantiate every time)
    preprocessor = BasicPreprocessor(config={'keep_tags': False}) 
    try:
        llm = GeminiLLMClient(config={'api_key': os.getenv('GEMINI_API_KEY')})
    except Exception as e:
        return {"error": f"Failed to initialize LLM client: {str(e)}"}
    
    # ai_extractor = RAGExtractor(llm_client=llm, prompt_template=prompt_template)
    ai_extractor = AIExtractor(llm_client=llm, prompt_template=prompt_template)
    postprocessor = PostProcessor()
    pipeline = Pipeline(preprocessor, ai_extractor, postprocessor)
    
    try:
        result = pipeline.run(content, is_url, schema)
        print("-"*80)
        print(f"Processed result: {result}")
        return result
    except Exception as e:
        return {"error": f"Processing error: {str(e)}"}

# Example schemas for the user
example_schemas = """
**Example Schema Formats:**

1. **Simple field definitions:**
```
title: str = Page title
price: float = Product price
description: str = Product description
available: bool = Is available
```

2. **JSON Schema:**
```json
{
  "properties": {
    "title": {"type": "string", "description": "Page title"},
    "price": {"type": "number", "description": "Product price"},
    "description": {"type": "string", "description": "Product description"}
  },
  "required": ["title"]
}
```

3. **Python Class Definition:**
```python
class ProductSchema(BaseModel):
    title: str = Field(description="Product title")
    price: float = Field(description="Product price")
    description: str = Field(description="Product description")
    available: bool = Field(default=False, description="Availability status")
```
"""

# Build Gradio Interface
demo = gr.Interface(
    fn=webpage_to_json_wrapper,
    inputs=[
        gr.Textbox(
            label="Content (URL or Raw Text)", 
            lines=10,
            placeholder="Enter URL or paste raw HTML/text here."
        ),
        gr.Checkbox(label="Content is URL?", value=False),
        gr.Textbox(
            label="Schema Definition",
            lines=15,
            placeholder="Define your extraction schema (see examples below)",
            info=example_schemas
        )
    ],
    outputs=gr.JSON(label="Output JSON"),
    title="Webpage to JSON Converter",
    description="Convert web pages or raw text into structured JSON using customizable schemas. Define your schema using simple field definitions, JSON schema, or Python class syntax.",
    examples=[
        [
            "https://example.com",
            True,
            "title: str = Page title\nprice: float = Product price\ndescription: str = Description"
        ],
        [
            "<h1>Sample Product</h1><p>Price: $29.99</p><p>Great quality item</p>",
            False,
            '''{
            "type": "object",
            "properties": {
                "title": {
                "type": "string",
                "description": "Name of the product"
                },
                "price": {
                "type": "number",
                "description": "Price of the product"
                },
                "description": {
                "type": "string",
                "description": "Detailed description of the product"
                },
                "availability": {
                "type": "boolean",
                "description": "Whether the product is in stock (true) or not (false)"
                }
            },
            "required": ["title", "price"]
            }'''
        ]
    ]
)

if __name__ == "__main__":
    demo.launch(mcp_server=True)