Files
aniscrap/main.py
Aira Catapang c8e677c025 Update main.py
2026-03-18 10:58:20 +00:00

724 lines
27 KiB
Python

import json
import asyncio
import re
import os
import httpx
import urllib.parse
from typing import Optional
from contextlib import asynccontextmanager
from fastapi import FastAPI, Query, Request
from fastapi.responses import StreamingResponse, Response, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from playwright.async_api import async_playwright, BrowserContext
from playwright_stealth import Stealth
# Upstream sites and metadata APIs used by the scraper.
BASE_URL = "https://animepahe.si"
ANIWATCHTV_BASE = "https://aniwatchtv.to"
ANILIST_API = "https://graphql.anilist.co"
JIKAN_API = "https://api.jikan.moe/v4"

# The browser runs headless unless the HEADLESS environment variable is set to
# something other than "true" (compared case-insensitively).
IS_HEADLESS = os.getenv("HEADLESS", "true").lower() == "true"

# In-memory caches; they live for the lifetime of the process and are never evicted.
_info_cache: dict = {}  # AniList id -> combined info payload
_mal_synopsis_cache: dict = {}  # MAL id -> synopsis (None when the lookup failed)
class AnimePahe:
    """Scrape AnimePahe through one shared Playwright browser context.

    Call ``start()`` once before any other method and ``stop()`` on shutdown.
    Scraping failures are generally reported in the return value (an
    ``"error"`` key, ``None`` or an empty list) rather than raised.
    """

    # Keys of the normalized ids payload, in output order.
    _ID_KEYS = ("animepahe", "anilist", "mal", "anidb", "ann", "animePlanet")

    _ANILIST_QUERY = """
    query ($id: Int) {
      Media(id: $id, type: ANIME) {
        id idMal title { romaji english native } synonyms description(asHtml: false) format status episodes duration source countryOfOrigin isAdult
        startDate { year month day } endDate { year month day } season seasonYear averageScore meanScore popularity favourites trending genres
        coverImage { extraLarge large medium color } bannerImage trailer { id site } studios(isMain: true) { nodes { name siteUrl } }
        relations { edges { relationType(version: 2) node { id idMal title { romaji english } format status episodes averageScore coverImage { medium } siteUrl } } }
        recommendations(perPage: 20, sort: RATING_DESC) { nodes { rating mediaRecommendation { id idMal title { romaji english } format status episodes averageScore coverImage { medium } siteUrl } } }
        externalLinks { site url type } nextAiringEpisode { airingAt episode }
      }
    }
    """

    def __init__(self):
        # Both handles stay None until start() has launched the browser.
        self.playwright = None
        self.context: Optional[BrowserContext] = None
        # Substrings matched against request URLs; matching requests are aborted.
        self.ad_domains = [
            "doubleclick.net",
            "adservice.google",
            "popads.net",
            "propellerads",
            "exoclick",
            "bebi.com",
        ]

    # ------------------------------------------------------------------
    # Browser lifecycle
    # ------------------------------------------------------------------
    async def start(self):
        """Launch Chromium with a persistent profile and install the request filter."""
        self.playwright = await async_playwright().start()
        self.context = await self.playwright.chromium.launch_persistent_context(
            user_data_dir="./browser_data",
            headless=IS_HEADLESS,
            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122 Safari/537.36",
            args=[
                "--disable-blink-features=AutomationControlled",
                "--no-sandbox",
            ],
        )
        await self.context.route("**/*", self._intercept_assets)

    async def stop(self):
        """Close the browser context and stop Playwright.

        Playwright is stopped even when closing the context raises, and both
        handles are reset so a second call is a no-op.
        """
        try:
            if self.context:
                await self.context.close()
        finally:
            playwright, self.playwright, self.context = self.playwright, None, None
            if playwright:
                await playwright.stop()

    async def _intercept_assets(self, route):
        """Route handler: abort ad and static-asset requests, let the rest through."""
        url = route.request.url.lower()
        # Player and season pages are never filtered.
        if "aniwatchtv.to" in url or "kwik" in url:
            await route.continue_()
            return
        is_ad = any(ad in url for ad in self.ad_domains)
        if is_ad or url.endswith((".png", ".jpg", ".jpeg", ".webp", ".woff", ".gif")):
            await route.abort()
        else:
            await route.continue_()

    # ------------------------------------------------------------------
    # Low-level helpers
    # ------------------------------------------------------------------
    async def _fetch_json(self, url: str):
        """Open ``url`` in a new page and parse the body text as JSON.

        Returns the decoded value, or ``None`` when navigation or decoding fails.
        """
        page = await self.context.new_page()
        try:
            await page.goto(url, wait_until="domcontentloaded")
            txt = await page.evaluate("document.body.innerText")
            return json.loads(txt)
        except Exception:
            # Best effort: callers treat None as "no data". Unlike a bare
            # ``except``, this still lets task cancellation propagate.
            return None
        finally:
            await page.close()

    def _generate_mp4(
        self, m3u8_url: Optional[str], anime_name: str, episode: str, res: str
    ) -> Optional[str]:
        """Build the kwik.cx direct-download URL that corresponds to a stream URL.

        Returns ``None`` when ``m3u8_url`` is empty or does not look like
        ``https://<subdomain>.<domain>/stream/<token path>/<name>.m3u8``.
        """
        if not m3u8_url:
            return None
        # group(1): subdomain (e.g. "vault-99"); the rest of the host is ignored.
        # group(2): token path between /stream/ and the final /<name>.m3u8.
        match = re.search(r"https?://([^.]+)[^/]*/stream/(.*?)/[^/]+\.m3u8", m3u8_url)
        if not match:
            return None
        subdomain, token_path = match.group(1), match.group(2)
        # Drop punctuation and join words with underscores for the file name.
        clean_name = re.sub(r"[^\w\s]", "", anime_name).strip().replace(" ", "_")
        if not clean_name:
            clean_name = "Anime"
        filename = f"{clean_name}_EP{episode}_{res}P.mp4"
        return f"https://{subdomain}.kwik.cx/mp4/{token_path}?file={filename}"

    async def _scrape_play_meta(self, page) -> tuple:
        """Read ``(title, episode)`` from an open play page.

        Falls back to ``"Unknown"`` / ``"00"`` when a value cannot be found.
        """
        meta = await page.evaluate("""() => {
            const titleEl = document.querySelector('.theatre-info h1 a, .theatre-info h2 a, .anime-title, h1, h2');
            let title = titleEl ? titleEl.innerText.trim() : '';
            let episode = '';
            const t = document.title || '';
            const m = t.match(/^(.+?)\\s*[-\\u2013]\\s*(?:Episode\\s*)?(\\d+(?:\\.\\d+)?)/i);
            if (m) {
                if (!title || title.length < 2) title = m[1].trim();
                if (!episode) episode = m[2].trim();
            }
            return { title, episode }
        }""")
        title = (meta.get("title") or "").strip() or "Unknown"
        episode = (meta.get("episode") or "").strip() or "00"
        return title, episode

    async def _scrape_ids(self, session: str) -> dict:
        """Collect external database ids linked from an anime's info page.

        Always returns a dict containing at least ``{"animepahe": session}``;
        scraping errors are swallowed.
        """
        page = await self.context.new_page()
        try:
            # Stealth patches are applied before navigation to get past Cloudflare.
            await Stealth().apply_stealth_async(page)
            try:
                await page.goto(
                    f"{BASE_URL}/anime/{session}",
                    wait_until="commit",
                    timeout=30000,
                )
            except Exception as e:
                # A navigation timeout is tolerated; the page may still render.
                if "Timeout" not in str(e):
                    raise
            try:
                await page.wait_for_selector(
                    "div.anime-info, div.anime-summary, aside, main", timeout=15000
                )
            except Exception:
                # Not fatal: the links are read from whatever has loaded.
                pass
            await asyncio.sleep(2)
            ids = await page.evaluate("""() => {
                let ids = {}
                document.querySelectorAll("a[href]").forEach(a => {
                    const url = a.href || ""
                    if (url.includes("myanimelist.net/anime")) ids["mal"] = url.split("/").filter(Boolean).pop()
                    if (url.includes("anilist.co/anime")) ids["anilist"] = url.split("/").filter(Boolean).pop()
                    if (url.includes("anidb.net")) ids["anidb"] = url.split("/").filter(Boolean).pop()
                    if (url.includes("animenewsnetwork.com")) {
                        const m = url.match(/id=(\\d+)/)
                        if (m) ids["ann"] = m[1]
                    }
                    if (url.includes("anime-planet.com/anime")) ids["animePlanet"] = url.split("/").filter(Boolean).pop()
                })
                return ids
            }""")
            ids["animepahe"] = session
            return ids
        except Exception:
            return {"animepahe": session}
        finally:
            await page.close()

    async def _fetch_mal_synopsis(self, mal_id: str) -> Optional[str]:
        """Return the synopsis Jikan reports for ``mal_id``, or ``None``.

        Results, including failed lookups, are remembered in
        ``_mal_synopsis_cache``.
        """
        if mal_id in _mal_synopsis_cache:
            return _mal_synopsis_cache[mal_id]
        try:
            async with httpx.AsyncClient(timeout=10) as client:
                resp = await client.get(
                    f"{JIKAN_API}/anime/{mal_id}",
                    headers={"Accept": "application/json"},
                )
                resp.raise_for_status()
                synopsis = resp.json().get("data", {}).get("synopsis")
            _mal_synopsis_cache[mal_id] = synopsis
            return synopsis
        except Exception:
            _mal_synopsis_cache[mal_id] = None
            return None

    async def _collect_buttons(self, page) -> list:
        """Describe every entry of the play page's resolution menu.

        Each item carries the embed URL, the resolution (720 when the label
        has no number), the fansub name, and the audio language/type.
        """
        buttons = await page.locator("#resolutionMenu button").all()
        res_data = []
        for btn in buttons:
            text = (await btn.inner_text()).strip()
            res_match = re.search(r"(\d+)", text)
            audio_lang = (await btn.get_attribute("data-audio") or "jpn").lower()
            # Only English audio is reported as a dub.
            audio_type = "dub" if audio_lang == "eng" else "sub"
            res_data.append(
                {
                    "embed": await btn.get_attribute("data-src"),
                    "res": int(res_match.group(1)) if res_match else 720,
                    "fansub": text.split("·")[0].strip() if "·" in text else "Unknown",
                    "audio": audio_type,
                    "audio_lang": audio_lang,
                }
            )
        return res_data

    async def _embed_to_m3u8(self, embed_url: str) -> Optional[str]:
        """Load an embed page and return the first ``.m3u8`` URL it requests.

        Returns ``None`` when no playlist request is observed or the page fails.
        """
        p = await self.context.new_page()
        m3u8 = None
        found = asyncio.Event()

        def capture(req):
            nonlocal m3u8
            if ".m3u8" in req.url and not found.is_set():
                m3u8 = req.url
                found.set()

        p.on("request", capture)
        try:
            await p.set_extra_http_headers({"Referer": f"{BASE_URL}/"})
            await p.goto(embed_url, wait_until="domcontentloaded", timeout=15000)
            # Nudge the player a few times: start any <video> and click the buttons.
            for _ in range(6):
                if found.is_set():
                    break
                await p.evaluate("""() => {
                    document.querySelectorAll('video').forEach(v => {
                        v.muted = true;
                        const p = v.play();
                        if (p !== undefined) p.catch(() => {});
                    });
                    document.querySelectorAll('button, .vjs-big-play-button').forEach(b => {
                        try { b.click() } catch(e) {}
                    });
                }""")
                await asyncio.sleep(1.5)
            try:
                await asyncio.wait_for(found.wait(), timeout=5.0)
            except asyncio.TimeoutError:
                pass
        except Exception:
            # Deliberate best effort: any failure simply yields no stream URL.
            pass
        finally:
            await p.close()
        return m3u8

    # ------------------------------------------------------------------
    # AniList
    # ------------------------------------------------------------------
    @staticmethod
    def _format_date(parts) -> Optional[str]:
        """Join an AniList ``{year, month, day}`` dict as ``YYYY[-MM[-DD]]``."""
        if not parts or not parts.get("year"):
            return None
        return "-".join(
            str(value).zfill(2)
            for value in (parts.get("year"), parts.get("month"), parts.get("day"))
            if value
        )

    @staticmethod
    def _summarize_media(node: dict) -> dict:
        """Flatten a related/recommended media node into the API's summary shape."""
        title = node.get("title") or {}
        return {
            "id": node.get("id"),
            "mal_id": node.get("idMal"),
            "title": title.get("english") or title.get("romaji"),
            "format": node.get("format"),
            "status": node.get("status"),
            "episodes": node.get("episodes"),
            "score": node.get("averageScore"),
            "image": (node.get("coverImage") or {}).get("medium"),
            "url": node.get("siteUrl"),
        }

    @staticmethod
    def _format_anilist_media(media: dict, synopsis: Optional[str]) -> dict:
        """Convert an AniList ``Media`` object into this API's info payload.

        Every nested field is read with ``or {}`` / ``or []`` because GraphQL
        sends ``null`` for absent values, which ``dict.get(key, default)``
        does not replace.
        """
        trailer = None
        trailer_info = media.get("trailer") or {}
        if trailer_info.get("id"):
            if trailer_info.get("site") == "youtube":
                trailer = f"https://www.youtube.com/watch?v={trailer_info['id']}"
            elif trailer_info.get("site") == "dailymotion":
                trailer = f"https://www.dailymotion.com/video/{trailer_info['id']}"

        # Related titles grouped by relation type.
        relations: dict = {}
        for edge in (media.get("relations") or {}).get("edges") or []:
            node = (edge or {}).get("node")
            if not node:
                continue
            rel = edge.get("relationType") or "OTHER"
            summary = AnimePahe._summarize_media(node)
            summary["relation_type"] = rel
            relations.setdefault(rel, []).append(summary)

        recommendations = []
        for entry in (media.get("recommendations") or {}).get("nodes") or []:
            rec = (entry or {}).get("mediaRecommendation")
            if not rec:
                continue
            summary = AnimePahe._summarize_media(rec)
            summary["rating"] = entry.get("rating")
            recommendations.append(summary)

        studio_nodes = (media.get("studios") or {}).get("nodes") or []
        return {
            "id": media.get("id"),
            "mal_id": media.get("idMal"),
            "title": media.get("title"),
            "synonyms": media.get("synonyms") or [],
            "synopsis": synopsis,
            "format": media.get("format"),
            "status": media.get("status"),
            "episodes": media.get("episodes"),
            "duration": media.get("duration"),
            "source": media.get("source"),
            "country": media.get("countryOfOrigin"),
            "is_adult": media.get("isAdult"),
            "start_date": AnimePahe._format_date(media.get("startDate")),
            "end_date": AnimePahe._format_date(media.get("endDate")),
            "season": media.get("season"),
            "season_year": media.get("seasonYear"),
            "average_score": media.get("averageScore"),
            "mean_score": media.get("meanScore"),
            "popularity": media.get("popularity"),
            "favourites": media.get("favourites"),
            "trending": media.get("trending"),
            "genres": media.get("genres") or [],
            "cover_image": media.get("coverImage") or {},
            "banner_image": media.get("bannerImage"),
            "trailer": trailer,
            "studios": [s.get("name") for s in studio_nodes if s],
            "next_airing": media.get("nextAiringEpisode"),
            "external_links": [
                {"site": link.get("site"), "url": link.get("url"), "type": link.get("type")}
                for link in media.get("externalLinks") or []
                if link
            ],
            "relations": relations,
            "recommendations": recommendations,
        }

    async def _fetch_anilist(self, anilist_id: str) -> dict:
        """Fetch and reshape AniList metadata for ``anilist_id``.

        Returns the info payload, or ``{"error": ...}`` when the request fails
        or AniList has no matching media. The MAL synopsis is preferred over
        AniList's description when it is available.
        """
        try:
            async with httpx.AsyncClient(timeout=15) as client:
                resp = await client.post(
                    ANILIST_API,
                    json={
                        "query": self._ANILIST_QUERY,
                        "variables": {"id": int(anilist_id)},
                    },
                    headers={
                        "Content-Type": "application/json",
                        "Accept": "application/json",
                    },
                )
                resp.raise_for_status()
                result = resp.json()
        except Exception as e:
            return {"error": f"AniList fetch failed: {str(e)}"}
        # "data" is null (not missing) when the GraphQL query itself errors.
        media = (result.get("data") or {}).get("Media")
        if not media:
            return {"error": "AniList returned no data"}
        mal_id = str(media.get("idMal") or "")
        mal_synopsis = await self._fetch_mal_synopsis(mal_id) if mal_id else None
        synopsis = mal_synopsis or media.get("description")
        return self._format_anilist_media(media, synopsis)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    async def search(self, q: str):
        """Return AnimePahe's search results for ``q`` (an empty list on failure)."""
        # Percent-encode the term so characters such as "&", "#" or "+" stay
        # part of the query instead of altering the URL.
        encoded = urllib.parse.quote(q, safe="")
        data = await self._fetch_json(f"{BASE_URL}/api?m=search&q={encoded}")
        return data.get("data", []) if data else []

    async def get_latest(self, p: int = 1):
        """Return page ``p`` of the airing feed, or ``None`` on failure."""
        return await self._fetch_json(f"{BASE_URL}/api?m=airing&page={p}")

    async def get_episodes(self, anime_id: str, p: int = 1, resolve: bool = False):
        """Return page ``p`` of an anime's episode list, newest first.

        With ``resolve=True`` every episode additionally gets ``sub`` and
        ``dub`` stream entries; the episodes are resolved concurrently.
        """
        data = await self._fetch_json(
            f"{BASE_URL}/api?m=release&id={anime_id}&sort=episode_desc&page={p}"
        )
        if not data or not resolve:
            return data
        episodes = data.get("data", [])

        async def enrich(ep):
            ep_session = ep.get("session")
            if not ep_session:
                return ep
            stream = await self.resolve(anime_id, ep_session)
            ep["sub"] = stream.get("sub")
            ep["dub"] = stream.get("dub")
            return ep

        data["data"] = list(await asyncio.gather(*[enrich(ep) for ep in episodes]))
        return data

    async def get_ids(self, session: str):
        """Return the external ids for an anime; missing ids are ``None``."""
        try:
            ids = await self._scrape_ids(session)
            return {key: ids.get(key) for key in self._ID_KEYS}
        except Exception as e:
            return {"error": f"Failed: {str(e)}"}

    async def get_info(self, session: str):
        """Return AniList-based metadata for an anime, cached per AniList id.

        On failure the result has an ``"error"`` key (plus the scraped
        ``"ids"`` when they are available).
        """
        try:
            ids = await self._scrape_ids(session)
            anilist_id = ids.get("anilist")
            if not anilist_id:
                return {"error": "Could not find AniList ID", "ids": ids}
            if anilist_id in _info_cache:
                return _info_cache[anilist_id]
            data = await self._fetch_anilist(anilist_id)
            if "error" in data:
                return {"error": data["error"], "ids": ids}
            data["ids"] = {key: ids.get(key) for key in self._ID_KEYS}
            _info_cache[anilist_id] = data
            return data
        except Exception as e:
            return {"error": f"Failed: {str(e)}"}

    async def resolve(self, anime_session: str, episode_session: str):
        """Resolve the best sub and dub streams for one episode.

        Returns a dict with ``anime``, ``episode``, ``anime_name``,
        ``episode_num``, ``sub`` and ``dub`` (``None`` when that audio type is
        not offered), or ``{"error": ..., "hint": ...}`` on failure.
        """
        play_url = f"{BASE_URL}/play/{anime_session}/{episode_session}"
        page = await self.context.new_page()
        try:
            # Stealth patches are applied before navigation to get past Cloudflare.
            await Stealth().apply_stealth_async(page)
            try:
                await page.goto(play_url, wait_until="commit", timeout=45000)
            except Exception as e:
                # A navigation timeout is tolerated; the selector wait decides.
                if "Timeout" not in str(e):
                    raise
            # Generous timeout so a Cloudflare challenge can finish on its own.
            await page.wait_for_selector(
                "#resolutionMenu button", state="attached", timeout=30000
            )
            anime_name, episode_num = await self._scrape_play_meta(page)
            res_data = await self._collect_buttons(page)
            # The play page is no longer needed: each embed opens its own page.
            await page.close()
            page = None

            # Highest resolution per audio type, "sub" first.
            best = {}
            for kind in ("sub", "dub"):
                candidates = [r for r in res_data if r["audio"] == kind]
                if candidates:
                    best[kind] = max(candidates, key=lambda item: item["res"])

            async def resolve_one(item):
                entry = {
                    "resolution": str(item["res"]),
                    "fansub": item["fansub"],
                    "audio": item["audio"],
                    "audio_lang": item["audio_lang"],
                    "url": None,
                    "download": None,
                }
                try:
                    m3u8 = await self._embed_to_m3u8(item["embed"])
                    download = self._generate_mp4(
                        m3u8, anime_name, episode_num, entry["resolution"]
                    )
                    entry["url"], entry["download"] = m3u8, download
                except Exception as e:
                    entry["error"] = str(e)
                return entry

            resolved = await asyncio.gather(
                *(resolve_one(item) for item in best.values())
            )
            streams = dict(zip(best, resolved))
            return {
                "anime": anime_session,
                "episode": episode_session,
                "anime_name": anime_name,
                "episode_num": episode_num,
                "sub": streams.get("sub"),
                "dub": streams.get("dub"),
            }
        except Exception as e:
            # Keep a screenshot of whatever blocked the browser, if the play
            # page is still open.
            if page:
                try:
                    await page.screenshot(path="debug_error.png", full_page=True)
                except Exception:
                    pass
            return {
                "error": str(e),
                "hint": "Check debug_error.png to see what the browser got stuck on.",
            }
        finally:
            if page:
                await page.close()

    async def get_seasons(self, anime_id: str) -> dict:
        """List the seasons linked from an aniwatchtv page.

        Returns ``{"id", "total", "seasons"}``, plus ``"error"`` on failure.
        Each season has ``title``, ``id``, ``url`` and ``poster``.
        """
        url = f"{ANIWATCHTV_BASE}/{anime_id}"
        page = await self.context.new_page()
        try:
            await page.goto(url, wait_until="domcontentloaded", timeout=30000)
            await asyncio.sleep(1)
            # Wait for the first selector that shows up; none of them is required.
            for selector in [".os-list", ".seasons-block", "[class*='season']", "main"]:
                try:
                    await page.wait_for_selector(selector, timeout=5000)
                    break
                except Exception:
                    continue
            # The poster URL is taken from the CSS background-image; only the
            # surrounding quotes are stripped, the slashes must stay.
            seasons = await page.evaluate(f"""() => {{
                const BASE = "{ANIWATCHTV_BASE}";
                const results = [];
                const seen = new Set();
                const container = document.querySelector('.os-list') || document.querySelector('.seasons-block') || document.querySelector('[class*="os-list"]');
                if (!container) return results;
                for (const a of container.querySelectorAll('a[href]')) {{
                    const href = a.getAttribute('href') || '';
                    const fullUrl = href.startsWith('http') ? href : BASE + href;
                    if (!fullUrl) continue;
                    const slug = fullUrl.replace(/\\/$/, '').split('/').pop();
                    if (!slug || seen.has(slug)) continue;
                    seen.add(slug);
                    const title = (a.querySelector('span, [class*="title"]')?.innerText?.trim() || a.innerText?.trim() || slug);
                    const posterEl = a.querySelector('.season-poster') || a.closest('li, div')?.querySelector('.season-poster');
                    let poster = null;
                    if (posterEl) {{
                        const bg = posterEl.style.backgroundImage || window.getComputedStyle(posterEl).backgroundImage;
                        const bg2 = bg.split('url(').pop().split(')')[0].replace(/['"]/g, '').trim();
                        if (bg2 && bg2.startsWith('http')) poster = bg2;
                    }}
                    results.push({{ title, id: slug, url: fullUrl, poster }});
                }}
                return results;
            }}""")
            return {"id": anime_id, "total": len(seasons), "seasons": seasons}
        except Exception as e:
            return {"id": anime_id, "total": 0, "seasons": [], "error": str(e)}
        finally:
            await page.close()
# Module-wide scraper instance shared by the lifespan hook and the route handlers.
pahe = AnimePahe()
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start the shared scraper before the app serves requests and stop it afterwards.

    Startup sits inside the ``try`` so that a partially started browser, or an
    error raised into this generator while the app is running, still reaches
    ``pahe.stop()``.
    """
    try:
        await pahe.start()
        yield
    finally:
        await pahe.stop()
app = FastAPI(lifespan=lifespan)
# Enable CORS so a browser front end (e.g. the React app) can call this API.
# NOTE(review): FastAPI's CORS documentation says allow_origins must not be
# ["*"] when allow_credentials=True - confirm whether credentials are needed
# and list explicit origins for production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "*"
    ],  # For dev, allows all origins. Change to your Vite URL in prod.
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.get("/", response_class=JSONResponse)
async def root():
    """Health check: always answers with a fixed ``status`` payload."""
    payload = {"status": "ok"}
    return payload
@app.get("/search")
async def api_search(q: str):
    """Search AnimePahe for ``q`` and return the list of matches."""
    matches = await pahe.search(q=q)
    return matches
@app.get("/latest")
async def api_latest(p: int = 1):
    """Return page ``p`` of the currently airing feed."""
    feed = await pahe.get_latest(p=p)
    return feed
@app.get("/info/{session}")
async def api_info(session: str):
    """Return metadata for the anime identified by its AnimePahe session id."""
    info = await pahe.get_info(session=session)
    return info
@app.get("/episodes/{session}")
async def api_episodes(session: str, p: int = 1, resolve: bool = False):
    """Return page ``p`` of an anime's episodes, optionally with resolved streams."""
    listing = await pahe.get_episodes(anime_id=session, p=p, resolve=resolve)
    return listing
@app.get("/resolve/{anime}/{episode}")
async def api_resolve(anime: str, episode: str):
    """Resolve the sub/dub stream URLs for one episode of an anime."""
    streams = await pahe.resolve(anime_session=anime, episode_session=episode)
    return streams
@app.get("/seasons/{anime_id:path}")
async def api_seasons(anime_id: str, request: Request):
    """Return an anime's seasons, adding a ``posterProxied`` URL to each poster.

    ``posterProxied`` points at this server's ``/poster`` route. The poster
    URL is percent-encoded so that any ``?``, ``&`` or ``#`` it contains stays
    inside the ``url`` query parameter.
    """
    data = await pahe.get_seasons(anime_id)
    proxy_prefix = f"{str(request.base_url).rstrip('/')}/poster?url="
    for season in data.get("seasons", []):
        poster = season.get("poster")
        if poster:
            season["posterProxied"] = proxy_prefix + urllib.parse.quote(
                poster, safe=""
            )
    return data
@app.get("/poster")
async def api_poster(url: str = Query(..., description="CDN image proxy")):
    """Fetch an image from ``url`` with an aniwatchtv Referer and relay it.

    Answers 502 with the error text when the upstream request fails.
    """
    # NOTE(review): ``url`` is caller-controlled and fetched as-is, so this
    # route can be pointed at any host - consider restricting allowed hosts.
    upstream_headers = {
        "Referer": "https://aniwatchtv.to/",
        "User-Agent": "Mozilla/5.0",
    }
    try:
        async with httpx.AsyncClient(timeout=10, follow_redirects=True) as http:
            upstream = await http.get(url, headers=upstream_headers)
            upstream.raise_for_status()
            content_type = upstream.headers.get("content-type", "image/jpeg")
            return Response(content=upstream.content, media_type=content_type)
    except Exception as exc:
        return Response(content=f"Error: {exc}", status_code=502)
# HLS proxy: playlists and media are fetched by this server with a kwik Referer
# so the browser avoids the CDN's CORS and 403 restrictions.
@app.get("/hls-proxy")
async def hls_proxy(url: str, request: Request):
    """Proxy an HLS playlist or one of its resources.

    Playlists (paths ending in ``.m3u8``) are rewritten so that every segment
    line and every ``URI="..."`` tag attribute (e.g. ``EXT-X-KEY``,
    ``EXT-X-MAP``) points back at this route. Anything else is streamed
    through unchanged. A failed playlist fetch is returned with the upstream
    status code instead of being served as a playlist.
    """
    headers = {
        "Referer": "https://kwik.cx/",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122 Safari/537.36",
    }

    async def stream_generator():
        """Relay the upstream body chunk by chunk without buffering it."""
        async with httpx.AsyncClient(follow_redirects=True) as client:
            async with client.stream("GET", url, headers=headers) as resp:
                async for chunk in resp.aiter_bytes():
                    yield chunk

    # Non-playlist resources (segments, keys) are streamed as binary.
    if not url.split("?")[0].endswith(".m3u8"):
        return StreamingResponse(stream_generator(), media_type="video/MP2T")

    async with httpx.AsyncClient(follow_redirects=True) as client:
        resp = await client.get(url, headers=headers)
    if resp.status_code >= 400:
        # Surface upstream failures rather than serving an error page as a playlist.
        return Response(
            content=resp.content,
            status_code=resp.status_code,
            media_type=resp.headers.get("content-type"),
        )

    # Relative URIs are resolved against the playlist's final (post-redirect) URL.
    playlist_url = str(resp.url)
    base_proxy = f"{str(request.base_url).rstrip('/')}/hls-proxy?url="

    def proxied(target: str) -> str:
        """Return the proxy URL for ``target``; non-HTTP URIs are left untouched."""
        absolute_url = urllib.parse.urljoin(playlist_url, target.strip())
        if not absolute_url.startswith(("http://", "https://")):
            return target
        return f"{base_proxy}{urllib.parse.quote(absolute_url, safe='')}"

    new_lines = []
    for line in resp.text.splitlines():
        if line.startswith("#"):
            # Tag line: only its URI="..." attributes need to go through the proxy.
            new_lines.append(
                re.sub(
                    r'URI="([^"]*)"',
                    lambda m: f'URI="{proxied(m.group(1))}"',
                    line,
                )
            )
        elif not line.strip():
            new_lines.append(line)
        else:
            new_lines.append(proxied(line))
    return Response(
        content="\n".join(new_lines), media_type="application/vnd.apple.mpegurl"
    )
if __name__ == "__main__":
    # Running the file directly serves the app with uvicorn on all interfaces, port 7860.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)