|
import time |
|
import streamlit as st |
|
import pandas as pd |
|
import io |
|
from transformers import pipeline |
|
from streamlit_extras.stylable_container import stylable_container |
|
import plotly.express as px |
|
import zipfile |
|
import os |
|
from comet_ml import Experiment |
|
import re |
|
import numpy as np |
|
import json |
|
from cryptography.fernet import Fernet |
|
|
|
st.set_page_config(layout="wide", |
|
page_title="Named Entity Recognition App") |
|
|
|
|
|
COMET_API_KEY = os.environ.get("COMET_API_KEY") |
|
COMET_WORKSPACE = os.environ.get("COMET_WORKSPACE") |
|
COMET_PROJECT_NAME = os.environ.get("COMET_PROJECT_NAME") |
|
|
|
comet_initialized = False |
|
if COMET_API_KEY and COMET_WORKSPACE and COMET_PROJECT_NAME: |
|
comet_initialized = True |
|
|
|
|
|
COUNTER_FILE = "counter_json_finder.json" |
|
HISTORY_FILE = "file_history_json_finder.json" |
|
max_attempts = 300 |
|
|
|
|
|
def load_attempts(): |
|
""" |
|
Loads the attempts count from a persistent JSON file. |
|
Returns 0 if the file doesn't exist or is invalid. |
|
""" |
|
if os.path.exists(COUNTER_FILE): |
|
try: |
|
with open(COUNTER_FILE, "r") as f: |
|
data = json.load(f) |
|
return data.get('file_upload_attempts', 0) |
|
except (json.JSONDecodeError, KeyError): |
|
return 0 |
|
return 0 |
|
|
|
def save_attempts(attempts): |
|
""" |
|
Saves the current attempts count to the persistent JSON file. |
|
""" |
|
with open(COUNTER_FILE, "w") as f: |
|
json.dump({'file_upload_attempts': attempts}, f) |
|
|
|
def load_history(): |
|
""" |
|
Loads the file upload history from a persistent JSON file. |
|
Returns an empty list if the file doesn't exist or is invalid. |
|
""" |
|
if os.path.exists(HISTORY_FILE): |
|
try: |
|
with open(HISTORY_FILE, "r") as f: |
|
data = json.load(f) |
|
return data.get('uploaded_files', []) |
|
except (json.JSONDecodeError, KeyError): |
|
return [] |
|
return [] |
|
|
|
def save_history(history): |
|
""" |
|
Saves the current file upload history to the persistent JSON file. |
|
""" |
|
with open(HISTORY_FILE, "w") as f: |
|
json.dump({'uploaded_files': history}, f) |
|
|
|
def clear_history_data(): |
|
"""Clears the file history from session state and deletes the persistent file.""" |
|
if os.path.exists(HISTORY_FILE): |
|
os.remove(HISTORY_FILE) |
|
st.session_state['uploaded_files_history'] = [] |
|
st.rerun() |
|
|
|
|
|
if 'file_upload_attempts' not in st.session_state: |
|
st.session_state['file_upload_attempts'] = load_attempts() |
|
|
|
save_attempts(st.session_state['file_upload_attempts']) |
|
|
|
if 'uploaded_files_history' not in st.session_state: |
|
st.session_state['uploaded_files_history'] = load_history() |
|
|
|
save_history(st.session_state['uploaded_files_history']) |
|
|
|
if 'encrypted_extracted_text' not in st.session_state: |
|
st.session_state['encrypted_extracted_text'] = None |
|
|
|
if 'json_dataframe' not in st.session_state: |
|
st.session_state['json_dataframe'] = None |
|
|
|
|
|
ENTITY_LABELS_CATEGORIZED = { |
|
"Persons": ["PER"], |
|
"Locations": ["LOC"], |
|
"Organizations": ["ORG"], |
|
"Miscellaneous": ["MISC"], |
|
} |
|
|
|
|
|
LABEL_TO_CATEGORY_MAP = { |
|
label: category for category, labels in ENTITY_LABELS_CATEGORIZED.items() for label in labels |
|
} |
|
|
|
@st.cache_resource |
|
def load_ner_model(): |
|
""" |
|
Loads the pre-trained NER model ("saattrupdan/nbailab-base-ner-scandi") and caches it. |
|
This model is specifically trained for Scandinavian languages. |
|
""" |
|
try: |
|
return pipeline( |
|
"token-classification", |
|
model="saattrupdan/nbailab-base-ner-scandi", |
|
aggregation_strategy="max", |
|
ignore_labels=["O"], |
|
stride=128 |
|
) |
|
except Exception as e: |
|
st.error(f"Failed to load NER model. Please check your internet connection or model availability: {e}") |
|
st.stop() |
|
|
|
@st.cache_resource |
|
def load_encryption_key(): |
|
""" |
|
Loads the Fernet encryption key from environment variables. |
|
This key is crucial for encrypting/decrypting sensitive data. |
|
It's cached as a resource to be loaded only once. |
|
""" |
|
try: |
|
|
|
key_str = os.environ.get("FERNET_KEY") |
|
if not key_str: |
|
raise ValueError("FERNET_KEY environment variable not set. Cannot perform encryption/decryption.") |
|
|
|
|
|
key_bytes = key_str.encode('utf-8') |
|
return Fernet(key_bytes) |
|
except ValueError as ve: |
|
st.error( |
|
f"Configuration Error: {ve}. Please ensure the 'FERNET_KEY' environment variable is set securely " |
|
"in your deployment environment (e.g., Hugging Face Spaces secrets, Render environment variables) " |
|
"or in a local .env file for development." |
|
) |
|
st.stop() |
|
except Exception as e: |
|
st.error(f"An unexpected error occurred while loading encryption key: {e}. Please check your key format and environment settings.") |
|
st.stop() |
|
|
|
|
|
fernet = load_encryption_key() |
|
|
|
def encrypt_text(text_content: str) -> bytes: |
|
""" |
|
Encrypts a string using the loaded Fernet cipher. |
|
The input string is first encoded to UTF-8 bytes. |
|
""" |
|
return fernet.encrypt(text_content.encode('utf-8')) |
|
|
|
def decrypt_text(encrypted_bytes: bytes) -> str | None: |
|
""" |
|
Decrypts bytes using the loaded Fernet cipher. |
|
Returns the decrypted string, or None if decryption fails (e.g., tampering). |
|
""" |
|
try: |
|
return fernet.decrypt(encrypted_bytes).decode('utf-8') |
|
except Exception as e: |
|
st.error(f"Decryption failed. This might indicate data tampering or an incorrect encryption key. Error: {e}") |
|
return None |
|
|
|
|
|
st.subheader("Scandinavian JSON Entity Finder", divider="orange") |
|
st.link_button("by nlpblogs", "https://nlpblogs.com", type="tertiary") |
|
|
|
expander = st.expander("**Important notes on the Scandinavian JSON Entity Finder**") |
|
expander.write(''' |
|
**Named Entities:** This Scandinavian JSON Entity Finder predicts four |
|
(4) labels (“PER: person”, “LOC: location”, “ORG: organization”, “MISC: |
|
miscellaneous”). Results are presented in an easy-to-read table, visualized in |
|
an interactive tree map, pie chart, and bar chart, and are available for |
|
download along with a Glossary of tags. |
|
|
|
**How to Use:** Upload your JSON file. Then, click the 'Results' button |
|
to extract and tag entities in your text data. |
|
|
|
**Usage Limits:** You can request results up to 300 times within a 30-day period. |
|
|
|
**Language settings:** Please check and adjust the language settings in |
|
your computer, so the Danish, Swedish, Norwegian, Icelandic and Faroese |
|
characters are handled properly in your downloaded file. |
|
|
|
**Customization:** To change the app's background color to white or |
|
black, click the three-dot menu on the right-hand side of your app, go to |
|
Settings and then Choose app theme, colors and fonts. |
|
|
|
**Technical issues:** If your connection times out, please refresh the |
|
page or reopen the app's URL. |
|
|
|
For any errors or inquiries, please contact us at info@nlpblogs.com |
|
''') |
|
|
|
with st.sidebar: |
|
|
|
|
|
|
|
st.subheader("Your File Upload History", divider="orange") |
|
if st.session_state['uploaded_files_history']: |
|
history_to_display = st.session_state['uploaded_files_history'] |
|
history_df = pd.DataFrame(history_to_display) |
|
st.dataframe(history_df, use_container_width=True, hide_index=True) |
|
|
|
if st.button("Clear File History", help="This will permanently delete the file history from the application."): |
|
clear_history_data() |
|
else: |
|
st.info("You have not uploaded any files yet.") |
|
|
|
|
|
st.subheader("Build your own NER Web App in a minute without writing a single line of code.", divider="orange") |
|
st.link_button("NER File Builder", |
|
"https://nlpblogs.com/shop/named-entity-recognition-ner/ner-file-builder/", |
|
type="primary") |
|
|
|
uploaded_file = st.file_uploader("Choose a JSON file", type=["json"]) |
|
|
|
|
|
|
|
current_run_text = None |
|
|
|
if uploaded_file is not None: |
|
try: |
|
|
|
file_contents_bytes = uploaded_file.read() |
|
|
|
|
|
uploaded_file.seek(0) |
|
dados = json.load(uploaded_file) |
|
|
|
|
|
try: |
|
st.session_state['json_dataframe'] = pd.DataFrame(dados) |
|
|
|
|
|
df_string_representation = st.session_state['json_dataframe'].to_string(index=False, header=False) |
|
|
|
text_content = re.sub(r'[^\w\s.]', '', df_string_representation) |
|
|
|
text_content = text_content.replace("Empty DataFrame Columns", "").strip() |
|
current_run_text = text_content |
|
if not current_run_text.strip(): |
|
st.warning("No meaningful text could be extracted from the JSON DataFrame for analysis.") |
|
current_run_text = None |
|
except ValueError: |
|
|
|
st.info("JSON data could not be directly converted to a simple DataFrame for display. Attempting to extract text directly.") |
|
extracted_texts_list = [] |
|
if isinstance(dados, list): |
|
for item in dados: |
|
if isinstance(item, str): |
|
extracted_texts_list.append(item) |
|
elif isinstance(item, dict): |
|
|
|
for val in item.values(): |
|
if isinstance(val, str): |
|
extracted_texts_list.append(val) |
|
elif isinstance(val, list): |
|
for sub_val in val: |
|
if isinstance(sub_val, str): |
|
extracted_texts_list.append(sub_val) |
|
elif isinstance(dados, dict): |
|
|
|
for value in dados.values(): |
|
if isinstance(value, str): |
|
extracted_texts_list.append(value) |
|
elif isinstance(value, list): |
|
for sub_val in value: |
|
if isinstance(sub_val, str): |
|
extracted_texts_list.append(sub_val) |
|
if extracted_texts_list: |
|
current_run_text = " ".join(extracted_texts_list).strip() |
|
else: |
|
st.warning("No string text could be extracted from the JSON for analysis.") |
|
current_run_text = None |
|
|
|
if current_run_text: |
|
|
|
new_upload_entry = { |
|
"filename": uploaded_file.name, |
|
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S") |
|
} |
|
|
|
st.session_state['uploaded_files_history'].append(new_upload_entry) |
|
|
|
save_history(st.session_state['uploaded_files_history']) |
|
|
|
|
|
encrypted_text_bytes = encrypt_text(current_run_text) |
|
st.session_state['encrypted_extracted_text'] = encrypted_text_bytes |
|
|
|
|
|
st.success("JSON file uploaded successfully. File content encrypted and secured. Due to security protocols, the file content is hidden.") |
|
st.divider() |
|
else: |
|
st.session_state['encrypted_extracted_text'] = None |
|
|
|
st.error("Could not extract meaningful text from the uploaded JSON file.") |
|
except json.JSONDecodeError as e: |
|
st.error(f"JSON Decode Error: {e}") |
|
st.error("Please ensure the uploaded file contains valid JSON data.") |
|
st.session_state['encrypted_extracted_text'] = None |
|
st.session_state['json_dataframe'] = None |
|
except Exception as e: |
|
st.error(f"An unexpected error occurred during file processing: {e}") |
|
st.session_state['encrypted_extracted_text'] = None |
|
st.session_state['json_dataframe'] = None |
|
|
|
|
|
if st.button("Results"): |
|
start_time_overall = time.time() |
|
if not comet_initialized: |
|
st.warning("Comet ML not initialized. Check environment variables if you wish to log data.") |
|
|
|
|
|
if st.session_state['file_upload_attempts'] >= max_attempts: |
|
st.error(f"You have requested results {max_attempts} times. You have reached your daily request limit.") |
|
st.stop() |
|
|
|
|
|
text_for_ner = None |
|
if st.session_state['encrypted_extracted_text'] is not None: |
|
text_for_ner = decrypt_text(st.session_state['encrypted_extracted_text']) |
|
|
|
if text_for_ner is None or not text_for_ner.strip(): |
|
st.warning("No extractable text content available for analysis. Please upload a valid JSON file.") |
|
st.stop() |
|
|
|
|
|
st.session_state['file_upload_attempts'] += 1 |
|
save_attempts(st.session_state['file_upload_attempts']) |
|
|
|
with st.spinner("Analyzing text...", show_time=True): |
|
model = load_ner_model() |
|
|
|
|
|
start_time_ner = time.time() |
|
text_entities = model(text_for_ner) |
|
end_time_ner = time.time() |
|
ner_processing_time = end_time_ner - start_time_ner |
|
|
|
df = pd.DataFrame(text_entities) |
|
|
|
if 'word' in df.columns: |
|
|
|
if df['word'].dtype == 'object': |
|
pattern = r'[^\w\s]' |
|
df['word'] = df['word'].astype(str).replace(pattern, '', regex=True) |
|
else: |
|
st.warning("The 'word' column is not of string type; skipping character cleaning.") |
|
else: |
|
st.error("The 'word' column does not exist in the DataFrame. Cannot perform cleaning.") |
|
st.stop() |
|
|
|
|
|
df = df.replace('', 'Unknown').dropna() |
|
|
|
if df.empty: |
|
st.warning("No entities were extracted from the uploaded text.") |
|
st.stop() |
|
|
|
|
|
df['category'] = df['entity_group'].map(LABEL_TO_CATEGORY_MAP) |
|
|
|
df['category'] = df['category'].fillna('Uncategorized') |
|
|
|
if comet_initialized: |
|
experiment = Experiment( |
|
api_key=COMET_API_KEY, |
|
workspace=COMET_WORKSPACE, |
|
project_name=COMET_PROJECT_NAME, |
|
) |
|
experiment.log_parameter("input_text_length", len(text_for_ner)) |
|
experiment.log_table("predicted_entities", df) |
|
experiment.log_metric("ner_processing_time_seconds", ner_processing_time) |
|
|
|
|
|
|
|
st.subheader("Extracted Entities", divider="rainbow") |
|
properties = {"border": "2px solid gray", "color": "blue", "font-size": "16px"} |
|
df_styled = df.style.set_properties(**properties) |
|
st.dataframe(df_styled, use_container_width=True) |
|
|
|
with st.expander("See Glossary of tags"): |
|
st.write(''' |
|
'**word**': ['entity extracted from your text data'] |
|
|
|
'**score**': ['accuracy score; how accurately a tag has been assigned to |
|
a given entity'] |
|
|
|
'**entity_group**': ['label (tag) assigned to a given extracted entity'] |
|
|
|
'**start**': ['index of the start of the corresponding entity'] |
|
|
|
'**end**': ['index of the end of the corresponding entity'] |
|
|
|
'**category**': ['the broader category the entity belongs to'] |
|
''') |
|
|
|
st.subheader("Grouped entities", divider="orange") |
|
|
|
|
|
unique_categories = sorted(df['category'].unique()) |
|
tabs_per_row = 4 |
|
|
|
|
|
for i in range(0, len(unique_categories), tabs_per_row): |
|
current_row_categories = unique_categories[i : i + tabs_per_row] |
|
tabs = st.tabs(current_row_categories) |
|
|
|
for j, category in enumerate(current_row_categories): |
|
with tabs[j]: |
|
df_filtered = df[df["category"] == category] |
|
if not df_filtered.empty: |
|
st.dataframe(df_filtered, use_container_width=True) |
|
else: |
|
st.info(f"No '{category}' entities found in the text.") |
|
|
|
st.dataframe(pd.DataFrame({ |
|
'entity_group': [np.nan], |
|
'score': [np.nan], |
|
'word': [np.nan], |
|
'start': [np.nan], |
|
'end': [np.nan], |
|
'category': [category] |
|
}), hide_index=True) |
|
st.divider() |
|
|
|
|
|
st.subheader("Tree map", divider="orange") |
|
fig_treemap = px.treemap(df, |
|
path=[px.Constant("all"), 'category', 'entity_group', 'word'], |
|
values='score', color='category', |
|
color_discrete_map={ |
|
'Persons': 'blue', |
|
'Locations': 'green', |
|
'Organizations': 'red', |
|
'Miscellaneous': 'purple', |
|
'Uncategorized': 'gray' |
|
}) |
|
fig_treemap.update_layout(margin=dict(t=50, l=25, r=25, b=25)) |
|
st.plotly_chart(fig_treemap) |
|
if comet_initialized: |
|
experiment.log_figure(figure=fig_treemap, figure_name="entity_treemap") |
|
|
|
|
|
grouped_counts = df.groupby('category').size().reset_index(name='count') |
|
|
|
col1, col2 = st.columns(2) |
|
with col1: |
|
st.subheader("Pie Chart", divider="orange") |
|
fig_pie = px.pie(grouped_counts, values='count', names='category', |
|
hover_data=['count'], labels={'count': 'count'}, title='Percentage of predicted categories') |
|
fig_pie.update_traces(textposition='inside', textinfo='percent+label') |
|
st.plotly_chart(fig_pie) |
|
if comet_initialized: |
|
experiment.log_figure(figure=fig_pie, figure_name="category_pie_chart") |
|
|
|
with col2: |
|
st.subheader("Bar Chart", divider="orange") |
|
fig_bar = px.bar(grouped_counts, x="count", y="category", color="category", text_auto=True, |
|
title='Occurrences of predicted categories') |
|
st.plotly_chart(fig_bar) |
|
if comet_initialized: |
|
experiment.log_figure(figure=fig_bar, figure_name="category_bar_chart") |
|
|
|
|
|
dfa = pd.DataFrame( |
|
data={ |
|
'Column Name': ['word', 'entity_group', 'score', 'start', 'end', 'category'], |
|
'Description': [ |
|
'entity extracted from your text data', |
|
'label (tag) assigned to a given extracted entity', |
|
'accuracy score; how accurately a tag has been assigned to a given entity', |
|
'index of the start of the corresponding entity', |
|
'index of the end of the corresponding entity', |
|
'the broader category the entity belongs to', |
|
] |
|
} |
|
) |
|
buf = io.BytesIO() |
|
with zipfile.ZipFile(buf, "w") as myzip: |
|
myzip.writestr("Summary of the results.csv", df.to_csv(index=False)) |
|
myzip.writestr("Glossary of tags.csv", dfa.to_csv(index=False)) |
|
|
|
with stylable_container( |
|
key="download_button", |
|
css_styles="""button { background-color: yellow; border: 1px solid black; padding: 5px; color: black; }""", |
|
): |
|
st.download_button( |
|
label="Download zip file", |
|
data=buf.getvalue(), |
|
file_name="nlpblogs_ner_results.zip", |
|
mime="application/zip", |
|
) |
|
if comet_initialized: |
|
experiment.log_asset(buf.getvalue(), file_name="downloadable_results.zip") |
|
|
|
st.divider() |
|
if comet_initialized: |
|
experiment.end() |
|
|
|
end_time_overall = time.time() |
|
elapsed_time_overall = end_time_overall - start_time_overall |
|
st.info(f"Results processed in **{elapsed_time_overall:.2f} seconds**.") |
|
|
|
st.write(f"Number of times you requested results: **{st.session_state['file_upload_attempts']}/{max_attempts}**") |
|
|