File size: 13,120 Bytes
3afa2e6
 
 
 
 
 
 
 
 
 
 
 
8bd1285
3afa2e6
 
8bd1285
3afa2e6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6063cc9
3afa2e6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
import os
import re
import tempfile
import shutil
import git
from huggingface_hub import (
    create_repo,
    upload_folder,
    list_repo_files,
    # delete_file, # Not used
    Repository,
    whoami,
)
import logging
from pathlib import Path

# Configure logging: set up the root logger at INFO level when this module is
# imported, and create the module-level logger used by the functions below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Function to parse markdown input
def parse_markdown(markdown_input):
    """Parse markdown input to extract space details and file structure.

    Markers are matched against the whitespace-stripped line:

    * ``# Space: owner/name`` or ``# Space: name`` fills ``owner_md`` and
      ``repo_name_md``.
    * ``## File Structure`` is a section header; the lines after it are
      ignored until the next ``### File:`` marker.
    * ``### File: path`` starts a file; every following line up to the next
      marker is collected as that file's content.

    Lines inside a fenced block (between two lines starting with three
    backticks) are always treated as file content, even when they look like
    one of the markers above. The fence lines themselves are kept in the
    collected content. An unterminated fence therefore swallows the rest of
    the input into the current file.

    Args:
        markdown_input: Markdown text describing the Space. ``None`` is
            treated as empty input.

    Returns:
        A dict with keys ``"repo_name_md"`` and ``"owner_md"`` (strings,
        empty when not given) and ``"files"``, a list of
        ``{"path": str, "content": str}`` dicts in input order. Files with an
        empty path or with no collected lines are omitted.
    """
    space_marker = "# Space:"
    structure_marker = "## File Structure"
    file_marker = "### File:"
    fence = "```"

    space_info = {"repo_name_md": "", "owner_md": "", "files": []}
    current_file = None
    file_content = []
    in_file_content = False  # True while collecting lines for a ### File: section
    in_code_block = False  # True while inside a fenced block of that section

    for raw_line in (markdown_input or "").strip().split("\n"):
        stripped = raw_line.strip()

        # Inside a fence everything is content; only a fence line ends it.
        # Markers are deliberately NOT recognised here, so code comments such
        # as "# Space: ..." cannot split the file or overwrite the metadata.
        if in_code_block:
            file_content.append(raw_line)
            if stripped.startswith(fence):
                in_code_block = False
            continue

        if stripped.startswith((space_marker, structure_marker, file_marker)):
            # Any marker ends the file being collected, if there is one.
            if current_file and file_content:
                space_info["files"].append(
                    {"path": current_file, "content": "\n".join(file_content)}
                )
            current_file = None
            file_content = []
            in_file_content = False

            if stripped.startswith(space_marker):
                full_space_name_md = stripped[len(space_marker):].strip()
                if "/" in full_space_name_md:
                    owner, name = full_space_name_md.split("/", 1)
                    space_info["owner_md"] = owner
                    space_info["repo_name_md"] = name
                else:
                    space_info["repo_name_md"] = full_space_name_md
            elif stripped.startswith(file_marker):
                current_file = stripped[len(file_marker):].strip()
                in_file_content = True
            # "## File Structure" is only a section header: nothing to record.
            continue

        # Lines outside any ### File: section (for example the tree listing
        # under "## File Structure") are ignored; only ### File: sections
        # define files and their content.
        if in_file_content:
            if stripped.startswith(fence):
                in_code_block = True
            file_content.append(raw_line)

    # Flush the last file, which has no following marker to trigger the save.
    if current_file and file_content:
        space_info["files"].append(
            {"path": current_file, "content": "\n".join(file_content)}
        )

    return space_info


def _determine_repo_id(api_token, space_name_ui, owner_ui):
    """
    Determines the final owner and constructs the repo_id.
    space_name_ui should be just the name, not 'owner/name'.
    owner_ui is the value from the UI's owner field.
    Returns (repo_id, error_message)
    """
    if not space_name_ui:
        return None, "Error: Space Name cannot be empty."
    if "/" in space_name_ui: # User should not put slash in space name field
        return None, "Error: Space Name should not contain '/'. Please use the Owner field for the namespace."

    final_owner = owner_ui
    error_message = None

    if not final_owner:
        if not api_token:
            return None, "Error: API token is required to automatically determine owner when Owner field is empty."
        try:
            user_info = whoami(token=api_token)
            if user_info and 'name' in user_info:
                final_owner = user_info['name']
                logger.info(f"Determined owner: {final_owner} from API token.")
            else:
                logger.error(f"whoami(token=...) returned: {user_info} - 'name' field missing or user_info is None.")
                error_message = "Error: Could not retrieve username from API token. Ensure token is valid and has 'Read profile' permissions. Or, specify Owner manually."
        except Exception as e:
            logger.error(f"Error calling whoami for owner: {str(e)}")
            error_message = f"Error retrieving username from API token: {str(e)}. Please specify Owner manually."
        
        if error_message:
            return None, error_message

    if not final_owner: 
        return None, "Error: Owner could not be determined. Please provide an owner or ensure your API token is valid."

    return f"{final_owner}/{space_name_ui}", None

# Function to create and populate a Space
def create_space(api_token, space_name_ui, owner_ui, sdk_ui, markdown_input):
    """Create a Hugging Face Space and populate it with files from markdown input.

    The files parsed from ``markdown_input`` are written to a temporary
    staging directory. The Space is then created (an already existing Space
    is reused) and the staged folder is uploaded in a single commit.

    Args:
        api_token: Hugging Face API token used for every Hub call.
        space_name_ui: Space name only (no ``owner/`` prefix).
        owner_ui: Owner namespace; when empty it is resolved from the token.
        sdk_ui: Value passed to ``create_repo`` as ``space_sdk``.
        markdown_input: Markdown text with ``### File: path`` sections.

    Returns:
        A markdown success message, or a string starting with ``"Error"``.
        Failures are returned as messages, never raised.
    """
    repo_id = None  # Known to the except handler even if resolution fails.
    try:
        if not api_token:
            return "Error: Please provide a valid Hugging Face API token."

        repo_id, err = _determine_repo_id(api_token, space_name_ui, owner_ui)
        if err:
            return err

        space_info = parse_markdown(markdown_input)
        if not space_info["files"]:
            return "Error: No files found in the markdown input. Ensure '### File: path/to/file.ext' markers are used correctly with content."

        with tempfile.TemporaryDirectory() as temp_dir:
            repo_local_path = Path(temp_dir) / "repo_content_for_upload"
            repo_local_path.mkdir(exist_ok=True)
            # Resolve once so the containment check below compares real paths.
            staging_root = repo_local_path.resolve()

            # Write files to the staging directory.
            for file_info in space_info["files"]:
                rel_path = file_info.get("path")
                if not rel_path:
                    logger.warning(f"Skipping file with no path: {file_info}")
                    continue
                # Paths come from user-supplied markdown: an absolute path or
                # "../" must not be allowed to write outside the staging dir.
                file_path_abs = (staging_root / rel_path).resolve()
                if staging_root not in file_path_abs.parents:
                    return f"Error: Invalid file path '{rel_path}'. File paths must be relative and stay inside the Space repository."
                file_path_abs.parent.mkdir(parents=True, exist_ok=True)
                with open(file_path_abs, "w", encoding="utf-8") as f:
                    f.write(file_info["content"])
                logger.info(f"Wrote file: {file_path_abs}")

            # Create the Space; exist_ok=True reuses an existing Space rather
            # than guessing "already exists" from the error message text.
            try:
                create_repo(
                    repo_id=repo_id,
                    token=api_token,
                    repo_type="space",
                    space_sdk=sdk_ui,
                    private=False,
                    exist_ok=True,
                )
                logger.info(f"Space repo ready: {repo_id}")
            except Exception as e:
                return f"Error creating Space '{repo_id}': {str(e)}"

            # Push the staged folder to the Space.
            upload_folder(
                repo_id=repo_id,
                folder_path=str(repo_local_path),  # upload_folder expects a string path
                path_in_repo=".",
                token=api_token,
                repo_type="space",
                commit_message=f"Initial Space setup of {repo_id} via Builder",
            )
            logger.info(f"Uploaded files to Space: {repo_id}")
            return f"Successfully created/updated Space: [{repo_id}](https://huggingface.co/spaces/{repo_id})"

    except Exception as e:
        # Top-level boundary for the UI: log the traceback, return a message.
        logger.exception(f"Error in create_space for {repo_id or 'unknown repo'}:")
        return f"Error during Space creation/update: {str(e)}"

# Function to view Space files
def view_space_files(api_token, space_name_ui, owner_ui):
    """Return a markdown bullet list of the files stored in a Space.

    Args:
        api_token: Hugging Face API token used for the Hub calls.
        space_name_ui: Space name only (no ``owner/`` prefix).
        owner_ui: Owner namespace; when empty it is resolved from the token.

    Returns:
        A markdown string listing the files, a "no files" message, or a
        string starting with ``"Error"``. Failures are returned, not raised.
    """
    # Label for error messages, built from the raw UI values so it is
    # available even if resolving the real repo id fails.
    if owner_ui:
        repo_id_for_error = f"{owner_ui}/{space_name_ui}"
    else:
        repo_id_for_error = space_name_ui

    try:
        if not api_token:
            return "Error: Please provide a valid Hugging Face API token."

        repo_id, resolve_error = _determine_repo_id(api_token, space_name_ui, owner_ui)
        if resolve_error:
            return resolve_error

        repo_files = list_repo_files(repo_id=repo_id, token=api_token, repo_type="space")
        if not repo_files:
            return f"No files found in the Space `{repo_id}`."

        bullets = "\n".join(f"- `{name}`" for name in repo_files)
        return f"Files in `{repo_id}`:\n\n" + bullets
    except Exception as exc:
        logger.exception(f"Error in view_space_files for {repo_id_for_error}:")
        return f"Error listing files for `{repo_id_for_error}`: {exc!s}"

# Function to update a Space file
def update_space_file(api_token, space_name_ui, owner_ui, file_path_in_repo, file_content, commit_message_ui):
    """Update (or add) one file in a Hugging Face Space with a commit.

    The Space is cloned into a temporary directory, the file is written into
    the clone, and the change is pushed back with ``push_to_hub``.

    Args:
        api_token: Hugging Face API token used for the clone and push.
        space_name_ui: Space name only (no ``owner/`` prefix).
        owner_ui: Owner namespace; when empty it is resolved from the token.
        file_path_in_repo: Path of the file relative to the repository root.
        file_content: Text written to the file (UTF-8).
        commit_message_ui: Commit message; a default is used when empty.

    Returns:
        A markdown success message, or a string starting with ``"Error"``.
        Failures are returned as messages, never raised.
    """
    repo_id_for_error = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    try:
        if not api_token:
            return "Error: Please provide a valid Hugging Face API token."

        repo_id, err = _determine_repo_id(api_token, space_name_ui, owner_ui)
        if err:
            return err

        if not file_path_in_repo:
            return "Error: File Path cannot be empty."
        if not commit_message_ui:
            commit_message_ui = f"Update {file_path_in_repo} via Space Builder"

        with tempfile.TemporaryDirectory() as temp_dir:
            repo_local_clone_path = Path(temp_dir) / "cloned_space_repo"

            cloned_repo = Repository(
                local_dir=str(repo_local_clone_path),  # Repository expects a string path
                clone_from=f"https://huggingface.co/spaces/{repo_id}",
                repo_type="space",
                use_auth_token=api_token,
                git_user="Space Builder Bot",
                git_email="space-builder@huggingface.co"
            )
            logger.info(f"Cloned Space {repo_id} to {repo_local_clone_path}")

            # Repository.local_dir is a plain string, so wrap it in Path before
            # using "/"; joining two strings with "/" raises TypeError.
            clone_root = Path(cloned_repo.local_dir).resolve()
            full_local_file_path = (clone_root / file_path_in_repo).resolve()
            # The path is user input: refuse absolute paths and "../" that
            # would write outside the cloned repository.
            if clone_root not in full_local_file_path.parents:
                return f"Error: Invalid file path '{file_path_in_repo}'. File paths must be relative and stay inside the Space repository."

            full_local_file_path.parent.mkdir(parents=True, exist_ok=True)
            with open(full_local_file_path, "w", encoding="utf-8") as f:
                f.write(file_content)
            logger.info(f"Wrote updated file {file_path_in_repo} locally.")

            cloned_repo.push_to_hub(commit_message=commit_message_ui)
            logger.info(f"Pushed changes for {file_path_in_repo} to {repo_id}")

            return f"Successfully updated `{file_path_in_repo}` in Space [{repo_id}](https://huggingface.co/spaces/{repo_id})"

    except Exception as e:
        # Top-level boundary for the UI: log the traceback, return a message.
        logger.exception(f"Error in update_space_file for {repo_id_for_error}:")
        return f"Error updating file for `{repo_id_for_error}`: {str(e)}"