Spaces:
Sleeping
Sleeping
File size: 6,603 Bytes
c2f9ec8 2854e2c 79b5c9c 2854e2c 56325dc 19ea0c5 2854e2c 56325dc 79b5c9c c2f9ec8 79b5c9c 2854e2c 56325dc 102e49d 8f8f414 edfcf73 8f8f414 2854e2c 0c91845 8f8f414 2854e2c 8f8f414 102e49d 79b5c9c 102e49d 79b5c9c 102e49d 8f8f414 2854e2c 8f8f414 2854e2c b809407 8f8f414 2854e2c 8f8f414 949011b 2854e2c 8f8f414 2854e2c 8f8f414 56325dc 79b5c9c 2854e2c 56325dc |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
# TalentLens
import os
import time # Add time module import
from io import BytesIO
import streamlit as st
import fitz # PyMuPDF
import requests
from dotenv import load_dotenv
from config import supabase, HF_API_TOKEN, HF_HEADERS, HF_ENDPOINTS
from utils.parser import parse_resume, extract_email, summarize_resume
from utils.hybrid_extractor import extract_resume_sections
from utils.builder import build_resume_from_data
from utils.screening import evaluate_resumes
from utils.reporting import generate_pdf_report, generate_interview_questions_from_summaries
def toggle_endpoint(endpoint_name, action, *, timeout=10):
    """Ping an endpoint's health URL and record its refreshed status.

    Sends a ``HEAD`` request to ``<endpoint url>/health``. A 503 reply is
    shown to the user as "starting" and a 200 reply as "running"; in both
    cases the status is then re-read with ``check_endpoint_status`` and
    stored in ``st.session_state['endpoint_status']``. Any other reply, and
    any exception raised along the way, is displayed as an error message
    rather than propagated.

    Args:
        endpoint_name: Key into ``HF_ENDPOINTS`` identifying the endpoint.
        action: Verb used only in the error messages (e.g. ``"start"``).
            The request that is sent does not depend on it.
        timeout: Seconds to wait for the health request. Without a timeout
            ``requests`` can block indefinitely and freeze the page.
    """
    try:
        from config import HF_ENDPOINTS, HF_HEADERS, check_endpoint_status

        endpoint_info = HF_ENDPOINTS[endpoint_name]
        health_url = f"{endpoint_info['url']}/health"

        # NOTE(review): the request itself is presumably what wakes a
        # sleeping endpoint (503 while it boots) -- confirm against the
        # hosting provider's behaviour.
        response = requests.head(health_url, headers=HF_HEADERS, timeout=timeout)

        if response.status_code == 503:
            st.info("\U0001F504 Starting endpoint... This may take 5-6 minutes. Click on 'Start' again to refresh status.")
        elif response.status_code == 200:
            st.success("\u2705 Endpoint is running")
        else:
            # A HEAD response carries no body, so fall back to the status
            # line to give the user something actionable.
            detail = response.text or f"HTTP {response.status_code} {response.reason}"
            st.error(f"\u274C Failed to {action} endpoint: {detail}")
            return

        time.sleep(2)  # Wait briefly before refreshing status
        new_status = check_endpoint_status(endpoint_name)
        st.session_state['endpoint_status'] = {endpoint_name: new_status}
    except Exception as e:
        # UI boundary: surface the failure on the page instead of crashing
        # the Streamlit script run.
        st.error(f"\u274C Failed to {action} endpoint: {e}")
# CSS injected by main() to hide the Streamlit sidebar and its toggle.
_HIDE_SIDEBAR_CSS = """
<style>
.css-1d391kg {display: none}
.css-1rs6os {display: none}
.css-17ziqus {display: none}
[data-testid="stSidebar"] {display: none}
[data-testid="collapsedControl"] {display: none}
.css-1lcbmhc {display: none}
.css-1outpf7 {display: none}
.sidebar .sidebar-content {display: none}
</style>
"""


def _render_evaluation(uploaded_files, job_description):
    """Screen the uploaded resumes and render the results on the page."""
    st.write("### Evaluating Resumes...")
    shortlisted, removed_candidates = evaluate_resumes(uploaded_files, job_description)

    if not shortlisted:
        st.warning("\u26A0\uFE0F No resumes matched the required keywords.")
    else:
        st.subheader("\u2705 Shortlisted Candidates:")
        for candidate in shortlisted:
            st.write(f"**{candidate['name']}**")

        # Interview questions are derived from the shortlisted candidates.
        questions = generate_interview_questions_from_summaries(shortlisted)
        st.subheader("\U0001F9E0 Suggested Interview Questions:")
        for question in questions:
            st.markdown(f"{question}")

        # Downloadable PDF report covering the shortlist and the questions.
        pdf_report = generate_pdf_report(shortlisted, questions)
        st.download_button("Download Shortlist Report", pdf_report, "shortlist.pdf")

    # Rejected resumes are listed whether or not anyone was shortlisted.
    if removed_candidates:
        st.subheader("\u274C Resumes Removed:")
        for removed in removed_candidates:
            st.write(f"**{removed['name']}** - {removed['reason']}")


def _render_endpoint_controls(endpoint_name):
    """Show a Start button for a stopped/errored endpoint and report the outcome."""
    from config import check_endpoint_status

    current_status = check_endpoint_status(endpoint_name)
    # Keep session state in sync with the status just fetched.
    st.session_state['endpoint_status'] = {endpoint_name: current_status}

    start_button = st.empty()  # Placeholder for Start button
    if current_status.get('status', 'unknown') not in ('stopped', 'error'):
        return
    if not start_button.button("\u25B6\uFE0F Start", key=f"start_{endpoint_name}", use_container_width=True):
        return

    toggle_endpoint(endpoint_name, "start")

    # Refresh status after starting
    new_status = check_endpoint_status(endpoint_name)
    st.session_state['endpoint_status'] = {endpoint_name: new_status}
    status = new_status.get('status')
    if status == 'running':
        st.success("\u2705 Endpoint is running")
    elif status == 'starting':
        st.info("\U0001F504 Starting endpoint... This may take 5-6 minutes. Click on 'Start' again to refresh status.")
    elif status == 'error':
        st.error(f"\u274C Error: {new_status.get('error', 'Unknown error')}")


# ------------------------- Main App Function -------------------------
def main():
    """Render the TalentLens.AI landing page.

    Lays out the resume uploader (at most 10 PDFs), the job-description box
    and the two action buttons. "Format Resume" switches to
    ``pages/Template.py``; "Evaluate Resumes" validates the inputs and then
    renders the screening results. Unless an input check fails (each check
    shows an error and ends the run early), the endpoint Start control is
    rendered at the bottom of the page.
    """
    st.set_page_config(
        page_title="TalentLens.AI",
        layout="centered",
        initial_sidebar_state="collapsed"
    )

    # Hide sidebar completely with CSS
    st.markdown(_HIDE_SIDEBAR_CSS, unsafe_allow_html=True)

    st.markdown("<h1 style='text-align: center;'>TalentLens.AI</h1>", unsafe_allow_html=True)
    st.divider()
    st.markdown("<h3 style='text-align: center;'>AI-Powered Intelligent Resume Screening</h3>", unsafe_allow_html=True)

    # Upload resumes (limit: 10 files)
    uploaded_files = st.file_uploader(
        "Upload Resumes (PDF Only, Max: 10)",
        accept_multiple_files=True,
        type=["pdf"]
    )
    if uploaded_files and len(uploaded_files) > 10:
        st.error("\u26A0\uFE0F You can upload a maximum of 10 resumes at a time.")
        return

    # Input job description
    job_description = st.text_area("Enter Job Description")

    # Main action buttons
    col1, col2 = st.columns(2)
    with col1:
        evaluate_clicked = st.button("\U0001F4CA Evaluate Resumes", type="primary", use_container_width=True)
    with col2:
        format_clicked = st.button("\U0001F4C4 Format Resume", use_container_width=True)

    # Handle Format Resume redirect
    if format_clicked:
        st.switch_page("pages/Template.py")

    # Handle Evaluate Resumes
    if evaluate_clicked:
        # A description made only of whitespace is as useless as an empty one.
        if not job_description or not job_description.strip():
            st.error("\u26A0\uFE0F Please enter a job description.")
            return
        if not uploaded_files:
            st.error("\u26A0\uFE0F Please upload at least one resume.")
            return
        _render_evaluation(uploaded_files, job_description)

    endpoint_name = "vzwjawyxvu030jsw"  # Updated to match endpoint ID
    _render_endpoint_controls(endpoint_name)
# ------------------------- Run the App -------------------------
# Launch the UI only when this file is executed directly, not when imported.
if __name__ == "__main__":
    main()