Update main.py

fix the api to work
This commit is contained in:
Aira Catapang
2026-04-02 04:04:31 +00:00
committed by system
parent b218f56582
commit 55fc4d8ce5

520
main.py
View File

@@ -9,9 +9,11 @@ from contextlib import asynccontextmanager
from fastapi import FastAPI, Query, Request
from fastapi.responses import StreamingResponse, Response, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from playwright.async_api import async_playwright, BrowserContext
from playwright_stealth import Stealth
from playwright.async_api import async_playwright, BrowserContext
from playwright_stealth import stealth_async # FIX: Used correct stealth import
# FIX: animepahe.com redirects to .ru. Using .ru directly prevents dropped cookies/sessions.
BASE_URL = "https://animepahe.com"
ANIWATCHTV_BASE = "https://aniwatchtv.to"
ANILIST_API = "https://graphql.anilist.co"
@@ -22,13 +24,18 @@ IS_HEADLESS = os.environ.get("HEADLESS", "true").lower() == "true"
_info_cache: dict = {}
_mal_synopsis_cache: dict = {}
class AnimePahe:
def __init__(self):
self.playwright = None
self.context: Optional[BrowserContext] = None
self.ad_domains = [
"doubleclick.net", "adservice.google", "popads.net",
"propellerads", "exoclick", "bebi.com",
"doubleclick.net",
"adservice.google",
"popads.net",
"propellerads",
"exoclick",
"bebi.com",
]
async def start(self):
@@ -36,21 +43,27 @@ class AnimePahe:
self.context = await self.playwright.chromium.launch_persistent_context(
user_data_dir="./browser_data",
headless=IS_HEADLESS,
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
# FIX: Removed hardcoded user_agent to prevent Cloudflare Fingerprint mismatches
args=["--disable-blink-features=AutomationControlled", "--no-sandbox"],
)
await self.context.route("**/*", self._intercept_assets)
# Initial visit to ensure cookies are set
page = await self.context.new_page()
try:
await Stealth().apply_stealth_async(page)
await page.goto(BASE_URL, wait_until="networkidle", timeout=60000)
except: pass
finally: await page.close()
await stealth_async(page)
# FIX: domcontentloaded instead of networkidle
await page.goto(BASE_URL, wait_until="domcontentloaded", timeout=45000)
await asyncio.sleep(2)
except:
pass
finally:
await page.close()
async def stop(self):
if self.context: await self.context.close()
if self.playwright: await self.playwright.stop()
if self.context:
await self.context.close()
if self.playwright:
await self.playwright.stop()
async def _intercept_assets(self, route):
url = route.request.url.lower()
@@ -67,10 +80,9 @@ class AnimePahe:
async def _fetch_json(self, url: str):
page = await self.context.new_page()
try:
# FIX: Referer header is required for AnimePahe API
await page.set_extra_http_headers({"Referer": BASE_URL})
# FIX: Use networkidle instead of domcontentloaded to bypass CF
await page.goto(url, wait_until="networkidle", timeout=30000)
# FIX: Use domcontentloaded for API JSON calls
await page.goto(url, wait_until="domcontentloaded", timeout=30000)
txt = await page.evaluate("document.body.innerText")
return json.loads(txt)
except:
@@ -78,14 +90,18 @@ class AnimePahe:
finally:
await page.close()
def _generate_mp4(self, m3u8_url: Optional[str], anime_name: str, episode: str, res: str) -> Optional[str]:
if not m3u8_url: return None
def _generate_mp4(
self, m3u8_url: Optional[str], anime_name: str, episode: str, res: str
) -> Optional[str]:
if not m3u8_url:
return None
match = re.search(r"https?://([^.]+)[^/]*/stream/(.*?)/[^/]+\.m3u8", m3u8_url)
if match:
subdomain = match.group(1)
token_path = match.group(2)
clean_name = re.sub(r"[^\w\s]", "", anime_name).strip().replace(" ", "_")
if not clean_name: clean_name = "Anime"
if not clean_name:
clean_name = "Anime"
filename = f"{clean_name}_EP{episode}_{res}P.mp4"
return f"https://{subdomain}.kwik.cx/mp4/{token_path}?file={filename}"
return None
@@ -110,18 +126,28 @@ class AnimePahe:
async def _scrape_ids(self, session: str) -> dict:
page = await self.context.new_page()
try:
await Stealth().apply_stealth_async(page)
# FIX: Changed wait_until to networkidle
await stealth_async(page)
try:
await page.goto(f"{BASE_URL}/anime/{session}", wait_until="networkidle", timeout=30000)
await page.goto(
f"{BASE_URL}/anime/{session}",
wait_until="domcontentloaded",
timeout=30000,
)
except Exception as e:
if "Timeout" not in str(e): raise e
if "Timeout" not in str(e):
raise e
try:
await page.wait_for_selector("div.anime-info, div.anime-summary, aside, main", timeout=15000)
except: pass
# Wait for Cloudflare to clear
await page.wait_for_function(
"!document.title.includes('Just a moment')", timeout=15000
)
await page.wait_for_selector(
"div.anime-info, div.anime-summary, aside, main", timeout=10000
)
except:
pass
await asyncio.sleep(2)
ids = await page.evaluate("""() => {
let ids = {}
document.querySelectorAll("a[href]").forEach(a => {
@@ -144,10 +170,14 @@ class AnimePahe:
await page.close()
async def _fetch_mal_synopsis(self, mal_id: str) -> Optional[str]:
if mal_id in _mal_synopsis_cache: return _mal_synopsis_cache[mal_id]
if mal_id in _mal_synopsis_cache:
return _mal_synopsis_cache[mal_id]
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(f"{JIKAN_API}/anime/{mal_id}", headers={"Accept": "application/json"})
resp = await client.get(
f"{JIKAN_API}/anime/{mal_id}",
headers={"Accept": "application/json"},
)
resp.raise_for_status()
synopsis = resp.json().get("data", {}).get("synopsis")
_mal_synopsis_cache[mal_id] = synopsis
@@ -157,8 +187,10 @@ class AnimePahe:
return None
async def _collect_buttons(self, page) -> list:
# FIX: Ensure resolution menu is attached and visible
await page.wait_for_selector("#resolutionMenu button", state="visible", timeout=10000)
# FIX: state="attached" instead of visible (dropdown items are hidden by CSS until clicked)
await page.wait_for_selector(
"#resolutionMenu button", state="attached", timeout=10000
)
buttons = await page.locator("#resolutionMenu button").all()
res_data = []
for btn in buttons:
@@ -166,13 +198,17 @@ class AnimePahe:
res_match = re.search(r"(\d+)", text)
audio_lang = (await btn.get_attribute("data-audio") or "jpn").lower()
audio_type = "dub" if audio_lang == "eng" else "sub"
res_data.append({
res_data.append(
{
"embed": await btn.get_attribute("data-src"),
"res": int(res_match.group(1)) if res_match else 720,
"fansub": text.split("·")[0].strip() if "·" in text else "Unknown",
"fansub": text.split("·")[0].strip()
if "·" in text
else "Unknown",
"audio": audio_type,
"audio_lang": audio_lang,
})
}
)
return res_data
async def _embed_to_m3u8(self, embed_url: str) -> Optional[str]:
@@ -182,7 +218,6 @@ class AnimePahe:
def capture(req):
nonlocal m3u8
# FIX: Sniff both m3u8 and mp4 from Kwik network traffic
if (".m3u8" in req.url or ".mp4" in req.url) and not found.is_set():
m3u8 = req.url
found.set()
@@ -190,10 +225,11 @@ class AnimePahe:
p.on("request", capture)
try:
await p.set_extra_http_headers({"Referer": BASE_URL})
# FIX: Using networkidle for Kwik
await p.goto(embed_url, wait_until="networkidle", timeout=15000)
# FIX: domcontentloaded for video players
await p.goto(embed_url, wait_until="domcontentloaded", timeout=20000)
for _ in range(6):
if found.is_set(): break
if found.is_set():
break
await p.evaluate("""() => {
document.querySelectorAll('video').forEach(v => {
v.muted = true;
@@ -207,9 +243,12 @@ class AnimePahe:
await asyncio.sleep(1.5)
try:
await asyncio.wait_for(found.wait(), timeout=5.0)
except asyncio.TimeoutError: pass
except Exception: pass
finally: await p.close()
except asyncio.TimeoutError:
pass
except Exception:
pass
finally:
await p.close()
return m3u8
async def _fetch_anilist(self, anilist_id: str) -> dict:
@@ -230,7 +269,10 @@ class AnimePahe:
resp = await client.post(
ANILIST_API,
json={"query": query, "variables": {"id": int(anilist_id)}},
headers={"Content-Type": "application/json", "Accept": "application/json"},
headers={
"Content-Type": "application/json",
"Accept": "application/json",
},
)
resp.raise_for_status()
result = resp.json()
@@ -238,61 +280,108 @@ class AnimePahe:
return {"error": f"AniList fetch failed: {str(e)}"}
media = result.get("data", {}).get("Media")
if not media: return {"error": "AniList returned no data"}
if not media:
return {"error": "AniList returned no data"}
mal_id = str(media.get("idMal") or "")
mal_synopsis = await self._fetch_mal_synopsis(mal_id) if mal_id else None
synopsis = mal_synopsis or media.get("description")
def fmt_date(d):
if not d or not d.get("year"): return None
return "-".join(str(p).zfill(2) for p in [d.get("year"), d.get("month"), d.get("day")] if p)
if not d or not d.get("year"):
return None
return "-".join(
str(p).zfill(2)
for p in [d.get("year"), d.get("month"), d.get("day")]
if p
)
trailer = None
if media.get("trailer"):
t = media["trailer"]
if t.get("site") == "youtube": trailer = f"https://www.youtube.com/watch?v={t['id']}"
elif t.get("site") == "dailymotion": trailer = f"https://www.dailymotion.com/video/{t['id']}"
if t.get("site") == "youtube":
trailer = f"https://www.youtube.com/watch?v={t['id']}"
elif t.get("site") == "dailymotion":
trailer = f"https://www.dailymotion.com/video/{t['id']}"
relations = {}
for edge in media.get("relations", {}).get("edges", []):
node = edge.get("node", {})
if not node: continue
if not node:
continue
rel = edge.get("relationType", "OTHER")
relations.setdefault(rel, []).append({
"id": node.get("id"), "mal_id": node.get("idMal"),
"title": (node.get("title", {}).get("english") or node.get("title", {}).get("romaji")),
"format": node.get("format"), "status": node.get("status"), "episodes": node.get("episodes"),
"score": node.get("averageScore"), "image": node.get("coverImage", {}).get("medium"),
"url": node.get("siteUrl"), "relation_type": rel,
})
relations.setdefault(rel, []).append(
{
"id": node.get("id"),
"mal_id": node.get("idMal"),
"title": (
node.get("title", {}).get("english")
or node.get("title", {}).get("romaji")
),
"format": node.get("format"),
"status": node.get("status"),
"episodes": node.get("episodes"),
"score": node.get("averageScore"),
"image": node.get("coverImage", {}).get("medium"),
"url": node.get("siteUrl"),
"relation_type": rel,
}
)
recommendations = []
for node in media.get("recommendations", {}).get("nodes", []):
rec = node.get("mediaRecommendation")
if not rec: continue
recommendations.append({
"id": rec.get("id"), "mal_id": rec.get("idMal"),
if not rec:
continue
recommendations.append(
{
"id": rec.get("id"),
"mal_id": rec.get("idMal"),
"title": rec["title"].get("english") or rec["title"].get("romaji"),
"format": rec.get("format"), "status": rec.get("status"), "episodes": rec.get("episodes"),
"score": rec.get("averageScore"), "image": rec.get("coverImage", {}).get("medium"),
"url": rec.get("siteUrl"), "rating": node.get("rating"),
})
"format": rec.get("format"),
"status": rec.get("status"),
"episodes": rec.get("episodes"),
"score": rec.get("averageScore"),
"image": rec.get("coverImage", {}).get("medium"),
"url": rec.get("siteUrl"),
"rating": node.get("rating"),
}
)
return {
"id": media.get("id"), "mal_id": media.get("idMal"), "title": media["title"],
"synonyms": media.get("synonyms", []), "synopsis": synopsis,
"format": media.get("format"), "status": media.get("status"), "episodes": media.get("episodes"),
"duration": media.get("duration"), "source": media.get("source"), "country": media.get("countryOfOrigin"),
"is_adult": media.get("isAdult"), "start_date": fmt_date(media.get("startDate")),
"end_date": fmt_date(media.get("endDate")), "season": media.get("season"), "season_year": media.get("seasonYear"),
"average_score": media.get("averageScore"), "mean_score": media.get("meanScore"),
"popularity": media.get("popularity"), "favourites": media.get("favourites"), "trending": media.get("trending"),
"genres": media.get("genres", []), "cover_image": media.get("coverImage", {}),
"banner_image": media.get("bannerImage"), "trailer": trailer,
"studios": [s["name"] for s in media.get("studios", {}).get("nodes", [])], "next_airing": media.get("nextAiringEpisode"),
"external_links": [{"site": l["site"], "url": l["url"], "type": l["type"]} for l in media.get("externalLinks", [])],
"relations": relations, "recommendations": recommendations,
"id": media.get("id"),
"mal_id": media.get("idMal"),
"title": media["title"],
"synonyms": media.get("synonyms", []),
"synopsis": synopsis,
"format": media.get("format"),
"status": media.get("status"),
"episodes": media.get("episodes"),
"duration": media.get("duration"),
"source": media.get("source"),
"country": media.get("countryOfOrigin"),
"is_adult": media.get("isAdult"),
"start_date": fmt_date(media.get("startDate")),
"end_date": fmt_date(media.get("endDate")),
"season": media.get("season"),
"season_year": media.get("seasonYear"),
"average_score": media.get("averageScore"),
"mean_score": media.get("meanScore"),
"popularity": media.get("popularity"),
"favourites": media.get("favourites"),
"trending": media.get("trending"),
"genres": media.get("genres", []),
"cover_image": media.get("coverImage", {}),
"banner_image": media.get("bannerImage"),
"trailer": trailer,
"studios": [s["name"] for s in media.get("studios", {}).get("nodes", [])],
"next_airing": media.get("nextAiringEpisode"),
"external_links": [
{"site": l["site"], "url": l["url"], "type": l["type"]}
for l in media.get("externalLinks", [])
],
"relations": relations,
"recommendations": recommendations,
}
async def search(self, q: str):
@@ -303,56 +392,180 @@ class AnimePahe:
return await self._fetch_json(f"{BASE_URL}/api?m=airing&page={p}")
async def get_episodes(self, anime_id: str, p: int = 1, resolve: bool = False):
data = await self._fetch_json(f"{BASE_URL}/api?m=release&id={anime_id}&sort=episode_desc&page={p}")
if not data or not resolve: return data
data = await self._fetch_json(
f"{BASE_URL}/api?m=release&id={anime_id}&sort=episode_desc&page={p}"
)
if not data or not resolve:
return data
episodes = data.get("data", [])
async def enrich(ep):
ep_session = ep.get("session")
if not ep_session: return ep
stream = await self.resolve(anime_id, ep_session)
ep["sub"] = stream.get("sub"); ep["dub"] = stream.get("dub")
if not ep_session:
return ep
stream = await self.resolve(anime_id, ep_session)
ep["sub"] = stream.get("sub")
ep["dub"] = stream.get("dub")
return ep
data["data"] = list(await asyncio.gather(*[enrich(ep) for ep in episodes]))
return data
async def get_ids(self, session: str):
try:
ids = await self._scrape_ids(session)
return {k: ids.get(k) for k in ["animepahe", "anilist", "mal", "anidb", "ann", "animePlanet"]}
except Exception as e: return {"error": f"Failed: {str(e)}"}
return {
k: ids.get(k)
for k in ["animepahe", "anilist", "mal", "anidb", "ann", "animePlanet"]
}
except Exception as e:
return {"error": f"Failed: {str(e)}"}
async def get_info(self, session: str):
try:
ids = await self._scrape_ids(session)
anilist_id = ids.get("anilist")
if not anilist_id: return {"error": "Could find AniList ID", "ids": ids}
if anilist_id in _info_cache: return _info_cache[anilist_id]
if not anilist_id:
return {"error": "Could find AniList ID", "ids": ids}
if anilist_id in _info_cache:
return _info_cache[anilist_id]
data = await self._fetch_anilist(anilist_id)
if "error" in data: return {"error": data["error"], "ids": ids}
if "error" in data:
return {"error": data["error"], "ids": ids}
data["ids"] = {**ids, "anilist": anilist_id}
_info_cache[anilist_id] = data
return data
except Exception as e: return {"error": f"Failed: {str(e)}"}
except Exception as e:
return {"error": f"Failed: {str(e)}"}
async def _embed_to_m3u8(self, embed_url: str) -> Optional[str]:
p = await self.context.new_page()
m3u8 = None
found = asyncio.Event()
def capture(req):
nonlocal m3u8
if (".m3u8" in req.url or ".mp4" in req.url) and not found.is_set():
m3u8 = req.url
found.set()
p.on("request", capture)
try:
await p.set_extra_http_headers({"Referer": BASE_URL})
await p.goto(embed_url, wait_until="domcontentloaded", timeout=15000)
# Fast polling (0.5s instead of 1.5s sleeps)
for _ in range(10):
if found.is_set():
break
await p.evaluate("""() => {
document.querySelectorAll('video').forEach(v => {
v.muted = true;
const p = v.play();
if (p !== undefined) p.catch(() => {});
});
document.querySelectorAll('button, .vjs-big-play-button').forEach(b => {
try { b.click() } catch(e) {}
});
}""")
try:
await asyncio.wait_for(found.wait(), timeout=0.5)
except asyncio.TimeoutError:
pass
except Exception:
pass
finally:
await p.close()
return m3u8
async def resolve(self, anime_session: str, episode_session: str):
play_url = f"{BASE_URL}/play/{anime_session}/{episode_session}"
page = await self.context.new_page()
try:
await Stealth().apply_stealth_async(page)
# FIX: networkidle
try:
await page.goto(play_url, wait_until="networkidle", timeout=45000)
except Exception as e:
if "Timeout" not in str(e): raise e
anime_url = f"{BASE_URL}/anime/{anime_session}"
page = await self.context.new_page()
try:
await stealth_async(page)
await page.add_init_script(
"Object.defineProperty(navigator, 'webdriver', { get: () => undefined });"
)
await page.set_extra_http_headers({"Referer": anime_url})
# 1. ATTEMPT DIRECT VISIT FAST (Saves ~5 seconds if cookies are already valid)
await page.goto(play_url, wait_until="domcontentloaded", timeout=20000)
title = await page.title()
# 2. HANDLE DDOS-GUARD QUICKLY
if "DDoS-Guard" in title or "Just a moment" in title:
try:
await page.wait_for_function(
"!document.title.includes('DDoS-Guard') && !document.title.includes('Just a moment')",
timeout=15000,
)
title = await page.title()
except:
pass
# 3. IF 404 HIT, FALLBACK TO ANIME PAGE TO REFRESH COOKIES
if "404" in title:
await page.goto(anime_url, wait_until="domcontentloaded", timeout=15000)
try:
await page.wait_for_function(
"!document.title.includes('DDoS-Guard') && !document.title.includes('Just a moment')",
timeout=10000,
)
except:
pass
# Try play URL again now that cookies are refreshed
await page.set_extra_http_headers({"Referer": anime_url})
await page.goto(play_url, wait_until="domcontentloaded", timeout=15000)
# 4. GET LINKS (No clicking needed, they exist in the DOM as attached)
try:
await page.wait_for_selector(
"#resolutionMenu button", state="attached", timeout=10000
)
except Exception:
html = await page.content()
with open("debug_no_buttons.html", "w", encoding="utf-8") as f:
f.write(html)
return {
"error": "No resolution buttons found",
"page_title": await page.title(),
}
await page.wait_for_selector("#resolutionMenu button", state="attached", timeout=30000)
anime_name, episode_num = await self._scrape_play_meta(page)
res_data = await self._collect_buttons(page)
buttons = await page.locator("#resolutionMenu button").all()
res_data = []
for btn in buttons:
embed_url = await btn.get_attribute("data-src")
res_text = await btn.get_attribute("data-resolution")
fansub = await btn.get_attribute("data-fansub") or "Unknown"
audio_lang = await btn.get_attribute("data-audio") or "jpn"
if embed_url:
res_data.append(
{
"res": int(res_text)
if res_text and res_text.isdigit()
else 720,
"embed": embed_url,
"audio_lang": audio_lang,
"audio": "dub" if audio_lang == "eng" else "sub",
"fansub": fansub,
}
)
await page.close()
page = None
# 5. RESOLVE EXTRACTED LINKS CONCURRENTLY
subs = [r for r in res_data if r["audio"] == "sub"]
dubs = [r for r in res_data if r["audio"] == "dub"]
best_sub = max(subs, key=lambda x: x["res"]) if subs else None
best_dub = max(dubs, key=lambda x: x["res"]) if dubs else None
@@ -361,43 +574,65 @@ class AnimePahe:
m3u8 = await self._embed_to_m3u8(item["embed"])
res_str = str(item["res"])
return {
"resolution": res_str, "fansub": item["fansub"], "audio": item["audio"],
"audio_lang": item["audio_lang"], "url": m3u8,
"download": self._generate_mp4(m3u8, anime_name, episode_num, res_str),
"resolution": res_str,
"fansub": item["fansub"],
"audio": item["audio"],
"audio_lang": item["audio_lang"],
"url": m3u8,
"download": self._generate_mp4(
m3u8, anime_name, episode_num, res_str
),
}
except Exception as e:
return {"resolution": str(item["res"]), "fansub": item["fansub"], "error": str(e)}
return {
"resolution": str(item["res"]),
"fansub": item["fansub"],
"error": str(e),
}
tasks = []
if best_sub: tasks.append(resolve_one(best_sub))
if best_dub: tasks.append(resolve_one(best_dub))
if best_sub:
tasks.append(resolve_one(best_sub))
if best_dub:
tasks.append(resolve_one(best_dub))
results = await asyncio.gather(*tasks)
return {
"anime": anime_session, "episode": episode_session, "anime_name": anime_name,
"anime": anime_session,
"episode": episode_session,
"anime_name": anime_name,
"episode_num": episode_num,
"sub": results[0] if best_sub else None,
"dub": results[1] if best_sub and best_dub else (results[0] if best_dub else None),
"dub": results[1]
if best_sub and best_dub
else (results[0] if best_dub else None),
}
except Exception as e:
if page:
try: await page.screenshot(path="debug_error.png", full_page=True)
except: pass
try:
await page.screenshot(path="debug_error.png", full_page=True)
except:
pass
return {"error": str(e), "hint": "Check debug_error.png"}
finally:
if page: await page.close()
if page:
await page.close()
async def get_seasons(self, anime_id: str) -> dict:
url = f"{ANIWATCHTV_BASE}/{anime_id}"
page = await self.context.new_page()
try:
await page.goto(url, wait_until="networkidle", timeout=30000)
# FIX: domcontentloaded
await page.goto(url, wait_until="domcontentloaded", timeout=30000)
await asyncio.sleep(1)
for selector in [".os-list", ".seasons-block", "[class*='season']", "main"]:
try:
await page.wait_for_selector(selector, timeout=5000)
break
except: continue
except:
continue
seasons = await page.evaluate(f"""() => {{
const BASE = "{ANIWATCHTV_BASE}";
@@ -424,40 +659,62 @@ class AnimePahe:
return results;
}}""")
return {"id": anime_id, "total": len(seasons), "seasons": seasons}
except Exception as e: return {"id": anime_id, "total": 0, "seasons": [], "error": str(e)}
finally: await page.close()
except Exception as e:
return {"id": anime_id, "total": 0, "seasons": [], "error": str(e)}
finally:
await page.close()
pahe = AnimePahe()
@asynccontextmanager
async def lifespan(app: FastAPI):
await pahe.start()
yield
await pahe.stop()
app = FastAPI(lifespan=lifespan)
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"])
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/")
async def root(): return {"status": "ok"}
async def root():
return {"status": "ok"}
@app.get("/search")
async def api_search(q: str): return await pahe.search(q)
async def api_search(q: str):
return await pahe.search(q)
@app.get("/latest")
async def api_latest(p: int = 1): return await pahe.get_latest(p)
async def api_latest(p: int = 1):
return await pahe.get_latest(p)
@app.get("/info/{session}")
async def api_info(session: str): return await pahe.get_info(session)
async def api_info(session: str):
return await pahe.get_info(session)
@app.get("/episodes/{session}")
async def api_episodes(session: str, p: int = 1, resolve: bool = False):
return await pahe.get_episodes(session, p, resolve)
@app.get("/resolve/{anime}/{episode}")
async def api_resolve(anime: str, episode: str):
return await pahe.resolve(anime, episode)
@app.get("/seasons/{anime_id:path}")
async def api_seasons(anime_id: str, request: Request):
data = await pahe.get_seasons(anime_id)
@@ -467,22 +724,32 @@ async def api_seasons(anime_id: str, request: Request):
season["posterProxied"] = f"{base_url}/poster?url={season['poster']}"
return data
@app.get("/poster")
async def api_poster(url: str = Query(..., description="CDN image proxy")):
try:
async with httpx.AsyncClient(timeout=10, follow_redirects=True) as client:
resp = await client.get(url, headers={"Referer": ANIWATCHTV_BASE, "User-Agent": "Mozilla/5.0"})
resp = await client.get(
url, headers={"Referer": ANIWATCHTV_BASE, "User-Agent": "Mozilla/5.0"}
)
resp.raise_for_status()
return Response(content=resp.content, media_type=resp.headers.get("content-type", "image/jpeg"))
except Exception as e: return Response(content=f"Error: {e}", status_code=502)
return Response(
content=resp.content,
media_type=resp.headers.get("content-type", "image/jpeg"),
)
except Exception as e:
return Response(content=f"Error: {e}", status_code=502)
@app.get("/hls-proxy")
async def hls_proxy(url: str, request: Request):
headers = {"Referer": "https://kwik.cx/", "User-Agent": "Mozilla/5.0"}
async def stream_generator():
async with httpx.AsyncClient(follow_redirects=True) as client:
async with client.stream("GET", url, headers=headers) as resp:
async for chunk in resp.aiter_bytes(): yield chunk
async for chunk in resp.aiter_bytes():
yield chunk
if url.split("?")[0].endswith(".m3u8"):
async with httpx.AsyncClient(follow_redirects=True) as client:
@@ -491,20 +758,29 @@ async def hls_proxy(url: str, request: Request):
base_proxy = f"{str(request.base_url).rstrip('/')}/hls-proxy?url="
new_lines = []
for line in lines:
if line.startswith("#") or not line.strip(): new_lines.append(line)
if line.startswith("#") or not line.strip():
new_lines.append(line)
else:
abs_url = urllib.parse.urljoin(url, line.strip())
new_lines.append(f"{base_proxy}{urllib.parse.quote(abs_url)}")
return Response(content="\n".join(new_lines), media_type="application/vnd.apple.mpegurl")
else: return StreamingResponse(stream_generator(), media_type="video/MP2T")
return Response(
content="\n".join(new_lines), media_type="application/vnd.apple.mpegurl"
)
else:
return StreamingResponse(stream_generator(), media_type="video/MP2T")
@app.get("/proxy-mapper")
async def proxy_mapper(url: str):
try:
async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client:
resp = await client.get(url); return resp.json()
except Exception as e: return {"error": str(e)}
resp = await client.get(url)
return resp.json()
except Exception as e:
return {"error": str(e)}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=7860)