Update main.py

This commit is contained in:
Aira Catapang
2026-03-17 03:36:03 +00:00
committed by system
parent 30ea40ddcf
commit 6bbfa2c0e2

576
main.py
View File

@@ -2,154 +2,632 @@ import json
import asyncio import asyncio
import re import re
import os import os
from typing import Optional, List import httpx
from typing import Optional
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from fastapi import FastAPI from fastapi import FastAPI
from playwright.async_api import async_playwright, BrowserContext, Request from playwright.async_api import async_playwright, BrowserContext
# --- CONFIG ---
BASE_URL = "https://animepahe.si" BASE_URL = "https://animepahe.si"
ANILIST_API = "https://graphql.anilist.co"
JIKAN_API = "https://api.jikan.moe/v4"
IS_HEADLESS = os.environ.get("HEADLESS", "true").lower() == "true" IS_HEADLESS = os.environ.get("HEADLESS", "true").lower() == "true"
# In-memory caches
_info_cache: dict = {} # keyed by anilist_id — full merged result
_mal_synopsis_cache: dict = {} # keyed by mal_id
# AniList relation types considered "direct"
DIRECT_RELATION_TYPES = {
"SEQUEL",
"PREQUEL",
"SIDE_STORY",
"PARENT",
"FULL_STORY",
}
class AnimePahe:
    """Scraper for animepahe.si backed by a persistent Playwright browser."""

    def __init__(self):
        # Playwright driver and browser context are created lazily in start().
        self.playwright = None
        self.context = None  # Optional[BrowserContext]; populated by start()
        # Hosts whose requests the asset interceptor aborts outright.
        self.ad_domains = [
            "doubleclick.net",
            "adservice.google",
            "popads.net",
            "propellerads",
            "exoclick",
            "bebi.com",
        ]
async def start(self): async def start(self):
self.playwright = await async_playwright().start() self.playwright = await async_playwright().start()
self.context = await self.playwright.chromium.launch_persistent_context( self.context = await self.playwright.chromium.launch_persistent_context(
user_data_dir="./browser_data", user_data_dir="./browser_data",
headless=IS_HEADLESS, headless=IS_HEADLESS,
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122 Safari/537.36",
args=["--disable-blink-features=AutomationControlled", "--no-sandbox"] args=[
"--disable-blink-features=AutomationControlled",
"--no-sandbox",
],
) )
await self.context.route("**/*", self._intercept_assets) await self.context.route("**/*", self._intercept_assets)
async def stop(self):
if self.context:
await self.context.close()
if self.playwright:
await self.playwright.stop()
async def _intercept_assets(self, route): async def _intercept_assets(self, route):
url = route.request.url.lower() url = route.request.url.lower()
if any(ad in url for ad in self.ad_domains) or url.endswith((".png", ".jpg", ".css", ".woff")): if any(ad in url for ad in self.ad_domains) or url.endswith(
(".png", ".jpg", ".jpeg", ".webp", ".woff")
):
await route.abort() await route.abort()
else: else:
await route.continue_() await route.continue_()
async def stop(self):
if self.context: await self.context.close()
if self.playwright: await self.playwright.stop()
# --- SHARED HELPERS ---
async def _fetch_json(self, url: str): async def _fetch_json(self, url: str):
page = await self.context.new_page() page = await self.context.new_page()
try: try:
await page.goto(url, wait_until="domcontentloaded") await page.goto(url, wait_until="domcontentloaded")
return json.loads(await page.evaluate("document.body.innerText")) txt = await page.evaluate("document.body.innerText")
except: return None return json.loads(txt)
finally: await page.close() except:
return None
finally:
await page.close()
def _generate_mp4(self, m3u8_url: Optional[str], anime_id: str, res: str) -> Optional[str]: def _generate_mp4(self, m3u8_url: Optional[str], anime_id: str, res: str):
if not m3u8_url: return None if not m3u8_url:
# Your working string replacement logic return None
match = re.search(r'(https?://[^.]+)[^/]*/stream/(.*?)/[^/]+\.m3u8', m3u8_url) match = re.search(r"(https?://[^.]+)[^/]*/stream/(.*?)/[^/]+\.m3u8", m3u8_url)
if match: if match:
return f"{match.group(1)}.kwik.cx/mp4/{match.group(2)}?file=AnimePahe_{anime_id}_{res}p.mp4" return f"{match.group(1)}.kwik.cx/mp4/{match.group(2)}?file=AnimePahe_{anime_id}_{res}p.mp4"
return None return None
# --- ENDPOINTS --- # ---------------- SCRAPE IDs ONLY ----------------
async def _scrape_ids(self, session: str) -> dict:
"""
Open AnimePahe anime page and collect only the external IDs.
"""
page = await self.context.new_page()
try:
await page.goto(
f"{BASE_URL}/anime/{session}",
wait_until="networkidle",
timeout=30000,
)
await page.wait_for_selector(".anime-info", timeout=10000)
await asyncio.sleep(1)
ids = await page.evaluate("""() => {
let ids = {}
document.querySelectorAll("a[href]").forEach(a => {
const url = a.href || ""
if (url.includes("myanimelist.net/anime"))
ids["mal"] = url.split("/").filter(Boolean).pop()
if (url.includes("anilist.co/anime"))
ids["anilist"] = url.split("/").filter(Boolean).pop()
if (url.includes("anidb.net"))
ids["anidb"] = url.split("/").filter(Boolean).pop()
if (url.includes("kitsu.io/anime"))
ids["kitsu"] = url.split("/").filter(Boolean).pop()
if (url.includes("animenewsnetwork.com")) {
const m = url.match(/id=(\\d+)/)
if (m) ids["ann"] = m[1]
}
if (url.includes("anime-planet.com/anime"))
ids["animePlanet"] = url.split("/").filter(Boolean).pop()
})
return ids
}""")
ids["animepahe"] = session
return ids
except Exception as e:
print(f"[scrape_ids] ERROR: {e}")
return {"animepahe": session}
finally:
await page.close()
# ---------------- MAL SYNOPSIS ----------------
async def _fetch_mal_synopsis(self, mal_id: str) -> Optional[str]:
"""
Fetch synopsis from MyAnimeList via Jikan API (no auth needed).
Falls back to None if unavailable.
"""
if mal_id in _mal_synopsis_cache:
return _mal_synopsis_cache[mal_id]
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(
f"{JIKAN_API}/anime/{mal_id}",
headers={"Accept": "application/json"},
)
resp.raise_for_status()
data = resp.json()
synopsis = data.get("data", {}).get("synopsis")
_mal_synopsis_cache[mal_id] = synopsis
return synopsis
except Exception as e:
print(f"[mal_synopsis] fetch failed for mal_id={mal_id}: {e}")
_mal_synopsis_cache[mal_id] = None
return None
# ---------------- ANILIST ----------------
async def _fetch_anilist(self, anilist_id: str) -> dict:
"""
Query AniList GraphQL API.
Relations: direct (Sequel/Prequel/etc.) + indirect combined into
a single "Related" list — direct entries first.
"""
query = """
query ($id: Int) {
Media(id: $id, type: ANIME) {
id
idMal
title {
romaji
english
native
}
synonyms
description(asHtml: false)
format
status
episodes
duration
source
countryOfOrigin
isAdult
startDate { year month day }
endDate { year month day }
season
seasonYear
averageScore
meanScore
popularity
favourites
trending
genres
tags {
name
category
rank
isMediaSpoiler
}
coverImage {
extraLarge
large
medium
color
}
bannerImage
trailer {
id
site
}
studios(isMain: true) {
nodes { name siteUrl }
}
staff(perPage: 10) {
edges {
role
node {
name { full }
image { medium }
siteUrl
}
}
}
characters(perPage: 10, sort: [ROLE, RELEVANCE]) {
edges {
role
node {
name { full }
image { medium }
siteUrl
}
voiceActors(language: JAPANESE) {
name { full }
image { medium }
siteUrl
}
}
}
relations {
edges {
relationType(version: 2)
node {
id
idMal
type
title { romaji english }
format
status
episodes
coverImage { medium }
siteUrl
}
}
}
recommendations(perPage: 20, sort: RATING_DESC) {
nodes {
rating
mediaRecommendation {
id
idMal
title { romaji english }
format
status
episodes
averageScore
coverImage { medium }
siteUrl
}
}
}
externalLinks {
site
url
type
}
nextAiringEpisode {
airingAt
episode
}
}
}
"""
try:
async with httpx.AsyncClient(timeout=15) as client:
resp = await client.post(
ANILIST_API,
json={"query": query, "variables": {"id": int(anilist_id)}},
headers={
"Content-Type": "application/json",
"Accept": "application/json",
},
)
resp.raise_for_status()
result = resp.json()
except Exception as e:
print(f"[anilist] fetch failed for id={anilist_id}: {e}")
return {"error": f"AniList fetch failed: {str(e)}"}
media = result.get("data", {}).get("Media")
if not media:
return {"error": "AniList returned no data"}
# ── MAL synopsis — cleaner than AniList's HTML-heavy description ──
mal_id = str(media.get("idMal") or "")
mal_synopsis = await self._fetch_mal_synopsis(mal_id) if mal_id else None
synopsis = mal_synopsis or media.get("description")
# ── Format dates ──────────────────────────────────────────────
def fmt_date(d):
if not d or not d.get("year"):
return None
parts = [d.get("year"), d.get("month"), d.get("day")]
return "-".join(str(p).zfill(2) for p in parts if p)
# ── Trailer URL ───────────────────────────────────────────────
trailer = None
if media.get("trailer"):
t = media["trailer"]
if t.get("site") == "youtube":
trailer = f"https://www.youtube.com/watch?v={t['id']}"
elif t.get("site") == "dailymotion":
trailer = f"https://www.dailymotion.com/video/{t['id']}"
# ── Relations — direct first, indirect after, all in "Related" ─
direct = []
indirect = []
for edge in media.get("relations", {}).get("edges", []):
rel_type = edge.get("relationType", "OTHER")
node = edge.get("node", {})
# Skip non-anime relations (manga, novel, one-shot, etc.)
if node.get("type") != "ANIME":
continue
entry = {
"id": node.get("id"),
"mal_id": node.get("idMal"),
"title": node["title"].get("english") or node["title"].get("romaji"),
"format": node.get("format"),
"status": node.get("status"),
"episodes": node.get("episodes"),
"image": node.get("coverImage", {}).get("medium"),
"url": node.get("siteUrl"),
"relation_type": rel_type,
}
if rel_type in DIRECT_RELATION_TYPES:
direct.append(entry)
else:
indirect.append(entry)
# Combined: direct first, indirect after — all under one "Related" key
combined = direct + indirect
relations = {"Related": combined} if combined else {}
# ── Recommendations ───────────────────────────────────────────
recommendations = []
for node in media.get("recommendations", {}).get("nodes", []):
rec = node.get("mediaRecommendation")
if not rec:
continue
recommendations.append(
{
"id": rec.get("id"),
"mal_id": rec.get("idMal"),
"title": rec["title"].get("english") or rec["title"].get("romaji"),
"format": rec.get("format"),
"status": rec.get("status"),
"episodes": rec.get("episodes"),
"score": rec.get("averageScore"),
"image": rec.get("coverImage", {}).get("medium"),
"url": rec.get("siteUrl"),
"rating": node.get("rating"),
}
)
# ── Characters ────────────────────────────────────────────────
characters = []
for edge in media.get("characters", {}).get("edges", []):
node = edge.get("node", {})
vas = edge.get("voiceActors", [])
characters.append(
{
"name": node.get("name", {}).get("full"),
"image": node.get("image", {}).get("medium"),
"role": edge.get("role"),
"url": node.get("siteUrl"),
"voice_actor": {
"name": vas[0]["name"]["full"],
"image": vas[0].get("image", {}).get("medium"),
"url": vas[0].get("siteUrl"),
}
if vas
else None,
}
)
# ── Staff ─────────────────────────────────────────────────────
staff = []
for edge in media.get("staff", {}).get("edges", []):
node = edge.get("node", {})
staff.append(
{
"name": node.get("name", {}).get("full"),
"image": node.get("image", {}).get("medium"),
"role": edge.get("role"),
"url": node.get("siteUrl"),
}
)
return {
"id": media.get("id"),
"mal_id": media.get("idMal"),
"title": {
"romaji": media["title"].get("romaji"),
"english": media["title"].get("english"),
"native": media["title"].get("native"),
},
"synonyms": media.get("synonyms", []),
"synopsis": synopsis,
"format": media.get("format"),
"status": media.get("status"),
"episodes": media.get("episodes"),
"duration": media.get("duration"),
"source": media.get("source"),
"country": media.get("countryOfOrigin"),
"is_adult": media.get("isAdult"),
"start_date": fmt_date(media.get("startDate")),
"end_date": fmt_date(media.get("endDate")),
"season": media.get("season"),
"season_year": media.get("seasonYear"),
"average_score": media.get("averageScore"),
"mean_score": media.get("meanScore"),
"popularity": media.get("popularity"),
"favourites": media.get("favourites"),
"trending": media.get("trending"),
"genres": media.get("genres", []),
"tags": [
{
"name": t["name"],
"category": t["category"],
"rank": t["rank"],
"spoiler": t["isMediaSpoiler"],
}
for t in media.get("tags", [])
],
"cover_image": media.get("coverImage", {}),
"banner_image": media.get("bannerImage"),
"trailer": trailer,
"studios": [s["name"] for s in media.get("studios", {}).get("nodes", [])],
"next_airing": media.get("nextAiringEpisode"),
"external_links": [
{"site": l["site"], "url": l["url"], "type": l["type"]}
for l in media.get("externalLinks", [])
],
"characters": characters,
"staff": staff,
"relations": relations,
"recommendations": recommendations,
}
# ---------------- SEARCH ----------------
async def search(self, q: str): async def search(self, q: str):
data = await self._fetch_json(f"{BASE_URL}/api?m=search&q={q}") data = await self._fetch_json(f"{BASE_URL}/api?m=search&q={q}")
return data.get("data", []) if data else [] return data.get("data", []) if data else []
# ---------------- LATEST ----------------
async def get_latest(self, p: int = 1): async def get_latest(self, p: int = 1):
return await self._fetch_json(f"{BASE_URL}/api?m=airing&page={p}") return await self._fetch_json(f"{BASE_URL}/api?m=airing&page={p}")
# ---------------- EPISODES ----------------
async def get_episodes(self, anime_id: str, p: int = 1): async def get_episodes(self, anime_id: str, p: int = 1):
return await self._fetch_json(f"{BASE_URL}/api?m=release&id={anime_id}&sort=episode_desc&page={p}") return await self._fetch_json(
f"{BASE_URL}/api?m=release&id={anime_id}&sort=episode_desc&page={p}"
)
# ---------------- INFO ----------------
async def get_info(self, session: str): async def get_info(self, session: str):
page = await self.context.new_page()
try: try:
await page.goto(f"{BASE_URL}/anime/{session}", wait_until="domcontentloaded") # Step 1 — scrape IDs from AnimePahe page
content = await page.content() ids = await self._scrape_ids(session)
# Scrape basic metadata
title = (re.search(r'<h1><span>(.*?)</span>', content) or re.search(r'<title>(.*?)</title>', content)).group(1) anilist_id = ids.get("anilist")
studio = (re.search(r'<strong>Studio:</strong>\s*(.*?)<', content) or [0, "Unknown"])[1] if not anilist_id:
return {"title": title.strip(), "studio": studio.strip(), "session": session} return {
finally: await page.close() "error": "Could not find AniList ID on AnimePahe page",
"ids": ids,
}
# Step 2 — return from cache if already built
if anilist_id in _info_cache:
return _info_cache[anilist_id]
# Step 3 — fetch everything from AniList (includes relations)
data = await self._fetch_anilist(anilist_id)
if "error" in data:
return {"error": data["error"], "ids": ids}
# Step 4 — inject all scraped IDs
data["ids"] = {
"animepahe": ids.get("animepahe"),
"anilist": anilist_id,
"mal": ids.get("mal"),
"anidb": ids.get("anidb"),
"kitsu": ids.get("kitsu"),
"ann": ids.get("ann"),
"animePlanet": ids.get("animePlanet"),
}
# Step 5 — cache and return
_info_cache[anilist_id] = data
return data
except Exception as e:
print(f"[get_info] ERROR: {e}")
return {"error": f"Failed: {str(e)}"}
# --- THE FIXED RESOLVER --- # --- THE FIXED RESOLVER ---
async def resolve(self, anime_session: str, episode_session: str): async def resolve(self, anime_session: str, episode_session: str):
play_url = f"{BASE_URL}/play/{anime_session}/{episode_session}" play_url = f"{BASE_URL}/play/{anime_session}/{episode_session}"
page = await self.context.new_page() page = await self.context.new_page()
try: try:
await page.goto(play_url, wait_until="domcontentloaded") await page.goto(play_url, wait_until="domcontentloaded")
await page.wait_for_selector("#resolutionMenu button", timeout=5000) await page.wait_for_selector("#resolutionMenu button", timeout=5000)
buttons = await page.locator("#resolutionMenu button").all() buttons = await page.locator("#resolutionMenu button").all()
res_data = [] res_data = []
for btn in buttons: for btn in buttons:
text = (await btn.inner_text()).strip() text = (await btn.inner_text()).strip()
res_data.append({ res_data.append(
"embed": await btn.get_attribute("data-src"), {
"res": (re.search(r'(\d+)', text) or ["720"])[0], "embed": await btn.get_attribute("data-src"),
"fanSub": text.split("·")[0].strip() if "·" in text else "Unknown" "res": (re.search(r"(\d+)", text) or ["720"])[0],
}) "fanSub": text.split("·")[0].strip()
if "·" in text
else "Unknown",
}
)
await page.close() await page.close()
# Parallel resolution using the "Request Capture" method # Parallel resolution using the "Request Capture" method
async def get_single_mp4(item): async def get_single_mp4(item):
p = await self.context.new_page() p = await self.context.new_page()
m3u8 = None m3u8 = None
def log_req(req): def log_req(req):
nonlocal m3u8 nonlocal m3u8
if ".m3u8" in req.url: m3u8 = req.url if ".m3u8" in req.url:
m3u8 = req.url
p.on("request", log_req) p.on("request", log_req)
try: try:
await p.set_extra_http_headers({"Referer": BASE_URL}) await p.set_extra_http_headers({"Referer": BASE_URL})
await p.goto(item['embed'], wait_until="domcontentloaded") await p.goto(item["embed"], wait_until="domcontentloaded")
# Force the player to trigger the m3u8 request # Force the player to trigger the m3u8 request
for _ in range(5): for _ in range(5):
if m3u8: break if m3u8:
await p.evaluate("document.querySelectorAll('button, video').forEach(el => el.click())") break
await p.evaluate(
"document.querySelectorAll('button, video').forEach(el => el.click())"
)
await asyncio.sleep(0.5) await asyncio.sleep(0.5)
item["url"] = m3u8 item["url"] = m3u8
item["download"] = self._generate_mp4(m3u8, anime_session, item['res']) item["download"] = self._generate_mp4(
m3u8, anime_session, item["res"]
)
return item return item
finally: await p.close() finally:
await p.close()
sources = await asyncio.gather(*[get_single_mp4(i) for i in res_data]) sources = await asyncio.gather(*[get_single_mp4(i) for i in res_data])
return {"anime": anime_session, "sources": sources} return {"anime": anime_session, "sources": sources}
except Exception as e: except Exception as e:
return {"error": str(e)} return {"error": str(e)}
# --- FASTAPI SETUP ---
pahe = AnimePahe()


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Keep the browser alive for the app's whole lifetime.
    await pahe.start()
    yield
    await pahe.stop()


app = FastAPI(lifespan=lifespan)


@app.get("/search")
async def api_search(q: str):
    return await pahe.search(q)


@app.get("/latest")
async def api_latest(p: int = 1):
    return await pahe.get_latest(p)


@app.get("/info/{session}")
async def api_info(session: str):
    return await pahe.get_info(session)


@app.get("/episodes/{session}")
async def api_episodes(session: str, p: int = 1):
    return await pahe.get_episodes(session, p)


@app.get("/resolve/{anime}/{episode}")
async def api_resolve(anime: str, episode: str):
    return await pahe.resolve(anime, episode)


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)