Update main.py

This commit is contained in:
Aira Catapang
2026-03-18 10:32:59 +00:00
committed by system
parent 07048d71c4
commit 630ad213ad

348
main.py
View File

@@ -3,11 +3,14 @@ import asyncio
import re
import os
import httpx
import urllib.parse
from typing import Optional
from contextlib import asynccontextmanager
from fastapi import FastAPI, Query, Request
from fastapi.responses import StreamingResponse, Response, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from playwright.async_api import async_playwright, BrowserContext
from playwright_stealth import Stealth
# Origin of the AnimePahe site; prefixed to the /api and /play URLs built in this file.
BASE_URL = "https://animepahe.si"
# Origin of the AniWatchTV site; prefixed to the page URL that get_seasons opens.
ANIWATCHTV_BASE = "https://aniwatchtv.to"
@@ -55,7 +58,6 @@ class AnimePahe:
async def _intercept_assets(self, route):
url = route.request.url.lower()
# Allow requests from aniwatchtv & kwik (video host) so players/posters load correctly
if "aniwatchtv.to" in url or "kwik" in url:
await route.continue_()
return
@@ -83,16 +85,28 @@ class AnimePahe:
) -> Optional[str]:
if not m3u8_url:
return None
# Example: https://na-02.kwik.cx/stream/abc123def/index.m3u8
match = re.search(r"(https?://[^/]+)/stream/([^/]+)/", m3u8_url)
# Regex explanation:
# https?://([^.]+) matches the subdomain (e.g., vault-99)
# [^/]*/stream/ ignores the rest of the domain (e.g., .owocdn.top) and matches /stream/
# (.*?)/[^/]+\.m3u8 matches the entire token path up to the final /uwu.m3u8
match = re.search(r"https?://([^.]+)[^/]*/stream/(.*?)/[^/]+\.m3u8", m3u8_url)
if match:
subdomain = match.group(1) # e.g., "vault-99"
token_path = match.group(
2
) # e.g., "99/01/d138b9bb16e0a47024fad856caab2fba99d7cbd661ef2662a3572694eaebcf9a"
clean_name = re.sub(r"[^\w\s]", "", anime_name).strip().replace(" ", "_")
if not clean_name:
clean_name = "Anime"
filename = f"{clean_name}_EP{episode}_{res}P.mp4"
domain = match.group(1) # e.g. https://na-02.kwik.cx
token = match.group(2) # e.g. abc123def
return f"{domain}/mp4/{token}?file={filename}"
# Reconstruct the string using the kwik.cx domain and /mp4/ endpoint
return f"https://{subdomain}.kwik.cx/mp4/{token_path}?file={filename}"
return None
async def _scrape_play_meta(self, page) -> tuple:
@@ -102,7 +116,6 @@ class AnimePahe:
let episode = '';
const t = document.title || '';
// Match exactly: "Anime Name - 01 - AnimePahe" OR "Anime Name - Episode 01 - AnimePahe"
const m = t.match(/^(.+?)\\s*[-\\u2013]\\s*(?:Episode\\s*)?(\\d+(?:\\.\\d+)?)/i);
if (m) {
@@ -117,8 +130,6 @@ class AnimePahe:
episode = (meta.get("episode") or "").strip() or "00"
return title, episode
# ---------------- SCRAPE IDs ONLY ----------------
async def _scrape_ids(self, session: str) -> dict:
page = await self.context.new_page()
try:
@@ -127,47 +138,35 @@ class AnimePahe:
wait_until="domcontentloaded",
timeout=30000,
)
try:
await page.wait_for_selector(
"div.anime-info, div.anime-summary, aside, main", timeout=10000
)
except:
pass
await asyncio.sleep(2)
ids = await page.evaluate("""() => {
let ids = {}
document.querySelectorAll("a[href]").forEach(a => {
const url = a.href || ""
if (url.includes("myanimelist.net/anime"))
ids["mal"] = url.split("/").filter(Boolean).pop()
if (url.includes("anilist.co/anime"))
ids["anilist"] = url.split("/").filter(Boolean).pop()
if (url.includes("anidb.net"))
ids["anidb"] = url.split("/").filter(Boolean).pop()
if (url.includes("myanimelist.net/anime")) ids["mal"] = url.split("/").filter(Boolean).pop()
if (url.includes("anilist.co/anime")) ids["anilist"] = url.split("/").filter(Boolean).pop()
if (url.includes("anidb.net")) ids["anidb"] = url.split("/").filter(Boolean).pop()
if (url.includes("animenewsnetwork.com")) {
const m = url.match(/id=(\\d+)/)
if (m) ids["ann"] = m[1]
}
if (url.includes("anime-planet.com/anime"))
ids["animePlanet"] = url.split("/").filter(Boolean).pop()
if (url.includes("anime-planet.com/anime")) ids["animePlanet"] = url.split("/").filter(Boolean).pop()
})
return ids
}""")
ids["animepahe"] = session
return ids
except Exception as e:
print(f"[scrape_ids] ERROR: {e}")
return {"animepahe": session}
finally:
await page.close()
# ---------------- MAL SYNOPSIS ----------------
async def _fetch_mal_synopsis(self, mal_id: str) -> Optional[str]:
if mal_id in _mal_synopsis_cache:
return _mal_synopsis_cache[mal_id]
@@ -181,13 +180,10 @@ class AnimePahe:
synopsis = resp.json().get("data", {}).get("synopsis")
_mal_synopsis_cache[mal_id] = synopsis
return synopsis
except Exception as e:
print(f"[mal_synopsis] failed for mal_id={mal_id}: {e}")
except Exception:
_mal_synopsis_cache[mal_id] = None
return None
# ---------------- SHARED RESOLVE HELPERS ----------------
async def _collect_buttons(self, page) -> list:
buttons = await page.locator("#resolutionMenu button").all()
res_data = []
@@ -220,11 +216,8 @@ class AnimePahe:
p.on("request", capture)
try:
# Set the exact referer Kwik expects to prevent token rejections
await p.set_extra_http_headers({"Referer": "https://animepahe.si/"})
await p.goto(embed_url, wait_until="domcontentloaded", timeout=15000)
# Click loop: Muting allows browsers to bypass autoplay restrictions safely
for _ in range(6):
if found.is_set():
break
@@ -239,45 +232,30 @@ class AnimePahe:
});
}""")
await asyncio.sleep(1.5)
try:
await asyncio.wait_for(found.wait(), timeout=5.0)
except asyncio.TimeoutError:
pass
except Exception as e:
print(f"[_embed_to_m3u8] ERROR: {e}")
except Exception:
pass
finally:
await p.close()
return m3u8
# ---------------- ANILIST ----------------
async def _fetch_anilist(self, anilist_id: str) -> dict:
query = """
query ($id: Int) {
Media(id: $id, type: ANIME) {
id
idMal
title { romaji english native }
synonyms
description(asHtml: false)
format status episodes duration source countryOfOrigin isAdult
startDate { year month day }
endDate { year month day }
season seasonYear averageScore meanScore popularity favourites trending genres
coverImage { extraLarge large medium color }
bannerImage
trailer { id site }
studios(isMain: true) { nodes { name siteUrl } }
id idMal title { romaji english native } synonyms description(asHtml: false) format status episodes duration source countryOfOrigin isAdult
startDate { year month day } endDate { year month day } season seasonYear averageScore meanScore popularity favourites trending genres
coverImage { extraLarge large medium color } bannerImage trailer { id site } studios(isMain: true) { nodes { name siteUrl } }
relations { edges { relationType(version: 2) node { id idMal title { romaji english } format status episodes averageScore coverImage { medium } siteUrl } } }
recommendations(perPage: 20, sort: RATING_DESC) { nodes { rating mediaRecommendation { id idMal title { romaji english } format status episodes averageScore coverImage { medium } siteUrl } } }
externalLinks { site url type }
nextAiringEpisode { airingAt episode }
externalLinks { site url type } nextAiringEpisode { airingAt episode }
}
}
"""
try:
async with httpx.AsyncClient(timeout=15) as client:
resp = await client.post(
@@ -291,7 +269,6 @@ class AnimePahe:
resp.raise_for_status()
result = resp.json()
except Exception as e:
print(f"[anilist] failed for id={anilist_id}: {e}")
return {"error": f"AniList fetch failed: {str(e)}"}
media = result.get("data", {}).get("Media")
@@ -305,8 +282,11 @@ class AnimePahe:
def fmt_date(d):
if not d or not d.get("year"):
return None
parts = [d.get("year"), d.get("month"), d.get("day")]
return "-".join(str(p).zfill(2) for p in parts if p)
return "-".join(
str(p).zfill(2)
for p in [d.get("year"), d.get("month"), d.get("day")]
if p
)
trailer = None
if media.get("trailer"):
@@ -316,31 +296,30 @@ class AnimePahe:
elif t.get("site") == "dailymotion":
trailer = f"https://www.dailymotion.com/video/{t['id']}"
# ---------- Relations from AniList ----------
relations: dict[str, list] = {}
relations = {}
for edge in media.get("relations", {}).get("edges", []):
node = edge.get("node", {})
if not node:
continue
relation_type = edge.get("relationType", "OTHER")
entry = {
"id": node.get("id"),
"mal_id": node.get("idMal"),
"title": (
node.get("title", {}).get("english")
or node.get("title", {}).get("romaji")
),
"format": node.get("format"),
"status": node.get("status"),
"episodes": node.get("episodes"),
"score": node.get("averageScore"),
"image": node.get("coverImage", {}).get("medium"),
"url": node.get("siteUrl"),
"relation_type": relation_type,
}
relations.setdefault(relation_type, []).append(entry)
rel = edge.get("relationType", "OTHER")
relations.setdefault(rel, []).append(
{
"id": node.get("id"),
"mal_id": node.get("idMal"),
"title": (
node.get("title", {}).get("english")
or node.get("title", {}).get("romaji")
),
"format": node.get("format"),
"status": node.get("status"),
"episodes": node.get("episodes"),
"score": node.get("averageScore"),
"image": node.get("coverImage", {}).get("medium"),
"url": node.get("siteUrl"),
"relation_type": rel,
}
)
# ---------- Recommendations ----------
recommendations = []
for node in media.get("recommendations", {}).get("nodes", []):
rec = node.get("mediaRecommendation")
@@ -364,11 +343,7 @@ class AnimePahe:
return {
"id": media.get("id"),
"mal_id": media.get("idMal"),
"title": {
"romaji": media["title"].get("romaji"),
"english": media["title"].get("english"),
"native": media["title"].get("native"),
},
"title": media["title"],
"synonyms": media.get("synonyms", []),
"synopsis": synopsis,
"format": media.get("format"),
@@ -401,27 +376,19 @@ class AnimePahe:
"recommendations": recommendations,
}
# ---------------- SEARCH ----------------
async def search(self, q: str):
    """Search AnimePahe for titles matching *q*.

    Args:
        q: Free-text search term as typed by the user. It is
            percent-encoded before being placed in the query string, so
            spaces, ``&``, ``#`` and ``=`` cannot truncate the term or
            smuggle extra parameters into the upstream request.

    Returns:
        The list stored under the ``"data"`` key of the JSON payload, or an
        empty list when the fetch produced nothing or the key is absent.
    """
    # Local import keeps this method self-contained.
    from urllib.parse import quote

    # safe="" so that "/" is escaped too; the whole term is a single value.
    encoded_query = quote(q, safe="")
    data = await self._fetch_json(f"{BASE_URL}/api?m=search&q={encoded_query}")
    return data.get("data", []) if data else []
# ---------------- LATEST ----------------
async def get_latest(self, p: int = 1):
    """Fetch one page of the AnimePahe "airing" listing.

    Args:
        p: Page number forwarded as the ``page`` query parameter.

    Returns:
        Whatever ``self._fetch_json`` yields for the airing endpoint,
        passed through unchanged.
    """
    airing_url = f"{BASE_URL}/api?m=airing&page={p}"
    latest_page = await self._fetch_json(airing_url)
    return latest_page
# ---------------- EPISODES ----------------
async def get_episodes(self, anime_id: str, p: int = 1, resolve: bool = False):
data = await self._fetch_json(
f"{BASE_URL}/api?m=release&id={anime_id}&sort=episode_desc&page={p}"
)
if not data or not resolve:
return data
episodes = data.get("data", [])
async def enrich(ep):
@@ -436,8 +403,6 @@ class AnimePahe:
data["data"] = list(await asyncio.gather(*[enrich(ep) for ep in episodes]))
return data
# ---------------- IDS ONLY ----------------
async def get_ids(self, session: str):
try:
ids = await self._scrape_ids(session)
@@ -450,30 +415,19 @@ class AnimePahe:
"animePlanet": ids.get("animePlanet"),
}
except Exception as e:
print(f"[get_ids] ERROR: {e}")
return {"error": f"Failed: {str(e)}"}
# ---------------- INFO ----------------
async def get_info(self, session: str):
try:
ids = await self._scrape_ids(session)
anilist_id = ids.get("anilist")
if not anilist_id:
return {
"error": "Could not find AniList ID on AnimePahe page",
"ids": ids,
}
return {"error": "Could not find AniList ID", "ids": ids}
if anilist_id in _info_cache:
return _info_cache[anilist_id]
data = await self._fetch_anilist(anilist_id)
if "error" in data:
return {"error": data["error"], "ids": ids}
data["ids"] = {
"animepahe": ids.get("animepahe"),
"anilist": anilist_id,
@@ -482,30 +436,35 @@ class AnimePahe:
"ann": ids.get("ann"),
"animePlanet": ids.get("animePlanet"),
}
_info_cache[anilist_id] = data
return data
except Exception as e:
print(f"[get_info] ERROR: {e}")
return {"error": f"Failed: {str(e)}"}
# ---------------- RESOLVE ----------------
async def resolve(self, anime_session: str, episode_session: str):
play_url = f"{BASE_URL}/play/{anime_session}/{episode_session}"
page = await self.context.new_page()
try:
await page.goto(play_url, wait_until="domcontentloaded")
# 1. Apply stealth to bypass Cloudflare
await Stealth().apply_stealth_async(page)
try:
await page.goto(play_url, wait_until="commit", timeout=45000)
except Exception as e:
if "Timeout" not in str(e):
raise e
# 2. Increase timeout to 30 seconds to give Cloudflare time to auto-resolve
await page.wait_for_selector(
"#resolutionMenu button",
state="attached",
timeout=15000,
"#resolutionMenu button", state="attached", timeout=30000
)
anime_name, episode_num = await self._scrape_play_meta(page)
res_data = await self._collect_buttons(page)
# Don't close the page quite yet, pass it to _embed_to_m3u8 if needed
# Wait, actually we can close it here since _embed_to_m3u8 creates its own page.
await page.close()
page = None
@@ -544,33 +503,33 @@ class AnimePahe:
tasks.append(resolve_one(best_sub))
if best_dub:
tasks.append(resolve_one(best_dub))
results = await asyncio.gather(*tasks)
sub_result = results[0] if best_sub else None
dub_result = (
results[1]
if best_sub and best_dub
else (results[0] if best_dub else None)
)
return {
"anime": anime_session,
"episode": episode_session,
"anime_name": anime_name,
"episode_num": episode_num,
"sub": sub_result,
"dub": dub_result,
"sub": results[0] if best_sub else None,
"dub": results[1]
if best_sub and best_dub
else (results[0] if best_dub else None),
}
except Exception as e:
return {"error": str(e)}
# 3. TAKE A SCREENSHOT ON FAILURE to see what blocked the bot
if page:
try:
await page.screenshot(path="debug_error.png", full_page=True)
except:
pass
return {
"error": str(e),
"hint": "Check debug_error.png to see what the browser got stuck on.",
}
finally:
if page:
await page.close()
# ---------------- SEASONS ----------------
async def get_seasons(self, anime_id: str) -> dict:
url = f"{ANIWATCHTV_BASE}/{anime_id}"
page = await self.context.new_page()
@@ -588,51 +547,20 @@ class AnimePahe:
seasons = await page.evaluate(f"""() => {{
const BASE = "{ANIWATCHTV_BASE}";
const currentId = "{anime_id}";
const results =[];
const seen = new Set();
const block = (
document.querySelector('.os-list') ||
document.querySelector('.seasons-block') ||
document.querySelector('[class*="os-list"]') ||
document.querySelector('[class*="season-list"]')
);
const fallbackContainer = (() => {{
for (const el of document.querySelectorAll('*')) {{
if (/more\\s+seasons?/i.test(el.innerText?.trim() || '')) {{
let p = el.parentElement;
for (let i = 0; i < 5; i++) {{
if (!p) break;
if (p.querySelectorAll('a[href]').length > 0) return p;
p = p.parentElement;
}}
}}
}}
return null;
}})();
const container = block || fallbackContainer;
const container = document.querySelector('.os-list') || document.querySelector('.seasons-block') || document.querySelector('[class*="os-list"]');
if (!container) return results;
for (const a of container.querySelectorAll('a[href]')) {{
const href = a.getAttribute('href') || '';
const fullUrl = href.startsWith('http') ? href
: href.startsWith('/') ? BASE + href
: null;
const fullUrl = href.startsWith('http') ? href : BASE + href;
if (!fullUrl) continue;
const slug = fullUrl.replace(/\\/$/, '').split('/').pop();
if (!slug || seen.has(slug)) continue;
seen.add(slug);
const numericMatch = slug.match(/-(\\d+)$/);
const numericId = numericMatch ? numericMatch[1] : null;
const titleEl = a.querySelector('span, [class*="title"], [class*="name"]');
const title = (titleEl?.innerText?.trim() || a.innerText?.trim() || slug);
const title = (a.querySelector('span, [class*="title"]')?.innerText?.trim() || a.innerText?.trim() || slug);
const posterEl = a.querySelector('.season-poster') || a.closest('li, div')?.querySelector('.season-poster');
let poster = null;
if (posterEl) {{
@@ -640,21 +568,12 @@ class AnimePahe:
const bg2 = bg.split('url(').pop().split(')')[0].replace(/['"/]/g, '').trim();
if (bg2 && bg2.startsWith('http')) poster = bg2;
}}
results.push({{ title, id: slug, numericId, url: fullUrl, poster }});
results.push({{ title, id: slug, url: fullUrl, poster }});
}}
return results;
}}""")
return {
"id": anime_id,
"total": len(seasons),
"seasons": seasons,
}
return {"id": anime_id, "total": len(seasons), "seasons": seasons}
except Exception as e:
print(f"[get_seasons] ERROR: {e}")
return {"id": anime_id, "total": 0, "seasons": [], "error": str(e)}
finally:
await page.close()
@@ -672,22 +591,21 @@ async def lifespan(app: FastAPI):
app = FastAPI(lifespan=lifespan)

# Enable CORS so a browser front end (e.g. the React app) can call this API.
# Credentials stay disabled while origins are wildcarded: a "*" origin
# combined with allow_credentials=True is not a valid CORS configuration and
# would let any site make credentialed calls. If cookies or auth headers are
# ever needed, list the exact front-end origin(s) here and only then turn
# allow_credentials back on.
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "*"
    ],  # For dev, allows all origins. Change to your Vite URL in prod.
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Index route: answers GET / with a small JSON status document.
@app.get("/", response_class=JSONResponse)
async def root():
# NOTE(review): two return statements follow back to back. As written only
# the first one (status plus the route listing) can execute and the second
# is unreachable. This looks like both sides of a diff hunk; confirm which
# payload is intended and delete the other.
return {
"status": "ok",
"routes": [
"/search?q=:title",
"/latest?p=:page",
"/info/:session",
"/ids/:session",
"/episodes/:session?p=:page&resolve=false|true",
"/resolve/:animeSession/:episodeSession",
"/seasons/:animeId",
"/poster?url=:cdnImageUrl",
],
}
return {"status": "ok"}
@app.get("/search")
@@ -705,11 +623,6 @@ async def api_info(session: str):
return await pahe.get_info(session)
@app.get("/ids/{session}")
async def api_ids(session: str):
    # Thin HTTP wrapper: hands the path parameter to pahe.get_ids and
    # returns its result as the response body.
    external_ids = await pahe.get_ids(session=session)
    return external_ids
@app.get("/episodes/{session}")
async def api_episodes(session: str, p: int = 1, resolve: bool = False):
    # Thin HTTP wrapper: the path parameter is passed as the anime id, while
    # the page number and resolve flag come from the query string.
    episode_page = await pahe.get_episodes(anime_id=session, p=p, resolve=resolve)
    return episode_page
@@ -727,13 +640,11 @@ async def api_seasons(anime_id: str, request: Request):
for season in data.get("seasons", []):
if season.get("poster"):
season["posterProxied"] = f"{base_url}/poster?url={season['poster']}"
else:
season["posterProxied"] = None
return data
@app.get("/poster")
async def api_poster(url: str = Query(..., description="CDN image URL to proxy")):
async def api_poster(url: str = Query(..., description="CDN image proxy")):
try:
async with httpx.AsyncClient(timeout=10, follow_redirects=True) as client:
resp = await client.get(
@@ -744,15 +655,56 @@ async def api_poster(url: str = Query(..., description="CDN image URL to proxy")
},
)
resp.raise_for_status()
content_type = resp.headers.get("content-type", "image/jpeg")
return Response(content=resp.content, media_type=content_type)
return Response(
content=resp.content,
media_type=resp.headers.get("content-type", "image/jpeg"),
)
except Exception as e:
return Response(content=f"Error: {e}", status_code=502)
# 🔥 NEW HLS PROXY TO BYPASS CORS & 403 🔥
# Matches the quoted URI attribute carried by HLS tags such as EXT-X-KEY,
# EXT-X-MAP and EXT-X-MEDIA.
_HLS_URI_ATTR = re.compile(r'URI="([^"]+)"')


def _rewrite_m3u8(playlist_text: str, playlist_url: str, proxy_prefix: str) -> str:
    """Return *playlist_text* with every fetchable reference routed via the proxy.

    Args:
        playlist_text: Raw body of the upstream M3U8 playlist.
        playlist_url: URL the playlist was actually served from; relative
            references are resolved against it.
        proxy_prefix: Proxy endpoint up to and including ``?url=``; the
            percent-encoded absolute target is appended to it.

    Returns:
        The playlist with URI lines and ``URI="..."`` tag attributes
        replaced by proxied URLs. Blank lines and other tags are untouched.
    """

    def proxied(ref: str) -> str:
        absolute = urllib.parse.urljoin(playlist_url, ref)
        # Leave data:, skd: and similar references alone; only http(s)
        # targets can be fetched by the proxy.
        if urllib.parse.urlsplit(absolute).scheme not in ("http", "https"):
            return ref
        return f"{proxy_prefix}{urllib.parse.quote(absolute, safe='')}"

    rewritten = []
    for line in playlist_text.splitlines():
        stripped = line.strip()
        if not stripped:
            rewritten.append(line)
        elif line.startswith("#"):
            # Tag line: keep it, but proxy any embedded URI (e.g. the AES
            # key), otherwise the player fetches it directly and hits the
            # same CORS/403 wall this proxy exists to avoid.
            rewritten.append(
                _HLS_URI_ATTR.sub(
                    lambda m: f'URI="{proxied(m.group(1))}"', line
                )
            )
        else:
            # URI line: a segment or a variant playlist.
            rewritten.append(proxied(stripped))
    return "\n".join(rewritten)


# HLS proxy: fetches playlists and media segments with the Referer and
# User-Agent the video host expects, so a browser player can load them
# without running into CORS errors or 403 responses.
#   url     - absolute http(s) URL of the playlist or segment to fetch
#   request - incoming request; its base URL is used to build proxied links
# Responds 400 for a non-http(s) url and 502 when the playlist fetch fails.
# NOTE(review): this endpoint will fetch any http(s) URL it is given.
# Consider restricting the target host to the known video CDN domains.
@app.get("/hls-proxy")
async def hls_proxy(url: str, request: Request):
    if urllib.parse.urlsplit(url).scheme not in ("http", "https"):
        return Response(
            content="Error: url must be an absolute http(s) URL",
            status_code=400,
        )

    headers = {
        "Referer": "https://kwik.cx/",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122 Safari/537.36",
    }

    if not url.split("?")[0].endswith(".m3u8"):
        # Binary media chunk (.ts): relay the bytes as they arrive instead
        # of buffering the whole segment in memory.
        async def stream_generator():
            async with httpx.AsyncClient(follow_redirects=True) as client:
                async with client.stream("GET", url, headers=headers) as resp:
                    async for chunk in resp.aiter_bytes():
                        yield chunk

        return StreamingResponse(stream_generator(), media_type="video/MP2T")

    # Playlist: download it fully so its internal links can be rewritten to
    # go through this proxy as well.
    try:
        async with httpx.AsyncClient(follow_redirects=True) as client:
            resp = await client.get(url, headers=headers)
            # Without this an upstream error page would be rewritten and
            # served to the player as if it were a playlist.
            resp.raise_for_status()
    except httpx.HTTPError as e:
        return Response(content=f"Error: {e}", status_code=502)

    proxy_prefix = f"{str(request.base_url).rstrip('/')}/hls-proxy?url="
    # Resolve relative entries against the final URL, which differs from
    # `url` when the upstream redirected.
    playlist = _rewrite_m3u8(resp.text, str(resp.url), proxy_prefix)
    return Response(content=playlist, media_type="application/vnd.apple.mpegurl")
if __name__ == "__main__":
import uvicorn