mcp-data-analyst / filesource.py
Jacqkues's picture
Upload 11 files
d25ee4b verified
raw
history blame
4.34 kB
import ibis
import sqlglot
from sqlglot import optimizer
from sqlglot.optimizer import qualify
from sqlglot.errors import OptimizeError, ParseError
from services.utils import extract_filename
class FileSource:
    """Expose a single CSV or Parquet file as a SQL-queryable DuckDB table.

    Call :meth:`connect` before any schema/query method: it creates the ibis
    DuckDB connection (``self._con``) and registers the file as a table.
    Free-text documentation and examples can be attached per table name; they
    are included in the per-table descriptions built by :meth:`get_tables_array`.
    """

    # File formats connect() knows how to load (lower-case extensions).
    _SUPPORTED_TYPES = ("csv", "parquet")

    def __init__(self, file_path, file_type="csv") -> None:
        """Store configuration only; no I/O happens until :meth:`connect`.

        Args:
            file_path: Path of the data file to load.
            file_type: Format assumed when the file name has no extension
                (``"csv"`` or ``"parquet"``, case-insensitive).
        """
        self.file_path = file_path
        self.file_type = file_type.lower()
        self._tables_docs = {}  # table name -> documentation text
        self._table_exemple = {}  # table name -> example text
        self.engine_dialect = "duckdb"  # sqlglot dialect for parsing/emitting SQL

    @classmethod
    def _split_table_name(cls, name, default_type):
        """Split a file name into ``(table_name, file_type)``.

        The table name is the text before the first dot (``"Wines.csv"`` ->
        ``"Wines"``). The file type is the first dot-separated suffix that is
        a supported format, compared case-insensitively, so
        ``"report.csv.gz"`` -> ``"csv"`` and ``"Wines.CSV"`` -> ``"csv"``.
        If the name has no suffix, ``default_type`` is returned. If no suffix
        is supported, the first one (lower-cased) is returned so the caller
        can report it.
        """
        table, *suffixes = name.split(".")
        if not suffixes:
            return table, default_type
        lowered = [suffix.lower() for suffix in suffixes]
        for suffix in lowered:
            if suffix in cls._SUPPORTED_TYPES:
                return table, suffix
        return table, lowered[0]

    def connect(self):
        """Open a DuckDB connection and register the file as a table.

        Each call replaces any previous connection stored on ``self._con``.

        Returns:
            A human-readable success message naming the file.

        Raises:
            ValueError: If the resolved file type is neither csv nor parquet.
            Any error raised by ibis/DuckDB while connecting or reading the file.
        """
        # No database path in the URL, so presumably an in-memory database.
        self._con = ibis.connect("duckdb://")
        # NOTE(review): assumes extract_filename returns the base file name
        # (e.g. "Wines.csv"); confirm against services.utils.
        name = extract_filename(self.file_path)
        table, ext = self._split_table_name(name, self.file_type)
        if ext == "csv":
            self._table = self._con.read_csv(self.file_path, table_name=table)
        elif ext == "parquet":
            self._table = self._con.read_parquet(self.file_path, table_name=table)
        else:
            supported = ", ".join(self._SUPPORTED_TYPES)
            raise ValueError(
                f"Unsupported file type {ext!r} for {name!r}; expected one of: {supported}"
            )
        self._schema = self._table.schema()
        return f"✅ Connection to {name} OK!"

    def _optimize_query(self, sql, schema):
        """Run sqlglot's optimizer on ``sql`` and return it as a SQL string.

        Args:
            sql: SQL text or a parsed sqlglot expression.
            schema: ``{table: {column: type_str}}`` as built by ``_build_schema``.
        """
        optimized_expression = optimizer.optimize(sql, schema=schema, dialect=self.engine_dialect)
        return optimized_expression.sql(dialect=self.engine_dialect)

    def _pretify_table(self, table, columns):
        """Render one table's documentation, example and columns as text.

        Args:
            table: Table name; also the key into the docs/example registries.
            columns: Mapping of column name -> type string.
        """
        parts = []
        if table in self._tables_docs:
            parts.append(f"## Documentation \n{self._tables_docs[table]}\n")
        if table in self._table_exemple:
            # Trailing newline keeps the example from running into the header below.
            parts.append(f"## Exemple \n{self._table_exemple[table]}\n")
        parts.append(f"Table ({table}) with {len(columns)} fields : \n")
        parts.extend(f"\t{field} of type : {dtype}\n" for field, dtype in columns.items())
        return "".join(parts)

    def add_table_documentation(self, table_name, documentation):
        """Attach (or replace) free-text documentation for ``table_name``."""
        self._tables_docs[table_name] = documentation

    def add_table_exemple(self, table_name, exemples):
        """Attach (or replace) example text for ``table_name``."""
        self._table_exemple[table_name] = exemples

    def get_tables_array(self):
        """Return one rendered description string per table on the connection."""
        schema = self._build_schema()
        return [self._pretify_table(table, columns) for table, columns in schema.items()]

    def _pretify_schema(self):
        """Return all table descriptions concatenated, each followed by a newline."""
        schema = self._build_schema()
        return "".join(
            f"{self._pretify_table(table, columns)}\n" for table, columns in schema.items()
        )

    def _build_schema(self):
        """Return ``{table_name: {column_name: dtype_str}}`` for every table.

        Requires :meth:`connect` to have been called. Tables whose schema
        cannot be read are skipped with a printed warning; one bad table
        does not abort the whole build.
        """
        schema = {}
        for table_name in self._con.list_tables():
            try:
                table_schema = self._con.table(table_name).schema()
                schema[table_name] = {col: str(dtype) for col, dtype in table_schema.items()}
            except Exception as e:
                print(f"Warning: Could not retrieve schema for table '{table_name}': {e}")
        return schema

    def query(self, sql_query):
        """Parse, optionally optimize, and execute a SQL query.

        The query is parsed with sqlglot in ``engine_dialect``. Optimization
        against the current schema is attempted first. If it fails for any
        reason, the unoptimized SQL is executed instead.

        Returns:
            The result of ``execute()`` on the ibis SQL expression (presumably
            a pandas DataFrame).

        Raises:
            sqlglot errors (e.g. ``ParseError``) if ``sql_query`` cannot be parsed.
            Any error raised by ibis/DuckDB while executing the query.
        """
        schema = self._build_schema()
        # NOTE(review): debug echo to stdout; consider switching to logging.
        print(sql_query)
        expression = sqlglot.parse_one(sql_query, read=self.engine_dialect)
        try:
            final_query = self._optimize_query(expression, schema)
        except Exception:
            # Best effort: the optimizer may reject schemas or constructs it
            # does not understand; the original SQL is still executable.
            final_query = expression.sql(dialect=self.engine_dialect)
        return self._con.sql(final_query, dialect=self.engine_dialect).execute()
# db = Database("mysql://user:password@localhost:3306/Pokemon")
# db.connect()
# schema = db._build_schema()
# db.add_table_documentation("Defense","This is a super table")
# db.add_table_exemple("Defense","caca")
# db.add_table_exemple("Joueur","ezofkzrfp")
# for table in schema.keys():
# print(db._pretify_table(table,schema[table]))
# file = FileSource("./Wines.csv")
# file.connect()
# schema = file._build_schema()
# # db.add_table_exemple("Defense","caca")
# # db.add_table_exemple("Joueur","ezofkzrfp")
# for table in schema.keys():
# print(file._pretify_table(table,schema[table]))
# res = file.query("SELECT * FROM Wines;")
# print(len(res))