Spaces:
Sleeping
Sleeping
import gradio as gr | |
import boto3 | |
import json | |
import pandas as pd | |
import matplotlib.pyplot as plt | |
import numpy as np | |
import re | |
import logging | |
import os | |
import pickle | |
import csv | |
from PIL import Image | |
import io | |
import uuid | |
from datetime import datetime | |
import tempfile | |
import time | |
# Try to import ReportLab (needed for PDF generation) | |
try: | |
from reportlab.lib.pagesizes import letter | |
from reportlab.lib import colors | |
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle | |
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle | |
REPORTLAB_AVAILABLE = True | |
except ImportError: | |
logger = logging.getLogger(__name__) | |
logger.warning("ReportLab library not available - PDF export will be disabled") | |
REPORTLAB_AVAILABLE = False | |
# Try to import PyPDF2 (needed for PDF reading) | |
try: | |
import PyPDF2 | |
PYPDF2_AVAILABLE = True | |
except ImportError: | |
logger = logging.getLogger(__name__) | |
logger.warning("PyPDF2 library not available - PDF reading will be disabled") | |
PYPDF2_AVAILABLE = False | |
# Configure logging | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
# AWS credentials for Bedrock API | |
# For HuggingFace Spaces, set these as secrets in the Space settings | |
AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY", "") | |
AWS_SECRET_KEY = os.getenv("AWS_SECRET_KEY", "") | |
AWS_REGION = os.getenv("AWS_REGION", "us-east-1") | |
# Initialize AWS clients if credentials are available | |
bedrock_client = None | |
transcribe_client = None | |
s3_client = None | |
if AWS_ACCESS_KEY and AWS_SECRET_KEY: | |
try: | |
# Initialize Bedrock client for AI analysis | |
bedrock_client = boto3.client( | |
'bedrock-runtime', | |
aws_access_key_id=AWS_ACCESS_KEY, | |
aws_secret_access_key=AWS_SECRET_KEY, | |
region_name=AWS_REGION | |
) | |
logger.info("Bedrock client initialized successfully") | |
# Initialize Transcribe client for speech-to-text | |
transcribe_client = boto3.client( | |
'transcribe', | |
aws_access_key_id=AWS_ACCESS_KEY, | |
aws_secret_access_key=AWS_SECRET_KEY, | |
region_name=AWS_REGION | |
) | |
logger.info("Transcribe client initialized successfully") | |
# Initialize S3 client for storing audio files | |
s3_client = boto3.client( | |
's3', | |
aws_access_key_id=AWS_ACCESS_KEY, | |
aws_secret_access_key=AWS_SECRET_KEY, | |
region_name=AWS_REGION | |
) | |
logger.info("S3 client initialized successfully") | |
except Exception as e: | |
logger.error(f"Failed to initialize AWS clients: {str(e)}") | |
# S3 bucket for storing audio files | |
S3_BUCKET = os.environ.get("S3_BUCKET", "casl-audio-files") | |
S3_PREFIX = "transcribe-audio/" | |
# Sample transcript for the demo | |
SAMPLE_TRANSCRIPT = """*PAR: today I would &-um like to talk about &-um a fun trip I took last &-um summer with my family. | |
*PAR: we went to the &-um &-um beach [//] no to the mountains [//] I mean the beach actually. | |
*PAR: there was lots of &-um &-um swimming and &-um sun. | |
*PAR: we [/] we stayed for &-um three no [//] four days in a &-um hotel near the water [: ocean] [*]. | |
*PAR: my favorite part was &-um building &-um castles with sand. | |
*PAR: sometimes I forget [//] forgetted [: forgot] [*] what they call those things we built. | |
*PAR: my brother he [//] he helped me dig a big hole. | |
*PAR: we saw [/] saw fishies [: fish] [*] swimming in the water. | |
*PAR: sometimes I wonder [/] wonder where fishies [: fish] [*] go when it's cold. | |
*PAR: maybe they have [/] have houses under the water. | |
*PAR: after swimming we [//] I eat [: ate] [*] &-um ice cream with &-um chocolate things on top. | |
*PAR: what do you call those &-um &-um sprinkles! that's the word. | |
*PAR: my mom said to &-um that I could have &-um two scoops next time. | |
*PAR: I want to go back to the beach [/] beach next year.""" | |
# =============================== | |
# Database and Storage Functions | |
# =============================== | |
# Create data directories if they don't exist | |
DATA_DIR = os.environ.get("DATA_DIR", "patient_data") | |
RECORDS_FILE = os.path.join(DATA_DIR, "patient_records.csv") | |
ANALYSES_DIR = os.path.join(DATA_DIR, "analyses") | |
DOWNLOADS_DIR = os.path.join(DATA_DIR, "downloads") | |
AUDIO_DIR = os.path.join(DATA_DIR, "audio") | |
def ensure_data_dirs(): | |
"""Ensure data directories exist""" | |
global DOWNLOADS_DIR, AUDIO_DIR | |
try: | |
os.makedirs(DATA_DIR, exist_ok=True) | |
os.makedirs(ANALYSES_DIR, exist_ok=True) | |
os.makedirs(DOWNLOADS_DIR, exist_ok=True) | |
os.makedirs(AUDIO_DIR, exist_ok=True) | |
logger.info(f"Data directories created: {DATA_DIR}, {ANALYSES_DIR}, {DOWNLOADS_DIR}, {AUDIO_DIR}") | |
# Create records file if it doesn't exist | |
if not os.path.exists(RECORDS_FILE): | |
with open(RECORDS_FILE, 'w', newline='') as f: | |
writer = csv.writer(f) | |
writer.writerow([ | |
"ID", "Name", "Record ID", "Age", "Gender", | |
"Assessment Date", "Clinician", "Analysis Date", "File Path" | |
]) | |
except Exception as e: | |
logger.warning(f"Could not create data directories: {str(e)}") | |
# Fallback to tmp directory on HF Spaces | |
DOWNLOADS_DIR = os.path.join(tempfile.gettempdir(), "casl_downloads") | |
AUDIO_DIR = os.path.join(tempfile.gettempdir(), "casl_audio") | |
os.makedirs(DOWNLOADS_DIR, exist_ok=True) | |
os.makedirs(AUDIO_DIR, exist_ok=True) | |
logger.info(f"Using fallback directories: {DOWNLOADS_DIR}, {AUDIO_DIR}") | |
# Initialize data directories | |
ensure_data_dirs() | |
def save_patient_record(patient_info, analysis_results, transcript): | |
"""Save patient record to storage""" | |
try: | |
# Generate unique ID for the record | |
record_id = str(uuid.uuid4()) | |
# Extract patient information | |
name = patient_info.get("name", "") | |
patient_id = patient_info.get("record_id", "") | |
age = patient_info.get("age", "") | |
gender = patient_info.get("gender", "") | |
assessment_date = patient_info.get("assessment_date", "") | |
clinician = patient_info.get("clinician", "") | |
# Create filename for the analysis data | |
filename = f"analysis_{record_id}.pkl" | |
filepath = os.path.join(ANALYSES_DIR, filename) | |
# Save analysis data | |
with open(filepath, 'wb') as f: | |
pickle.dump({ | |
"patient_info": patient_info, | |
"analysis_results": analysis_results, | |
"transcript": transcript, | |
"timestamp": datetime.now().isoformat(), | |
}, f) | |
# Add record to CSV file | |
with open(RECORDS_FILE, 'a', newline='') as f: | |
writer = csv.writer(f) | |
writer.writerow([ | |
record_id, name, patient_id, age, gender, | |
assessment_date, clinician, datetime.now().strftime('%Y-%m-%d'), | |
filepath | |
]) | |
return record_id | |
except Exception as e: | |
logger.error(f"Error saving patient record: {str(e)}") | |
return None | |
def load_patient_record(record_id): | |
"""Load patient record from storage""" | |
try: | |
# Find the record in the CSV file | |
if not os.path.exists(RECORDS_FILE): | |
logger.error(f"Records file does not exist: {RECORDS_FILE}") | |
return None | |
with open(RECORDS_FILE, 'r', newline='') as f: | |
reader = csv.reader(f) | |
next(reader) # Skip header | |
for row in reader: | |
if len(row) < 9: # Ensure row has enough elements | |
logger.warning(f"Skipping malformed record row: {row}") | |
continue | |
if row[0] == record_id: | |
file_path = row[8] | |
# Check if the file exists | |
if not os.path.exists(file_path): | |
logger.error(f"Analysis file not found: {file_path}") | |
return None | |
# Load and return the data | |
try: | |
with open(file_path, 'rb') as f: | |
return pickle.load(f) | |
except (pickle.PickleError, EOFError) as pickle_err: | |
logger.error(f"Error unpickling file {file_path}: {str(pickle_err)}") | |
return None | |
logger.warning(f"Record ID not found: {record_id}") | |
return None | |
except Exception as e: | |
logger.error(f"Error loading patient record: {str(e)}") | |
return None | |
def get_all_patient_records(): | |
"""Return a list of all patient records""" | |
try: | |
records = [] | |
# Ensure data directories exist | |
ensure_data_dirs() | |
if not os.path.exists(RECORDS_FILE): | |
logger.warning(f"Records file does not exist, creating it: {RECORDS_FILE}") | |
with open(RECORDS_FILE, 'w', newline='') as f: | |
writer = csv.writer(f) | |
writer.writerow([ | |
"ID", "Name", "Record ID", "Age", "Gender", | |
"Assessment Date", "Clinician", "Analysis Date", "File Path" | |
]) | |
return records | |
# Read existing records | |
valid_records = [] | |
with open(RECORDS_FILE, 'r', newline='') as f: | |
reader = csv.reader(f) | |
next(reader) # Skip header | |
for row in reader: | |
if len(row) < 9: # Check for malformed rows | |
continue | |
# Check if the analysis file exists | |
file_path = row[8] | |
file_exists = os.path.exists(file_path) | |
record = { | |
"id": row[0], | |
"name": row[1], | |
"record_id": row[2], | |
"age": row[3], | |
"gender": row[4], | |
"assessment_date": row[5], | |
"clinician": row[6], | |
"analysis_date": row[7], | |
"file_path": file_path, | |
"status": "Valid" if file_exists else "Missing File" | |
} | |
records.append(record) | |
# Keep track of valid records for potential cleanup | |
if file_exists: | |
valid_records.append(row) | |
# If we found invalid records, consider rewriting the CSV with only valid entries | |
if len(valid_records) < len(records): | |
logger.warning(f"Found {len(records) - len(valid_records)} invalid records") | |
# Uncomment to enable automatic cleanup: | |
# with open(RECORDS_FILE, 'w', newline='') as f: | |
# writer = csv.writer(f) | |
# writer.writerow([ | |
# "ID", "Name", "Record ID", "Age", "Gender", | |
# "Assessment Date", "Clinician", "Analysis Date", "File Path" | |
# ]) | |
# for row in valid_records: | |
# writer.writerow(row) | |
return records | |
except Exception as e: | |
logger.error(f"Error getting patient records: {str(e)}") | |
return [] | |
def delete_patient_record(record_id): | |
"""Delete a patient record""" | |
try: | |
if not os.path.exists(RECORDS_FILE): | |
return False | |
# Find the record and its file | |
file_path = None | |
with open(RECORDS_FILE, 'r', newline='') as f: | |
reader = csv.reader(f) | |
rows = list(reader) | |
header = rows[0] | |
for i, row in enumerate(rows[1:], 1): | |
if len(row) < 9: | |
continue | |
if row[0] == record_id: | |
file_path = row[8] | |
break | |
if not file_path: | |
return False | |
# Delete the analysis file if it exists | |
if os.path.exists(file_path): | |
os.remove(file_path) | |
# Remove the record from the CSV | |
rows_to_keep = [row for row in rows[1:] if len(row) >= 9 and row[0] != record_id] | |
with open(RECORDS_FILE, 'w', newline='') as f: | |
writer = csv.writer(f) | |
writer.writerow(header) | |
writer.writerows(rows_to_keep) | |
return True | |
except Exception as e: | |
logger.error(f"Error deleting patient record: {str(e)}") | |
return False | |
# =============================== | |
# Utility Functions | |
# =============================== | |
def read_pdf(file_path): | |
"""Read text from a PDF file""" | |
if not PYPDF2_AVAILABLE: | |
return "Error: PDF reading is not available - PyPDF2 library is not installed" | |
try: | |
with open(file_path, 'rb') as file: | |
pdf_reader = PyPDF2.PdfReader(file) | |
text = "" | |
for page in pdf_reader.pages: | |
text += page.extract_text() | |
return text | |
except Exception as e: | |
logger.error(f"Error reading PDF: {str(e)}") | |
return "" | |
def read_cha_file(file_path): | |
"""Read and parse a .cha transcript file""" | |
try: | |
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: | |
content = f.read() | |
# Extract participant lines (starting with *PAR:) | |
par_lines = [] | |
for line in content.splitlines(): | |
if line.startswith('*PAR:'): | |
par_lines.append(line) | |
# If no PAR lines found, just return the whole content | |
if not par_lines: | |
return content | |
return '\n'.join(par_lines) | |
except Exception as e: | |
logger.error(f"Error reading CHA file: {str(e)}") | |
return "" | |
def process_upload(file): | |
"""Process an uploaded file (PDF, text, or CHA)""" | |
if file is None: | |
return "" | |
file_path = file.name | |
if file_path.endswith('.pdf'): | |
if PYPDF2_AVAILABLE: | |
return read_pdf(file_path) | |
else: | |
return "Error: PDF reading is disabled - PyPDF2 library is not installed" | |
elif file_path.endswith('.cha'): | |
return read_cha_file(file_path) | |
else: | |
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: | |
return f.read() | |
# =============================== | |
# AI Model Interface Functions | |
# =============================== | |
def call_bedrock(prompt, max_tokens=4096): | |
"""Call the AWS Bedrock API to analyze text using Claude""" | |
if not bedrock_client: | |
return "AWS credentials not configured. Please set your AWS credentials as secrets in the Space settings." | |
try: | |
body = json.dumps({ | |
"anthropic_version": "bedrock-2023-05-31", | |
"max_tokens": max_tokens, | |
"messages": [ | |
{ | |
"role": "user", | |
"content": prompt | |
} | |
], | |
"temperature": 0.3, | |
"top_p": 0.9 | |
}) | |
modelId = 'anthropic.claude-3-sonnet-20240229-v1:0' | |
response = bedrock_client.invoke_model( | |
body=body, | |
modelId=modelId, | |
accept='application/json', | |
contentType='application/json' | |
) | |
response_body = json.loads(response.get('body').read()) | |
return response_body['content'][0]['text'] | |
except Exception as e: | |
logger.error(f"Error in call_bedrock: {str(e)}") | |
return f"Error: {str(e)}" | |
def generate_demo_response(prompt): | |
"""Generate a simulated response for demo purposes""" | |
# This function generates a realistic but fake response for demo purposes | |
# In a real deployment, you would call an actual LLM API | |
random_seed = sum(ord(c) for c in prompt) % 1000 # Generate a seed based on prompt | |
np.random.seed(random_seed) | |
# Simulate speech factors with random but reasonable values | |
factors = [ | |
"Difficulty producing fluent speech", | |
"Word retrieval issues", | |
"Grammatical errors", | |
"Repetitions and revisions", | |
"Neologisms", | |
"Perseveration", | |
"Comprehension issues" | |
] | |
occurrences = np.random.randint(1, 15, size=len(factors)) | |
percentiles = np.random.randint(30, 95, size=len(factors)) | |
# Simulate CASL scores | |
domains = ["Lexical/Semantic", "Syntactic", "Supralinguistic"] | |
scores = np.random.randint(80, 115, size=3) | |
percentiles_casl = [int(np.interp(s, [70, 85, 100, 115, 130], [2, 16, 50, 84, 98])) for s in scores] | |
perf_levels = [] | |
for s in scores: | |
if s < 70: perf_levels.append("Well Below Average") | |
elif s < 85: perf_levels.append("Below Average") | |
elif s < 115: perf_levels.append("Average") | |
elif s < 130: perf_levels.append("Above Average") | |
else: perf_levels.append("Well Above Average") | |
# Build response | |
response = "## Speech Factor Analysis\n\n" | |
for i, factor in enumerate(factors): | |
response += f"{factor}: {occurrences[i]}, {percentiles[i]}\n" | |
response += "\n## CASL-2 Assessment\n\n" | |
for i, domain in enumerate(domains): | |
response += f"{domain} Skills: Standard Score ({scores[i]}), Percentile Rank ({percentiles_casl[i]}%), Performance Level ({perf_levels[i]})\n" | |
response += "\n## Other analysis/Best plans of action:\n\n" | |
suggestions = [ | |
"Implement word-finding strategies with semantic cuing", | |
"Practice structured narrative tasks with visual supports", | |
"Use sentence formulation exercises with increasing complexity", | |
"Incorporate self-monitoring techniques during structured conversations", | |
"Work on grammatical forms through structured practice" | |
] | |
for suggestion in suggestions: | |
response += f"- {suggestion}\n" | |
response += "\n## Explanation:\n\n" | |
response += "Based on the analysis, this patient demonstrates moderate word-finding difficulties with compensatory strategies like filler words and repetitions. Their syntactic skills show some weakness in verb tense consistency. Treatment should focus on building vocabulary access, grammatical accuracy, and narrative structure using scaffolded support.\n" | |
response += "\n## Additional Analysis:\n\n" | |
response += "The patient shows relative strengths in conversation maintenance and topic coherence. Consider building on these strengths while addressing specific language formulation challenges. Recommended frequency: 2-3 sessions per week for 10-12 weeks with periodic reassessment." | |
return response | |
def generate_demo_transcription(): | |
"""Generate a simulated transcription response""" | |
return """*PAR: today I want to tell you about my favorite toy. | |
*PAR: it's a &-um teddy bear that I got for my birthday. | |
*PAR: he has &-um brown fur and a red bow. | |
*PAR: I like to sleep with him every night. | |
*PAR: sometimes I take him to school in my backpack. | |
*INV: what's your teddy bear's name? | |
*PAR: his name is &-um Brownie because he's brown.""" | |
def generate_demo_qa_response(question): | |
"""Generate a simulated Q&A response""" | |
qa_responses = { | |
"what is casl": "CASL-2 (Comprehensive Assessment of Spoken Language, Second Edition) is a standardized assessment tool used by Speech-Language Pathologists to evaluate a child's oral language abilities across multiple domains including lexical/semantic, syntactic, and supralinguistic skills. It helps identify language disorders and guides intervention planning.", | |
"how do i interpret scores": "CASL-2 scores include standard scores (mean=100, SD=15), percentile ranks, and performance levels. Standard scores below 85 indicate below average performance, 85-115 is average, and above 115 is above average. Percentile ranks show how a child performs relative to same-age peers.", | |
"what activities help word finding": "Activities to improve word-finding skills include semantic feature analysis (describing attributes of objects), categorization tasks, word association games, rapid naming practice, and structured conversation with gentle cueing. Visual supports and semantic mapping can also be helpful.", | |
"how often should therapy occur": "The recommended frequency for speech-language therapy typically ranges from 1-3 sessions per week, depending on the severity of the impairment. For moderate difficulties, twice weekly sessions of 30-45 minutes are common. Consistency is important for progress.", | |
"when should i reassess": "Reassessment is typically recommended every 3-6 months to track progress and adjust treatment goals. For educational settings, annual reassessment is common. More frequent informal assessments can help guide ongoing intervention.", | |
} | |
# Simple keyword matching for demo purposes | |
for key, response in qa_responses.items(): | |
if key in question.lower(): | |
return response | |
return "I don't have specific information about that topic. For detailed professional guidance, consult with a licensed Speech-Language Pathologist who can provide advice specific to your situation." | |
# =============================== | |
# Analysis Functions | |
# =============================== | |
def parse_casl_response(response): | |
"""Parse the LLM response for CASL analysis into structured data""" | |
lines = response.split('\n') | |
data = { | |
'Factor': [], | |
'Occurrences': [], | |
'Severity': [], | |
'Examples': [] # Added field for error examples | |
} | |
casl_data = { | |
'Domain': ['Lexical/Semantic', 'Syntactic', 'Supralinguistic'], | |
'Standard Score': [0, 0, 0], | |
'Percentile': [0, 0, 0], | |
'Performance Level': ['', '', ''], | |
'Examples': ['', '', ''] # Added field for specific examples | |
} | |
treatment_suggestions = [] | |
explanation = "" | |
additional_analysis = "" | |
specific_errors = {} # Track specific error examples by factor | |
raw_response = response # Store the complete raw LLM response | |
# Pattern to match factor lines - updated to potentially capture examples | |
factor_pattern = re.compile(r'([\w\s/]+):\s*(\d+)[,\s]+(\d+)(?:\s*-\s*(.+))?') | |
# Pattern to match CASL data | |
casl_pattern = re.compile(r'(\w+/?\w*)\s+Skills:\s+Standard\s+Score\s+\((\d+)\),\s+Percentile\s+Rank\s+\((\d+)%\),\s+Performance\s+Level\s+\(([\w\s]+)\)') | |
# Pattern to find examples | |
example_pattern = re.compile(r'(?:Example|Examples|observed|observed in)[^\"\'"]*[\"\']([^\"\']*)[\"\']') | |
error_pattern = re.compile(r'(?:error|errors|difficulty|difficulties)[^\"\'"]*[\"\']([^\"\']*)[\"\']') | |
current_factor = None | |
current_domain = None | |
in_suggestions = False | |
in_explanation = False | |
in_additional = False | |
in_examples = False | |
for i, line in enumerate(lines): | |
line = line.strip() | |
# Skip empty lines | |
if not line: | |
continue | |
# Check for factor data | |
factor_match = factor_pattern.search(line) | |
if factor_match: | |
factor = factor_match.group(1).strip() | |
occurrences = int(factor_match.group(2)) | |
severity = int(factor_match.group(3)) | |
example = factor_match.group(4) if factor_match.group(4) else "" | |
# Look ahead to find examples for this factor | |
if not example: | |
# Check next few lines for examples | |
for j in range(i+1, min(i+5, len(lines))): | |
next_line = lines[j].strip() | |
if next_line and ('"' in next_line or "'" in next_line): | |
example_match = example_pattern.search(next_line) | |
if example_match: | |
example = example_match.group(1) | |
break | |
error_match = error_pattern.search(next_line) | |
if error_match: | |
example = error_match.group(1) | |
break | |
data['Factor'].append(factor) | |
data['Occurrences'].append(occurrences) | |
data['Severity'].append(severity) | |
data['Examples'].append(example) | |
specific_errors[factor] = example | |
current_factor = factor | |
continue | |
# Check for CASL data | |
casl_match = casl_pattern.search(line) | |
if casl_match: | |
domain = casl_match.group(1) | |
score = int(casl_match.group(2)) | |
percentile = int(casl_match.group(3)) | |
level = casl_match.group(4) | |
domain_examples = "" | |
# Look ahead for examples related to this domain | |
for j in range(i+1, min(i+10, len(lines))): | |
next_line = lines[j].strip() | |
if "Domain:" in next_line or casl_pattern.search(next_line): | |
break | |
if ('"' in next_line or "'" in next_line) and "example" in next_line.lower(): | |
example_match = re.search(r'[\"\']([^\"\']*)[\"\']', next_line) | |
if example_match: | |
domain_examples = example_match.group(1) | |
break | |
if "Lexical" in domain: | |
casl_data['Standard Score'][0] = score | |
casl_data['Percentile'][0] = percentile | |
casl_data['Performance Level'][0] = level | |
casl_data['Examples'][0] = domain_examples | |
current_domain = "Lexical/Semantic" | |
elif "Syntactic" in domain: | |
casl_data['Standard Score'][1] = score | |
casl_data['Percentile'][1] = percentile | |
casl_data['Performance Level'][1] = level | |
casl_data['Examples'][1] = domain_examples | |
current_domain = "Syntactic" | |
elif "Supralinguistic" in domain: | |
casl_data['Standard Score'][2] = score | |
casl_data['Percentile'][2] = percentile | |
casl_data['Performance Level'][2] = level | |
casl_data['Examples'][2] = domain_examples | |
current_domain = "Supralinguistic" | |
continue | |
# Check for section headers | |
if "Other analysis/Best plans of action:" in line or "### Recommended Treatment Approaches" in line or "Treatment Recommendations:" in line: | |
in_suggestions = True | |
in_explanation = False | |
in_additional = False | |
in_examples = False | |
continue | |
elif "Explanation:" in line or "### Clinical Rationale" in line or "Clinical Rationale:" in line: | |
in_suggestions = False | |
in_explanation = True | |
in_additional = False | |
in_examples = False | |
continue | |
elif "Additional Analysis:" in line or "Further Observations:" in line: | |
in_suggestions = False | |
in_explanation = False | |
in_additional = True | |
in_examples = False | |
continue | |
elif "Examples:" in line or "Specific Errors:" in line: | |
in_suggestions = False | |
in_explanation = False | |
in_additional = False | |
in_examples = True | |
continue | |
# Add content to appropriate section | |
if in_suggestions: | |
if line.startswith("- "): | |
treatment_suggestions.append(line[2:]) # Remove the bullet point | |
elif line.startswith("•"): | |
treatment_suggestions.append(line[1:].strip()) # Remove bullet and trim | |
elif line and not line.startswith("#"): | |
# Non-empty, non-header lines might be treatment suggestions without bullets | |
treatment_suggestions.append(line) | |
elif in_explanation: | |
explanation += line + "\n" | |
elif in_additional: | |
additional_analysis += line + "\n" | |
elif in_examples and current_factor and not specific_errors.get(current_factor): | |
# Look for quoted examples in the examples section | |
if '"' in line or "'" in line: | |
example_match = re.search(r'[\"\']([^\"\']*)[\"\']', line) | |
if example_match: | |
specific_errors[current_factor] = example_match.group(1) | |
# Update the examples in the dataframe | |
if current_factor in data['Factor']: | |
idx = data['Factor'].index(current_factor) | |
data['Examples'][idx] = example_match.group(1) | |
# Continuously look for examples with quotes regardless of section | |
if ('"' in line or "'" in line) and current_factor: | |
if re.search(rf'{current_factor}.*[\"\']([^\"\']*)[\"\']', line, re.IGNORECASE): | |
example_match = re.search(r'[\"\']([^\"\']*)[\"\']', line) | |
if example_match: | |
specific_errors[current_factor] = example_match.group(1) | |
# Update in dataframe | |
if current_factor in data['Factor']: | |
idx = data['Factor'].index(current_factor) | |
data['Examples'][idx] = example_match.group(1) | |
# Process specific errors and examples if they're presented as a list later in the text | |
for i, line in enumerate(lines): | |
if "examples of errors" in line.lower() or "error examples" in line.lower(): | |
# Look through next few lines for examples | |
for j in range(i+1, min(i+15, len(lines))): | |
example_line = lines[j].strip() | |
if not example_line or example_line.startswith("#"): | |
continue | |
# Look for factors mentioned with examples | |
for factor in data['Factor']: | |
if factor.lower() in example_line.lower() and ('"' in example_line or "'" in example_line): | |
example_match = re.search(r'[\"\']([^\"\']*)[\"\']', example_line) | |
if example_match: | |
idx = data['Factor'].index(factor) | |
data['Examples'][idx] = example_match.group(1) | |
specific_errors[factor] = example_match.group(1) | |
return { | |
'speech_factors': pd.DataFrame(data), | |
'casl_data': pd.DataFrame(casl_data), | |
'treatment_suggestions': treatment_suggestions, | |
'explanation': explanation, | |
'additional_analysis': additional_analysis, | |
'specific_errors': specific_errors, | |
'raw_response': raw_response # Include the full LLM response | |
} | |
def create_casl_plots(speech_factors, casl_data): | |
"""Create visualizations for the CASL analysis results""" | |
# Set a professional style for the plots | |
plt.style.use('seaborn-v0_8-pastel') | |
# Create figure with two subplots | |
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6), dpi=100) | |
# Plot speech factors - sorted by occurrence count | |
if not speech_factors.empty: | |
# Sort the dataframe | |
speech_factors_sorted = speech_factors.sort_values('Occurrences', ascending=False) | |
# Custom colors | |
speech_colors = ['#4C72B0', '#55A868', '#C44E52', '#8172B3', '#CCB974', '#64B5CD', '#4C72B0'] | |
# Create horizontal bar chart | |
bars = ax1.barh(speech_factors_sorted['Factor'], | |
speech_factors_sorted['Occurrences'], | |
color=speech_colors[:len(speech_factors_sorted)]) | |
# Add count labels at the end of each bar | |
for i, bar in enumerate(bars): | |
width = bar.get_width() | |
factor = speech_factors_sorted.iloc[i]['Factor'] | |
# Get severity percentile for this factor | |
severity = speech_factors_sorted.iloc[i]['Severity'] | |
# Label with both count and severity percentile | |
ax1.text(width + 0.3, bar.get_y() + bar.get_height()/2, | |
f'{width:.0f} ({severity}%)', ha='left', va='center') | |
# Add example as annotation if available | |
if 'Examples' in speech_factors_sorted.columns: | |
example = speech_factors_sorted.iloc[i]['Examples'] | |
if example and len(example) > 0: | |
# Add a small marker to indicate example exists | |
ax1.text(0.5, bar.get_y() + bar.get_height()/2, | |
'★', ha='center', va='center', color='#C44E52', | |
fontsize=8, fontweight='bold') | |
ax1.set_title('Speech Factors Analysis', fontsize=14, fontweight='bold') | |
ax1.set_xlabel('Number of Occurrences', fontsize=11) | |
# No y-label needed for horizontal bar chart | |
# Remove top and right spines | |
ax1.spines['top'].set_visible(False) | |
ax1.spines['right'].set_visible(False) | |
# Add a footnote about the star symbol | |
ax1.annotate('★ = Example available in details panel', xy=(0, -0.1), xycoords='axes fraction', | |
fontsize=8, ha='left', va='center', color='#C44E52') | |
# Plot CASL domains | |
domain_names = casl_data['Domain'] | |
y_scores = casl_data['Standard Score'] | |
percentiles = casl_data['Percentile'] | |
# Custom color scheme | |
casl_colors = ['#4C72B0', '#55A868', '#C44E52'] | |
# Create bars with nice colors | |
bars = ax2.bar(domain_names, y_scores, color=casl_colors) | |
# Add score labels on top of each bar | |
for i, bar in enumerate(bars): | |
height = bar.get_height() | |
score = y_scores.iloc[i] | |
percentile = percentiles.iloc[i] | |
# Label with both score and percentile | |
ax2.text(bar.get_x() + bar.get_width()/2., height + 1, | |
f'{score:.0f} ({percentile}%)', ha='center', va='bottom') | |
# Add a star marker if example exists | |
if 'Examples' in casl_data.columns: | |
example = casl_data.iloc[i]['Examples'] | |
if example and len(example) > 0: | |
ax2.text(bar.get_x() + bar.get_width()/2., height/2, | |
'★', ha='center', va='center', color='white', | |
fontsize=12, fontweight='bold') | |
# Add score reference lines | |
ax2.axhline(y=100, linestyle='--', color='gray', alpha=0.7, label='Average (100)') | |
ax2.axhline(y=85, linestyle=':', color='orange', alpha=0.7, label='Below Average (<85)') | |
ax2.axhline(y=115, linestyle=':', color='green', alpha=0.7, label='Above Average (>115)') | |
# Add labels and title | |
ax2.set_title('CASL-2 Standard Scores', fontsize=14, fontweight='bold') | |
ax2.set_ylabel('Standard Score', fontsize=11) | |
ax2.set_ylim(bottom=0, top=max(130, max(y_scores) + 15)) # Set y-axis limit with some padding | |
# Add legend | |
ax2.legend(loc='upper right', fontsize='small') | |
# Remove top and right spines | |
ax2.spines['top'].set_visible(False) | |
ax2.spines['right'].set_visible(False) | |
# Add overall figure title | |
fig.suptitle('Speech Analysis Results', fontsize=16, fontweight='bold', y=0.98) | |
# Add a subtitle with note about examples | |
plt.figtext(0.5, 0.01, '★ indicates specific examples available in the Error Examples panel', | |
ha='center', fontsize=9, fontstyle='italic') | |
plt.tight_layout(rect=[0, 0.03, 1, 0.95]) # Adjust layout to make room for suptitle | |
# Save plot to buffer | |
buf = io.BytesIO() | |
plt.savefig(buf, format='png', bbox_inches='tight') | |
buf.seek(0) | |
plt.close() | |
return buf | |
def create_casl_radar_chart(speech_factors): | |
"""Create a radar chart for speech factors (percentiles)""" | |
if speech_factors.empty or 'Severity' not in speech_factors.columns: | |
# Create a placeholder image if no data | |
plt.figure(figsize=(8, 8)) | |
plt.text(0.5, 0.5, "No data available for radar chart", | |
ha='center', va='center', fontsize=14) | |
plt.axis('off') | |
buf = io.BytesIO() | |
plt.savefig(buf, format='png') | |
buf.seek(0) | |
plt.close() | |
return buf | |
# Prepare data for radar chart | |
categories = speech_factors['Factor'].tolist() | |
percentiles = speech_factors['Severity'].tolist() | |
# Need to repeat first value to close the polygon | |
categories = categories + [categories[0]] | |
percentiles = percentiles + [percentiles[0]] | |
# Convert to radians and calculate points | |
N = len(categories) - 1 # Subtract 1 for the repeated point | |
angles = [n / float(N) * 2 * np.pi for n in range(N)] | |
angles += angles[:1] # Repeat the first angle to close the polygon | |
# Create the plot | |
fig = plt.figure(figsize=(8, 8)) | |
ax = fig.add_subplot(111, polar=True) | |
# Draw percentile lines with labels | |
plt.xticks(angles[:-1], categories[:-1], size=12) | |
ax.set_rlabel_position(0) | |
plt.yticks([20, 40, 60, 80, 100], ["20", "40", "60", "80", "100"], color="grey", size=10) | |
plt.ylim(0, 100) | |
# Plot data | |
ax.plot(angles, percentiles, linewidth=1, linestyle='solid', color='#4C72B0') | |
ax.fill(angles, percentiles, color='#4C72B0', alpha=0.25) | |
# Add title | |
plt.title('Speech Factors Severity (Percentile)', size=15, fontweight='bold', pad=20) | |
# Save to buffer | |
buf = io.BytesIO() | |
plt.savefig(buf, format='png', bbox_inches='tight') | |
buf.seek(0) | |
plt.close() | |
return buf | |
def analyze_transcript(transcript, age, gender): | |
"""Analyze a speech transcript using the CASL framework""" | |
# CHAT transcription symbol cheat sheet | |
cheat_sheet = """ | |
CHAT TRANSCRIPTION SYMBOL SUMMARY -- Abridged for AphasiaBank | |
Basic Utterance Terminators | |
. period | |
? question | |
! exclamation | |
Special Utterance Terminators | |
+… trailing off | |
+..? trailing off of a question | |
+/. interruption by another speaker | |
+/? interruption of a question by another speaker | |
+//. self-interruption | |
+//? self-interruption of a question | |
+"/. quotation follows on next line | |
+" quoted utterance occurs on this line (use at beginning of utterance | |
as link, not a terminator) | |
+< lazy overlap marking (at beginning of utterance that overlapped the | |
the previous utterance) | |
@n neologism (e.g., sakov@n) | |
exclamations common ones: ah, aw, haha, ow, oy, sh, ugh, uhoh | |
interjections common ones: mhm, uhhuh, hm, uhuh | |
fillers common ones: &-um, &-uh | |
letters s@l | |
letter sequence abcdefg@k | |
xxx unintelligible speech, not treated as a word | |
www untranscribed material (e.g., looking through pictures, talking with | |
spouse), must be followed by %exp tier (see below) | |
&+sounds phonological fragment (&+sh &+w we came home) | |
Scoped Symbols | |
[: text] target/intended word for errors (e.g., tried [: cried]) | |
[*] error (e.g., paraphasia -- wɛk@u [: wet] [*]) | |
[/] retracing without correction (e.g., simple repetition) | |
put repeated items between <> unless only one word was repeated | |
[//] retracing with correction (e.g., simple word or grammar change) | |
put changed items between <> unless only one word was changed | |
""" | |
# Instructions for the LLM analysis | |
instructions = """ | |
Advanced Linguistic Analysis Protocol for Adolescent Language Samples (Ages 14-18) | |
You are a highly specialized assistant supporting speech-language pathologists in conducting comprehensive linguistic analyses of adolescent language samples. Your analysis must adhere to evidence-based practice standards for secondary-level language assessment while producing results that inform both clinical decision-making and family understanding. | |
Initial Sample Assessment | |
1. Document sample metadata: | |
* Total number of utterances | |
* Sample collection context (conversational, narrative, expository, persuasive, procedural) | |
* Sample elicitation method (if indicated) | |
* Total duration/length of interaction | |
* Any transcription conventions used (e.g., SALT, CHAT) | |
2. Determine analysis approach: | |
* For samples with ≤50 utterances: Perform complete analysis on all utterances | |
* For samples >50 utterances: Perform complete analysis, then select 50 representative utterances that capture key patterns across all linguistic domains while maintaining the natural distribution of features | |
Utterance-Level Microanalysis | |
For each utterance, provide detailed linguistic breakdown including: | |
1. Structural Components | |
* Utterance number and full text (verbatim) | |
* Word count (excluding fillers, false starts, and repetitions) | |
* C-unit segmentation (when applicable) | |
* MLU in words and morphemes | |
* Syntactic classification: | |
* Simple, compound, complex, or compound-complex | |
* Complete or fragmentary | |
* Declarative, interrogative, imperative, or exclamatory | |
* Clausal density (number of clauses/utterance) | |
2. Syntactic Analysis | |
* Constituent structure identification: | |
* Subject and predicate components | |
* Noun phrases (including pre- and post-modification) | |
* Verb phrases (including auxiliaries, complements) | |
* Adverbial phrases and clauses (including position and function) | |
* Prepositional phrases (including syntactic role) | |
* Subordinate clauses (including type and function) | |
* Embedding depth and recursion patterns | |
* Syntactic movement and transformations | |
* Non-canonical structures (passives, clefts, etc.) | |
3. Morphological Analysis | |
* Bound morpheme usage (inflectional and derivational) | |
* Tense marking consistency | |
* Agreement patterns (subject-verb, pronoun-antecedent) | |
* Morphological errors with classification | |
4. Error Analysis (for each identified error) | |
* Precise error location: | |
* Utterance number and full quotation | |
* Position within utterance (initial, medial, final) | |
* Syntactic position (e.g., main clause, subordinate clause, noun phrase) | |
* Proximity to other linguistic features (e.g., complex vocabulary, disfluencies) | |
* Error type classification: | |
* Morphosyntactic (agreement, tense, etc.) | |
* Lexical-semantic (word selection, collocational) | |
* Phonological (if transcribed) | |
* Pragmatic (if contextually inappropriate) | |
* Error pattern (developmental vs. atypical) | |
* Clinical significance of location: | |
* Relationship to sentence complexity (errors increasing with complexity) | |
* Patterns related to linguistic context (e.g., errors occurring after disfluencies) | |
* Consistency across similar syntactic environments | |
* Relationship to cognitive load (e.g., errors increasing in dense information units) | |
* Error frequency and distribution across contexts | |
* Self-correction attempts and success rate | |
5. Fluency Markers | |
* Mazes (false starts, repetitions, reformulations) | |
* Filled pauses (um, uh, like, etc.) | |
* Silent pauses (duration if indicated) | |
* Disruption patterns and positions | |
* Impact on communicative effectiveness | |
QUANTITATIVE ANALYSIS & METRICS | |
Calculate the following evidence-based metrics with interpretations relevant to adolescent language development: | |
Productivity Measures | |
1. Total Output Measures | |
* Total number of words (TNW) | |
* Total number of utterances (TNU) | |
* Total number of different words (TDW) | |
* Total communication units (T-units/C-units) | |
2. Length Measures | |
* Mean length of utterance in words (MLU-w) | |
* Mean length of utterance in morphemes (MLU-m) | |
* Mean length of C-unit (MLCU) | |
* Words per minute (if timing available) | |
Complexity Measures | |
1. Syntactic Complexity | |
* Clausal density (clauses per C-unit) | |
* Subordination index (SI) | |
* Coordination index | |
* Embedding depth (max levels of embedding) | |
* T-unit complexity ratio | |
* Percentage of complex sentences | |
2. Phrase-Level Complexity | |
* Mean noun phrase length | |
* Mean verb phrase complexity | |
* Prepositional phrase frequency | |
* Adverbial complexity | |
Accuracy Measures | |
1. Error Analysis Summary | |
* Percentage of grammatically correct utterances | |
* Errors per C-unit | |
* Error pattern distribution (morphological, syntactic, lexical) | |
* Most frequent error types with specific examples | |
* Error location patterns: | |
* Distribution across utterance positions (initial, medial, final) | |
* Distribution across syntactic structures (simple vs. complex) | |
* Correlation with utterance length and complexity | |
* Patterns related to information density or processing demands | |
* Clinical significance of error locations: | |
* Interpretation of position-specific error patterns | |
* Analysis of syntactic contexts where errors predominate | |
* Relationship between error location and communicative impact | |
Lexical Diversity & Sophistication | |
1. Vocabulary Metrics | |
* Type-token ratio (TTR) | |
* Moving-average type-token ratio (MATTR) | |
* Number of different words (NDW) | |
* Vocabulary diversity (D) | |
* Lexical density (content words/total words) | |
2. Lexical Sophistication | |
* Low-frequency word usage | |
* Academic vocabulary presence | |
* Abstract word usage | |
* Word specificity analysis | |
Fluency & Formulation Measures | |
1. Disruption Analysis | |
* Percentage of mazes | |
* Total maze words/total words | |
* Revisions per utterance | |
* Hesitation frequency | |
* Incomplete utterance percentage | |
* Word-finding difficulties (frequency and patterns) | |
CASL-2 DOMAIN ALIGNMENT | |
Analyze the sample according to the Comprehensive Assessment of Spoken Language (CASL-2) framework, providing detailed evidence for each domain: | |
1. Lexical/Semantic Domain | |
* Vocabulary Range Assessment | |
* Basic vs. precise vocabulary usage | |
* Abstract vs. concrete terminology | |
* Academic language presence | |
* Subject-specific terminology | |
* Register-appropriate lexicon | |
* Word Relationships | |
* Synonym/antonym usage | |
* Categorical relationships | |
* Part-whole relationships | |
* Semantic networks | |
* Word Retrieval Patterns | |
* Word-finding hesitations | |
* Circumlocutions | |
* Semantic substitutions | |
* Retrieval strategies | |
* Evidence Summary | |
* Provide specific examples from transcript | |
* Compare to age-appropriate expectations | |
* Estimate standard score range (using clinical judgment) | |
* Indicate percentile rank range | |
* Assign performance level category | |
2. Syntactic Domain | |
* Sentence Structure Analysis | |
* Distribution of sentence types | |
* Complex syntax usage patterns | |
* Syntactic versatility | |
* Age-appropriate structures | |
* Morphosyntactic Elements | |
* Regular and irregular morphology | |
* Verb tense system mastery | |
* Complex verb forms (perfect, progressive) | |
* Advanced agreement patterns | |
* Syntactic Maturity Indicators | |
* Clause combining strategies | |
* Embedding types and frequency | |
* Noun phrase elaboration | |
* Adverbial complexity | |
* Evidence Summary | |
* Provide specific examples from transcript | |
* Compare to age-appropriate expectations | |
* Estimate standard score range | |
* Indicate percentile rank range | |
* Assign performance level category | |
3. Supralinguistic Domain | |
* Figurative Language | |
* Idiomatic expressions | |
* Metaphors and analogies | |
* Humor or wordplay | |
* Understanding of non-literal content | |
* Higher-Order Reasoning | |
* Inferential language | |
* Ambiguity recognition | |
* Abstract concept expression | |
* Perspective-taking indicators | |
* Metalinguistic Awareness | |
* Self-monitoring | |
* Linguistic reflection | |
* Awareness of language rules | |
* Metacognitive comments | |
* Evidence Summary | |
* Provide specific examples from transcript | |
* Note limitations of assessment from sample | |
* Estimate standard score range (if sufficient evidence) | |
* Indicate percentile rank range (if applicable) | |
* Assign performance level category (or note insufficient evidence) | |
4. Pragmatic Domain | |
* Discourse Management | |
* Topic initiation, maintenance, and change | |
* Turn-taking patterns | |
* Response contingency | |
* Conversational repair strategies | |
* Social Communication | |
* Perspective-taking | |
* Register variation | |
* Politeness conventions | |
* Social inferencing | |
* Narrative/Expository Skills (if applicable) | |
* Coherence and cohesion | |
* Organizational structure | |
* Use of cohesive devices | |
* Information density | |
* Evidence Summary | |
* Provide specific examples from transcript | |
* Note contextual limitations | |
* Estimate standard score range (if sufficient evidence) | |
* Indicate percentile rank range (if applicable) | |
* Assign performance level category (or note insufficient evidence) | |
DEVELOPMENTAL PROFILE ANALYSIS | |
Compare observed language features to established adolescent language development patterns: | |
1. Age-Based Comparison | |
* Alignment with typical syntactic development (14-18) | |
* Lexical development expectations | |
* Discourse maturity indicators | |
* Academic language benchmarks | |
2. Strength-Challenge Pattern Analysis | |
* Identify domains of relative strength with evidence | |
* Identify domains requiring support with evidence | |
* Note any asynchronous development patterns | |
* Document compensatory strategies observed | |
3. Developmental Trajectory Indicators | |
* Features suggesting typical development | |
* Features suggesting delayed development | |
* Features suggesting disordered development | |
* Features suggesting language difference vs. disorder | |
COMPREHENSIVE REPORTING FORMAT | |
1. Professional Clinical Summary (SLP-Oriented) | |
* Sample characteristics and analysis methodology | |
* Key quantitative findings table with age-based interpretation | |
* CASL-2 domain profiles with evidence-based rationales | |
* Error pattern analysis with clinical implications | |
* Identified strengths and challenges | |
* Differential considerations | |
* Recommendations for further assessment | |
* Potential treatment targets based on evidence | |
2. Family-Friendly Summary Report | |
* Introduction | |
* Purpose of language sample analysis | |
* Brief explanation of what was analyzed | |
* How this information helps understand communication | |
* Your Adolescent's Language Profile | |
* Overall communication strengths (with clear examples) | |
* Areas for continued growth (with supportive examples) | |
* How these patterns may impact academic and social communication | |
* Understanding the Assessment | |
* Simple explanations of key findings | |
* Comparison to typical adolescent language patterns | |
* Visual representation of language profile | |
* Accessible examples from the transcript | |
* Supporting Language Development | |
* Practical strategies aligned with findings | |
* Communication opportunities that leverage strengths | |
* Questions to discuss with the SLP | |
* Resources for family understanding | |
* Next Steps | |
* Connections to academic and social communication | |
* Relevance to current educational goals | |
* Partnership opportunities between home and therapy | |
3. Educational Implications (if requested) | |
* Connections to academic standards | |
* Impact on classroom participation | |
* Alignment with IEP goals (if applicable) | |
* Recommendations for classroom support | |
IMPLEMENTATION GUIDELINES | |
1. Analysis Integrity | |
* Analyze only what is directly observable in the transcript | |
* Clearly differentiate observations from interpretations | |
* Note when certain domains cannot be adequately assessed | |
* Document analysis limitations based on sample constraints | |
2. Clinical Reasoning | |
* Apply evidence-based standards for adolescent language | |
* Consider developmental appropriateness for ages 14-18 | |
* Document patterns rather than isolated instances | |
* Provide context for interpretations | |
3. Reporting Ethics | |
* Use person-first, strength-based language | |
* Avoid definitive diagnostic statements | |
* Focus on functional communication impact | |
* Maintain appropriate scope of analysis | |
4. Flexibility Adaptations | |
* For different discourse types (narrative, expository, conversational) | |
* For different cultural and linguistic backgrounds | |
* For various academic and social contexts | |
* For potential co-occurring conditions | |
This protocol produces a comprehensive linguistic analysis tailored to adolescents (14-18) that provides both clinically relevant information and family-accessible insights while maintaining the flexibility to adapt to various sample types and contexts. | |
""" | |
prompt = f""" | |
{role_context} | |
You are analyzing a transcript for a patient who is {age} years old and {gender}. | |
TRANSCRIPT: | |
{transcript} | |
{cheat_sheet} | |
{instructions} | |
Remember to be precise but compassionate in your analysis. Use direct quotes from the transcript for every factor and domain you analyze. | |
""" | |
# Call the appropriate API or fallback to demo mode | |
if bedrock_client: | |
response = call_bedrock(prompt) | |
else: | |
response = generate_demo_response(prompt) | |
# Parse the response | |
results = parse_casl_response(response) | |
# Create visualizations | |
plot_image = create_casl_plots(results['speech_factors'], results['casl_data']) | |
radar_image = create_casl_radar_chart(results['speech_factors']) | |
return results, plot_image, radar_image, response | |
def transcribe_audio(audio_path, patient_age=8): | |
"""Transcribe an audio recording using Amazon Transcribe and format in CHAT format""" | |
if not os.path.exists(audio_path): | |
logger.error(f"Audio file not found: {audio_path}") | |
return "Error: Audio file not found." | |
if not transcribe_client or not s3_client: | |
logger.warning("AWS clients not initialized, using demo transcription") | |
return generate_demo_transcription() | |
try: | |
# Get file info | |
file_name = os.path.basename(audio_path) | |
file_size = os.path.getsize(audio_path) | |
_, file_extension = os.path.splitext(file_name) | |
# Check file format | |
supported_formats = ['.mp3', '.mp4', '.wav', '.flac', '.ogg', '.amr', '.webm'] | |
if file_extension.lower() not in supported_formats: | |
logger.error(f"Unsupported audio format: {file_extension}") | |
return f"Error: Unsupported audio format. Please use one of: {', '.join(supported_formats)}" | |
# Generate a unique job name | |
timestamp = datetime.now().strftime('%Y%m%d%H%M%S') | |
job_name = f"casl-transcription-{timestamp}" | |
s3_key = f"{S3_PREFIX}{job_name}{file_extension}" | |
# Upload to S3 | |
logger.info(f"Uploading {file_name} to S3 bucket {S3_BUCKET}") | |
try: | |
with open(audio_path, 'rb') as audio_file: | |
s3_client.upload_fileobj(audio_file, S3_BUCKET, s3_key) | |
except Exception as e: | |
logger.error(f"Failed to upload to S3: {str(e)}") | |
# If upload fails, try to create the bucket | |
try: | |
s3_client.create_bucket(Bucket=S3_BUCKET) | |
logger.info(f"Created S3 bucket: {S3_BUCKET}") | |
# Try upload again | |
with open(audio_path, 'rb') as audio_file: | |
s3_client.upload_fileobj(audio_file, S3_BUCKET, s3_key) | |
except Exception as bucket_error: | |
logger.error(f"Failed to create bucket and upload: {str(bucket_error)}") | |
return "Error: Failed to upload audio file. Please check your AWS permissions." | |
# Start transcription job | |
logger.info(f"Starting transcription job: {job_name}") | |
media_format = file_extension.lower()[1:] # Remove the dot | |
if media_format == 'webm': | |
media_format = 'webm' # Amazon Transcribe expects this | |
# Determine language settings based on patient age | |
if patient_age < 10: | |
# For younger children, enabling child language model is helpful | |
language_options = { | |
'LanguageCode': 'en-US', | |
'Settings': { | |
'ShowSpeakerLabels': True, | |
'MaxSpeakerLabels': 2 # Typically patient + clinician | |
} | |
} | |
else: | |
language_options = { | |
'LanguageCode': 'en-US', | |
'Settings': { | |
'ShowSpeakerLabels': True, | |
'MaxSpeakerLabels': 2 # Typically patient + clinician | |
} | |
} | |
transcribe_client.start_transcription_job( | |
TranscriptionJobName=job_name, | |
Media={ | |
'MediaFileUri': f"s3://{S3_BUCKET}/{s3_key}" | |
}, | |
MediaFormat=media_format, | |
**language_options | |
) | |
# Wait for the job to complete (with timeout) | |
logger.info("Waiting for transcription to complete...") | |
max_tries = 30 # 5 minutes max wait | |
tries = 0 | |
while tries < max_tries: | |
try: | |
job = transcribe_client.get_transcription_job(TranscriptionJobName=job_name) | |
status = job['TranscriptionJob']['TranscriptionJobStatus'] | |
if status == 'COMPLETED': | |
# Get the transcript | |
transcript_uri = job['TranscriptionJob']['Transcript']['TranscriptFileUri'] | |
# Download the transcript | |
import urllib.request | |
import json | |
with urllib.request.urlopen(transcript_uri) as response: | |
transcript_json = json.loads(response.read().decode('utf-8')) | |
# Convert to CHAT format | |
chat_transcript = format_as_chat(transcript_json) | |
return chat_transcript | |
elif status == 'FAILED': | |
reason = job['TranscriptionJob'].get('FailureReason', 'Unknown failure') | |
logger.error(f"Transcription job failed: {reason}") | |
return f"Error: Transcription failed - {reason}" | |
# Still in progress, wait and try again | |
tries += 1 | |
time.sleep(10) # Check every 10 seconds | |
except Exception as e: | |
logger.error(f"Error checking transcription job: {str(e)}") | |
return f"Error getting transcription: {str(e)}" | |
# If we got here, we timed out | |
return "Error: Transcription timed out. The process is taking longer than expected." | |
except Exception as e: | |
logger.exception("Error in audio transcription") | |
return f"Error transcribing audio: {str(e)}" | |
def format_as_chat(transcript_json): | |
"""Format the Amazon Transcribe JSON result as CHAT format""" | |
try: | |
# Get transcript items | |
items = transcript_json['results']['items'] | |
# Get speaker labels if available | |
speakers = {} | |
if 'speaker_labels' in transcript_json['results']: | |
speaker_segments = transcript_json['results']['speaker_labels']['segments'] | |
# Map each item to its speaker | |
for segment in speaker_segments: | |
for item in segment['items']: | |
start_time = item['start_time'] | |
speakers[start_time] = segment['speaker_label'] | |
# Build transcript by combining words into utterances by speaker | |
current_speaker = None | |
current_utterance = [] | |
utterances = [] | |
for item in items: | |
# Skip non-pronunciation items (like punctuation) | |
if item['type'] != 'pronunciation': | |
continue | |
word = item['alternatives'][0]['content'] | |
start_time = item.get('start_time') | |
# Determine speaker if available | |
speaker = speakers.get(start_time, 'spk_0') | |
# If speaker changed, start a new utterance | |
if speaker != current_speaker and current_utterance: | |
utterances.append((current_speaker, ' '.join(current_utterance))) | |
current_utterance = [] | |
current_speaker = speaker | |
current_utterance.append(word) | |
# Add the last utterance | |
if current_utterance: | |
utterances.append((current_speaker, ' '.join(current_utterance))) | |
# Format as CHAT | |
chat_lines = [] | |
for speaker, text in utterances: | |
# Map speakers to CHAT format | |
# Assuming spk_0 is the patient (PAR) and spk_1 is the clinician (INV) | |
chat_speaker = "*PAR:" if speaker == "spk_0" else "*INV:" | |
chat_lines.append(f"{chat_speaker} {text}.") | |
return '\n'.join(chat_lines) | |
except Exception as e: | |
logger.exception("Error formatting transcript") | |
return "*PAR: (Error formatting transcript)" | |
def answer_slp_question(question): | |
"""Answer a question about SLP practice or CASL assessment""" | |
prompt = f""" | |
You are an experienced Speech-Language Pathologist answering a question from a colleague. | |
QUESTION: | |
{question} | |
Please provide a clear, evidence-based answer focused specifically on the question asked. | |
Reference best practices and current research where appropriate. | |
Keep your answer concise but comprehensive. | |
""" | |
if bedrock_client: | |
answer = call_bedrock(prompt) | |
else: | |
answer = generate_demo_qa_response(question) | |
return answer | |
# =============================== | |
# Gradio Interface | |
# =============================== | |
def create_interface(): | |
"""Create the main Gradio interface""" | |
# Use a simple theme with default colors | |
custom_theme = gr.themes.Soft( | |
font=[gr.themes.GoogleFont("Inter"), "system-ui", "sans-serif"] | |
) | |
with gr.Blocks(theme=custom_theme, css=""" | |
.header { | |
text-align: center; | |
margin-bottom: 20px; | |
} | |
.header img { | |
max-height: 100px; | |
margin-bottom: 10px; | |
} | |
.container { | |
border-radius: 10px; | |
padding: 10px; | |
margin-bottom: 20px; | |
} | |
.patient-info { | |
background-color: #e3f2fd; | |
} | |
.speech-sample { | |
background-color: #f0f8ff; | |
} | |
.results-container { | |
background-color: #f9f9f9; | |
} | |
.viz-container { | |
display: flex; | |
justify-content: center; | |
margin-bottom: 20px; | |
} | |
.footer { | |
text-align: center; | |
margin-top: 30px; | |
padding: 10px; | |
font-size: 0.8em; | |
color: #78909C; | |
} | |
.info-box { | |
background-color: #e8f5e9; | |
border-left: 4px solid #4CAF50; | |
padding: 10px 15px; | |
margin-bottom: 15px; | |
border-radius: 4px; | |
} | |
.warning-box { | |
background-color: #fff8e1; | |
border-left: 4px solid #FFC107; | |
padding: 10px 15px; | |
border-radius: 4px; | |
} | |
.markdown-text h3 { | |
color: #2C7FB8; | |
border-bottom: 1px solid #eaeaea; | |
padding-bottom: 5px; | |
} | |
.evidence-table { | |
border-collapse: collapse; | |
width: 100%; | |
} | |
.evidence-table th, .evidence-table td { | |
border: 1px solid #ddd; | |
padding: 8px; | |
text-align: left; | |
} | |
.evidence-table th { | |
background-color: #f5f7fa; | |
color: #333; | |
} | |
.evidence-table tr:nth-child(even) { | |
background-color: #f9f9f9; | |
} | |
.tab-content { | |
padding: 15px; | |
background-color: white; | |
border-radius: 0 0 8px 8px; | |
box-shadow: 0 2px 5px rgba(0,0,0,0.05); | |
} | |
""") as app: | |
# Create header with logo | |
gr.HTML( | |
""" | |
<div class="header"> | |
<h1>SLP Analysis Tool</h1> | |
<p>A comprehensive assessment tool for Speech-Language Pathologists</p> | |
</div> | |
""" | |
) | |
# Main tabs | |
with gr.Tabs() as main_tabs: | |
# =============================== | |
# CASL Analysis Tab | |
# =============================== | |
with gr.TabItem("CASL Analysis", id=0): | |
with gr.Row(): | |
# Left column - Input section | |
with gr.Column(scale=1): | |
# Patient information panel | |
with gr.Group(elem_classes="container patient-info"): | |
gr.Markdown("### Patient Information") | |
with gr.Row(): | |
patient_name = gr.Textbox(label="Patient Name", placeholder="Enter patient name") | |
record_id = gr.Textbox(label="Record ID", placeholder="Enter record ID") | |
with gr.Row(): | |
age = gr.Number(label="Age", value=8, minimum=1, maximum=120) | |
gender = gr.Radio(["male", "female", "other"], label="Gender", value="male") | |
with gr.Row(): | |
assessment_date = gr.Textbox( | |
label="Assessment Date", | |
placeholder="MM/DD/YYYY", | |
value=datetime.now().strftime('%m/%d/%Y') | |
) | |
clinician_name = gr.Textbox( | |
label="Clinician", | |
placeholder="Enter clinician name" | |
) | |
# Speech sample panel | |
with gr.Group(elem_classes="container speech-sample"): | |
gr.Markdown("### Speech Sample") | |
# Sample button | |
sample_btn = gr.Button("Load Sample Transcript", size="sm") | |
# Transcript input | |
transcript = gr.Textbox( | |
label="Transcript", | |
placeholder="Paste the speech transcript here...", | |
lines=10 | |
) | |
# Add info about transcript format | |
gr.Markdown( | |
""" | |
<div class="info-box"> | |
<strong>Transcript Format:</strong> Use CHAT format with *PAR: for patient lines. | |
Mark word-finding with &-um, paraphasias with [*], and provide intended words with [: word]. | |
</div> | |
""", | |
elem_classes="markdown-text" | |
) | |
# File upload | |
file_upload = gr.File( | |
label="Or upload a transcript file", | |
file_types=["text", "txt", "pdf", "rtf"] | |
) | |
# Analysis button | |
analyze_btn = gr.Button("Analyze Speech Sample", variant="primary", size="lg") | |
# Right column - Results section | |
with gr.Column(scale=1): | |
with gr.Group(elem_classes="container results-container"): | |
with gr.Tabs() as results_tabs: | |
# Summary tab | |
with gr.TabItem("Summary", id=0, elem_classes="tab-content"): | |
with gr.Group(): | |
gr.Markdown("### Key Findings", elem_classes="markdown-text") | |
speech_factors_md = gr.Markdown(elem_classes="markdown-text") | |
with gr.Accordion("CASL Assessment Results", open=True): | |
casl_results_md = gr.Markdown(elem_classes="markdown-text") | |
with gr.Accordion("Detailed Error Examples", open=True): | |
specific_errors_md = gr.Markdown(elem_classes="markdown-text") | |
# Treatment tab | |
with gr.TabItem("Treatment Plan", id=1, elem_classes="tab-content"): | |
gr.Markdown("### Recommended Treatment Approaches", elem_classes="markdown-text") | |
treatment_md = gr.Markdown(elem_classes="treatment-panel") | |
gr.Markdown("### Clinical Rationale", elem_classes="markdown-text") | |
explanation_md = gr.Markdown(elem_classes="panel") | |
with gr.Accordion("Supporting Evidence", open=False): | |
gr.Markdown(""" | |
<table class="evidence-table"> | |
<tr> | |
<th>Factor</th> | |
<th>Evidence-based Approaches</th> | |
<th>References</th> | |
</tr> | |
<tr> | |
<td>Word Retrieval</td> | |
<td>Semantic feature analysis, phonological cueing, word generation tasks</td> | |
<td>Boyle, 2010; Kiran & Thompson, 2003</td> | |
</tr> | |
<tr> | |
<td>Grammatical Errors</td> | |
<td>Treatment of Underlying Forms (TUF), Morphosyntactic therapy</td> | |
<td>Thompson et al., 2003; Ebbels, 2014</td> | |
</tr> | |
<tr> | |
<td>Fluency/Prosody</td> | |
<td>Rate control, rhythmic cueing, contrastive stress exercises</td> | |
<td>Ballard et al., 2010; Tamplin & Baker, 2017</td> | |
</tr> | |
</table> | |
""", elem_classes="markdown-text") | |
# Full report tab | |
with gr.TabItem("Full Report", id=2, elem_classes="tab-content"): | |
full_analysis = gr.Markdown() | |
# Add PDF export option | |
export_btn = gr.Button("Export Report as PDF", variant="secondary") | |
export_status = gr.Markdown("") | |
# Raw LLM Output tab | |
with gr.TabItem("Raw LLM Output", id=3, elem_classes="tab-content"): | |
gr.Markdown("### Complete Model Output", elem_classes="markdown-text") | |
gr.Markdown("This tab shows the unprocessed output from the AI model for debugging purposes.") | |
raw_llm_output = gr.Textbox( | |
label="Raw AI Output", | |
lines=20, | |
interactive=False | |
) | |
# =============================== | |
# Patient Records Tab | |
# =============================== | |
with gr.TabItem("Patient Records", id=1): | |
with gr.Row(): | |
with gr.Column(scale=1): | |
gr.Markdown("### Patient Records") | |
# Records table with status column | |
patient_records_table = gr.Dataframe( | |
headers=["ID", "Name", "Record ID", "Age", "Gender", "Assessment Date", "Clinician", "Status"], | |
datatype=["str", "str", "str", "str", "str", "str", "str", "str"], | |
label="Saved Patients", | |
interactive=False | |
) | |
with gr.Row(): | |
refresh_records_btn = gr.Button("Refresh Records", size="sm") | |
delete_record_btn = gr.Button("Delete Selected Record", size="sm", variant="secondary") | |
records_status = gr.Markdown("") | |
# Record selection | |
selected_record_id = gr.Textbox(label="Selected Record ID", visible=False) | |
with gr.Row(): | |
load_record_btn = gr.Button("Load for Analysis", variant="primary") | |
with gr.Column(scale=1): | |
# Record details | |
record_details = gr.Markdown(label="Record Details") | |
with gr.Accordion("Record Actions", open=False): | |
export_record_btn = gr.Button("Export Record as PDF", variant="secondary") | |
# Event handlers for records | |
def refresh_patient_records(): | |
"""Refresh the patient records table""" | |
records = get_all_patient_records() | |
data = [] | |
for r in records: | |
data.append([ | |
r["id"], r["name"], r["record_id"], | |
r["age"], r["gender"], r["assessment_date"], r["clinician"] | |
]) | |
df = pd.DataFrame(data) | |
status_msg = f"Found {len(data)} patient records." | |
return df, status_msg | |
refresh_records_btn.click( | |
refresh_patient_records, | |
outputs=[patient_records_table, records_status] | |
) | |
# Note: The automatic tab selection event was removed because it's not supported in newer Gradio versions | |
# Instead, we'll rely on the refresh button that's already in place | |
# Load record when a row is selected | |
def handle_record_selection(evt: gr.SelectData, records): | |
if records is None or len(records) == 0: | |
return "", "No record selected." | |
selected_row = evt.index[0] | |
if selected_row < len(records): | |
record_id = records.iloc[selected_row, 0] | |
# Load the record to show details | |
record_data = load_patient_record(record_id) | |
if record_data: | |
patient_info = record_data.get("patient_info", {}) | |
# Format record details as markdown | |
details = f""" | |
## Selected Patient Record | |
**Name:** {patient_info.get('name', 'N/A')} | |
**Record ID:** {patient_info.get('record_id', 'N/A')} | |
**Age:** {patient_info.get('age', 'N/A')} | |
**Gender:** {patient_info.get('gender', 'N/A')} | |
**Assessment Date:** {patient_info.get('assessment_date', 'N/A')} | |
**Clinician:** {patient_info.get('clinician', 'N/A')} | |
**Analyzed:** {record_data.get('timestamp', 'Unknown')} | |
### Preview | |
This record contains: | |
- Speech transcript analysis | |
- CASL assessment results | |
- Treatment recommendations | |
Click "Load Selected Record" to view the full analysis. | |
""" | |
return record_id, details | |
return record_id, f"Selected record: {record_id}" | |
return "", "Invalid selection." | |
patient_records_table.select( | |
handle_record_selection, | |
inputs=[patient_records_table], | |
outputs=[selected_record_id, record_details] | |
) | |
# Load record into analysis tab | |
def load_patient_record_to_analysis(record_id): | |
if not record_id: | |
return gr.update(selected=1), "", "", "", "male", "", "", "", "" | |
record_data = load_patient_record(record_id) | |
if not record_data: | |
return gr.update(selected=1), "", "", "", "male", "", "", "", "" | |
# Extract data | |
patient_info = record_data.get("patient_info", {}) | |
transcript_text = record_data.get("transcript", "") | |
analysis_results = record_data.get("analysis_results", {}) | |
# Create status message for the record loading | |
status_msg = f"✅ Record loaded successfully: {patient_info.get('name', 'Unknown')} ({record_id})" | |
# Now we should also load the analysis results | |
# In a future version, we would need to update all analysis outputs here as well | |
return ( | |
gr.update(selected=0), # Switch to analysis tab | |
patient_info.get("name", ""), | |
patient_info.get("record_id", ""), | |
patient_info.get("age", ""), | |
patient_info.get("gender", "male"), | |
patient_info.get("assessment_date", ""), | |
patient_info.get("clinician", ""), | |
transcript_text, | |
status_msg | |
) | |
load_record_btn.click( | |
load_patient_record_to_analysis, | |
inputs=[selected_record_id], | |
outputs=[ | |
main_tabs, | |
patient_name, record_id, age, gender, | |
assessment_date, clinician_name, transcript, | |
records_status | |
] | |
) | |
# =============================== | |
# Transcription Tool Tab | |
# =============================== | |
with gr.TabItem("Transcription Tool", id=2): | |
with gr.Row(): | |
with gr.Column(scale=1): | |
gr.Markdown("### Audio Transcription Tool") | |
gr.Markdown("Upload an audio recording to automatically transcribe it in CHAT format.") | |
audio_input = gr.Audio(type="filepath", label="Upload Audio Recording") | |
with gr.Row(): | |
transcription_age = gr.Number(label="Patient Age", value=8, minimum=1, maximum=120) | |
transcribe_btn = gr.Button("Transcribe Audio", variant="primary") | |
with gr.Column(scale=1): | |
transcription_output = gr.Textbox( | |
label="Transcription Result", | |
placeholder="Transcription will appear here...", | |
lines=12 | |
) | |
with gr.Row(): | |
copy_to_analysis_btn = gr.Button("Use for Analysis", variant="secondary") | |
edit_transcription_btn = gr.Button("Edit Transcription", variant="secondary") | |
# =============================== | |
# SLP Assistant Tab | |
# =============================== | |
with gr.TabItem("SLP Assistant", id=3): | |
with gr.Row(): | |
with gr.Column(scale=1): | |
gr.Markdown("### SLP Knowledge Assistant") | |
gr.Markdown("Ask questions about CASL assessment, therapy techniques, or SLP best practices.") | |
question_input = gr.Textbox( | |
label="Your Question", | |
placeholder="e.g., What activities help improve word-finding skills?", | |
lines=3 | |
) | |
ask_question_btn = gr.Button("Ask Question", variant="primary") | |
# Quick question buttons | |
gr.Markdown("#### Common Questions") | |
with gr.Row(): | |
q1_btn = gr.Button("What is CASL?") | |
q2_btn = gr.Button("How do I interpret scores?") | |
with gr.Row(): | |
q3_btn = gr.Button("Activities for word finding") | |
q4_btn = gr.Button("When to reassess") | |
with gr.Column(scale=1): | |
answer_output = gr.Markdown() | |
with gr.Accordion("References", open=False): | |
gr.Markdown(""" | |
- American Speech-Language-Hearing Association (ASHA) | |
- Comprehensive Assessment of Spoken Language (CASL-2) Manual | |
- Evidence-Based Practice in Speech-Language Pathology | |
- Current research in pediatric language intervention | |
""") | |
# =============================== | |
# Event Handlers | |
# =============================== | |
# Load sample transcript button | |
def load_sample(): | |
return SAMPLE_TRANSCRIPT | |
sample_btn.click(load_sample, outputs=[transcript]) | |
# File upload handler | |
file_upload.upload(process_upload, file_upload, transcript) | |
# Analysis button handler | |
def on_analyze_click(transcript_text, age_val, gender_val, patient_name_val, record_id_val, clinician_val, assessment_date_val): | |
if not transcript_text or len(transcript_text.strip()) < 50: | |
return ( | |
"Error: Please provide a longer transcript for analysis.", | |
"The transcript is too short for meaningful analysis.", | |
"Please provide a speech sample with at least 50 characters.", | |
"Error: Insufficient data", | |
"Please provide a speech sample with at least 50 characters.", | |
"", | |
"", | |
"" | |
) | |
try: | |
# Get the raw analysis response | |
results, _, _, full_text = analyze_transcript(transcript_text, age_val, gender_val) | |
# Extract speech factors section using section markers | |
speech_factors_section = "" | |
factors_pattern = re.compile(r"<SPEECH_FACTORS_START>(.*?)<SPEECH_FACTORS_END>", re.DOTALL) | |
factors_match = factors_pattern.search(full_text) | |
if factors_match: | |
speech_factors_section = factors_match.group(1).strip() | |
else: | |
# Fallback to old pattern if markers aren't found | |
old_factors_pattern = re.compile(r"(Difficulty producing fluent.*?)(?:Evaluation of CASL Skills|<CASL_SKILLS_START>)", re.DOTALL) | |
old_factors_match = old_factors_pattern.search(full_text) | |
if old_factors_match: | |
speech_factors_section = old_factors_match.group(1).strip() | |
else: | |
speech_factors_section = "Error extracting speech factors from analysis." | |
# Extract CASL skills section | |
casl_section = "" | |
casl_pattern = re.compile(r"<CASL_SKILLS_START>(.*?)<CASL_SKILLS_END>", re.DOTALL) | |
casl_match = casl_pattern.search(full_text) | |
if casl_match: | |
casl_section = casl_match.group(1).strip() | |
else: | |
# Fallback pattern | |
old_casl_pattern = re.compile(r"(?:Evaluation of CASL Skills:|Lexical/Semantic Skills:)(.*?)(?:Other analysis/Best plans of action:|<TREATMENT_RECOMMENDATIONS_START>)", re.DOTALL) | |
old_casl_match = old_casl_pattern.search(full_text) | |
if old_casl_match: | |
casl_section = old_casl_match.group(1).strip() | |
# Add a header if it's missing | |
if not casl_section.startswith("Lexical"): | |
casl_section = "Evaluation of CASL Skills:\n\n" + casl_section | |
else: | |
casl_section = "Error extracting CASL skills from analysis." | |
# Extract treatment recommendations | |
treatment_text = "" | |
treatment_pattern = re.compile(r"<TREATMENT_RECOMMENDATIONS_START>(.*?)<TREATMENT_RECOMMENDATIONS_END>", re.DOTALL) | |
treatment_match = treatment_pattern.search(full_text) | |
if treatment_match: | |
treatment_text = "### Treatment Recommendations\n\n" + treatment_match.group(1).strip() | |
else: | |
# Fallback pattern | |
old_treatment_pattern = re.compile(r"(?:Other analysis/Best plans of action:)(.*?)(?:Explanation:|<EXPLANATION_START>)", re.DOTALL) | |
old_treatment_match = old_treatment_pattern.search(full_text) | |
if old_treatment_match: | |
treatment_text = "### Treatment Recommendations\n\n" + old_treatment_match.group(1).strip() | |
elif 'treatment_suggestions' in results: | |
treatment_text = "### Treatment Recommendations\n\n" | |
for suggestion in results['treatment_suggestions']: | |
treatment_text += f"- {suggestion}\n" | |
# Extract explanation section | |
explanation_text = "### Clinical Rationale\n\n" | |
explanation_pattern = re.compile(r"<EXPLANATION_START>(.*?)<EXPLANATION_END>", re.DOTALL) | |
explanation_match = explanation_pattern.search(full_text) | |
if explanation_match: | |
explanation_text += explanation_match.group(1).strip() | |
else: | |
# Fallback pattern | |
old_explanation_pattern = re.compile(r"(?:Explanation:)(.*?)(?:Additional Analysis:|<ADDITIONAL_ANALYSIS_START>)", re.DOTALL) | |
old_explanation_match = old_explanation_pattern.search(full_text) | |
if old_explanation_match: | |
explanation_text += old_explanation_match.group(1).strip() | |
else: | |
explanation_text += results.get('explanation', "No explanation provided.") | |
# Extract additional analysis | |
additional_analysis = "" | |
additional_pattern = re.compile(r"<ADDITIONAL_ANALYSIS_START>(.*?)<ADDITIONAL_ANALYSIS_END>", re.DOTALL) | |
additional_match = additional_pattern.search(full_text) | |
if additional_match: | |
additional_analysis = additional_match.group(1).strip() | |
explanation_text += "\n\n### Additional Analysis\n\n" + additional_analysis | |
else: | |
# Fallback pattern | |
old_additional_pattern = re.compile(r"(?:Additional Analysis:)(.*?)(?:Diagnostic Impressions:|<DIAGNOSTIC_IMPRESSIONS_START>)", re.DOTALL) | |
old_additional_match = old_additional_pattern.search(full_text) | |
if old_additional_match: | |
explanation_text += "\n\n### Additional Analysis\n\n" + old_additional_match.group(1).strip() | |
elif 'additional_analysis' in results: | |
explanation_text += "\n\n### Additional Analysis\n\n" + results.get('additional_analysis', "") | |
# Extract diagnostic impressions | |
diagnostic_impressions = "" | |
diagnostic_pattern = re.compile(r"<DIAGNOSTIC_IMPRESSIONS_START>(.*?)<DIAGNOSTIC_IMPRESSIONS_END>", re.DOTALL) | |
diagnostic_match = diagnostic_pattern.search(full_text) | |
if diagnostic_match: | |
diagnostic_impressions = diagnostic_match.group(1).strip() | |
# Add to the explanation section | |
explanation_text += "\n\n### Diagnostic Impressions\n\n" + diagnostic_impressions | |
# Extract specific error examples | |
specific_errors_text = "## Detailed Error Examples\n\n" | |
# First try the dedicated section | |
errors_pattern = re.compile(r"<ERROR_EXAMPLES_START>(.*?)<ERROR_EXAMPLES_END>", re.DOTALL) | |
errors_match = errors_pattern.search(full_text) | |
if errors_match: | |
specific_errors_text += errors_match.group(1).strip() | |
else: | |
# Fallback to extracting examples from the text | |
example_sections = re.findall(r"Examples:\s*\n((?:- \".*\"\s*\n)+)", full_text) | |
for section in example_sections: | |
specific_errors_text += section + "\n" | |
if not example_sections: | |
specific_errors_text += "No specific error examples were found in the analysis." | |
# Save the record to storage | |
patient_info = { | |
"name": patient_name_val, | |
"record_id": record_id_val, | |
"age": age_val, | |
"gender": gender_val, | |
"assessment_date": assessment_date_val, | |
"clinician": clinician_val | |
} | |
saved_id = save_patient_record(patient_info, results, transcript_text) | |
save_message = "" | |
if saved_id: | |
save_message = f""" | |
✅ Patient record saved successfully. | |
**System ID:** {saved_id} | |
**Patient:** {patient_name_val or "Unnamed"} | |
**Record ID:** {record_id_val or "Not provided"} | |
You can access this record later in the Patient Records tab. | |
""" | |
else: | |
save_message = "⚠️ Failed to save patient record. Please check data directory permissions." | |
# Format to include patient metadata in the full report | |
patient_info_text = "" | |
if patient_name_val: | |
patient_info_text += f"**Patient:** {patient_name_val}\n" | |
if record_id_val: | |
patient_info_text += f"**Record ID:** {record_id_val}\n" | |
if age_val: | |
patient_info_text += f"**Age:** {age_val} years\n" | |
if gender_val: | |
patient_info_text += f"**Gender:** {gender_val}\n" | |
if assessment_date_val: | |
patient_info_text += f"**Assessment Date:** {assessment_date_val}\n" | |
if clinician_val: | |
patient_info_text += f"**Clinician:** {clinician_val}\n" | |
if saved_id: | |
patient_info_text += f"**System ID:** {saved_id}\n" | |
if patient_info_text: | |
full_report = f"## Patient Information\n\n{patient_info_text}\n\n## Analysis Report\n\n{full_text}" | |
else: | |
full_report = f"## Complete Analysis Report\n\n{full_text}" | |
# Get the raw LLM response for debugging | |
raw_output = full_text | |
return ( | |
speech_factors_section, | |
casl_section, | |
treatment_text, | |
explanation_text, | |
full_report, | |
save_message, | |
specific_errors_text, | |
raw_output | |
) | |
except Exception as e: | |
logger.exception("Error during analysis") | |
error_message = f"Error during analysis: {str(e)}" | |
return ( | |
f"Error: {str(e)}", | |
"Error: Analysis failed. Please check input data.", | |
"Error: Treatment analysis not available.", | |
"An error occurred while processing the transcript.", | |
f"Error details: {str(e)}", | |
"", | |
"", | |
f"Analysis failed with error: {error_message}\n\nPlease check your transcript format and try again." | |
) | |
analyze_btn.click( | |
on_analyze_click, | |
inputs=[ | |
transcript, age, gender, | |
patient_name, record_id, clinician_name, assessment_date | |
], | |
outputs=[ | |
speech_factors_md, | |
casl_results_md, | |
treatment_md, | |
explanation_md, | |
full_analysis, | |
export_status, | |
specific_errors_md, | |
raw_llm_output | |
] | |
) | |
# Improved PDF export functionality | |
def export_pdf(report_text, patient_name="Patient", record_id="", age="", gender="", assessment_date="", clinician=""): | |
# Check if ReportLab is available | |
if not REPORTLAB_AVAILABLE: | |
return "Error: ReportLab library is not installed. Please install it with 'pip install reportlab'." | |
try: | |
# Create a proper downloads directory in the app folder | |
downloads_dir = os.path.join(DATA_DIR, "downloads") | |
os.makedirs(downloads_dir, exist_ok=True) | |
# Generate a safe filename | |
if patient_name and record_id: | |
safe_name = f"{patient_name.replace(' ', '_')}_{record_id}" | |
elif patient_name: | |
safe_name = patient_name.replace(' ', '_') | |
else: | |
safe_name = f"speech_analysis_{datetime.now().strftime('%Y%m%d%H%M%S')}" | |
# Create the PDF path in our downloads directory | |
pdf_path = os.path.join(downloads_dir, f"{safe_name}.pdf") | |
# Create the PDF document | |
doc = SimpleDocTemplate(pdf_path, pagesize=letter) | |
styles = getSampleStyleSheet() | |
# Create enhanced custom styles | |
styles.add(ParagraphStyle( | |
name='Heading1', | |
parent=styles['Heading1'], | |
fontSize=16, | |
spaceAfter=12, | |
textColor=colors.navy | |
)) | |
styles.add(ParagraphStyle( | |
name='Heading2', | |
parent=styles['Heading2'], | |
fontSize=14, | |
spaceAfter=10, | |
spaceBefore=10, | |
textColor=colors.darkblue | |
)) | |
styles.add(ParagraphStyle( | |
name='Heading3', | |
parent=styles['Heading2'], | |
fontSize=12, | |
spaceAfter=8, | |
spaceBefore=8, | |
textColor=colors.darkblue | |
)) | |
styles.add(ParagraphStyle( | |
name='BodyText', | |
parent=styles['BodyText'], | |
fontSize=11, | |
spaceAfter=8, | |
leading=14 | |
)) | |
styles.add(ParagraphStyle( | |
name='BulletPoint', | |
parent=styles['BodyText'], | |
fontSize=11, | |
leftIndent=20, | |
firstLineIndent=-15, | |
spaceAfter=4, | |
leading=14 | |
)) | |
# Convert markdown to PDF elements | |
story = [] | |
# Add title and date | |
story.append(Paragraph("Speech Language Assessment Report", styles['Title'])) | |
story.append(Spacer(1, 12)) | |
# Add patient information table | |
if patient_name or record_id or age or gender: | |
# Prepare patient info data | |
data = [] | |
if patient_name: | |
data.append(["Patient Name:", patient_name]) | |
if record_id: | |
data.append(["Record ID:", record_id]) | |
if age: | |
data.append(["Age:", f"{age} years"]) | |
if gender: | |
data.append(["Gender:", gender]) | |
if assessment_date: | |
data.append(["Assessment Date:", assessment_date]) | |
if clinician: | |
data.append(["Clinician:", clinician]) | |
if data: | |
# Create a table with the data | |
patient_table = Table(data, colWidths=[120, 350]) | |
patient_table.setStyle(TableStyle([ | |
('BACKGROUND', (0, 0), (0, -1), colors.lightgrey), | |
('TEXTCOLOR', (0, 0), (0, -1), colors.darkblue), | |
('ALIGN', (0, 0), (0, -1), 'RIGHT'), | |
('ALIGN', (1, 0), (1, -1), 'LEFT'), | |
('FONTNAME', (0, 0), (0, -1), 'Helvetica-Bold'), | |
('BOTTOMPADDING', (0, 0), (-1, -1), 6), | |
('TOPPADDING', (0, 0), (-1, -1), 6), | |
('GRID', (0, 0), (-1, -1), 0.5, colors.lightgrey), | |
])) | |
story.append(patient_table) | |
story.append(Spacer(1, 12)) | |
# Process the markdown content | |
in_bullet_list = False | |
current_list_items = [] | |
for line in report_text.split('\n'): | |
line = line.strip() | |
# Skip empty lines | |
if not line: | |
if in_bullet_list: | |
# End the current list | |
in_bullet_list = False | |
for item in current_list_items: | |
story.append(Paragraph(f"• {item}", styles['BulletPoint'])) | |
current_list_items = [] | |
story.append(Spacer(1, 6)) | |
else: | |
story.append(Spacer(1, 6)) | |
continue | |
# Check for headings | |
if line.startswith('# '): | |
if in_bullet_list: | |
# End the current list before starting a new section | |
in_bullet_list = False | |
for item in current_list_items: | |
story.append(Paragraph(f"• {item}", styles['BulletPoint'])) | |
current_list_items = [] | |
story.append(Paragraph(line[2:], styles['Heading1'])) | |
elif line.startswith('## '): | |
if in_bullet_list: | |
# End the current list before starting a new section | |
in_bullet_list = False | |
for item in current_list_items: | |
story.append(Paragraph(f"• {item}", styles['BulletPoint'])) | |
current_list_items = [] | |
story.append(Paragraph(line[3:], styles['Heading2'])) | |
elif line.startswith('### '): | |
if in_bullet_list: | |
# End the current list before starting a new section | |
in_bullet_list = False | |
for item in current_list_items: | |
story.append(Paragraph(f"• {item}", styles['BulletPoint'])) | |
current_list_items = [] | |
story.append(Paragraph(line[4:], styles['Heading3'])) | |
elif line.startswith('- '): | |
# Bullet points - collect them to process as a list | |
in_bullet_list = True | |
current_list_items.append(line[2:]) | |
elif line.startswith('**') and line.endswith('**'): | |
# Bold text - assuming it's a short line like a heading | |
if in_bullet_list: | |
# End the current list before adding this element | |
in_bullet_list = False | |
for item in current_list_items: | |
story.append(Paragraph(f"• {item}", styles['BulletPoint'])) | |
current_list_items = [] | |
text = line.replace('**', '') | |
story.append(Paragraph(f"<b>{text}</b>", styles['BodyText'])) | |
else: | |
# Regular text | |
if in_bullet_list: | |
# End the current list before adding regular text | |
in_bullet_list = False | |
for item in current_list_items: | |
story.append(Paragraph(f"• {item}", styles['BulletPoint'])) | |
current_list_items = [] | |
# Handle lines with bold text within them | |
formatted_line = line | |
bold_pattern = re.compile(r'\*\*(.*?)\*\*') | |
for match in bold_pattern.finditer(line): | |
bold_text = match.group(1) | |
formatted_line = formatted_line.replace(f"**{bold_text}**", f"<b>{bold_text}</b>") | |
story.append(Paragraph(formatted_line, styles['BodyText'])) | |
# If there are any remaining list items, add them now | |
if in_bullet_list: | |
for item in current_list_items: | |
story.append(Paragraph(f"• {item}", styles['BulletPoint'])) | |
# Add footer with date | |
footer_text = f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" | |
story.append(Spacer(1, 20)) | |
story.append(Paragraph(footer_text, ParagraphStyle( | |
name='Footer', | |
parent=styles['Normal'], | |
fontSize=8, | |
textColor=colors.grey | |
))) | |
# Build the PDF | |
doc.build(story) | |
# Create a copy in the temp directory to make it accessible in web environments | |
temp_dir = tempfile.gettempdir() | |
temp_pdf_path = os.path.join(temp_dir, f"{safe_name}.pdf") | |
shutil.copy2(pdf_path, temp_pdf_path) | |
return f"Report saved as PDF: {pdf_path}<br>Temporary copy: {temp_pdf_path}" | |
except Exception as e: | |
logger.exception("Error creating PDF") | |
return f"Error creating PDF: {str(e)}" | |
# Use the full PDF export function regardless of environment | |
export_btn.click( | |
export_pdf, | |
inputs=[ | |
full_analysis, | |
patient_name, | |
record_id, | |
age, | |
gender, | |
assessment_date, | |
clinician_name | |
], | |
outputs=[export_status] | |
) | |
# Transcription button | |
def on_transcribe_audio(audio_path, age): | |
try: | |
if not audio_path: | |
return "Please upload an audio file to transcribe." | |
transcription = transcribe_audio(audio_path, age) | |
return transcription | |
except Exception as e: | |
logger.exception("Error transcribing audio") | |
return f"Error transcribing audio: {str(e)}" | |
transcribe_btn.click( | |
on_transcribe_audio, | |
inputs=[audio_input, transcription_age], | |
outputs=[transcription_output] | |
) | |
# Copy transcription to analysis | |
def copy_to_analysis(transcription): | |
return transcription, gr.update(selected=0) # Switches to the Analysis tab | |
copy_to_analysis_btn.click( | |
copy_to_analysis, | |
inputs=[transcription_output], | |
outputs=[transcript, main_tabs] | |
) | |
# SLP Assistant question handling | |
def on_ask_question(question): | |
try: | |
answer = answer_slp_question(question) | |
return answer | |
except Exception as e: | |
logger.exception("Error getting answer") | |
return f"Error: {str(e)}" | |
ask_question_btn.click( | |
on_ask_question, | |
inputs=[question_input], | |
outputs=[answer_output] | |
) | |
# Quick question buttons | |
q1_btn.click(lambda: "What is CASL?", outputs=[question_input]) | |
q2_btn.click(lambda: "How do I interpret CASL scores?", outputs=[question_input]) | |
q3_btn.click(lambda: "What activities help with word finding difficulties?", outputs=[question_input]) | |
q4_btn.click(lambda: "When should I reassess a patient?", outputs=[question_input]) | |
return app | |
# =============================== | |
# Main Application | |
# =============================== | |
# Create requirements.txt file for HuggingFace Spaces | |
def create_requirements_file(): | |
requirements = [ | |
"gradio>=4.0.0", | |
"pandas", | |
"matplotlib", | |
"numpy", | |
"Pillow", | |
"PyPDF2", | |
"boto3", | |
"reportlab", | |
"uuid" | |
] | |
with open("requirements.txt", "w") as f: | |
for req in requirements: | |
f.write(f"{req}\n") | |
# Create and launch the interface | |
if __name__ == "__main__": | |
# Create requirements.txt for HuggingFace Spaces | |
create_requirements_file() | |
# Check for AWS credentials | |
if not AWS_ACCESS_KEY or not AWS_SECRET_KEY: | |
print("NOTE: AWS credentials not found. The app will run in demo mode with simulated responses.") | |
print("To enable full functionality, set AWS_ACCESS_KEY and AWS_SECRET_KEY environment variables.") | |
# Launch the Gradio app | |
app = create_interface() | |
app.launch() | |