# core/stock_analysis.py
import json
from datetime import datetime, timedelta

import requests
from bs4 import BeautifulSoup
from GoogleNews import GoogleNews
from tradingview_ta import TA_Handler, Interval
from langchain.document_loaders import WebBaseLoader
from langchain.docstore.document import Document
from langchain.prompts import PromptTemplate
from langchain.chains import StuffDocumentsChain, LLMChain

def clean_google_news_url(url: str) -> str:
    """Strip Google News tracking parameters from an article URL."""
    # Keep everything up to and including a known article suffix.
    for ext in [".html", ".cms"]:
        if ext in url:
            return url.split(ext)[0] + ext
    # Otherwise drop everything from the first "&" onward.
    return url.split("&")[0]
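
# Illustration (hypothetical URLs, not from the original): a link like
# "https://host/story/article.cms&ved=..." is cut back to end at ".cms",
# while "https://host/article?id=1&ved=..." is truncated at the first "&".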

def get_google_news_documents(query: str, max_articles: int = 10, timeout: int = 10):
    """Search Google News for `query` and return LangChain Documents scraped
    from the top `max_articles` results of the last two days."""
    googlenews = GoogleNews(lang="en")
    end_date = datetime.today()
    start_date = end_date - timedelta(days=2)
    googlenews.set_time_range(start_date.strftime("%m/%d/%Y"), end_date.strftime("%m/%d/%Y"))
    googlenews.search(query)
    articles = googlenews.result()
    documents = []
    for article in articles[:max_articles]:
        link = article.get("link")
        if not link:
            continue  # some results come back without a usable URL
        url = clean_google_news_url(link)
        try:
            response = requests.get(url, timeout=timeout, headers={"User-Agent": "Mozilla/5.0"})
            response.raise_for_status()
            soup = BeautifulSoup(response.text, "html.parser")
            paragraphs = soup.find_all("p")
            content = "\n".join(p.get_text(strip=True) for p in paragraphs if p.get_text(strip=True))
            # Skip thin pages (cookie walls, stubs) that yield little text.
            if content and len(content) > 200:
                documents.append(Document(
                    page_content=content,
                    metadata={
                        "source": "Google News",
                        "title": article.get("title", ""),
                        "published": article.get("date", ""),
                        "link": url,
                    },
                ))
        except Exception:
            continue  # ignore articles that fail to download or parse
    return documents
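
# Usage sketch (assumes network access; results depend on whatever Google News
# returns at run time, so the ticker below is only an example):
#   news = get_google_news_documents("Trending News for RELIANCE", max_articles=5)
#   for doc in news:
#       print(doc.metadata["title"], "->", doc.metadata["link"])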

def analyze_stock(ticker, llm):
    # Pull the daily technical-analysis summary from TradingView for the NSE ticker.
    try:
        handler = TA_Handler(symbol=ticker, screener="india", exchange="NSE", interval=Interval.INTERVAL_1_DAY)
        summary = handler.get_analysis().summary
    except Exception:
        return {"error": "Invalid ticker or failed to fetch trading data"}

    # Scrape the quote page plus TradingView's symbol, news, and community pages.
    urls = [
        f"https://www.google.com/finance/quote/{ticker}:NSE?hl=en",
        f"https://in.tradingview.com/symbols/NSE-{ticker}/",
        f"https://in.tradingview.com/symbols/NSE-{ticker}/news/",
        f"https://in.tradingview.com/symbols/NSE-{ticker}/minds/",
    ]
    loader = WebBaseLoader(urls)
    web_docs = loader.load()
    news_docs = get_google_news_documents(f"Trending News for {ticker}", max_articles=10)
    docs = web_docs + news_docs
    # Fold the TradingView technical summary in as a document so the LLM
    # sees it alongside the scraped pages.
    docs.append(Document(
        page_content=f"TradingView technical summary for {ticker}: {summary}",
        metadata={"source": "tradingview_ta"},
    ))
    prompt_template = """You are an expert Stock Market Trader...

Context:
{input_documents}

Task:
Summarize the stock...

RETURN JSON with this schema:
```json
{{
  "stock_summary": {{
    "company_name": "",
    "ticker": "",
    "exchange": "",
    ...
  }},
  ...
}}
```
"""
    prompt = PromptTemplate.from_template(prompt_template)
    # "Stuff" every document into one prompt and run the LLM over it.
    chain = StuffDocumentsChain(llm_chain=LLMChain(llm=llm, prompt=prompt), document_variable_name="input_documents")
    response = chain.invoke({"input_documents": docs})
    raw = response["output_text"].strip()
    # Strip Markdown code fences if the model wrapped its JSON in one.
    if raw.startswith("```json"):
        raw = raw[len("```json"):]
    elif raw.startswith("```"):
        raw = raw[len("```"):]
    if raw.endswith("```"):
        raw = raw[:-3]
    try:
        return json.loads(raw.strip())
    except json.JSONDecodeError:
        return {"error": "Failed to parse model output", "raw": raw}