import logging
from typing import Any, Dict, List

import ibis
import sqlglot
from sqlglot import optimizer
from sqlglot.optimizer import qualify
from sqlglot.errors import OptimizeError, ParseError

from services.utils import extract_filename

logger = logging.getLogger(__name__)


class FileSource:
    """Expose a CSV or Parquet file as a SQL-queryable table.

    The file is loaded through an ibis connection opened with
    ``ibis.connect("duckdb://")``.  Per-table documentation and examples can
    be attached and are included in the human-readable schema descriptions
    produced by :meth:`get_tables_array` and :meth:`_pretify_schema`.
    """

    # File formats that connect() knows how to load.
    _SUPPORTED_TYPES = ("csv", "parquet")

    def __init__(self, file_path, file_type="csv") -> None:
        """Store the file location; no I/O happens until :meth:`connect`.

        Args:
            file_path: Path of the file to load.
            file_type: Format used when the file name carries no extension
                (case-insensitive).
        """
        self.file_path = file_path
        self.file_type = file_type.lower()
        self._tables_docs = {}
        self._table_exemple = {}
        self.engine_dialect = "duckdb"
        # Populated by connect().
        self._con = None
        self._table = None
        self._schema = None

    def connect(self) -> str:
        """Open the connection and register the file as a table.

        The format is taken from the text after the last dot of the file
        name (case-insensitive), falling back to ``file_type`` when the name
        has no dot.  The table is named after the text before the first dot.

        Returns:
            A confirmation message naming the loaded file.

        Raises:
            ValueError: If the resolved format is not CSV or Parquet.
        """
        # Assumes extract_filename returns the file name including its
        # extension (e.g. "Wines.csv") -- confirm against services.utils.
        name = extract_filename(self.file_path)

        # Use the LAST dot so names such as "sales.2024.csv" resolve to "csv".
        _, dot, suffix = name.rpartition(".")
        ext = suffix.lower() if dot else self.file_type
        if ext not in self._SUPPORTED_TYPES:
            raise ValueError(
                f"Unsupported file type '{ext}' for {name}; "
                f"expected one of {', '.join(self._SUPPORTED_TYPES)}."
            )
        table = name.split(".")[0]

        con = ibis.connect("duckdb://")
        if ext == "csv":
            loaded = con.read_csv(self.file_path, table_name=table)
        else:
            loaded = con.read_parquet(self.file_path, table_name=table)

        # Publish state only once loading succeeded, so a failed connect()
        # never leaves a half-initialised source behind.
        self._con = con
        self._table = loaded
        self._schema = loaded.schema()
        return f"✅ Connection to {name} OK!"

    def _require_connection(self):
        """Return the live connection, or fail clearly if connect() was not called."""
        con = getattr(self, "_con", None)
        if con is None:
            raise RuntimeError(
                "FileSource is not connected; call connect() first."
            )
        return con

    def _optimize_query(self, sql, schema) -> str:
        """Run the sqlglot optimizer on ``sql`` and render it in the engine dialect.

        Args:
            sql: Parsed sqlglot expression (or SQL text) to optimize.
            schema: Mapping of table name to ``{column: type}``.

        Returns:
            The optimized SQL string.
        """
        optimized = optimizer.optimize(
            sql, schema=schema, dialect=self.engine_dialect
        )
        return optimized.sql(dialect=self.engine_dialect)

    def _pretify_table(self, table, columns) -> str:
        """Describe one table as text: documentation, example, then its fields.

        Args:
            table: Table name.
            columns: Mapping of column name to type description.

        Returns:
            A multi-line description of the table.
        """
        parts = []
        if table in self._tables_docs:
            parts.append(f"## Documentation \n{self._tables_docs[table]}\n")
        if table in self._table_exemple:
            # Trailing newline keeps the example from running into the
            # "Table (...)" header that follows.
            parts.append(f"## Exemple \n{self._table_exemple[table]}\n")
        parts.append(f"Table ({table}) with {len(columns)} fields : \n")
        parts.extend(
            f"\t{field} of type : {dtype}\n" for field, dtype in columns.items()
        )
        return "".join(parts)

    def add_table_documentation(self, table_name, documentation) -> None:
        """Attach free-form documentation to ``table_name``."""
        self._tables_docs[table_name] = documentation

    def add_table_exemple(self, table_name, exemples) -> None:
        """Attach example content to ``table_name``."""
        self._table_exemple[table_name] = exemples

    def get_tables_array(self) -> List[str]:
        """Return one text description per table known to the connection."""
        schema = self._build_schema()
        return [
            self._pretify_table(table, columns)
            for table, columns in schema.items()
        ]

    def _pretify_schema(self) -> str:
        """Return the descriptions of all tables, each followed by a blank line."""
        schema = self._build_schema()
        return "".join(
            f"{self._pretify_table(table, columns)}\n"
            for table, columns in schema.items()
        )

    def _build_schema(self) -> Dict[str, Dict[str, str]]:
        """Collect ``{table: {column: type}}`` for every table of the connection.

        Tables whose schema cannot be read are logged and skipped, so one
        broken table does not hide the others.

        Raises:
            RuntimeError: If :meth:`connect` has not been called.
        """
        con = self._require_connection()
        schema = {}
        for table_name in con.list_tables():
            try:
                table_schema = con.table(table_name).schema()
                # NOTE(review): these are ibis dtype names (str(dtype)), and
                # the same mapping is handed to the sqlglot optimizer --
                # confirm sqlglot accepts them, otherwise query() always
                # takes its unoptimized fallback.
                schema[table_name] = {
                    col: str(dtype) for col, dtype in table_schema.items()
                }
            except Exception as exc:  # best effort: keep the other tables
                logger.warning(
                    "Could not retrieve schema for table '%s': %s",
                    table_name,
                    exc,
                )
        return schema

    def query(self, sql_query) -> Any:
        """Parse, optimize when possible, and execute ``sql_query``.

        Optimization is best effort: if the optimizer fails, the query is
        executed as parsed.

        Args:
            sql_query: SQL text written in the engine dialect.

        Returns:
            The result of executing the ibis expression (a pandas DataFrame
            according to the ibis documentation -- confirm for the installed
            version).

        Raises:
            RuntimeError: If :meth:`connect` has not been called.
            Exception: Parse and execution errors propagate unchanged.
        """
        schema = self._build_schema()
        logger.debug("Executing query: %s", sql_query)

        expression = sqlglot.parse_one(sql_query, read=self.engine_dialect)
        try:
            final_query = self._optimize_query(expression, schema)
        except Exception:
            # Deliberate fallback: an optimizer failure must not block a
            # query that the engine itself may still be able to run.
            logger.warning(
                "Query optimization failed; running the query unoptimized.",
                exc_info=True,
            )
            final_query = expression.sql(dialect=self.engine_dialect)

        expr = self._con.sql(final_query, dialect=self.engine_dialect)
        return expr.execute()


# Example usage (kept for reference):
#
# db = Database("mysql://user:password@localhost:3306/Pokemon")
# db.connect()
# schema = db._build_schema()
# db.add_table_documentation("Defense","This is a super table")
# db.add_table_exemple("Defense","caca")
# db.add_table_exemple("Joueur","ezofkzrfp")
# for table in schema.keys():
#     print(db._pretify_table(table,schema[table]))
#
# file = FileSource("./Wines.csv")
# file.connect()
# schema = file._build_schema()
# # db.add_table_exemple("Defense","caca")
# # db.add_table_exemple("Joueur","ezofkzrfp")
# for table in schema.keys():
#     print(file._pretify_table(table,schema[table]))
# res = file.query("SELECT * FROM Wines;")
# print(len(res))