"""Streamlit app: multilingual emotion + sentiment analysis with Gemini rewrite hints.

The app accepts a DOCX upload, an article URL, or pasted text. The first
non-empty line is treated as the headline and the rest as body paragraphs.
Each unit (headline, whole article, every paragraph) is scored with Hugging
Face emotion/sentiment pipelines and sent to Gemini for an editorial rewrite
suggestion. Results are shown in the page and offered as CSV/Excel downloads.

Run with ``streamlit run app.py``.
"""
import concurrent.futures
import io
import json
import os
import re
import subprocess
import sys
import tempfile
from collections import Counter

import docx
import pandas as pd
import spacy
import stanza
import streamlit as st
import torch
import vertexai
from langdetect import LangDetectException, detect
from newspaper import Article
from transformers import pipeline
from vertexai.preview.generative_models import GenerativeModel


def _cache_resource(func):
    """Wrap *func* in ``st.cache_resource`` when this Streamlit version has it.

    Streamlit re-executes the whole script on every interaction; without
    caching, every click would reload all NLP models from disk.
    """
    decorator = getattr(st, "cache_resource", None)
    return decorator(func) if decorator is not None else func


# ===============================
# 🔑 Vertex AI Setup
# ===============================
# Ensure GCP credentials exist
if "GCP_SERVICE_ACCOUNT_JSON" not in os.environ:
    raise RuntimeError("❌ GCP_SERVICE_ACCOUNT_JSON secret not found in Hugging Face Space")

# Write the JSON secret into a temp file (the Google SDK expects a file path)
with tempfile.NamedTemporaryFile(delete=False, suffix=".json") as f:
    f.write(os.environ["GCP_SERVICE_ACCOUNT_JSON"].encode("utf-8"))
    SERVICE_ACCOUNT_PATH = f.name

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = SERVICE_ACCOUNT_PATH

PROJECT_ID = "prod-project-jnm-smart-cms"
REGION = "us-central1"

# Initialize Vertex AI
vertexai.init(project=PROJECT_ID, location=REGION)

# Load Gemini 2.5 Pro model with Flash fallback
try:
    gemini_model = GenerativeModel("publishers/google/models/gemini-2.5-pro")
except Exception as e:
    st.warning(f"⚠️ Falling back to Gemini 2.5 Flash due to: {e}")
    gemini_model = GenerativeModel("publishers/google/models/gemini-2.5-flash")

# (model id, timeout in seconds) tried in order by generate_insight()
_GEMINI_MODELS = (
    ("publishers/google/models/gemini-2.5-pro", 40),
    ("publishers/google/models/gemini-2.5-flash", 25),
)


# ===============================
# Safe SpaCy + Stanza Loads
# ===============================
@_cache_resource
def safe_load_spacy():
    """Return an English spaCy pipeline, preferring the transformer model.

    Falls back to ``en_core_web_sm`` and downloads it if it is not installed.

    Raises:
        OSError: if the small model still cannot be loaded after the download.
    """
    try:
        return spacy.load("en_core_web_trf")
    except OSError:
        pass
    try:
        return spacy.load("en_core_web_sm")
    except OSError:
        # Use the running interpreter so the model lands in this environment;
        # a bare "python" on PATH may be a different installation.
        subprocess.run(
            [sys.executable, "-m", "spacy", "download", "en_core_web_sm"],
            check=False,
        )
        return spacy.load("en_core_web_sm")


nlp_en = safe_load_spacy()

stanza_dir = os.path.expanduser("~/.stanza_resources")


@_cache_resource
def _load_stanza_pipeline(lang_code):
    """Download (if missing) and build the Stanza tokenize+POS pipeline for *lang_code*."""
    if not os.path.exists(os.path.join(stanza_dir, lang_code)):
        stanza.download(lang_code)
    return stanza.Pipeline(
        lang_code,
        processors="tokenize,pos",
        use_gpu=torch.cuda.is_available(),
    )


nlp_hi = _load_stanza_pipeline("hi")
nlp_ta = _load_stanza_pipeline("ta")

# ===============================
# Streamlit run check
# ===============================
if not hasattr(st, "runtime") or not getattr(st.runtime, "exists", lambda: False)():
    print("\n⚠️ WARNING: Run with `streamlit run app.py` instead of `python app.py`\n")


# ===============================
# Load Hugging Face Pipelines
# ===============================
@_cache_resource
def _build_pipelines(emo_model, sent_model, device):
    """Instantiate the (emotion, sentiment) text-classification pipelines."""
    emotion_pipeline = pipeline(
        "text-classification",
        model=emo_model,
        tokenizer=emo_model,
        return_all_scores=True,
        device=device,
    )
    sentiment_pipeline = pipeline(
        "text-classification",
        model=sent_model,
        tokenizer=sent_model,
        return_all_scores=True,
        device=device,
    )
    return emotion_pipeline, sentiment_pipeline


def load_pipelines(language_code):
    """Pick emotion/sentiment models for *language_code* and return their pipelines.

    Args:
        language_code: language code as returned by ``langdetect`` (e.g. ``"en"``).

    Returns:
        ``(emotion_pipeline, sentiment_pipeline)``; both return all label scores.
    """
    lang = language_code.upper()
    device = 0 if torch.cuda.is_available() else -1
    st.write(f"🌍 Language detected: {lang}")
    st.write(f"Device set to use {'cuda:0' if device == 0 else 'cpu'}")

    if lang in ["HI", "TA"]:
        emo_model = "bhadresh-savani/bert-base-go-emotion"
    else:
        emo_model = "SamLowe/roberta-base-go_emotions"

    if lang == "EN":
        sent_model = "distilbert-base-uncased-finetuned-sst-2-english"
    else:
        sent_model = "cardiffnlp/twitter-xlm-roberta-base-sentiment-multilingual"

    return _build_pipelines(emo_model, sent_model, device)


# ===============================
# DOCX Reader
# ===============================
def read_and_split_articles(file_path):
    """Read a DOCX file and return ``(headline, body_paragraphs)``.

    The first non-empty paragraph is the headline; the rest form the body.
    """
    doc = docx.Document(file_path)
    paragraphs = [para.text.strip() for para in doc.paragraphs if para.text.strip()]
    headline = paragraphs[0] if paragraphs else ""
    return headline, paragraphs[1:]


# ===============================
# URL Reader
# ===============================
def read_article_from_url(url):
    """Download and parse the article at *url*; return ``(headline, body_paragraphs)``."""
    article = Article(url)
    article.download()
    article.parse()
    headline = article.title.strip() if article.title else ""
    text_body = article.text.strip() if article.text else ""
    body_paragraphs = [p.strip() for p in text_body.split("\n") if p.strip()]
    return headline, body_paragraphs


# ===============================
# Filter Neutral Emotions
# ===============================
def filter_neutral(emotion_results, neutral_threshold=0.75):
    """Return ``{label: score}`` sorted by score, dropping a dominant "neutral".

    Args:
        emotion_results: list of ``{"label": str, "score": float}`` dicts.
        neutral_threshold: "neutral" is removed only when its rounded score
            is strictly above this value.
    """
    ranked = sorted(emotion_results, key=lambda r: r["score"], reverse=True)
    scores = {r["label"]: round(r["score"], 3) for r in ranked}
    if scores.get("neutral", 0) > neutral_threshold:
        del scores["neutral"]
    return scores


# ===============================
# Split Sentences
# ===============================
def split_sentences(text, lang):
    """Split *text* into sentences using a per-language strategy.

    Hindi splits on the danda, Tamil on full stops, everything else goes
    through the English spaCy pipeline.
    """
    if lang == "hi":
        pieces = re.split(r"।", text)
    elif lang == "ta":
        pieces = re.split(r"\.", text)
    else:
        pieces = [sent.text for sent in nlp_en(text).sents]
    return [piece.strip() for piece in pieces if piece.strip()]


# ===============================
# POS Tagging
# ===============================
def get_pos_tags(sentence, lang):
    """Return ``[(token, universal_pos)]`` for *sentence*; ``[]`` for unsupported *lang*."""
    if lang == "en":
        return [(token.text, token.pos_) for token in nlp_en(sentence)]
    stanza_pipelines = {"hi": nlp_hi, "ta": nlp_ta}
    if lang in stanza_pipelines:
        doc = stanza_pipelines[lang](sentence)
        return [(word.text, word.upos) for sent in doc.sentences for word in sent.words]
    return []


# ===============================
# Normalize Scores
# ===============================
def normalize_scores(scores: dict):
    """Scale *scores* so the largest value becomes 1.0 (rounded to 3 places).

    Empty input, or input whose maximum is 0, is returned unchanged.
    """
    if not scores:
        return scores
    max_val = max(scores.values())
    if max_val == 0:
        return scores
    return {label: round(value / max_val, 3) for label, value in scores.items()}


# ===============================
# Clean Paragraphs (remove embeds/promos)
# ===============================
def clean_paragraphs(paragraphs):
    """Strip paragraphs and drop blanks, "also read" promos and short label lines."""
    promo_prefixes = ("ALSO READ", "READ ALSO", "TRENDING", "MUST READ")
    hindi_promos = ("और पढ़ें", "यह भी पढ़ें", "पूरा पढ़ें")
    cleaned = []
    for para in paragraphs:
        text = para.strip()
        if not text:
            continue
        if text.upper().startswith(promo_prefixes):
            continue
        if any(marker in text for marker in hindi_promos):
            continue
        # Short lines containing a colon are typically captions/labels, not prose
        if len(text.split()) < 5 and ":" in text:
            continue
        cleaned.append(text)
    return cleaned


# ===============================
# Gemini Insight Generation (only Gemini sentiment + top 3 emotions, with context scoring)
# ===============================
def _call_gemini(model_id, prompt, timeout):
    """Ask *model_id* for a completion; return stripped text or ``None``.

    Raises:
        concurrent.futures.TimeoutError: if no response arrives within *timeout* seconds.
    """
    model = GenerativeModel(model_id)
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    try:
        future = executor.submit(model.generate_content, prompt)
        response = future.result(timeout=timeout)
    finally:
        # wait=False is essential: leaving a `with` block would join the worker
        # thread and block until the request finishes, defeating the timeout.
        executor.shutdown(wait=False)
    if response and getattr(response, "text", None):
        return response.text.strip()
    return None


def generate_insight(text, emotions, sentiment, level="Paragraph", emotion_pipeline=None, sentiment_pipeline=None):
    """Ask Gemini for an editorial rewrite suggestion and score it.

    Args:
        text: the headline/paragraph/article text to review.
        emotions: emotion scores of *text* (accepted for interface compatibility; unused).
        sentiment: sentiment of *text* (accepted for interface compatibility; unused).
        level: label used in messages ("Headline", "Paragraph", ...).
        emotion_pipeline: optional pipeline used to score the suggestion.
        sentiment_pipeline: optional pipeline used to score the suggestion.

    Returns:
        ``(top3_emotions, message)``. ``top3_emotions`` is empty when there is
        no rewrite, when a guardrail suppresses it, or on failure.
    """
    try:
        # Always ask Gemini
        prompt = f"""
You are a seasoned human editor with a natural, conversational tone — not robotic or formulaic.

Text to review:
{text}

Task:
- Identify the *specific phrase or sentence* that can be improved for clarity, tone, or impact.
- Present it as:
  Original → [the exact part]
  Rewrite → [a natural, human-sounding rewrite — avoid over-polishing or AI tone]
  Why → [briefly explain the edit as if giving human feedback — e.g., “This reads more fluidly” or “Helps it sound more direct.”]

Guidelines:
- Use everyday phrasing and mild imperfections that feel authentic.
- Avoid mechanical transitions like “Overall,” “In summary,” or “This small change.”
- Vary sentence rhythm and tone to mimic human writing.
- Keep rewrites short and organic, not overly polished.
- If the text is already fine, say exactly:
  No rewrite needed. The {level.lower()} reads naturally and clearly.
"""

        response_text = None
        for model_id, timeout in _GEMINI_MODELS:
            try:
                response_text = _call_gemini(model_id, prompt, timeout)
            except concurrent.futures.TimeoutError:
                st.warning(f"⚡ {model_id} timed out, trying fallback...")
                continue
            except Exception as e:
                st.warning(f"⚠️ {model_id} failed: {e}")
                continue
            if response_text:
                break

        if not response_text:
            return {}, "⚠️ No insight generated."

        # If Gemini says no rewrite → just show that (no extra scoring)
        if response_text.startswith("No rewrite needed"):
            return {}, f"✅ {response_text}"

        # Otherwise, re-score Gemini rewrite using context (Original + Rewrite)
        gemini_emotions, gemini_sentiment = {}, {}
        if emotion_pipeline is not None and sentiment_pipeline is not None:
            context_for_scoring = f"Original: {text}\nRewrite: {response_text}"
            emo_res_new = emotion_pipeline(context_for_scoring[:512])[0]
            ranked = sorted(filter_neutral(emo_res_new).items(), key=lambda kv: kv[1], reverse=True)
            gemini_emotions = dict(ranked[:3])  # keep top 3
            senti_res_new = sentiment_pipeline(context_for_scoring[:512])[0]
            gemini_sentiment = max(senti_res_new, key=lambda r: r["score"])

        no_rewrite = f"✅ No rewrite needed. The {level.lower()} is clear and well written."

        # Guardrails on Gemini output. `.get` keeps this safe when no pipelines
        # were supplied and gemini_sentiment is still an empty dict.
        sentiment_label = str(gemini_sentiment.get("label", "")).upper()
        if sentiment_label == "NEGATIVE" and gemini_sentiment.get("score", 0) >= 0.8:
            return {}, no_rewrite

        negative_emotions = ["disapproval", "anger", "sadness", "fear", "disgust", "annoyance", "grief", "remorse"]
        for emo, score in gemini_emotions.items():
            if emo.lower() in negative_emotions and score >= 0.8:
                return {}, no_rewrite

        if gemini_emotions.get("approval", 0) > 0.6 and gemini_emotions.get("disapproval", 0) > 0.6:
            return {}, no_rewrite

        # Badge indicator
        badge = "✍️"

        # Format Gemini insight with rewrite emotions & sentiment
        gem_emo_text = ", ".join([f"{k}: {v}" for k, v in gemini_emotions.items()]) if gemini_emotions else "N/A"
        gem_sent_text = (
            f"{gemini_sentiment.get('label','N/A')} ({round(gemini_sentiment.get('score',0),3)})"
            if gemini_sentiment
            else "N/A"
        )

        final_text = (
            f"{badge} {response_text}\n\n"
            f"✨ Gemini Rewrite Sentiment: {gem_sent_text}\n"
            f"✨ Gemini Rewrite Top Emotions: {gem_emo_text}"
        )
        return gemini_emotions, final_text

    except Exception as e:
        # Best-effort feature: never let an insight failure abort the analysis
        return {}, f"⚠️ Insight generation failed: {str(e)}"


# ===============================
# Main Analyzer
# ===============================
def _top_scores(scores, limit=10):
    """Return the *limit* highest-scoring entries of *scores*, highest first."""
    ranked = sorted(scores.items(), key=lambda kv: kv[1], reverse=True)
    return dict(ranked[:limit])


def _score_text(text, emotion_pipeline, sentiment_pipeline):
    """Score the first 512 characters of *text*; return ``(emotions, best_sentiment)``."""
    emotions = filter_neutral(emotion_pipeline(text[:512])[0])
    best_sentiment = max(sentiment_pipeline(text[:512])[0], key=lambda r: r["score"])
    return emotions, best_sentiment


def analyze_article(headline, paragraphs, lang, emotion_pipeline, sentiment_pipeline):
    """Analyze headline, whole article and each paragraph; render and collect results.

    Args:
        headline: article headline ("" to skip headline analysis).
        paragraphs: raw body paragraphs (cleaned with ``clean_paragraphs``).
        lang: detected language code; its first two characters select the
            sentence splitter.
        emotion_pipeline: pipeline returning all emotion label scores.
        sentiment_pipeline: pipeline returning all sentiment label scores.

    Returns:
        List of row dicts with keys Type, Text, Emotions, Sentiment, Top3, Insight.
    """
    export_rows = []
    paragraphs = clean_paragraphs(paragraphs)
    st.write(f"📑 Paragraphs detected (after cleaning): {len(paragraphs)}")

    # -----------------------
    # Headline Analysis
    # -----------------------
    if headline:
        st.subheader("📰 HEADLINE")
        filtered, headline_sentiment = _score_text(headline, emotion_pipeline, sentiment_pipeline)
        headline_emotions = _top_scores(normalize_scores(filtered))

        st.write("Headline →", headline)
        st.write("Emotions →", headline_emotions)
        st.write("Sentiment →", headline_sentiment)

        top3_headline, headline_insight = generate_insight(
            headline, headline_emotions, headline_sentiment, "Headline",
            emotion_pipeline=emotion_pipeline, sentiment_pipeline=sentiment_pipeline
        )
        st.write(headline_insight)

        export_rows.append({
            "Type": "Headline", "Text": headline,
            "Emotions": headline_emotions, "Sentiment": headline_sentiment,
            "Top3": dict(top3_headline), "Insight": headline_insight
        })

    # -----------------------
    # Overall Article Analysis
    # -----------------------
    if paragraphs:
        weighted_scores, total_length, all_sentiments = {}, 0, []
        for p in paragraphs:
            # Weight each paragraph's emotions by its word count
            length = len(p.split())
            total_length += length
            filtered, best_senti = _score_text(p, emotion_pipeline, sentiment_pipeline)
            for emo, score in filtered.items():
                weighted_scores[emo] = weighted_scores.get(emo, 0) + score * length
            all_sentiments.append(best_senti)

        if total_length > 0:
            weighted_scores = {emo: total / total_length for emo, total in weighted_scores.items()}

        weighted_scores = _top_scores(normalize_scores(weighted_scores))
        overall_sentiment = max(all_sentiments, key=lambda x: x["score"]) if all_sentiments else {}

        st.subheader("📊 OVERALL (Weighted)")
        st.write("Emotions →", weighted_scores)
        st.write("Sentiment →", overall_sentiment)

        top3_overall, overall_insight = generate_insight(
            "\n\n".join(paragraphs), weighted_scores, overall_sentiment, "Overall Article",
            emotion_pipeline=emotion_pipeline, sentiment_pipeline=sentiment_pipeline
        )
        st.write(overall_insight)

        export_rows.append({
            "Type": "Overall", "Text": "Weighted across article",
            "Emotions": weighted_scores, "Sentiment": overall_sentiment,
            "Top3": dict(top3_overall), "Insight": overall_insight
        })

    # -----------------------
    # Paragraph Analysis
    # -----------------------
    for p_idx, para in enumerate(paragraphs, start=1):
        para_counter, para_sentiments = Counter(), []
        for sentence in split_sentences(para, lang[:2]):
            filtered, best_senti = _score_text(sentence, emotion_pipeline, sentiment_pipeline)
            for emo, score in filtered.items():
                para_counter[emo] += score
            para_sentiments.append(best_senti)

        para_emotions = _top_scores(normalize_scores(dict(para_counter)))
        para_sentiment = max(para_sentiments, key=lambda x: x["score"]) if para_sentiments else {}

        st.subheader(f"📑 Paragraph {p_idx}")
        st.write(para)
        st.write("Emotions →", para_emotions)
        st.write("Sentiment →", para_sentiment)

        top3_para, insight = generate_insight(
            para, para_emotions, para_sentiment, "Paragraph",
            emotion_pipeline=emotion_pipeline, sentiment_pipeline=sentiment_pipeline
        )
        st.write(insight)

        export_rows.append({
            "Type": "Paragraph", "Text": para,
            "Emotions": para_emotions, "Sentiment": para_sentiment,
            "Top3": dict(top3_para), "Insight": insight
        })

    return export_rows


# ===============================
# Streamlit App
# ===============================
def _detect_language(headline, paragraphs):
    """Detect the language from the first 200 characters; default to ``"en"``."""
    sample = (headline + " " + " ".join(paragraphs))[:200]
    if not sample.strip():
        return "en"
    try:
        return detect(sample)
    except LangDetectException:
        # langdetect raises on text without usable features (digits, symbols)
        return "en"


def _excel_bytes(df):
    """Serialize *df* to XLSX bytes, JSON-encoding dict/list cells.

    XlsxWriter cannot write container values and raises ``TypeError`` on them.
    """
    def _flatten(value):
        if isinstance(value, (dict, list)):
            # default=str is deliberate: the export should never fail on an
            # unexpected score type (e.g. a numpy scalar).
            return json.dumps(value, ensure_ascii=False, default=str)
        return value

    safe = df.copy()
    for column in safe.columns:
        safe[column] = safe[column].map(_flatten)
    buffer = io.BytesIO()
    safe.to_excel(buffer, index=False, engine="xlsxwriter")
    return buffer.getvalue()


st.title("📑 Multilingual Text Emotion + Sentiment Analyzer")
download_top = st.empty()

uploaded_file = st.file_uploader("Upload a DOCX file", type=["docx"])
url_input = st.text_input("Or enter an Article URL")
text_input = st.text_area("Or paste text here")

if st.button("🔍 Analyze"):
    with st.spinner("Running analysis... ⏳"):
        headline, paragraphs, load_error = "", [], None
        if uploaded_file:
            try:
                headline, paragraphs = read_and_split_articles(uploaded_file)
            except Exception as e:
                load_error = f"❌ Could not read the DOCX file: {e}"
        elif url_input.strip():
            try:
                headline, paragraphs = read_article_from_url(url_input.strip())
            except Exception as e:
                load_error = f"❌ Could not fetch the article: {e}"
        elif text_input.strip():
            all_lines = [line.strip() for line in text_input.split("\n") if line.strip()]
            headline = all_lines[0] if all_lines else ""
            paragraphs = all_lines[1:]
        else:
            st.warning("Please provide text input.")
            st.stop()

        if load_error:
            st.error(load_error)
            st.stop()

        detected_lang = _detect_language(headline, paragraphs)
        emotion_pipeline, sentiment_pipeline = load_pipelines(detected_lang)
        export_rows = analyze_article(headline, paragraphs, detected_lang, emotion_pipeline, sentiment_pipeline)

    df_export = pd.DataFrame(export_rows)
    csv = df_export.to_csv(index=False).encode("utf-8")
    excel_data = _excel_bytes(df_export)

    with download_top.container():
        st.download_button("⬇️ Download CSV", csv, "analysis_results.csv", "text/csv", use_container_width=True)
        st.download_button(
            "⬇️ Download Excel",
            excel_data,
            "analysis_results.xlsx",
            "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
            use_container_width=True,
        )