# Maria Tsilimos
# Update app.py
# 94b38b3 unverified
import time
import streamlit as st
import pandas as pd
import io
import plotly.express as px
import zipfile
import os
import re
import numpy as np
import json # Added to handle persistent data
from cryptography.fernet import Fernet
from gliner import GLiNER
from PyPDF2 import PdfReader
import docx
from comet_ml import Experiment
from streamlit_extras.stylable_container import stylable_container
# --- Page setup ---
# Wide layout and the browser-tab title for the whole app.
st.set_page_config(layout="wide", page_title="Named Entity Recognition App")

# --- Configuration ---
# Comet ML settings are read from the environment. Logging to Comet is only
# switched on when all three values are present and non-empty.
COMET_API_KEY = os.environ.get("COMET_API_KEY")
COMET_WORKSPACE = os.environ.get("COMET_WORKSPACE")
COMET_PROJECT_NAME = os.environ.get("COMET_PROJECT_NAME")
comet_initialized = bool(COMET_API_KEY and COMET_WORKSPACE and COMET_PROJECT_NAME)

# --- Persistent Counter and History Configuration ---
COUNTER_FILE = "counter_ner_app.json"  # JSON file storing the results-request counter
HISTORY_FILE = "file_history_ner_app.json"  # JSON file storing the upload history
max_attempts = 300  # upper bound on the number of "Results" requests
# --- Functions to manage persistent data ---
def load_attempts(path=None):
    """
    Loads the attempts count from a persistent JSON file.

    Args:
        path: Optional path of the counter file. When omitted, the
            module-level COUNTER_FILE is used.

    Returns:
        The stored 'file_upload_attempts' value, or 0 if the file does not
        exist, cannot be read, is not valid JSON, does not hold a JSON
        object, or holds a value that is not a non-negative integer.
    """
    counter_path = COUNTER_FILE if path is None else path
    try:
        with open(counter_path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # OSError covers a missing/unreadable file; ValueError covers
        # json.JSONDecodeError and undecodable bytes.
        return 0
    if not isinstance(data, dict):
        # e.g. the file contains a bare list or number: .get() would fail.
        return 0
    attempts = data.get('file_upload_attempts', 0)
    # The counter is later incremented and compared against the limit, so
    # anything that is not a plain non-negative int is treated as "no count".
    if isinstance(attempts, bool) or not isinstance(attempts, int) or attempts < 0:
        return 0
    return attempts
def save_attempts(attempts, path=None):
    """
    Saves the current attempts count to the persistent JSON file.

    The data is written to a temporary sibling file first and then moved
    over the target with os.replace, so a reader never sees a half-written
    counter file.

    Args:
        attempts: The counter value to store under 'file_upload_attempts'.
        path: Optional path of the counter file. When omitted, the
            module-level COUNTER_FILE is used.

    Raises:
        OSError: If the file cannot be written or replaced.
    """
    counter_path = COUNTER_FILE if path is None else path
    tmp_path = f"{counter_path}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump({'file_upload_attempts': attempts}, f)
    os.replace(tmp_path, counter_path)
def load_history(path=None):
    """
    Loads the file upload history from a persistent JSON file.

    Args:
        path: Optional path of the history file. When omitted, the
            module-level HISTORY_FILE is used.

    Returns:
        The list stored under 'uploaded_files', restricted to dict entries
        that carry a 'filename' key. An empty list is returned if the file
        does not exist, cannot be read, is not valid JSON, or does not have
        the expected object/list structure.
    """
    history_path = HISTORY_FILE if path is None else path
    try:
        with open(history_path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # OSError covers a missing/unreadable file; ValueError covers
        # json.JSONDecodeError and undecodable bytes.
        return []
    if not isinstance(data, dict):
        return []
    entries = data.get('uploaded_files', [])
    if not isinstance(entries, list):
        return []
    # Other parts of the app index the last entry by 'filename', so entries
    # without that key (or that are not dicts) are dropped here.
    return [entry for entry in entries if isinstance(entry, dict) and 'filename' in entry]
def save_history(history, path=None):
    """
    Saves the current file upload history to the persistent JSON file.

    The data is written to a temporary sibling file first and then moved
    over the target with os.replace, so a reader never sees a half-written
    history file.

    Args:
        history: The list of upload entries to store under 'uploaded_files'.
        path: Optional path of the history file. When omitted, the
            module-level HISTORY_FILE is used.

    Raises:
        OSError: If the file cannot be written or replaced.
    """
    history_path = HISTORY_FILE if path is None else path
    tmp_path = f"{history_path}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump({'uploaded_files': history}, f)
    os.replace(tmp_path, history_path)
def clear_history_data():
    """
    Clears the file history from session state and deletes the persistent file.

    The history file is removed if it exists, the in-session history list is
    reset to an empty list, and st.rerun() is called.
    """
    # Remove directly instead of checking for existence first: the check and
    # the delete are two separate steps, and the file may vanish in between.
    try:
        os.remove(HISTORY_FILE)
    except FileNotFoundError:
        pass
    st.session_state['uploaded_files_history'] = []
    st.rerun()
# --- Initialize session state with persistent data ---
# Each key is seeded only when it is not yet present in the session state.
if 'file_upload_attempts' not in st.session_state:
    persisted_attempts = load_attempts()
    st.session_state['file_upload_attempts'] = persisted_attempts
    # Write the loaded value straight back to the counter file.
    save_attempts(persisted_attempts)

if 'uploaded_files_history' not in st.session_state:
    persisted_history = load_history()
    st.session_state['uploaded_files_history'] = persisted_history
    # Write the loaded list straight back to the history file.
    save_history(persisted_history)

# Holds the Fernet-encrypted text of the current upload (None until a file is read).
if 'encrypted_extracted_text' not in st.session_state:
    st.session_state['encrypted_extracted_text'] = None
# Entity labels requested from the model, grouped by display category.
# The order of categories and of labels within a category is preserved in the
# flattened list below and drives the order of the result tabs.
GLINER_LABELS_CATEGORIZED = {
    "Personal Identifiers": [
        "Person", "Date of birth", "Blood type", "Digital signature",
        "Social media handle", "Username", "Birth certificate number",
    ],
    "Contact Details": [
        "Address", "Phone number", "Mobile phone number", "Landline phone number",
        "Email", "Fax number", "Postal code",
    ],
    "Financial & Payment": [
        "Credit card number", "Credit card expiration date", "CVV", "CVC",
        "Bank account number", "IBAN", "Transaction number", "Credit card brand",
    ],
    "Government & Official IDs": [
        "Passport number", "Social security number", "CPF", "Driver license number",
        "Tax identification number", "Identity card number", "National ID number",
        "Identity document number", "Visa number", "License plate number", "CNPJ",
        "Registration number", "Student ID number", "Passport expiration date",
    ],
    "Medical & Health": [
        "Medication", "Medical condition", "Health insurance ID number",
        "Health insurance number", "National health insurance number",
    ],
    "Travel & Transport": [
        "Flight number", "Reservation number", "Train ticket number",
        "Vehicle registration number",
    ],
    "General Business & Other": [
        "Organization", "Insurance company", "IP address", "Serial number",
        "Insurance number",
    ],
}

# Single flat list of every label, in category order, used as the model's label input.
GLINER_LABELS_FLAT = [
    entity_label
    for labels_in_category in GLINER_LABELS_CATEGORIZED.values()
    for entity_label in labels_in_category
]

# Reverse lookup: specific label -> name of the category it is listed under.
LABEL_TO_CATEGORY_MAP = {
    entity_label: category_name
    for category_name, labels_in_category in GLINER_LABELS_CATEGORIZED.items()
    for entity_label in labels_in_category
}
@st.cache_resource
def load_ner_model():
    """
    Returns the GLiNER model used for entity extraction.

    The pre-trained checkpoint "urchade/gliner_multi_pii-v1" is loaded and the
    function is wrapped in st.cache_resource. If loading raises, the error is
    shown in the app and st.stop() is called.
    """
    checkpoint_name = "urchade/gliner_multi_pii-v1"
    try:
        ner_model = GLiNER.from_pretrained(checkpoint_name)
    except Exception as e:
        st.error(f"Failed to load NER model. Please check your internet connection or model availability: {e}")
        st.stop()
    else:
        return ner_model
@st.cache_resource
def load_encryption_key():
    """
    Builds the Fernet cipher from the FERNET_KEY environment variable.

    The function is wrapped in st.cache_resource. A missing/empty key (or any
    other ValueError raised while building the cipher) is reported as a
    configuration error; any other exception is reported as unexpected. In
    both cases st.stop() is called after the message is shown.
    """
    try:
        raw_key = os.environ.get("FERNET_KEY")
        if not raw_key:
            raise ValueError("FERNET_KEY environment variable not set. Cannot perform encryption/decryption.")
        # The key is handed to Fernet as UTF-8 encoded bytes.
        return Fernet(raw_key.encode('utf-8'))
    except ValueError as ve:
        st.error(f"Configuration Error: {ve}. Please ensure the 'FERNET_KEY' environment variable is set securely in your deployment environment (e.g., Hugging Face Spaces secrets, Render environment variables) or in a local .env file for development.")
        # Without a usable key nothing can be encrypted, so the run is halted.
        st.stop()
    except Exception as e:
        st.error(f"An unexpected error occurred while loading encryption key: {e}. Please check your key format and environment settings.")
        st.stop()
# Module-level Fernet cipher used by encrypt_text() and decrypt_text().
# load_encryption_key() is wrapped in st.cache_resource.
fernet = load_encryption_key()
def encrypt_text(text_content: str) -> bytes:
    """
    Returns the Fernet ciphertext for the given string.

    The string is encoded to UTF-8 bytes and then encrypted with the
    module-level `fernet` cipher.
    """
    plaintext_bytes = text_content.encode('utf-8')
    return fernet.encrypt(plaintext_bytes)
def decrypt_text(encrypted_bytes: bytes) -> str | None:
    """
    Returns the plaintext string for Fernet-encrypted bytes.

    The bytes are decrypted with the module-level `fernet` cipher and decoded
    as UTF-8. If either step raises, an error is shown in the app and None is
    returned instead.
    """
    try:
        plaintext_bytes = fernet.decrypt(encrypted_bytes)
        return plaintext_bytes.decode('utf-8')
    except Exception as e:
        st.error(f"Decryption failed. This might indicate data tampering or an incorrect encryption key. Error: {e}")
        return None
# --- UI Elements ---
# Page heading, author link, and a collapsible panel with usage notes.
st.subheader("Multilingual PDF & DOCX Entity Finder", divider="orange")
st.link_button("by nlpblogs", "https://nlpblogs.com", type="tertiary")
expander = st.expander("**Important notes on the Multilingual PDF & DOCX Entity Finder**")
# The usage limit is interpolated from max_attempts so the text cannot drift
# away from the limit that is actually enforced.
expander.write(f'''
**Named Entities:** This Multilingual PDF & DOCX Entity Finder predicts a wide range of custom labels, including: "Person", "Organization", "Phone number", "Address", "Passport number", "Email", "Credit card number", "Social security number", "Health insurance ID number", "Date of birth", "Mobile phone number", "Bank account number", "Medication", "CPF", "Driver license number", "Tax identification number", "Medical condition", "Identity card number", "National ID number", "IP address", "IBAN", "Credit card expiration date", "Username", "Health insurance number", "Registration number", "Student ID number", "Insurance number", "Flight number", "Landline phone number", "Blood type", "CVV", "Reservation number", "Digital signature", "Social media handle", "License plate number", "CNPJ", "Postal code", "Serial number", "Vehicle registration number", "Credit card brand", "Fax number", "Visa number", "Insurance company", "Identity document number", "Transaction number", "National health insurance number", "CVC", "Birth certificate number", "Train ticket number", "Passport expiration date"
Results are presented in an easy-to-read table, visualized in an interactive tree map, pie chart, and bar chart, and are available for download along with a Glossary of tags.
**Supported languages:** English, French, German, Spanish, Portuguese, Italian
**How to Use:** Upload your PDF or DOCX file. Then, click the 'Results' button to extract and tag entities in your text data.
**Usage Limits:** You can request results up to {max_attempts} requests within a 30-day period.
**Language settings:** Please check and adjust the language settings in your computer, so the French, German, Spanish, Portuguese and Italian characters are handled properly in your downloaded file.
**Customization:** To change the app's background color to white or black, click the three-dot menu on the right-hand side of your app, go to Settings and then Choose app theme, colors and fonts.
**Technical issues:** If your connection times out, please refresh the page or reopen the app's URL.
For any errors or inquiries, please contact us at info@nlpblogs.com
''')
with st.sidebar:
    # --- Persistent upload history ---
    st.subheader("Your File Upload History", divider="orange")
    upload_history = st.session_state['uploaded_files_history']
    if not upload_history:
        st.info("You have not uploaded any files yet.")
    else:
        # One table row per recorded upload.
        st.dataframe(pd.DataFrame(upload_history), use_container_width=True, hide_index=True)
        # Button that wipes both the in-session and the on-disk history.
        if st.button("Clear File History", help="This will permanently delete the file history from the application."):
            clear_history_data()
    # --- Promotional link ---
    st.subheader("Build your own NER Web App in a minute without writing a single line of code.", divider="orange")
    st.link_button(
        "NER File Builder",
        "https://nlpblogs.com/shop/named-entity-recognition-ner/ner-file-builder/",
        type="primary",
    )
# --- File Upload (PDF/DOCX) ---
# Reads the uploaded file, records it in the upload history, extracts its text
# and stores that text in session state in encrypted form only.
uploaded_file = st.file_uploader("Upload your file. Accepted file formats include: .pdf, .docx", type=['pdf', 'docx'])
current_run_text = None
if uploaded_file is None:
    # No file is selected (never uploaded, or removed by the user): drop any
    # ciphertext kept from an earlier upload so "Results" cannot analyse a
    # file that is no longer shown in the uploader.
    st.session_state['encrypted_extracted_text'] = None
else:
    file_extension = uploaded_file.name.split('.')[-1].lower()
    # The script reruns on every interaction; only add a history entry when
    # the file name differs from the most recent entry, so the same upload is
    # not appended again on each rerun.
    upload_history = st.session_state['uploaded_files_history']
    already_recorded = bool(upload_history) and uploaded_file.name == upload_history[-1]['filename']
    if not already_recorded:
        # --- ADDING TO UPLOAD HISTORY ---
        new_upload_entry = {
            "filename": uploaded_file.name,
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
        }
        upload_history.append(new_upload_entry)
        save_history(upload_history)
    if file_extension == 'pdf':
        try:
            pdf_reader = PdfReader(uploaded_file)
            # Pages are joined with a newline so the last word of one page is
            # not fused with the first word of the next; a falsy result from
            # extract_text() is treated as an empty page.
            current_run_text = "\n".join(page.extract_text() or "" for page in pdf_reader.pages)
            st.success("PDF file uploaded successfully. File content encrypted and secured. Due to security protocols, the file content is hidden.")
        except Exception as e:
            st.error(f"An error occurred while reading PDF: {e}")
            current_run_text = None
    elif file_extension == 'docx':
        try:
            doc = docx.Document(uploaded_file)
            current_run_text = "\n".join(para.text for para in doc.paragraphs)
            st.success("DOCX file uploaded successfully. File content encrypted and secured. Due to security protocols, the file content is hidden.")
        except Exception as e:
            st.error(f"An error occurred while reading DOCX: {e}")
            current_run_text = None
    else:
        st.warning("Unsupported file type. Please upload a .pdf or .docx file.")
        current_run_text = None
    if current_run_text and current_run_text.strip():
        # --- ENCRYPT THE EXTRACTED TEXT BEFORE STORING IN SESSION STATE ---
        st.session_state['encrypted_extracted_text'] = encrypt_text(current_run_text)
        st.divider()
    else:
        # Nothing usable was extracted: make sure no stale ciphertext remains.
        st.session_state['encrypted_extracted_text'] = None
        st.error("Could not extract meaningful text from the uploaded file.")
# --- Results Button and Processing Logic ---
# On click: enforce the request limit, decrypt the stored text, run the NER
# model, then render tables, charts and a downloadable zip of the results.
if st.button("Results"):
    start_time_overall = time.time()  # Start time for overall processing
    if not comet_initialized:
        st.warning("Comet ML not initialized. Check environment variables if you wish to log data.")
    # Refuse further work once the persistent request counter hits the limit.
    if st.session_state['file_upload_attempts'] >= max_attempts:
        st.error(f"You have requested results {max_attempts} times. You have reached your daily request limit.")
        st.stop()
    # --- DECRYPT THE TEXT BEFORE PASSING TO NER MODEL ---
    text_for_ner = None
    if st.session_state['encrypted_extracted_text'] is not None:
        text_for_ner = decrypt_text(st.session_state['encrypted_extracted_text'])
    if text_for_ner is None or not text_for_ner.strip():
        st.warning("No extractable text content available for analysis. Please upload a valid PDF or DOCX file.")
        st.stop()
    # Increment and save the attempts counter (only reached when there is text to analyse).
    st.session_state['file_upload_attempts'] += 1
    save_attempts(st.session_state['file_upload_attempts'])
    with st.spinner("Analyzing text...", show_time=True):
        model = load_ner_model()
        # Measure NER model processing time
        start_time_ner = time.time()
        # NOTE(review): the whole document is passed in a single call; if the
        # model limits its input length, long files may need chunking — confirm.
        text_entities = model.predict_entities(text_for_ner, GLINER_LABELS_FLAT)
        ner_processing_time = time.time() - start_time_ner
        # An empty prediction list would produce a DataFrame without any
        # columns, which the structure check below would misreport as an
        # unexpected output format. Report it as "no entities" instead.
        if not text_entities:
            st.warning("No entities were extracted from the uploaded text.")
            st.stop()
        df = pd.DataFrame(text_entities)
        # Rename 'label' to 'entity_group' and 'text' to 'word' for consistency
        if 'label' in df.columns:
            df.rename(columns={'label': 'entity_group', 'text': 'word'}, inplace=True)
        else:
            st.error("Unexpected GLiNER output structure. Please check the model's output format.")
            st.stop()
        # Replace empty strings with 'Unknown' and drop rows with NaN after cleaning
        df = df.replace('', 'Unknown').dropna()
        if df.empty:
            st.warning("No entities were extracted from the uploaded text.")
            st.stop()
        # --- Add 'category' column to the DataFrame based on the grouped labels ---
        df['category'] = df['entity_group'].map(LABEL_TO_CATEGORY_MAP)
        # Labels missing from the map (not expected) are shown as 'Uncategorized'.
        df['category'] = df['category'].fillna('Uncategorized')
        if comet_initialized:
            experiment = Experiment(
                api_key=COMET_API_KEY,
                workspace=COMET_WORKSPACE,
                project_name=COMET_PROJECT_NAME,
            )
            experiment.log_parameter("input_text_length", len(text_for_ner))
            experiment.log_table("predicted_entities", df)
            experiment.log_metric("ner_processing_time_seconds", ner_processing_time)
        # --- Display Results ---
        st.subheader("Extracted Entities", divider="rainbow")
        properties = {"border": "2px solid gray", "color": "blue", "font-size": "16px"}
        df_styled = df.style.set_properties(**properties)
        st.dataframe(df_styled, use_container_width=True)
        with st.expander("See Glossary of tags"):
            st.write("""
'**word**': ['entity extracted from your text data']
'**score**': ['accuracy score; how accurately a tag has been assigned to
a given entity']
'**entity_group**': ['label (tag) assigned to a given extracted entity']
'**start**': ['index of the start of the corresponding entity']
'**end**': ['index of the end of the corresponding entity']
'**category**': ['the broader category this entity belongs to']
""")
        st.subheader("Grouped Entities by Category", divider = "orange")
        # One tab per category, in the order the categories are declared.
        category_names = list(GLINER_LABELS_CATEGORIZED.keys())
        category_tabs = st.tabs(category_names)
        for category_tab, category_name in zip(category_tabs, category_names):
            with category_tab:
                # Filter the main DataFrame for the current category
                df_category_filtered = df[df['category'] == category_name]
                if not df_category_filtered.empty:
                    # Walk the labels in their declared order so each entity type gets its own table.
                    for entity_type in GLINER_LABELS_CATEGORIZED[category_name]:
                        df_entity_type_filtered = df_category_filtered[df_category_filtered['entity_group'] == entity_type]
                        if not df_entity_type_filtered.empty:
                            st.markdown(f"***{entity_type}***")
                            st.dataframe(df_entity_type_filtered.drop(columns=['category']), use_container_width=True)
                        else:
                            st.info(f"No '{entity_type}' entities found for this category.")
                else:
                    st.info(f"No entities found for the '{category_name}' category.")
        st.divider()
        # --- Visualizations ---
        st.subheader("Tree map", divider="orange")
        # Treemap hierarchy: all -> category -> entity type -> extracted word, sized by score.
        fig_treemap = px.treemap(df, path=[px.Constant("all"), 'category', 'entity_group', 'word'],
                                 values='score', color='category')
        fig_treemap.update_layout(margin=dict(t=50, l=25, r=25, b=25))
        st.plotly_chart(fig_treemap)
        if comet_initialized:
            experiment.log_figure(figure=fig_treemap, figure_name="entity_treemap")
        # rename_axis + reset_index(name=...) pins the column names to
        # 'entity_group' / 'count' regardless of how the installed pandas
        # version names the value_counts() result.
        final_df_counts = df['entity_group'].value_counts().rename_axis('entity_group').reset_index(name='count')
        col1, col2 = st.columns(2)
        with col1:
            st.subheader("Pie Chart (by Entity Type)", divider="orange")
            fig_pie = px.pie(final_df_counts, values='count', names='entity_group',
                             hover_data=['count'], labels={'count': 'count'}, title='Percentage of Predicted Labels (Entity Types)')
            fig_pie.update_traces(textposition='inside', textinfo='percent+label')
            st.plotly_chart(fig_pie)
            if comet_initialized:
                experiment.log_figure(figure=fig_pie, figure_name="label_pie_chart")
        with col2:
            st.subheader("Bar Chart (by Entity Type)", divider="orange")
            fig_bar = px.bar(final_df_counts, x="count", y="entity_group", color="entity_group", text_auto=True,
                             title='Occurrences of Predicted Labels (Entity Types)', orientation='h')
            fig_bar.update_layout(yaxis={'categoryorder':'total ascending'})  # Order bars
            st.plotly_chart(fig_bar)
            if comet_initialized:
                experiment.log_figure(figure=fig_bar, figure_name="label_bar_chart")
        # Add a chart for categories
        st.subheader("Entity Counts by Category", divider="orange")
        category_counts = df['category'].value_counts().rename_axis('category').reset_index(name='count')
        fig_cat_bar = px.bar(category_counts, x="count", y="category", color="category", text_auto=True,
                             title='Occurrences of Entities by Category', orientation='h')
        fig_cat_bar.update_layout(yaxis={'categoryorder':'total ascending'})
        st.plotly_chart(fig_cat_bar)
        # --- Downloadable Content ---
        # Glossary table shipped next to the results inside the zip.
        dfa = pd.DataFrame(
            data={
                'Column Name': ['word', 'entity_group', 'score', 'start', 'end', 'category'],
                'Description': [
                    'entity extracted from your text data',
                    'label (tag) assigned to a given extracted entity',
                    'accuracy score; how accurately a tag has been assigned to a given entity',
                    'index of the start of the corresponding entity',
                    'index of the end of the corresponding entity',
                    'the broader category this entity belongs to',
                ]
            }
        )
        buf = io.BytesIO()
        with zipfile.ZipFile(buf, "w") as myzip:
            myzip.writestr("Summary of the results.csv", df.to_csv(index=False))
            myzip.writestr("Glossary of tags.csv", dfa.to_csv(index=False))
        zip_bytes = buf.getvalue()
        with stylable_container(
            key="download_button",
            css_styles="""button { background-color: yellow; border: 1px solid black; padding: 5px; color: black; }""",
        ):
            st.download_button(
                label="Download zip file",
                data=zip_bytes,
                file_name="nlpblogs_ner_results.zip",
                mime="application/zip",
            )
        if comet_initialized:
            # Hand Comet a file-like object rather than raw bytes.
            experiment.log_asset(io.BytesIO(zip_bytes), file_name="downloadable_results.zip")
        st.divider()
        if comet_initialized:
            experiment.end()
    end_time_overall = time.time()  # End time for overall processing
    elapsed_time_overall = end_time_overall - start_time_overall
    st.info(f"Results processed in **{elapsed_time_overall:.2f} seconds**.")
st.write(f"Number of times you requested results: **{st.session_state['file_upload_attempts']}/{max_attempts}**")