"""Flask app that indexes a Hugging Face repository's discussions with Whoosh.

Each repository gets its own on-disk index under ``BASE_INDEX_DIR``; the index
is rebuilt on demand once it is older than ``CACHE_DURATION``.
"""

from html import escape
import os
import re
import shutil
import time
from datetime import datetime, timedelta

import bleach
import requests
from flask import Flask, request, jsonify, render_template
from whoosh.index import create_in, open_dir
from whoosh.fields import Schema, TEXT, ID, BOOLEAN
from whoosh.qparser import QueryParser

app = Flask(__name__)

# Configure index directory and schema
BASE_INDEX_DIR = "discussion_indices"
CACHE_DURATION = timedelta(hours=24)

# Seconds to wait on each Hugging Face API call; without a timeout a stalled
# connection would block the request handler indefinitely.
REQUEST_TIMEOUT = 30

# Repository type prefixes recognised in repo names.
REPO_TYPE_PREFIXES = ('spaces/', 'datasets/', 'models/')

# "<type>/<name>" or "<type>/<namespace>/<name>"; segments are restricted to a
# conservative character set because the name ends up in a filesystem path and
# in outgoing URLs.
_REPO_NAME_RE = re.compile(
    r'(?:spaces|datasets|models)(?:/[A-Za-z0-9_][A-Za-z0-9._-]*){1,2}'
)

schema = Schema(
    discussion_id=ID(stored=True),
    title=TEXT(stored=True),
    content=TEXT(stored=True),
    author=TEXT(stored=True),
    is_pr=BOOLEAN(stored=True),
    is_open=BOOLEAN(stored=True)
)


def _normalize_repo_name(repo_name):
    """Return *repo_name* with a type prefix and at most three path segments.

    Names that do not start with one of ``REPO_TYPE_PREFIXES`` are treated as
    models and get a ``models/`` prefix.
    """
    if not repo_name.startswith(REPO_TYPE_PREFIXES):
        repo_name = f'models/{repo_name}'
    return '/'.join(repo_name.split('/')[:3])


def _is_valid_repo_name(repo_name):
    """Return True if *repo_name* is a normalized, filesystem-safe repo name."""
    return (
        isinstance(repo_name, str)
        and _REPO_NAME_RE.fullmatch(repo_name) is not None
        and '..' not in repo_name
    )


def get_repo_index_dir(repo_name):
    """Return the index directory for *repo_name* inside ``BASE_INDEX_DIR``.

    Raises:
        ValueError: if the derived directory name is empty, ``.``/``..`` or
            still contains a path separator. Such a name would resolve to (or
            outside of) ``BASE_INDEX_DIR`` itself, and ``index_discussions``
            deletes the directory returned here.
    """
    # Convert repo name to safe directory name
    safe_name = repo_name.replace('/', '_')
    separators = {os.sep, os.altsep} - {None}
    if safe_name in ('', '.', '..') or any(sep in safe_name for sep in separators):
        raise ValueError(f'Unsafe repository name: {repo_name!r}')
    return os.path.join(BASE_INDEX_DIR, safe_name)


def get_repo_last_indexed_file(repo_name):
    """Return the path of the file holding the repo's last-indexed timestamp."""
    return os.path.join(get_repo_index_dir(repo_name), 'last_indexed.txt')


def needs_reindex(repo_name):
    """Return True if the repo has no index yet or it is older than the cache.

    A missing, unreadable or corrupt timestamp file counts as "needs reindex"
    rather than raising, so a damaged cache repairs itself on the next search.
    """
    last_indexed_file = get_repo_last_indexed_file(repo_name)
    try:
        with open(last_indexed_file, 'r') as f:
            last_indexed = datetime.fromtimestamp(float(f.read().strip()))
    except (OSError, ValueError, OverflowError):
        return True
    return datetime.now() - last_indexed > CACHE_DURATION


def _fetch_json(url):
    """GET *url* and return the decoded JSON body; raise on HTTP errors."""
    response = requests.get(url, timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    return response.json()


def _collect_comments(events):
    """Return ``"author: text"`` strings for every comment event in *events*."""
    content = []
    for comment in events:
        try:
            if comment['type'] == 'comment':
                authorname = comment['author']['name'] if 'author' in comment else 'deleted'
                content.append(f'{authorname}: {comment["data"]["latest"]["raw"]}')
        except KeyError:
            # Best effort: skip malformed events instead of aborting the index.
            print('Error in comment:')
            print(comment)
    return content


def index_discussions(repo_name):
    """Fetch all discussions of *repo_name* and rebuild its search index.

    Everything is downloaded before the existing index is touched, so a
    network or API failure leaves the previous index intact.

    Raises:
        requests.RequestException: if an API call fails or times out.
        KeyError: if an API response lacks an expected field.
        ValueError: if *repo_name* maps to an unsafe index directory.
    """
    index_dir = get_repo_index_dir(repo_name)
    api_base = f'https://huggingface.co/api/{repo_name}/discussions'

    # Fetch discussions
    # NOTE(review): only the first response of the listing endpoint is used;
    # confirm whether the API paginates and further pages must be fetched.
    discussions = _fetch_json(api_base)
    documents = []
    for discussion in discussions['discussions']:
        comments = _fetch_json(f'{api_base}/{discussion["num"]}')

        # Combine all comments into one content string
        content = _collect_comments(comments['events'])

        documents.append({
            'discussion_id': str(discussion["num"]),
            'title': discussion["title"],
            'content': f"Title: {discussion['title']}\n\n" + '\n'.join(content),
            'author': discussion["author"]["name"] if "author" in discussion else 'deleted',
            'is_pr': discussion["isPullRequest"],
            'is_open': discussion["status"] == "open",
        })

    # Clear and recreate index directory
    if os.path.exists(index_dir):
        shutil.rmtree(index_dir)
    os.makedirs(index_dir, exist_ok=True)

    # Create index; the writer context manager commits on success and cancels
    # (releasing the write lock) if adding a document raises.
    ix = create_in(index_dir, schema)
    try:
        with ix.writer() as writer:
            for document in documents:
                writer.add_document(**document)
    finally:
        ix.close()

    # Update last indexed timestamp
    with open(get_repo_last_indexed_file(repo_name), 'w') as f:
        f.write(str(time.time()))


@app.route('/')
def index():
    """Render the search page for the repo given in the ``repo`` query arg."""
    repo_name = request.args.get('repo')
    query = request.args.get('query')
    if not repo_name:
        return render_template('no_repo.html')
    repo_name = _normalize_repo_name(repo_name)
    return render_template('index.html', repo_name=repo_name, query=query)


@app.route('/search', methods=['POST'])
def search():
    """Search a repo's discussions; expects JSON ``{"query": ..., "repo": ...}``.

    Returns a JSON object with a ``results`` list, or an ``error`` message
    with status 400 (bad input) or 502 (upstream fetch failed).
    """
    # silent=True: a missing or malformed JSON body becomes None, not an error.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    query = data.get('query')
    repo_name = data.get('repo')

    if not repo_name:
        return jsonify({'error': 'No repository provided'}), 400
    if not query:
        return jsonify({'error': 'No query provided'}), 400
    if not isinstance(repo_name, str) or not isinstance(query, str):
        return jsonify({'error': 'Repository and query must be strings'}), 400

    # Apply the same normalization as the index page, then refuse anything
    # that is not a plain repo name: it is used to build a directory that
    # gets deleted and recreated during reindexing.
    repo_name = _normalize_repo_name(repo_name)
    if not _is_valid_repo_name(repo_name):
        return jsonify({'error': 'Invalid repository name'}), 400

    # Check if we need to reindex
    try:
        if needs_reindex(repo_name):
            index_discussions(repo_name)
    except (requests.RequestException, KeyError, ValueError) as exc:
        app.logger.warning('Indexing %s failed: %r', repo_name, exc)
        return jsonify({'error': 'Failed to fetch discussions for repository'}), 502

    # Search the index
    ix = open_dir(get_repo_index_dir(repo_name))
    try:
        with ix.searcher() as searcher:
            query_parser = QueryParser("content", ix.schema)
            q = query_parser.parse(query)
            results = searcher.search(q)

            # Model pages on the website carry no "models/" prefix.
            if repo_name.startswith('models/'):
                url_repo_name = repo_name[len('models/'):]
            else:
                url_repo_name = repo_name

            # Format results (must happen while the searcher is still open,
            # since stored fields and highlights are read lazily).
            formatted_results = [{
                'discussion_id': escape(result['discussion_id']),
                'url': f'https://huggingface.co/{url_repo_name}/discussions/{result["discussion_id"]}',
                'title': escape(result['title']),
                'author': escape(result['author']),
                'excerpt': bleach.clean(result.highlights("content"), tags=['b'], strip=True),
                'is_pr': result['is_pr'],
                'is_open': result['is_open']
            } for result in results]

            return jsonify({'results': formatted_results})
    finally:
        ix.close()


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=7860)