File size: 4,337 Bytes
d25ee4b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import ibis
import sqlglot
from sqlglot import optimizer
from sqlglot.optimizer import qualify
from sqlglot.errors import OptimizeError, ParseError
from services.utils import extract_filename
class FileSource:
    """Expose a single CSV or Parquet file as a SQL-queryable DuckDB table.

    The file is registered on a DuckDB connection opened through ibis when
    ``connect()`` is called. Optional free-text documentation and examples can
    be attached per table and are included in the rendered schema text.
    """

    # File types that ``connect`` knows how to load.
    _SUPPORTED_TYPES = ("csv", "parquet")

    def __init__(self,file_path,file_type="csv") -> None:
        """Store the file location; no I/O happens until ``connect()``.

        Args:
            file_path: Path of the file to load.
            file_type: Declared file type, used by ``connect`` only when the
                file name carries no extension. Stored lower-cased.
        """
        self.file_path = file_path
        self.file_type = file_type.lower()
        self._tables_docs = {}
        self._table_exemple = {}
        self.engine_dialect = "duckdb"

    def _resolve_table_and_type(self, name: str) -> tuple:
        """Derive ``(table_name, file_type)`` from a file name.

        The table name is the text before the first dot (unchanged from the
        historical behaviour); the type is the text after the *last* dot,
        lower-cased, so ``my.data.CSV`` resolves to ``("my", "csv")``. A name
        without any dot falls back to the ``file_type`` given at construction.
        """
        _, dot, suffix = name.rpartition(".")
        if not dot:
            return name, self.file_type
        return name.split(".")[0], suffix.lower()

    def connect(self) -> str:
        """Open the DuckDB connection and register the file as a table.

        Returns:
            A human-readable confirmation message.

        Raises:
            ValueError: If the resolved file type is neither csv nor parquet.
            Exception: Any error raised by ibis while connecting or reading
                the file propagates unchanged.
        """
        name = extract_filename(self.file_path)
        table, ext = self._resolve_table_and_type(name)
        # Fail early with a clear message instead of an AttributeError on
        # ``self._table`` further down.
        if ext not in self._SUPPORTED_TYPES:
            raise ValueError(
                f"Unsupported file type '{ext}' for {name}; "
                f"expected one of {', '.join(self._SUPPORTED_TYPES)}"
            )

        self._con = ibis.connect("duckdb://")
        if ext == "csv":
            self._table = self._con.read_csv(self.file_path, table_name=table)
        else:
            self._table = self._con.read_parquet(self.file_path, table_name=table)
        self._schema = self._table.schema()
        return f"✅ Connection to {name} OK!"

    def _optimize_query(self,sql,schema):
        """Run the sqlglot optimizer on ``sql`` and return DuckDB SQL text.

        Args:
            sql: The query handed to ``sqlglot.optimizer.optimize`` (``query``
                passes an already parsed expression).
            schema: Mapping ``{table: {column: type_string}}`` as produced by
                ``_build_schema``.
        """
        optimized = optimizer.optimize(sql, schema=schema, dialect=self.engine_dialect)
        return optimized.sql(dialect=self.engine_dialect)

    def _pretify_table(self,table,columns):
        """Render one table as text: optional docs, optional example, columns.

        Args:
            table: Table name.
            columns: Mapping of column name to type string.

        Returns:
            The formatted description, one line per column.
        """
        parts = []
        if table in self._tables_docs:
            parts.append(f"## Documentation \n{self._tables_docs[table]}\n")
        if table in self._table_exemple:
            # Trailing newline keeps the "Table (...)" header on its own line
            # instead of glued to the end of the example text.
            parts.append(f"## Exemple \n{self._table_exemple[table]}\n")
        parts.append(f"Table ({table}) with  {len(columns)} fields : \n")
        parts.extend(
            f"\t{field} of type : {dtype}\n" for field, dtype in columns.items()
        )
        return "".join(parts)

    def add_table_documentation(self,table_name,documentation):
        """Attach (or replace) the documentation text shown for ``table_name``."""
        self._tables_docs[table_name] = documentation

    def add_table_exemple(self,table_name,exemples):
        """Attach (or replace) the example text shown for ``table_name``."""
        self._table_exemple[table_name] = exemples

    def get_tables_array(self):
        """Return a list with one rendered description per table."""
        schema = self._build_schema()
        return [self._pretify_table(table, columns) for table, columns in schema.items()]

    def _pretify_schema(self):
        """Return every rendered table description, each followed by a blank line."""
        schema = self._build_schema()
        return "".join(
            f"{self._pretify_table(table, columns)}\n"
            for table, columns in schema.items()
        )

    def _build_schema(self):
        """Collect ``{table: {column: type_string}}`` for the connection's tables.

        Tables whose schema cannot be read are reported with a printed warning
        and left out of the result rather than aborting the whole scan.
        """
        schema = {}
        for table_name in self._con.list_tables():
            try:
                table_schema = self._con.table(table_name).schema()
                schema[table_name] = {
                    col: str(dtype) for col, dtype in table_schema.items()
                }
            except Exception as e:
                # Deliberately best-effort: one unreadable table must not hide
                # the others.
                print(f"Warning: Could not retrieve schema for table '{table_name}': {e}")
        return schema

    def query(self, sql_query):
        """Parse, optimize when possible, and execute ``sql_query``.

        Args:
            sql_query: SQL text in the DuckDB dialect.

        Returns:
            Whatever ``execute()`` returns for the ibis expression (presumably
            a pandas DataFrame for a SELECT; confirm against the ibis version
            in use).

        Raises:
            Exception: Parse errors from sqlglot and execution errors from
                ibis/DuckDB propagate unchanged.
        """
        schema = self._build_schema()
        print(sql_query)
        expression = sqlglot.parse_one(sql_query, read=self.engine_dialect)

        try:
            final_query = self._optimize_query(expression, schema)
        except Exception:
            # Optimization is only an improvement step: if it fails, run the
            # query exactly as parsed.
            final_query = expression.sql(dialect=self.engine_dialect)

        return self._con.sql(final_query, dialect=self.engine_dialect).execute()
        

# db = Database("mysql://user:password@localhost:3306/Pokemon")
# db.connect()
# schema = db._build_schema()
# db.add_table_documentation("Defense","This is a super table")
# db.add_table_exemple("Defense","caca")
# db.add_table_exemple("Joueur","ezofkzrfp")
# for table in schema.keys():
#     print(db._pretify_table(table,schema[table]))
# file = FileSource("./Wines.csv")
# file.connect()

# schema = file._build_schema()
# # db.add_table_exemple("Defense","caca")
# # db.add_table_exemple("Joueur","ezofkzrfp")
# for table in schema.keys():
#      print(file._pretify_table(table,schema[table]))

# res = file.query("SELECT * FROM Wines;")
# print(len(res))