import gradio as gr from googleapiclient.discovery import build from groq import Groq import os import sqlite3 from datetime import datetime, timedelta import re import uuid API_KEY = os.getenv("YOUTUBE_API_KEY") GROQ_API_KEY = os.getenv("GROQ_API_KEY") youtube = build("youtube", "v3", developerKey=API_KEY) groq_client = Groq(api_key=GROQ_API_KEY) if GROQ_API_KEY else None # Global storage ai_pick_storage = {"videos": [], "ratings": {}, "timestamp": None} # ============================================ # ๐ŸŒ UI Language Translations # ============================================ UI_LANG = { "en": { "title": "๐ŸŽฌ YOUTUBE TREND ANALYZER ๐Ÿ“Š", "search_keyword": "Search Keyword", "enter_keyword": "Enter keyword...", "search": "๐Ÿ” SEARCH", "refresh": "๐Ÿ”„ Refresh", "country": "Country", "language": "Language", "sort_by": "Sort By", "period": "Period", "max_results": "Max Results", "click_autofill": "Click to auto-fill", "total": "Total", "results": "results", "views": "Views", "likes": "Likes", "subs": "Subs", "date": "Date", "rank": "Rank", "thumb": "Thumb", "title_col": "Title", "channel": "Channel", "comments": "Cmts", "ai_pick_col": "AI Pick", "no_keyword": "Please enter a search keyword!", "no_results": "No results found.", "sort_options": {"Most Viewed": "viewCount", "Latest": "date", "Relevance": "relevance", "Top Rated": "rating"}, "date_options": {"All Time": "", "Today": "today", "This Week": "thisWeek", "This Month": "thisMonth", "This Year": "thisYear"}, }, "ko": { "title": "๐ŸŽฌ ์œ ํŠœ๋ธŒ ํŠธ๋ Œ๋“œ ๋ถ„์„๊ธฐ ๐Ÿ“Š", "search_keyword": "๊ฒ€์ƒ‰์–ด", "enter_keyword": "๊ฒ€์ƒ‰์–ด ์ž…๋ ฅ...", "search": "๐Ÿ” ๊ฒ€์ƒ‰", "refresh": "๐Ÿ”„ ์ƒˆ๋กœ๊ณ ์นจ", "country": "๊ตญ๊ฐ€", "language": "์–ธ์–ด", "sort_by": "์ •๋ ฌ", "period": "๊ธฐ๊ฐ„", "max_results": "์ตœ๋Œ€ ๊ฒฐ๊ณผ", "click_autofill": "ํด๋ฆญ์‹œ ์ž๋™ ์ž…๋ ฅ", "total": "์ด", "results": "๊ฐœ ๊ฒฐ๊ณผ", "views": "์กฐํšŒ์ˆ˜", "likes": "์ข‹์•„์š”", "subs": "๊ตฌ๋…์ž", "date": "๋‚ ์งœ", "rank": "์ˆœ์œ„", "thumb": "์ธ๋„ค์ผ", "title_col": "์ œ๋ชฉ", "channel": "์ฑ„๋„", "comments": "๋Œ“๊ธ€", "ai_pick_col": "AI์ถ”์ฒœ", "no_keyword": "๊ฒ€์ƒ‰์–ด๋ฅผ ์ž…๋ ฅํ•˜์„ธ์š”!", "no_results": "๊ฒ€์ƒ‰ ๊ฒฐ๊ณผ๊ฐ€ ์—†์Šต๋‹ˆ๋‹ค.", "sort_options": {"์กฐํšŒ์ˆ˜ ์ˆœ": "viewCount", "์ตœ์‹ ์ˆœ": "date", "๊ด€๋ จ์„ฑ ์ˆœ": "relevance", "ํ‰์  ์ˆœ": "rating"}, "date_options": {"์ „์ฒด ๊ธฐ๊ฐ„": "", "์˜ค๋Š˜": "today", "์ด๋ฒˆ ์ฃผ": "thisWeek", "์ด๋ฒˆ ๋‹ฌ": "thisMonth", "์˜ฌํ•ด": "thisYear"}, } } # ============================================ # ๐ŸŽจ CSS # ============================================ css = """ @import url('https://fonts.googleapis.com/css2?family=Bangers&family=Comic+Neue:wght@400;700&display=swap'); /* Hide ALL Hugging Face elements */ #space-header, .space-header, header, .huggingface-space-header, [data-testid="space-header"], .svelte-1ed2p3z, .svelte-kqij2n, .svelte-1kyws56, .wrap.svelte-1kyws56, button.svelte-1kyws56, .duplicate-button, .settings-button, [class*="settings"], [class*="duplicate"], .embed-buttons, .buttons-container, header button, .gr-button-icon, footer, .footer, .gradio-container footer, .built-with, [class*="footer"], .built-with-gradio, a[href*="gradio.app"], .gradio-container > div:first-child > button, .gradio-container > header { display: none !important; visibility: hidden !important; height: 0 !important; width: 0 !important; padding: 0 !important; margin: 0 !important; overflow: hidden !important; opacity: 0 !important; pointer-events: none !important; position: absolute !important; left: -9999px !important; } .gradio-container { background-color: #FEF9C3 !important; background-image: radial-gradient(#1F2937 1px, transparent 1px) !important; background-size: 20px 20px !important; min-height: 100vh !important; font-family: 'Comic Neue', cursive, sans-serif !important; } .header-text h1 { font-family: 'Bangers', cursive !important; color: #1F2937 !important; font-size: 2.8rem !important; text-align: center !important; text-shadow: 4px 4px 0px #FACC15, 6px 6px 0px #1F2937 !important; } .gr-panel, .gr-box, .gr-form, .block, .gr-group { background: #FFFFFF !important; border: 3px solid #1F2937 !important; border-radius: 8px !important; box-shadow: 6px 6px 0px #1F2937 !important; } .gr-button-primary, button.primary { background: #3B82F6 !important; border: 3px solid #1F2937 !important; border-radius: 8px !important; color: #FFFFFF !important; font-family: 'Bangers', cursive !important; font-size: 1.2rem !important; box-shadow: 5px 5px 0px #1F2937 !important; } .gr-button-primary:hover { background: #2563EB !important; } .gr-button-secondary, button.secondary { background: #EF4444 !important; border: 3px solid #1F2937 !important; color: #FFFFFF !important; font-family: 'Bangers', cursive !important; box-shadow: 4px 4px 0px #1F2937 !important; } textarea, input[type="text"] { background: #FFFFFF !important; border: 3px solid #1F2937 !important; border-radius: 8px !important; font-family: 'Comic Neue', cursive !important; font-weight: 700 !important; } label { color: #1F2937 !important; font-family: 'Comic Neue', cursive !important; font-weight: 700 !important; } ::-webkit-scrollbar { width: 12px; } ::-webkit-scrollbar-track { background: #FEF9C3; border: 2px solid #1F2937; } ::-webkit-scrollbar-thumb { background: #3B82F6; border: 2px solid #1F2937; } ::selection { background: #FACC15; color: #1F2937; } /* LLM Result box */ .llm-result textarea { background: #1F2937 !important; color: #10B981 !important; border: 3px solid #10B981 !important; border-radius: 8px !important; font-family: 'Courier New', monospace !important; font-size: 14px !important; line-height: 1.6 !important; } """ # DB ์ดˆ๊ธฐํ™” def init_db(): conn = sqlite3.connect("youtube_data.db") c = conn.cursor() c.execute('''CREATE TABLE IF NOT EXISTS videos ( video_id TEXT PRIMARY KEY, title TEXT, channel_id TEXT, channel_name TEXT, thumbnail TEXT, published_at TEXT, first_seen TEXT)''') c.execute('''CREATE TABLE IF NOT EXISTS video_stats ( id INTEGER PRIMARY KEY AUTOINCREMENT, video_id TEXT, views INTEGER, likes INTEGER, comments INTEGER, recorded_at TEXT)''') c.execute('''CREATE TABLE IF NOT EXISTS channels ( channel_id TEXT PRIMARY KEY, channel_name TEXT, first_seen TEXT)''') c.execute('''CREATE TABLE IF NOT EXISTS channel_stats ( id INTEGER PRIMARY KEY AUTOINCREMENT, channel_id TEXT, subscribers INTEGER, recorded_at TEXT)''') c.execute('''CREATE TABLE IF NOT EXISTS search_history ( id INTEGER PRIMARY KEY AUTOINCREMENT, keyword TEXT, country TEXT, language TEXT, sort_by TEXT, results_count INTEGER, searched_at TEXT)''') c.execute('''CREATE TABLE IF NOT EXISTS trending_alerts ( id INTEGER PRIMARY KEY AUTOINCREMENT, video_id TEXT, alert_type TEXT, old_value INTEGER, new_value INTEGER, change_percent REAL, detected_at TEXT)''') conn.commit() conn.close() init_db() # Country & Language codes COUNTRIES = { "Worldwide": ("", ""), "United States": ("US", "en"), "United Kingdom": ("GB", "en"), "Canada": ("CA", "en"), "Australia": ("AU", "en"), "Germany": ("DE", "de"), "France": ("FR", "fr"), "Japan": ("JP", "ja"), "South Korea": ("KR", "ko"), "Brazil": ("BR", "pt"), "Mexico": ("MX", "es"), "Spain": ("ES", "es"), "Italy": ("IT", "it"), "Russia": ("RU", "ru"), "India": ("IN", "hi"), "Indonesia": ("ID", "id"), "Thailand": ("TH", "th"), "Vietnam": ("VN", "vi"), "Philippines": ("PH", "tl"), "Turkey": ("TR", "tr"), "Saudi Arabia": ("SA", "ar"), "Egypt": ("EG", "ar"), "South Africa": ("ZA", "en"), "Nigeria": ("NG", "en"), "Argentina": ("AR", "es"), "Colombia": ("CO", "es"), "Poland": ("PL", "pl"), "Netherlands": ("NL", "nl"), "Sweden": ("SE", "sv"), "Switzerland": ("CH", "de"), "Taiwan": ("TW", "zh"), "Hong Kong": ("HK", "zh"), "China": ("CN", "zh"), "Singapore": ("SG", "en"), "Malaysia": ("MY", "ms"), "UAE": ("AE", "ar"), } LANGUAGES = { "Auto (by Country)": "", "English": "en", "Korean": "ko", "Spanish": "es", "Portuguese": "pt", "French": "fr", "German": "de", "Italian": "it", "Russian": "ru", "Japanese": "ja", "Chinese": "zh", "Hindi": "hi", "Arabic": "ar", "Turkish": "tr", "Indonesian": "id", "Vietnamese": "vi", "Thai": "th", "Dutch": "nl", "Polish": "pl", "Swedish": "sv", } def format_count(count): if count is None: return "0" count = int(count) if count >= 1000000000: return f"{count/1000000000:.1f}B" elif count >= 1000000: return f"{count/1000000:.1f}M" elif count >= 1000: return f"{count/1000:.1f}K" return str(count) def call_llm(prompt, max_tokens=2000): if not groq_client: return "โš ๏ธ LLM API not configured. Set GROQ_API_KEY." try: completion = groq_client.chat.completions.create( model="openai/gpt-oss-120b", messages=[{"role": "user", "content": prompt}], temperature=0.7, max_completion_tokens=max_tokens, top_p=1, stream=True, stop=None ) result = "" for chunk in completion: if chunk.choices[0].delta.content: result += chunk.choices[0].delta.content return result except Exception as e: return f"Error: {e}" def get_ai_pick_rating(videos_data): global ai_pick_storage if not videos_data: return {} if groq_client: try: sample = videos_data[:50] video_info = "\n".join([ f"#{i+1}. {v['title'][:40]}, Views:{v['views']}, Likes:{v['likes']}, Subs:{v.get('subs',0)}" for i, v in enumerate(sample) ]) prompt = f"Rate YouTube videos 0-4. 0=None,1=โ™ฅ,2=โญ,3=โญโญ,4=โญโญโญ. Consider engagement, viral potential. Format: 1:3,2:2,3:4\n\n{video_info}\n\nResponse (number:rating only):" result = call_llm(prompt, 1500) if result and "Error" not in result and "โš ๏ธ" not in result: ratings = {} for idx, rating in re.findall(r'(\d+):(\d)', result): ratings[int(idx)-1] = int(rating) if len(videos_data) > 50: local = calculate_local_rating(videos_data[50:]) for k, v in local.items(): ratings[k + 50] = v ai_pick_storage = {"videos": videos_data, "ratings": ratings, "timestamp": datetime.now().isoformat()} return ratings except: pass ratings = calculate_local_rating(videos_data) ai_pick_storage = {"videos": videos_data, "ratings": ratings, "timestamp": datetime.now().isoformat()} return ratings def calculate_local_rating(videos_data): ratings = {} if not videos_data: return ratings views_list = [v['views'] for v in videos_data if v['views'] > 0] if not views_list: return {i: 0 for i in range(len(videos_data))} avg_views, max_views = sum(views_list)/len(views_list), max(views_list) for i, v in enumerate(videos_data): views, likes, comments, subs = v['views'], v['likes'], v['comments'], v.get('subs', 0) score = 0 if views > 0: score += min(40, (views/max_views)*40) score += min(30, ((likes+comments*2)/views)*300) if subs > 0 and views > 0: score += min(30, (views/subs)*10) elif views > avg_views: score += 15 if score >= 70: ratings[i] = 4 elif score >= 50: ratings[i] = 3 elif score >= 30: ratings[i] = 2 elif score >= 15: ratings[i] = 1 else: ratings[i] = 0 return ratings def get_rating_display(rating): return {0: "-", 1: "โ™ฅ", 2: "โญ", 3: "โญโญ", 4: "โญโญโญ"}.get(rating, "-") def get_real_trending_keywords(region_code="US", language="en"): try: response = youtube.videos().list(part="snippet", chart="mostPopular", regionCode=region_code or "US", maxResults=50).execute() keywords, seen = [], set() for item in response.get("items", []): for tag in item["snippet"].get("tags", [])[:3]: if tag.lower() not in seen and 2 <= len(tag) <= 20: keywords.append(tag); seen.add(tag.lower()) channel = item["snippet"]["channelTitle"] if channel.lower() not in seen: keywords.append(channel); seen.add(channel.lower()) if len(keywords) >= 20: break return keywords[:20] if keywords else ["AI","gaming","music","vlog","shorts","news"] except: return ["AI","ChatGPT","gaming","music","vlog","shorts","news","tech"] def save_to_db(videos_data, channels_data, keyword, country, language, sort_by): conn = sqlite3.connect("youtube_data.db") c = conn.cursor() now = datetime.now().isoformat() c.execute('INSERT INTO search_history VALUES (NULL,?,?,?,?,?,?)', (keyword, country, language, sort_by, len(videos_data), now)) for video in videos_data: c.execute('INSERT OR IGNORE INTO videos VALUES (?,?,?,?,?,?,?)', (video['video_id'], video['title'], video['channel_id'], video['channel_name'], video['thumbnail'], video['published_at'], now)) c.execute('SELECT views FROM video_stats WHERE video_id=? ORDER BY recorded_at DESC LIMIT 1', (video['video_id'],)) prev = c.fetchone() c.execute('INSERT INTO video_stats VALUES (NULL,?,?,?,?,?)', (video['video_id'], video['views'], video['likes'], video['comments'], now)) if prev and prev[0] > 0: change = ((video['views'] - prev[0]) / prev[0]) * 100 if change >= 20: c.execute('INSERT INTO trending_alerts VALUES (NULL,?,?,?,?,?,?)', (video['video_id'], 'views_surge', prev[0], video['views'], change, now)) for ch_id, subs in channels_data.items(): c.execute('INSERT OR IGNORE INTO channels VALUES (?,?,?)', (ch_id, '', now)) if isinstance(subs, int): c.execute('INSERT INTO channel_stats VALUES (NULL,?,?,?)', (ch_id, subs, now)) conn.commit(); conn.close() def get_db_stats(): try: conn = sqlite3.connect("youtube_data.db") c = conn.cursor() stats = {} for t, k in [("videos","videos"),("video_stats","stats"),("channels","channels"),("search_history","searches"),("trending_alerts","alerts")]: c.execute(f"SELECT COUNT(*) FROM {t}"); stats[k] = c.fetchone()[0] conn.close() return stats except: return {"videos":0,"stats":0,"channels":0,"searches":0,"alerts":0} def update_trending(country): region, lang = COUNTRIES.get(country, ("", "")) return gr.update(choices=get_real_trending_keywords(region or "US", lang or "en"), value=None) def use_trending_keyword(kw): return kw if kw else "" # ============================================ # ๐Ÿ” Main Search Function # ============================================ def search_videos(keyword, country, language, sort_by, date_filter, max_results, ui_lang): L = UI_LANG.get(ui_lang, UI_LANG["en"]) if not keyword or not keyword.strip(): return f"โš ๏ธ {L['no_keyword']}", "๐Ÿ“Š DB: -" max_results = int(max_results) all_items, next_page = [], None region_code, default_lang = COUNTRIES.get(country, ("", "")) lang_code = default_lang if language in ["Auto (by Country)", "์ž๋™ (๊ตญ๊ฐ€ ๊ธฐ๋ฐ˜)"] else LANGUAGES.get(language, "") sort_value = L["sort_options"].get(sort_by, "viewCount") date_value = L["date_options"].get(date_filter, "") params = {"q": keyword, "part": "snippet", "type": "video", "order": sort_value} if region_code: params["regionCode"] = region_code if lang_code: params["relevanceLanguage"] = lang_code if date_value: deltas = {"today": 1, "thisWeek": 7, "thisMonth": 30, "thisYear": 365} params["publishedAfter"] = (datetime.utcnow() - timedelta(days=deltas.get(date_value, 0))).strftime("%Y-%m-%dT%H:%M:%SZ") while len(all_items) < max_results: params["maxResults"] = min(50, max_results - len(all_items)) if next_page: params["pageToken"] = next_page try: resp = youtube.search().list(**params).execute() except Exception as e: return f"API Error: {e}", "๐Ÿ“Š DB: Error" items = resp.get("items", []) if not items: break all_items.extend(items) next_page = resp.get("nextPageToken") if not next_page: break if not all_items: return f"{L['no_results']}", "๐Ÿ“Š DB: -" video_ids = [item["id"]["videoId"] for item in all_items] channel_ids = list(set([item["snippet"]["channelId"] for item in all_items])) video_stats = {} for i in range(0, len(video_ids), 50): try: for v in youtube.videos().list(id=",".join(video_ids[i:i+50]), part="statistics").execute().get("items", []): s = v["statistics"] video_stats[v["id"]] = {"views": int(s.get("viewCount", 0)), "likes": int(s.get("likeCount", 0)), "comments": int(s.get("commentCount", 0))} except: pass channel_subs, channel_subs_raw = {}, {} for i in range(0, len(channel_ids), 50): try: for ch in youtube.channels().list(id=",".join(channel_ids[i:i+50]), part="statistics").execute().get("items", []): sub = ch["statistics"].get("subscriberCount", "0") if sub: channel_subs_raw[ch["id"]] = int(sub); channel_subs[ch["id"]] = format_count(int(sub)) except: pass videos_data = [] for item in all_items: vid, snip = item["id"]["videoId"], item["snippet"] st = video_stats.get(vid, {"views": 0, "likes": 0, "comments": 0}) videos_data.append({ "video_id": vid, "title": snip["title"], "channel_id": snip["channelId"], "channel_name": snip["channelTitle"], "thumbnail": snip["thumbnails"]["medium"]["url"], "published_at": snip["publishedAt"], "views": st.get("views", 0), "likes": st.get("likes", 0), "comments": st.get("comments", 0), "subs": channel_subs_raw.get(snip["channelId"], 0), }) ai_ratings = get_ai_pick_rating(videos_data) save_to_db(videos_data, channel_subs_raw, keyword, country, language, sort_by) uid = str(uuid.uuid4()).replace("-", "")[:8] html = f'''
๐ŸŽฌ {L["total"]} {len(videos_data)} {L["results"]} | ๐Ÿ” "{keyword}" | ๐ŸŒ {country}
๐Ÿค– AI Pick: โ™ฅ โญ โญโญ โญโญโญ | ๐Ÿ’ก {"ํ—ค๋” ํด๋ฆญ = ์ •๋ ฌ" if ui_lang=="ko" else "Click header to sort"}
''' for i, v in enumerate(videos_data): title_short = v["title"][:42] + "..." if len(v["title"]) > 42 else v["title"] ch_short = v["channel_name"][:12] + "..." if len(v["channel_name"]) > 12 else v["channel_name"] url = f"https://youtube.com/watch?v={v['video_id']}" ch_url = f"https://youtube.com/channel/{v['channel_id']}" rating = ai_ratings.get(i, 0) rank_color = "#FFD700" if i==0 else ("#C0C0C0" if i==1 else ("#CD7F32" if i==2 else "#EF4444")) html += f'''''' html += f'''
{L["rank"]} {L["thumb"]} {L["title_col"]} {L["channel"]} {L["subs"]} {L["views"]} {L["likes"]} {L["comments"]} {L["ai_pick_col"]} {L["date"]}
{i+1} {title_short} {ch_short} {format_count(v['subs'])} {format_count(v['views'])} {format_count(v['likes'])} {format_count(v['comments'])} {get_rating_display(rating)} {v['published_at'][:10]}
''' stats = get_db_stats() return html, f"๐Ÿ“Š DB: Videos {stats['videos']} | Records {stats['stats']} | Channels {stats['channels']} | Searches {stats['searches']}" # ============================================ # ๐Ÿ”ฅ Trending Alerts # ============================================ def show_trending_alerts(ui_lang): is_ko = ui_lang == "ko" conn = sqlite3.connect("youtube_data.db") c = conn.cursor() c.execute('''SELECT ta.video_id, v.title, v.channel_name, v.thumbnail, ta.old_value, ta.new_value, ta.change_percent, ta.detected_at FROM trending_alerts ta JOIN videos v ON ta.video_id = v.video_id ORDER BY ta.detected_at DESC LIMIT 30''') alerts = c.fetchall() conn.close() info_box = f'''

๐Ÿ”ฅ {"๊ธ‰์ƒ์Šน - ์กฐํšŒ์ˆ˜ 20%+ ๊ธ‰์ฆ ๊ฐ์ง€" if is_ko else "TRENDING - 20%+ Sudden View Surge"}

๐Ÿ“Œ {"์ •์˜" if is_ko else "What"}{"์ด์ „ ๋Œ€๋น„ ์กฐํšŒ์ˆ˜๊ฐ€ 20% ์ด์ƒ ๊ธ‰์ฆํ•œ ์˜์ƒ" if is_ko else "Videos with 20%+ view increase vs. previous check"}
๐ŸŽฏ {"๋ชฉ์ " if is_ko else "Purpose"}{"์ง€๊ธˆ ๋ฐ”๋กœ ๋ฐ”์ด๋Ÿด ์ค‘์ธ ์˜์ƒ ํฌ์ฐฉ" if is_ko else "Catch videos going viral RIGHT NOW"}
โฑ๏ธ {"์ž‘๋™" if is_ko else "Trigger"}{"๋™์ผ ์˜์ƒ ์žฌ๊ฒ€์ƒ‰์‹œ ์กฐํšŒ์ˆ˜ ๋ณ€ํ™” ๊ฐ์ง€" if is_ko else "Detected when same video is searched again"}
๐Ÿ’ก {"ํ™œ์šฉ" if is_ko else "Best for"}{"๋‰ด์Šค, ์ด์Šˆ, ํ•ซํ† ํ”ฝ ๋ฐœ๊ตด" if is_ko else "News, breaking stories, hot topics"}
''' if not alerts: try: resp = youtube.videos().list(part="snippet,statistics", chart="mostPopular", regionCode="KR" if is_ko else "US", maxResults=20).execute() html = info_box + f'''

๐Ÿ“ข {"์•„์ง ๊ธ‰์ƒ์Šน ์•Œ๋ฆผ์ด ์—†์Šต๋‹ˆ๋‹ค. ๊ฒ€์ƒ‰์„ ์—ฌ๋Ÿฌ ๋ฒˆ ์‹คํ–‰ํ•˜๋ฉด ์กฐํšŒ์ˆ˜ ๋ณ€ํ™”๋ฅผ ๊ฐ์ง€ํ•ฉ๋‹ˆ๋‹ค!" if is_ko else "No surge alerts yet. Run searches multiple times to detect view changes!"}

{"ํ˜„์žฌ ์ธ๊ธฐ ์˜์ƒ" if is_ko else "Current Popular Videos"}

''' for i, item in enumerate(resp.get("items", [])[:20], 1): snip, stats = item["snippet"], item["statistics"] title = snip["title"][:32] + "..." if len(snip["title"]) > 32 else snip["title"] html += f'''

{i}. {title}

๐Ÿ‘€ {format_count(int(stats.get('viewCount',0)))}

''' return html + '
' except Exception as e: return info_box + f"

Error: {e}

" html = info_box + '
' for vid, title, channel, thumb, old_v, new_v, pct, detected in alerts: title = title[:28] + "..." if len(title) > 28 else title html += f'''

{title}

{channel[:18]}

๐Ÿ”ฅ +{pct:.1f}%

{format_count(old_v)} โ†’ {format_count(new_v)}

''' return html + '
' # ============================================ # ๐Ÿ“ˆ Top Growing # ============================================ def show_top_growing(ui_lang): is_ko = ui_lang == "ko" conn = sqlite3.connect("youtube_data.db") c = conn.cursor() cutoff = (datetime.now() - timedelta(hours=48)).isoformat() c.execute('''SELECT v.video_id, v.title, v.channel_name, v.thumbnail, MIN(vs.views) as min_v, MAX(vs.views) as max_v, ((MAX(vs.views) - MIN(vs.views)) * 100.0 / NULLIF(MIN(vs.views),0)) as growth FROM videos v JOIN video_stats vs ON v.video_id = vs.video_id WHERE vs.recorded_at > ? GROUP BY v.video_id HAVING min_v > 0 AND max_v > min_v ORDER BY growth DESC LIMIT 20''', (cutoff,)) results = c.fetchall() conn.close() info_box = f'''

๐Ÿ“ˆ {"๊ธ‰์„ฑ์žฅ TOP - 48์‹œ๊ฐ„ ์„ฑ์žฅ๋ฅ  ์ˆœ์œ„" if is_ko else "TOP GROWING - 48h Growth Rate Ranking"}

๐Ÿ“Œ {"์ •์˜" if is_ko else "What"}{"48์‹œ๊ฐ„ ๋™์•ˆ ๊ฐ€์žฅ ๋†’์€ ์„ฑ์žฅ๋ฅ ์„ ๊ธฐ๋กํ•œ ์˜์ƒ" if is_ko else "Videos with highest growth RATE over 48 hours"}
๐ŸŽฏ {"๋ชฉ์ " if is_ko else "Purpose"}{"๊พธ์ค€ํžˆ ์„ฑ์žฅํ•˜๋Š” ์ฝ˜ํ…์ธ  ๋ฐœ๊ตด" if is_ko else "Find consistently rising content"}
๐Ÿ“Š {"๊ณ„์‚ฐ" if is_ko else "Formula"}(Max - Min) / Min ร— 100%
๐Ÿ’ก {"ํ™œ์šฉ" if is_ko else "Best for"}{"์—๋ฒ„๊ทธ๋ฆฐ ์ฝ˜ํ…์ธ , ์•ˆ์ •์  ํŠธ๋ Œ๋“œ" if is_ko else "Evergreen content, stable trends"}
''' if not results: try: resp = youtube.videos().list(part="snippet,statistics", chart="mostPopular", regionCode="KR" if is_ko else "US", maxResults=20).execute() html = info_box + f'''

๐Ÿ“ข {"๋ฐ์ดํ„ฐ ์ถ•์  ์ค‘์ž…๋‹ˆ๋‹ค. ๊ฒ€์ƒ‰์„ ์—ฌ๋Ÿฌ ๋ฒˆ ์‹คํ–‰ํ•˜๋ฉด ์„ฑ์žฅ๋ฅ ์ด ๊ณ„์‚ฐ๋ฉ๋‹ˆ๋‹ค!" if is_ko else "Accumulating data. Run searches over time to calculate growth rates!"}

{"ํ˜„์žฌ ์ธ๊ธฐ ์˜์ƒ" if is_ko else "Current Popular Videos"}

''' for i, item in enumerate(resp.get("items", [])[:20], 1): snip, stats = item["snippet"], item["statistics"] views, likes = int(stats.get("viewCount", 0)), int(stats.get("likeCount", 0)) engagement = (likes / views * 100) if views > 0 else 0 title = snip["title"][:28] + "..." if len(snip["title"]) > 28 else snip["title"] html += f'''

{i}. {title}

{snip['channelTitle'][:16]}

๐Ÿ‘€ {format_count(views)}

โค๏ธ {engagement:.2f}%

''' return html + '
' except Exception as e: return info_box + f"

Error: {e}

" html = info_box + '
' for i, (vid, title, channel, thumb, min_v, max_v, growth) in enumerate(results, 1): title = title[:28] + "..." if len(title) > 28 else title growth_val = growth if growth else 0 html += f'''

{i}. {title}

{channel[:16]}

๐Ÿ“ˆ +{growth_val:.1f}%

{format_count(int(min_v))} โ†’ {format_count(int(max_v))}

''' return html + '
' # ============================================ # โญ AI Pick # ============================================ def show_ai_picks(ui_lang): is_ko = ui_lang == "ko" global ai_pick_storage if not ai_pick_storage["videos"]: return f'''

โญ {"AI ์ถ”์ฒœ - ๋ฐ์ดํ„ฐ ์—†์Œ" if is_ko else "AI PICK - No Data Yet"}

{"๋จผ์ € ๊ฒ€์ƒ‰ ํƒญ์—์„œ ๊ฒ€์ƒ‰์„ ์‹คํ–‰ํ•˜์„ธ์š”!" if is_ko else "Run a search first in the Search tab!"}

''' videos, ratings = ai_pick_storage["videos"], ai_pick_storage["ratings"] top_picks = [(i, v, ratings.get(i, 0)) for i, v in enumerate(videos) if ratings.get(i, 0) >= 3] top_picks.sort(key=lambda x: (-x[2], -x[1]['views'])) analysis_html = "" if groq_client and top_picks: info = "\n".join([f"- {v['title'][:50]} (Views:{format_count(v['views'])})" for _, v, _ in top_picks[:5]]) lang_prompt = "ํ•œ๊ตญ์–ด๋กœ ๋‹ต๋ณ€ํ•ด์ฃผ์„ธ์š”." if is_ko else "" result = call_llm(f"Analyze top YouTube videos briefly (3-4 sentences):\n{info}\n\n1) Common theme 2) Why popular 3) Content opportunity. {lang_prompt}", 500) if result and "Error" not in result and "โš ๏ธ" not in result: analysis_html = f'''

๐Ÿค– {"AI ๋ถ„์„ ๊ฒฐ๊ณผ" if is_ko else "AI ANALYSIS"}

{result}

''' html = f'''

โญ {"AI ์ถ”์ฒœ - TOP ์˜์ƒ" if is_ko else "AI PICK - TOP RECOMMENDATIONS"}

{"โญโญ ์ด์ƒ ๋“ฑ๊ธ‰ ์˜์ƒ" if is_ko else "โญโญ+ rated videos"}: {len(top_picks)}{"๊ฐœ" if is_ko else " videos"}

{analysis_html} ''' if not top_picks: html += f'''

{"โญโญ ์ด์ƒ ๋“ฑ๊ธ‰ ์˜์ƒ์ด ์—†์Šต๋‹ˆ๋‹ค. ๋‹ค๋ฅธ ํ‚ค์›Œ๋“œ๋กœ ๊ฒ€์ƒ‰ํ•ด๋ณด์„ธ์š”!" if is_ko else "No โญโญ+ rated videos found. Try different keywords!"}

''' return html html += '
' for idx, (_, v, rating) in enumerate(top_picks[:30], 1): border = "#FFD700" if rating == 4 else "#C0C0C0" title = v["title"][:35] + "..." if len(v["title"]) > 35 else v["title"] html += f'''
{get_rating_display(rating)}

{idx}. {title}

{v["channel_name"][:18]}

๐Ÿ‘€ {format_count(v['views'])} โค๏ธ {format_count(v['likes'])} ๐Ÿ’ฌ {format_count(v['comments'])}
''' return html + '
' # ============================================ # ๐Ÿค– AI Tools Functions # ============================================ def analyze_keyword_suggest(keyword, ui_lang): if not keyword: return "โš ๏ธ ํ‚ค์›Œ๋“œ๋ฅผ ์ž…๋ ฅํ•˜์„ธ์š”!" if ui_lang == "ko" else "โš ๏ธ Please enter a keyword!" lang = "ํ•œ๊ตญ์–ด๋กœ ๋‹ต๋ณ€ํ•ด์ฃผ์„ธ์š”." if ui_lang == "ko" else "" return call_llm(f'YouTube SEO expert. For "{keyword}", suggest 15 related keywords.\nFor each: keyword, search volume (High/Med/Low), competition (High/Med/Low), content type.\n{lang}', 1500) def analyze_trend_prediction(keyword, ui_lang): if not keyword: return "โš ๏ธ ํ‚ค์›Œ๋“œ๋ฅผ ์ž…๋ ฅํ•˜์„ธ์š”!" if ui_lang == "ko" else "โš ๏ธ Please enter a keyword!" lang = "ํ•œ๊ตญ์–ด๋กœ ๋‹ต๋ณ€ํ•ด์ฃผ์„ธ์š”." if ui_lang == "ko" else "" return call_llm(f'Trend analyst for "{keyword}":\n1) Current status\n2) Peak season\n3) 6-month forecast\n4) Risk factors\n5) Opportunity windows\n6) Emerging topics\n{lang}', 1500) def analyze_content_ideas(keyword, ui_lang): if not keyword: return "โš ๏ธ ์ฃผ์ œ๋ฅผ ์ž…๋ ฅํ•˜์„ธ์š”!" if ui_lang == "ko" else "โš ๏ธ Please enter a topic!" lang = "ํ•œ๊ตญ์–ด๋กœ ๋‹ต๋ณ€ํ•ด์ฃผ์„ธ์š”." if ui_lang == "ko" else "" return call_llm(f'YouTube strategist for "{keyword}". Generate 10 video ideas:\nEach with: Title, Hook (first 5 sec), Format, Length, Thumbnail concept, Viral score 1-10.\n{lang}', 2000) def analyze_channel(channel_name, ui_lang): if not channel_name: return "โš ๏ธ ์ฑ„๋„๋ช…์„ ์ž…๋ ฅํ•˜์„ธ์š”!" if ui_lang == "ko" else "โš ๏ธ Please enter channel name!" lang = "ํ•œ๊ตญ์–ด๋กœ ๋‹ต๋ณ€ํ•ด์ฃผ์„ธ์š”." if ui_lang == "ko" else "" return call_llm(f'YouTube consultant for "{channel_name}":\n1) Niche assessment\n2) Content strategy\n3) Growth tactics\n4) Monetization\n5) Competitive advantages\n{lang}', 2000) def analyze_competitor(my_channel, competitor, ui_lang): if not my_channel or not competitor: return "โš ๏ธ ๋‘˜ ๋‹ค ์ž…๋ ฅํ•˜์„ธ์š”!" if ui_lang == "ko" else "โš ๏ธ Please enter both!" lang = "ํ•œ๊ตญ์–ด๋กœ ๋‹ต๋ณ€ํ•ด์ฃผ์„ธ์š”." if ui_lang == "ko" else "" return call_llm(f'Compare "{my_channel}" vs "{competitor}":\n1) Positioning\n2) Content gap\n3) Benchmarks\n4) Advantages\n5) Action plan\n6) 5 video ideas to beat them\n{lang}', 2000) # ============================================ # ๐Ÿ• History # ============================================ def show_search_history(ui_lang): is_ko = ui_lang == "ko" conn = sqlite3.connect("youtube_data.db") c = conn.cursor() c.execute('SELECT keyword,country,language,sort_by,results_count,searched_at FROM search_history ORDER BY searched_at DESC LIMIT 50') history = c.fetchall() conn.close() if not history: return f'''

{"๊ฒ€์ƒ‰ ๊ธฐ๋ก์ด ์—†์Šต๋‹ˆ๋‹ค." if is_ko else "No search history yet."}

''' html = f'''

๐Ÿ• {"๊ฒ€์ƒ‰ ๊ธฐ๋ก" if is_ko else "SEARCH HISTORY"}

''' for kw, country, lang, sort_by, cnt, searched in history: html += f'''''' return html + '
{"๊ฒ€์ƒ‰์–ด" if is_ko else "Keyword"} {"๊ตญ๊ฐ€" if is_ko else "Country"} {"์–ธ์–ด" if is_ko else "Lang"} {"์ •๋ ฌ" if is_ko else "Sort"} {"๊ฒฐ๊ณผ" if is_ko else "Results"} {"์‹œ๊ฐ„" if is_ko else "Time"}
{kw} {country} {lang[:10] if lang else "-"} {sort_by[:10] if sort_by else "-"} {cnt} {searched[:16].replace("T"," ")}
' # ============================================ # ๐ŸŒ Language Switch # ============================================ def on_lang_change(lang_choice): return "ko" if lang_choice == "ํ•œ๊ตญ์–ด" else "en" def switch_ui_language(ui_lang): L = UI_LANG.get(ui_lang, UI_LANG["en"]) sort_opts = list(L["sort_options"].keys()) date_opts = list(L["date_options"].keys()) return ( gr.update(label=L["search_keyword"], placeholder=L["enter_keyword"]), gr.update(value=L["search"]), gr.update(value=L["refresh"]), gr.update(label=L["country"]), gr.update(label=L["language"]), gr.update(choices=sort_opts, value=sort_opts[0], label=L["sort_by"]), gr.update(choices=date_opts, value=date_opts[0], label=L["period"]), gr.update(label=L["max_results"]), gr.update(label=L["click_autofill"]), ) # Initial trending initial_trending = get_real_trending_keywords("US", "en") # ============================================ # ๐ŸŽจ Gradio UI (Gradio 6.0 Compatible) # ============================================ with gr.Blocks(title="YouTube Trend Analyzer") as demo: ui_lang_state = gr.State("en") gr.HTML('''
''') gr.Markdown("# ๐ŸŽฌ YOUTUBE TREND ANALYZER ๐Ÿ“Š", elem_classes=["header-text"]) with gr.Row(): with gr.Column(scale=4): db_stats = gr.Markdown("๐Ÿ“Š Loading...") with gr.Column(scale=1): ui_lang_dropdown = gr.Dropdown(choices=["English", "ํ•œ๊ตญ์–ด"], value="English", label="๐ŸŒ UI Language", interactive=True) with gr.Tabs(): with gr.Tab("๐Ÿ” Search"): gr.Markdown("### ๐Ÿ”ฅ Trending Keywords") trending = gr.Radio(choices=initial_trending, label="Click to auto-fill", interactive=True) with gr.Row(): keyword = gr.Textbox(label="Search Keyword", placeholder="Enter keyword...", scale=3) btn = gr.Button("๐Ÿ” SEARCH", variant="primary", scale=1) refresh_btn = gr.Button("๐Ÿ”„ Refresh", variant="secondary", scale=1) with gr.Row(): country = gr.Dropdown(list(COUNTRIES.keys()), value="United States", label="Country") language = gr.Dropdown(list(LANGUAGES.keys()), value="Auto (by Country)", label="Language") sort_by = gr.Dropdown(list(UI_LANG["en"]["sort_options"].keys()), value="Most Viewed", label="Sort By") date_filter = gr.Dropdown(list(UI_LANG["en"]["date_options"].keys()), value="All Time", label="Period") max_results = gr.Slider(10, 1000, value=100, step=10, label="Max Results") output = gr.HTML() with gr.Tab("โญ AI Pick"): gr.Markdown("### ๐Ÿค– AI-Curated Top Recommendations (โญโญ and above)") pick_btn = gr.Button("๐Ÿ”„ Refresh AI Picks", variant="primary") pick_out = gr.HTML() with gr.Tab("๐Ÿ”ฅ Trending"): gr.Markdown("### ๐Ÿ”ฅ Sudden Surge Detection (20%+ view increase)") alerts_btn = gr.Button("๐Ÿ”„ Refresh", variant="primary") alerts_out = gr.HTML() with gr.Tab("๐Ÿ“ˆ Top Growing"): gr.Markdown("### ๐Ÿ“ˆ 48-Hour Growth Champions") growing_btn = gr.Button("๐Ÿ”„ Refresh", variant="primary") growing_out = gr.HTML() with gr.Tab("๐Ÿค– AI Tools"): gr.Markdown("### ๐Ÿง  LLM-Powered Analysis (GPT-OSS-120B)") with gr.Tabs(): with gr.Tab("๐Ÿท๏ธ Keyword Suggest"): kw_input = gr.Textbox(label="Enter base keyword", placeholder="e.g., Python tutorial") kw_btn = gr.Button("๐Ÿ” Generate Keywords", variant="primary") kw_output = gr.Textbox(label="Suggested Keywords", lines=20, elem_classes=["llm-result"]) with gr.Tab("๐Ÿ”ฎ Trend Prediction"): tp_input = gr.Textbox(label="Enter topic", placeholder="e.g., AI tools") tp_btn = gr.Button("๐Ÿ”ฎ Predict Trend", variant="primary") tp_output = gr.Textbox(label="Trend Analysis", lines=20, elem_classes=["llm-result"]) with gr.Tab("๐Ÿ’ก Content Ideas"): ci_input = gr.Textbox(label="Enter topic", placeholder="e.g., Home workout") ci_btn = gr.Button("๐Ÿ’ก Generate Ideas", variant="primary") ci_output = gr.Textbox(label="Content Ideas", lines=25, elem_classes=["llm-result"]) with gr.Tab("๐Ÿ“Š Channel Analysis"): ca_input = gr.Textbox(label="Enter channel/niche", placeholder="e.g., Tech reviews") ca_btn = gr.Button("๐Ÿ“Š Analyze", variant="primary") ca_output = gr.Textbox(label="Analysis", lines=25, elem_classes=["llm-result"]) with gr.Tab("โš”๏ธ Competitor"): with gr.Row(): comp_my = gr.Textbox(label="Your Channel", placeholder="My channel") comp_rival = gr.Textbox(label="Competitor", placeholder="Competitor") comp_btn = gr.Button("โš”๏ธ Compare", variant="primary") comp_output = gr.Textbox(label="Analysis", lines=25, elem_classes=["llm-result"]) with gr.Tab("๐Ÿ• History"): history_btn = gr.Button("๐Ÿ”„ Refresh", variant="primary") history_out = gr.HTML() # Events ui_lang_dropdown.change(on_lang_change, ui_lang_dropdown, ui_lang_state) ui_lang_dropdown.change(lambda x: switch_ui_language("ko" if x == "ํ•œ๊ตญ์–ด" else "en"), ui_lang_dropdown, [keyword, btn, refresh_btn, country, language, sort_by, date_filter, max_results, trending]) trending.change(use_trending_keyword, trending, keyword) country.change(update_trending, country, trending) btn.click(search_videos, [keyword, country, language, sort_by, date_filter, max_results, ui_lang_state], [output, db_stats]) keyword.submit(search_videos, [keyword, country, language, sort_by, date_filter, max_results, ui_lang_state], [output, db_stats]) refresh_btn.click(search_videos, [keyword, country, language, sort_by, date_filter, max_results, ui_lang_state], [output, db_stats]) pick_btn.click(show_ai_picks, ui_lang_state, pick_out) alerts_btn.click(show_trending_alerts, ui_lang_state, alerts_out) growing_btn.click(show_top_growing, ui_lang_state, growing_out) history_btn.click(show_search_history, ui_lang_state, history_out) kw_btn.click(analyze_keyword_suggest, [kw_input, ui_lang_state], kw_output) tp_btn.click(analyze_trend_prediction, [tp_input, ui_lang_state], tp_output) ci_btn.click(analyze_content_ideas, [ci_input, ui_lang_state], ci_output) ca_btn.click(analyze_channel, [ca_input, ui_lang_state], ca_output) comp_btn.click(analyze_competitor, [comp_my, comp_rival, ui_lang_state], comp_output) # Launch with CSS (Gradio 6.0 style) demo.launch(css=css)