import gradio as gr from googleapiclient.discovery import build from groq import Groq import os import sqlite3 from datetime import datetime, timedelta import re import uuid API_KEY = os.getenv("YOUTUBE_API_KEY") GROQ_API_KEY = os.getenv("GROQ_API_KEY") youtube = build("youtube", "v3", developerKey=API_KEY) groq_client = Groq(api_key=GROQ_API_KEY) if GROQ_API_KEY else None # Global storage ai_pick_storage = {"videos": [], "ratings": {}, "timestamp": None} # ============================================ # ๐ UI Language Translations # ============================================ UI_LANG = { "en": { "title": "๐ฌ YOUTUBE TREND ANALYZER ๐", "search_keyword": "Search Keyword", "enter_keyword": "Enter keyword...", "search": "๐ SEARCH", "refresh": "๐ Refresh", "country": "Country", "language": "Language", "sort_by": "Sort By", "period": "Period", "max_results": "Max Results", "click_autofill": "Click to auto-fill", "total": "Total", "results": "results", "views": "Views", "likes": "Likes", "subs": "Subs", "date": "Date", "rank": "Rank", "thumb": "Thumb", "title_col": "Title", "channel": "Channel", "comments": "Cmts", "ai_pick_col": "AI Pick", "no_keyword": "Please enter a search keyword!", "no_results": "No results found.", "sort_options": {"Most Viewed": "viewCount", "Latest": "date", "Relevance": "relevance", "Top Rated": "rating"}, "date_options": {"All Time": "", "Today": "today", "This Week": "thisWeek", "This Month": "thisMonth", "This Year": "thisYear"}, }, "ko": { "title": "๐ฌ ์ ํ๋ธ ํธ๋ ๋ ๋ถ์๊ธฐ ๐", "search_keyword": "๊ฒ์์ด", "enter_keyword": "๊ฒ์์ด ์ ๋ ฅ...", "search": "๐ ๊ฒ์", "refresh": "๐ ์๋ก๊ณ ์นจ", "country": "๊ตญ๊ฐ", "language": "์ธ์ด", "sort_by": "์ ๋ ฌ", "period": "๊ธฐ๊ฐ", "max_results": "์ต๋ ๊ฒฐ๊ณผ", "click_autofill": "ํด๋ฆญ์ ์๋ ์ ๋ ฅ", "total": "์ด", "results": "๊ฐ ๊ฒฐ๊ณผ", "views": "์กฐํ์", "likes": "์ข์์", "subs": "๊ตฌ๋ ์", "date": "๋ ์ง", "rank": "์์", "thumb": "์ธ๋ค์ผ", "title_col": "์ ๋ชฉ", "channel": "์ฑ๋", "comments": "๋๊ธ", "ai_pick_col": "AI์ถ์ฒ", "no_keyword": "๊ฒ์์ด๋ฅผ ์ ๋ ฅํ์ธ์!", "no_results": "๊ฒ์ ๊ฒฐ๊ณผ๊ฐ ์์ต๋๋ค.", "sort_options": {"์กฐํ์ ์": "viewCount", "์ต์ ์": "date", "๊ด๋ จ์ฑ ์": "relevance", "ํ์ ์": "rating"}, "date_options": {"์ ์ฒด ๊ธฐ๊ฐ": "", "์ค๋": "today", "์ด๋ฒ ์ฃผ": "thisWeek", "์ด๋ฒ ๋ฌ": "thisMonth", "์ฌํด": "thisYear"}, } } # ============================================ # ๐จ CSS # ============================================ css = """ @import url('https://fonts.googleapis.com/css2?family=Bangers&family=Comic+Neue:wght@400;700&display=swap'); /* Hide ALL Hugging Face elements */ #space-header, .space-header, header, .huggingface-space-header, [data-testid="space-header"], .svelte-1ed2p3z, .svelte-kqij2n, .svelte-1kyws56, .wrap.svelte-1kyws56, button.svelte-1kyws56, .duplicate-button, .settings-button, [class*="settings"], [class*="duplicate"], .embed-buttons, .buttons-container, header button, .gr-button-icon, footer, .footer, .gradio-container footer, .built-with, [class*="footer"], .built-with-gradio, a[href*="gradio.app"], .gradio-container > div:first-child > button, .gradio-container > header { display: none !important; visibility: hidden !important; height: 0 !important; width: 0 !important; padding: 0 !important; margin: 0 !important; overflow: hidden !important; opacity: 0 !important; pointer-events: none !important; position: absolute !important; left: -9999px !important; } .gradio-container { background-color: #FEF9C3 !important; background-image: radial-gradient(#1F2937 1px, transparent 1px) !important; background-size: 20px 20px !important; min-height: 100vh !important; font-family: 'Comic Neue', cursive, sans-serif !important; } .header-text h1 { font-family: 'Bangers', cursive !important; color: #1F2937 !important; font-size: 2.8rem !important; text-align: center !important; text-shadow: 4px 4px 0px #FACC15, 6px 6px 0px #1F2937 !important; } .gr-panel, .gr-box, .gr-form, .block, .gr-group { background: #FFFFFF !important; border: 3px solid #1F2937 !important; border-radius: 8px !important; box-shadow: 6px 6px 0px #1F2937 !important; } .gr-button-primary, button.primary { background: #3B82F6 !important; border: 3px solid #1F2937 !important; border-radius: 8px !important; color: #FFFFFF !important; font-family: 'Bangers', cursive !important; font-size: 1.2rem !important; box-shadow: 5px 5px 0px #1F2937 !important; } .gr-button-primary:hover { background: #2563EB !important; } .gr-button-secondary, button.secondary { background: #EF4444 !important; border: 3px solid #1F2937 !important; color: #FFFFFF !important; font-family: 'Bangers', cursive !important; box-shadow: 4px 4px 0px #1F2937 !important; } textarea, input[type="text"] { background: #FFFFFF !important; border: 3px solid #1F2937 !important; border-radius: 8px !important; font-family: 'Comic Neue', cursive !important; font-weight: 700 !important; } label { color: #1F2937 !important; font-family: 'Comic Neue', cursive !important; font-weight: 700 !important; } ::-webkit-scrollbar { width: 12px; } ::-webkit-scrollbar-track { background: #FEF9C3; border: 2px solid #1F2937; } ::-webkit-scrollbar-thumb { background: #3B82F6; border: 2px solid #1F2937; } ::selection { background: #FACC15; color: #1F2937; } /* LLM Result box */ .llm-result textarea { background: #1F2937 !important; color: #10B981 !important; border: 3px solid #10B981 !important; border-radius: 8px !important; font-family: 'Courier New', monospace !important; font-size: 14px !important; line-height: 1.6 !important; } """ # DB ์ด๊ธฐํ def init_db(): conn = sqlite3.connect("youtube_data.db") c = conn.cursor() c.execute('''CREATE TABLE IF NOT EXISTS videos ( video_id TEXT PRIMARY KEY, title TEXT, channel_id TEXT, channel_name TEXT, thumbnail TEXT, published_at TEXT, first_seen TEXT)''') c.execute('''CREATE TABLE IF NOT EXISTS video_stats ( id INTEGER PRIMARY KEY AUTOINCREMENT, video_id TEXT, views INTEGER, likes INTEGER, comments INTEGER, recorded_at TEXT)''') c.execute('''CREATE TABLE IF NOT EXISTS channels ( channel_id TEXT PRIMARY KEY, channel_name TEXT, first_seen TEXT)''') c.execute('''CREATE TABLE IF NOT EXISTS channel_stats ( id INTEGER PRIMARY KEY AUTOINCREMENT, channel_id TEXT, subscribers INTEGER, recorded_at TEXT)''') c.execute('''CREATE TABLE IF NOT EXISTS search_history ( id INTEGER PRIMARY KEY AUTOINCREMENT, keyword TEXT, country TEXT, language TEXT, sort_by TEXT, results_count INTEGER, searched_at TEXT)''') c.execute('''CREATE TABLE IF NOT EXISTS trending_alerts ( id INTEGER PRIMARY KEY AUTOINCREMENT, video_id TEXT, alert_type TEXT, old_value INTEGER, new_value INTEGER, change_percent REAL, detected_at TEXT)''') conn.commit() conn.close() init_db() # Country & Language codes COUNTRIES = { "Worldwide": ("", ""), "United States": ("US", "en"), "United Kingdom": ("GB", "en"), "Canada": ("CA", "en"), "Australia": ("AU", "en"), "Germany": ("DE", "de"), "France": ("FR", "fr"), "Japan": ("JP", "ja"), "South Korea": ("KR", "ko"), "Brazil": ("BR", "pt"), "Mexico": ("MX", "es"), "Spain": ("ES", "es"), "Italy": ("IT", "it"), "Russia": ("RU", "ru"), "India": ("IN", "hi"), "Indonesia": ("ID", "id"), "Thailand": ("TH", "th"), "Vietnam": ("VN", "vi"), "Philippines": ("PH", "tl"), "Turkey": ("TR", "tr"), "Saudi Arabia": ("SA", "ar"), "Egypt": ("EG", "ar"), "South Africa": ("ZA", "en"), "Nigeria": ("NG", "en"), "Argentina": ("AR", "es"), "Colombia": ("CO", "es"), "Poland": ("PL", "pl"), "Netherlands": ("NL", "nl"), "Sweden": ("SE", "sv"), "Switzerland": ("CH", "de"), "Taiwan": ("TW", "zh"), "Hong Kong": ("HK", "zh"), "China": ("CN", "zh"), "Singapore": ("SG", "en"), "Malaysia": ("MY", "ms"), "UAE": ("AE", "ar"), } LANGUAGES = { "Auto (by Country)": "", "English": "en", "Korean": "ko", "Spanish": "es", "Portuguese": "pt", "French": "fr", "German": "de", "Italian": "it", "Russian": "ru", "Japanese": "ja", "Chinese": "zh", "Hindi": "hi", "Arabic": "ar", "Turkish": "tr", "Indonesian": "id", "Vietnamese": "vi", "Thai": "th", "Dutch": "nl", "Polish": "pl", "Swedish": "sv", } def format_count(count): if count is None: return "0" count = int(count) if count >= 1000000000: return f"{count/1000000000:.1f}B" elif count >= 1000000: return f"{count/1000000:.1f}M" elif count >= 1000: return f"{count/1000:.1f}K" return str(count) def call_llm(prompt, max_tokens=2000): if not groq_client: return "โ ๏ธ LLM API not configured. Set GROQ_API_KEY." try: completion = groq_client.chat.completions.create( model="openai/gpt-oss-120b", messages=[{"role": "user", "content": prompt}], temperature=0.7, max_completion_tokens=max_tokens, top_p=1, stream=True, stop=None ) result = "" for chunk in completion: if chunk.choices[0].delta.content: result += chunk.choices[0].delta.content return result except Exception as e: return f"Error: {e}" def get_ai_pick_rating(videos_data): global ai_pick_storage if not videos_data: return {} if groq_client: try: sample = videos_data[:50] video_info = "\n".join([ f"#{i+1}. {v['title'][:40]}, Views:{v['views']}, Likes:{v['likes']}, Subs:{v.get('subs',0)}" for i, v in enumerate(sample) ]) prompt = f"Rate YouTube videos 0-4. 0=None,1=โฅ,2=โญ,3=โญโญ,4=โญโญโญ. Consider engagement, viral potential. Format: 1:3,2:2,3:4\n\n{video_info}\n\nResponse (number:rating only):" result = call_llm(prompt, 1500) if result and "Error" not in result and "โ ๏ธ" not in result: ratings = {} for idx, rating in re.findall(r'(\d+):(\d)', result): ratings[int(idx)-1] = int(rating) if len(videos_data) > 50: local = calculate_local_rating(videos_data[50:]) for k, v in local.items(): ratings[k + 50] = v ai_pick_storage = {"videos": videos_data, "ratings": ratings, "timestamp": datetime.now().isoformat()} return ratings except: pass ratings = calculate_local_rating(videos_data) ai_pick_storage = {"videos": videos_data, "ratings": ratings, "timestamp": datetime.now().isoformat()} return ratings def calculate_local_rating(videos_data): ratings = {} if not videos_data: return ratings views_list = [v['views'] for v in videos_data if v['views'] > 0] if not views_list: return {i: 0 for i in range(len(videos_data))} avg_views, max_views = sum(views_list)/len(views_list), max(views_list) for i, v in enumerate(videos_data): views, likes, comments, subs = v['views'], v['likes'], v['comments'], v.get('subs', 0) score = 0 if views > 0: score += min(40, (views/max_views)*40) score += min(30, ((likes+comments*2)/views)*300) if subs > 0 and views > 0: score += min(30, (views/subs)*10) elif views > avg_views: score += 15 if score >= 70: ratings[i] = 4 elif score >= 50: ratings[i] = 3 elif score >= 30: ratings[i] = 2 elif score >= 15: ratings[i] = 1 else: ratings[i] = 0 return ratings def get_rating_display(rating): return {0: "-", 1: "โฅ", 2: "โญ", 3: "โญโญ", 4: "โญโญโญ"}.get(rating, "-") def get_real_trending_keywords(region_code="US", language="en"): try: response = youtube.videos().list(part="snippet", chart="mostPopular", regionCode=region_code or "US", maxResults=50).execute() keywords, seen = [], set() for item in response.get("items", []): for tag in item["snippet"].get("tags", [])[:3]: if tag.lower() not in seen and 2 <= len(tag) <= 20: keywords.append(tag); seen.add(tag.lower()) channel = item["snippet"]["channelTitle"] if channel.lower() not in seen: keywords.append(channel); seen.add(channel.lower()) if len(keywords) >= 20: break return keywords[:20] if keywords else ["AI","gaming","music","vlog","shorts","news"] except: return ["AI","ChatGPT","gaming","music","vlog","shorts","news","tech"] def save_to_db(videos_data, channels_data, keyword, country, language, sort_by): conn = sqlite3.connect("youtube_data.db") c = conn.cursor() now = datetime.now().isoformat() c.execute('INSERT INTO search_history VALUES (NULL,?,?,?,?,?,?)', (keyword, country, language, sort_by, len(videos_data), now)) for video in videos_data: c.execute('INSERT OR IGNORE INTO videos VALUES (?,?,?,?,?,?,?)', (video['video_id'], video['title'], video['channel_id'], video['channel_name'], video['thumbnail'], video['published_at'], now)) c.execute('SELECT views FROM video_stats WHERE video_id=? ORDER BY recorded_at DESC LIMIT 1', (video['video_id'],)) prev = c.fetchone() c.execute('INSERT INTO video_stats VALUES (NULL,?,?,?,?,?)', (video['video_id'], video['views'], video['likes'], video['comments'], now)) if prev and prev[0] > 0: change = ((video['views'] - prev[0]) / prev[0]) * 100 if change >= 20: c.execute('INSERT INTO trending_alerts VALUES (NULL,?,?,?,?,?,?)', (video['video_id'], 'views_surge', prev[0], video['views'], change, now)) for ch_id, subs in channels_data.items(): c.execute('INSERT OR IGNORE INTO channels VALUES (?,?,?)', (ch_id, '', now)) if isinstance(subs, int): c.execute('INSERT INTO channel_stats VALUES (NULL,?,?,?)', (ch_id, subs, now)) conn.commit(); conn.close() def get_db_stats(): try: conn = sqlite3.connect("youtube_data.db") c = conn.cursor() stats = {} for t, k in [("videos","videos"),("video_stats","stats"),("channels","channels"),("search_history","searches"),("trending_alerts","alerts")]: c.execute(f"SELECT COUNT(*) FROM {t}"); stats[k] = c.fetchone()[0] conn.close() return stats except: return {"videos":0,"stats":0,"channels":0,"searches":0,"alerts":0} def update_trending(country): region, lang = COUNTRIES.get(country, ("", "")) return gr.update(choices=get_real_trending_keywords(region or "US", lang or "en"), value=None) def use_trending_keyword(kw): return kw if kw else "" # ============================================ # ๐ Main Search Function # ============================================ def search_videos(keyword, country, language, sort_by, date_filter, max_results, ui_lang): L = UI_LANG.get(ui_lang, UI_LANG["en"]) if not keyword or not keyword.strip(): return f"โ ๏ธ {L['no_keyword']}", "๐ DB: -" max_results = int(max_results) all_items, next_page = [], None region_code, default_lang = COUNTRIES.get(country, ("", "")) lang_code = default_lang if language in ["Auto (by Country)", "์๋ (๊ตญ๊ฐ ๊ธฐ๋ฐ)"] else LANGUAGES.get(language, "") sort_value = L["sort_options"].get(sort_by, "viewCount") date_value = L["date_options"].get(date_filter, "") params = {"q": keyword, "part": "snippet", "type": "video", "order": sort_value} if region_code: params["regionCode"] = region_code if lang_code: params["relevanceLanguage"] = lang_code if date_value: deltas = {"today": 1, "thisWeek": 7, "thisMonth": 30, "thisYear": 365} params["publishedAfter"] = (datetime.utcnow() - timedelta(days=deltas.get(date_value, 0))).strftime("%Y-%m-%dT%H:%M:%SZ") while len(all_items) < max_results: params["maxResults"] = min(50, max_results - len(all_items)) if next_page: params["pageToken"] = next_page try: resp = youtube.search().list(**params).execute() except Exception as e: return f"API Error: {e}", "๐ DB: Error" items = resp.get("items", []) if not items: break all_items.extend(items) next_page = resp.get("nextPageToken") if not next_page: break if not all_items: return f"{L['no_results']}", "๐ DB: -" video_ids = [item["id"]["videoId"] for item in all_items] channel_ids = list(set([item["snippet"]["channelId"] for item in all_items])) video_stats = {} for i in range(0, len(video_ids), 50): try: for v in youtube.videos().list(id=",".join(video_ids[i:i+50]), part="statistics").execute().get("items", []): s = v["statistics"] video_stats[v["id"]] = {"views": int(s.get("viewCount", 0)), "likes": int(s.get("likeCount", 0)), "comments": int(s.get("commentCount", 0))} except: pass channel_subs, channel_subs_raw = {}, {} for i in range(0, len(channel_ids), 50): try: for ch in youtube.channels().list(id=",".join(channel_ids[i:i+50]), part="statistics").execute().get("items", []): sub = ch["statistics"].get("subscriberCount", "0") if sub: channel_subs_raw[ch["id"]] = int(sub); channel_subs[ch["id"]] = format_count(int(sub)) except: pass videos_data = [] for item in all_items: vid, snip = item["id"]["videoId"], item["snippet"] st = video_stats.get(vid, {"views": 0, "likes": 0, "comments": 0}) videos_data.append({ "video_id": vid, "title": snip["title"], "channel_id": snip["channelId"], "channel_name": snip["channelTitle"], "thumbnail": snip["thumbnails"]["medium"]["url"], "published_at": snip["publishedAt"], "views": st.get("views", 0), "likes": st.get("likes", 0), "comments": st.get("comments", 0), "subs": channel_subs_raw.get(snip["channelId"], 0), }) ai_ratings = get_ai_pick_rating(videos_data) save_to_db(videos_data, channel_subs_raw, keyword, country, language, sort_by) uid = str(uuid.uuid4()).replace("-", "")[:8] html = f'''
| {L["rank"]} | {L["thumb"]} | {L["title_col"]} | {L["channel"]} | {L["subs"]} | {L["views"]} | {L["likes"]} | {L["comments"]} | {L["ai_pick_col"]} | {L["date"]} |
|---|---|---|---|---|---|---|---|---|---|
| {i+1} | {title_short} | {ch_short} | {format_count(v['subs'])} | {format_count(v['views'])} | {format_count(v['likes'])} | {format_count(v['comments'])} | {get_rating_display(rating)} | {v['published_at'][:10]} |
| ๐ {"์ ์" if is_ko else "What"} | {"์ด์ ๋๋น ์กฐํ์๊ฐ 20% ์ด์ ๊ธ์ฆํ ์์" if is_ko else "Videos with 20%+ view increase vs. previous check"} |
| ๐ฏ {"๋ชฉ์ " if is_ko else "Purpose"} | {"์ง๊ธ ๋ฐ๋ก ๋ฐ์ด๋ด ์ค์ธ ์์ ํฌ์ฐฉ" if is_ko else "Catch videos going viral RIGHT NOW"} |
| โฑ๏ธ {"์๋" if is_ko else "Trigger"} | {"๋์ผ ์์ ์ฌ๊ฒ์์ ์กฐํ์ ๋ณํ ๊ฐ์ง" if is_ko else "Detected when same video is searched again"} |
| ๐ก {"ํ์ฉ" if is_ko else "Best for"} | {"๋ด์ค, ์ด์, ํซํ ํฝ ๋ฐ๊ตด" if is_ko else "News, breaking stories, hot topics"} |
๐ข {"์์ง ๊ธ์์น ์๋ฆผ์ด ์์ต๋๋ค. ๊ฒ์์ ์ฌ๋ฌ ๋ฒ ์คํํ๋ฉด ์กฐํ์ ๋ณํ๋ฅผ ๊ฐ์งํฉ๋๋ค!" if is_ko else "No surge alerts yet. Run searches multiple times to detect view changes!"}
Error: {e}
" html = info_box + '| ๐ {"์ ์" if is_ko else "What"} | {"48์๊ฐ ๋์ ๊ฐ์ฅ ๋์ ์ฑ์ฅ๋ฅ ์ ๊ธฐ๋กํ ์์" if is_ko else "Videos with highest growth RATE over 48 hours"} |
| ๐ฏ {"๋ชฉ์ " if is_ko else "Purpose"} | {"๊พธ์คํ ์ฑ์ฅํ๋ ์ฝํ ์ธ ๋ฐ๊ตด" if is_ko else "Find consistently rising content"} |
| ๐ {"๊ณ์ฐ" if is_ko else "Formula"} | (Max - Min) / Min ร 100% |
| ๐ก {"ํ์ฉ" if is_ko else "Best for"} | {"์๋ฒ๊ทธ๋ฆฐ ์ฝํ ์ธ , ์์ ์ ํธ๋ ๋" if is_ko else "Evergreen content, stable trends"} |
๐ข {"๋ฐ์ดํฐ ์ถ์ ์ค์ ๋๋ค. ๊ฒ์์ ์ฌ๋ฌ ๋ฒ ์คํํ๋ฉด ์ฑ์ฅ๋ฅ ์ด ๊ณ์ฐ๋ฉ๋๋ค!" if is_ko else "Accumulating data. Run searches over time to calculate growth rates!"}
Error: {e}
" html = info_box + '{"๋จผ์ ๊ฒ์ ํญ์์ ๊ฒ์์ ์คํํ์ธ์!" if is_ko else "Run a search first in the Search tab!"}
{result}
{"โญโญ ์ด์ ๋ฑ๊ธ ์์" if is_ko else "โญโญ+ rated videos"}: {len(top_picks)}{"๊ฐ" if is_ko else " videos"}
{"โญโญ ์ด์ ๋ฑ๊ธ ์์์ด ์์ต๋๋ค. ๋ค๋ฅธ ํค์๋๋ก ๊ฒ์ํด๋ณด์ธ์!" if is_ko else "No โญโญ+ rated videos found. Try different keywords!"}
{idx}. {title}
{"๊ฒ์ ๊ธฐ๋ก์ด ์์ต๋๋ค." if is_ko else "No search history yet."}
| {"๊ฒ์์ด" if is_ko else "Keyword"} | {"๊ตญ๊ฐ" if is_ko else "Country"} | {"์ธ์ด" if is_ko else "Lang"} | {"์ ๋ ฌ" if is_ko else "Sort"} | {"๊ฒฐ๊ณผ" if is_ko else "Results"} | {"์๊ฐ" if is_ko else "Time"} |
|---|---|---|---|---|---|
| {kw} | {country} | {lang[:10] if lang else "-"} | {sort_by[:10] if sort_by else "-"} | {cnt} | {searched[:16].replace("T"," ")} |