# Streamlit app: upload a JSON file, extract its text, and tag named entities
# (PER / LOC / ORG / MISC) with the "saattrupdan/nbailab-base-ner-scandi"
# model. Extracted text is kept in session state only in Fernet-encrypted
# form; results are shown as tables and charts and offered as a zip download.
# Results can optionally be logged to Comet ML when the COMET_* environment
# variables are set.

import io
import json
import os
import re
import time
import zipfile

# NOTE(review): the relative order of the third-party imports is kept exactly
# as in the original file, in case any of them is sensitive to import order.
import streamlit as st
import pandas as pd
from transformers import pipeline
from streamlit_extras.stylable_container import stylable_container
import plotly.express as px
from comet_ml import Experiment
import numpy as np
from cryptography.fernet import Fernet

st.set_page_config(layout="wide", page_title="Named Entity Recognition App")

# Comet ML logging is enabled only when all three settings are present.
COMET_API_KEY = os.environ.get("COMET_API_KEY")
COMET_WORKSPACE = os.environ.get("COMET_WORKSPACE")
COMET_PROJECT_NAME = os.environ.get("COMET_PROJECT_NAME")
comet_initialized = bool(COMET_API_KEY and COMET_WORKSPACE and COMET_PROJECT_NAME)

# Per-session defaults.
if 'file_upload_attempts' not in st.session_state:
    st.session_state['file_upload_attempts'] = 0
if 'encrypted_extracted_text' not in st.session_state:
    st.session_state['encrypted_extracted_text'] = None
if 'json_dataframe' not in st.session_state:
    st.session_state['json_dataframe'] = None

# Maximum number of "Results" requests allowed per session.
max_attempts = 10


@st.cache_resource
def load_ner_model():
    """Build the token-classification pipeline once and cache it.

    On failure an error is shown and the script run is stopped.
    """
    try:
        return pipeline(
            "token-classification",
            model="saattrupdan/nbailab-base-ner-scandi",
            aggregation_strategy="max",
            stride=128,
        )
    except Exception as e:
        st.error(f"Failed to load NER model. Please check your internet connection or model availability: {e}")
        st.stop()


@st.cache_resource
def load_encryption_key():
    """
    Loads the Fernet encryption key from environment variables.
    This key is crucial for encrypting/decrypting sensitive data.
    It's cached as a resource to be loaded only once.

    Returns a ``Fernet`` instance built from the ``FERNET_KEY`` environment
    variable. If the variable is missing, or anything else goes wrong while
    building the cipher, an error is shown and the script run is stopped.
    """
    try:
        # Get the key string from environment variables
        key_str = os.environ.get("FERNET_KEY")
        if not key_str:
            raise ValueError("FERNET_KEY environment variable not set. Cannot perform encryption/decryption.")
        # Fernet key must be bytes, so encode the string
        key_bytes = key_str.encode('utf-8')
        return Fernet(key_bytes)
    except ValueError as ve:
        st.error(
            f"Configuration Error: {ve}. "
            "Please ensure the 'FERNET_KEY' environment variable is set securely in your deployment environment "
            "(e.g., Hugging Face Spaces secrets, Render environment variables) or in a local .env file for development."
        )
        st.stop()  # Stop the app if the key is not found, as security is compromised
    except Exception as e:
        st.error(f"An unexpected error occurred while loading encryption key: {e}. Please check your key format and environment settings.")
        st.stop()


# Initialize the Fernet cipher instance
fernet = load_encryption_key()


def encrypt_text(text_content: str) -> bytes:
    """
    Encrypts a string using the loaded Fernet cipher.
    The input string is first encoded to UTF-8 bytes.
    """
    return fernet.encrypt(text_content.encode('utf-8'))


def decrypt_text(encrypted_bytes: bytes) -> str | None:
    """
    Decrypts bytes using the loaded Fernet cipher.
    Returns the decrypted string, or None if decryption fails (e.g., tampering).
    """
    try:
        return fernet.decrypt(encrypted_bytes).decode('utf-8')
    except Exception as e:
        st.error(f"Decryption failed. This might indicate data tampering or an incorrect encryption key. Error: {e}")
        return None


def _shallow_strings(values) -> list[str]:
    """Collect the strings in *values*, descending one level into lists."""
    found = []
    for value in values:
        if isinstance(value, str):
            found.append(value)
        elif isinstance(value, list):
            found.extend(sub for sub in value if isinstance(sub, str))
    return found


def _extract_strings_from_json(data) -> list[str]:
    """Fallback text extraction for JSON that cannot become a DataFrame.

    For a dict: its string values and strings inside its list values.
    For a list: its string items, plus the same shallow extraction applied to
    each dict item. Any other JSON value yields an empty list.
    """
    if isinstance(data, dict):
        return _shallow_strings(data.values())
    if isinstance(data, list):
        found = []
        for item in data:
            if isinstance(item, str):
                found.append(item)
            elif isinstance(item, dict):
                found.extend(_shallow_strings(item.values()))
        return found
    return []


# --- UI Elements ---
st.subheader("Scandinavian JSON Entity Finder", divider="orange")
st.link_button("by nlpblogs", "https://nlpblogs.com", type="tertiary")

expander = st.expander("**Important notes on the Scandinavian JSON Entity Finder**")
expander.write('''

**Named Entities:** This Scandinavian JSON Entity Finder predicts four (4) labels (“PER: person”, “LOC: location”, “ORG: organization”, “MISC: miscellaneous”). Results are presented in an easy-to-read table, visualized in an interactive tree map, pie chart, and bar chart, and are available for download along with a Glossary of tags.

**How to Use:** Upload your JSON file. Then, click the 'Results' button to extract and tag entities in your text data.

**Usage Limits:** You can request results up to 10 times.

**Language settings:** Please check and adjust the language settings in your computer, so the Danish, Swedish, Norwegian, Icelandic and Faroese characters are handled properly in your downloaded file.

**Customization:** To change the app's background color to white or black, click the three-dot menu on the right-hand side of your app, go to Settings and then Choose app theme, colors and fonts.

**Technical issues:** If your connection times out, please refresh the page or reopen the app's URL.

For any errors or inquiries, please contact us at info@nlpblogs.com

''')

with st.sidebar:
    container = st.container(border=True)
    container.write("**Named Entity Recognition (NER)** is the task of "
                    "extracting and tagging entities in text data. Entities can be persons, "
                    "organizations, locations, countries, products, events etc.")
    st.subheader("Related NLP Web Apps", divider="orange")
    st.link_button(
        "Italian URL & TXT Entity Finder",
        "https://nlpblogs.com/shop/named-entity-recognition-ner/monolingual-ner-web-apps/italian-url-txt-entity-finder/",
        type="primary",
    )

uploaded_file = st.file_uploader("Choose a JSON file", type=["json"])

# Text extracted during this script run; stays None when nothing usable is found.
current_run_text = None

if uploaded_file is not None:
    try:
        # Rewind first so parsing always starts at the beginning of the upload.
        uploaded_file.seek(0)
        dados = json.load(uploaded_file)

        # Attempt to convert JSON to DataFrame and extract text
        try:
            json_df = pd.DataFrame(dados)
        except ValueError:
            # If direct conversion to DataFrame fails, try to extract strings directly from JSON structure
            st.session_state['json_dataframe'] = None  # do not keep a frame from an earlier upload
            st.info("JSON data could not be directly converted to a simple DataFrame for display. Attempting to extract text directly.")
            extracted_texts_list = _extract_strings_from_json(dados)
            if extracted_texts_list:
                current_run_text = " ".join(extracted_texts_list).strip()
            else:
                st.warning("No string text could be extracted from the JSON for analysis.")
                current_run_text = None
        else:
            st.session_state['json_dataframe'] = json_df
            if json_df.empty:
                # to_string() on an empty frame returns a multi-line
                # "Empty DataFrame / Columns / Index" placeholder, which must
                # not be mistaken for user text.
                text_content = ""
            else:
                # Concatenate all content into a single string for NER
                df_string_representation = json_df.to_string(index=False, header=False)
                # Drop everything except word characters, whitespace and periods
                text_content = re.sub(r'[^\w\s.]', '', df_string_representation).strip()

            if text_content:
                current_run_text = text_content
            else:
                st.warning("No meaningful text could be extracted from the JSON DataFrame for analysis.")
                current_run_text = None

        if current_run_text:
            # --- ENCRYPT THE EXTRACTED TEXT BEFORE STORING IN SESSION STATE ---
            st.session_state['encrypted_extracted_text'] = encrypt_text(current_run_text)
            st.success("JSON file uploaded successfully. File content encrypted and secured. Due to security protocols, the file content is hidden.")
            st.divider()
        else:
            st.session_state['encrypted_extracted_text'] = None
            st.error("Could not extract meaningful text from the uploaded JSON file.")
    except json.JSONDecodeError as e:
        st.error(f"JSON Decode Error: {e}")
        st.error("Please ensure the uploaded file contains valid JSON data.")
        st.session_state['encrypted_extracted_text'] = None
        st.session_state['json_dataframe'] = None
    except Exception as e:
        st.error(f"An unexpected error occurred during file processing: {e}")
        st.session_state['encrypted_extracted_text'] = None
        st.session_state['json_dataframe'] = None
else:
    # No file in the uploader: forget anything kept from a previously uploaded
    # file so "Results" cannot analyse content the user has removed.
    st.session_state['encrypted_extracted_text'] = None
    st.session_state['json_dataframe'] = None

# --- Results Button and Processing Logic ---
if st.button("Results"):
    start_time = time.time()

    if not comet_initialized:
        st.warning("Comet ML not initialized. Check environment variables if you wish to log data.")

    if st.session_state['file_upload_attempts'] >= max_attempts:
        st.error(f"You have requested results {max_attempts} times. You have reached your daily request limit.")
        st.stop()

    # --- DECRYPT THE TEXT BEFORE PASSING TO NER MODEL ---
    text_for_ner = None
    if st.session_state['encrypted_extracted_text'] is not None:
        text_for_ner = decrypt_text(st.session_state['encrypted_extracted_text'])

    if text_for_ner is None or not text_for_ner.strip():
        st.warning("No extractable text content available for analysis. Please upload a valid JSON file.")
        st.stop()

    st.session_state['file_upload_attempts'] += 1

    with st.spinner("Analyzing text...", show_time=True):
        model = load_ner_model()
        text_entities = model(text_for_ner)  # Use the decrypted text

    # An empty prediction list would produce a DataFrame without a 'word'
    # column; report it as "no entities" instead of a missing-column error.
    if not text_entities:
        st.warning("No entities were extracted from the uploaded text.")
        st.stop()

    df = pd.DataFrame(text_entities)

    if 'word' not in df.columns:
        st.error("The 'word' column does not exist in the DataFrame. Cannot perform cleaning.")
        st.stop()  # Stop execution if the column is missing

    # Accept both the classic object dtype and pandas' dedicated string dtypes.
    word_is_text = (
        pd.api.types.is_object_dtype(df['word'])
        or pd.api.types.is_string_dtype(df['word'])
    )
    if word_is_text:
        pattern = r'[^\w\s]'  # Remove everything except word characters and whitespace
        df['word'] = df['word'].astype(str).replace(pattern, '', regex=True)
    else:
        st.warning("The 'word' column is not of string type; skipping character cleaning.")

    # Replace empty strings with 'Unknown' and drop rows with NaN after cleaning
    df = df.replace('', 'Unknown').dropna()

    if df.empty:
        st.warning("No entities were extracted from the uploaded text.")
        st.stop()

    experiment = None
    if comet_initialized:
        experiment = Experiment(
            api_key=COMET_API_KEY,
            workspace=COMET_WORKSPACE,
            project_name=COMET_PROJECT_NAME,
        )

    try:
        if experiment is not None:
            experiment.log_parameter("input_text_length", len(text_for_ner))
            experiment.log_table("predicted_entities", df)

        # --- Display Results ---
        properties = {"border": "2px solid gray", "color": "blue", "font-size": "16px"}
        df_styled = df.style.set_properties(**properties)
        st.dataframe(df_styled, use_container_width=True)

        with st.expander("See Glossary of tags"):
            st.write('''

'**word**': ['entity extracted from your text data']

'**score**': ['accuracy score; how accurately a tag has been assigned to a given entity']

'**entity_group**': ['label (tag) assigned to a given extracted entity']

'**start**': ['index of the start of the corresponding entity']

'**end**': ['index of the end of the corresponding entity']

''')

        entity_groups = {
            "PER": "person",
            "LOC": "location",
            "ORG": "organization",
            "MISC": "miscellaneous",
        }

        st.subheader("Grouped entities", divider="orange")

        # Convert entity_groups dictionary to a list of (key, title) tuples
        entity_items = list(entity_groups.items())
        # Define how many tabs per row
        tabs_per_row = 5
        found_groups = set(df["entity_group"].unique())

        # Loop through the entity items in chunks
        for i in range(0, len(entity_items), tabs_per_row):
            current_row_entities = entity_items[i:i + tabs_per_row]
            tabs = st.tabs([title for _, title in current_row_entities])
            for tab, (entity_group_key, tab_title) in zip(tabs, current_row_entities):
                with tab:
                    if entity_group_key in found_groups:
                        df_filtered = df[df["entity_group"] == entity_group_key]
                        st.dataframe(df_filtered, use_container_width=True)
                    else:
                        st.info(f"No '{tab_title}' entities found in the text.")
                        # Show a one-row placeholder table so every tab has the same layout
                        st.dataframe(pd.DataFrame({
                            'entity_group': [entity_group_key],
                            'score': [np.nan],
                            'word': [np.nan],
                            'start': [np.nan],
                            'end': [np.nan],
                        }), hide_index=True)

        st.divider()

        # --- Visualizations ---
        st.subheader("Tree map", divider="orange")
        fig_treemap = px.treemap(
            df,
            path=[px.Constant("all"), 'word', 'entity_group'],
            values='score',
            color='entity_group',
        )
        fig_treemap.update_layout(margin=dict(t=50, l=25, r=25, b=25))
        st.plotly_chart(fig_treemap)
        if experiment is not None:
            experiment.log_figure(figure=fig_treemap, figure_name="entity_treemap")

        # Name the index and the value column explicitly so the result has the
        # columns 'entity_group' and 'count' regardless of the pandas version
        # (value_counts()/reset_index() naming changed in pandas 2.0).
        final_df_counts = (
            df['entity_group']
            .value_counts()
            .rename_axis('entity_group')
            .reset_index(name='count')
        )

        col1, col2 = st.columns(2)
        with col1:
            st.subheader("Pie Chart", divider="orange")
            fig_pie = px.pie(
                final_df_counts,
                values='count',
                names='entity_group',
                hover_data=['count'],
                labels={'count': 'count'},
                title='Percentage of predicted labels',
            )
            fig_pie.update_traces(textposition='inside', textinfo='percent+label')
            st.plotly_chart(fig_pie)
            if experiment is not None:
                experiment.log_figure(figure=fig_pie, figure_name="label_pie_chart")

        with col2:
            st.subheader("Bar Chart", divider="orange")
            fig_bar = px.bar(
                final_df_counts,
                x="count",
                y="entity_group",
                color="entity_group",
                text_auto=True,
                title='Occurrences of predicted labels',
            )
            st.plotly_chart(fig_bar)
            if experiment is not None:
                experiment.log_figure(figure=fig_bar, figure_name="label_bar_chart")

        # --- Downloadable Content ---
        dfa = pd.DataFrame(
            data={
                'Column Name': ['word', 'entity_group', 'score', 'start', 'end'],
                'Description': [
                    'entity extracted from your text data',
                    'label (tag) assigned to a given extracted entity',
                    'accuracy score; how accurately a tag has been assigned to a given entity',
                    'index of the start of the corresponding entity',
                    'index of the end of the corresponding entity',
                ],
            }
        )

        buf = io.BytesIO()
        with zipfile.ZipFile(buf, "w") as myzip:
            myzip.writestr("Summary of the results.csv", df.to_csv(index=False))
            myzip.writestr("Glossary of tags.csv", dfa.to_csv(index=False))
        zip_bytes = buf.getvalue()

        with stylable_container(
            key="download_button",
            css_styles="""button { background-color: yellow; border: 1px solid black; padding: 5px; color: black; }""",
        ):
            st.download_button(
                label="Download zip file",
                data=zip_bytes,
                file_name="nlpblogs_ner_results.zip",
                mime="application/zip",
            )

        if experiment is not None:
            # NOTE(review): log_asset is documented as taking a path or a
            # file-like object, so the zip bytes are wrapped in BytesIO
            # rather than passed raw — confirm against the installed
            # comet_ml version.
            experiment.log_asset(io.BytesIO(zip_bytes), file_name="downloadable_results.zip")

        st.divider()
    finally:
        # Close the Comet experiment even if rendering or logging failed.
        if experiment is not None:
            experiment.end()

    end_time = time.time()
    elapsed_time = end_time - start_time
    st.info(f"Results processed in **{elapsed_time:.2f} seconds**.")
    st.write(f"Number of times you requested results: **{st.session_state['file_upload_attempts']}/{max_attempts}**")