From 99bde080cc01c6a979ef7aa0dd33e524a88ce8eb Mon Sep 17 00:00:00 2001 From: Aira Catapang Date: Wed, 1 Apr 2026 12:44:06 +0000 Subject: [PATCH] Update main.py make it function again --- main.py | 494 ++++++++++++++++---------------------------------- 1 file changed, 135 insertions(+), 359 deletions(-) diff --git a/main.py b/main.py index f9ccfdb..4b54770 100644 --- a/main.py +++ b/main.py @@ -22,19 +22,13 @@ IS_HEADLESS = os.environ.get("HEADLESS", "true").lower() == "true" _info_cache: dict = {} _mal_synopsis_cache: dict = {} - class AnimePahe: def __init__(self): self.playwright = None self.context: Optional[BrowserContext] = None - self.ad_domains = [ - "doubleclick.net", - "adservice.google", - "popads.net", - "propellerads", - "exoclick", - "bebi.com", + self.ad_domains = ["doubleclick.net", "adservice.google", "popads.net", + "propellerads", "exoclick", "bebi.com", ] async def start(self): @@ -42,26 +36,27 @@ class AnimePahe: self.context = await self.playwright.chromium.launch_persistent_context( user_data_dir="./browser_data", headless=IS_HEADLESS, - user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122 Safari/537.36", - args=[ - "--disable-blink-features=AutomationControlled", - "--no-sandbox", - ], + user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36", + args=["--disable-blink-features=AutomationControlled", "--no-sandbox"], ) await self.context.route("**/*", self._intercept_assets) + # Initial visit to ensure cookies are set + page = await self.context.new_page() + try: + await Stealth().apply_stealth_async(page) + await page.goto(BASE_URL, wait_until="networkidle", timeout=60000) + except: pass + finally: await page.close() async def stop(self): - if self.context: - await self.context.close() - if self.playwright: - await self.playwright.stop() + if self.context: await self.context.close() + if self.playwright: await self.playwright.stop() 
async def _intercept_assets(self, route): url = route.request.url.lower() if "aniwatchtv.to" in url or "kwik" in url: await route.continue_() return - if any(ad in url for ad in self.ad_domains) or url.endswith( (".png", ".jpg", ".jpeg", ".webp", ".woff", ".gif") ): @@ -72,7 +67,10 @@ class AnimePahe: async def _fetch_json(self, url: str): page = await self.context.new_page() try: - await page.goto(url, wait_until="domcontentloaded") + # FIX: Referer header is required for AnimePahe API + await page.set_extra_http_headers({"Referer": BASE_URL}) + # FIX: Use networkidle instead of domcontentloaded to bypass CF + await page.goto(url, wait_until="networkidle", timeout=30000) txt = await page.evaluate("document.body.innerText") return json.loads(txt) except: @@ -80,33 +78,16 @@ class AnimePahe: finally: await page.close() - def _generate_mp4( - self, m3u8_url: Optional[str], anime_name: str, episode: str, res: str - ) -> Optional[str]: - if not m3u8_url: - return None - - # Regex explanation: - # https?://([^.]+) matches the subdomain (e.g., vault-99) - # [^/]*/stream/ ignores the rest of the domain (e.g., .owocdn.top) and matches /stream/ - # (.*?)/[^/]+\.m3u8 matches the entire token path up to the final /uwu.m3u8 + def _generate_mp4(self, m3u8_url: Optional[str], anime_name: str, episode: str, res: str) -> Optional[str]: + if not m3u8_url: return None match = re.search(r"https?://([^.]+)[^/]*/stream/(.*?)/[^/]+\.m3u8", m3u8_url) - if match: - subdomain = match.group(1) # e.g., "vault-99" - token_path = match.group( - 2 - ) # e.g., "99/01/d138b9bb16e0a47024fad856caab2fba99d7cbd661ef2662a3572694eaebcf9a" - + subdomain = match.group(1) + token_path = match.group(2) clean_name = re.sub(r"[^\w\s]", "", anime_name).strip().replace(" ", "_") - if not clean_name: - clean_name = "Anime" - + if not clean_name: clean_name = "Anime" filename = f"{clean_name}_EP{episode}_{res}P.mp4" - - # Reconstruct the string using the kwik.cx domain and /mp4/ endpoint return 
f"https://{subdomain}.kwik.cx/mp4/{token_path}?file={filename}" - return None async def _scrape_play_meta(self, page) -> tuple: @@ -114,18 +95,14 @@ class AnimePahe: const titleEl = document.querySelector('.theatre-info h1 a, .theatre-info h2 a, .anime-title, h1, h2'); let title = titleEl ? titleEl.innerText.trim() : ''; let episode = ''; - const t = document.title || ''; const m = t.match(/^(.+?)\\s*[-\\u2013]\\s*(?:Episode\\s*)?(\\d+(?:\\.\\d+)?)/i); - if (m) { if (!title || title.length < 2) title = m[1].trim(); if (!episode) episode = m[2].trim(); } - return { title, episode } }""") - title = (meta.get("title") or "").strip() or "Unknown" episode = (meta.get("episode") or "").strip() or "00" return title, episode @@ -133,27 +110,16 @@ class AnimePahe: async def _scrape_ids(self, session: str) -> dict: page = await self.context.new_page() try: - # 1. Apply stealth to bypass Cloudflare on the info page await Stealth().apply_stealth_async(page) - - # 2. Use wait_until="commit" and catch timeouts just like we did in resolve + # FIX: Changed wait_until to networkidle try: - await page.goto( - f"{BASE_URL}/anime/{session}", - wait_until="commit", - timeout=30000, - ) + await page.goto(f"{BASE_URL}/anime/{session}", wait_until="networkidle", timeout=30000) except Exception as e: - if "Timeout" not in str(e): - raise e + if "Timeout" not in str(e): raise e - # Wait for the main anime content to appear try: - await page.wait_for_selector( - "div.anime-info, div.anime-summary, aside, main", timeout=15000 - ) - except: - pass + await page.wait_for_selector("div.anime-info, div.anime-summary, aside, main", timeout=15000) + except: pass await asyncio.sleep(2) ids = await page.evaluate("""() => { @@ -164,8 +130,7 @@ class AnimePahe: if (url.includes("anilist.co/anime")) ids["anilist"] = url.split("/").filter(Boolean).pop() if (url.includes("anidb.net")) ids["anidb"] = url.split("/").filter(Boolean).pop() if (url.includes("animenewsnetwork.com")) { - const m = 
url.match(/id=(\\d+)/) - if (m) ids["ann"] = m[1] + const m = url.match(/id=(\\d+)/); if (m) ids["ann"] = m[1] } if (url.includes("anime-planet.com/anime")) ids["animePlanet"] = url.split("/").filter(Boolean).pop() }) @@ -173,20 +138,16 @@ class AnimePahe: }""") ids["animepahe"] = session return ids - except Exception as e: + except Exception: return {"animepahe": session} finally: await page.close() async def _fetch_mal_synopsis(self, mal_id: str) -> Optional[str]: - if mal_id in _mal_synopsis_cache: - return _mal_synopsis_cache[mal_id] + if mal_id in _mal_synopsis_cache: return _mal_synopsis_cache[mal_id] try: async with httpx.AsyncClient(timeout=10) as client: - resp = await client.get( - f"{JIKAN_API}/anime/{mal_id}", - headers={"Accept": "application/json"}, - ) + resp = await client.get(f"{JIKAN_API}/anime/{mal_id}", headers={"Accept": "application/json"}) resp.raise_for_status() synopsis = resp.json().get("data", {}).get("synopsis") _mal_synopsis_cache[mal_id] = synopsis @@ -196,6 +157,8 @@ class AnimePahe: return None async def _collect_buttons(self, page) -> list: + # FIX: Ensure resolution menu is attached and visible + await page.wait_for_selector("#resolutionMenu button", state="visible", timeout=10000) buttons = await page.locator("#resolutionMenu button").all() res_data = [] for btn in buttons: @@ -203,15 +166,13 @@ class AnimePahe: res_match = re.search(r"(\d+)", text) audio_lang = (await btn.get_attribute("data-audio") or "jpn").lower() audio_type = "dub" if audio_lang == "eng" else "sub" - res_data.append( - { - "embed": await btn.get_attribute("data-src"), - "res": int(res_match.group(1)) if res_match else 720, - "fansub": text.split("·")[0].strip() if "·" in text else "Unknown", - "audio": audio_type, - "audio_lang": audio_lang, - } - ) + res_data.append({ + "embed": await btn.get_attribute("data-src"), + "res": int(res_match.group(1)) if res_match else 720, + "fansub": text.split("·")[0].strip() if "·" in text else "Unknown", + "audio": 
audio_type, + "audio_lang": audio_lang, + }) return res_data async def _embed_to_m3u8(self, embed_url: str) -> Optional[str]: @@ -221,17 +182,18 @@ class AnimePahe: def capture(req): nonlocal m3u8 - if ".m3u8" in req.url and not found.is_set(): + # FIX: Sniff both m3u8 and mp4 from Kwik network traffic + if (".m3u8" in req.url or ".mp4" in req.url) and not found.is_set(): m3u8 = req.url found.set() p.on("request", capture) try: - await p.set_extra_http_headers({"Referer": "https://animepahe.si/"}) - await p.goto(embed_url, wait_until="domcontentloaded", timeout=15000) + await p.set_extra_http_headers({"Referer": BASE_URL}) + # FIX: Using networkidle for Kwik + await p.goto(embed_url, wait_until="networkidle", timeout=15000) for _ in range(6): - if found.is_set(): - break + if found.is_set(): break await p.evaluate("""() => { document.querySelectorAll('video').forEach(v => { v.muted = true; @@ -245,13 +207,9 @@ class AnimePahe: await asyncio.sleep(1.5) try: await asyncio.wait_for(found.wait(), timeout=5.0) - except asyncio.TimeoutError: - pass - except Exception: - pass - finally: - await p.close() - + except asyncio.TimeoutError: pass + except Exception: pass + finally: await p.close() return m3u8 async def _fetch_anilist(self, anilist_id: str) -> dict: @@ -272,10 +230,7 @@ class AnimePahe: resp = await client.post( ANILIST_API, json={"query": query, "variables": {"id": int(anilist_id)}}, - headers={ - "Content-Type": "application/json", - "Accept": "application/json", - }, + headers={"Content-Type": "application/json", "Accept": "application/json"}, ) resp.raise_for_status() result = resp.json() @@ -283,108 +238,61 @@ class AnimePahe: return {"error": f"AniList fetch failed: {str(e)}"} media = result.get("data", {}).get("Media") - if not media: - return {"error": "AniList returned no data"} + if not media: return {"error": "AniList returned no data"} mal_id = str(media.get("idMal") or "") mal_synopsis = await self._fetch_mal_synopsis(mal_id) if mal_id else None 
synopsis = mal_synopsis or media.get("description") def fmt_date(d): - if not d or not d.get("year"): - return None - return "-".join( - str(p).zfill(2) - for p in [d.get("year"), d.get("month"), d.get("day")] - if p - ) + if not d or not d.get("year"): return None + return "-".join(str(p).zfill(2) for p in [d.get("year"), d.get("month"), d.get("day")] if p) trailer = None if media.get("trailer"): t = media["trailer"] - if t.get("site") == "youtube": - trailer = f"https://www.youtube.com/watch?v={t['id']}" - elif t.get("site") == "dailymotion": - trailer = f"https://www.dailymotion.com/video/{t['id']}" + if t.get("site") == "youtube": trailer = f"https://www.youtube.com/watch?v={t['id']}" + elif t.get("site") == "dailymotion": trailer = f"https://www.dailymotion.com/video/{t['id']}" relations = {} for edge in media.get("relations", {}).get("edges", []): node = edge.get("node", {}) - if not node: - continue + if not node: continue rel = edge.get("relationType", "OTHER") - relations.setdefault(rel, []).append( - { - "id": node.get("id"), - "mal_id": node.get("idMal"), - "title": ( - node.get("title", {}).get("english") - or node.get("title", {}).get("romaji") - ), - "format": node.get("format"), - "status": node.get("status"), - "episodes": node.get("episodes"), - "score": node.get("averageScore"), - "image": node.get("coverImage", {}).get("medium"), - "url": node.get("siteUrl"), - "relation_type": rel, - } - ) + relations.setdefault(rel, []).append({ + "id": node.get("id"), "mal_id": node.get("idMal"), + "title": (node.get("title", {}).get("english") or node.get("title", {}).get("romaji")), + "format": node.get("format"), "status": node.get("status"), "episodes": node.get("episodes"), + "score": node.get("averageScore"), "image": node.get("coverImage", {}).get("medium"), + "url": node.get("siteUrl"), "relation_type": rel, + }) recommendations = [] for node in media.get("recommendations", {}).get("nodes", []): rec = node.get("mediaRecommendation") - if not rec: - 
continue - recommendations.append( - { - "id": rec.get("id"), - "mal_id": rec.get("idMal"), - "title": rec["title"].get("english") or rec["title"].get("romaji"), - "format": rec.get("format"), - "status": rec.get("status"), - "episodes": rec.get("episodes"), - "score": rec.get("averageScore"), - "image": rec.get("coverImage", {}).get("medium"), - "url": rec.get("siteUrl"), - "rating": node.get("rating"), - } - ) + if not rec: continue + recommendations.append({ + "id": rec.get("id"), "mal_id": rec.get("idMal"), + "title": rec["title"].get("english") or rec["title"].get("romaji"), + "format": rec.get("format"), "status": rec.get("status"), "episodes": rec.get("episodes"), + "score": rec.get("averageScore"), "image": rec.get("coverImage", {}).get("medium"), + "url": rec.get("siteUrl"), "rating": node.get("rating"), + }) return { - "id": media.get("id"), - "mal_id": media.get("idMal"), - "title": media["title"], - "synonyms": media.get("synonyms", []), - "synopsis": synopsis, - "format": media.get("format"), - "status": media.get("status"), - "episodes": media.get("episodes"), - "duration": media.get("duration"), - "source": media.get("source"), - "country": media.get("countryOfOrigin"), - "is_adult": media.get("isAdult"), - "start_date": fmt_date(media.get("startDate")), - "end_date": fmt_date(media.get("endDate")), - "season": media.get("season"), - "season_year": media.get("seasonYear"), - "average_score": media.get("averageScore"), - "mean_score": media.get("meanScore"), - "popularity": media.get("popularity"), - "favourites": media.get("favourites"), - "trending": media.get("trending"), - "genres": media.get("genres", []), - "cover_image": media.get("coverImage", {}), - "banner_image": media.get("bannerImage"), - "trailer": trailer, - "studios": [s["name"] for s in media.get("studios", {}).get("nodes", [])], - "next_airing": media.get("nextAiringEpisode"), - "external_links": [ - {"site": l["site"], "url": l["url"], "type": l["type"]} - for l in 
media.get("externalLinks", []) - ], - "relations": relations, - "recommendations": recommendations, + "id": media.get("id"), "mal_id": media.get("idMal"), "title": media["title"], + "synonyms": media.get("synonyms", []), "synopsis": synopsis, + "format": media.get("format"), "status": media.get("status"), "episodes": media.get("episodes"), + "duration": media.get("duration"), "source": media.get("source"), "country": media.get("countryOfOrigin"), + "is_adult": media.get("isAdult"), "start_date": fmt_date(media.get("startDate")), + "end_date": fmt_date(media.get("endDate")), "season": media.get("season"), "season_year": media.get("seasonYear"), + "average_score": media.get("averageScore"), "mean_score": media.get("meanScore"), + "popularity": media.get("popularity"), "favourites": media.get("favourites"), "trending": media.get("trending"), + "genres": media.get("genres", []), "cover_image": media.get("coverImage", {}), + "banner_image": media.get("bannerImage"), "trailer": trailer, + "studios": [s["name"] for s in media.get("studios", {}).get("nodes", [])], "next_airing": media.get("nextAiringEpisode"), + "external_links": [{"site": l["site"], "url": l["url"], "type": l["type"]} for l in media.get("externalLinks", [])], + "relations": relations, "recommendations": recommendations, } async def search(self, q: str): @@ -395,87 +303,51 @@ class AnimePahe: return await self._fetch_json(f"{BASE_URL}/api?m=airing&page={p}") async def get_episodes(self, anime_id: str, p: int = 1, resolve: bool = False): - data = await self._fetch_json( - f"{BASE_URL}/api?m=release&id={anime_id}&sort=episode_desc&page={p}" - ) - if not data or not resolve: - return data + data = await self._fetch_json(f"{BASE_URL}/api?m=release&id={anime_id}&sort=episode_desc&page={p}") + if not data or not resolve: return data episodes = data.get("data", []) - async def enrich(ep): ep_session = ep.get("session") - if not ep_session: - return ep + if not ep_session: return ep stream = await 
self.resolve(anime_id, ep_session) - ep["sub"] = stream.get("sub") - ep["dub"] = stream.get("dub") + ep["sub"] = stream.get("sub"); ep["dub"] = stream.get("dub") return ep - data["data"] = list(await asyncio.gather(*[enrich(ep) for ep in episodes])) return data async def get_ids(self, session: str): try: ids = await self._scrape_ids(session) - return { - "animepahe": ids.get("animepahe"), - "anilist": ids.get("anilist"), - "mal": ids.get("mal"), - "anidb": ids.get("anidb"), - "ann": ids.get("ann"), - "animePlanet": ids.get("animePlanet"), - } - except Exception as e: - return {"error": f"Failed: {str(e)}"} + return {k: ids.get(k) for k in ["animepahe", "anilist", "mal", "anidb", "ann", "animePlanet"]} + except Exception as e: return {"error": f"Failed: {str(e)}"} async def get_info(self, session: str): try: ids = await self._scrape_ids(session) anilist_id = ids.get("anilist") - if not anilist_id: - return {"error": "Could not find AniList ID", "ids": ids} - if anilist_id in _info_cache: - return _info_cache[anilist_id] + if not anilist_id: return {"error": "Could not find AniList ID", "ids": ids} + if anilist_id in _info_cache: return _info_cache[anilist_id] data = await self._fetch_anilist(anilist_id) - if "error" in data: - return {"error": data["error"], "ids": ids} - data["ids"] = { - "animepahe": ids.get("animepahe"), - "anilist": anilist_id, - "mal": ids.get("mal"), - "anidb": ids.get("anidb"), - "ann": ids.get("ann"), - "animePlanet": ids.get("animePlanet"), - } + if "error" in data: return {"error": data["error"], "ids": ids} + data["ids"] = {**ids, "anilist": anilist_id} _info_cache[anilist_id] = data return data - except Exception as e: - return {"error": f"Failed: {str(e)}"} + except Exception as e: return {"error": f"Failed: {str(e)}"} async def resolve(self, anime_session: str, episode_session: str): play_url = f"{BASE_URL}/play/{anime_session}/{episode_session}" page = await self.context.new_page() - try: - # 1. 
Apply stealth to bypass Cloudflare await Stealth().apply_stealth_async(page) - + # FIX: networkidle try: - await page.goto(play_url, wait_until="commit", timeout=45000) + await page.goto(play_url, wait_until="networkidle", timeout=45000) except Exception as e: - if "Timeout" not in str(e): - raise e - - # 2. Increase timeout to 30 seconds to give Cloudflare time to auto-resolve - await page.wait_for_selector( - "#resolutionMenu button", state="attached", timeout=30000 - ) + if "Timeout" not in str(e): raise e + await page.wait_for_selector("#resolutionMenu button", state="attached", timeout=30000) anime_name, episode_num = await self._scrape_play_meta(page) res_data = await self._collect_buttons(page) - - # Don't close the page quite yet, pass it to _embed_to_m3u8 if needed - # Wait, actually we can close it here since _embed_to_m3u8 creates its own page. await page.close() page = None @@ -489,80 +361,49 @@ class AnimePahe: m3u8 = await self._embed_to_m3u8(item["embed"]) res_str = str(item["res"]) return { - "resolution": res_str, - "fansub": item["fansub"], - "audio": item["audio"], - "audio_lang": item["audio_lang"], - "url": m3u8, - "download": self._generate_mp4( - m3u8, anime_name, episode_num, res_str - ), + "resolution": res_str, "fansub": item["fansub"], "audio": item["audio"], + "audio_lang": item["audio_lang"], "url": m3u8, + "download": self._generate_mp4(m3u8, anime_name, episode_num, res_str), } except Exception as e: - return { - "resolution": str(item["res"]), - "fansub": item["fansub"], - "audio": item["audio"], - "audio_lang": item["audio_lang"], - "url": None, - "download": None, - "error": str(e), - } + return {"resolution": str(item["res"]), "fansub": item["fansub"], "error": str(e)} tasks = [] - if best_sub: - tasks.append(resolve_one(best_sub)) - if best_dub: - tasks.append(resolve_one(best_dub)) + if best_sub: tasks.append(resolve_one(best_sub)) + if best_dub: tasks.append(resolve_one(best_dub)) results = await asyncio.gather(*tasks) return { 
- "anime": anime_session, - "episode": episode_session, - "anime_name": anime_name, + "anime": anime_session, "episode": episode_session, "anime_name": anime_name, "episode_num": episode_num, "sub": results[0] if best_sub else None, - "dub": results[1] - if best_sub and best_dub - else (results[0] if best_dub else None), + "dub": results[1] if best_sub and best_dub else (results[0] if best_dub else None), } except Exception as e: - # 3. TAKE A SCREENSHOT ON FAILURE to see what blocked the bot if page: - try: - await page.screenshot(path="debug_error.png", full_page=True) - except: - pass - return { - "error": str(e), - "hint": "Check debug_error.png to see what the browser got stuck on.", - } + try: await page.screenshot(path="debug_error.png", full_page=True) + except: pass + return {"error": str(e), "hint": "Check debug_error.png"} finally: - if page: - await page.close() + if page: await page.close() async def get_seasons(self, anime_id: str) -> dict: url = f"{ANIWATCHTV_BASE}/{anime_id}" page = await self.context.new_page() - try: - await page.goto(url, wait_until="domcontentloaded", timeout=30000) + await page.goto(url, wait_until="networkidle", timeout=30000) await asyncio.sleep(1) - for selector in [".os-list", ".seasons-block", "[class*='season']", "main"]: try: await page.wait_for_selector(selector, timeout=5000) break - except: - continue + except: continue seasons = await page.evaluate(f"""() => {{ const BASE = "{ANIWATCHTV_BASE}"; - const results =[]; - const seen = new Set(); + const results =[]; const seen = new Set(); const container = document.querySelector('.os-list') || document.querySelector('.seasons-block') || document.querySelector('[class*="os-list"]'); if (!container) return results; - for (const a of container.querySelectorAll('a[href]')) {{ const href = a.getAttribute('href') || ''; const fullUrl = href.startsWith('http') ? 
href : BASE + href; @@ -570,7 +411,6 @@ class AnimePahe: const slug = fullUrl.replace(/\\/$/, '').split('/').pop(); if (!slug || seen.has(slug)) continue; seen.add(slug); - const title = (a.querySelector('span, [class*="title"]')?.innerText?.trim() || a.innerText?.trim() || slug); const posterEl = a.querySelector('.season-poster') || a.closest('li, div')?.querySelector('.season-poster'); let poster = null; @@ -584,66 +424,40 @@ class AnimePahe: return results; }}""") return {"id": anime_id, "total": len(seasons), "seasons": seasons} - except Exception as e: - return {"id": anime_id, "total": 0, "seasons": [], "error": str(e)} - finally: - await page.close() - + except Exception as e: return {"id": anime_id, "total": 0, "seasons": [], "error": str(e)} + finally: await page.close() pahe = AnimePahe() - @asynccontextmanager async def lifespan(app: FastAPI): await pahe.start() yield await pahe.stop() - app = FastAPI(lifespan=lifespan) +app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"]) -# 🔥 ENABLE CORS SO REACT CAN COMMUNICATE WITH THIS API 🔥 -app.add_middleware( - CORSMiddleware, - allow_origins=[ - "*" - ], # For dev, allows all origins. Change to your Vite URL in prod. 
- allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], -) - - -@app.get("/", response_class=JSONResponse) -async def root(): - return {"status": "ok"} - +@app.get("/") +async def root(): return {"status": "ok"} @app.get("/search") -async def api_search(q: str): - return await pahe.search(q) - +async def api_search(q: str): return await pahe.search(q) @app.get("/latest") -async def api_latest(p: int = 1): - return await pahe.get_latest(p) - +async def api_latest(p: int = 1): return await pahe.get_latest(p) @app.get("/info/{session}") -async def api_info(session: str): - return await pahe.get_info(session) - +async def api_info(session: str): return await pahe.get_info(session) @app.get("/episodes/{session}") async def api_episodes(session: str, p: int = 1, resolve: bool = False): return await pahe.get_episodes(session, p, resolve) - @app.get("/resolve/{anime}/{episode}") async def api_resolve(anime: str, episode: str): return await pahe.resolve(anime, episode) - @app.get("/seasons/{anime_id:path}") async def api_seasons(anime_id: str, request: Request): data = await pahe.get_seasons(anime_id) @@ -653,82 +467,44 @@ async def api_seasons(anime_id: str, request: Request): season["posterProxied"] = f"{base_url}/poster?url={season['poster']}" return data - @app.get("/poster") async def api_poster(url: str = Query(..., description="CDN image proxy")): try: async with httpx.AsyncClient(timeout=10, follow_redirects=True) as client: - resp = await client.get( - url, - headers={ - "Referer": "https://aniwatchtv.to/", - "User-Agent": "Mozilla/5.0", - }, - ) + resp = await client.get(url, headers={"Referer": ANIWATCHTV_BASE, "User-Agent": "Mozilla/5.0"}) resp.raise_for_status() - return Response( - content=resp.content, - media_type=resp.headers.get("content-type", "image/jpeg"), - ) - except Exception as e: - return Response(content=f"Error: {e}", status_code=502) + return Response(content=resp.content, media_type=resp.headers.get("content-type", 
"image/jpeg")) + except Exception as e: return Response(content=f"Error: {e}", status_code=502) - -# 🔥 NEW HLS PROXY TO BYPASS CORS & 403 🔥 @app.get("/hls-proxy") async def hls_proxy(url: str, request: Request): - headers = { - "Referer": "https://kwik.cx/", - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122 Safari/537.36", - } - - # Helper function to stream chunks directly + headers = {"Referer": "https://kwik.cx/", "User-Agent": "Mozilla/5.0"} async def stream_generator(): async with httpx.AsyncClient(follow_redirects=True) as client: async with client.stream("GET", url, headers=headers) as resp: - async for chunk in resp.aiter_bytes(): - yield chunk + async for chunk in resp.aiter_bytes(): yield chunk - # If it is an M3U8 Playlist, we need to rewrite its internal links to ALSO use the proxy if url.split("?")[0].endswith(".m3u8"): async with httpx.AsyncClient(follow_redirects=True) as client: resp = await client.get(url, headers=headers) lines = resp.text.splitlines() - base_proxy = f"{str(request.base_url).rstrip('/')}/hls-proxy?url=" new_lines = [] - for line in lines: - if line.startswith("#") or not line.strip(): - new_lines.append(line) + if line.startswith("#") or not line.strip(): new_lines.append(line) else: - # Merge relative paths (e.g. 
chunk1.ts) with the absolute url - absolute_url = urllib.parse.urljoin(url, line.strip()) - # Wrap it in our proxy path - proxy_url = f"{base_proxy}{urllib.parse.quote(absolute_url)}" - new_lines.append(proxy_url) + abs_url = urllib.parse.urljoin(url, line.strip()) + new_lines.append(f"{base_proxy}{urllib.parse.quote(abs_url)}") + return Response(content="\n".join(new_lines), media_type="application/vnd.apple.mpegurl") + else: return StreamingResponse(stream_generator(), media_type="video/MP2T") - return Response( - content="\n".join(new_lines), media_type="application/vnd.apple.mpegurl" - ) - - else: - # Stream the binary video chunk (.ts) - return StreamingResponse(stream_generator(), media_type="video/MP2T") - - -# 🔥 YOUR PRIVATE PROXY ROUTE 🔥 @app.get("/proxy-mapper") async def proxy_mapper(url: str): - """Proxies requests to the mapper API to bypass CORS""" try: async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client: - resp = await client.get(url) - return resp.json() - except Exception as e: - return {"error": str(e)} + resp = await client.get(url); return resp.json() + except Exception as e: return {"error": str(e)} if __name__ == "__main__": import uvicorn - - uvicorn.run(app, host="0.0.0.0", port=7860) + uvicorn.run(app, host="0.0.0.0", port=7860) \ No newline at end of file