File size: 16,153 Bytes
c66bf1b
 
 
0da8126
c66bf1b
 
 
 
0da8126
c66bf1b
 
 
 
0da8126
c66bf1b
0da8126
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c66bf1b
0da8126
 
 
c66bf1b
0da8126
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c66bf1b
 
f89e3c9
c66bf1b
 
0da8126
 
c66bf1b
 
f89e3c9
 
 
 
 
0da8126
 
f89e3c9
0da8126
f89e3c9
 
0da8126
f89e3c9
0da8126
f89e3c9
 
 
0da8126
f89e3c9
0da8126
 
f89e3c9
 
 
0da8126
f89e3c9
 
c66bf1b
 
 
 
f89e3c9
 
 
 
 
 
0da8126
c66bf1b
f89e3c9
 
 
 
0da8126
c66bf1b
f89e3c9
0da8126
c66bf1b
 
f89e3c9
0da8126
 
 
 
c66bf1b
 
 
0da8126
c66bf1b
 
f89e3c9
0da8126
 
c66bf1b
 
0da8126
c66bf1b
 
 
 
 
f89e3c9
0da8126
 
 
 
 
 
 
 
 
c66bf1b
0da8126
c66bf1b
 
0da8126
c66bf1b
f89e3c9
c66bf1b
 
 
 
 
 
 
 
0da8126
c66bf1b
 
 
 
0da8126
 
 
 
 
c66bf1b
0da8126
 
 
c66bf1b
0da8126
 
c66bf1b
 
0da8126
c66bf1b
 
 
0da8126
c66bf1b
 
0da8126
f89e3c9
c66bf1b
 
0da8126
f89e3c9
c66bf1b
0da8126
c66bf1b
 
 
 
0da8126
 
c66bf1b
f89e3c9
c66bf1b
f89e3c9
0da8126
 
c66bf1b
 
 
 
0da8126
 
f89e3c9
c66bf1b
f89e3c9
c66bf1b
 
 
0da8126
f89e3c9
c66bf1b
0da8126
 
 
c66bf1b
0da8126
 
 
c66bf1b
0da8126
c66bf1b
 
0da8126
 
 
c66bf1b
 
0da8126
c66bf1b
0da8126
 
 
c66bf1b
0da8126
 
c66bf1b
0da8126
 
 
c66bf1b
0da8126
c66bf1b
 
0da8126
c66bf1b
0da8126
 
c66bf1b
 
0da8126
c66bf1b
 
0da8126
 
 
 
c66bf1b
f89e3c9
c66bf1b
0da8126
c66bf1b
0da8126
f89e3c9
c66bf1b
 
f89e3c9
c66bf1b
 
 
 
0da8126
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
import os
import re
import tempfile
import git # Used by Repository
from huggingface_hub import (
    create_repo,
    upload_folder,
    list_repo_files,
    Repository, # Used in update_space_file
    whoami,
)
import logging
from pathlib import Path
from PIL import Image # For type hinting from gr.Image(type="pil")

# Configure logging BEFORE anything below emits a record. The module-level
# ``logging.warning(...)`` helper implicitly calls ``basicConfig()`` with default
# settings when the root logger has no handlers; if that ran first, the explicit
# configuration here would become a silent no-op (INFO records dropped, default
# format used).
# basicConfig() itself does nothing when the root logger already has handlers
# (for example when app.py configured logging earlier), so an application-level
# setup still takes precedence.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

# keylock-decode is an optional dependency: when it is missing the image
# decoding feature is disabled, but the rest of the module keeps working.
try:
    from keylock_decode import decode_from_image_pil
    KEYLOCK_DECODE_AVAILABLE = True
except ImportError:
    KEYLOCK_DECODE_AVAILABLE = False
    decode_from_image_pil = None  # Placeholder so the name always exists
    logger.warning("keylock-decode library not found. Image decoding feature will be disabled.")

# If keylock-decode uses its own logger and you want more/less verbose output
# from it, adjust that logger's level separately (check the actual logger name
# used by the library first).


# --- New Helper Function to Get API Token ---
def _get_api_token(ui_token_from_textbox=None):
    """
    Resolve the Hugging Face API token to use for an operation.

    The HF_TOKEN environment variable (potentially set by KeyLock-Decode) takes
    priority; the value typed into the UI textbox is the fallback. Surrounding
    whitespace is stripped from both sources so that a whitespace-only value
    counts as "not provided" and a token pasted with a trailing newline still
    works.

    Args:
        ui_token_from_textbox: Token entered in the UI, or None/empty.

    Returns:
        A ``(token, error_message)`` tuple. Exactly one element is None:
        ``(token, None)`` on success, ``(None, error_message)`` when no token
        is available from either source.
    """
    env_token = (os.getenv('HF_TOKEN') or "").strip()
    if env_token:
        logger.info("Using HF_TOKEN from environment.")
        return env_token, None

    ui_token = ui_token_from_textbox
    # Only strings can be stripped; any other truthy value is passed through
    # unchanged, as before.
    if isinstance(ui_token, str):
        ui_token = ui_token.strip()
    if ui_token:
        logger.info("Using API token from UI textbox.")
        return ui_token, None

    logger.warning("HF API token not found in environment or UI textbox.")
    return None, "Error: Hugging Face API token not provided. Please enter it in the textbox or load it from an image."


# --- New Function for KeyLock-Decode Integration ---
def load_token_from_image_and_set_env(image_pil_object: Image.Image, password: str):
    """
    Decode a KeyLock payload from a PIL image and export it to the environment.

    Calls ``decode_from_image_pil`` with ``set_environment_variables=True`` and
    builds a human-readable report: the library's own status messages, a summary
    of the decoded keys (values of keys containing TOKEN/KEY/SECRET/PASS are
    masked), and a note on whether HF_TOKEN is now available.

    Args:
        image_pil_object: The PIL image to decode, or None.
        password: Password used to decode the image payload.

    Returns:
        A newline-joined status string. Validation failures and decoding errors
        are reported in the returned text rather than raised.
    """
    if not KEYLOCK_DECODE_AVAILABLE:
        return "Error: KeyLock-Decode library is not installed. This feature is disabled."

    if image_pil_object is None:
        return "Error: No image provided for decoding."
    if not password:
        return "Error: Password cannot be empty for image decoding."

    sensitive_markers = ('TOKEN', 'KEY', 'SECRET', 'PASS')
    status_messages_display = []

    # Remember the pre-existing value so that a token that was already in the
    # environment is not reported as having been loaded from this image.
    hf_token_before = os.getenv('HF_TOKEN')

    try:
        logger.info("Attempting to decode from image using KeyLock-Decode...")
        # decode_from_image_pil expects a PIL Image object
        decoded_data, status_msgs_from_lib = decode_from_image_pil(
            image_pil_object,
            password,
            set_environment_variables=True # CRITICAL: This sets os.environ
        )

        # Tolerate a missing status list and non-string entries so that neither
        # extend() nor the final join can fail.
        status_msgs_from_lib = [str(msg) for msg in (status_msgs_from_lib or [])]
        status_messages_display.extend(status_msgs_from_lib)

        if decoded_data:
            status_messages_display.append("\n**Decoded Data Summary (sensitive values masked):**")
            for key, value in decoded_data.items():
                is_sensitive = any(marker in str(key).upper() for marker in sensitive_markers)
                display_value = '********' if is_sensitive else value
                status_messages_display.append(f"- {key}: {display_value}")

            hf_token_after = os.getenv('HF_TOKEN')
            token_in_payload = 'HF_TOKEN' in decoded_data
            # The image supplied the token if the payload names it or the
            # environment value changed during decoding.
            token_came_from_image = token_in_payload or hf_token_after != hf_token_before

            if hf_token_after and token_came_from_image:
                status_messages_display.append("\n**SUCCESS: HF_TOKEN was found and has been set in the current process environment.** It will be prioritized for operations.")
            elif token_in_payload: # Should be rare if set_env_vars=True
                status_messages_display.append("\nWarning: HF_TOKEN was in decoded data but os.getenv('HF_TOKEN') is not picking it up. This is unexpected if 'set_environment_variables=True' worked.")
            elif hf_token_after:
                status_messages_display.append("\nNote: HF_TOKEN was not specifically found in the decoded data from the image. An HF_TOKEN that was already present in the process environment remains in use.")
            else:
                status_messages_display.append("\nNote: HF_TOKEN was not specifically found in the decoded data from the image.")
        else:
            # This case is often covered by status_msgs_from_lib, but good to have a fallback.
            if not any("No payload data found" in msg for msg in status_msgs_from_lib):
                status_messages_display.append("No data was decoded, or the decoded data was empty.")

    except ValueError as e: # Specific errors from keylock-decode (e.g., bad password, corrupted data)
        logger.error(f"KeyLock-Decode ValueError: {e}")
        status_messages_display.append(f"**Decoding Error:** {e}")
    except Exception as e:
        logger.exception("An unexpected error occurred during image decoding with KeyLock-Decode:")
        status_messages_display.append(f"**An unexpected error occurred:** {str(e)}")

    return "\n".join(status_messages_display)


# --- Original `parse_markdown` function (unchanged) ---
def parse_markdown(markdown_input):
    """
    Parse markdown input to extract space details and file structure.

    Recognised markers (matched against the whitespace-stripped line):

    * ``# Space: owner/repo`` or ``# Space: repo`` -- sets the owner/repo name.
    * ``## File Structure`` -- ends the current file; the section itself is ignored.
    * ``### File: path`` -- starts a new file; the following lines are its content.

    Lines inside a fenced code block (delimited by lines starting with three
    backticks) are always treated as file content, even when they look like one
    of the markers above (for example a ``# Space: ...`` comment in a script).
    The fence lines themselves are kept as part of the file content.

    Args:
        markdown_input: The markdown text. None or an empty string yields an
            empty result.

    Returns:
        A dict with keys ``"repo_name_md"``, ``"owner_md"`` and ``"files"``,
        where ``"files"`` is a list of ``{"path": str, "content": str}`` dicts.
        Files with an empty path or no content lines are omitted.
    """
    space_info = {"repo_name_md": "", "owner_md": "", "files": []}
    if not markdown_input:
        return space_info

    space_marker = "# Space:"
    structure_marker = "## File Structure"
    file_marker = "### File:"
    section_markers = (file_marker, structure_marker, space_marker)

    current_file = None
    file_content = []
    in_file_content = False
    in_code_block = False

    def store_file(path, content_lines):
        # Only files with both a path and at least one content line are kept.
        if path and content_lines:
            space_info["files"].append({"path": path, "content": "\n".join(content_lines)})

    for raw_line in markdown_input.strip().split("\n"):
        line = raw_line.strip()

        if in_file_content:
            if line.startswith("```"):
                # A fence line toggles code-block state and is kept verbatim.
                in_code_block = not in_code_block
                file_content.append(raw_line)
                continue
            if in_code_block or not line.startswith(section_markers):
                # Ordinary content. Inside a fence, marker look-alikes are
                # content too and must not reach the marker handling below.
                file_content.append(raw_line)
                continue
            # Otherwise: a real marker outside any fence ends the current file
            # and is handled below.

        if line.startswith(space_marker):
            store_file(current_file, file_content)
            full_space_name_md = line[len(space_marker):].strip()
            if "/" in full_space_name_md:
                space_info["owner_md"], space_info["repo_name_md"] = full_space_name_md.split("/", 1)
            else:
                space_info["repo_name_md"] = full_space_name_md
            current_file = None
            file_content = []
            in_file_content = False
            in_code_block = False

        elif line.startswith(structure_marker):
            store_file(current_file, file_content)
            current_file = None
            file_content = []
            in_file_content = False
            in_code_block = False

        elif line.startswith(file_marker):
            store_file(current_file, file_content)
            current_file = line[len(file_marker):].strip()
            file_content = []
            in_file_content = True
            in_code_block = False

    # Flush the file that was still open when the input ended.
    store_file(current_file, file_content)
    return space_info


# --- Updated `_determine_repo_id` to use `_get_api_token` ---
def _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui):
    if not space_name_ui:
        return None, "Error: Space Name cannot be empty."
    if "/" in space_name_ui:
        return None, "Error: Space Name should not contain '/'. Please use the Owner field for the namespace."

    final_owner = owner_ui
    error_message = None

    if not final_owner:
        # Get the actual token to use for whoami
        resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
        if token_err:
            return None, token_err 
        # If resolved_api_token is None here but token_err is None, it means _get_api_token had an issue not returning error string
        # which _get_api_token is designed not to do, it should always return error if no token.
        if not resolved_api_token: 
             return None, "Error: API token is required to automatically determine owner when Owner field is empty (internal check failed)."

        try:
            user_info = whoami(token=resolved_api_token)
            if user_info and 'name' in user_info:
                final_owner = user_info['name']
                logger.info(f"Determined owner: {final_owner} using API token.")
            else:
                logger.error(f"whoami(token=...) returned: {user_info} - 'name' field missing or user_info is None.")
                error_message = "Error: Could not retrieve username from API token. Ensure token is valid and has 'Read profile' permissions. Or, specify Owner manually."
        except Exception as e:
            logger.error(f"Error calling whoami for owner: {str(e)}")
            error_message = f"Error retrieving username from API token: {str(e)}. Please specify Owner manually."
        
        if error_message:
            return None, error_message

    if not final_owner:
        return None, "Error: Owner could not be determined. Please provide an owner or ensure your API token is valid."

    return f"{final_owner}/{space_name_ui}", None


# --- Updated Core Functions to use `_get_api_token` ---

def create_space(ui_api_token_from_textbox, space_name_ui, owner_ui, sdk_ui, markdown_input):
    """
    Create (or update) a Hugging Face Space from a markdown description.

    The markdown is parsed with ``parse_markdown``; every file it yields is
    written into a temporary folder, the Space repo is created if it does not
    exist yet, and the folder is uploaded in a single commit.

    Args:
        ui_api_token_from_textbox: Token typed into the UI; HF_TOKEN from the
            environment takes priority (see ``_get_api_token``).
        space_name_ui: Space name without namespace.
        owner_ui: Namespace, or empty to derive it from the token.
        sdk_ui: Space SDK passed to ``create_repo`` as ``space_sdk``.
        markdown_input: Markdown text containing ``### File:`` sections.

    Returns:
        A status string: a success message with a link to the Space, or a
        message starting with "Error". Exceptions are reported in the returned
        text rather than raised.
    """
    repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    try:
        resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
        if token_err:
            return token_err

        # _determine_repo_id will also call _get_api_token internally if owner_ui is blank
        repo_id, err = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
        if err:
            return err
        repo_id_for_error_logging = repo_id # Update for more accurate logging if determination succeeds

        space_info = parse_markdown(markdown_input)
        if not space_info["files"]:
            return "Error: No files found in the markdown input. Ensure '### File: path/to/file.ext' markers are used correctly."

        with tempfile.TemporaryDirectory() as temp_dir:
            repo_local_path = Path(temp_dir) / "repo_upload_content"
            repo_local_path.mkdir(exist_ok=True)
            # Resolve once so the containment check below compares canonical paths.
            repo_local_path = repo_local_path.resolve()

            for file_info in space_info["files"]:
                relative_path = file_info.get("path")
                if not relative_path:
                    continue
                # The path comes from user-supplied markdown: an absolute path
                # or one containing ".." must not escape the upload folder.
                file_path_abs = (repo_local_path / relative_path).resolve()
                try:
                    inside_repo = file_path_abs.relative_to(repo_local_path)
                except ValueError:
                    inside_repo = None
                if inside_repo is None or not inside_repo.parts:
                    return f"Error: Invalid file path '{relative_path}' in the markdown input. File paths must be relative and stay inside the Space repository."
                file_path_abs.parent.mkdir(parents=True, exist_ok=True)
                with open(file_path_abs, "w", encoding="utf-8") as f:
                    f.write(file_info["content"])
                logger.info(f"Wrote file: {file_path_abs}")

            try:
                # exist_ok=True lets the Hub client handle "repo already exists"
                # itself, instead of guessing from the error text (a substring
                # match on "exists" could also swallow unrelated failures).
                create_repo(
                    repo_id=repo_id, token=resolved_api_token, repo_type="space",
                    space_sdk=sdk_ui, private=False, exist_ok=True,
                )
                logger.info(f"Space repo {repo_id} is ready (created or already existed).")
            except Exception as e:
                return f"Error creating Space '{repo_id}': {str(e)}"

            upload_folder(
                repo_id=repo_id, folder_path=str(repo_local_path), path_in_repo=".",
                token=resolved_api_token, repo_type="space",
                commit_message=f"Initial Space setup of {repo_id} via Builder",
            )
            logger.info(f"Uploaded files to Space: {repo_id}")
            return f"Successfully created/updated Space: [{repo_id}](https://huggingface.co/spaces/{repo_id})"

    except Exception as e:
        logger.exception(f"Error in create_space for {repo_id_for_error_logging}:")
        return f"Error during Space creation/update: {str(e)}"


def view_space_files(ui_api_token_from_textbox, space_name_ui, owner_ui):
    """
    Return a markdown listing of the files stored in a Space.

    The token is resolved with ``_get_api_token`` and the repo id with
    ``_determine_repo_id``; an error from either is returned as-is. Any
    exception is logged and reported in the returned text rather than raised.
    """
    # Best-effort identifier for diagnostics until the real repo id is known.
    if owner_ui:
        repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}"
    else:
        repo_id_for_error_logging = space_name_ui

    try:
        token, token_problem = _get_api_token(ui_api_token_from_textbox)
        if token_problem:
            return token_problem

        repo_id, repo_problem = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
        if repo_problem:
            return repo_problem
        repo_id_for_error_logging = repo_id

        listing = list_repo_files(repo_id=repo_id, token=token, repo_type="space")
        if not listing:
            return f"No files found in the Space `{repo_id}`."

        bullet_lines = "\n".join(f"- `{entry}`" for entry in listing)
        return f"Files in `{repo_id}`:\n\n" + bullet_lines
    except Exception as exc:
        logger.exception(f"Error in view_space_files for {repo_id_for_error_logging}:")
        return f"Error listing files for `{repo_id_for_error_logging}`: {str(exc)}"


def update_space_file(ui_api_token_from_textbox, space_name_ui, owner_ui, file_path_in_repo, file_content, commit_message_ui):
    """
    Create or overwrite a single file in an existing Space and push the change.

    The Space is cloned into a temporary directory with ``Repository``, the
    file is written there, and the change is pushed with ``push_to_hub``.

    Args:
        ui_api_token_from_textbox: Token typed into the UI; HF_TOKEN from the
            environment takes priority (see ``_get_api_token``).
        space_name_ui: Space name without namespace.
        owner_ui: Namespace, or empty to derive it from the token.
        file_path_in_repo: Path of the file relative to the repo root. It must
            stay inside the repository and must not point into ``.git``.
        file_content: New text content of the file.
        commit_message_ui: Commit message; a default is used when empty.

    Returns:
        A status string: a success message with a link to the Space, or a
        message starting with "Error". Exceptions are reported in the returned
        text rather than raised.
    """
    repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    try:
        resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
        if token_err:
            return token_err

        repo_id, err = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
        if err:
            return err
        repo_id_for_error_logging = repo_id

        # Ignore stray whitespace around the path typed into the UI.
        file_path_in_repo = (file_path_in_repo or "").strip()
        if not file_path_in_repo:
            return "Error: File Path cannot be empty."
        commit_message_ui = commit_message_ui or f"Update {file_path_in_repo} via Space Builder"

        with tempfile.TemporaryDirectory() as temp_dir:
            repo_local_clone_path = Path(temp_dir) / "cloned_space_repo_for_update"

            cloned_repo = Repository(
                local_dir=str(repo_local_clone_path),
                clone_from=f"https://huggingface.co/spaces/{repo_id}",
                repo_type="space", use_auth_token=resolved_api_token,
                git_user="Space Builder Bot", git_email="space-builder@huggingface.co"
            )
            logger.info(f"Cloned Space {repo_id} to {repo_local_clone_path}")

            # The path is user input: reject anything that resolves outside the
            # clone (absolute paths, "..", symlinks) or into git's own metadata,
            # where overwriting files such as .git/config could alter how the
            # subsequent git commands behave.
            clone_root = Path(cloned_repo.local_dir).resolve()
            full_local_file_path = (clone_root / file_path_in_repo).resolve()
            try:
                inside_repo = full_local_file_path.relative_to(clone_root)
            except ValueError:
                inside_repo = None
            if (
                inside_repo is None
                or not inside_repo.parts
                or inside_repo.parts[0].lower() == ".git"
            ):
                return f"Error: Invalid File Path '{file_path_in_repo}'. The path must be relative, stay inside the Space repository, and must not target the .git directory."

            full_local_file_path.parent.mkdir(parents=True, exist_ok=True)
            with open(full_local_file_path, "w", encoding="utf-8") as f:
                f.write(file_content)
            logger.info(f"Wrote updated file {file_path_in_repo} locally.")

            cloned_repo.push_to_hub(commit_message=commit_message_ui)
            logger.info(f"Pushed changes for {file_path_in_repo} to {repo_id}")

            return f"Successfully updated `{file_path_in_repo}` in Space [{repo_id}](https://huggingface.co/spaces/{repo_id})"

    except Exception as e:
        logger.exception(f"Error in update_space_file for {repo_id_for_error_logging}:")
        return f"Error updating file for `{repo_id_for_error_logging}`: {str(e)}"