Spaces:
Sleeping
Sleeping
import gradio as gr | |
import boto3 | |
import json | |
import pandas as pd | |
import matplotlib.pyplot as plt | |
import numpy as np | |
import re | |
import logging | |
import os | |
import pickle | |
import csv | |
from PIL import Image | |
import io | |
from datetime import datetime | |
import uuid | |
# Configure logging | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
# Try to import ReportLab (needed for PDF generation) | |
try: | |
from reportlab.lib.pagesizes import letter | |
from reportlab.lib import colors | |
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle | |
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle | |
REPORTLAB_AVAILABLE = True | |
except ImportError: | |
logger.warning("ReportLab library not available - PDF export will be disabled") | |
REPORTLAB_AVAILABLE = False | |
# Try to import PyPDF2 (needed for PDF reading) | |
try: | |
import PyPDF2 | |
PYPDF2_AVAILABLE = True | |
except ImportError: | |
logger.warning("PyPDF2 library not available - PDF reading will be disabled") | |
PYPDF2_AVAILABLE = False | |
# AWS credentials for Bedrock API | |
# For HuggingFace Spaces, set these as secrets in the Space settings | |
AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY", "") | |
AWS_SECRET_KEY = os.getenv("AWS_SECRET_KEY", "") | |
AWS_REGION = os.getenv("AWS_REGION", "us-east-1") | |
# Initialize AWS clients if credentials are available | |
bedrock_client = None | |
transcribe_client = None | |
s3_client = None | |
if AWS_ACCESS_KEY and AWS_SECRET_KEY: | |
try: | |
# Initialize Bedrock client for AI analysis | |
bedrock_client = boto3.client( | |
'bedrock-runtime', | |
aws_access_key_id=AWS_ACCESS_KEY, | |
aws_secret_access_key=AWS_SECRET_KEY, | |
region_name=AWS_REGION | |
) | |
logger.info("Bedrock client initialized successfully") | |
# Initialize Transcribe client for speech-to-text | |
transcribe_client = boto3.client( | |
'transcribe', | |
aws_access_key_id=AWS_ACCESS_KEY, | |
aws_secret_access_key=AWS_SECRET_KEY, | |
region_name=AWS_REGION | |
) | |
logger.info("Transcribe client initialized successfully") | |
# Initialize S3 client for storing audio files | |
s3_client = boto3.client( | |
's3', | |
aws_access_key_id=AWS_ACCESS_KEY, | |
aws_secret_access_key=AWS_SECRET_KEY, | |
region_name=AWS_REGION | |
) | |
logger.info("S3 client initialized successfully") | |
except Exception as e: | |
logger.error(f"Failed to initialize AWS clients: {str(e)}") | |
# S3 bucket for storing audio files | |
S3_BUCKET = os.environ.get("S3_BUCKET", "casl-audio-files") | |
S3_PREFIX = "transcribe-audio/" | |
# Sample transcript for the demo | |
SAMPLE_TRANSCRIPT = """*PAR: today I would &-um like to talk about &-um a fun trip I took last &-um summer with my family. | |
*PAR: we went to the &-um &-um beach [//] no to the mountains [//] I mean the beach actually. | |
*PAR: there was lots of &-um &-um swimming and &-um sun. | |
*PAR: we [/] we stayed for &-um three no [//] four days in a &-um hotel near the water [: ocean] [*]. | |
*PAR: my favorite part was &-um building &-um castles with sand. | |
*PAR: sometimes I forget [//] forgetted [: forgot] [*] what they call those things we built. | |
*PAR: my brother he [//] he helped me dig a big hole. | |
*PAR: we saw [/] saw fishies [: fish] [*] swimming in the water. | |
*PAR: sometimes I wonder [/] wonder where fishies [: fish] [*] go when it's cold. | |
*PAR: maybe they have [/] have houses under the water. | |
*PAR: after swimming we [//] I eat [: ate] [*] &-um ice cream with &-um chocolate things on top. | |
*PAR: what do you call those &-um &-um sprinkles! that's the word. | |
*PAR: my mom said to &-um that I could have &-um two scoops next time. | |
*PAR: I want to go back to the beach [/] beach next year.""" | |
# =============================== | |
# Database and Storage Functions | |
# =============================== | |
# Create data directories if they don't exist | |
DATA_DIR = os.environ.get("DATA_DIR", "patient_data") | |
RECORDS_FILE = os.path.join(DATA_DIR, "patient_records.csv") | |
ANALYSES_DIR = os.path.join(DATA_DIR, "analyses") | |
DOWNLOADS_DIR = os.path.join(DATA_DIR, "downloads") | |
AUDIO_DIR = os.path.join(DATA_DIR, "audio") | |
def ensure_data_dirs(): | |
"""Ensure data directories exist""" | |
global DOWNLOADS_DIR, AUDIO_DIR | |
try: | |
os.makedirs(DATA_DIR, exist_ok=True) | |
os.makedirs(ANALYSES_DIR, exist_ok=True) | |
os.makedirs(DOWNLOADS_DIR, exist_ok=True) | |
os.makedirs(AUDIO_DIR, exist_ok=True) | |
logger.info(f"Data directories created: {DATA_DIR}, {ANALYSES_DIR}, {DOWNLOADS_DIR}, {AUDIO_DIR}") | |
# Create records file if it doesn't exist | |
if not os.path.exists(RECORDS_FILE): | |
with open(RECORDS_FILE, 'w', newline='') as f: | |
writer = csv.writer(f) | |
writer.writerow([ | |
"ID", "Name", "Record ID", "Age", "Gender", | |
"Assessment Date", "Clinician", "Analysis Date", "File Path" | |
]) | |
except Exception as e: | |
logger.warning(f"Could not create data directories: {str(e)}") | |
# Fallback to tmp directory on HF Spaces | |
DOWNLOADS_DIR = os.path.join(os.path.expanduser("~"), "casl_downloads") | |
AUDIO_DIR = os.path.join(os.path.expanduser("~"), "casl_audio") | |
os.makedirs(DOWNLOADS_DIR, exist_ok=True) | |
os.makedirs(AUDIO_DIR, exist_ok=True) | |
logger.info(f"Using fallback directories: {DOWNLOADS_DIR}, {AUDIO_DIR}") | |
# Initialize data directories | |
ensure_data_dirs() | |
def save_patient_record(patient_info, analysis_results, transcript): | |
"""Save patient record to storage""" | |
try: | |
# Generate unique ID for the record | |
record_id = str(uuid.uuid4()) | |
# Extract patient information | |
name = patient_info.get("name", "") | |
patient_id = patient_info.get("record_id", "") | |
age = patient_info.get("age", "") | |
gender = patient_info.get("gender", "") | |
assessment_date = patient_info.get("assessment_date", "") | |
clinician = patient_info.get("clinician", "") | |
# Create filename for the analysis data | |
filename = f"analysis_{record_id}.pkl" | |
filepath = os.path.join(ANALYSES_DIR, filename) | |
# Save analysis data | |
with open(filepath, 'wb') as f: | |
pickle.dump({ | |
"patient_info": patient_info, | |
"analysis_results": analysis_results, | |
"transcript": transcript, | |
"timestamp": datetime.now().isoformat(), | |
}, f) | |
# Add record to CSV file | |
with open(RECORDS_FILE, 'a', newline='') as f: | |
writer = csv.writer(f) | |
writer.writerow([ | |
record_id, name, patient_id, age, gender, | |
assessment_date, clinician, datetime.now().strftime('%Y-%m-%d'), | |
filepath | |
]) | |
return record_id | |
except Exception as e: | |
logger.error(f"Error saving patient record: {str(e)}") | |
return None | |
def load_patient_record(record_id): | |
"""Load patient record from storage""" | |
try: | |
# Find the record in the CSV file | |
if not os.path.exists(RECORDS_FILE): | |
logger.error(f"Records file does not exist: {RECORDS_FILE}") | |
return None | |
with open(RECORDS_FILE, 'r', newline='') as f: | |
reader = csv.reader(f) | |
next(reader) # Skip header | |
for row in reader: | |
if len(row) < 9: # Ensure row has enough elements | |
logger.warning(f"Skipping malformed record row: {row}") | |
continue | |
if row[0] == record_id: | |
file_path = row[8] | |
# Check if the file exists | |
if not os.path.exists(file_path): | |
logger.error(f"Analysis file not found: {file_path}") | |
return None | |
# Load and return the data | |
try: | |
with open(file_path, 'rb') as f: | |
return pickle.load(f) | |
except (pickle.PickleError, EOFError) as pickle_err: | |
logger.error(f"Error unpickling file {file_path}: {str(pickle_err)}") | |
return None | |
logger.warning(f"Record ID not found: {record_id}") | |
return None | |
except Exception as e: | |
logger.error(f"Error loading patient record: {str(e)}") | |
return None | |
def get_all_patient_records(): | |
"""Return a list of all patient records""" | |
try: | |
records = [] | |
# Ensure data directories exist | |
ensure_data_dirs() | |
if not os.path.exists(RECORDS_FILE): | |
logger.warning(f"Records file does not exist, creating it: {RECORDS_FILE}") | |
with open(RECORDS_FILE, 'w', newline='') as f: | |
writer = csv.writer(f) | |
writer.writerow([ | |
"ID", "Name", "Record ID", "Age", "Gender", | |
"Assessment Date", "Clinician", "Analysis Date", "File Path" | |
]) | |
return records | |
# Read existing records | |
valid_records = [] | |
with open(RECORDS_FILE, 'r', newline='') as f: | |
reader = csv.reader(f) | |
next(reader) # Skip header | |
for row in reader: | |
if len(row) < 9: # Check for malformed rows | |
continue | |
# Check if the analysis file exists | |
file_path = row[8] | |
file_exists = os.path.exists(file_path) | |
record = { | |
"id": row[0], | |
"name": row[1], | |
"record_id": row[2], | |
"age": row[3], | |
"gender": row[4], | |
"assessment_date": row[5], | |
"clinician": row[6], | |
"analysis_date": row[7], | |
"file_path": file_path, | |
"status": "Valid" if file_exists else "Missing File" | |
} | |
records.append(record) | |
# Keep track of valid records for potential cleanup | |
if file_exists: | |
valid_records.append(row) | |
# If we found invalid records, consider rewriting the CSV with only valid entries | |
if len(valid_records) < len(records): | |
logger.warning(f"Found {len(records) - len(valid_records)} invalid records") | |
# Uncomment to enable automatic cleanup: | |
# with open(RECORDS_FILE, 'w', newline='') as f: | |
# writer = csv.writer(f) | |
# writer.writerow([ | |
# "ID", "Name", "Record ID", "Age", "Gender", | |
# "Assessment Date", "Clinician", "Analysis Date", "File Path" | |
# ]) | |
# for row in valid_records: | |
# writer.writerow(row) | |
return records | |
except Exception as e: | |
logger.error(f"Error getting patient records: {str(e)}") | |
return [] | |
def delete_patient_record(record_id): | |
"""Delete a patient record""" | |
try: | |
if not os.path.exists(RECORDS_FILE): | |
return False | |
# Find the record and its file | |
file_path = None | |
with open(RECORDS_FILE, 'r', newline='') as f: | |
reader = csv.reader(f) | |
rows = list(reader) | |
header = rows[0] | |
for i, row in enumerate(rows[1:], 1): | |
if len(row) < 9: | |
continue | |
if row[0] == record_id: | |
file_path = row[8] | |
break | |
if not file_path: | |
return False | |
# Delete the analysis file if it exists | |
if os.path.exists(file_path): | |
os.remove(file_path) | |
# Remove the record from the CSV | |
rows_to_keep = [row for row in rows[1:] if len(row) >= 9 and row[0] != record_id] | |
with open(RECORDS_FILE, 'w', newline='') as f: | |
writer = csv.writer(f) | |
writer.writerow(header) | |
writer.writerows(rows_to_keep) | |
return True | |
except Exception as e: | |
logger.error(f"Error deleting patient record: {str(e)}") | |
return False | |
# =============================== | |
# Utility Functions | |
# =============================== | |
def read_pdf(file_path): | |
"""Read text from a PDF file""" | |
if not PYPDF2_AVAILABLE: | |
return "Error: PDF reading is not available - PyPDF2 library is not installed" | |
try: | |
with open(file_path, 'rb') as file: | |
pdf_reader = PyPDF2.PdfReader(file) | |
text = "" | |
for page in pdf_reader.pages: | |
text += page.extract_text() | |
return text | |
except Exception as e: | |
logger.error(f"Error reading PDF: {str(e)}") | |
return "" | |
def read_cha_file(file_path): | |
"""Read and parse a .cha transcript file""" | |
try: | |
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: | |
content = f.read() | |
# Extract participant lines (starting with *PAR:) | |
par_lines = [] | |
for line in content.splitlines(): | |
if line.startswith('*PAR:'): | |
par_lines.append(line) | |
# If no PAR lines found, just return the whole content | |
if not par_lines: | |
return content | |
return '\n'.join(par_lines) | |
except Exception as e: | |
logger.error(f"Error reading CHA file: {str(e)}") | |
return "" | |
def process_upload(file): | |
"""Process an uploaded file (PDF, text, or CHA)""" | |
if file is None: | |
return "" | |
file_path = file.name | |
if file_path.endswith('.pdf'): | |
if PYPDF2_AVAILABLE: | |
return read_pdf(file_path) | |
else: | |
return "Error: PDF reading is disabled - PyPDF2 library is not installed" | |
elif file_path.endswith('.cha'): | |
return read_cha_file(file_path) | |
else: | |
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: | |
return f.read() | |
# =============================== | |
# AI Model Interface Functions | |
# =============================== | |
def call_bedrock(prompt, max_tokens=4096): | |
"""Call the AWS Bedrock API to analyze text using Claude""" | |
if not bedrock_client: | |
return "AWS credentials not configured. Using demo response instead." | |
try: | |
body = json.dumps({ | |
"anthropic_version": "bedrock-2023-05-31", | |
"max_tokens": max_tokens, | |
"messages": [ | |
{ | |
"role": "user", | |
"content": prompt | |
} | |
], | |
"temperature": 0.3, | |
"top_p": 0.9 | |
}) | |
modelId = 'anthropic.claude-3-sonnet-20240229-v1:0' | |
response = bedrock_client.invoke_model( | |
body=body, | |
modelId=modelId, | |
accept='application/json', | |
contentType='application/json' | |
) | |
response_body = json.loads(response.get('body').read()) | |
return response_body['content'][0]['text'] | |
except Exception as e: | |
logger.error(f"Error in call_bedrock: {str(e)}") | |
return f"Error: {str(e)}" | |
def transcribe_audio(audio_path, patient_age=8): | |
"""Transcribe an audio recording using Amazon Transcribe and format in CHAT format""" | |
if not os.path.exists(audio_path): | |
logger.error(f"Audio file not found: {audio_path}") | |
return "Error: Audio file not found." | |
if not transcribe_client or not s3_client: | |
logger.warning("AWS clients not initialized, using demo transcription") | |
return generate_demo_transcription() | |
try: | |
# Get file info | |
file_name = os.path.basename(audio_path) | |
file_size = os.path.getsize(audio_path) | |
_, file_extension = os.path.splitext(file_name) | |
# Check file format | |
supported_formats = ['.mp3', '.mp4', '.wav', '.flac', '.ogg', '.amr', '.webm'] | |
if file_extension.lower() not in supported_formats: | |
logger.error(f"Unsupported audio format: {file_extension}") | |
return f"Error: Unsupported audio format. Please use one of: {', '.join(supported_formats)}" | |
# Generate a unique job name | |
timestamp = datetime.now().strftime('%Y%m%d%H%M%S') | |
job_name = f"casl-transcription-{timestamp}" | |
s3_key = f"{S3_PREFIX}{job_name}{file_extension}" | |
# Upload to S3 | |
logger.info(f"Uploading {file_name} to S3 bucket {S3_BUCKET}") | |
try: | |
with open(audio_path, 'rb') as audio_file: | |
s3_client.upload_fileobj(audio_file, S3_BUCKET, s3_key) | |
except Exception as e: | |
logger.error(f"Failed to upload to S3: {str(e)}") | |
# If upload fails, try to create the bucket | |
try: | |
s3_client.create_bucket(Bucket=S3_BUCKET) | |
logger.info(f"Created S3 bucket: {S3_BUCKET}") | |
# Try upload again | |
with open(audio_path, 'rb') as audio_file: | |
s3_client.upload_fileobj(audio_file, S3_BUCKET, s3_key) | |
except Exception as bucket_error: | |
logger.error(f"Failed to create bucket and upload: {str(bucket_error)}") | |
return "Error: Failed to upload audio file. Please check your AWS permissions." | |
# Start transcription job | |
logger.info(f"Starting transcription job: {job_name}") | |
media_format = file_extension.lower()[1:] # Remove the dot | |
if media_format == 'webm': | |
media_format = 'webm' # Amazon Transcribe expects this | |
# Determine language settings based on patient age | |
if patient_age < 10: | |
# For younger children, enabling child language model is helpful | |
language_options = { | |
'LanguageCode': 'en-US', | |
'Settings': { | |
'ShowSpeakerLabels': True, | |
'MaxSpeakerLabels': 2 # Typically patient + clinician | |
} | |
} | |
else: | |
language_options = { | |
'LanguageCode': 'en-US', | |
'Settings': { | |
'ShowSpeakerLabels': True, | |
'MaxSpeakerLabels': 2 # Typically patient + clinician | |
} | |
} | |
transcribe_client.start_transcription_job( | |
TranscriptionJobName=job_name, | |
Media={ | |
'MediaFileUri': f"s3://{S3_BUCKET}/{s3_key}" | |
}, | |
MediaFormat=media_format, | |
**language_options | |
) | |
# Wait for the job to complete (with timeout) | |
logger.info("Waiting for transcription to complete...") | |
max_tries = 30 # 5 minutes max wait | |
tries = 0 | |
while tries < max_tries: | |
try: | |
job = transcribe_client.get_transcription_job(TranscriptionJobName=job_name) | |
status = job['TranscriptionJob']['TranscriptionJobStatus'] | |
if status == 'COMPLETED': | |
# Get the transcript | |
transcript_uri = job['TranscriptionJob']['Transcript']['TranscriptFileUri'] | |
# Download the transcript | |
import urllib.request | |
import json | |
with urllib.request.urlopen(transcript_uri) as response: | |
transcript_json = json.loads(response.read().decode('utf-8')) | |
# Convert to CHAT format | |
chat_transcript = format_as_chat(transcript_json) | |
return chat_transcript | |
elif status == 'FAILED': | |
reason = job['TranscriptionJob'].get('FailureReason', 'Unknown failure') | |
logger.error(f"Transcription job failed: {reason}") | |
return f"Error: Transcription failed - {reason}" | |
# Still in progress, wait and try again | |
tries += 1 | |
time.sleep(10) # Check every 10 seconds | |
except Exception as e: | |
logger.error(f"Error checking transcription job: {str(e)}") | |
return f"Error getting transcription: {str(e)}" | |
# If we got here, we timed out | |
return "Error: Transcription timed out. The process is taking longer than expected." | |
except Exception as e: | |
logger.exception("Error in audio transcription") | |
return f"Error transcribing audio: {str(e)}" | |
def format_as_chat(transcript_json): | |
"""Format the Amazon Transcribe JSON result as CHAT format""" | |
try: | |
# Get transcript items | |
items = transcript_json['results']['items'] | |
# Get speaker labels if available | |
speakers = {} | |
if 'speaker_labels' in transcript_json['results']: | |
speaker_segments = transcript_json['results']['speaker_labels']['segments'] | |
# Map each item to its speaker | |
for segment in speaker_segments: | |
for item in segment['items']: | |
start_time = item['start_time'] | |
speakers[start_time] = segment['speaker_label'] | |
# Build transcript by combining words into utterances by speaker | |
current_speaker = None | |
current_utterance = [] | |
utterances = [] | |
for item in items: | |
# Skip non-pronunciation items (like punctuation) | |
if item['type'] != 'pronunciation': | |
continue | |
word = item['alternatives'][0]['content'] | |
start_time = item.get('start_time') | |
# Determine speaker if available | |
speaker = speakers.get(start_time, 'spk_0') | |
# If speaker changed, start a new utterance | |
if speaker != current_speaker and current_utterance: | |
utterances.append((current_speaker, ' '.join(current_utterance))) | |
current_utterance = [] | |
current_speaker = speaker | |
current_utterance.append(word) | |
# Add the last utterance | |
if current_utterance: | |
utterances.append((current_speaker, ' '.join(current_utterance))) | |
# Format as CHAT | |
chat_lines = [] | |
for speaker, text in utterances: | |
# Map speakers to CHAT format | |
# Assuming spk_0 is the patient (PAR) and spk_1 is the clinician (INV) | |
chat_speaker = "*PAR:" if speaker == "spk_0" else "*INV:" | |
chat_lines.append(f"{chat_speaker} {text}.") | |
return '\n'.join(chat_lines) | |
except Exception as e: | |
logger.exception("Error formatting transcript") | |
return "*PAR: (Error formatting transcript)" | |
def generate_demo_transcription(): | |
"""Generate a simulated transcription response""" | |
return """*PAR: today I want to tell you about my favorite toy. | |
*PAR: it's a &-um teddy bear that I got for my birthday. | |
*PAR: he has &-um brown fur and a red bow. | |
*PAR: I like to sleep with him every night. | |
*PAR: sometimes I take him to school in my backpack. | |
*INV: what's your teddy bear's name? | |
*PAR: his name is &-um Brownie because he's brown.""" | |
def generate_demo_response(prompt): | |
"""Generate a simulated response for demo purposes""" | |
# This function generates a realistic but fake response for demo purposes | |
# In a real deployment, you would call an actual LLM API | |
return """<SPEECH_FACTORS_START> | |
Difficulty producing fluent speech: 8, 65 | |
Examples: | |
- "today I would &-um like to talk about &-um a fun trip I took last &-um summer with my family" | |
- "we went to the &-um &-um beach [//] no to the mountains [//] I mean the beach actually" | |
Word retrieval issues: 6, 72 | |
Examples: | |
- "what do you call those &-um &-um sprinkles! that's the word" | |
- "sometimes I forget [//] forgetted [: forgot] [*] what they call those things we built" | |
Grammatical errors: 4, 58 | |
Examples: | |
- "after swimming we [//] I eat [: ate] [*] &-um ice cream" | |
- "sometimes I forget [//] forgetted [: forgot] [*] what they call those things we built" | |
Repetitions and revisions: 5, 62 | |
Examples: | |
- "we [/] we stayed for &-um three no [//] four days" | |
- "we went to the &-um &-um beach [//] no to the mountains [//] I mean the beach actually" | |
<SPEECH_FACTORS_END> | |
<CASL_SKILLS_START> | |
Lexical/Semantic Skills: Standard Score (92), Percentile Rank (30%), Average Performance | |
Examples: | |
- "what do you call those &-um &-um sprinkles! that's the word" | |
- "we went to the &-um &-um beach [//] no to the mountains [//] I mean the beach actually" | |
Syntactic Skills: Standard Score (87), Percentile Rank (19%), Low Average Performance | |
Examples: | |
- "my brother he [//] he helped me dig a big hole" | |
- "after swimming we [//] I eat [: ate] [*] &-um ice cream with &-um chocolate things on top" | |
Supralinguistic Skills: Standard Score (90), Percentile Rank (25%), Average Performance | |
Examples: | |
- "sometimes I wonder [/] wonder where fishies [: fish] [*] go when it's cold" | |
- "maybe they have [/] have houses under the water" | |
<CASL_SKILLS_END> | |
<TREATMENT_RECOMMENDATIONS_START> | |
- Implement word-finding strategies with semantic cuing focused on everyday objects and activities, using the patient's beach experience as a context (e.g., "sprinkles," "castles") | |
- Practice structured narrative tasks with visual supports to reduce revisions and improve sequencing | |
- Use sentence formulation exercises focusing on verb tense consistency (addressing errors like "forgetted" and "eat" for "ate") | |
- Incorporate self-monitoring techniques to help identify and correct grammatical errors | |
- Work on increasing vocabulary specificity (e.g., "things on top" to "sprinkles") | |
<TREATMENT_RECOMMENDATIONS_END> | |
<EXPLANATION_START> | |
This child demonstrates moderate word-finding difficulties with compensatory strategies including fillers ("&-um") and repetitions. The frequent use of self-corrections shows good metalinguistic awareness, but the pauses and repairs impact conversational fluency. Syntactic errors primarily involve verb tense inconsistency. Overall, the pattern suggests a mild-to-moderate language disorder with stronger receptive than expressive skills. | |
<EXPLANATION_END> | |
<ADDITIONAL_ANALYSIS_START> | |
The child shows relative strengths in maintaining topic coherence and conveying a complete narrative structure despite the language challenges. The pattern of errors suggests that word-finding difficulties and processing speed are primary concerns rather than conceptual or cognitive issues. Semantic network activities that strengthen word associations would likely be beneficial, particularly when paired with visual supports. | |
<ADDITIONAL_ANALYSIS_END> | |
<DIAGNOSTIC_IMPRESSIONS_START> | |
Based on the language sample, this child presents with a profile consistent with a mild-to-moderate expressive language disorder. The most prominent features include: | |
1. Word-finding difficulties characterized by fillers, pauses, and self-corrections when attempting to retrieve specific vocabulary | |
2. Grammatical challenges primarily affecting verb tense consistency and morphological markers | |
3. Relatively intact narrative structure and topic maintenance | |
These findings suggest intervention should focus on word retrieval strategies, grammatical form practice, and continued support for narrative development, with an emphasis on fluency and self-monitoring. | |
<DIAGNOSTIC_IMPRESSIONS_END> | |
<ERROR_EXAMPLES_START> | |
Word-finding difficulties: | |
- "what do you call those &-um &-um sprinkles! that's the word" | |
- "we went to the &-um &-um beach [//] no to the mountains [//] I mean the beach actually" | |
- "there was lots of &-um &-um swimming and &-um sun" | |
Grammatical errors: | |
- "after swimming we [//] I eat [: ate] [*] &-um ice cream" | |
- "sometimes I forget [//] forgetted [: forgot] [*] what they call those things we built" | |
- "we saw [/] saw fishies [: fish] [*] swimming in the water" | |
Repetitions and revisions: | |
- "we [/] we stayed for &-um three no [//] four days" | |
- "I want to go back to the beach [/] beach next year" | |
- "sometimes I wonder [/] wonder where fishies [: fish] [*] go when it's cold" | |
<ERROR_EXAMPLES_END>""" | |
def parse_casl_response(response): | |
"""Parse the LLM response for CASL analysis into structured data""" | |
# Extract speech factors section using section markers | |
speech_factors_section = "" | |
factors_pattern = re.compile(r"<SPEECH_FACTORS_START>(.*?)<SPEECH_FACTORS_END>", re.DOTALL) | |
factors_match = factors_pattern.search(response) | |
if factors_match: | |
speech_factors_section = factors_match.group(1).strip() | |
else: | |
speech_factors_section = "Error extracting speech factors from analysis." | |
# Extract CASL skills section | |
casl_section = "" | |
casl_pattern = re.compile(r"<CASL_SKILLS_START>(.*?)<CASL_SKILLS_END>", re.DOTALL) | |
casl_match = casl_pattern.search(response) | |
if casl_match: | |
casl_section = casl_match.group(1).strip() | |
else: | |
casl_section = "Error extracting CASL skills from analysis." | |
# Extract treatment recommendations | |
treatment_text = "" | |
treatment_pattern = re.compile(r"<TREATMENT_RECOMMENDATIONS_START>(.*?)<TREATMENT_RECOMMENDATIONS_END>", re.DOTALL) | |
treatment_match = treatment_pattern.search(response) | |
if treatment_match: | |
treatment_text = treatment_match.group(1).strip() | |
else: | |
treatment_text = "Error extracting treatment recommendations from analysis." | |
# Extract explanation section | |
explanation_text = "" | |
explanation_pattern = re.compile(r"<EXPLANATION_START>(.*?)<EXPLANATION_END>", re.DOTALL) | |
explanation_match = explanation_pattern.search(response) | |
if explanation_match: | |
explanation_text = explanation_match.group(1).strip() | |
else: | |
explanation_text = "Error extracting clinical explanation from analysis." | |
# Extract additional analysis | |
additional_analysis = "" | |
additional_pattern = re.compile(r"<ADDITIONAL_ANALYSIS_START>(.*?)<ADDITIONAL_ANALYSIS_END>", re.DOTALL) | |
additional_match = additional_pattern.search(response) | |
if additional_match: | |
additional_analysis = additional_match.group(1).strip() | |
# Extract diagnostic impressions | |
diagnostic_impressions = "" | |
diagnostic_pattern = re.compile(r"<DIAGNOSTIC_IMPRESSIONS_START>(.*?)<DIAGNOSTIC_IMPRESSIONS_END>", re.DOTALL) | |
diagnostic_match = diagnostic_pattern.search(response) | |
if diagnostic_match: | |
diagnostic_impressions = diagnostic_match.group(1).strip() | |
# Extract specific error examples | |
specific_errors_text = "" | |
errors_pattern = re.compile(r"<ERROR_EXAMPLES_START>(.*?)<ERROR_EXAMPLES_END>", re.DOTALL) | |
errors_match = errors_pattern.search(response) | |
if errors_match: | |
specific_errors_text = errors_match.group(1).strip() | |
# Create full report text | |
full_report = f""" | |
## Speech Factors Analysis | |
{speech_factors_section} | |
## CASL Skills Assessment | |
{casl_section} | |
## Treatment Recommendations | |
{treatment_text} | |
## Clinical Explanation | |
{explanation_text} | |
""" | |
if additional_analysis: | |
full_report += f"\n## Additional Analysis\n\n{additional_analysis}" | |
if diagnostic_impressions: | |
full_report += f"\n## Diagnostic Impressions\n\n{diagnostic_impressions}" | |
if specific_errors_text: | |
full_report += f"\n## Detailed Error Examples\n\n{specific_errors_text}" | |
return { | |
'speech_factors': speech_factors_section, | |
'casl_data': casl_section, | |
'treatment_suggestions': treatment_text, | |
'explanation': explanation_text, | |
'additional_analysis': additional_analysis, | |
'diagnostic_impressions': diagnostic_impressions, | |
'specific_errors': specific_errors_text, | |
'full_report': full_report, | |
'raw_response': response | |
} | |
def analyze_transcript(transcript, age, gender): | |
"""Analyze a speech transcript using Claude""" | |
# CASL-2 assessment cheat sheet | |
cheat_sheet = """ | |
# Speech-Language Pathologist Analysis Cheat Sheet | |
## Types of Speech Patterns to Identify: | |
1. Difficulty producing fluent, grammatical speech | |
- Fillers (um, uh) and pauses | |
- False starts and revisions | |
- Incomplete sentences | |
2. Word retrieval issues | |
- Pauses before content words | |
- Circumlocutions (talking around a word) | |
- Word substitutions | |
3. Grammatical errors | |
- Verb tense inconsistencies | |
- Subject-verb agreement errors | |
- Morphological errors (plurals, possessives) | |
4. Repetitions and revisions | |
- Word or phrase repetitions [/] | |
- Self-corrections [//] | |
- Retracing | |
5. Neologisms | |
- Made-up words | |
- Word blends | |
6. Perseveration | |
- Inappropriate repetition of ideas | |
- Recurring themes | |
7. Comprehension issues | |
- Topic maintenance difficulties | |
- Non-sequiturs | |
- Inappropriate responses | |
""" | |
# Instructions for the analysis | |
instructions = """ | |
Analyze this speech transcript to identify specific patterns and provide a detailed CASL-2 (Comprehensive Assessment of Spoken Language) assessment. | |
For each speech pattern you identify: | |
1. Count the occurrences in the transcript | |
2. Estimate a percentile (how typical/atypical this is for the age) | |
3. Provide DIRECT QUOTES from the transcript as evidence | |
Then assess the following CASL-2 domains: | |
1. Lexical/Semantic Skills: | |
- Assess vocabulary diversity, word-finding abilities, semantic precision | |
- Provide Standard Score (mean=100, SD=15), percentile rank, and performance level | |
- Include SPECIFIC QUOTES as evidence | |
2. Syntactic Skills: | |
- Evaluate grammatical accuracy, sentence complexity, morphological skills | |
- Provide Standard Score, percentile rank, and performance level | |
- Include SPECIFIC QUOTES as evidence | |
3. Supralinguistic Skills: | |
- Assess figurative language use, inferencing, and abstract reasoning | |
- Provide Standard Score, percentile rank, and performance level | |
- Include SPECIFIC QUOTES as evidence | |
YOUR RESPONSE MUST USE THESE EXACT SECTION MARKERS FOR PARSING: | |
<SPEECH_FACTORS_START> | |
Difficulty producing fluent speech: (occurrences), (percentile) | |
Examples: | |
- "(direct quote from transcript)" | |
- "(direct quote from transcript)" | |
Word retrieval issues: (occurrences), (percentile) | |
Examples: | |
- "(direct quote from transcript)" | |
- "(direct quote from transcript)" | |
(And so on for each factor) | |
<SPEECH_FACTORS_END> | |
<CASL_SKILLS_START> | |
Lexical/Semantic Skills: Standard Score (X), Percentile Rank (X%), Performance Level | |
Examples: | |
- "(direct quote showing strength or weakness)" | |
- "(direct quote showing strength or weakness)" | |
Syntactic Skills: Standard Score (X), Percentile Rank (X%), Performance Level | |
Examples: | |
- "(direct quote showing strength or weakness)" | |
- "(direct quote showing strength or weakness)" | |
Supralinguistic Skills: Standard Score (X), Percentile Rank (X%), Performance Level | |
Examples: | |
- "(direct quote showing strength or weakness)" | |
- "(direct quote showing strength or weakness)" | |
<CASL_SKILLS_END> | |
<TREATMENT_RECOMMENDATIONS_START> | |
- (treatment recommendation) | |
- (treatment recommendation) | |
- (treatment recommendation) | |
<TREATMENT_RECOMMENDATIONS_END> | |
<EXPLANATION_START> | |
(brief diagnostic rationale based on findings) | |
<EXPLANATION_END> | |
<ADDITIONAL_ANALYSIS_START> | |
(specific insights that would be helpful for treatment planning) | |
<ADDITIONAL_ANALYSIS_END> | |
<DIAGNOSTIC_IMPRESSIONS_START> | |
(summarize findings across domains using specific examples and clear explanations) | |
<DIAGNOSTIC_IMPRESSIONS_END> | |
<ERROR_EXAMPLES_START> | |
(Copy all the specific quote examples here again, organized by error type or skill domain) | |
<ERROR_EXAMPLES_END> | |
MOST IMPORTANT: | |
1. Use EXACTLY the section markers provided (like <SPEECH_FACTORS_START>) to make parsing reliable | |
2. For EVERY factor and domain you analyze, you MUST provide direct quotes from the transcript as evidence | |
3. Be very specific and cite the exact text | |
4. Do not omit any of the required sections | |
""" | |
# Prepare prompt for Claude with the user's role context | |
role_context = """ | |
You are a speech pathologist, a healthcare professional who specializes in evaluating, diagnosing, and treating communication disorders, including speech, language, cognitive-communication, voice, swallowing, and fluency disorders. Your role is to help patients improve their speech and communication skills through various therapeutic techniques and exercises. | |
You are working with a student with speech impediments. | |
The most important thing is that you stay kind to the child. Be constructive and helpful rather than critical. | |
""" | |
prompt = f""" | |
{role_context} | |
You are analyzing a transcript for a patient who is {age} years old and {gender}. | |
TRANSCRIPT: | |
{transcript} | |
{cheat_sheet} | |
{instructions} | |
Remember to be precise but compassionate in your analysis. Use direct quotes from the transcript for every factor and domain you analyze. | |
""" | |
# Call the appropriate API or fallback to demo mode | |
if bedrock_client: | |
response = call_bedrock(prompt) | |
else: | |
response = generate_demo_response(prompt) | |
# Parse the response | |
results = parse_casl_response(response) | |
return results | |
def export_pdf(results, patient_name="", record_id="", age="", gender="", assessment_date="", clinician=""): | |
"""Export analysis results to a PDF report""" | |
global DOWNLOADS_DIR | |
# Check if ReportLab is available | |
if not REPORTLAB_AVAILABLE: | |
return "ERROR: PDF export is not available - ReportLab library is not installed. Please run 'pip install reportlab'." | |
try: | |
# Generate a safe filename | |
if patient_name: | |
safe_name = f"{patient_name.replace(' ', '_')}" | |
else: | |
safe_name = f"speech_analysis_{datetime.now().strftime('%Y%m%d%H%M%S')}" | |
# Make sure the downloads directory exists | |
try: | |
os.makedirs(DOWNLOADS_DIR, exist_ok=True) | |
except Exception as e: | |
logger.warning(f"Could not access downloads directory: {str(e)}") | |
# Fallback to temp directory | |
DOWNLOADS_DIR = os.path.join(os.path.expanduser("~"), "casl_downloads") | |
os.makedirs(DOWNLOADS_DIR, exist_ok=True) | |
# Create the PDF path in our downloads directory | |
pdf_path = os.path.join(DOWNLOADS_DIR, f"{safe_name}.pdf") | |
# Create the PDF document | |
doc = SimpleDocTemplate(pdf_path, pagesize=letter) | |
styles = getSampleStyleSheet() | |
# Create enhanced custom styles | |
styles.add(ParagraphStyle( | |
name='Heading1', | |
parent=styles['Heading1'], | |
fontSize=16, | |
spaceAfter=12, | |
textColor=colors.navy | |
)) | |
styles.add(ParagraphStyle( | |
name='Heading2', | |
parent=styles['Heading2'], | |
fontSize=14, | |
spaceAfter=10, | |
spaceBefore=10, | |
textColor=colors.darkblue | |
)) | |
styles.add(ParagraphStyle( | |
name='Heading3', | |
parent=styles['Heading2'], | |
fontSize=12, | |
spaceAfter=8, | |
spaceBefore=8, | |
textColor=colors.darkblue | |
)) | |
styles.add(ParagraphStyle( | |
name='BodyText', | |
parent=styles['BodyText'], | |
fontSize=11, | |
spaceAfter=8, | |
leading=14 | |
)) | |
styles.add(ParagraphStyle( | |
name='BulletPoint', | |
parent=styles['BodyText'], | |
fontSize=11, | |
leftIndent=20, | |
firstLineIndent=-15, | |
spaceAfter=4, | |
leading=14 | |
)) | |
# Convert markdown to PDF elements | |
story = [] | |
# Add title and date | |
story.append(Paragraph("Speech Language Assessment Report", styles['Title'])) | |
story.append(Spacer(1, 12)) | |
# Add patient information table | |
if patient_name or record_id or age or gender: | |
# Prepare patient info data | |
data = [] | |
if patient_name: | |
data.append(["Patient Name:", patient_name]) | |
if record_id: | |
data.append(["Record ID:", record_id]) | |
if age: | |
data.append(["Age:", f"{age} years"]) | |
if gender: | |
data.append(["Gender:", gender]) | |
if assessment_date: | |
data.append(["Assessment Date:", assessment_date]) | |
if clinician: | |
data.append(["Clinician:", clinician]) | |
if data: | |
# Create a table with the data | |
patient_table = Table(data, colWidths=[120, 350]) | |
patient_table.setStyle(TableStyle([ | |
('BACKGROUND', (0, 0), (0, -1), colors.lightgrey), | |
('TEXTCOLOR', (0, 0), (0, -1), colors.darkblue), | |
('ALIGN', (0, 0), (0, -1), 'RIGHT'), | |
('ALIGN', (1, 0), (1, -1), 'LEFT'), | |
('FONTNAME', (0, 0), (0, -1), 'Helvetica-Bold'), | |
('BOTTOMPADDING', (0, 0), (-1, -1), 6), | |
('TOPPADDING', (0, 0), (-1, -1), 6), | |
('GRID', (0, 0), (-1, -1), 0.5, colors.lightgrey), | |
])) | |
story.append(patient_table) | |
story.append(Spacer(1, 12)) | |
# Add clinical analysis sections | |
story.append(Paragraph("Speech Factors Analysis", styles['Heading1'])) | |
speech_factors_paragraphs = [] | |
for line in results['speech_factors'].split('\n'): | |
line = line.strip() | |
if not line: | |
continue | |
if line.startswith('- '): | |
story.append(Paragraph(f"• {line[2:]}", styles['BulletPoint'])) | |
else: | |
story.append(Paragraph(line, styles['BodyText'])) | |
story.append(Spacer(1, 12)) | |
story.append(Paragraph("CASL Skills Assessment", styles['Heading1'])) | |
for line in results['casl_data'].split('\n'): | |
line = line.strip() | |
if not line: | |
continue | |
if line.startswith('- '): | |
story.append(Paragraph(f"• {line[2:]}", styles['BulletPoint'])) | |
else: | |
story.append(Paragraph(line, styles['BodyText'])) | |
story.append(Spacer(1, 12)) | |
story.append(Paragraph("Treatment Recommendations", styles['Heading1'])) | |
# Process treatment recommendations as bullet points | |
for line in results['treatment_suggestions'].split('\n'): | |
line = line.strip() | |
if not line: | |
continue | |
if line.startswith('- '): | |
story.append(Paragraph(f"• {line[2:]}", styles['BulletPoint'])) | |
else: | |
story.append(Paragraph(line, styles['BodyText'])) | |
story.append(Spacer(1, 12)) | |
story.append(Paragraph("Clinical Explanation", styles['Heading1'])) | |
story.append(Paragraph(results['explanation'], styles['BodyText'])) | |
story.append(Spacer(1, 12)) | |
if results['additional_analysis']: | |
story.append(Paragraph("Additional Analysis", styles['Heading1'])) | |
story.append(Paragraph(results['additional_analysis'], styles['BodyText'])) | |
story.append(Spacer(1, 12)) | |
if results['diagnostic_impressions']: | |
story.append(Paragraph("Diagnostic Impressions", styles['Heading1'])) | |
story.append(Paragraph(results['diagnostic_impressions'], styles['BodyText'])) | |
story.append(Spacer(1, 12)) | |
# Add footer with date | |
footer_text = f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" | |
story.append(Spacer(1, 20)) | |
story.append(Paragraph(footer_text, ParagraphStyle( | |
name='Footer', | |
parent=styles['Normal'], | |
fontSize=8, | |
textColor=colors.grey | |
))) | |
# Build the PDF | |
doc.build(story) | |
logger.info(f"Report saved as PDF: {pdf_path}") | |
return pdf_path | |
except Exception as e: | |
logger.exception("Error creating PDF") | |
return f"Error creating PDF: {str(e)}" | |
def create_interface(): | |
"""Create the Gradio interface""" | |
# Set a theme compatible with Hugging Face Spaces | |
theme = gr.themes.Soft( | |
primary_hue="blue", | |
secondary_hue="indigo", | |
) | |
with gr.Blocks(title="CASL Analysis Tool", theme=theme) as app: | |
gr.Markdown("# CASL Analysis Tool") | |
gr.Markdown("A tool for analyzing speech transcripts and audio using the CASL framework") | |
with gr.Tabs() as main_tabs: | |
# Analysis Tab | |
with gr.TabItem("Analysis", id=0): | |
with gr.Row(): | |
with gr.Column(scale=1): | |
# Patient info | |
gr.Markdown("### Patient Information") | |
patient_name = gr.Textbox(label="Patient Name", placeholder="Enter patient name") | |
record_id = gr.Textbox(label="Record ID", placeholder="Enter record ID") | |
with gr.Row(): | |
age = gr.Number(label="Age", value=8, minimum=1, maximum=120) | |
gender = gr.Radio(["male", "female", "other"], label="Gender", value="male") | |
assessment_date = gr.Textbox( | |
label="Assessment Date", | |
placeholder="MM/DD/YYYY", | |
value=datetime.now().strftime('%m/%d/%Y') | |
) | |
clinician_name = gr.Textbox(label="Clinician", placeholder="Enter clinician name") | |
# Transcript input | |
gr.Markdown("### Transcript") | |
sample_btn = gr.Button("Load Sample Transcript") | |
file_upload = gr.File(label="Upload transcript file (.txt or .cha)") | |
transcript = gr.Textbox( | |
label="Speech transcript (CHAT format preferred)", | |
placeholder="Enter transcript text or upload a file...", | |
lines=10 | |
) | |
# Analysis button | |
analyze_btn = gr.Button("Analyze Transcript", variant="primary") | |
with gr.Column(scale=1): | |
# Results display | |
with gr.Tabs() as results_tabs: | |
with gr.TabItem("Summary", id=0): | |
gr.Markdown("### Speech Factors Analysis") | |
speech_factors_md = gr.Markdown() | |
gr.Markdown("### CASL Skills Assessment") | |
casl_results_md = gr.Markdown() | |
with gr.TabItem("Treatment", id=1): | |
gr.Markdown("### Treatment Recommendations") | |
treatment_md = gr.Markdown() | |
gr.Markdown("### Clinical Explanation") | |
explanation_md = gr.Markdown() | |
with gr.TabItem("Error Examples", id=2): | |
specific_errors_md = gr.Markdown() | |
with gr.TabItem("Full Report", id=3): | |
full_analysis = gr.Markdown() | |
# PDF export (only shown if ReportLab is available) | |
export_status = gr.Markdown("") | |
if REPORTLAB_AVAILABLE: | |
export_btn = gr.Button("Export as PDF", variant="secondary") | |
else: | |
gr.Markdown("⚠️ PDF export is disabled - ReportLab library is not installed") | |
# Transcription Tab | |
with gr.TabItem("Transcription", id=1): | |
with gr.Row(): | |
with gr.Column(scale=1): | |
gr.Markdown("### Audio Transcription") | |
gr.Markdown("Upload an audio recording to automatically transcribe it in CHAT format") | |
# Patient's age helps with transcription accuracy | |
transcription_age = gr.Number(label="Patient Age", value=8, minimum=1, maximum=120, | |
info="For children under 10, special language models may be used") | |
# Audio input | |
audio_input = gr.Audio(type="filepath", label="Upload Audio Recording", | |
format="mp3,wav,ogg,webm", | |
elem_id="audio-input") | |
# Transcribe button | |
transcribe_btn = gr.Button("Transcribe Audio", variant="primary") | |
with gr.Column(scale=1): | |
# Transcription output | |
transcription_output = gr.Textbox( | |
label="Transcription Result", | |
placeholder="Transcription will appear here...", | |
lines=12 | |
) | |
with gr.Row(): | |
# Button to use transcription in analysis | |
copy_to_analysis_btn = gr.Button("Use for Analysis", variant="secondary") | |
# Status/info message | |
transcription_status = gr.Markdown("") | |
# Load sample transcript button | |
def load_sample(): | |
return SAMPLE_TRANSCRIPT | |
sample_btn.click(load_sample, outputs=[transcript]) | |
# File upload handler | |
file_upload.upload(process_upload, file_upload, transcript) | |
# Analysis button handler | |
def on_analyze_click(transcript_text, age_val, gender_val, patient_name_val, record_id_val, clinician_val, assessment_date_val): | |
if not transcript_text or len(transcript_text.strip()) < 50: | |
return "Error: Please provide a longer transcript for analysis.", "Error: Insufficient data", "Error: Insufficient data", "Error: Please provide a transcript of at least 50 characters for meaningful analysis.", "Error: Not enough transcript data for analysis.", "Error: No detailed error examples available for an empty transcript." | |
try: | |
# Get the analysis results | |
results = analyze_transcript(transcript_text, age_val, gender_val) | |
# Save patient record | |
patient_info = { | |
"name": patient_name_val, | |
"record_id": record_id_val, | |
"age": age_val, | |
"gender": gender_val, | |
"assessment_date": assessment_date_val, | |
"clinician": clinician_val | |
} | |
saved_id = save_patient_record(patient_info, results, transcript_text) | |
if saved_id: | |
save_msg = f"✅ Patient record saved successfully. ID: {saved_id}" | |
else: | |
save_msg = "⚠️ Could not save patient record. Check directory permissions." | |
# Return the results | |
return results['speech_factors'], results['casl_data'], results['treatment_suggestions'], results['explanation'], results['full_report'], save_msg, results['specific_errors'] | |
except Exception as e: | |
logger.exception("Error during analysis") | |
return f"Error during analysis: {str(e)}", "Analysis failed", "Not available", f"Error: {str(e)}", f"Analysis error: {str(e)}", "", "" | |
analyze_btn.click( | |
on_analyze_click, | |
inputs=[ | |
transcript, age, gender, | |
patient_name, record_id, clinician_name, assessment_date | |
], | |
outputs=[ | |
speech_factors_md, | |
casl_results_md, | |
treatment_md, | |
explanation_md, | |
full_analysis, | |
export_status, | |
specific_errors_md | |
] | |
) | |
# PDF export function | |
def on_export_pdf(report_text, p_name, p_record_id, p_age, p_gender, p_date, p_clinician): | |
# Check if ReportLab is available | |
if not REPORTLAB_AVAILABLE: | |
return "ERROR: PDF export is not available because the ReportLab library is not installed. Please install it with 'pip install reportlab'." | |
if not report_text or len(report_text.strip()) < 50: | |
return "Error: Please run the analysis first before exporting to PDF." | |
try: | |
# Parse the report text back into sections | |
results = { | |
'speech_factors': '', | |
'casl_data': '', | |
'treatment_suggestions': '', | |
'explanation': '', | |
'additional_analysis': '', | |
'diagnostic_impressions': '', | |
'specific_errors': '', | |
} | |
sections = report_text.split('##') | |
for section in sections: | |
section = section.strip() | |
if not section: | |
continue | |
title_content = section.split('\n', 1) | |
if len(title_content) < 2: | |
continue | |
title = title_content[0].strip() | |
content = title_content[1].strip() | |
if "Speech Factors Analysis" in title: | |
results['speech_factors'] = content | |
elif "CASL Skills Assessment" in title: | |
results['casl_data'] = content | |
elif "Treatment Recommendations" in title: | |
results['treatment_suggestions'] = content | |
elif "Clinical Explanation" in title: | |
results['explanation'] = content | |
elif "Additional Analysis" in title: | |
results['additional_analysis'] = content | |
elif "Diagnostic Impressions" in title: | |
results['diagnostic_impressions'] = content | |
elif "Detailed Error Examples" in title: | |
results['specific_errors'] = content | |
pdf_path = export_pdf( | |
results, | |
patient_name=p_name, | |
record_id=p_record_id, | |
age=p_age, | |
gender=p_gender, | |
assessment_date=p_date, | |
clinician=p_clinician | |
) | |
# Check if the export was successful | |
if pdf_path.startswith("ERROR:"): | |
return pdf_path | |
# Make it downloadable in Hugging Face Spaces | |
download_link = f'<a href="file={pdf_path}" download="{os.path.basename(pdf_path)}">Download PDF Report</a>' | |
return f"Report saved as PDF: {pdf_path}<br>{download_link}" | |
except Exception as e: | |
logger.exception("Error exporting to PDF") | |
return f"Error creating PDF: {str(e)}" | |
# Only set up the PDF export button if ReportLab is available | |
if REPORTLAB_AVAILABLE: | |
export_btn.click( | |
on_export_pdf, | |
inputs=[ | |
full_analysis, | |
patient_name, | |
record_id, | |
age, | |
gender, | |
assessment_date, | |
clinician_name | |
], | |
outputs=[export_status] | |
) | |
# Transcription button handler | |
def on_transcribe_audio(audio_path, age_val): | |
try: | |
if not audio_path: | |
return "Please upload an audio file to transcribe.", "Error: No audio file provided." | |
# Process the audio file with Amazon Transcribe | |
transcription = transcribe_audio(audio_path, age_val) | |
# Return status message based on whether it's a demo or real transcription | |
if not transcribe_client: | |
status_msg = "⚠️ Demo mode: Using example transcription (AWS credentials not configured)" | |
else: | |
status_msg = "✅ Transcription completed successfully" | |
return transcription, status_msg | |
except Exception as e: | |
logger.exception("Error transcribing audio") | |
return f"Error: {str(e)}", f"❌ Transcription failed: {str(e)}" | |
# Connect the transcribe button to its handler | |
transcribe_btn.click( | |
on_transcribe_audio, | |
inputs=[audio_input, transcription_age], | |
outputs=[transcription_output, transcription_status] | |
) | |
# Copy transcription to analysis tab | |
def copy_to_analysis(transcription): | |
return transcription, gr.update(selected=0) # Switch to Analysis tab | |
copy_to_analysis_btn.click( | |
copy_to_analysis, | |
inputs=[transcription_output], | |
outputs=[transcript, main_tabs] | |
) | |
return app | |
# Create requirements.txt file for HuggingFace Spaces | |
def create_requirements_file(): | |
requirements = [ | |
"gradio>=4.0.0", | |
"pandas", | |
"numpy", | |
"matplotlib", | |
"Pillow", | |
"reportlab>=3.6.0", # Required for PDF exports | |
"PyPDF2>=3.0.0", # Required for PDF reading | |
"boto3>=1.28.0" # Required for AWS services | |
] | |
with open("requirements.txt", "w") as f: | |
for req in requirements: | |
f.write(f"{req}\n") | |
if __name__ == "__main__": | |
# Create requirements.txt for HuggingFace Spaces | |
create_requirements_file() | |
# Check for AWS credentials | |
if not AWS_ACCESS_KEY or not AWS_SECRET_KEY: | |
print("NOTE: AWS credentials not found. The app will run in demo mode with simulated responses.") | |
print("To enable full functionality, set AWS_ACCESS_KEY and AWS_SECRET_KEY environment variables.") | |
app = create_interface() | |
app.launch(show_api=False) # Disable API tab for security |