Files
aniscrap/main.py
Aira Catapang 99a9cdb91a Update main.py
2026-03-17 06:17:33 +00:00

847 lines
29 KiB
Python

import json
import asyncio
import re
import os
import httpx
from typing import Optional
from contextlib import asynccontextmanager
from fastapi import FastAPI, Query, Request
from fastapi.responses import StreamingResponse, Response, JSONResponse
from playwright.async_api import async_playwright, BrowserContext
BASE_URL = "https://animepahe.si"
ANIWATCHTV_BASE = "https://aniwatchtv.to"
ANILIST_API = "https://graphql.anilist.co"
JIKAN_API = "https://api.jikan.moe/v4"
IS_HEADLESS = os.environ.get("HEADLESS", "true").lower() == "true"
# In-memory caches
_info_cache: dict = {}
_mal_synopsis_cache: dict = {}
class AnimePahe:
def __init__(self):
self.playwright = None
self.context: Optional[BrowserContext] = None
self.ad_domains = [
"doubleclick.net",
"adservice.google",
"popads.net",
"propellerads",
"exoclick",
"bebi.com",
]
async def start(self):
self.playwright = await async_playwright().start()
self.context = await self.playwright.chromium.launch_persistent_context(
user_data_dir="./browser_data",
headless=IS_HEADLESS,
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122 Safari/537.36",
args=[
"--disable-blink-features=AutomationControlled",
"--no-sandbox",
],
)
await self.context.route("**/*", self._intercept_assets)
async def stop(self):
if self.context:
await self.context.close()
if self.playwright:
await self.playwright.stop()
async def _intercept_assets(self, route):
url = route.request.url.lower()
# Allow all requests from aniwatchtv so season posters can load
if "aniwatchtv.to" in url:
await route.continue_()
return
if any(ad in url for ad in self.ad_domains) or url.endswith(
(".png", ".jpg", ".jpeg", ".webp", ".woff")
):
await route.abort()
else:
await route.continue_()
async def _fetch_json(self, url: str):
page = await self.context.new_page()
try:
await page.goto(url, wait_until="domcontentloaded")
txt = await page.evaluate("document.body.innerText")
return json.loads(txt)
except:
return None
finally:
await page.close()
def _generate_mp4(
self, m3u8_url: Optional[str], anime_name: str, episode: str, res: str
) -> Optional[str]:
if not m3u8_url:
return None
match = re.search(r"(https?://[^.]+)[^/]*/stream/(.*?)/[^/]+\.m3u8", m3u8_url)
if match:
clean_name = re.sub(r"[^\w\s]", "", anime_name).strip().replace(" ", "_")
filename = f"{clean_name}_EP{episode}_{res}P.mp4"
return f"{match.group(1)}.kwik.cx/mp4/{match.group(2)}?file={filename}"
return None
async def _scrape_play_meta(self, page) -> tuple:
meta = await page.evaluate("""() => {
const titleEl = document.querySelector('.theatre-info h2 a, .anime-title, h2 a[href*="/anime/"]')
const epEl = document.querySelector('.theatre-info h2, .episode-title, h2')
let title = titleEl ? titleEl.innerText.trim() : ''
let episode = ''
if (epEl) {
const m = epEl.innerText.match(/episode\\s*(\\d+)/i)
if (m) episode = m[1]
}
if (!title || !episode) {
const t = document.title || ''
const m = t.match(/^(.+?)\\s*[-\\u2013]\\s*Episode\\s*(\\d+)/i)
if (m) {
if (!title) title = m[1].trim()
if (!episode) episode = m[2].trim()
}
}
return { title, episode }
}""")
title = (meta.get("title") or "").strip() or "Unknown"
episode = (meta.get("episode") or "").strip() or "00"
return title, episode
# ---------------- SCRAPE IDs ONLY ----------------
async def _scrape_ids(self, session: str) -> dict:
page = await self.context.new_page()
try:
await page.goto(
f"{BASE_URL}/anime/{session}",
wait_until="domcontentloaded",
timeout=30000,
)
# Wait for the anime info block to render
try:
await page.wait_for_selector(
"div.anime-info, div.anime-summary, aside, main", timeout=10000
)
except:
pass
# Extra wait for JS-rendered content
await asyncio.sleep(2)
# Debug: log all hrefs found on page
all_links = await page.evaluate("""() => {
return Array.from(document.querySelectorAll('a[href]')).map(a => a.href)
}""")
print(f"[scrape_ids] All links found: {all_links}")
ids = await page.evaluate("""() => {
let ids = {}
document.querySelectorAll("a[href]").forEach(a => {
const url = a.href || ""
if (url.includes("myanimelist.net/anime"))
ids["mal"] = url.split("/").filter(Boolean).pop()
if (url.includes("anilist.co/anime"))
ids["anilist"] = url.split("/").filter(Boolean).pop()
if (url.includes("anidb.net"))
ids["anidb"] = url.split("/").filter(Boolean).pop()
if (url.includes("animenewsnetwork.com")) {
const m = url.match(/id=(\\d+)/)
if (m) ids["ann"] = m[1]
}
if (url.includes("anime-planet.com/anime"))
ids["animePlanet"] = url.split("/").filter(Boolean).pop()
})
return ids
}""")
print(f"[scrape_ids] Extracted ids: {ids}")
ids["animepahe"] = session
return ids
except Exception as e:
print(f"[scrape_ids] ERROR: {e}")
return {"animepahe": session}
finally:
await page.close()
# ---------------- MAL SYNOPSIS ----------------
async def _fetch_mal_synopsis(self, mal_id: str) -> Optional[str]:
if mal_id in _mal_synopsis_cache:
return _mal_synopsis_cache[mal_id]
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(
f"{JIKAN_API}/anime/{mal_id}",
headers={"Accept": "application/json"},
)
resp.raise_for_status()
synopsis = resp.json().get("data", {}).get("synopsis")
_mal_synopsis_cache[mal_id] = synopsis
return synopsis
except Exception as e:
print(f"[mal_synopsis] failed for mal_id={mal_id}: {e}")
_mal_synopsis_cache[mal_id] = None
return None
# ---------------- SHARED RESOLVE HELPERS ----------------
async def _collect_buttons(self, page) -> list:
buttons = await page.locator("#resolutionMenu button").all()
res_data = []
for btn in buttons:
text = (await btn.inner_text()).strip()
res_match = re.search(r"(\d+)", text)
audio_lang = (await btn.get_attribute("data-audio") or "jpn").lower()
audio_type = "dub" if audio_lang == "eng" else "sub"
res_data.append(
{
"embed": await btn.get_attribute("data-src"),
"res": int(res_match.group(1)) if res_match else 720,
"fansub": text.split("·")[0].strip() if "·" in text else "Unknown",
"audio": audio_type,
"audio_lang": audio_lang,
}
)
return res_data
async def _embed_to_m3u8(self, embed_url: str) -> Optional[str]:
p = await self.context.new_page()
m3u8 = None
found = asyncio.Event()
def capture(req):
nonlocal m3u8
if ".m3u8" in req.url and not found.is_set():
m3u8 = req.url
found.set()
p.on("request", capture)
try:
await p.set_extra_http_headers({"Referer": BASE_URL})
await p.goto(embed_url, wait_until="domcontentloaded")
await p.evaluate(
"document.querySelectorAll('button, video, [class*=play]').forEach(el => el.click())"
)
try:
await asyncio.wait_for(found.wait(), timeout=5.0)
except asyncio.TimeoutError:
pass
finally:
await p.close()
return m3u8
# ---------------- ANILIST ----------------
async def _fetch_anilist(self, anilist_id: str) -> dict:
query = """
query ($id: Int) {
Media(id: $id, type: ANIME) {
id
idMal
title {
romaji
english
native
}
synonyms
description(asHtml: false)
format
status
episodes
duration
source
countryOfOrigin
isAdult
startDate { year month day }
endDate { year month day }
season
seasonYear
averageScore
meanScore
popularity
favourites
trending
genres
coverImage {
extraLarge
large
medium
color
}
bannerImage
trailer {
id
site
}
studios(isMain: true) {
nodes { name siteUrl }
}
relations {
edges {
relationType(version: 2)
node {
id
idMal
title { romaji english }
format
status
episodes
averageScore
coverImage { medium }
siteUrl
}
}
}
recommendations(perPage: 20, sort: RATING_DESC) {
nodes {
rating
mediaRecommendation {
id
idMal
title { romaji english }
format
status
episodes
averageScore
coverImage { medium }
siteUrl
}
}
}
externalLinks {
site
url
type
}
nextAiringEpisode {
airingAt
episode
}
}
}
"""
try:
async with httpx.AsyncClient(timeout=15) as client:
resp = await client.post(
ANILIST_API,
json={"query": query, "variables": {"id": int(anilist_id)}},
headers={
"Content-Type": "application/json",
"Accept": "application/json",
},
)
resp.raise_for_status()
result = resp.json()
except Exception as e:
print(f"[anilist] failed for id={anilist_id}: {e}")
return {"error": f"AniList fetch failed: {str(e)}"}
media = result.get("data", {}).get("Media")
if not media:
return {"error": "AniList returned no data"}
mal_id = str(media.get("idMal") or "")
mal_synopsis = await self._fetch_mal_synopsis(mal_id) if mal_id else None
synopsis = mal_synopsis or media.get("description")
def fmt_date(d):
if not d or not d.get("year"):
return None
parts = [d.get("year"), d.get("month"), d.get("day")]
return "-".join(str(p).zfill(2) for p in parts if p)
trailer = None
if media.get("trailer"):
t = media["trailer"]
if t.get("site") == "youtube":
trailer = f"https://www.youtube.com/watch?v={t['id']}"
elif t.get("site") == "dailymotion":
trailer = f"https://www.dailymotion.com/video/{t['id']}"
# ---------- Relations from AniList ----------
relations: dict[str, list] = {}
for edge in media.get("relations", {}).get("edges", []):
node = edge.get("node", {})
if not node:
continue
relation_type = edge.get("relationType", "OTHER")
entry = {
"id": node.get("id"),
"mal_id": node.get("idMal"),
"title": (
node.get("title", {}).get("english")
or node.get("title", {}).get("romaji")
),
"format": node.get("format"),
"status": node.get("status"),
"episodes": node.get("episodes"),
"score": node.get("averageScore"),
"image": node.get("coverImage", {}).get("medium"),
"url": node.get("siteUrl"),
"relation_type": relation_type,
}
relations.setdefault(relation_type, []).append(entry)
# ---------- Recommendations ----------
recommendations = []
for node in media.get("recommendations", {}).get("nodes", []):
rec = node.get("mediaRecommendation")
if not rec:
continue
recommendations.append(
{
"id": rec.get("id"),
"mal_id": rec.get("idMal"),
"title": rec["title"].get("english") or rec["title"].get("romaji"),
"format": rec.get("format"),
"status": rec.get("status"),
"episodes": rec.get("episodes"),
"score": rec.get("averageScore"),
"image": rec.get("coverImage", {}).get("medium"),
"url": rec.get("siteUrl"),
"rating": node.get("rating"),
}
)
return {
"id": media.get("id"),
"mal_id": media.get("idMal"),
"title": {
"romaji": media["title"].get("romaji"),
"english": media["title"].get("english"),
"native": media["title"].get("native"),
},
"synonyms": media.get("synonyms", []),
"synopsis": synopsis,
"format": media.get("format"),
"status": media.get("status"),
"episodes": media.get("episodes"),
"duration": media.get("duration"),
"source": media.get("source"),
"country": media.get("countryOfOrigin"),
"is_adult": media.get("isAdult"),
"start_date": fmt_date(media.get("startDate")),
"end_date": fmt_date(media.get("endDate")),
"season": media.get("season"),
"season_year": media.get("seasonYear"),
"average_score": media.get("averageScore"),
"mean_score": media.get("meanScore"),
"popularity": media.get("popularity"),
"favourites": media.get("favourites"),
"trending": media.get("trending"),
"genres": media.get("genres", []),
"cover_image": media.get("coverImage", {}),
"banner_image": media.get("bannerImage"),
"trailer": trailer,
"studios": [s["name"] for s in media.get("studios", {}).get("nodes", [])],
"next_airing": media.get("nextAiringEpisode"),
"external_links": [
{"site": l["site"], "url": l["url"], "type": l["type"]}
for l in media.get("externalLinks", [])
],
"relations": relations,
"recommendations": recommendations,
}
# ---------------- SEARCH ----------------
async def search(self, q: str):
data = await self._fetch_json(f"{BASE_URL}/api?m=search&q={q}")
return data.get("data", []) if data else []
# ---------------- LATEST ----------------
async def get_latest(self, p: int = 1):
return await self._fetch_json(f"{BASE_URL}/api?m=airing&page={p}")
# ---------------- EPISODES ----------------
async def get_episodes(self, anime_id: str, p: int = 1, resolve: bool = False):
data = await self._fetch_json(
f"{BASE_URL}/api?m=release&id={anime_id}&sort=episode_desc&page={p}"
)
if not data or not resolve:
return data
episodes = data.get("data", [])
async def enrich(ep):
ep_session = ep.get("session")
if not ep_session:
return ep
stream = await self.resolve(anime_id, ep_session)
ep["sub"] = stream.get("sub")
ep["dub"] = stream.get("dub")
return ep
data["data"] = list(await asyncio.gather(*[enrich(ep) for ep in episodes]))
return data
# ---------------- IDS ONLY ----------------
async def get_ids(self, session: str):
try:
ids = await self._scrape_ids(session)
return {
"animepahe": ids.get("animepahe"),
"anilist": ids.get("anilist"),
"mal": ids.get("mal"),
"anidb": ids.get("anidb"),
"ann": ids.get("ann"),
"animePlanet": ids.get("animePlanet"),
}
except Exception as e:
print(f"[get_ids] ERROR: {e}")
return {"error": f"Failed: {str(e)}"}
# ---------------- INFO ----------------
async def get_info(self, session: str):
try:
ids = await self._scrape_ids(session)
anilist_id = ids.get("anilist")
if not anilist_id:
return {
"error": "Could not find AniList ID on AnimePahe page",
"ids": ids,
}
if anilist_id in _info_cache:
return _info_cache[anilist_id]
data = await self._fetch_anilist(anilist_id)
if "error" in data:
return {"error": data["error"], "ids": ids}
data["ids"] = {
"animepahe": ids.get("animepahe"),
"anilist": anilist_id,
"mal": ids.get("mal"),
"anidb": ids.get("anidb"),
"ann": ids.get("ann"),
"animePlanet": ids.get("animePlanet"),
}
_info_cache[anilist_id] = data
return data
except Exception as e:
print(f"[get_info] ERROR: {e}")
return {"error": f"Failed: {str(e)}"}
# ---------------- RESOLVE ----------------
async def resolve(self, anime_session: str, episode_session: str):
play_url = f"{BASE_URL}/play/{anime_session}/{episode_session}"
page = await self.context.new_page()
try:
await page.goto(play_url, wait_until="domcontentloaded")
await page.wait_for_selector(
"#resolutionMenu button",
state="attached",
timeout=15000,
)
anime_name, episode_num = await self._scrape_play_meta(page)
res_data = await self._collect_buttons(page)
await page.close()
page = None
subs = [r for r in res_data if r["audio"] == "sub"]
dubs = [r for r in res_data if r["audio"] == "dub"]
best_sub = max(subs, key=lambda x: x["res"]) if subs else None
best_dub = max(dubs, key=lambda x: x["res"]) if dubs else None
async def resolve_one(item):
try:
m3u8 = await self._embed_to_m3u8(item["embed"])
res_str = str(item["res"])
return {
"resolution": res_str,
"fansub": item["fansub"],
"audio": item["audio"],
"audio_lang": item["audio_lang"],
"url": m3u8,
"download": self._generate_mp4(
m3u8, anime_name, episode_num, res_str
),
}
except Exception as e:
return {
"resolution": str(item["res"]),
"fansub": item["fansub"],
"audio": item["audio"],
"audio_lang": item["audio_lang"],
"url": None,
"download": None,
"error": str(e),
}
tasks = []
if best_sub:
tasks.append(resolve_one(best_sub))
if best_dub:
tasks.append(resolve_one(best_dub))
results = await asyncio.gather(*tasks)
sub_result = results[0] if best_sub else None
dub_result = (
results[1]
if best_sub and best_dub
else (results[0] if best_dub else None)
)
return {
"anime": anime_session,
"episode": episode_session,
"anime_name": anime_name,
"episode_num": episode_num,
"sub": sub_result,
"dub": dub_result,
}
except Exception as e:
return {"error": str(e)}
finally:
if page:
await page.close()
# ---------------- SEASONS ----------------
async def get_seasons(self, anime_id: str) -> dict:
"""
Scrape the 'More Seasons' section from aniwatchtv.to using the
existing Playwright browser context.
anime_id is the full slug, e.g. jujutsu-kaisen-the-culling-game-part-1-20401
"""
url = f"{ANIWATCHTV_BASE}/{anime_id}"
page = await self.context.new_page()
try:
await page.goto(url, wait_until="domcontentloaded", timeout=30000)
# Short wait for lazy-loaded images and JS rendering
await asyncio.sleep(1)
# Wait for season block — try common selectors gracefully
for selector in [".os-list", ".seasons-block", "[class*='season']", "main"]:
try:
await page.wait_for_selector(selector, timeout=5000)
break
except:
continue
seasons = await page.evaluate(f"""() => {{
const BASE = "{ANIWATCHTV_BASE}";
const currentId = "{anime_id}";
const results = [];
const seen = new Set();
// Strategy 1: dedicated season list block (.os-list or similar)
const block = (
document.querySelector('.os-list') ||
document.querySelector('.seasons-block') ||
document.querySelector('[class*="os-list"]') ||
document.querySelector('[class*="season-list"]')
);
// Strategy 2: find a heading that says "More Seasons" and walk up
const fallbackContainer = (() => {{
for (const el of document.querySelectorAll('*')) {{
if (/more\\s+seasons?/i.test(el.innerText?.trim() || '')) {{
let p = el.parentElement;
for (let i = 0; i < 5; i++) {{
if (!p) break;
if (p.querySelectorAll('a[href]').length > 0) return p;
p = p.parentElement;
}}
}}
}}
return null;
}})();
const container = block || fallbackContainer;
if (!container) return results;
for (const a of container.querySelectorAll('a[href]')) {{
const href = a.getAttribute('href') || '';
const fullUrl = href.startsWith('http') ? href
: href.startsWith('/') ? BASE + href
: null;
if (!fullUrl) continue;
const slug = fullUrl.replace(/\\/$/, '').split('/').pop();
// Include ALL slugs — current page included — dedupe only
if (!slug || seen.has(slug)) continue;
seen.add(slug);
const numericMatch = slug.match(/-(\\d+)$/);
const numericId = numericMatch ? numericMatch[1] : null;
const titleEl = a.querySelector('span, [class*="title"], [class*="name"]');
const title = (titleEl?.innerText?.trim() || a.innerText?.trim() || slug);
// Poster is in a sibling/child div.season-poster as a CSS background-image
const posterEl = a.querySelector('.season-poster') || a.closest('li, div')?.querySelector('.season-poster');
let poster = null;
if (posterEl) {{
const bg = posterEl.style.backgroundImage || window.getComputedStyle(posterEl).backgroundImage;
const bg2 = bg.split('url(').pop().split(')')[0].replace(/['"/]/g, '').trim();
if (bg2 && bg2.startsWith('http')) poster = bg2;
}}
results.push({{ title, id: slug, numericId, url: fullUrl, poster }});
}}
return results;
}}""")
return {
"id": anime_id,
"total": len(seasons),
"seasons": seasons,
}
except Exception as e:
print(f"[get_seasons] ERROR: {e}")
return {"id": anime_id, "total": 0, "seasons": [], "error": str(e)}
finally:
await page.close()
pahe = AnimePahe()
@asynccontextmanager
async def lifespan(app: FastAPI):
await pahe.start()
yield
await pahe.stop()
app = FastAPI(lifespan=lifespan)
@app.get("/", response_class=JSONResponse)
async def root():
return {
"status": "ok",
"routes": [
"/search?q=:title",
"/latest?p=:page",
"/info/:session",
"/ids/:session",
"/episodes/:session?p=:page&resolve=false|true",
"/resolve/:animeSession/:episodeSession",
"/seasons/:animeId - e.g. /seasons/jujutsu-kaisen-the-culling-game-part-1-20401",
"/poster?url=:cdnImageUrl - proxy hotlink-protected poster images",
],
}
@app.get("/search")
async def api_search(q: str):
return await pahe.search(q)
@app.get("/latest")
async def api_latest(p: int = 1):
return await pahe.get_latest(p)
@app.get("/info/{session}")
async def api_info(session: str):
return await pahe.get_info(session)
@app.get("/ids/{session}")
async def api_ids(session: str):
return await pahe.get_ids(session)
@app.get("/episodes/{session}")
async def api_episodes(session: str, p: int = 1, resolve: bool = False):
return await pahe.get_episodes(session, p, resolve)
@app.get("/resolve/{anime}/{episode}")
async def api_resolve(anime: str, episode: str):
return await pahe.resolve(anime, episode)
@app.get("/seasons/{anime_id:path}")
async def api_seasons(anime_id: str, request: Request):
"""
Scrape the More Seasons section from aniwatchtv.to.
Example:
GET /seasons/jujutsu-kaisen-the-culling-game-part-1-20401
Returns:
id - the slug passed in
total - number of other seasons found
seasons[] - list of { title, id, numericId, url, poster }
"""
data = await pahe.get_seasons(anime_id)
base_url = str(request.base_url).rstrip("/")
for season in data.get("seasons", []):
if season.get("poster"):
season["posterProxied"] = f"{base_url}/poster?url={season['poster']}"
else:
season["posterProxied"] = None
return data
@app.get("/poster")
async def api_poster(url: str = Query(..., description="CDN image URL to proxy")):
"""
Proxy a hotlink-protected poster image with the correct Referer header.
Use this to display season/anime posters in the browser.
Example:
GET /poster?url=https://cdn.noitatnemucod.net/thumbnail/100x200/100/abc123.jpg
"""
try:
async with httpx.AsyncClient(timeout=10, follow_redirects=True) as client:
resp = await client.get(
url,
headers={
"Referer": "https://aniwatchtv.to/",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122 Safari/537.36",
},
)
resp.raise_for_status()
content_type = resp.headers.get("content-type", "image/jpeg")
return Response(content=resp.content, media_type=content_type)
except Exception as e:
return Response(
content=f"Failed to fetch image: {e}",
status_code=502,
media_type="text/plain",
)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=7860)