import time
import streamlit as st
import pandas as pd
import io
from transformers import pipeline
from streamlit_extras.stylable_container import stylable_container
import plotly.express as px
import zipfile
import os
from comet_ml import Experiment
import re
import numpy as np
import json
from cryptography.fernet import Fernet
st.set_page_config(layout="wide",
page_title="Named Entity Recognition App")
# --- Configuration ---
COMET_API_KEY = os.environ.get("COMET_API_KEY")
COMET_WORKSPACE = os.environ.get("COMET_WORKSPACE")
COMET_PROJECT_NAME = os.environ.get("COMET_PROJECT_NAME")
comet_initialized = False
if COMET_API_KEY and COMET_WORKSPACE and COMET_PROJECT_NAME:
comet_initialized = True
# --- Persistent Counter and History Configuration ---
COUNTER_FILE = "counter_json_finder.json"
HISTORY_FILE = "file_history_json_finder.json"
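# Illustrative on-disk formats (values are examples only):
#   counter_json_finder.json      -> {"file_upload_attempts": 42}
#   file_history_json_finder.json -> {"uploaded_files": [{"filename": "data.json", "timestamp": "2025-01-01 12:00:00"}]}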
max_attempts = 300
# --- Functions to manage persistent data ---
def load_attempts():
"""
Loads the attempts count from a persistent JSON file.
Returns 0 if the file doesn't exist or is invalid.
"""
if os.path.exists(COUNTER_FILE):
try:
with open(COUNTER_FILE, "r") as f:
data = json.load(f)
return data.get('file_upload_attempts', 0)
except (json.JSONDecodeError, KeyError):
return 0
return 0
def save_attempts(attempts):
"""
Saves the current attempts count to the persistent JSON file.
"""
with open(COUNTER_FILE, "w") as f:
json.dump({'file_upload_attempts': attempts}, f)
def load_history():
"""
Loads the file upload history from a persistent JSON file.
Returns an empty list if the file doesn't exist or is invalid.
"""
if os.path.exists(HISTORY_FILE):
try:
with open(HISTORY_FILE, "r") as f:
data = json.load(f)
return data.get('uploaded_files', [])
except (json.JSONDecodeError, KeyError):
return []
return []
def save_history(history):
"""
Saves the current file upload history to the persistent JSON file.
"""
with open(HISTORY_FILE, "w") as f:
json.dump({'uploaded_files': history}, f)
def clear_history_data():
"""Clears the file history from session state and deletes the persistent file."""
if os.path.exists(HISTORY_FILE):
os.remove(HISTORY_FILE)
st.session_state['uploaded_files_history'] = []
st.rerun()
# --- Initialize session state with persistent data ---
if 'file_upload_attempts' not in st.session_state:
st.session_state['file_upload_attempts'] = load_attempts()
# Save to ensure the file exists on first run
save_attempts(st.session_state['file_upload_attempts'])
if 'uploaded_files_history' not in st.session_state:
st.session_state['uploaded_files_history'] = load_history()
# Save to ensure the file exists on first run
save_history(st.session_state['uploaded_files_history'])
if 'encrypted_extracted_text' not in st.session_state:
st.session_state['encrypted_extracted_text'] = None
if 'json_dataframe' not in st.session_state:
st.session_state['json_dataframe'] = None
# Define the categories and their associated entity labels
ENTITY_LABELS_CATEGORIZED = {
"Persons": ["PER"],
"Locations": ["LOC"],
"Organizations": ["ORG"],
"Miscellaneous": ["MISC"],
}
# Create a mapping from each specific entity label to its category
LABEL_TO_CATEGORY_MAP = {
label: category for category, labels in ENTITY_LABELS_CATEGORIZED.items() for label in labels
}
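# Resulting map, for reference:
# {"PER": "Persons", "LOC": "Locations", "ORG": "Organizations", "MISC": "Miscellaneous"}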
@st.cache_resource
def load_ner_model():
"""
Loads the pre-trained NER model ("saattrupdan/nbailab-base-ner-scandi") and caches it.
This model is specifically trained for Scandinavian languages.
"""
try:
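        # Pipeline settings (per the Hugging Face token-classification pipeline):
        #   aggregation_strategy="max" merges sub-word tokens into whole-word entities,
        #   keeping the label of the highest-scoring token; ignore_labels=["O"] drops
        #   non-entity tokens from the output; stride=128 splits long inputs into
        #   overlapping chunks so texts longer than the model's maximum length are processed.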
return pipeline(
"token-classification",
model="saattrupdan/nbailab-base-ner-scandi",
aggregation_strategy="max",
ignore_labels=["O"],
stride=128
)
except Exception as e:
st.error(f"Failed to load NER model. Please check your internet connection or model availability: {e}")
st.stop()
@st.cache_resource
def load_encryption_key():
"""
Loads the Fernet encryption key from environment variables.
This key is crucial for encrypting/decrypting sensitive data.
It's cached as a resource to be loaded only once.
"""
try:
# Get the key string from environment variables
key_str = os.environ.get("FERNET_KEY")
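        # NOTE: a Fernet key is a 32-byte, URL-safe base64-encoded string; one can be
        # generated once with Fernet.generate_key().decode() and stored as a secret.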
if not key_str:
raise ValueError("FERNET_KEY environment variable not set. Cannot perform encryption/decryption.")
# Fernet key must be bytes, so encode the string
key_bytes = key_str.encode('utf-8')
return Fernet(key_bytes)
except ValueError as ve:
st.error(
f"Configuration Error: {ve}. Please ensure the 'FERNET_KEY' environment variable is set securely "
"in your deployment environment (e.g., Hugging Face Spaces secrets, Render environment variables) "
"or in a local .env file for development."
)
st.stop() # Stop the app if the key is not found, as security is compromised
except Exception as e:
st.error(f"An unexpected error occurred while loading encryption key: {e}. Please check your key format and environment settings.")
st.stop()
# Initialize the Fernet cipher instance globally (cached)
fernet = load_encryption_key()
def encrypt_text(text_content: str) -> bytes:
"""
Encrypts a string using the loaded Fernet cipher.
The input string is first encoded to UTF-8 bytes.
"""
return fernet.encrypt(text_content.encode('utf-8'))
def decrypt_text(encrypted_bytes: bytes) -> str | None:
"""
Decrypts bytes using the loaded Fernet cipher.
Returns the decrypted string, or None if decryption fails (e.g., tampering).
"""
try:
return fernet.decrypt(encrypted_bytes).decode('utf-8')
except Exception as e:
st.error(f"Decryption failed. This might indicate data tampering or an incorrect encryption key. Error: {e}")
return None
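# Illustrative round trip (hypothetical text):
#   decrypt_text(encrypt_text("Kari Nordmann bor i Oslo")) == "Kari Nordmann bor i Oslo"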
# --- UI Elements ---
st.subheader("Scandinavian JSON Entity Finder", divider="orange")
st.link_button("by nlpblogs", "https://nlpblogs.com", type="tertiary")
expander = st.expander("**Important notes on the Scandinavian JSON Entity Finder**")
expander.write('''
**Named Entities:** This Scandinavian JSON Entity Finder predicts four
(4) labels (“PER: person”, “LOC: location”, “ORG: organization”, “MISC:
miscellaneous”). Results are presented in an easy-to-read table, visualized in
an interactive tree map, pie chart, and bar chart, and are available for
download along with a Glossary of tags.
**How to Use:** Upload your JSON file. Then, click the 'Results' button
to extract and tag entities in your text data.
**Usage Limits:** You can request results up to 300 times within a 30-day period.
**Language settings:** Please check and adjust the language settings on
your computer so that Danish, Swedish, Norwegian, Icelandic, and Faroese
characters are handled properly in your downloaded file.
**Customization:** To change the app's background color to white or
black, click the three-dot menu on the right-hand side of the app, go to
Settings, and then choose the app theme, colors, and fonts.
**Technical issues:** If your connection times out, please refresh the
page or reopen the app's URL.
For any errors or inquiries, please contact us at info@nlpblogs.com
''')
with st.sidebar:
# --- Added Persistent History Display ---
st.subheader("Your File Upload History", divider="orange")
if st.session_state['uploaded_files_history']:
history_to_display = st.session_state['uploaded_files_history']
history_df = pd.DataFrame(history_to_display)
st.dataframe(history_df, use_container_width=True, hide_index=True)
# Add a clear history button
if st.button("Clear File History", help="This will permanently delete the file history from the application."):
clear_history_data()
else:
st.info("You have not uploaded any files yet.")
st.subheader("Build your own NER Web App in a minute without writing a single line of code.", divider="orange")
st.link_button("NER File Builder",
"https://nlpblogs.com/shop/named-entity-recognition-ner/ner-file-builder/",
type="primary")
uploaded_file = st.file_uploader("Choose a JSON file", type=["json"])
# Initialize text for the current run outside the if uploaded_file block
# This will be populated if a file is uploaded, otherwise it remains None
current_run_text = None
if uploaded_file is not None:
try:
        # Reset the file pointer (in case of a rerun) and parse the uploaded file as JSON
        uploaded_file.seek(0)
        dados = json.load(uploaded_file)
# Attempt to convert JSON to DataFrame and extract text
try:
st.session_state['json_dataframe'] = pd.DataFrame(dados)
# Concatenate all content into a single string for NER
df_string_representation = st.session_state['json_dataframe'].to_string(index=False, header=False)
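            # to_string(index=False, header=False) flattens every cell into plain text, e.g. a row
            # ["Astrid Lindgren", "Stockholm"] becomes roughly "Astrid Lindgren  Stockholm" (illustrative).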
# Simple regex to remove non-alphanumeric characters but keep spaces and periods
text_content = re.sub(r'[^\w\s.]', '', df_string_representation)
# Remove the specific string "Empty DataFrame Columns" if it appears due to conversion
text_content = text_content.replace("Empty DataFrame Columns", "").strip()
current_run_text = text_content # Set text for current run
if not current_run_text.strip(): # Check if text is effectively empty
st.warning("No meaningful text could be extracted from the JSON DataFrame for analysis.")
current_run_text = None # Reset to None if empty
except ValueError:
# If direct conversion to DataFrame fails, try to extract strings directly from JSON structure
st.info("JSON data could not be directly converted to a simple DataFrame for display. Attempting to extract text directly.")
extracted_texts_list = []
if isinstance(dados, list):
for item in dados:
if isinstance(item, str):
extracted_texts_list.append(item)
elif isinstance(item, dict):
# Recursively get string values from dicts in a list
for val in item.values():
if isinstance(val, str):
extracted_texts_list.append(val)
elif isinstance(val, list):
for sub_val in val:
if isinstance(sub_val, str):
extracted_texts_list.append(sub_val)
elif isinstance(dados, dict):
# Get string values from a dictionary
for value in dados.values():
if isinstance(value, str):
extracted_texts_list.append(value)
elif isinstance(value, list):
for sub_val in value:
if isinstance(sub_val, str):
extracted_texts_list.append(sub_val)
if extracted_texts_list:
current_run_text = " ".join(extracted_texts_list).strip()
else:
st.warning("No string text could be extracted from the JSON for analysis.")
current_run_text = None
if current_run_text:
# --- ADDING TO UPLOAD HISTORY ---
new_upload_entry = {
"filename": uploaded_file.name,
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
}
# Append the new file to the session state history
st.session_state['uploaded_files_history'].append(new_upload_entry)
# Save the updated history to the persistent file
save_history(st.session_state['uploaded_files_history'])
# --- END OF HISTORY ADDITION ---
# --- ENCRYPT THE EXTRACTED TEXT BEFORE STORING IN SESSION STATE ---
encrypted_text_bytes = encrypt_text(current_run_text)
st.session_state['encrypted_extracted_text'] = encrypted_text_bytes
# Optionally clear the unencrypted version from session state if you only want the encrypted one
# st.session_state['extracted_text_for_ner'] = None
st.success("JSON file uploaded successfully. File content encrypted and secured. Due to security protocols, the file content is hidden.")
st.divider()
else:
st.session_state['encrypted_extracted_text'] = None
# st.session_state['extracted_text_for_ner'] = None
st.error("Could not extract meaningful text from the uploaded JSON file.")
except json.JSONDecodeError as e:
st.error(f"JSON Decode Error: {e}")
st.error("Please ensure the uploaded file contains valid JSON data.")
st.session_state['encrypted_extracted_text'] = None
st.session_state['json_dataframe'] = None
except Exception as e:
st.error(f"An unexpected error occurred during file processing: {e}")
st.session_state['encrypted_extracted_text'] = None
st.session_state['json_dataframe'] = None
# --- Results Button and Processing Logic ---
if st.button("Results"):
start_time_overall = time.time() # Start time for overall processing
if not comet_initialized:
st.warning("Comet ML not initialized. Check environment variables if you wish to log data.")
# Check attempts limit BEFORE running the model
if st.session_state['file_upload_attempts'] >= max_attempts:
st.error(f"You have requested results {max_attempts} times. You have reached your daily request limit.")
st.stop()
# --- DECRYPT THE TEXT BEFORE PASSING TO NER MODEL ---
text_for_ner = None
if st.session_state['encrypted_extracted_text'] is not None:
text_for_ner = decrypt_text(st.session_state['encrypted_extracted_text'])
if text_for_ner is None or not text_for_ner.strip():
st.warning("No extractable text content available for analysis. Please upload a valid JSON file.")
st.stop()
# Increment the attempts counter and save it to the persistent file
st.session_state['file_upload_attempts'] += 1
save_attempts(st.session_state['file_upload_attempts'])
with st.spinner("Analyzing text...", show_time=True):
model = load_ner_model()
# Measure NER model processing time
start_time_ner = time.time()
text_entities = model(text_for_ner) # Use the decrypted text
end_time_ner = time.time()
ner_processing_time = end_time_ner - start_time_ner
df = pd.DataFrame(text_entities)
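        # Each pipeline prediction becomes a row with columns entity_group, score, word,
        # start and end, e.g. ("PER", 0.98, "Astrid Lindgren", 0, 15) (illustrative values).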
if 'word' in df.columns:
# Ensure 'word' column is string type before applying regex
if df['word'].dtype == 'object':
                pattern = r'[^\w\s]'  # Regex to strip punctuation, keeping only word characters and whitespace
df['word'] = df['word'].astype(str).replace(pattern, '', regex=True)
else:
st.warning("The 'word' column is not of string type; skipping character cleaning.")
else:
st.error("The 'word' column does not exist in the DataFrame. Cannot perform cleaning.")
st.stop() # Stop execution if the column is missing
# Replace empty strings with 'Unknown' and drop rows with NaN after cleaning
df = df.replace('', 'Unknown').dropna()
if df.empty:
st.warning("No entities were extracted from the uploaded text.")
st.stop()
# --- Add 'category' column to the DataFrame based on the grouped labels ---
df['category'] = df['entity_group'].map(LABEL_TO_CATEGORY_MAP)
# Handle cases where an entity_group might not have a category
df['category'] = df['category'].fillna('Uncategorized')
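        # e.g. an entity_group of "PER" maps to the "Persons" category; any label outside
        # ENTITY_LABELS_CATEGORIZED falls back to "Uncategorized".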
if comet_initialized:
experiment = Experiment(
api_key=COMET_API_KEY,
workspace=COMET_WORKSPACE,
project_name=COMET_PROJECT_NAME,
)
experiment.log_parameter("input_text_length", len(text_for_ner))
experiment.log_table("predicted_entities", df)
experiment.log_metric("ner_processing_time_seconds", ner_processing_time)
# --- Display Results ---
st.subheader("Extracted Entities", divider="rainbow")
properties = {"border": "2px solid gray", "color": "blue", "font-size": "16px"}
df_styled = df.style.set_properties(**properties)
st.dataframe(df_styled, use_container_width=True)
with st.expander("See Glossary of tags"):
st.write('''
'**word**': ['entity extracted from your text data']
'**score**': ['confidence score; how confident the model is that the assigned tag is correct']
'**entity_group**': ['label (tag) assigned to a given extracted entity']
'**start**': ['index of the start of the corresponding entity']
'**end**': ['index of the end of the corresponding entity']
'**category**': ['the broader category the entity belongs to']
''')
st.subheader("Grouped entities", divider="orange")
# Get unique categories and sort them for consistent tab order
unique_categories = sorted(df['category'].unique())
tabs_per_row = 4 # Adjust as needed for better layout
# Loop through categories in chunks to create rows of tabs
for i in range(0, len(unique_categories), tabs_per_row):
current_row_categories = unique_categories[i : i + tabs_per_row]
tabs = st.tabs(current_row_categories)
for j, category in enumerate(current_row_categories):
with tabs[j]:
df_filtered = df[df["category"] == category]
if not df_filtered.empty:
st.dataframe(df_filtered, use_container_width=True)
else:
st.info(f"No '{category}' entities found in the text.")
# Display an empty DataFrame for consistency if no entities are found
st.dataframe(pd.DataFrame({
'entity_group': [np.nan],
'score': [np.nan],
'word': [np.nan],
'start': [np.nan],
'end': [np.nan],
'category': [category]
}), hide_index=True)
st.divider()
# --- Visualizations ---
st.subheader("Tree map", divider="orange")
fig_treemap = px.treemap(df,
path=[px.Constant("all"), 'category', 'entity_group', 'word'],
values='score', color='category',
color_discrete_map={
'Persons': 'blue',
'Locations': 'green',
'Organizations': 'red',
'Miscellaneous': 'purple',
'Uncategorized': 'gray'
})
fig_treemap.update_layout(margin=dict(t=50, l=25, r=25, b=25))
st.plotly_chart(fig_treemap)
if comet_initialized:
experiment.log_figure(figure=fig_treemap, figure_name="entity_treemap")
# Group by category and entity_group to get counts for pie and bar charts
grouped_counts = df.groupby('category').size().reset_index(name='count')
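        # Illustrative result: a two-column frame such as
        #   category    count
        #   Locations   7
        #   Persons     12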
col1, col2 = st.columns(2)
with col1:
st.subheader("Pie Chart", divider="orange")
fig_pie = px.pie(grouped_counts, values='count', names='category',
hover_data=['count'], labels={'count': 'count'}, title='Percentage of predicted categories')
fig_pie.update_traces(textposition='inside', textinfo='percent+label')
st.plotly_chart(fig_pie)
if comet_initialized:
experiment.log_figure(figure=fig_pie, figure_name="category_pie_chart")
with col2:
st.subheader("Bar Chart", divider="orange")
fig_bar = px.bar(grouped_counts, x="count", y="category", color="category", text_auto=True,
title='Occurrences of predicted categories')
st.plotly_chart(fig_bar)
if comet_initialized:
experiment.log_figure(figure=fig_bar, figure_name="category_bar_chart")
# --- Downloadable Content ---
dfa = pd.DataFrame(
data={
'Column Name': ['word', 'entity_group', 'score', 'start', 'end', 'category'],
'Description': [
'entity extracted from your text data',
'label (tag) assigned to a given extracted entity',
                    'confidence score; how confident the model is that the assigned tag is correct',
'index of the start of the corresponding entity',
'index of the end of the corresponding entity',
'the broader category the entity belongs to',
]
}
)
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as myzip:
myzip.writestr("Summary of the results.csv", df.to_csv(index=False))
myzip.writestr("Glossary of tags.csv", dfa.to_csv(index=False))
with stylable_container(
key="download_button",
css_styles="""button { background-color: yellow; border: 1px solid black; padding: 5px; color: black; }""",
):
st.download_button(
label="Download zip file",
data=buf.getvalue(),
file_name="nlpblogs_ner_results.zip",
mime="application/zip",
)
        if comet_initialized:
            # Wrap the bytes in a file-like object; Comet's log_asset expects a file path or file-like object
            experiment.log_asset(io.BytesIO(buf.getvalue()), file_name="downloadable_results.zip")
st.divider()
if comet_initialized:
experiment.end()
end_time_overall = time.time()
elapsed_time_overall = end_time_overall - start_time_overall
st.info(f"Results processed in **{elapsed_time_overall:.2f} seconds**.")
st.write(f"Number of times you requested results: **{st.session_state['file_upload_attempts']}/{max_attempts}**")