|
import time |
|
import streamlit as st |
|
import pandas as pd |
|
import io |
|
import plotly.express as px |
|
import zipfile |
|
import os |
|
import re |
|
import numpy as np |
|
import json |
|
|
|
from cryptography.fernet import Fernet |
|
from gliner import GLiNER |
|
from PyPDF2 import PdfReader |
|
import docx |
|
from comet_ml import Experiment |
|
from streamlit_extras.stylable_container import stylable_container |
|
|
|
# Use the wide page layout and set the browser tab title.
st.set_page_config(
    page_title="Named Entity Recognition App",
    layout="wide",
)
|
|
|
|
|
# Comet ML settings are read from the environment; each is None when unset.
COMET_API_KEY = os.environ.get("COMET_API_KEY")
COMET_WORKSPACE = os.environ.get("COMET_WORKSPACE")
COMET_PROJECT_NAME = os.environ.get("COMET_PROJECT_NAME")

# True only when all three Comet ML settings are present and non-empty.
comet_initialized = all((COMET_API_KEY, COMET_WORKSPACE, COMET_PROJECT_NAME))

# JSON files that persist the request counter and the upload history.
COUNTER_FILE = "counter_ner_app.json"
HISTORY_FILE = "file_history_ner_app.json"

# Maximum number of "Results" requests allowed.
max_attempts = 300
|
|
|
|
|
def load_attempts(): |
|
""" |
|
Loads the attempts count from a persistent JSON file. |
|
Returns 0 if the file doesn't exist or is invalid. |
|
""" |
|
if os.path.exists(COUNTER_FILE): |
|
try: |
|
with open(COUNTER_FILE, "r") as f: |
|
data = json.load(f) |
|
return data.get('file_upload_attempts', 0) |
|
except (json.JSONDecodeError, KeyError): |
|
return 0 |
|
return 0 |
|
|
|
def save_attempts(attempts):
    """
    Writes the attempts count to COUNTER_FILE as a one-key JSON object.
    """
    payload = {'file_upload_attempts': attempts}
    with open(COUNTER_FILE, mode="w") as counter_file:
        json.dump(payload, counter_file)
|
|
|
def load_history(): |
|
""" |
|
Loads the file upload history from a persistent JSON file. |
|
Returns an empty list if the file doesn't exist or is invalid. |
|
""" |
|
if os.path.exists(HISTORY_FILE): |
|
try: |
|
with open(HISTORY_FILE, "r") as f: |
|
data = json.load(f) |
|
return data.get('uploaded_files', []) |
|
except (json.JSONDecodeError, KeyError): |
|
return [] |
|
return [] |
|
|
|
def save_history(history):
    """
    Writes the upload history to HISTORY_FILE as a one-key JSON object.
    """
    payload = {'uploaded_files': history}
    with open(HISTORY_FILE, mode="w") as history_file:
        json.dump(payload, history_file)
|
|
|
def clear_history_data():
    """Clears the file history from session state and deletes the persistent file."""
    # Remove-and-ignore instead of exists()-then-remove(): the file can vanish
    # between the two calls, which would raise FileNotFoundError.
    try:
        os.remove(HISTORY_FILE)
    except FileNotFoundError:
        pass
    st.session_state['uploaded_files_history'] = []
    st.rerun()
|
|
|
|
|
# Seed session state the first time each key is needed. The counter and the
# history come from their JSON files and are written straight back.
if 'file_upload_attempts' not in st.session_state:
    st.session_state.file_upload_attempts = load_attempts()
    save_attempts(st.session_state.file_upload_attempts)

if 'uploaded_files_history' not in st.session_state:
    st.session_state.uploaded_files_history = load_history()
    save_history(st.session_state.uploaded_files_history)

# Encrypted text of the most recent upload; None until a file yields text.
if 'encrypted_extracted_text' not in st.session_state:
    st.session_state.encrypted_extracted_text = None
|
|
|
# Entity labels passed to the model, grouped by the category used for display.
GLINER_LABELS_CATEGORIZED = {
    "Personal Identifiers": [
        "Person", "Date of birth", "Blood type", "Digital signature",
        "Social media handle", "Username", "Birth certificate number",
    ],
    "Contact Details": [
        "Address", "Phone number", "Mobile phone number",
        "Landline phone number", "Email", "Fax number", "Postal code",
    ],
    "Financial & Payment": [
        "Credit card number", "Credit card expiration date", "CVV", "CVC",
        "Bank account number", "IBAN", "Transaction number",
        "Credit card brand",
    ],
    "Government & Official IDs": [
        "Passport number", "Social security number", "CPF",
        "Driver license number", "Tax identification number",
        "Identity card number", "National ID number",
        "Identity document number", "Visa number", "License plate number",
        "CNPJ", "Registration number", "Student ID number",
        "Passport expiration date",
    ],
    "Medical & Health": [
        "Medication", "Medical condition", "Health insurance ID number",
        "Health insurance number", "National health insurance number",
    ],
    "Travel & Transport": [
        "Flight number", "Reservation number", "Train ticket number",
        "Vehicle registration number",
    ],
    "General Business & Other": [
        "Organization", "Insurance company", "IP address", "Serial number",
        "Insurance number",
    ],
}


def _index_labels(categorized):
    """Return (flat label list, label -> category dict) in declaration order."""
    flat_labels = []
    category_by_label = {}
    for category, labels in categorized.items():
        for label in labels:
            flat_labels.append(label)
            category_by_label[label] = category
    return flat_labels, category_by_label


# Flat list of every label, and the reverse lookup from label to category.
GLINER_LABELS_FLAT, LABEL_TO_CATEGORY_MAP = _index_labels(GLINER_LABELS_CATEGORIZED)
|
|
|
@st.cache_resource
def load_ner_model():
    """
    Loads the pre-trained GLiNER model "urchade/gliner_multi_pii-v1".

    The result is cached through st.cache_resource. If loading raises, the
    error is shown in the app and the script run is stopped.
    """
    model_name = "urchade/gliner_multi_pii-v1"
    try:
        ner_model = GLiNER.from_pretrained(model_name)
    except Exception as e:
        st.error(f"Failed to load NER model. Please check your internet connection or model availability: {e}")
        st.stop()
    else:
        return ner_model
|
|
|
@st.cache_resource
def load_encryption_key():
    """
    Builds the Fernet cipher from the FERNET_KEY environment variable.

    The result is cached through st.cache_resource. When the variable is
    unset or empty, or building the cipher raises, an error is shown in the
    app and the script run is stopped.
    """
    try:
        raw_key = os.environ.get("FERNET_KEY")
        if not raw_key:
            raise ValueError("FERNET_KEY environment variable not set. Cannot perform encryption/decryption.")
        cipher = Fernet(raw_key.encode('utf-8'))
    except ValueError as ve:
        st.error(f"Configuration Error: {ve}. Please ensure the 'FERNET_KEY' environment variable is set securely in your deployment environment (e.g., Hugging Face Spaces secrets, Render environment variables) or in a local .env file for development.")
        st.stop()
    except Exception as e:
        st.error(f"An unexpected error occurred while loading encryption key: {e}. Please check your key format and environment settings.")
        st.stop()
    else:
        return cipher


# Module-wide cipher used by encrypt_text() and decrypt_text().
fernet = load_encryption_key()
|
|
|
def encrypt_text(text_content: str) -> bytes:
    """
    Encodes the string as UTF-8 and encrypts it with the module-level
    Fernet cipher, returning the encrypted bytes.
    """
    plaintext_bytes = text_content.encode('utf-8')
    return fernet.encrypt(plaintext_bytes)
|
|
|
def decrypt_text(encrypted_bytes: bytes) -> str | None:
    """
    Decrypts bytes with the module-level Fernet cipher and decodes them as
    UTF-8.

    Returns the decrypted string. If decryption or decoding raises, the
    error is shown in the app and None is returned.
    """
    try:
        plaintext_bytes = fernet.decrypt(encrypted_bytes)
        decoded_text = plaintext_bytes.decode('utf-8')
    except Exception as e:
        st.error(f"Decryption failed. This might indicate data tampering or an incorrect encryption key. Error: {e}")
        return None
    return decoded_text
|
|
|
|
|
# Page title, author link, and the collapsible usage notes.
st.subheader("Multilingual PDF & DOCX Entity Finder", divider="orange")
st.link_button("by nlpblogs", "https://nlpblogs.com", type="tertiary")

expander = st.expander("**Important notes on the Multilingual PDF & DOCX Entity Finder**")
# The usage limit is interpolated from max_attempts so this notice cannot
# drift from the limit that the "Results" handler enforces.
expander.write(f'''
**Named Entities:** This Multilingual PDF & DOCX Entity Finder predicts a wide range of custom labels, including: "Person", "Organization", "Phone number", "Address", "Passport number", "Email", "Credit card number", "Social security number", "Health insurance ID number", "Date of birth", "Mobile phone number", "Bank account number", "Medication", "CPF", "Driver license number", "Tax identification number", "Medical condition", "Identity card number", "National ID number", "IP address", "IBAN", "Credit card expiration date", "Username", "Health insurance number", "Registration number", "Student ID number", "Insurance number", "Flight number", "Landline phone number", "Blood type", "CVV", "Reservation number", "Digital signature", "Social media handle", "License plate number", "CNPJ", "Postal code", "Serial number", "Vehicle registration number", "Credit card brand", "Fax number", "Visa number", "Insurance company", "Identity document number", "Transaction number", "National health insurance number", "CVC", "Birth certificate number", "Train ticket number", "Passport expiration date"

Results are presented in an easy-to-read table, visualized in an interactive tree map, pie chart, and bar chart, and are available for download along with a Glossary of tags.

**Supported languages:** English, French, German, Spanish, Portuguese, Italian

**How to Use:** Upload your PDF or DOCX file. Then, click the 'Results' button to extract and tag entities in your text data.

**Usage Limits:** You can request results up to {max_attempts} requests within a 30-day period.

**Language settings:** Please check and adjust the language settings in your computer, so the French, German, Spanish, Portuguese and Italian characters are handled properly in your downloaded file.

**Customization:** To change the app's background color to white or black, click the three-dot menu on the right-hand side of your app, go to Settings and then Choose app theme, colors and fonts.

**Technical issues:** If your connection times out, please refresh the page or reopen the app's URL.

For any errors or inquiries, please contact us at info@nlpblogs.com
''')
|
|
|
# Sidebar: upload history (with a clear button) and a promotional link.
with st.sidebar:
    st.subheader("Your File Upload History", divider="orange")

    if not st.session_state['uploaded_files_history']:
        st.info("You have not uploaded any files yet.")
    else:
        history_df = pd.DataFrame(st.session_state['uploaded_files_history'])
        st.dataframe(history_df, use_container_width=True, hide_index=True)

        clear_requested = st.button(
            "Clear File History",
            help="This will permanently delete the file history from the application.",
        )
        if clear_requested:
            clear_history_data()

    st.subheader("Build your own NER Web App in a minute without writing a single line of code.", divider="orange")
    st.link_button(
        "NER File Builder",
        "https://nlpblogs.com/shop/named-entity-recognition-ner/ner-file-builder/",
        type="primary",
    )
|
|
|
|
|
def _extract_pdf_text(file_obj) -> str:
    """Return the text of every PDF page, joined with newlines."""
    reader = PdfReader(file_obj)
    # `or ""` guards against a page that yields no text; joining with a
    # newline keeps the last word of one page from fusing with the first word
    # of the next, and avoids repeated string concatenation.
    return "\n".join(page.extract_text() or "" for page in reader.pages)


def _extract_docx_text(file_obj) -> str:
    """Return the text of every DOCX paragraph, joined with newlines."""
    document = docx.Document(file_obj)
    return "\n".join(para.text for para in document.paragraphs)


# Lower-cased file extension -> (name used in messages, extractor function).
_TEXT_EXTRACTORS = {
    'pdf': ("PDF", _extract_pdf_text),
    'docx': ("DOCX", _extract_docx_text),
}


uploaded_file = st.file_uploader("Upload your file. Accepted file formats include: .pdf, .docx", type=['pdf', 'docx'])

# Text extracted from the upload during this script run, if any.
current_run_text = None

if uploaded_file is None:
    # No file is selected: drop text kept from an earlier upload so that
    # "Results" cannot analyse a file the user has since removed.
    st.session_state['encrypted_extracted_text'] = None
else:
    file_extension = uploaded_file.name.split('.')[-1].lower()

    # Record the upload unless it repeats the most recent history entry
    # (the script re-runs on every interaction while the same file is selected).
    upload_history = st.session_state['uploaded_files_history']
    if not upload_history or uploaded_file.name != upload_history[-1]['filename']:
        new_upload_entry = {
            "filename": uploaded_file.name,
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
        }
        upload_history.append(new_upload_entry)
        save_history(upload_history)

    file_kind = None
    extractor_entry = _TEXT_EXTRACTORS.get(file_extension)
    if extractor_entry is None:
        st.warning("Unsupported file type. Please upload a .pdf or .docx file.")
    else:
        file_kind, extract_text_from = extractor_entry
        try:
            current_run_text = extract_text_from(uploaded_file)
        except Exception as e:
            st.error(f"An error occurred while reading {file_kind}: {e}")
            current_run_text = None

    if current_run_text and current_run_text.strip():
        encrypted_text_bytes = encrypt_text(current_run_text)
        st.session_state['encrypted_extracted_text'] = encrypted_text_bytes
        # Report success only after the text really has been encrypted and
        # stored; previously this appeared even when no text was extracted.
        st.success(f"{file_kind} file uploaded successfully. File content encrypted and secured. Due to security protocols, the file content is hidden.")
        st.divider()
    else:
        st.session_state['encrypted_extracted_text'] = None
        st.error("Could not extract meaningful text from the uploaded file.")
|
|
|
|
|
# Markdown shown in the "See Glossary of tags" expander.
_GLOSSARY_MARKDOWN = """
'**word**': ['entity extracted from your text data']

'**score**': ['accuracy score; how accurately a tag has been assigned to
a given entity']

'**entity_group**': ['label (tag) assigned to a given extracted entity']

'**start**': ['index of the start of the corresponding entity']

'**end**': ['index of the end of the corresponding entity']

'**category**': ['the broader category this entity belongs to']
"""


def _count_values(frame, column):
    """Return a frame with columns (``column``, 'count'), most frequent first.

    The index and the counts are named explicitly so the resulting column
    names do not depend on how the installed pandas labels value_counts().
    """
    return frame[column].value_counts().rename_axis(column).reset_index(name='count')


def _render_category_tabs(df):
    """Show one tab per category, with one table per entity type found in it."""
    category_names = list(GLINER_LABELS_CATEGORIZED)
    for tab, category_name in zip(st.tabs(category_names), category_names):
        with tab:
            df_category_filtered = df[df['category'] == category_name]
            if df_category_filtered.empty:
                st.info(f"No entities found for the '{category_name}' category.")
                continue
            for entity_type in GLINER_LABELS_CATEGORIZED[category_name]:
                df_entity_type_filtered = df_category_filtered[df_category_filtered['entity_group'] == entity_type]
                if df_entity_type_filtered.empty:
                    st.info(f"No '{entity_type}' entities found for this category.")
                else:
                    st.markdown(f"***{entity_type}***")
                    st.dataframe(df_entity_type_filtered.drop(columns=['category']), use_container_width=True)


def _render_charts(df, experiment):
    """Draw the tree map and the count charts; log figures when ``experiment`` is set."""
    st.subheader("Tree map", divider="orange")
    fig_treemap = px.treemap(df, path=[px.Constant("all"), 'category', 'entity_group', 'word'],
                             values='score', color='category')
    fig_treemap.update_layout(margin=dict(t=50, l=25, r=25, b=25))
    st.plotly_chart(fig_treemap)
    if experiment is not None:
        experiment.log_figure(figure=fig_treemap, figure_name="entity_treemap")

    final_df_counts = _count_values(df, 'entity_group')

    col1, col2 = st.columns(2)
    with col1:
        st.subheader("Pie Chart (by Entity Type)", divider="orange")
        fig_pie = px.pie(final_df_counts, values='count', names='entity_group',
                         hover_data=['count'], labels={'count': 'count'},
                         title='Percentage of Predicted Labels (Entity Types)')
        fig_pie.update_traces(textposition='inside', textinfo='percent+label')
        st.plotly_chart(fig_pie)
        if experiment is not None:
            experiment.log_figure(figure=fig_pie, figure_name="label_pie_chart")

    with col2:
        st.subheader("Bar Chart (by Entity Type)", divider="orange")
        fig_bar = px.bar(final_df_counts, x="count", y="entity_group", color="entity_group", text_auto=True,
                         title='Occurrences of Predicted Labels (Entity Types)', orientation='h')
        fig_bar.update_layout(yaxis={'categoryorder': 'total ascending'})
        st.plotly_chart(fig_bar)
        if experiment is not None:
            experiment.log_figure(figure=fig_bar, figure_name="label_bar_chart")

    st.subheader("Entity Counts by Category", divider="orange")
    category_counts = _count_values(df, 'category')
    fig_cat_bar = px.bar(category_counts, x="count", y="category", color="category", text_auto=True,
                         title='Occurrences of Entities by Category', orientation='h')
    fig_cat_bar.update_layout(yaxis={'categoryorder': 'total ascending'})
    st.plotly_chart(fig_cat_bar)


def _build_results_zip(df):
    """Return the bytes of a zip holding the results CSV and the glossary CSV."""
    dfa = pd.DataFrame(
        data={
            'Column Name': ['word', 'entity_group', 'score', 'start', 'end', 'category'],
            'Description': [
                'entity extracted from your text data',
                'label (tag) assigned to a given extracted entity',
                'accuracy score; how accurately a tag has been assigned to a given entity',
                'index of the start of the corresponding entity',
                'index of the end of the corresponding entity',
                'the broader category this entity belongs to',
            ],
        }
    )
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as myzip:
        myzip.writestr("Summary of the results.csv", df.to_csv(index=False))
        myzip.writestr("Glossary of tags.csv", dfa.to_csv(index=False))
    return buf.getvalue()


if st.button("Results"):
    start_time_overall = time.time()
    if not comet_initialized:
        st.warning("Comet ML not initialized. Check environment variables if you wish to log data.")

    # NOTE(review): this message says "daily" while the usage notes mention a
    # 30-day period, and nothing visible here resets the counter -- confirm the
    # intended wording.
    if st.session_state['file_upload_attempts'] >= max_attempts:
        st.error(f"You have requested results {max_attempts} times. You have reached your daily request limit.")
        st.stop()

    text_for_ner = None
    if st.session_state['encrypted_extracted_text'] is not None:
        text_for_ner = decrypt_text(st.session_state['encrypted_extracted_text'])

    if text_for_ner is None or not text_for_ner.strip():
        st.warning("No extractable text content available for analysis. Please upload a valid PDF or DOCX file.")
        st.stop()

    # The request is counted once there is text to analyse.
    st.session_state['file_upload_attempts'] += 1
    save_attempts(st.session_state['file_upload_attempts'])

    with st.spinner("Analyzing text...", show_time=True):
        model = load_ner_model()

        # NOTE(review): the whole document is sent in a single call; if the
        # model caps its input length, later parts of long files may be
        # ignored -- confirm and chunk the text if so.
        start_time_ner = time.time()
        text_entities = model.predict_entities(text_for_ner, GLINER_LABELS_FLAT)
        ner_processing_time = time.time() - start_time_ner

        # An empty prediction list yields a DataFrame with no columns, which
        # would otherwise be misreported as an unexpected output structure.
        if not text_entities:
            st.warning("No entities were extracted from the uploaded text.")
            st.stop()

        df = pd.DataFrame(text_entities)
        if 'label' not in df.columns:
            st.error("Unexpected GLiNER output structure. Please check the model's output format.")
            st.stop()
        df = df.rename(columns={'label': 'entity_group', 'text': 'word'})

        df = df.replace('', 'Unknown').dropna()
        if df.empty:
            st.warning("No entities were extracted from the uploaded text.")
            st.stop()

        df['category'] = df['entity_group'].map(LABEL_TO_CATEGORY_MAP).fillna('Uncategorized')

        experiment = None
        if comet_initialized:
            experiment = Experiment(
                api_key=COMET_API_KEY,
                workspace=COMET_WORKSPACE,
                project_name=COMET_PROJECT_NAME,
            )

        # try/finally so the experiment is ended even if rendering raises.
        try:
            if experiment is not None:
                # NOTE(review): the logged table includes the 'word' column,
                # i.e. the extracted entity values themselves.
                experiment.log_parameter("input_text_length", len(text_for_ner))
                # A DataFrame is logged under a name with a file extension.
                experiment.log_table("predicted_entities.csv", df)
                experiment.log_metric("ner_processing_time_seconds", ner_processing_time)

            st.subheader("Extracted Entities", divider="rainbow")
            properties = {"border": "2px solid gray", "color": "blue", "font-size": "16px"}
            df_styled = df.style.set_properties(**properties)
            st.dataframe(df_styled, use_container_width=True)

            with st.expander("See Glossary of tags"):
                st.write(_GLOSSARY_MARKDOWN)

            st.subheader("Grouped Entities by Category", divider = "orange")
            _render_category_tabs(df)
            st.divider()

            _render_charts(df, experiment)

            zip_bytes = _build_results_zip(df)
            with stylable_container(
                key="download_button",
                css_styles="""button { background-color: yellow; border: 1px solid black; padding: 5px; color: black; }""",
            ):
                st.download_button(
                    label="Download zip file",
                    data=zip_bytes,
                    file_name="nlpblogs_ner_results.zip",
                    mime="application/zip",
                )

            if experiment is not None:
                # Pass a file-like object rather than raw bytes.
                experiment.log_asset(io.BytesIO(zip_bytes), file_name="downloadable_results.zip")

            st.divider()
        finally:
            if experiment is not None:
                experiment.end()

        end_time_overall = time.time()
        elapsed_time_overall = end_time_overall - start_time_overall
        st.info(f"Results processed in **{elapsed_time_overall:.2f} seconds**.")

# Always shown, whether or not "Results" was clicked on this run.
st.write(f"Number of times you requested results: **{st.session_state['file_upload_attempts']}/{max_attempts}**")
|
|