Spaces:
Runtime error
Runtime error
from html import escape | |
import os | |
import shutil | |
import time | |
from datetime import datetime, timedelta | |
import bleach | |
import requests | |
from flask import Flask, request, jsonify, render_template | |
from whoosh.index import create_in, open_dir | |
from whoosh.fields import Schema, TEXT, ID, BOOLEAN | |
from whoosh.qparser import QueryParser | |
app = Flask(__name__) | |
# Configure index directory and schema | |
BASE_INDEX_DIR = "discussion_indices" | |
CACHE_DURATION = timedelta(hours=24) | |
schema = Schema( | |
discussion_id=ID(stored=True), | |
title=TEXT(stored=True), | |
content=TEXT(stored=True), | |
author=TEXT(stored=True), | |
is_pr=BOOLEAN(stored=True), | |
is_open=BOOLEAN(stored=True) | |
) | |
def get_repo_index_dir(repo_name): | |
# Convert repo name to safe directory name | |
safe_name = repo_name.replace('/', '_') | |
return os.path.join(BASE_INDEX_DIR, safe_name) | |
def get_repo_last_indexed_file(repo_name): | |
return os.path.join(get_repo_index_dir(repo_name), 'last_indexed.txt') | |
def needs_reindex(repo_name): | |
last_indexed_file = get_repo_last_indexed_file(repo_name) | |
if not os.path.exists(last_indexed_file): | |
return True | |
with open(last_indexed_file, 'r') as f: | |
last_indexed = datetime.fromtimestamp(float(f.read().strip())) | |
return datetime.now() - last_indexed > CACHE_DURATION | |
def index_discussions(repo_name): | |
index_dir = get_repo_index_dir(repo_name) | |
# Clear and recreate index directory | |
if os.path.exists(index_dir): | |
shutil.rmtree(index_dir) | |
os.makedirs(index_dir, exist_ok=True) | |
# Create index | |
ix = create_in(index_dir, schema) | |
writer = ix.writer() | |
# Fetch and index discussions | |
discussions = requests.get(f'https://huggingface.co/api/{repo_name}/discussions').json() | |
for discussion in discussions['discussions']: | |
comments = requests.get( | |
f'https://huggingface.co/api/{repo_name}/discussions/{discussion["num"]}' | |
).json() | |
# Combine all comments into one content string | |
content = [] | |
for comment in comments['events']: | |
try: | |
if comment['type'] == 'comment': | |
authorname = comment['author']['name'] if 'author' in comment else 'deleted' | |
content.append(f'{authorname}: {comment["data"]["latest"]["raw"]}') | |
except KeyError: | |
print('Error in comment:') | |
print(comment) | |
writer.add_document( | |
discussion_id=str(discussion["num"]), | |
title=discussion["title"], | |
content=f"Title: {discussion['title']}\n\n" + '\n'.join(content), | |
author=discussion["author"]["name"] if "author" in discussion else 'deleted', | |
is_pr=discussion["isPullRequest"], | |
is_open=discussion["status"] == "open" | |
) | |
writer.commit() | |
# Update last indexed timestamp | |
with open(get_repo_last_indexed_file(repo_name), 'w') as f: | |
f.write(str(time.time())) | |
def index(): | |
repo_name = request.args.get('repo') | |
query = request.args.get('query') | |
if not repo_name: | |
return render_template('no_repo.html') | |
if not repo_name.startswith('spaces/') and not repo_name.startswith('datasets/') and not repo_name.startswith('models/'): | |
repo_name = f'models/{repo_name}' | |
repo_name = '/'.join(repo_name.split('/')[:3]) | |
return render_template('index.html', repo_name=repo_name, query=query) | |
def search(): | |
data = request.json | |
query = data.get('query') | |
repo_name = data.get('repo') | |
if not repo_name: | |
return jsonify({'error': 'No repository provided'}), 400 | |
if not query: | |
return jsonify({'error': 'No query provided'}), 400 | |
# Check if we need to reindex | |
if needs_reindex(repo_name): | |
index_discussions(repo_name) | |
# Search the index | |
ix = open_dir(get_repo_index_dir(repo_name)) | |
with ix.searcher() as searcher: | |
query_parser = QueryParser("content", ix.schema) | |
q = query_parser.parse(query) | |
results = searcher.search(q) | |
if repo_name.startswith('models/'): | |
url_repo_name = repo_name[7:] | |
else: | |
url_repo_name = repo_name | |
# Format results | |
formatted_results = [{ | |
'discussion_id': escape(result['discussion_id']), | |
'url': f'https://huggingface.co/{url_repo_name}/discussions/{result["discussion_id"]}', | |
'title': escape(result['title']), | |
'author': escape(result['author']), | |
'excerpt': bleach.clean(result.highlights("content"), tags=['b'], strip=True), | |
'is_pr': result['is_pr'], | |
'is_open': result['is_open'] | |
} for result in results] | |
return jsonify({'results': formatted_results}) | |
if __name__ == '__main__': | |
app.run(host='0.0.0.0', port=7860) |