# hf-search / app.py
# Author avatar caption: mrfakename's picture
# Last commit: "Update app.py"
# Revision: 97c0482 (verified)
from html import escape
import os
import shutil
import time
from datetime import datetime, timedelta
import bleach
import requests
from flask import Flask, request, jsonify, render_template
from whoosh.index import create_in, open_dir
from whoosh.fields import Schema, TEXT, ID, BOOLEAN
from whoosh.qparser import QueryParser
# Flask application object; the route handlers in this file register on it.
app = Flask(__name__)

# Root folder under which each repository gets its own index directory.
BASE_INDEX_DIR = "discussion_indices"
# An index older than this is considered stale and is rebuilt.
CACHE_DURATION = timedelta(hours=24)

# Whoosh schema shared by every repository index. All fields are stored so
# that search hits can be rendered straight from the index.
schema = Schema(
    # Discussion number, kept as a string identifier.
    discussion_id=ID(stored=True),
    # Discussion title.
    title=TEXT(stored=True),
    # Title plus all comment texts; this is the field that gets searched.
    content=TEXT(stored=True),
    # Name of the discussion author.
    author=TEXT(stored=True),
    # Whether the discussion is a pull request.
    is_pr=BOOLEAN(stored=True),
    # Whether the discussion status is "open".
    is_open=BOOLEAN(stored=True),
)
def get_repo_index_dir(repo_name):
    """Return the directory that holds the search index for ``repo_name``.

    The repository name is flattened into a single path component below
    ``BASE_INDEX_DIR`` by replacing path separators with underscores.

    Args:
        repo_name: Repository identifier such as ``"models/user/repo"``.

    Returns:
        The index directory path as a string.

    Raises:
        ValueError: If the flattened name is empty or consists only of dots
            (for example ``"."`` or ``".."``). Such a name would resolve to
            ``BASE_INDEX_DIR`` itself or to its parent rather than to a
            sub-directory.
    """
    # Flatten both separator styles so the result is always one component.
    safe_name = repo_name.replace('/', '_').replace('\\', '_')
    # The returned directory gets removed recursively when an index is
    # rebuilt, so it must never point at or above the base directory.
    if not safe_name.strip('.'):
        raise ValueError(f'Invalid repository name: {repo_name!r}')
    return os.path.join(BASE_INDEX_DIR, safe_name)
def get_repo_last_indexed_file(repo_name):
    """Return the path of the timestamp file inside ``repo_name``'s index directory."""
    repo_dir = get_repo_index_dir(repo_name)
    timestamp_path = os.path.join(repo_dir, 'last_indexed.txt')
    return timestamp_path
def needs_reindex(repo_name):
    """Tell whether the index for ``repo_name`` has to be (re)built.

    Args:
        repo_name: Repository identifier whose index is checked.

    Returns:
        ``True`` when the timestamp file is missing, cannot be read, does not
        contain a number, or records a time more than ``CACHE_DURATION`` ago;
        ``False`` otherwise.
    """
    last_indexed_file = get_repo_last_indexed_file(repo_name)
    try:
        with open(last_indexed_file, 'r') as f:
            last_indexed = float(f.read().strip())
    except (OSError, ValueError):
        # A missing, unreadable or truncated timestamp means the index state
        # is unknown. Rebuilding recovers from that, whereas raising here
        # would fail every request until the file is removed by hand.
        return True
    # Compare epoch seconds directly; differences of naive local datetimes
    # are off by the clock shift whenever a DST change falls in between.
    return time.time() - last_indexed > CACHE_DURATION.total_seconds()
def _fetch_json(url, timeout=30):
    """GET ``url`` and return its decoded JSON body, raising on HTTP errors."""
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    return response.json()


def _collect_comment_lines(events):
    """Return one ``"author: text"`` line per comment event in ``events``."""
    lines = []
    for event in events:
        try:
            if event['type'] == 'comment':
                authorname = event['author']['name'] if 'author' in event else 'deleted'
                lines.append(f'{authorname}: {event["data"]["latest"]["raw"]}')
        except KeyError:
            # Best effort: a malformed event is reported and skipped.
            print('Error in comment:')
            print(event)
    return lines


def index_discussions(repo_name):
    """Rebuild the search index for ``repo_name`` from its discussions.

    All discussions and their comments are downloaded first. Only then is
    the existing index directory removed and recreated, the documents are
    written, and the last-indexed timestamp is refreshed.

    Args:
        repo_name: Repository identifier including its type prefix, used
            both in the API URL and to derive the index directory.

    Raises:
        requests.RequestException: On network errors, timeouts or HTTP error
            statuses returned by the API.
        ValueError: If a response body is not valid JSON.
        KeyError: If a response lacks the expected ``discussions`` /
            ``events`` keys or a discussion lacks a required field.
    """
    index_dir = get_repo_index_dir(repo_name)
    api_base = f'https://huggingface.co/api/{repo_name}/discussions'

    # Fetch everything before touching the disk, so a failing request leaves
    # the previously built index in place instead of an empty one.
    # NOTE(review): only the first listing response is indexed; if the API
    # paginates, later pages are not fetched - confirm.
    listing = _fetch_json(api_base)
    documents = []
    for discussion in listing['discussions']:
        details = _fetch_json(f'{api_base}/{discussion["num"]}')
        # Combine all comments into one content string
        content = _collect_comment_lines(details['events'])
        documents.append({
            'discussion_id': str(discussion["num"]),
            'title': discussion["title"],
            'content': f"Title: {discussion['title']}\n\n" + '\n'.join(content),
            'author': discussion["author"]["name"] if "author" in discussion else 'deleted',
            'is_pr': discussion["isPullRequest"],
            'is_open': discussion["status"] == "open",
        })

    # Clear and recreate index directory
    if os.path.exists(index_dir):
        shutil.rmtree(index_dir)
    os.makedirs(index_dir, exist_ok=True)

    # Create index; the writer context manager commits on success and
    # cancels (releasing the write lock) if adding a document fails.
    ix = create_in(index_dir, schema)
    with ix.writer() as writer:
        for document in documents:
            writer.add_document(**document)

    # Update last indexed timestamp
    with open(get_repo_last_indexed_file(repo_name), 'w') as f:
        f.write(str(time.time()))
@app.route('/')
def index():
    """Render the search page for the repository given in the query string.

    Reads the ``repo`` and ``query`` URL parameters. Without ``repo`` the
    ``no_repo.html`` template is rendered. Otherwise the name gets a
    ``models/`` prefix unless it already starts with a known repository
    type, is cut down to its first three path segments, and is passed to
    ``index.html`` together with the query.
    """
    repo_name = request.args.get('repo')
    query = request.args.get('query')
    if not repo_name:
        return render_template('no_repo.html')

    known_prefixes = ('spaces/', 'datasets/', 'models/')
    if not repo_name.startswith(known_prefixes):
        repo_name = f'models/{repo_name}'

    # Keep at most three segments and drop anything after them.
    segments = repo_name.split('/')
    repo_name = '/'.join(segments[:3])
    return render_template('index.html', repo_name=repo_name, query=query)
@app.route('/search', methods=['POST'])
def search():
    """Search a repository's discussions and return the hits as JSON.

    Expects a JSON object body with the string fields ``repo`` and
    ``query``. The repository index is rebuilt first when it is missing or
    stale.

    Returns:
        ``{'results': [...]}`` on success, where each hit carries
        ``discussion_id``, ``url``, ``title``, ``author``, ``excerpt``,
        ``is_pr`` and ``is_open``. On failure, ``{'error': ...}`` with
        status 400 for a bad request or 502 when the discussions could not
        be fetched and indexed.
    """
    # silent=True yields None instead of aborting on a missing or malformed
    # JSON body, so every client error gets the same JSON error shape.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400
    query = data.get('query')
    repo_name = data.get('repo')
    if not repo_name:
        return jsonify({'error': 'No repository provided'}), 400
    if not query:
        return jsonify({'error': 'No query provided'}), 400
    if not isinstance(repo_name, str) or not isinstance(query, str):
        return jsonify({'error': 'Repository and query must be strings'}), 400
    # The repository name ends up in a directory path that is deleted
    # recursively on reindex and in an upstream API URL, so refuse empty or
    # relative path segments and backslashes.
    segments = repo_name.split('/')
    if '\\' in repo_name or any(part in ('', '.', '..') for part in segments):
        return jsonify({'error': 'Invalid repository name'}), 400

    # Check if we need to reindex
    try:
        if needs_reindex(repo_name):
            index_discussions(repo_name)
    except (requests.RequestException, KeyError, ValueError):
        # Upstream request failed or returned an unexpected payload; report
        # it as JSON rather than letting it surface as an HTML 500 page.
        app.logger.exception('Indexing failed for %r', repo_name)
        return jsonify({'error': 'Failed to index repository discussions'}), 502

    # Public discussion URLs for models carry no "models/" prefix.
    models_prefix = 'models/'
    if repo_name.startswith(models_prefix):
        url_repo_name = repo_name[len(models_prefix):]
    else:
        url_repo_name = repo_name

    # Search the index
    ix = open_dir(get_repo_index_dir(repo_name))
    with ix.searcher() as searcher:
        q = QueryParser("content", ix.schema).parse(query)
        results = searcher.search(q)
        # Format results while the searcher is still open.
        formatted_results = [{
            'discussion_id': escape(result['discussion_id']),
            # Escaped like the other string fields because it embeds the
            # client-supplied repository name.
            'url': escape(
                f'https://huggingface.co/{url_repo_name}/discussions/{result["discussion_id"]}'
            ),
            'title': escape(result['title']),
            'author': escape(result['author']),
            'excerpt': bleach.clean(result.highlights("content"), tags=['b'], strip=True),
            'is_pr': result['is_pr'],
            'is_open': result['is_open'],
        } for result in results]
    return jsonify({'results': formatted_results})
# Start the Flask server only when this file is executed as a script.
if __name__ == '__main__':
    # Bind to every network interface on port 7860.
    app.run(
        host='0.0.0.0',
        port=7860,
    )