import asyncio
import json
import os
import re
from contextlib import asynccontextmanager
from typing import Optional
from urllib.parse import quote

import httpx
from fastapi import FastAPI
from playwright.async_api import BrowserContext, async_playwright

BASE_URL = "https://animepahe.si"
ANILIST_API = "https://graphql.anilist.co"
JIKAN_API = "https://api.jikan.moe/v4"
KITSU_API = "https://kitsu.io/api/edge"

IS_HEADLESS = os.environ.get("HEADLESS", "true").lower() == "true"

# In-memory caches. NOTE(review): unbounded — entries live for the whole
# process lifetime; acceptable for a small service, revisit if memory grows.
_info_cache: dict = {}
_mal_synopsis_cache: dict = {}
_kitsu_relations_cache: dict = {}

KITSU_HEADERS = {
    "Accept": "application/vnd.api+json",
    "Content-Type": "application/vnd.api+json",
}

# Kitsu relation roles listed before any other relation type in the combined
# relations list returned by _fetch_kitsu_relations.
DIRECT_RELATION_TYPES = {"sequel", "prequel", "parent", "full_story", "side_story"}


class AnimePahe:
    """Scraper/resolver for animepahe.si backed by a persistent Playwright browser.

    The site's JSON API sits behind browser-level protection, so API endpoints
    are read through a real browser tab (``_fetch_json``) rather than plain
    HTTP.  Metadata is enriched from AniList (GraphQL), Jikan/MAL (synopsis)
    and Kitsu (relations) via httpx.
    """

    def __init__(self):
        self.playwright = None
        self.context: Optional[BrowserContext] = None
        # URL substrings identifying ad networks; matching requests are
        # aborted in _intercept_assets to speed up page loads.
        self.ad_domains = [
            "doubleclick.net",
            "adservice.google",
            "popads.net",
            "propellerads",
            "exoclick",
            "bebi.com",
        ]

    async def start(self):
        """Launch the persistent browser context and install request filtering."""
        self.playwright = await async_playwright().start()
        self.context = await self.playwright.chromium.launch_persistent_context(
            # Persistent profile keeps cookies (e.g. DDoS-clearance) across runs.
            user_data_dir="./browser_data",
            headless=IS_HEADLESS,
            user_agent=(
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
                "(KHTML, like Gecko) Chrome/122 Safari/537.36"
            ),
            args=[
                "--disable-blink-features=AutomationControlled",
                "--no-sandbox",
            ],
        )
        await self.context.route("**/*", self._intercept_assets)

    async def stop(self):
        """Close the browser context and shut down the Playwright driver."""
        if self.context:
            await self.context.close()
        if self.playwright:
            await self.playwright.stop()

    async def _intercept_assets(self, route):
        """Abort ad requests and heavy static assets; continue everything else."""
        url = route.request.url.lower()
        if any(ad in url for ad in self.ad_domains) or url.endswith(
            (".png", ".jpg", ".jpeg", ".webp", ".woff")
        ):
            await route.abort()
        else:
            await route.continue_()

    async def _fetch_json(self, url: str):
        """Load ``url`` in a fresh tab and parse the rendered body as JSON.

        Returns the decoded object, or ``None`` on any navigation/parse
        failure (best-effort by design; callers treat ``None`` as "no data").
        """
        page = await self.context.new_page()
        try:
            await page.goto(url, wait_until="domcontentloaded")
            txt = await page.evaluate("document.body.innerText")
            return json.loads(txt)
        # FIX: was a bare ``except:`` which also swallowed KeyboardInterrupt
        # and SystemExit; narrow to Exception while keeping best-effort intent.
        except Exception:
            return None
        finally:
            await page.close()

    def _generate_mp4(self, m3u8_url: Optional[str], anime_id: str, res: str):
        """Derive a kwik.cx direct-MP4 download URL from an HLS playlist URL.

        Returns ``None`` when the playlist URL is missing or does not match
        the expected ``https://<host>.../stream/<id>/<file>.m3u8`` shape.
        """
        if not m3u8_url:
            return None
        match = re.search(r"(https?://[^.]+)[^/]*/stream/(.*?)/[^/]+\.m3u8", m3u8_url)
        if match:
            return (
                f"{match.group(1)}.kwik.cx/mp4/{match.group(2)}"
                f"?file=AnimePahe_{anime_id}_{res}p.mp4"
            )
        return None

    # ---------------- SCRAPE IDs ONLY ----------------
    async def _scrape_ids(self, session: str) -> dict:
        """Scrape external-database IDs (MAL, AniList, AniDB, Kitsu, ANN,
        Anime-Planet) from the links on an AnimePahe anime page.

        The result always contains ``{"animepahe": session}``; other keys are
        present only when the corresponding link exists on the page.
        """
        page = await self.context.new_page()
        try:
            await page.goto(
                f"{BASE_URL}/anime/{session}",
                wait_until="networkidle",
                timeout=30000,
            )
            await page.wait_for_selector(".anime-info", timeout=10000)
            await asyncio.sleep(1)  # allow late JS to inject the external links
            ids = await page.evaluate("""() => {
                let ids = {}
                document.querySelectorAll("a[href]").forEach(a => {
                    const url = a.href || ""
                    if (url.includes("myanimelist.net/anime")) ids["mal"] = url.split("/").filter(Boolean).pop()
                    if (url.includes("anilist.co/anime")) ids["anilist"] = url.split("/").filter(Boolean).pop()
                    if (url.includes("anidb.net")) ids["anidb"] = url.split("/").filter(Boolean).pop()
                    if (url.includes("kitsu.io/anime")) ids["kitsu"] = url.split("/").filter(Boolean).pop()
                    if (url.includes("animenewsnetwork.com")) {
                        const m = url.match(/id=(\\d+)/)
                        if (m) ids["ann"] = m[1]
                    }
                    if (url.includes("anime-planet.com/anime")) ids["animePlanet"] = url.split("/").filter(Boolean).pop()
                })
                return ids
            }""")
            ids["animepahe"] = session
            return ids
        except Exception as e:
            print(f"[scrape_ids] ERROR: {e}")
            return {"animepahe": session}
        finally:
            await page.close()

    # ---------------- MAL SYNOPSIS ----------------
    async def _fetch_mal_synopsis(self, mal_id: str) -> Optional[str]:
        """Fetch the MyAnimeList synopsis via Jikan; cached per ``mal_id``.

        Failures are cached as ``None`` so a broken ID is not retried forever.
        """
        if mal_id in _mal_synopsis_cache:
            return _mal_synopsis_cache[mal_id]
        try:
            async with httpx.AsyncClient(timeout=10) as client:
                resp = await client.get(
                    f"{JIKAN_API}/anime/{mal_id}",
                    headers={"Accept": "application/json"},
                )
                resp.raise_for_status()
                synopsis = resp.json().get("data", {}).get("synopsis")
                _mal_synopsis_cache[mal_id] = synopsis
                return synopsis
        except Exception as e:
            print(f"[mal_synopsis] failed for mal_id={mal_id}: {e}")
            _mal_synopsis_cache[mal_id] = None
            return None

    # ---------------- KITSU RELATIONS ----------------
    async def _fetch_kitsu_relations(self, kitsu_id: str) -> list:
        """Fetch related anime from Kitsu's media-relationships endpoint.

        Returns a flat list of relation entries: "direct" relations
        (DIRECT_RELATION_TYPES) first, everything else after.  Failures are
        cached as an empty list.
        """
        if kitsu_id in _kitsu_relations_cache:
            return _kitsu_relations_cache[kitsu_id]
        try:
            async with httpx.AsyncClient(timeout=15) as client:
                url = (
                    f"{KITSU_API}/anime/{kitsu_id}/media-relationships"
                    f"?include=destination"
                    f"&fields[anime]=canonicalTitle,posterImage,episodeCount,status,subtype,startDate"
                    f"&page[limit]=20"
                )
                resp = await client.get(url, headers=KITSU_HEADERS)
                resp.raise_for_status()
                data = resp.json()
        except Exception as e:
            print(f"[kitsu_relations] failed for kitsu_id={kitsu_id}: {e}")
            _kitsu_relations_cache[kitsu_id] = []
            return []

        # Index the side-loaded "included" resources by ID for O(1) joins.
        included = {item["id"]: item for item in data.get("included", [])}

        direct = []
        indirect = []
        for rel in data.get("data", []):
            attrs = rel.get("attributes", {})
            role = (attrs.get("role") or "").lower()
            # FIX: JSON:API encodes an empty to-one relationship as
            # ``"data": null`` — ``.get("data", {})`` would return None and
            # crash on ``.get("type")``; coalesce with ``or {}``.
            dest_data = (
                rel.get("relationships", {}).get("destination", {}).get("data") or {}
            )
            dest_type = dest_data.get("type", "")
            dest_id = dest_data.get("id", "")
            if dest_type != "anime":
                continue
            dest = included.get(dest_id, {})
            dest_attrs = dest.get("attributes", {})
            poster = dest_attrs.get("posterImage") or {}
            entry = {
                "kitsu_id": dest_id,
                "title": dest_attrs.get("canonicalTitle"),
                "format": dest_attrs.get("subtype"),
                "status": dest_attrs.get("status"),
                "episodes": dest_attrs.get("episodeCount"),
                "start_date": dest_attrs.get("startDate"),
                "image": (
                    poster.get("small")
                    or poster.get("medium")
                    or poster.get("original")
                ),
                "url": f"https://kitsu.io/anime/{dest_id}",
                "relation_type": role,
            }
            if role in DIRECT_RELATION_TYPES:
                direct.append(entry)
            else:
                indirect.append(entry)

        combined = direct + indirect
        _kitsu_relations_cache[kitsu_id] = combined
        return combined

    # ---------------- SHARED RESOLVE HELPERS ----------------
    async def _collect_buttons(self, page) -> list:
        """Read all #resolutionMenu buttons on a /play/ page.

        Returns a list of dicts with the embed URL, resolution (int), fansub
        group, and audio type.  data-audio="jpn" → sub, data-audio="eng" → dub.
        """
        buttons = await page.locator("#resolutionMenu button").all()
        res_data = []
        for btn in buttons:
            text = (await btn.inner_text()).strip()
            res_match = re.search(r"(\d+)", text)
            audio_lang = (await btn.get_attribute("data-audio") or "jpn").lower()
            audio_type = "dub" if audio_lang == "eng" else "sub"
            res_data.append(
                {
                    "embed": await btn.get_attribute("data-src"),
                    # Default to 720p when the button text carries no number.
                    "res": int(res_match.group(1)) if res_match else 720,
                    "fansub": text.split("·")[0].strip() if "·" in text else "Unknown",
                    "audio": audio_type,
                    "audio_lang": audio_lang,
                }
            )
        return res_data

    async def _embed_to_m3u8(self, embed_url: str) -> Optional[str]:
        """Open an embed URL and capture the first .m3u8 network request.

        Clicks likely play controls for up to ~5s (10 × 0.5s) to trigger the
        player; returns ``None`` if no playlist request was observed.
        """
        p = await self.context.new_page()
        m3u8 = None

        def capture(req):
            nonlocal m3u8
            if ".m3u8" in req.url:
                m3u8 = req.url

        p.on("request", capture)
        try:
            # The embed host validates the Referer header.
            await p.set_extra_http_headers({"Referer": BASE_URL})
            await p.goto(embed_url, wait_until="domcontentloaded")
            for _ in range(10):
                if m3u8:
                    break
                await p.evaluate(
                    "document.querySelectorAll('button, video, [class*=play]')"
                    ".forEach(el => el.click())"
                )
                await asyncio.sleep(0.5)
        finally:
            await p.close()
        return m3u8

    # ---------------- ANILIST ----------------
    async def _fetch_anilist(self, anilist_id: str) -> dict:
        """Fetch full metadata for one anime from the AniList GraphQL API.

        Returns a normalized dict, or ``{"error": ...}`` on failure.  The
        synopsis prefers MAL (via Jikan) and falls back to AniList's
        description.
        """
        query = """
        query ($id: Int) {
          Media(id: $id, type: ANIME) {
            id
            idMal
            title { romaji english native }
            synonyms
            description(asHtml: false)
            format
            status
            episodes
            duration
            source
            countryOfOrigin
            isAdult
            startDate { year month day }
            endDate { year month day }
            season
            seasonYear
            averageScore
            meanScore
            popularity
            favourites
            trending
            genres
            tags { name category rank isMediaSpoiler }
            coverImage { extraLarge large medium color }
            bannerImage
            trailer { id site }
            studios(isMain: true) { nodes { name siteUrl } }
            staff(perPage: 10) {
              edges { role node { name { full } image { medium } siteUrl } }
            }
            characters(perPage: 10, sort: [ROLE, RELEVANCE]) {
              edges {
                role
                node { name { full } image { medium } siteUrl }
                voiceActors(language: JAPANESE) {
                  name { full } image { medium } siteUrl
                }
              }
            }
            recommendations(perPage: 20, sort: RATING_DESC) {
              nodes {
                rating
                mediaRecommendation {
                  id idMal title { romaji english } format status episodes
                  averageScore coverImage { medium } siteUrl
                }
              }
            }
            externalLinks { site url type }
            nextAiringEpisode { airingAt episode }
          }
        }
        """
        try:
            async with httpx.AsyncClient(timeout=15) as client:
                resp = await client.post(
                    ANILIST_API,
                    json={"query": query, "variables": {"id": int(anilist_id)}},
                    headers={
                        "Content-Type": "application/json",
                        "Accept": "application/json",
                    },
                )
                resp.raise_for_status()
                result = resp.json()
        except Exception as e:
            print(f"[anilist] failed for id={anilist_id}: {e}")
            return {"error": f"AniList fetch failed: {str(e)}"}

        media = result.get("data", {}).get("Media")
        if not media:
            return {"error": "AniList returned no data"}

        mal_id = str(media.get("idMal") or "")
        mal_synopsis = await self._fetch_mal_synopsis(mal_id) if mal_id else None
        synopsis = mal_synopsis or media.get("description")

        def fmt_date(d):
            # AniList dates are partial; require at least a year.
            if not d or not d.get("year"):
                return None
            parts = [d.get("year"), d.get("month"), d.get("day")]
            return "-".join(str(p).zfill(2) for p in parts if p)

        trailer = None
        if media.get("trailer"):
            t = media["trailer"]
            if t.get("site") == "youtube":
                trailer = f"https://www.youtube.com/watch?v={t['id']}"
            elif t.get("site") == "dailymotion":
                trailer = f"https://www.dailymotion.com/video/{t['id']}"

        recommendations = []
        for node in media.get("recommendations", {}).get("nodes", []):
            rec = node.get("mediaRecommendation")
            if not rec:
                continue
            title = rec.get("title") or {}
            recommendations.append(
                {
                    "id": rec.get("id"),
                    "mal_id": rec.get("idMal"),
                    "title": title.get("english") or title.get("romaji"),
                    "format": rec.get("format"),
                    "status": rec.get("status"),
                    "episodes": rec.get("episodes"),
                    "score": rec.get("averageScore"),
                    "image": rec.get("coverImage", {}).get("medium"),
                    "url": rec.get("siteUrl"),
                    "rating": node.get("rating"),
                }
            )

        characters = []
        for edge in media.get("characters", {}).get("edges", []):
            node = edge.get("node", {})
            vas = edge.get("voiceActors", [])
            # FIX: guard the first-VA lookup — ``vas[0]["name"]["full"]`` would
            # raise KeyError/TypeError on a partial voice-actor record.
            va = vas[0] if vas else None
            characters.append(
                {
                    "name": node.get("name", {}).get("full"),
                    "image": node.get("image", {}).get("medium"),
                    "role": edge.get("role"),
                    "url": node.get("siteUrl"),
                    "voice_actor": {
                        "name": (va.get("name") or {}).get("full"),
                        "image": (va.get("image") or {}).get("medium"),
                        "url": va.get("siteUrl"),
                    }
                    if va
                    else None,
                }
            )

        staff = []
        for edge in media.get("staff", {}).get("edges", []):
            node = edge.get("node", {})
            staff.append(
                {
                    "name": node.get("name", {}).get("full"),
                    "image": node.get("image", {}).get("medium"),
                    "role": edge.get("role"),
                    "url": node.get("siteUrl"),
                }
            )

        return {
            "id": media.get("id"),
            "mal_id": media.get("idMal"),
            "title": {
                "romaji": media["title"].get("romaji"),
                "english": media["title"].get("english"),
                "native": media["title"].get("native"),
            },
            "synonyms": media.get("synonyms", []),
            "synopsis": synopsis,
            "format": media.get("format"),
            "status": media.get("status"),
            "episodes": media.get("episodes"),
            "duration": media.get("duration"),
            "source": media.get("source"),
            "country": media.get("countryOfOrigin"),
            "is_adult": media.get("isAdult"),
            "start_date": fmt_date(media.get("startDate")),
            "end_date": fmt_date(media.get("endDate")),
            "season": media.get("season"),
            "season_year": media.get("seasonYear"),
            "average_score": media.get("averageScore"),
            "mean_score": media.get("meanScore"),
            "popularity": media.get("popularity"),
            "favourites": media.get("favourites"),
            "trending": media.get("trending"),
            "genres": media.get("genres", []),
            "tags": [
                {
                    "name": t["name"],
                    "category": t["category"],
                    "rank": t["rank"],
                    "spoiler": t["isMediaSpoiler"],
                }
                for t in media.get("tags", [])
            ],
            "cover_image": media.get("coverImage", {}),
            "banner_image": media.get("bannerImage"),
            "trailer": trailer,
            "studios": [s["name"] for s in media.get("studios", {}).get("nodes", [])],
            "next_airing": media.get("nextAiringEpisode"),
            "external_links": [
                {"site": l["site"], "url": l["url"], "type": l["type"]}
                for l in media.get("externalLinks", [])
            ],
            "characters": characters,
            "staff": staff,
            # Filled in later by get_info from Kitsu data.
            "relations": {},
            "recommendations": recommendations,
        }

    # ---------------- SEARCH ----------------
    async def search(self, q: str):
        """Search AnimePahe; returns the raw result list (empty on failure)."""
        # FIX: percent-encode the query — raw interpolation broke or truncated
        # searches containing spaces, '&', '#', or non-ASCII characters.
        data = await self._fetch_json(f"{BASE_URL}/api?m=search&q={quote(q)}")
        return data.get("data", []) if data else []

    # ---------------- LATEST ----------------
    async def get_latest(self, p: int = 1):
        """Return the 'currently airing' API page ``p`` (or None on failure)."""
        return await self._fetch_json(f"{BASE_URL}/api?m=airing&page={p}")

    # ---------------- EPISODES ----------------
    async def get_episodes(self, anime_id: str, p: int = 1, resolve: bool = False):
        """List episodes for an anime (newest first).

        With ``resolve=True`` each episode is additionally enriched with
        resolved ``sub``/``dub`` stream info (concurrently, one browser tab
        per episode).
        """
        data = await self._fetch_json(
            f"{BASE_URL}/api?m=release&id={anime_id}&sort=episode_desc&page={p}"
        )
        if not data or not resolve:
            return data

        episodes = data.get("data", [])

        async def enrich(ep):
            ep_session = ep.get("session")
            if not ep_session:
                return ep
            stream = await self._resolve_episode(anime_id, ep_session)
            ep["sub"] = stream.get("sub")
            ep["dub"] = stream.get("dub")
            return ep

        data["data"] = list(await asyncio.gather(*[enrich(ep) for ep in episodes]))
        return data

    # ---------------- INFO ----------------
    async def get_info(self, session: str):
        """Return enriched metadata for an AnimePahe session.

        Scrapes external IDs off the anime page, then fetches AniList metadata
        and Kitsu relations concurrently.  Results are cached by AniList ID
        (the ID scrape itself always runs, since the cache key is only known
        afterwards).
        """
        try:
            ids = await self._scrape_ids(session)
            anilist_id = ids.get("anilist")
            if not anilist_id:
                return {
                    "error": "Could not find AniList ID on AnimePahe page",
                    "ids": ids,
                }

            if anilist_id in _info_cache:
                return _info_cache[anilist_id]

            kitsu_id = ids.get("kitsu")

            async def empty_relations():
                # Awaitable placeholder so asyncio.gather always gets two tasks.
                return []

            anilist_task = self._fetch_anilist(anilist_id)
            kitsu_task = (
                self._fetch_kitsu_relations(kitsu_id) if kitsu_id else empty_relations()
            )
            data, kitsu_relations = await asyncio.gather(anilist_task, kitsu_task)

            if "error" in data:
                return {"error": data["error"], "ids": ids}

            data["relations"] = {"Related": kitsu_relations} if kitsu_relations else {}
            data["ids"] = {
                "animepahe": ids.get("animepahe"),
                "anilist": anilist_id,
                "mal": ids.get("mal"),
                "anidb": ids.get("anidb"),
                "kitsu": kitsu_id,
                "ann": ids.get("ann"),
                "animePlanet": ids.get("animePlanet"),
            }
            _info_cache[anilist_id] = data
            return data
        except Exception as e:
            print(f"[get_info] ERROR: {e}")
            return {"error": f"Failed: {str(e)}"}

    # ---------------- _resolve_episode (used by get_episodes) ----------------
    async def _resolve_episode(self, anime_session: str, episode_session: str) -> dict:
        """Resolve the single best sub and dub stream for one episode.

        Unlike :meth:`resolve`, this picks the highest-resolution button per
        audio type first and only resolves those two embeds.
        """
        play_url = f"{BASE_URL}/play/{anime_session}/{episode_session}"
        page = await self.context.new_page()
        try:
            await page.goto(play_url, wait_until="domcontentloaded")
            await page.wait_for_selector(
                "#resolutionMenu button",
                state="attached",
                timeout=15000,
            )
            res_data = await self._collect_buttons(page)
            # Close the play page early: the embed pages opened below don't
            # need it, and this frees a browser tab during the slow part.
            await page.close()
            page = None

            if not res_data:
                return {"sub": None, "dub": None}

            subs = [r for r in res_data if r["audio"] == "sub"]
            dubs = [r for r in res_data if r["audio"] == "dub"]
            best_sub = max(subs, key=lambda x: x["res"]) if subs else None
            best_dub = max(dubs, key=lambda x: x["res"]) if dubs else None

            result = {"sub": None, "dub": None}

            async def resolve_one(item, key):
                m3u8 = await self._embed_to_m3u8(item["embed"])
                res_str = str(item["res"])
                result[key] = {
                    "url": m3u8,
                    "download": self._generate_mp4(m3u8, anime_session, res_str),
                    "resolution": res_str,
                    "fansub": item["fansub"],
                }

            tasks = []
            if best_sub:
                tasks.append(resolve_one(best_sub, "sub"))
            if best_dub:
                tasks.append(resolve_one(best_dub, "dub"))
            await asyncio.gather(*tasks)
            return result
        except Exception as e:
            print(f"[_resolve_episode] ERROR: {e}")
            return {"sub": None, "dub": None, "error": str(e)}
        finally:
            if page:
                await page.close()

    # ---------------- RESOLVE ----------------
    async def resolve(self, anime_session: str, episode_session: str):
        """Resolve highest-res sub and dub for a single episode.

        Resolves every available source concurrently, then picks the best
        working one per audio type.

        Returns:
            sub: { resolution, fansub, audio, url, download }
            dub: same shape, or null if no dub exists
        """
        play_url = f"{BASE_URL}/play/{anime_session}/{episode_session}"
        page = await self.context.new_page()
        try:
            await page.goto(play_url, wait_until="domcontentloaded")
            await page.wait_for_selector(
                "#resolutionMenu button",
                state="attached",
                timeout=15000,
            )
            res_data = await self._collect_buttons(page)
            await page.close()
            page = None

            async def resolve_source(item):
                # Per-source failures degrade to url=None instead of failing
                # the whole episode.
                try:
                    m3u8 = await self._embed_to_m3u8(item["embed"])
                    res_str = str(item["res"])
                    return {
                        "resolution": res_str,
                        "fansub": item["fansub"],
                        "audio": item["audio"],
                        "audio_lang": item["audio_lang"],
                        "url": m3u8,
                        "download": self._generate_mp4(m3u8, anime_session, res_str),
                    }
                except Exception as e:
                    return {
                        "resolution": str(item["res"]),
                        "fansub": item["fansub"],
                        "audio": item["audio"],
                        "audio_lang": item["audio_lang"],
                        "url": None,
                        "download": None,
                        "error": str(e),
                    }

            all_sources = list(
                await asyncio.gather(*[resolve_source(i) for i in res_data])
            )
            sub_sources = [s for s in all_sources if s["audio"] == "sub"]
            dub_sources = [s for s in all_sources if s["audio"] == "dub"]

            def best(sources):
                # Highest resolution among sources that actually resolved.
                if not sources:
                    return None
                return max(
                    [s for s in sources if s["url"]],
                    key=lambda x: int(x["resolution"]) if x["resolution"] else 0,
                    default=None,
                )

            return {
                "anime": anime_session,
                "episode": episode_session,
                "sub": best(sub_sources),
                "dub": best(dub_sources),
            }
        except Exception as e:
            return {"error": str(e)}
        finally:
            if page:
                await page.close()


pahe = AnimePahe()


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start the shared browser on app startup, tear it down on shutdown."""
    await pahe.start()
    yield
    await pahe.stop()


app = FastAPI(lifespan=lifespan)


@app.get("/search")
async def api_search(q: str):
    return await pahe.search(q)


@app.get("/latest")
async def api_latest(p: int = 1):
    return await pahe.get_latest(p)


@app.get("/info/{session}")
async def api_info(session: str):
    return await pahe.get_info(session)


@app.get("/episodes/{session}")
async def api_episodes(session: str, p: int = 1, resolve: bool = False):
    return await pahe.get_episodes(session, p, resolve)


@app.get("/resolve/{anime}/{episode}")
async def api_resolve(anime: str, episode: str):
    return await pahe.resolve(anime, episode)


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)