Files
aniscrap/main.py
Aira Catapang d408de57b6 Update main.py
2026-03-17 03:49:45 +00:00

797 lines
26 KiB
Python

import json
import asyncio
import re
import os
import httpx
from typing import Optional
from contextlib import asynccontextmanager
from fastapi import FastAPI
from playwright.async_api import async_playwright, BrowserContext
BASE_URL = "https://animepahe.si"
ANILIST_API = "https://graphql.anilist.co"
JIKAN_API = "https://api.jikan.moe/v4"
KITSU_API = "https://kitsu.io/api/edge"
IS_HEADLESS = os.environ.get("HEADLESS", "true").lower() == "true"
# In-memory caches
_info_cache: dict = {}
_mal_synopsis_cache: dict = {}
_kitsu_relations_cache: dict = {}
KITSU_HEADERS = {
"Accept": "application/vnd.api+json",
"Content-Type": "application/vnd.api+json",
}
# Direct relation types (shown first)
DIRECT_RELATION_TYPES = {"sequel", "prequel", "parent", "full_story", "side_story"}
class AnimePahe:
def __init__(self):
self.playwright = None
self.context: Optional[BrowserContext] = None
self.ad_domains = [
"doubleclick.net",
"adservice.google",
"popads.net",
"propellerads",
"exoclick",
"bebi.com",
]
async def start(self):
self.playwright = await async_playwright().start()
self.context = await self.playwright.chromium.launch_persistent_context(
user_data_dir="./browser_data",
headless=IS_HEADLESS,
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122 Safari/537.36",
args=[
"--disable-blink-features=AutomationControlled",
"--no-sandbox",
],
)
await self.context.route("**/*", self._intercept_assets)
async def stop(self):
if self.context:
await self.context.close()
if self.playwright:
await self.playwright.stop()
async def _intercept_assets(self, route):
url = route.request.url.lower()
if any(ad in url for ad in self.ad_domains) or url.endswith(
(".png", ".jpg", ".jpeg", ".webp", ".woff")
):
await route.abort()
else:
await route.continue_()
async def _fetch_json(self, url: str):
page = await self.context.new_page()
try:
await page.goto(url, wait_until="domcontentloaded")
txt = await page.evaluate("document.body.innerText")
return json.loads(txt)
except:
return None
finally:
await page.close()
def _generate_mp4(self, m3u8_url: Optional[str], anime_id: str, res: str):
if not m3u8_url:
return None
match = re.search(r"(https?://[^.]+)[^/]*/stream/(.*?)/[^/]+\.m3u8", m3u8_url)
if match:
return f"{match.group(1)}.kwik.cx/mp4/{match.group(2)}?file=AnimePahe_{anime_id}_{res}p.mp4"
return None
# ---------------- SCRAPE IDs ONLY ----------------
async def _scrape_ids(self, session: str) -> dict:
page = await self.context.new_page()
try:
await page.goto(
f"{BASE_URL}/anime/{session}",
wait_until="networkidle",
timeout=30000,
)
await page.wait_for_selector(".anime-info", timeout=10000)
await asyncio.sleep(1)
ids = await page.evaluate("""() => {
let ids = {}
document.querySelectorAll("a[href]").forEach(a => {
const url = a.href || ""
if (url.includes("myanimelist.net/anime"))
ids["mal"] = url.split("/").filter(Boolean).pop()
if (url.includes("anilist.co/anime"))
ids["anilist"] = url.split("/").filter(Boolean).pop()
if (url.includes("anidb.net"))
ids["anidb"] = url.split("/").filter(Boolean).pop()
if (url.includes("kitsu.io/anime"))
ids["kitsu"] = url.split("/").filter(Boolean).pop()
if (url.includes("animenewsnetwork.com")) {
const m = url.match(/id=(\\d+)/)
if (m) ids["ann"] = m[1]
}
if (url.includes("anime-planet.com/anime"))
ids["animePlanet"] = url.split("/").filter(Boolean).pop()
})
return ids
}""")
ids["animepahe"] = session
return ids
except Exception as e:
print(f"[scrape_ids] ERROR: {e}")
return {"animepahe": session}
finally:
await page.close()
# ---------------- MAL SYNOPSIS ----------------
async def _fetch_mal_synopsis(self, mal_id: str) -> Optional[str]:
if mal_id in _mal_synopsis_cache:
return _mal_synopsis_cache[mal_id]
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(
f"{JIKAN_API}/anime/{mal_id}",
headers={"Accept": "application/json"},
)
resp.raise_for_status()
synopsis = resp.json().get("data", {}).get("synopsis")
_mal_synopsis_cache[mal_id] = synopsis
return synopsis
except Exception as e:
print(f"[mal_synopsis] failed for mal_id={mal_id}: {e}")
_mal_synopsis_cache[mal_id] = None
return None
# ---------------- KITSU RELATIONS ----------------
async def _fetch_kitsu_relations(self, kitsu_id: str) -> list:
"""
Fetch ALL related anime from Kitsu — full chain including all seasons,
movies, OVAs, specials. Direct types listed first.
"""
if kitsu_id in _kitsu_relations_cache:
return _kitsu_relations_cache[kitsu_id]
try:
async with httpx.AsyncClient(timeout=15) as client:
url = (
f"{KITSU_API}/anime/{kitsu_id}/media-relationships"
f"?include=destination"
f"&fields[anime]=canonicalTitle,posterImage,episodeCount,status,subtype,startDate"
f"&page[limit]=20"
)
resp = await client.get(url, headers=KITSU_HEADERS)
resp.raise_for_status()
data = resp.json()
except Exception as e:
print(f"[kitsu_relations] failed for kitsu_id={kitsu_id}: {e}")
_kitsu_relations_cache[kitsu_id] = []
return []
# Build lookup of included resources by id
included = {}
for item in data.get("included", []):
included[item["id"]] = item
direct = []
indirect = []
for rel in data.get("data", []):
attrs = rel.get("attributes", {})
role = (attrs.get("role") or "").lower()
dest_data = (
rel.get("relationships", {}).get("destination", {}).get("data", {})
)
dest_type = dest_data.get("type", "")
dest_id = dest_data.get("id", "")
# Only include anime destinations
if dest_type != "anime":
continue
dest = included.get(dest_id, {})
dest_attrs = dest.get("attributes", {})
poster = dest_attrs.get("posterImage") or {}
entry = {
"kitsu_id": dest_id,
"title": dest_attrs.get("canonicalTitle"),
"format": dest_attrs.get("subtype"),
"status": dest_attrs.get("status"),
"episodes": dest_attrs.get("episodeCount"),
"start_date": dest_attrs.get("startDate"),
"image": (
poster.get("small")
or poster.get("medium")
or poster.get("original")
),
"url": f"https://kitsu.io/anime/{dest_id}",
"relation_type": role,
}
if role in DIRECT_RELATION_TYPES:
direct.append(entry)
else:
indirect.append(entry)
combined = direct + indirect
_kitsu_relations_cache[kitsu_id] = combined
return combined
# ---------------- ANILIST ----------------
async def _fetch_anilist(self, anilist_id: str) -> dict:
query = """
query ($id: Int) {
Media(id: $id, type: ANIME) {
id
idMal
title {
romaji
english
native
}
synonyms
description(asHtml: false)
format
status
episodes
duration
source
countryOfOrigin
isAdult
startDate { year month day }
endDate { year month day }
season
seasonYear
averageScore
meanScore
popularity
favourites
trending
genres
tags {
name
category
rank
isMediaSpoiler
}
coverImage {
extraLarge
large
medium
color
}
bannerImage
trailer {
id
site
}
studios(isMain: true) {
nodes { name siteUrl }
}
staff(perPage: 10) {
edges {
role
node {
name { full }
image { medium }
siteUrl
}
}
}
characters(perPage: 10, sort: [ROLE, RELEVANCE]) {
edges {
role
node {
name { full }
image { medium }
siteUrl
}
voiceActors(language: JAPANESE) {
name { full }
image { medium }
siteUrl
}
}
}
recommendations(perPage: 20, sort: RATING_DESC) {
nodes {
rating
mediaRecommendation {
id
idMal
title { romaji english }
format
status
episodes
averageScore
coverImage { medium }
siteUrl
}
}
}
externalLinks {
site
url
type
}
nextAiringEpisode {
airingAt
episode
}
}
}
"""
try:
async with httpx.AsyncClient(timeout=15) as client:
resp = await client.post(
ANILIST_API,
json={"query": query, "variables": {"id": int(anilist_id)}},
headers={
"Content-Type": "application/json",
"Accept": "application/json",
},
)
resp.raise_for_status()
result = resp.json()
except Exception as e:
print(f"[anilist] failed for id={anilist_id}: {e}")
return {"error": f"AniList fetch failed: {str(e)}"}
media = result.get("data", {}).get("Media")
if not media:
return {"error": "AniList returned no data"}
# MAL synopsis
mal_id = str(media.get("idMal") or "")
mal_synopsis = await self._fetch_mal_synopsis(mal_id) if mal_id else None
synopsis = mal_synopsis or media.get("description")
# Format dates
def fmt_date(d):
if not d or not d.get("year"):
return None
parts = [d.get("year"), d.get("month"), d.get("day")]
return "-".join(str(p).zfill(2) for p in parts if p)
# Trailer
trailer = None
if media.get("trailer"):
t = media["trailer"]
if t.get("site") == "youtube":
trailer = f"https://www.youtube.com/watch?v={t['id']}"
elif t.get("site") == "dailymotion":
trailer = f"https://www.dailymotion.com/video/{t['id']}"
# Recommendations
recommendations = []
for node in media.get("recommendations", {}).get("nodes", []):
rec = node.get("mediaRecommendation")
if not rec:
continue
recommendations.append(
{
"id": rec.get("id"),
"mal_id": rec.get("idMal"),
"title": rec["title"].get("english") or rec["title"].get("romaji"),
"format": rec.get("format"),
"status": rec.get("status"),
"episodes": rec.get("episodes"),
"score": rec.get("averageScore"),
"image": rec.get("coverImage", {}).get("medium"),
"url": rec.get("siteUrl"),
"rating": node.get("rating"),
}
)
# Characters
characters = []
for edge in media.get("characters", {}).get("edges", []):
node = edge.get("node", {})
vas = edge.get("voiceActors", [])
characters.append(
{
"name": node.get("name", {}).get("full"),
"image": node.get("image", {}).get("medium"),
"role": edge.get("role"),
"url": node.get("siteUrl"),
"voice_actor": {
"name": vas[0]["name"]["full"],
"image": vas[0].get("image", {}).get("medium"),
"url": vas[0].get("siteUrl"),
}
if vas
else None,
}
)
# Staff
staff = []
for edge in media.get("staff", {}).get("edges", []):
node = edge.get("node", {})
staff.append(
{
"name": node.get("name", {}).get("full"),
"image": node.get("image", {}).get("medium"),
"role": edge.get("role"),
"url": node.get("siteUrl"),
}
)
return {
"id": media.get("id"),
"mal_id": media.get("idMal"),
"title": {
"romaji": media["title"].get("romaji"),
"english": media["title"].get("english"),
"native": media["title"].get("native"),
},
"synonyms": media.get("synonyms", []),
"synopsis": synopsis,
"format": media.get("format"),
"status": media.get("status"),
"episodes": media.get("episodes"),
"duration": media.get("duration"),
"source": media.get("source"),
"country": media.get("countryOfOrigin"),
"is_adult": media.get("isAdult"),
"start_date": fmt_date(media.get("startDate")),
"end_date": fmt_date(media.get("endDate")),
"season": media.get("season"),
"season_year": media.get("seasonYear"),
"average_score": media.get("averageScore"),
"mean_score": media.get("meanScore"),
"popularity": media.get("popularity"),
"favourites": media.get("favourites"),
"trending": media.get("trending"),
"genres": media.get("genres", []),
"tags": [
{
"name": t["name"],
"category": t["category"],
"rank": t["rank"],
"spoiler": t["isMediaSpoiler"],
}
for t in media.get("tags", [])
],
"cover_image": media.get("coverImage", {}),
"banner_image": media.get("bannerImage"),
"trailer": trailer,
"studios": [s["name"] for s in media.get("studios", {}).get("nodes", [])],
"next_airing": media.get("nextAiringEpisode"),
"external_links": [
{"site": l["site"], "url": l["url"], "type": l["type"]}
for l in media.get("externalLinks", [])
],
"characters": characters,
"staff": staff,
"relations": {}, # filled by get_info() from Kitsu
"recommendations": recommendations,
}
# ---------------- SEARCH ----------------
async def search(self, q: str):
data = await self._fetch_json(f"{BASE_URL}/api?m=search&q={q}")
return data.get("data", []) if data else []
# ---------------- LATEST ----------------
async def get_latest(self, p: int = 1):
return await self._fetch_json(f"{BASE_URL}/api?m=airing&page={p}")
# ---------------- EPISODES ----------------
async def get_episodes(self, anime_id: str, p: int = 1, resolve: bool = False):
"""
Fetch episode list. If resolve=True, also resolve the highest-res
stream URL and download link for each episode concurrently.
"""
data = await self._fetch_json(
f"{BASE_URL}/api?m=release&id={anime_id}&sort=episode_desc&page={p}"
)
if not data or not resolve:
return data
episodes = data.get("data", [])
async def enrich(ep):
ep_session = ep.get("session")
if not ep_session:
return ep
stream = await self._resolve_episode(anime_id, ep_session)
ep["url"] = stream.get("url")
ep["download"] = stream.get("download")
ep["resolution"] = stream.get("resolution")
ep["fansub"] = stream.get("fansub")
return ep
data["data"] = list(await asyncio.gather(*[enrich(ep) for ep in episodes]))
return data
# ---------------- INFO ----------------
async def get_info(self, session: str):
try:
# Step 1 — scrape IDs from AnimePahe
ids = await self._scrape_ids(session)
anilist_id = ids.get("anilist")
if not anilist_id:
return {
"error": "Could not find AniList ID on AnimePahe page",
"ids": ids,
}
# Step 2 — return from cache if already built
if anilist_id in _info_cache:
return _info_cache[anilist_id]
# Step 3 — fetch AniList data + Kitsu relations concurrently
kitsu_id = ids.get("kitsu")
async def empty_relations():
return []
anilist_task = self._fetch_anilist(anilist_id)
kitsu_task = (
self._fetch_kitsu_relations(kitsu_id) if kitsu_id else empty_relations()
)
data, kitsu_relations = await asyncio.gather(anilist_task, kitsu_task)
if "error" in data:
return {"error": data["error"], "ids": ids}
# Step 4 — inject Kitsu relations under "Related"
data["relations"] = {"Related": kitsu_relations} if kitsu_relations else {}
# Step 5 — inject all IDs
data["ids"] = {
"animepahe": ids.get("animepahe"),
"anilist": anilist_id,
"mal": ids.get("mal"),
"anidb": ids.get("anidb"),
"kitsu": kitsu_id,
"ann": ids.get("ann"),
"animePlanet": ids.get("animePlanet"),
}
# Step 6 — cache fully merged result
_info_cache[anilist_id] = data
return data
except Exception as e:
print(f"[get_info] ERROR: {e}")
return {"error": f"Failed: {str(e)}"}
# ---------------- RESOLVE (single episode → highest res only) ----------------
async def _resolve_episode(self, anime_session: str, episode_session: str) -> dict:
"""
Open the play page, collect all resolution buttons, resolve only the
highest-resolution embed to its m3u8, and return url + download link.
"""
play_url = f"{BASE_URL}/play/{anime_session}/{episode_session}"
page = await self.context.new_page()
try:
await page.goto(play_url, wait_until="domcontentloaded")
await page.wait_for_selector(
"#resolutionMenu button",
state="attached",
timeout=15000,
)
buttons = await page.locator("#resolutionMenu button").all()
res_data = []
for btn in buttons:
text = (await btn.inner_text()).strip()
res_match = re.search(r"(\d+)", text)
res_data.append(
{
"embed": await btn.get_attribute("data-src"),
"res": int(res_match.group(1)) if res_match else 720,
"fansub": text.split("·")[0].strip()
if "·" in text
else "Unknown",
}
)
await page.close()
page = None
if not res_data:
return {
"url": None,
"download": None,
"resolution": None,
"fansub": None,
}
# Pick highest resolution
best = max(res_data, key=lambda x: x["res"])
# Resolve best embed to m3u8
p = await self.context.new_page()
m3u8 = None
def capture(req):
nonlocal m3u8
if ".m3u8" in req.url:
m3u8 = req.url
p.on("request", capture)
try:
await p.set_extra_http_headers({"Referer": BASE_URL})
await p.goto(best["embed"], wait_until="domcontentloaded")
for _ in range(10):
if m3u8:
break
await p.evaluate(
"document.querySelectorAll('button, video, [class*=play]')"
".forEach(el => el.click())"
)
await asyncio.sleep(0.5)
finally:
await p.close()
res_str = str(best["res"])
return {
"url": m3u8,
"download": self._generate_mp4(m3u8, anime_session, res_str),
"resolution": res_str,
"fansub": best["fansub"],
}
except Exception as e:
return {
"url": None,
"download": None,
"resolution": None,
"fansub": None,
"error": str(e),
}
finally:
if page:
await page.close()
async def resolve(self, anime_session: str, episode_session: str):
"""Resolve all sources for a single episode (all resolutions)."""
play_url = f"{BASE_URL}/play/{anime_session}/{episode_session}"
page = await self.context.new_page()
try:
await page.goto(play_url, wait_until="domcontentloaded")
await page.wait_for_selector(
"#resolutionMenu button",
state="attached",
timeout=15000,
)
buttons = await page.locator("#resolutionMenu button").all()
res_data = []
for btn in buttons:
text = (await btn.inner_text()).strip()
res_match = re.search(r"(\d+)", text)
res_data.append(
{
"embed": await btn.get_attribute("data-src"),
"res": res_match.group(1) if res_match else "720",
"fansub": text.split("·")[0].strip()
if "·" in text
else "Unknown",
}
)
await page.close()
page = None
async def get_single_source(item):
p = await self.context.new_page()
m3u8 = None
def capture(req):
nonlocal m3u8
if ".m3u8" in req.url:
m3u8 = req.url
p.on("request", capture)
try:
await p.set_extra_http_headers({"Referer": BASE_URL})
await p.goto(item["embed"], wait_until="domcontentloaded")
for _ in range(10):
if m3u8:
break
await p.evaluate(
"document.querySelectorAll('button, video, [class*=play]')"
".forEach(el => el.click())"
)
await asyncio.sleep(0.5)
return {
"resolution": item["res"],
"fansub": item["fansub"],
"url": m3u8,
"download": self._generate_mp4(
m3u8, anime_session, item["res"]
),
}
except Exception as e:
return {
"resolution": item["res"],
"fansub": item["fansub"],
"url": None,
"download": None,
"error": str(e),
}
finally:
await p.close()
sources = await asyncio.gather(*[get_single_source(i) for i in res_data])
return {"anime": anime_session, "sources": list(sources)}
except Exception as e:
return {"error": str(e)}
finally:
if page:
await page.close()
pahe = AnimePahe()
@asynccontextmanager
async def lifespan(app: FastAPI):
await pahe.start()
yield
await pahe.stop()
app = FastAPI(lifespan=lifespan)
@app.get("/search")
async def api_search(q: str):
return await pahe.search(q)
@app.get("/latest")
async def api_latest(p: int = 1):
return await pahe.get_latest(p)
@app.get("/info/{session}")
async def api_info(session: str):
return await pahe.get_info(session)
@app.get("/episodes/{session}")
async def api_episodes(session: str, p: int = 1, resolve: bool = False):
return await pahe.get_episodes(session, p, resolve)
@app.get("/resolve/{anime}/{episode}")
async def api_resolve(anime: str, episode: str):
return await pahe.resolve(anime, episode)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=7860)