Maria Tsilimos
Create app.py
d2e00e4 unverified
raw
history blame
18.2 kB
import time
import streamlit as st
import pandas as pd
import io
from transformers import pipeline
from streamlit_extras.stylable_container import stylable_container
import plotly.express as px
import zipfile
import os
from comet_ml import Experiment
import re
import numpy as np
import json
from cryptography.fernet import Fernet
# Use the wide layout and set the browser tab title.
st.set_page_config(layout="wide", page_title="Named Entity Recognition App")

# Comet ML credentials come from the environment. Experiment logging is
# switched on only when all three values are present and non-empty.
COMET_API_KEY = os.environ.get("COMET_API_KEY")
COMET_WORKSPACE = os.environ.get("COMET_WORKSPACE")
COMET_PROJECT_NAME = os.environ.get("COMET_PROJECT_NAME")
comet_initialized = all((COMET_API_KEY, COMET_WORKSPACE, COMET_PROJECT_NAME))

# Seed the session-state entries the rest of the script reads, without
# overwriting values that already exist in this session.
for _state_key, _initial_value in (
    ('file_upload_attempts', 0),
    ('encrypted_extracted_text', None),
    ('json_dataframe', None),
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _initial_value

# Upper bound on "Results" requests, compared against 'file_upload_attempts'.
max_attempts = 10
@st.cache_resource
def load_ner_model():
    """Create the token-classification pipeline used for entity tagging.

    The pipeline is built for the ``saattrupdan/nbailab-base-ner-scandi``
    model with ``aggregation_strategy="max"`` and ``stride=128``. The function
    is wrapped in ``st.cache_resource``.

    Returns:
        The pipeline object returned by ``transformers.pipeline``.

    If construction raises, the error is shown in the UI and ``st.stop()`` is
    called instead of returning a model.
    """
    pipeline_options = {
        "model": "saattrupdan/nbailab-base-ner-scandi",
        "aggregation_strategy": "max",
        "stride": 128,
    }
    try:
        ner_pipeline = pipeline("token-classification", **pipeline_options)
    except Exception as e:
        st.error(f"Failed to load NER model. Please check your internet connection or model availability: {e}")
        st.stop()
    else:
        return ner_pipeline
@st.cache_resource
def load_encryption_key():
    """Build the Fernet cipher from the ``FERNET_KEY`` environment variable.

    The variable's string value is UTF-8 encoded and passed to ``Fernet``.
    The function is wrapped in ``st.cache_resource``.

    Returns:
        The ``Fernet`` instance built from the key.

    A missing/empty variable (or any other ``ValueError``) is reported as a
    configuration error; any other exception is reported as unexpected. In
    both cases ``st.stop()`` is called instead of returning a cipher.
    """
    try:
        raw_key = os.environ.get("FERNET_KEY")
        if not raw_key:
            raise ValueError("FERNET_KEY environment variable not set. Cannot perform encryption/decryption.")
        # Fernet expects the key as bytes rather than str.
        cipher = Fernet(raw_key.encode('utf-8'))
    except ValueError as ve:
        st.error(f"Configuration Error: {ve}. Please ensure the 'FERNET_KEY' environment variable is set securely in your deployment environment (e.g., Hugging Face Spaces secrets, Render environment variables) or in a local .env file for development.")
        # Without a usable key nothing can be encrypted, so halt this run.
        st.stop()
    except Exception as e:
        st.error(f"An unexpected error occurred while loading encryption key: {e}. Please check your key format and environment settings.")
        st.stop()
    else:
        return cipher
# Build the shared Fernet cipher at module level; encrypt_text() and
# decrypt_text() both read this name. load_encryption_key() reports the
# problem and calls st.stop() when the key cannot be loaded.
fernet = load_encryption_key()
def encrypt_text(text_content: str) -> bytes:
    """Encrypt ``text_content`` with the module-level ``fernet`` cipher.

    Args:
        text_content: The plain text to protect.

    Returns:
        The bytes returned by ``fernet.encrypt`` for the UTF-8 encoding of
        ``text_content``.
    """
    plaintext_bytes = text_content.encode('utf-8')
    token = fernet.encrypt(plaintext_bytes)
    return token
def decrypt_text(encrypted_bytes: bytes) -> str | None:
    """Decrypt ``encrypted_bytes`` with the module-level ``fernet`` cipher.

    Args:
        encrypted_bytes: A value previously produced by ``encrypt_text``.

    Returns:
        The decrypted bytes decoded as UTF-8, or ``None`` when decryption or
        decoding raises. In that case the error is also shown in the UI.
    """
    try:
        plaintext_bytes = fernet.decrypt(encrypted_bytes)
        decoded_text = plaintext_bytes.decode('utf-8')
    except Exception as e:
        st.error(f"Decryption failed. This might indicate data tampering or an incorrect encryption key. Error: {e}")
        return None
    return decoded_text
# --- UI Elements ---
# Page header and attribution link.
st.subheader("Scandinavian JSON Entity Finder", divider="orange")
st.link_button("by nlpblogs", "https://nlpblogs.com", type="tertiary")

# Usage notes rendered inside a collapsible panel under the header.
_IMPORTANT_NOTES = '''
**Named Entities:** This Scandinavian JSON Entity Finder predicts four
(4) labels (“PER: person”, “LOC: location”, “ORG: organization”, “MISC:
miscellaneous”). Results are presented in an easy-to-read table, visualized in
an interactive tree map, pie chart, and bar chart, and are available for
download along with a Glossary of tags.
**How to Use:** Upload your JSON file. Then, click the 'Results' button
to extract and tag entities in your text data.
**Usage Limits:** You can request results up to 10 times.
**Language settings:** Please check and adjust the language settings in
your computer, so the Danish, Swedish, Norwegian, Icelandic and Faroese
characters are handled properly in your downloaded file.
**Customization:** To change the app's background color to white or
black, click the three-dot menu on the right-hand side of your app, go to
Settings and then Choose app theme, colors and fonts.
**Technical issues:** If your connection times out, please refresh the
page or reopen the app's URL.
For any errors or inquiries, please contact us at info@nlpblogs.com
'''
expander = st.expander("**Important notes on the Scandinavian JSON Entity Finder**")
expander.write(_IMPORTANT_NOTES)

# Sidebar copy: a short definition of NER plus a link to a related app.
_NER_DEFINITION = (
    "**Named Entity Recognition (NER)** is the task of "
    "extracting and tagging entities in text data. Entities can be persons, "
    "organizations, locations, countries, products, events etc."
)
_RELATED_APP_URL = "https://nlpblogs.com/shop/named-entity-recognition-ner/monolingual-ner-web-apps/italian-url-txt-entity-finder/"

# NOTE(review): the original indentation was lost; the sidebar is assumed to
# hold everything from the bordered container through the related-app link,
# with the file uploader rendered in the main page area. Confirm.
with st.sidebar:
    container = st.container(border=True)
    container.write(_NER_DEFINITION)
    st.subheader("Related NLP Web Apps", divider="orange")
    st.link_button("Italian URL & TXT Entity Finder", _RELATED_APP_URL, type="primary")

# The uploader only offers files with a .json extension.
uploaded_file = st.file_uploader("Choose a JSON file", type=["json"])
# --- Upload handling ---
# Parse the uploaded JSON, flatten it to plain text, and keep only an
# encrypted copy of that text in session state for the "Results" step.


def _collect_strings(node):
    """Return every string value found in ``node``, depth-first, in order.

    ``node`` is any value produced by ``json.load``. Dictionary keys are
    ignored; only values are visited. Numbers, booleans and nulls are skipped.
    """
    if isinstance(node, str):
        return [node]
    if isinstance(node, dict):
        children = node.values()
    elif isinstance(node, list):
        children = node
    else:
        return []
    collected = []
    for child in children:
        collected.extend(_collect_strings(child))
    return collected


def _text_from_dataframe(frame):
    """Flatten ``frame`` into one cleaned string, or ``None`` if nothing is left."""
    if frame.empty:
        # to_string() on an empty frame renders a multi-line "Empty DataFrame"
        # placeholder. That placeholder is not user text, so it must never be
        # encrypted and sent to the model.
        return None
    rendered = frame.to_string(index=False, header=False)
    # Drop everything except word characters, whitespace and periods.
    cleaned = re.sub(r'[^\w\s.]', '', rendered).strip()
    return cleaned or None


# Text extracted during this run; stays None when nothing usable was uploaded.
current_run_text = None
if uploaded_file is not None:
    try:
        # Rewind in case the buffer was already consumed, then parse it.
        uploaded_file.seek(0)
        dados = json.load(uploaded_file)
        try:
            json_frame = pd.DataFrame(dados)
        except ValueError:
            # The JSON shape is not tabular (e.g. a dict of scalars). Forget
            # any DataFrame left over from a previous upload and gather the
            # string values directly from the parsed structure instead.
            st.session_state['json_dataframe'] = None
            st.info("JSON data could not be directly converted to a simple DataFrame for display. Attempting to extract text directly.")
            current_run_text = " ".join(_collect_strings(dados)).strip() or None
            if current_run_text is None:
                st.warning("No string text could be extracted from the JSON for analysis.")
        else:
            st.session_state['json_dataframe'] = json_frame
            current_run_text = _text_from_dataframe(json_frame)
            if current_run_text is None:
                st.warning("No meaningful text could be extracted from the JSON DataFrame for analysis.")

        if current_run_text:
            # Only the encrypted form of the text is kept in session state.
            st.session_state['encrypted_extracted_text'] = encrypt_text(current_run_text)
            st.success("JSON file uploaded successfully. File content encrypted and secured. Due to security protocols, the file content is hidden.")
            st.divider()
        else:
            st.session_state['encrypted_extracted_text'] = None
            st.error("Could not extract meaningful text from the uploaded JSON file.")
    except json.JSONDecodeError as e:
        st.error(f"JSON Decode Error: {e}")
        st.error("Please ensure the uploaded file contains valid JSON data.")
        st.session_state['encrypted_extracted_text'] = None
        st.session_state['json_dataframe'] = None
    except Exception as e:
        # Last-resort boundary so a bad upload never leaves stale state behind.
        st.error(f"An unexpected error occurred during file processing: {e}")
        st.session_state['encrypted_extracted_text'] = None
        st.session_state['json_dataframe'] = None
# --- Results Button and Processing Logic ---
# On click: decrypt the stored text, run the NER model, then render tables,
# charts and a downloadable zip. Runs are optionally logged to Comet ML.
if st.button("Results"):
    start_time = time.time()
    if not comet_initialized:
        st.warning("Comet ML not initialized. Check environment variables if you wish to log data.")
    if st.session_state['file_upload_attempts'] >= max_attempts:
        st.error(f"You have requested results {max_attempts} times. You have reached your daily request limit.")
        st.stop()

    # Only the encrypted copy lives in session state; decrypt it for this run.
    text_for_ner = None
    if st.session_state['encrypted_extracted_text'] is not None:
        text_for_ner = decrypt_text(st.session_state['encrypted_extracted_text'])
    if text_for_ner is None or not text_for_ner.strip():
        st.warning("No extractable text content available for analysis. Please upload a valid JSON file.")
        st.stop()

    # Count the request only once there is text to analyse.
    st.session_state['file_upload_attempts'] += 1

    # NOTE(review): the original indentation was lost; the spinner is scoped
    # to model loading and inference here. Confirm the intended extent.
    with st.spinner("Analyzing text...", show_time=True):
        model = load_ner_model()
        text_entities = model(text_for_ner)
        df = pd.DataFrame(text_entities)

    # An empty prediction list yields a DataFrame with no columns at all, so
    # test for "no entities" before looking for the 'word' column.
    if df.empty:
        st.warning("No entities were extracted from the uploaded text.")
        st.stop()
    if 'word' not in df.columns:
        st.error("The 'word' column does not exist in the DataFrame. Cannot perform cleaning.")
        st.stop()

    # Accept both the classic object dtype and pandas' dedicated string dtypes.
    if pd.api.types.is_object_dtype(df['word']) or pd.api.types.is_string_dtype(df['word']):
        # Strip everything except word characters and whitespace; unlike the
        # upload-time cleanup, periods are removed here as well.
        pattern = r'[^\w\s]'
        df['word'] = df['word'].astype(str).replace(pattern, '', regex=True)
    else:
        st.warning("The 'word' column is not of string type; skipping character cleaning.")

    # Replace empty strings with 'Unknown' and drop rows with NaN after cleaning.
    df = df.replace('', 'Unknown').dropna()
    if df.empty:
        st.warning("No entities were extracted from the uploaded text.")
        st.stop()

    # `experiment` stays None when Comet logging is disabled.
    experiment = None
    if comet_initialized:
        experiment = Experiment(
            api_key=COMET_API_KEY,
            workspace=COMET_WORKSPACE,
            project_name=COMET_PROJECT_NAME,
        )
        experiment.log_parameter("input_text_length", len(text_for_ner))
        # NOTE(review): Comet's docs describe the table filename as carrying
        # an extension such as ".csv"; confirm this name is accepted.
        experiment.log_table("predicted_entities", df)

    # --- Display Results ---
    properties = {"border": "2px solid gray", "color": "blue", "font-size": "16px"}
    df_styled = df.style.set_properties(**properties)
    st.dataframe(df_styled, use_container_width=True)

    with st.expander("See Glossary of tags"):
        st.write(
            "\n"
            "'**word**': ['entity extracted from your text data']\n"
            "'**score**': ['accuracy score; how accurately a tag has been assigned to\n"
            "a given entity']\n"
            "'**entity_group**': ['label (tag) assigned to a given extracted entity']\n"
            "'**start**': ['index of the start of the corresponding entity']\n"
            "'**end**': ['index of the end of the corresponding entity']\n"
        )

    entity_groups = {
        "PER": "person",
        "LOC": "location",
        "ORG": "organization",
        "MISC": "miscellaneous",
    }
    st.subheader("Grouped entities", divider = "orange")

    # One tab per label, laid out in rows of at most `tabs_per_row` tabs.
    entity_items = list(entity_groups.items())
    tabs_per_row = 5
    labels_present = set(df["entity_group"].unique())
    for i in range(0, len(entity_items), tabs_per_row):
        current_row_entities = entity_items[i : i + tabs_per_row]
        tabs = st.tabs([title for _, title in current_row_entities])
        for tab, (entity_group_key, tab_title) in zip(tabs, current_row_entities):
            with tab:
                if entity_group_key in labels_present:
                    df_filtered = df[df["entity_group"] == entity_group_key]
                    st.dataframe(df_filtered, use_container_width=True)
                else:
                    st.info(f"No '{tab_title}' entities found in the text.")
                    # Show a one-row placeholder so every tab has a table.
                    st.dataframe(pd.DataFrame({
                        'entity_group': [entity_group_key],
                        'score': [np.nan],
                        'word': [np.nan],
                        'start': [np.nan],
                        'end': [np.nan]
                    }), hide_index=True)
    st.divider()

    # --- Visualizations ---
    st.subheader("Tree map", divider="orange")
    fig_treemap = px.treemap(
        df,
        path=[px.Constant("all"), 'word', 'entity_group'],
        values='score',
        color='entity_group',
    )
    fig_treemap.update_layout(margin=dict(t=50, l=25, r=25, b=25))
    st.plotly_chart(fig_treemap)
    if experiment is not None:
        experiment.log_figure(figure=fig_treemap, figure_name="entity_treemap")

    # Name both columns explicitly so the charts below find 'entity_group'
    # and 'count' regardless of how the installed pandas labels the output
    # of value_counts().
    final_df_counts = (
        df['entity_group']
        .value_counts()
        .rename_axis('entity_group')
        .reset_index(name='count')
    )

    col1, col2 = st.columns(2)
    with col1:
        st.subheader("Pie Chart", divider="orange")
        fig_pie = px.pie(final_df_counts, values='count', names='entity_group',
                         hover_data=['count'], labels={'count': 'count'}, title='Percentage of predicted labels')
        fig_pie.update_traces(textposition='inside', textinfo='percent+label')
        st.plotly_chart(fig_pie)
        if experiment is not None:
            experiment.log_figure(figure=fig_pie, figure_name="label_pie_chart")
    with col2:
        st.subheader("Bar Chart", divider="orange")
        fig_bar = px.bar(final_df_counts, x="count", y="entity_group", color="entity_group", text_auto=True,
                         title='Occurrences of predicted labels')
        st.plotly_chart(fig_bar)
        if experiment is not None:
            experiment.log_figure(figure=fig_bar, figure_name="label_bar_chart")

    # --- Downloadable Content ---
    dfa = pd.DataFrame(
        data={
            'Column Name': ['word', 'entity_group','score', 'start', 'end'],
            'Description': [
                'entity extracted from your text data',
                'label (tag) assigned to a given extracted entity',
                'accuracy score; how accurately a tag has been assigned to a given entity',
                'index of the start of the corresponding entity',
                'index of the end of the corresponding entity',
            ]
        }
    )
    # Build the archive in memory and snapshot its bytes once for reuse.
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as myzip:
        myzip.writestr("Summary of the results.csv", df.to_csv(index=False))
        myzip.writestr("Glossary of tags.csv", dfa.to_csv(index=False))
    zip_bytes = buf.getvalue()

    with stylable_container(
        key="download_button",
        css_styles="""button { background-color: yellow; border: 1px solid black; padding: 5px; color: black; }""",
    ):
        st.download_button(
            label="Download zip file",
            data=zip_bytes,
            file_name="nlpblogs_ner_results.zip",
            mime="application/zip",
        )
    if experiment is not None:
        # Comet documents log_asset as taking a file path or a file-like
        # object, so hand it a fresh in-memory stream rather than raw bytes.
        experiment.log_asset(io.BytesIO(zip_bytes), file_name="downloadable_results.zip")
    st.divider()

    if experiment is not None:
        experiment.end()

    end_time = time.time()
    elapsed_time = end_time - start_time
    st.info(f"Results processed in **{elapsed_time:.2f} seconds**.")

# NOTE(review): the original indentation was lost; the usage counter is
# assumed to be rendered on every run, not only after a click. Confirm.
st.write(f"Number of times you requested results: **{st.session_state['file_upload_attempts']}/{max_attempts}**")