Spaces:
Sleeping
Sleeping
File size: 13,120 Bytes
3afa2e6 8bd1285 3afa2e6 8bd1285 3afa2e6 6063cc9 3afa2e6 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 |
import os
import re
import tempfile
import shutil
import git
from huggingface_hub import (
create_repo,
upload_folder,
list_repo_files,
# delete_file, # Not used
Repository,
whoami,
)
import logging
from pathlib import Path
# Configure logging: set up the root logger at INFO level and create the
# module-level logger used by the functions in this file.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Function to parse markdown input
def parse_markdown(markdown_input):
    """Parse markdown input to extract space details and file structure.

    Recognised structure (matched on the whitespace-stripped line):

    * ``# Space: owner/name`` or ``# Space: name`` sets the owner and/or
      repository name.
    * ``## File Structure`` is a plain section header; the lines below it are
      not collected (the folder/file tree listing is ignored).
    * ``### File: path`` starts a file. Every following line up to the next
      marker becomes that file's content, code-fence lines included.

    Marker-looking lines inside a fenced code block of a file are treated as
    file content, not as structure, so a source file may contain a line such
    as ``# Space: ...`` without being cut short. The flip side is that an
    unclosed fence makes the rest of the input part of that file.

    Args:
        markdown_input: Markdown text. ``None`` or an empty string yields an
            empty result.

    Returns:
        A dict with keys ``"repo_name_md"``, ``"owner_md"`` and ``"files"``.
        ``"files"`` is a list of ``{"path": ..., "content": ...}`` dicts in
        document order. Sections with an empty path or without any content
        line are omitted.
    """
    space_info = {"repo_name_md": "", "owner_md": "", "files": []}
    if not markdown_input:
        return space_info

    space_marker = "# Space:"
    structure_marker = "## File Structure"
    file_marker = "### File:"
    all_markers = (space_marker, structure_marker, file_marker)

    current_file = None  # None means "not inside a ### File: section"
    file_content = []
    in_code_block = False  # True between an opening and a closing ``` fence

    for raw_line in markdown_input.strip().split("\n"):
        stripped = raw_line.strip()

        if current_file is not None:
            if stripped.startswith("```"):
                # Fence lines toggle the state and are kept as content.
                in_code_block = not in_code_block
                file_content.append(raw_line)
                continue
            if in_code_block:
                # Inside a fence nothing is structural, even if it looks
                # like a marker.
                file_content.append(raw_line)
                continue

        if not stripped.startswith(all_markers):
            # Ordinary line: content when inside a file section, else ignored.
            if current_file is not None:
                file_content.append(raw_line)
            continue

        # Any structural marker closes the file section that was open.
        if current_file and file_content:
            space_info["files"].append(
                {"path": current_file, "content": "\n".join(file_content)}
            )
        current_file = None
        file_content = []
        in_code_block = False

        if stripped.startswith(space_marker):
            # Slice off the prefix only; the rest of the line is the name.
            full_space_name = stripped[len(space_marker):].strip()
            if "/" in full_space_name:
                owner, repo_name = full_space_name.split("/", 1)
                space_info["owner_md"] = owner
                space_info["repo_name_md"] = repo_name
            else:
                space_info["repo_name_md"] = full_space_name
        elif stripped.startswith(file_marker):
            current_file = stripped[len(file_marker):].strip()
        # "## File Structure" needs nothing beyond the reset above.

    # Flush the last file section, if any.
    if current_file and file_content:
        space_info["files"].append(
            {"path": current_file, "content": "\n".join(file_content)}
        )
    return space_info
def _determine_repo_id(api_token, space_name_ui, owner_ui):
"""
Determines the final owner and constructs the repo_id.
space_name_ui should be just the name, not 'owner/name'.
owner_ui is the value from the UI's owner field.
Returns (repo_id, error_message)
"""
if not space_name_ui:
return None, "Error: Space Name cannot be empty."
if "/" in space_name_ui: # User should not put slash in space name field
return None, "Error: Space Name should not contain '/'. Please use the Owner field for the namespace."
final_owner = owner_ui
error_message = None
if not final_owner:
if not api_token:
return None, "Error: API token is required to automatically determine owner when Owner field is empty."
try:
user_info = whoami(token=api_token)
if user_info and 'name' in user_info:
final_owner = user_info['name']
logger.info(f"Determined owner: {final_owner} from API token.")
else:
logger.error(f"whoami(token=...) returned: {user_info} - 'name' field missing or user_info is None.")
error_message = "Error: Could not retrieve username from API token. Ensure token is valid and has 'Read profile' permissions. Or, specify Owner manually."
except Exception as e:
logger.error(f"Error calling whoami for owner: {str(e)}")
error_message = f"Error retrieving username from API token: {str(e)}. Please specify Owner manually."
if error_message:
return None, error_message
if not final_owner:
return None, "Error: Owner could not be determined. Please provide an owner or ensure your API token is valid."
return f"{final_owner}/{space_name_ui}", None
# Function to create and populate a Space
def create_space(api_token, space_name_ui, owner_ui, sdk_ui, markdown_input):
    """Create a Hugging Face Space and populate it with files from markdown input.

    The markdown is parsed with ``parse_markdown``; each parsed file is written
    into a temporary staging folder, the Space repository is created (an
    already existing Space is reused), and the folder is uploaded with
    ``upload_folder``.

    Args:
        api_token: Hugging Face token used for every Hub call.
        space_name_ui: Space name without namespace.
        owner_ui: Namespace; when empty it is resolved from the token by
            ``_determine_repo_id``.
        sdk_ui: Forwarded to ``create_repo`` as ``space_sdk``.
        markdown_input: Markdown describing the Space files.

    Returns:
        A status string. Failures are reported as strings starting with
        "Error" instead of being raised.
    """
    repo_id = None  # bound before the try so the error log can always name it
    try:
        if not api_token:
            return "Error: Please provide a valid Hugging Face API token."

        repo_id, err = _determine_repo_id(api_token, space_name_ui, owner_ui)
        if err:
            return err

        space_info = parse_markdown(markdown_input)
        if not space_info["files"]:
            return "Error: No files found in the markdown input. Ensure '### File: path/to/file.ext' markers are used correctly with content."

        with tempfile.TemporaryDirectory() as temp_dir:
            repo_local_path = Path(temp_dir) / "repo_content_for_upload"
            repo_local_path.mkdir(exist_ok=True)
            # Resolve once so the containment check below compares real paths.
            staging_root = repo_local_path.resolve()

            # Write files to the staging directory.
            for file_info in space_info["files"]:
                relative_path = file_info.get("path")
                if not relative_path:
                    logger.warning(f"Skipping file with no path: {file_info}")
                    continue

                # Paths come from user-supplied markdown: an absolute path or
                # one containing ".." must never land outside the staging dir.
                file_path_abs = (staging_root / relative_path).resolve()
                if staging_root not in file_path_abs.parents:
                    logger.error(f"Rejected file path outside the repository: {relative_path}")
                    return f"Error: Invalid file path '{relative_path}'. Paths must be relative and stay inside the Space repository."

                file_path_abs.parent.mkdir(parents=True, exist_ok=True)
                with open(file_path_abs, "w", encoding="utf-8") as f:
                    f.write(file_info["content"])
                logger.info(f"Wrote file: {file_path_abs}")

            # Create the repository; exist_ok lets an existing Space be reused
            # without guessing from the error text.
            try:
                create_repo(
                    repo_id=repo_id,
                    token=api_token,
                    repo_type="space",
                    space_sdk=sdk_ui,
                    private=False,
                    exist_ok=True,
                )
                logger.info(f"Space repo ready: {repo_id}")
            except Exception as e:
                logger.error(f"Error creating Space '{repo_id}': {str(e)}")
                return f"Error creating Space '{repo_id}': {str(e)}"

            # Push the staged files to the Space.
            upload_folder(
                repo_id=repo_id,
                folder_path=str(staging_root),  # upload_folder expects a string path
                path_in_repo=".",
                token=api_token,
                repo_type="space",
                commit_message=f"Initial Space setup of {repo_id} via Builder",
            )
            logger.info(f"Uploaded files to Space: {repo_id}")
            return f"Successfully created/updated Space: [{repo_id}](https://huggingface.co/spaces/{repo_id})"
    except Exception as e:
        # Log the full traceback; the caller only receives the message.
        logger.exception(f"Error in create_space for {repo_id or 'unknown repo'}:")
        return f"Error during Space creation/update: {str(e)}"
# Function to view Space files
def view_space_files(api_token, space_name_ui, owner_ui):
    """Return a markdown bullet list of the files in a Hugging Face Space.

    Errors (missing token, unresolved repo id, Hub failures) are returned as
    message strings rather than raised.
    """
    # Best-effort label for error messages, available even if id resolution fails.
    if owner_ui:
        repo_id_for_error = f"{owner_ui}/{space_name_ui}"
    else:
        repo_id_for_error = space_name_ui

    try:
        if not api_token:
            return "Error: Please provide a valid Hugging Face API token."

        repo_id, problem = _determine_repo_id(api_token, space_name_ui, owner_ui)
        if problem:
            return problem

        listed = list_repo_files(repo_id=repo_id, token=api_token, repo_type="space")
        if not listed:
            return f"No files found in the Space `{repo_id}`."

        bullets = "\n".join(f"- `{name}`" for name in listed)
        return f"Files in `{repo_id}`:\n\n" + bullets
    except Exception as exc:
        logger.exception(f"Error in view_space_files for {repo_id_for_error}:")
        return f"Error listing files for `{repo_id_for_error}`: {str(exc)}"
# Function to update a Space file
def update_space_file(api_token, space_name_ui, owner_ui, file_path_in_repo, file_content, commit_message_ui):
    """Update a file in a Hugging Face Space with a commit.

    The Space is cloned into a temporary directory with ``Repository``, the
    file is (over)written locally, and the change is pushed with
    ``push_to_hub``.

    Args:
        api_token: Hugging Face token used to clone and push.
        space_name_ui: Space name without namespace.
        owner_ui: Namespace; when empty it is resolved from the token by
            ``_determine_repo_id``.
        file_path_in_repo: Path of the file relative to the repository root.
        file_content: New text content; ``None`` is written as an empty file.
        commit_message_ui: Commit message; a default is generated when empty.

    Returns:
        A status string. Failures are reported as strings starting with
        "Error" instead of being raised.
    """
    repo_id_for_error = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    try:
        if not api_token:
            return "Error: Please provide a valid Hugging Face API token."

        repo_id, err = _determine_repo_id(api_token, space_name_ui, owner_ui)
        if err:
            return err

        if not file_path_in_repo:
            return "Error: File Path cannot be empty."
        if not commit_message_ui:
            commit_message_ui = f"Update {file_path_in_repo} via Space Builder"

        with tempfile.TemporaryDirectory() as temp_dir:
            repo_local_clone_path = Path(temp_dir) / "cloned_space_repo"
            # NOTE(review): Repository is the git-based helper; newer
            # huggingface_hub releases favour HTTP upload methods - confirm
            # against the pinned library version.
            cloned_repo = Repository(
                local_dir=str(repo_local_clone_path),  # Repository expects a string path
                clone_from=f"https://huggingface.co/spaces/{repo_id}",
                repo_type="space",
                use_auth_token=api_token,
                git_user="Space Builder Bot",
                git_email="space-builder@huggingface.co",
            )
            logger.info(f"Cloned Space {repo_id} to {repo_local_clone_path}")

            # Repository.local_dir is a plain string, so wrap it in Path
            # before joining; "str / str" raises TypeError.
            clone_root = Path(cloned_repo.local_dir).resolve()
            full_local_file_path = (clone_root / file_path_in_repo).resolve()
            # Refuse absolute paths or ".." segments that leave the clone.
            if clone_root not in full_local_file_path.parents:
                return f"Error: Invalid file path '{file_path_in_repo}'. Paths must be relative and stay inside the Space repository."

            full_local_file_path.parent.mkdir(parents=True, exist_ok=True)
            with open(full_local_file_path, "w", encoding="utf-8") as f:
                f.write("" if file_content is None else file_content)
            logger.info(f"Wrote updated file {file_path_in_repo} locally.")

            cloned_repo.push_to_hub(commit_message=commit_message_ui)
            logger.info(f"Pushed changes for {file_path_in_repo} to {repo_id}")
            return f"Successfully updated `{file_path_in_repo}` in Space [{repo_id}](https://huggingface.co/spaces/{repo_id})"
    except Exception as e:
        logger.exception(f"Error in update_space_file for {repo_id_for_error}:")
        return f"Error updating file for `{repo_id_for_error}`: {str(e)}"