diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..576cb4e
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,24 @@
+# Use the official Microsoft Playwright image (Includes all necessary Chromium dependencies)
+FROM mcr.microsoft.com/playwright/python:v1.58.0-jammy
+
+# Set the working directory
+WORKDIR /app
+
+# Copy requirement files first (for Docker layer caching)
+COPY requirements.txt .
+
+# Install Python packages
+RUN pip install --no-cache-dir -r requirements.txt
+
+# Copy the rest of the application code
+COPY . .
+
+# Set environment variables for production cloud deployment
+ENV HEADLESS="true"
+ENV PORT=8000
+
+# Expose the application port
+EXPOSE $PORT
+
+# Run the FastAPI server via Uvicorn
+CMD ["python", "main.py"]
diff --git a/main.py b/main.py
new file mode 100644
index 0000000..f6b2b26
--- /dev/null
+++ b/main.py
@@ -0,0 +1,155 @@
+import json
+import asyncio
+import re
+import os
+from typing import Optional, List
+from contextlib import asynccontextmanager
+from fastapi import FastAPI
+from playwright.async_api import async_playwright, BrowserContext, Request
+
+# --- CONFIG ---
+BASE_URL = "https://animepahe.si"
+IS_HEADLESS = os.environ.get("HEADLESS", "true").lower() == "true"
+
+class AnimePahe:
+    def __init__(self):
+        self.playwright = None
+        self.context: Optional[BrowserContext] = None
+        self.ad_domains = ["doubleclick.net", "adservice.google", "popads.net", "propellerads", "exoclick", "bebi.com"]
+
+    async def start(self):
+        self.playwright = await async_playwright().start()
+        self.context = await self.playwright.chromium.launch_persistent_context(
+            user_data_dir="./browser_data",
+            headless=IS_HEADLESS,
+            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
+            args=["--disable-blink-features=AutomationControlled", "--no-sandbox"]
+        )
+        await self.context.route("**/*", self._intercept_assets)
+
+    async def _intercept_assets(self, route):
+        url = route.request.url.lower()
+        if any(ad in url for ad in self.ad_domains) or url.endswith((".png", ".jpg", ".css", ".woff")):
+            await route.abort()
+        else:
+            await route.continue_()
+
+    async def stop(self):
+        if self.context: await self.context.close()
+        if self.playwright: await self.playwright.stop()
+
+    # --- SHARED HELPERS ---
+    async def _fetch_json(self, url: str):
+        page = await self.context.new_page()
+        try:
+            await page.goto(url, wait_until="domcontentloaded")
+            return json.loads(await page.evaluate("document.body.innerText"))
+        except: return None
+        finally: await page.close()
+
+    def _generate_mp4(self, m3u8_url: Optional[str], anime_id: str, res: str) -> Optional[str]:
+        if not m3u8_url: return None
+        # Your working string replacement logic
+        match = re.search(r'(https?://[^.]+)[^/]*/stream/(.*?)/[^/]+\.m3u8', m3u8_url)
+        if match:
+            return f"{match.group(1)}.kwik.cx/mp4/{match.group(2)}?file=AnimePahe_{anime_id}_{res}p.mp4"
+        return None
+
+    # --- ENDPOINTS ---
+    async def search(self, q: str):
+        data = await self._fetch_json(f"{BASE_URL}/api?m=search&q={q}")
+        return data.get("data", []) if data else []
+
+    async def get_latest(self, p: int = 1):
+        return await self._fetch_json(f"{BASE_URL}/api?m=airing&page={p}")
+
+    async def get_episodes(self, anime_id: str, p: int = 1):
+        return await self._fetch_json(f"{BASE_URL}/api?m=release&id={anime_id}&sort=episode_desc&page={p}")
+
+    async def get_info(self, session: str):
+        page = await self.context.new_page()
+        try:
+            await page.goto(f"{BASE_URL}/anime/{session}", wait_until="domcontentloaded")
+            content = await page.content()
+            # Scrape basic metadata
+            title = (re.search(r'