|
import ibis |
|
import sqlglot |
|
from sqlglot import optimizer |
|
from sqlglot.optimizer import qualify |
|
from sqlglot.errors import OptimizeError, ParseError |
|
|
|
class Database: |
|
|
|
def __init__(self,connection_url,engine_dialect = "mysql") -> None: |
|
|
|
self._connect_url = connection_url |
|
self.engine_dialect = engine_dialect |
|
self._tables_docs = {} |
|
self._table_exemple = {} |
|
|
|
def connect(self): |
|
try: |
|
self._con = ibis.connect(self._connect_url) |
|
return f"β
Connection to {self._connect_url} OK!" |
|
except Exception as e: |
|
|
|
raise e |
|
|
|
def _optimize_query(self,sql,schema): |
|
|
|
optimized_expression = optimizer.optimize(sql, schema=schema, dialect=self.engine_dialect) |
|
optimized_sql = optimized_expression.sql(dialect=self.engine_dialect) |
|
return optimized_sql |
|
|
|
def _pretify_table(self,table,columns): |
|
out = "" |
|
if table in self._tables_docs.keys(): |
|
out += f"## Documentation \n{self._tables_docs[table]}\n" |
|
|
|
if table in self._table_exemple.keys(): |
|
out += f"## Exemple \n{self._table_exemple[table]}" |
|
out += f"Table ({table}) with {len(columns)} fields : \n" |
|
for field in columns.keys(): |
|
out += f"\t{field} of type : {columns[field]}\n" |
|
return out |
|
|
|
def add_table_documentation(self,table_name,documentation): |
|
self._tables_docs[table_name] = documentation |
|
def add_table_exemple(self,table_name,exemples): |
|
self._table_exemple[table_name] = exemples |
|
|
|
def get_tables_array(self): |
|
schema = self._build_schema() |
|
array = [] |
|
for table in schema.keys(): |
|
array.append(self._pretify_table(table,schema[table])) |
|
return array |
|
|
|
def _pretify_schema(self): |
|
out = "" |
|
schema = self._build_schema() |
|
for table in schema.keys(): |
|
out += self._pretify_table(table,schema[table]) |
|
out += "\n" |
|
return out |
|
def _build_schema(self): |
|
|
|
tables = self._con.list_tables() |
|
schema = {} |
|
for table_name in tables: |
|
|
|
try: |
|
table_expr = self._con.table(table_name) |
|
table_schema = table_expr.schema() |
|
columns = {col: str(dtype) for col, dtype in table_schema.items()} |
|
schema[table_name] = columns |
|
|
|
except Exception as e: |
|
|
|
print(f"Warning: Could not retrieve schema for table '{table_name}': {e}") |
|
return schema |
|
|
|
def query(self, sql_query): |
|
schema = self._build_schema() |
|
print(sql_query) |
|
try: |
|
expression = sqlglot.parse_one(sql_query, read=self.engine_dialect) |
|
except Exception as e: |
|
raise e |
|
|
|
try: |
|
optimized_query = self._optimize_query(expression, schema) |
|
final_query = optimized_query |
|
except Exception as e: |
|
final_query = expression.sql(dialect=self.engine_dialect) |
|
|
|
try: |
|
expr = self._con.sql(final_query, dialect=self.engine_dialect) |
|
result_df = expr.execute() |
|
return result_df |
|
except Exception as e: |
|
raise e |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|