import asyncio
import json
import os
import re
import urllib.parse
from contextlib import asynccontextmanager
from typing import Optional

import httpx
from fastapi import FastAPI, Query, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, Response, StreamingResponse
from playwright.async_api import BrowserContext, async_playwright
from playwright_stealth import stealth_async  # correct stealth import

# NOTE(review): animepahe has historically bounced between TLDs (.com / .ru).
# Keep BASE_URL in sync with wherever the site currently resolves, otherwise the
# cookies set during the warm-up visit are dropped on the redirect.
BASE_URL = "https://animepahe.com"
ANIWATCHTV_BASE = "https://aniwatchtv.to"
ANILIST_API = "https://graphql.anilist.co"
JIKAN_API = "https://api.jikan.moe/v4"

IS_HEADLESS = os.environ.get("HEADLESS", "true").lower() == "true"

# In-memory caches (live for the process lifetime; never evicted).
_info_cache: dict = {}
_mal_synopsis_cache: dict = {}


class AnimePahe:
    """Playwright-backed scraper for animepahe (plus AniList/Jikan metadata).

    Owns a single persistent Chromium context that is warmed up against
    BASE_URL on start() so Cloudflare/DDoS-Guard cookies are already present
    when the API endpoints start resolving pages.
    """

    def __init__(self):
        self.playwright = None
        self.context: Optional[BrowserContext] = None
        # Hosts whose requests are aborted by the asset interceptor.
        self.ad_domains = [
            "doubleclick.net",
            "adservice.google",
            "popads.net",
            "propellerads",
            "exoclick",
            "bebi.com",
        ]

    async def start(self):
        """Launch the persistent browser context and prime site cookies."""
        self.playwright = await async_playwright().start()
        self.context = await self.playwright.chromium.launch_persistent_context(
            user_data_dir="./browser_data",
            headless=IS_HEADLESS,
            # No hardcoded user_agent: a mismatched UA trips Cloudflare
            # fingerprinting against the real browser build.
            args=["--disable-blink-features=AutomationControlled", "--no-sandbox"],
        )
        await self.context.route("**/*", self._intercept_assets)

        # Initial visit to ensure cookies are set (best-effort).
        page = await self.context.new_page()
        try:
            await stealth_async(page)
            # domcontentloaded instead of networkidle: the site never goes idle.
            await page.goto(BASE_URL, wait_until="domcontentloaded", timeout=45000)
            await asyncio.sleep(2)
        except Exception:
            pass
        finally:
            await page.close()

    async def stop(self):
        """Tear down the browser context and the Playwright driver."""
        if self.context:
            await self.context.close()
        if self.playwright:
            await self.playwright.stop()

    async def _intercept_assets(self, route):
        """Route handler: let player/CDN hosts through, drop ads and images."""
        url = route.request.url.lower()
        # Never block the aniwatch pages or the kwik video host.
        if "aniwatchtv.to" in url or "kwik" in url:
            await route.continue_()
            return
        if any(ad in url for ad in self.ad_domains) or url.endswith(
            (".png", ".jpg", ".jpeg", ".webp", ".woff", ".gif")
        ):
            await route.abort()
        else:
            await route.continue_()

    async def _fetch_json(self, url: str):
        """GET *url* through the browser (so cookies apply) and parse JSON.

        Returns the parsed object, or None on any navigation/parse failure.
        """
        page = await self.context.new_page()
        try:
            await page.set_extra_http_headers({"Referer": BASE_URL})
            # domcontentloaded is enough for plain JSON responses.
            await page.goto(url, wait_until="domcontentloaded", timeout=30000)
            txt = await page.evaluate("document.body.innerText")
            return json.loads(txt)
        except Exception:
            return None
        finally:
            await page.close()

    def _generate_mp4(
        self, m3u8_url: Optional[str], anime_name: str, episode: str, res: str
    ) -> Optional[str]:
        """Derive a kwik direct-MP4 download URL from a stream m3u8 URL.

        Returns None when *m3u8_url* is missing or does not match the
        expected kwik stream layout.
        """
        if not m3u8_url:
            return None
        match = re.search(r"https?://([^.]+)[^/]*/stream/(.*?)/[^/]+\.m3u8", m3u8_url)
        if not match:
            return None
        subdomain = match.group(1)
        token_path = match.group(2)
        clean_name = re.sub(r"[^\w\s]", "", anime_name).strip().replace(" ", "_")
        if not clean_name:
            clean_name = "Anime"
        filename = f"{clean_name}_EP{episode}_{res}P.mp4"
        # FIX: the computed filename was built but never used; the URL
        # previously ended in a literal placeholder.
        return f"https://{subdomain}.kwik.cx/mp4/{token_path}?file={filename}"

    async def _scrape_play_meta(self, page) -> tuple:
        """Extract (title, episode_number) from an open /play/ page."""
        meta = await page.evaluate("""() => {
            const titleEl = document.querySelector('.theatre-info h1 a, .theatre-info h2 a, .anime-title, h1, h2');
            let title = titleEl ? titleEl.innerText.trim() : '';
            let episode = '';
            const t = document.title || '';
            const m = t.match(/^(.+?)\\s*[-\\u2013]\\s*(?:Episode\\s*)?(\\d+(?:\\.\\d+)?)/i);
            if (m) {
                if (!title || title.length < 2) title = m[1].trim();
                if (!episode) episode = m[2].trim();
            }
            return { title, episode }
        }""")
        title = (meta.get("title") or "").strip() or "Unknown"
        episode = (meta.get("episode") or "").strip() or "00"
        return title, episode

    async def _scrape_ids(self, session: str) -> dict:
        """Scrape external database IDs (MAL, AniList, ...) off the anime page.

        Always includes at least {"animepahe": session}, even on failure.
        """
        page = await self.context.new_page()
        try:
            await stealth_async(page)
            try:
                await page.goto(
                    f"{BASE_URL}/anime/{session}",
                    wait_until="domcontentloaded",
                    timeout=30000,
                )
            except Exception as e:
                # A timeout may still have rendered enough DOM to scrape.
                if "Timeout" not in str(e):
                    raise e
            try:
                # Wait for Cloudflare to clear, then for real content.
                await page.wait_for_function(
                    "!document.title.includes('Just a moment')", timeout=15000
                )
                await page.wait_for_selector(
                    "div.anime-info, div.anime-summary, aside, main", timeout=10000
                )
            except Exception:
                pass
            ids = await page.evaluate("""() => {
                let ids = {}
                document.querySelectorAll("a[href]").forEach(a => {
                    const url = a.href || ""
                    if (url.includes("myanimelist.net/anime")) ids["mal"] = url.split("/").filter(Boolean).pop()
                    if (url.includes("anilist.co/anime")) ids["anilist"] = url.split("/").filter(Boolean).pop()
                    if (url.includes("anidb.net")) ids["anidb"] = url.split("/").filter(Boolean).pop()
                    if (url.includes("animenewsnetwork.com")) {
                        const m = url.match(/id=(\\d+)/);
                        if (m) ids["ann"] = m[1]
                    }
                    if (url.includes("anime-planet.com/anime")) ids["animePlanet"] = url.split("/").filter(Boolean).pop()
                })
                return ids
            }""")
            ids["animepahe"] = session
            return ids
        except Exception:
            return {"animepahe": session}
        finally:
            await page.close()

    async def _fetch_mal_synopsis(self, mal_id: str) -> Optional[str]:
        """Fetch the MAL synopsis via Jikan, memoised (None is cached too)."""
        if mal_id in _mal_synopsis_cache:
            return _mal_synopsis_cache[mal_id]
        try:
            async with httpx.AsyncClient(timeout=10) as client:
                resp = await client.get(
                    f"{JIKAN_API}/anime/{mal_id}",
                    headers={"Accept": "application/json"},
                )
                resp.raise_for_status()
                synopsis = resp.json().get("data", {}).get("synopsis")
                _mal_synopsis_cache[mal_id] = synopsis
                return synopsis
        except Exception:
            _mal_synopsis_cache[mal_id] = None
            return None

    async def _collect_buttons(self, page) -> list:
        """Collect quality/audio options from the #resolutionMenu dropdown."""
        # state="attached", not visible: dropdown items are hidden by CSS
        # until the menu is clicked open.
        await page.wait_for_selector(
            "#resolutionMenu button", state="attached", timeout=10000
        )
        buttons = await page.locator("#resolutionMenu button").all()
        res_data = []
        for btn in buttons:
            text = (await btn.inner_text()).strip()
            res_match = re.search(r"(\d+)", text)
            audio_lang = (await btn.get_attribute("data-audio") or "jpn").lower()
            audio_type = "dub" if audio_lang == "eng" else "sub"
            res_data.append(
                {
                    "embed": await btn.get_attribute("data-src"),
                    "res": int(res_match.group(1)) if res_match else 720,
                    "fansub": text.split("·")[0].strip() if "·" in text else "Unknown",
                    "audio": audio_type,
                    "audio_lang": audio_lang,
                }
            )
        return res_data

    async def _embed_to_m3u8(self, embed_url: str) -> Optional[str]:
        """Open a kwik embed and sniff the first .m3u8/.mp4 request it makes.

        Clicks play buttons and nudges <video> elements while polling in
        0.5s steps (up to 10 rounds). Returns the captured URL or None.

        NOTE(review): this method was previously defined twice on the class;
        the duplicate (slower-polling) definition has been removed.
        """
        p = await self.context.new_page()
        m3u8 = None
        found = asyncio.Event()

        def capture(req):
            nonlocal m3u8
            if (".m3u8" in req.url or ".mp4" in req.url) and not found.is_set():
                m3u8 = req.url
                found.set()

        p.on("request", capture)
        try:
            await p.set_extra_http_headers({"Referer": BASE_URL})
            await p.goto(embed_url, wait_until="domcontentloaded", timeout=15000)
            # Fast polling (0.5s waits instead of 1.5s sleeps).
            for _ in range(10):
                if found.is_set():
                    break
                await p.evaluate("""() => {
                    document.querySelectorAll('video').forEach(v => {
                        v.muted = true;
                        const p = v.play();
                        if (p !== undefined) p.catch(() => {});
                    });
                    document.querySelectorAll('button, .vjs-big-play-button').forEach(b => {
                        try { b.click() } catch(e) {}
                    });
                }""")
                try:
                    await asyncio.wait_for(found.wait(), timeout=0.5)
                except asyncio.TimeoutError:
                    pass
        except Exception:
            pass
        finally:
            await p.close()
        return m3u8

    async def _fetch_anilist(self, anilist_id: str) -> dict:
        """Fetch and flatten full AniList media metadata for *anilist_id*.

        Prefers the MAL synopsis (via Jikan) over AniList's description when
        available. Returns {"error": ...} on any failure.
        """
        query = """
        query ($id: Int) {
          Media(id: $id, type: ANIME) {
            id idMal
            title { romaji english native }
            synonyms
            description(asHtml: false)
            format status episodes duration source countryOfOrigin isAdult
            startDate { year month day }
            endDate { year month day }
            season seasonYear
            averageScore meanScore popularity favourites trending
            genres
            coverImage { extraLarge large medium color }
            bannerImage
            trailer { id site }
            studios(isMain: true) { nodes { name siteUrl } }
            relations {
              edges {
                relationType(version: 2)
                node {
                  id idMal title { romaji english } format status episodes
                  averageScore coverImage { medium } siteUrl
                }
              }
            }
            recommendations(perPage: 20, sort: RATING_DESC) {
              nodes {
                rating
                mediaRecommendation {
                  id idMal title { romaji english } format status episodes
                  averageScore coverImage { medium } siteUrl
                }
              }
            }
            externalLinks { site url type }
            nextAiringEpisode { airingAt episode }
          }
        }
        """
        try:
            async with httpx.AsyncClient(timeout=15) as client:
                resp = await client.post(
                    ANILIST_API,
                    json={"query": query, "variables": {"id": int(anilist_id)}},
                    headers={
                        "Content-Type": "application/json",
                        "Accept": "application/json",
                    },
                )
                resp.raise_for_status()
                result = resp.json()
        except Exception as e:
            return {"error": f"AniList fetch failed: {str(e)}"}

        media = result.get("data", {}).get("Media")
        if not media:
            return {"error": "AniList returned no data"}

        mal_id = str(media.get("idMal") or "")
        mal_synopsis = await self._fetch_mal_synopsis(mal_id) if mal_id else None
        synopsis = mal_synopsis or media.get("description")

        def fmt_date(d):
            # AniList fuzzy dates: year may exist without month/day.
            if not d or not d.get("year"):
                return None
            return "-".join(
                str(p).zfill(2)
                for p in [d.get("year"), d.get("month"), d.get("day")]
                if p
            )

        trailer = None
        if media.get("trailer"):
            t = media["trailer"]
            if t.get("site") == "youtube":
                trailer = f"https://www.youtube.com/watch?v={t['id']}"
            elif t.get("site") == "dailymotion":
                trailer = f"https://www.dailymotion.com/video/{t['id']}"

        # Group related media by relation type (SEQUEL, PREQUEL, ...).
        relations = {}
        for edge in media.get("relations", {}).get("edges", []):
            node = edge.get("node", {})
            if not node:
                continue
            rel = edge.get("relationType", "OTHER")
            relations.setdefault(rel, []).append(
                {
                    "id": node.get("id"),
                    "mal_id": node.get("idMal"),
                    "title": (
                        node.get("title", {}).get("english")
                        or node.get("title", {}).get("romaji")
                    ),
                    "format": node.get("format"),
                    "status": node.get("status"),
                    "episodes": node.get("episodes"),
                    "score": node.get("averageScore"),
                    "image": node.get("coverImage", {}).get("medium"),
                    "url": node.get("siteUrl"),
                    "relation_type": rel,
                }
            )

        recommendations = []
        for node in media.get("recommendations", {}).get("nodes", []):
            rec = node.get("mediaRecommendation")
            if not rec:
                continue
            recommendations.append(
                {
                    "id": rec.get("id"),
                    "mal_id": rec.get("idMal"),
                    "title": rec["title"].get("english") or rec["title"].get("romaji"),
                    "format": rec.get("format"),
                    "status": rec.get("status"),
                    "episodes": rec.get("episodes"),
                    "score": rec.get("averageScore"),
                    "image": rec.get("coverImage", {}).get("medium"),
                    "url": rec.get("siteUrl"),
                    "rating": node.get("rating"),
                }
            )

        return {
            "id": media.get("id"),
            "mal_id": media.get("idMal"),
            "title": media["title"],
            "synonyms": media.get("synonyms", []),
            "synopsis": synopsis,
            "format": media.get("format"),
            "status": media.get("status"),
            "episodes": media.get("episodes"),
            "duration": media.get("duration"),
            "source": media.get("source"),
            "country": media.get("countryOfOrigin"),
            "is_adult": media.get("isAdult"),
            "start_date": fmt_date(media.get("startDate")),
            "end_date": fmt_date(media.get("endDate")),
            "season": media.get("season"),
            "season_year": media.get("seasonYear"),
            "average_score": media.get("averageScore"),
            "mean_score": media.get("meanScore"),
            "popularity": media.get("popularity"),
            "favourites": media.get("favourites"),
            "trending": media.get("trending"),
            "genres": media.get("genres", []),
            "cover_image": media.get("coverImage", {}),
            "banner_image": media.get("bannerImage"),
            "trailer": trailer,
            "studios": [s["name"] for s in media.get("studios", {}).get("nodes", [])],
            "next_airing": media.get("nextAiringEpisode"),
            "external_links": [
                {"site": link["site"], "url": link["url"], "type": link["type"]}
                for link in media.get("externalLinks", [])
            ],
            "relations": relations,
            "recommendations": recommendations,
        }

    async def search(self, q: str):
        """Search animepahe; returns the result list (empty on failure)."""
        # FIX: percent-encode the query — raw spaces/&/# broke the API URL.
        data = await self._fetch_json(
            f"{BASE_URL}/api?m=search&q={urllib.parse.quote(q)}"
        )
        return data.get("data", []) if data else []

    async def get_latest(self, p: int = 1):
        """Return the currently-airing page *p* as raw API JSON."""
        return await self._fetch_json(f"{BASE_URL}/api?m=airing&page={p}")

    async def get_episodes(self, anime_id: str, p: int = 1, resolve: bool = False):
        """List episodes; optionally resolve sub/dub streams for each one."""
        data = await self._fetch_json(
            f"{BASE_URL}/api?m=release&id={anime_id}&sort=episode_desc&page={p}"
        )
        if not data or not resolve:
            return data
        episodes = data.get("data", [])

        async def enrich(ep):
            ep_session = ep.get("session")
            if not ep_session:
                return ep
            stream = await self.resolve(anime_id, ep_session)
            ep["sub"] = stream.get("sub")
            ep["dub"] = stream.get("dub")
            return ep

        data["data"] = list(await asyncio.gather(*[enrich(ep) for ep in episodes]))
        return data

    async def get_ids(self, session: str):
        """Return the known external database IDs for *session*."""
        try:
            ids = await self._scrape_ids(session)
            return {
                k: ids.get(k)
                for k in ["animepahe", "anilist", "mal", "anidb", "ann", "animePlanet"]
            }
        except Exception as e:
            return {"error": f"Failed: {str(e)}"}

    async def get_info(self, session: str):
        """Full metadata for *session*: IDs + AniList data, cached by AniList ID."""
        try:
            ids = await self._scrape_ids(session)
            anilist_id = ids.get("anilist")
            if not anilist_id:
                return {"error": "Could not find AniList ID", "ids": ids}
            if anilist_id in _info_cache:
                return _info_cache[anilist_id]
            data = await self._fetch_anilist(anilist_id)
            if "error" in data:
                return {"error": data["error"], "ids": ids}
            data["ids"] = {**ids, "anilist": anilist_id}
            _info_cache[anilist_id] = data
            return data
        except Exception as e:
            return {"error": f"Failed: {str(e)}"}

    async def resolve(self, anime_session: str, episode_session: str):
        """Resolve best sub and dub stream URLs for one episode.

        Navigates the /play/ page (handling DDoS-Guard / 404 cookie refresh),
        scrapes the resolution menu, then resolves the highest-resolution sub
        and dub embeds concurrently. Returns a dict with "sub"/"dub" entries
        or {"error": ...}.
        """
        play_url = f"{BASE_URL}/play/{anime_session}/{episode_session}"
        anime_url = f"{BASE_URL}/anime/{anime_session}"
        page = await self.context.new_page()
        try:
            await stealth_async(page)
            await page.add_init_script(
                "Object.defineProperty(navigator, 'webdriver', { get: () => undefined });"
            )
            await page.set_extra_http_headers({"Referer": anime_url})

            # 1. Attempt direct visit fast (saves ~5s if cookies still valid).
            await page.goto(play_url, wait_until="domcontentloaded", timeout=20000)
            title = await page.title()

            # 2. Handle DDoS-Guard / Cloudflare interstitial quickly.
            if "DDoS-Guard" in title or "Just a moment" in title:
                try:
                    await page.wait_for_function(
                        "!document.title.includes('DDoS-Guard') && !document.title.includes('Just a moment')",
                        timeout=15000,
                    )
                    title = await page.title()
                except Exception:
                    pass

            # 3. On 404, fall back to the anime page to refresh cookies.
            if "404" in title:
                await page.goto(anime_url, wait_until="domcontentloaded", timeout=15000)
                try:
                    await page.wait_for_function(
                        "!document.title.includes('DDoS-Guard') && !document.title.includes('Just a moment')",
                        timeout=10000,
                    )
                except Exception:
                    pass
                # Retry the play URL now that cookies are refreshed.
                await page.set_extra_http_headers({"Referer": anime_url})
                await page.goto(play_url, wait_until="domcontentloaded", timeout=15000)

            # 4. Get links (no clicking needed; they exist attached in the DOM).
            try:
                await page.wait_for_selector(
                    "#resolutionMenu button", state="attached", timeout=10000
                )
            except Exception:
                html = await page.content()
                with open("debug_no_buttons.html", "w", encoding="utf-8") as f:
                    f.write(html)
                return {
                    "error": "No resolution buttons found",
                    "page_title": await page.title(),
                }

            anime_name, episode_num = await self._scrape_play_meta(page)
            buttons = await page.locator("#resolutionMenu button").all()
            res_data = []
            for btn in buttons:
                embed_url = await btn.get_attribute("data-src")
                res_text = await btn.get_attribute("data-resolution")
                fansub = await btn.get_attribute("data-fansub") or "Unknown"
                audio_lang = await btn.get_attribute("data-audio") or "jpn"
                if embed_url:
                    res_data.append(
                        {
                            "res": int(res_text)
                            if res_text and res_text.isdigit()
                            else 720,
                            "embed": embed_url,
                            "audio_lang": audio_lang,
                            "audio": "dub" if audio_lang == "eng" else "sub",
                            "fansub": fansub,
                        }
                    )
            # Close the page early so embed resolution can reuse resources.
            await page.close()
            page = None

            # 5. Resolve extracted links concurrently (best sub + best dub).
            subs = [r for r in res_data if r["audio"] == "sub"]
            dubs = [r for r in res_data if r["audio"] == "dub"]
            best_sub = max(subs, key=lambda x: x["res"]) if subs else None
            best_dub = max(dubs, key=lambda x: x["res"]) if dubs else None

            async def resolve_one(item):
                try:
                    m3u8 = await self._embed_to_m3u8(item["embed"])
                    res_str = str(item["res"])
                    return {
                        "resolution": res_str,
                        "fansub": item["fansub"],
                        "audio": item["audio"],
                        "audio_lang": item["audio_lang"],
                        "url": m3u8,
                        "download": self._generate_mp4(
                            m3u8, anime_name, episode_num, res_str
                        ),
                    }
                except Exception as e:
                    return {
                        "resolution": str(item["res"]),
                        "fansub": item["fansub"],
                        "error": str(e),
                    }

            tasks = []
            if best_sub:
                tasks.append(resolve_one(best_sub))
            if best_dub:
                tasks.append(resolve_one(best_dub))
            results = await asyncio.gather(*tasks)

            return {
                "anime": anime_session,
                "episode": episode_session,
                "anime_name": anime_name,
                "episode_num": episode_num,
                "sub": results[0] if best_sub else None,
                "dub": results[1]
                if best_sub and best_dub
                else (results[0] if best_dub else None),
            }
        except Exception as e:
            if page:
                try:
                    await page.screenshot(path="debug_error.png", full_page=True)
                except Exception:
                    pass
            return {"error": str(e), "hint": "Check debug_error.png"}
        finally:
            if page:
                await page.close()

    async def get_seasons(self, anime_id: str) -> dict:
        """Scrape related seasons from aniwatchtv for *anime_id* (a slug/path)."""
        url = f"{ANIWATCHTV_BASE}/{anime_id}"
        page = await self.context.new_page()
        try:
            await page.goto(url, wait_until="domcontentloaded", timeout=30000)
            await asyncio.sleep(1)
            # Wait for the first season-ish container that appears.
            for selector in [".os-list", ".seasons-block", "[class*='season']", "main"]:
                try:
                    await page.wait_for_selector(selector, timeout=5000)
                    break
                except Exception:
                    continue
            seasons = await page.evaluate(f"""() => {{
                const BASE = "{ANIWATCHTV_BASE}";
                const results = [];
                const seen = new Set();
                const container = document.querySelector('.os-list')
                    || document.querySelector('.seasons-block')
                    || document.querySelector('[class*="os-list"]');
                if (!container) return results;
                for (const a of container.querySelectorAll('a[href]')) {{
                    const href = a.getAttribute('href') || '';
                    const fullUrl = href.startsWith('http') ? href : BASE + href;
                    if (!fullUrl) continue;
                    const slug = fullUrl.replace(/\\/$/, '').split('/').pop();
                    if (!slug || seen.has(slug)) continue;
                    seen.add(slug);
                    const title = (a.querySelector('span, [class*="title"]')?.innerText?.trim()
                        || a.innerText?.trim() || slug);
                    const posterEl = a.querySelector('.season-poster')
                        || a.closest('li, div')?.querySelector('.season-poster');
                    let poster = null;
                    if (posterEl) {{
                        const bg = posterEl.style.backgroundImage || window.getComputedStyle(posterEl).backgroundImage;
                        const bg2 = bg.split('url(').pop().split(')')[0].replace(/['"/]/g, '').trim();
                        if (bg2 && bg2.startsWith('http')) poster = bg2;
                    }}
                    results.push({{ title, id: slug, url: fullUrl, poster }});
                }}
                return results;
            }}""")
            return {"id": anime_id, "total": len(seasons), "seasons": seasons}
        except Exception as e:
            return {"id": anime_id, "total": 0, "seasons": [], "error": str(e)}
        finally:
            await page.close()


pahe = AnimePahe()


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Start the browser before serving; stop it on shutdown.
    await pahe.start()
    yield
    await pahe.stop()


app = FastAPI(lifespan=lifespan)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.get("/")
async def root():
    return {"status": "ok"}


@app.get("/search")
async def api_search(q: str):
    return await pahe.search(q)


@app.get("/latest")
async def api_latest(p: int = 1):
    return await pahe.get_latest(p)


@app.get("/info/{session}")
async def api_info(session: str):
    return await pahe.get_info(session)


@app.get("/episodes/{session}")
async def api_episodes(session: str, p: int = 1, resolve: bool = False):
    return await pahe.get_episodes(session, p, resolve)


@app.get("/resolve/{anime}/{episode}")
async def api_resolve(anime: str, episode: str):
    return await pahe.resolve(anime, episode)


@app.get("/seasons/{anime_id:path}")
async def api_seasons(anime_id: str, request: Request):
    data = await pahe.get_seasons(anime_id)
    base_url = str(request.base_url).rstrip("/")
    # Attach a same-origin proxied poster URL alongside the raw CDN one.
    for season in data.get("seasons", []):
        if season.get("poster"):
            season["posterProxied"] = f"{base_url}/poster?url={season['poster']}"
    return data


@app.get("/poster")
async def api_poster(url: str = Query(..., description="CDN image proxy")):
    """Proxy a poster image with the Referer the CDN expects."""
    try:
        async with httpx.AsyncClient(timeout=10, follow_redirects=True) as client:
            resp = await client.get(
                url, headers={"Referer": ANIWATCHTV_BASE, "User-Agent": "Mozilla/5.0"}
            )
            resp.raise_for_status()
            return Response(
                content=resp.content,
                media_type=resp.headers.get("content-type", "image/jpeg"),
            )
    except Exception as e:
        return Response(content=f"Error: {e}", status_code=502)


@app.get("/hls-proxy")
async def hls_proxy(url: str, request: Request):
    """Proxy HLS traffic: rewrite playlist entries through this proxy, stream segments."""
    headers = {"Referer": "https://kwik.cx/", "User-Agent": "Mozilla/5.0"}

    async def stream_generator():
        async with httpx.AsyncClient(follow_redirects=True) as client:
            async with client.stream("GET", url, headers=headers) as resp:
                async for chunk in resp.aiter_bytes():
                    yield chunk

    if url.split("?")[0].endswith(".m3u8"):
        async with httpx.AsyncClient(follow_redirects=True) as client:
            resp = await client.get(url, headers=headers)
        lines = resp.text.splitlines()
        base_proxy = f"{str(request.base_url).rstrip('/')}/hls-proxy?url="
        new_lines = []
        for line in lines:
            if line.startswith("#") or not line.strip():
                new_lines.append(line)
            else:
                # Resolve relative segment URIs against the playlist URL,
                # then route them back through this endpoint.
                abs_url = urllib.parse.urljoin(url, line.strip())
                new_lines.append(f"{base_proxy}{urllib.parse.quote(abs_url)}")
        return Response(
            content="\n".join(new_lines), media_type="application/vnd.apple.mpegurl"
        )
    else:
        return StreamingResponse(stream_generator(), media_type="video/MP2T")


@app.get("/proxy-mapper")
async def proxy_mapper(url: str):
    """Fetch *url* server-side and return its JSON body (or an error dict)."""
    try:
        async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client:
            resp = await client.get(url)
            return resp.json()
    except Exception as e:
        return {"error": str(e)}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)