# llm_topic_modelling/load_s3_logs.py
import boto3
import pandas as pd
from io import StringIO
from datetime import datetime
from tools.config import DOCUMENT_REDACTION_BUCKET, AWS_ACCESS_KEY, AWS_SECRET_KEY, AWS_REGION, OUTPUT_FOLDER

# Combine log files into a single CSV that can then be used for e.g. dashboarding and financial tracking.

# S3 setup. Use explicitly provided keys if available (these need S3 permissions),
# otherwise fall back to the default credential chain (e.g. an AWS SSO session).
if AWS_ACCESS_KEY and AWS_SECRET_KEY and AWS_REGION:
    s3 = boto3.client('s3',
                      aws_access_key_id=AWS_ACCESS_KEY,
                      aws_secret_access_key=AWS_SECRET_KEY,
                      region_name=AWS_REGION)
else:
    s3 = boto3.client('s3')
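
# A sketch of a third option, if you prefer to pin the client to a specific
# AWS CLI/SSO profile rather than the default credential chain. The profile
# name is a placeholder, not something defined in this repo:
# s3 = boto3.Session(profile_name='my-sso-profile').client('s3')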

bucket_name = DOCUMENT_REDACTION_BUCKET
prefix = 'usage/'  # Top-level folder where logs are stored; change as needed (e.g. 'feedback/', 'logs/')
earliest_date = '20250409'  # Earliest log folder date to retrieve (YYYYMMDD)
latest_date = '20250423'  # Latest log folder date to retrieve (YYYYMMDD)

# List all object keys under a prefix. A single list_objects_v2 call returns at
# most 1,000 keys, so paginate to make sure nothing is silently dropped.
def list_files_in_s3(bucket, prefix):
    keys = []
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        keys.extend(content['Key'] for content in page.get('Contents', []))
    return keys

# Check whether a YYYYMMDD date string falls within [start_date, end_date]
def is_within_date_range(date_str, start_date, end_date):
    date_obj = datetime.strptime(date_str, '%Y%m%d')
    return start_date <= date_obj <= end_date
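
# For example, with the bounds configured above:
# is_within_date_range('20250415', start_date, end_date)  # -> True
# is_within_date_range('20250501', start_date, end_date)  # -> False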

# Parse the date range bounds defined above
start_date = datetime.strptime(earliest_date, '%Y%m%d')
end_date = datetime.strptime(latest_date, '%Y%m%d')

# List all object keys under the chosen prefix
all_files = list_files_in_s3(bucket_name, prefix)

# Keep only 'log.csv' files whose date folder falls within the date range
log_files = []
for file in all_files:
    parts = file.split('/')
    if len(parts) >= 3 and parts[-1] == 'log.csv':
        try:
            in_range = is_within_date_range(parts[1], start_date, end_date)
        except ValueError:
            # Second path segment is not a YYYYMMDD date; skip this key
            continue
        if in_range:
            log_files.append(file)
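
# Note: the filtering above assumes keys shaped like
# 'usage/<YYYYMMDD>/<run-id>/log.csv', i.e. the date folder is the second
# path segment and each log file is named 'log.csv'. Adjust the indexing
# if your bucket layout differs.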

# Download, read and concatenate the CSV files into a single pandas DataFrame
df_list = []
for log_file in log_files:
    # Download the file. Read the body once: a second .read() on the same
    # streaming body would return empty bytes, so decode from a saved copy.
    obj = s3.get_object(Bucket=bucket_name, Key=log_file)
    raw_bytes = obj['Body'].read()
    try:
        csv_content = raw_bytes.decode('utf-8')
    except UnicodeDecodeError:
        # Fall back to latin-1, which can decode any byte sequence
        csv_content = raw_bytes.decode('latin-1')
    # Parse the CSV content into a pandas DataFrame, skipping unreadable files
    try:
        df = pd.read_csv(StringIO(csv_content))
    except Exception as e:
        print("Could not load log file:", log_file, "due to:", e)
        continue
    df_list.append(df)

# Concatenate all DataFrames
if df_list:
    concatenated_df = pd.concat(df_list, ignore_index=True)
    # Save the concatenated DataFrame to a CSV file
    concatenated_df.to_csv(OUTPUT_FOLDER + 'consolidated_s3_logs.csv', index=False)
    print("Consolidated CSV saved as 'consolidated_s3_logs.csv'")
else:
    print("No log files found in the given date range.")