|
import logging

import ibis
import sqlglot
from sqlglot import optimizer
from sqlglot.errors import OptimizeError, ParseError
from sqlglot.optimizer import qualify

from services.utils import extract_filename
|
class FileSource:
    """Expose a single CSV or Parquet file as a SQL-queryable table in DuckDB.

    Typical use: construct with a file path, call ``connect()`` to open an
    in-memory DuckDB connection (via ibis) and register the file as a table,
    then describe the schema (``get_tables_array``) or run SQL (``query``).
    """

    # Logger used instead of print() so library code does not write to stdout.
    _log = logging.getLogger(__name__)

    # File formats connect() can load, as lowercase extensions.
    _SUPPORTED_TYPES = ("csv", "parquet")

    def __init__(self, file_path, file_type="csv") -> None:
        """Store configuration; no I/O happens until ``connect()``.

        Args:
            file_path: Path of the data file to load.
            file_type: Format to assume when the file name has no recognized
                extension ("csv" or "parquet", case-insensitive).
        """
        self.file_path = file_path
        self.file_type = file_type.lower()
        # Optional per-table free text and examples, keyed by table name and
        # rendered by _pretify_table.
        self._tables_docs = {}
        self._table_exemple = {}
        # Dialect used both to parse/optimize with sqlglot and to execute.
        self.engine_dialect = "duckdb"

    def connect(self):
        """Open an in-memory DuckDB connection and register the file as a table.

        The table name is the part of the name returned by
        ``extract_filename`` that precedes the first dot. The format comes
        from the extension (text after the last dot, case-insensitive); when
        the extension is missing or not supported, ``self.file_type`` is used.

        Returns:
            A human-readable success message.

        Raises:
            ValueError: if neither the extension nor ``file_type`` is a
                supported format.
        """
        self._con = ibis.connect("duckdb://")
        name = extract_filename(self.file_path)

        # Only the text after the LAST dot is the extension, so names such as
        # "sales.2024.csv" resolve to "csv"; a name without a dot has none.
        _, dot, suffix = name.rpartition(".")
        ext = suffix.lower() if dot else ""
        table = name.split(".")[0]

        file_type = ext if ext in self._SUPPORTED_TYPES else self.file_type
        if file_type == "csv":
            self._table = self._con.read_csv(self.file_path, table_name=table)
        elif file_type == "parquet":
            self._table = self._con.read_parquet(self.file_path, table_name=table)
        else:
            raise ValueError(
                f"Unsupported file type {file_type!r} for {name!r}; "
                f"expected one of {', '.join(self._SUPPORTED_TYPES)}"
            )

        self._schema = self._table.schema()
        return f"✅ Connection to {name} OK!"

    def _optimize_query(self, sql, schema):
        """Run sqlglot's optimizer on ``sql`` with ``schema`` and render it back
        to SQL text in ``self.engine_dialect``."""
        optimized_expression = optimizer.optimize(
            sql, schema=schema, dialect=self.engine_dialect
        )
        return optimized_expression.sql(dialect=self.engine_dialect)

    def _pretify_table(self, table, columns):
        """Render one table as text.

        The output is the optional documentation, then the optional examples,
        then a header line and one line per column.

        Args:
            table: Table name.
            columns: Mapping of column name to type string.
        """
        parts = []
        if table in self._tables_docs:
            parts.append(f"## Documentation \n{self._tables_docs[table]}\n")
        if table in self._table_exemple:
            # End the example block with a newline (as the documentation block
            # does) so the "Table (...)" header starts on its own line.
            parts.append(f"## Exemple \n{self._table_exemple[table]}\n")
        parts.append(f"Table ({table}) with {len(columns)} fields : \n")
        parts.extend(
            f"\t{field} of type : {dtype}\n" for field, dtype in columns.items()
        )
        return "".join(parts)

    def add_table_documentation(self, table_name, documentation):
        """Attach free-text documentation shown above ``table_name``'s schema."""
        self._tables_docs[table_name] = documentation

    def add_table_exemple(self, table_name, exemples):
        """Attach example content shown above ``table_name``'s schema."""
        self._table_exemple[table_name] = exemples

    def get_tables_array(self):
        """Return one rendered description per table on the connection."""
        schema = self._build_schema()
        return [
            self._pretify_table(table, columns) for table, columns in schema.items()
        ]

    def _pretify_schema(self):
        """Return all table descriptions concatenated, each followed by a blank line."""
        schema = self._build_schema()
        return "".join(
            self._pretify_table(table, columns) + "\n"
            for table, columns in schema.items()
        )

    def _build_schema(self):
        """Return ``{table_name: {column_name: type_string}}`` for every table.

        Tables whose schema cannot be read are skipped with a logged warning,
        so a single bad table does not hide the others.
        """
        schema = {}
        for table_name in self._con.list_tables():
            try:
                table_schema = self._con.table(table_name).schema()
                schema[table_name] = {
                    col: str(dtype) for col, dtype in table_schema.items()
                }
            except Exception as e:
                # Deliberate best effort: report and continue with other tables.
                self._log.warning(
                    "Could not retrieve schema for table %r: %s", table_name, e
                )
        return schema

    def query(self, sql_query):
        """Parse, optionally optimize, and execute ``sql_query``.

        The query is parsed with sqlglot in ``self.engine_dialect``. It is then
        optimized against the current schema; if optimization fails for any
        reason, the parsed query is executed unoptimized.

        Returns:
            The result of ibis ``execute()`` on the query (presumably a pandas
            DataFrame -- confirm against the ibis version in use).

        Raises:
            Errors from ``sqlglot.parse_one`` (e.g. a sqlglot ``ParseError``)
            and from execution are propagated unchanged.
        """
        self._log.debug("Executing query: %s", sql_query)
        expression = sqlglot.parse_one(sql_query, read=self.engine_dialect)
        schema = self._build_schema()

        try:
            final_query = self._optimize_query(expression, schema)
        except Exception as e:
            # Optimization is an enhancement only; fall back to the query as
            # parsed, but leave a trace instead of failing silently.
            self._log.debug("Query optimization failed, running unoptimized: %s", e)
            final_query = expression.sql(dialect=self.engine_dialect)

        expr = self._con.sql(final_query, dialect=self.engine_dialect)
        return expr.execute()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|