Files
movie-hub/main.py
Aira Catapang 330a833e00 Update main.py
remove packs and add type series or movie
2026-04-04 09:27:22 +00:00

665 lines
24 KiB
Python

import asyncio
import base64
import codecs
import json
import re
import logging
import datetime
from typing import Dict, List, Optional
from contextlib import asynccontextmanager
import httpx
from fastapi import FastAPI, Query, HTTPException
from bs4 import BeautifulSoup
from fastapi.responses import JSONResponse
# Module-level logging: INFO level so each resolution hop is traceable.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("HDHub-Resolver")
# ==========================================
# CONFIGURATION
# ==========================================
# Root of the HDHub mirror currently in use; all relative paths join onto it.
HDHUB_BASE_URL = "https://4khdhub.dad"
def safe_b64decode(data: str) -> str:
    """Decode a base64 string, tolerating missing padding and stray bytes.

    Returns "" for empty input or whenever decoding fails; undecodable
    UTF-8 byte sequences are dropped rather than raised.
    """
    if not data:
        return ""
    try:
        cleaned = data.strip()
        # Base64 length must be a multiple of 4; restore any stripped padding.
        cleaned += "=" * ((-len(cleaned)) % 4)
        raw = base64.b64decode(cleaned, validate=False)
        return raw.decode("utf-8", errors="ignore")
    except Exception:
        return ""
class HDHubEngine:
    """Scraper and link resolver for the HDHub site.

    Owns one shared httpx.AsyncClient; callers must close it via
    ``client.aclose()`` (done by the FastAPI lifespan hook below).
    """

    def __init__(self):
        # Shared async HTTP client: generous timeouts for slow mirrors,
        # redirects followed so we always land on the final host, and a
        # desktop Chrome UA so the site serves its normal HTML.
        self.client = httpx.AsyncClient(
            timeout=httpx.Timeout(30.0, connect=15.0),
            follow_redirects=True,
            headers={
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
                "(KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36",
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            },
        )
# ==========================================
# UTILS
# ==========================================
def _convert_pixeldrain(self, url: str) -> str:
"""Converts normal Pixeldrain viewer links to direct download API links."""
match = re.search(
r"(https?://pixeldrain\.[a-z]+)/u/([a-zA-Z0-9_-]+)", url, re.IGNORECASE
)
if match:
base_url = match.group(1)
file_id = match.group(2)
return f"{base_url}/api/file/{file_id}?download"
return url
# ==========================================
# RESOLVER MODULE
# ==========================================
async def resolve(self, url: str, depth: int = 0) -> Dict:
    """Follow *url* recursively until a direct download link is found.

    Dispatches to the wrapper / hubcloud handlers based on where the
    request finally lands; gives up after 12 levels of recursion and
    returns an ``{"error": ...}`` dict on any failure.
    """
    if depth > 12:
        return {"error": "Max recursion depth reached", "url": url}
    logger.info(f"Resolving (depth {depth}): {url}")

    # Pixeldrain links never need fetching — convert them immediately.
    if "pixeldrain" in url.lower():
        entry = {"server": "Pixeldrain Direct", "link": self._convert_pixeldrain(url)}
        return {"found_links": [entry], "count": 1, "type": "direct"}

    try:
        response = await self.client.get(url)
        body = response.text
        landed = str(response.url)
        landed_lower = landed.lower()

        # Redirect landed straight on a media file / file host.
        direct_markers = (".mkv", ".mp4", ".m3u8", ".zip", "pixeldrain")
        if any(marker in landed_lower for marker in direct_markers):
            entry = {"server": "Direct Link", "link": self._convert_pixeldrain(landed)}
            return {"found_links": [entry], "count": 1, "type": "direct"}

        # Known ad/redirect wrapper domains.
        wrapper_markers = ("gadgetsweb", "cryptoinsights", "techly360", "?id=")
        if any(marker in landed_lower for marker in wrapper_markers):
            return await self._handle_wrapper(landed, body, depth)

        # HubCloud/GamerXYT style download hub pages.
        hub_markers = ("hubcloud", "gamerxyt", "shikshakdaak", "drive/")
        if any(marker in landed_lower for marker in hub_markers):
            return await self._handle_hubcloud_style_page(landed, body, depth)

        # Unknown host: scan the HTML for base64-encoded URLs.
        return await self._fallback_search(body, depth)
    except httpx.HTTPStatusError as e:
        return {"error": f"HTTP {e.response.status_code}", "url": url}
    except Exception as e:
        logger.exception("Resolution error")
        return {"error": str(e), "url": url}
async def _handle_hubcloud_style_page(
    self, page_url: str, text: str, depth: int
) -> Dict:
    """Extract download links from a HubCloud/GamerXYT style page.

    Builds the time-suffixed links for the special ``#s3``/``#fsl``
    buttons, collects every other anchor (minus known junk hosts),
    then keeps only links that look like real files/file hosts.
    Pixeldrain links are converted to their direct API form.
    """
    soup = BeautifulSoup(text, "html.parser")
    links_data = []
    seen = set()
    # The site's own JS appends a minute-based suffix to the #s3/#fsl
    # hrefs; we replicate that here so the links are accepted.
    # NOTE(review): assumes local clock minute matches what the server
    # expects — confirm against the page's inline script.
    current_minute = str(datetime.datetime.now().minute).zfill(2)
    base_to_ignore = set()
    s3 = soup.find("a", id="s3")
    if s3 and s3.get("href"):
        base_s3 = s3["href"].strip()
        base_to_ignore.add(base_s3)
        # "_1" + minute is the suffix pattern used for the s3 button.
        dynamic_s3 = base_s3 + "_1" + current_minute
        seen.add(dynamic_s3)
        links_data.append(
            {
                "server": s3.text.strip().replace("Download", "").strip()
                or "FSLv2 Server",
                "link": dynamic_s3,
            }
        )
    fsl = soup.find("a", id="fsl")
    if fsl and fsl.get("href"):
        base_fsl = fsl["href"].strip()
        base_to_ignore.add(base_fsl)
        # Note: fsl uses "1" + minute (no underscore), unlike s3 above.
        dynamic_fsl = base_fsl + "1" + current_minute
        seen.add(dynamic_fsl)
        links_data.append(
            {
                "server": fsl.text.strip().replace("Download", "").strip()
                or "FSL Server",
                "link": dynamic_fsl,
            }
        )
    # Hosts/paths that are never real download targets.
    junk_keywords = [
        "hubcloud.fans",
        "drive/admin",
        "t.me",
        "tinyurl.com",
        "one.one.one.one",
        "google.com/search",
        "ampproject.org",
        "bloggingvector.shop",
    ]
    for a in soup.find_all("a", href=True):
        href = a["href"].strip()
        if not href or href.startswith(("#", "javascript:")):
            continue
        if href.startswith("/"):
            # Make site-relative links absolute against the current host.
            href = f"https://{httpx.URL(page_url).host}{href}"
        if href in base_to_ignore or any(j in href.lower() for j in junk_keywords):
            continue
        if "gamerxyt.com/hubcloud.php" in href.lower() and "host=" in href.lower():
            # Intermediate hubcloud.php hop: recurse rather than list it.
            return await self.resolve(href, depth + 1)
        if href not in seen:
            seen.add(href)
            label = a.text.strip().replace("Download", "").strip()
            links_data.append({"server": label or "Unknown Server", "link": href})
    # Keep only links that look like actual files or known file hosts.
    download_links = []
    for item in links_data:
        lower_link = item["link"].lower()
        if any(
            x in lower_link
            for x in [
                ".mkv",
                ".mp4",
                ".zip",
                "pixeldrain",
                "fsl-buckets",
                "toxix.buzz",
                "hubcdn.fans",
                "cloudserver",
            ]
        ):
            item["link"] = self._convert_pixeldrain(item["link"])
            download_links.append(item)
    if download_links:
        return {
            "found_links": download_links,
            "count": len(download_links),
            "source": "hubcloud_gamerxyt",
            "page_url": page_url,
        }
    return {"detail": "No usable download links found", "page_url": page_url}
async def _handle_wrapper(self, url: str, text: str, depth: int) -> Dict:
    """Bypass an ad/redirect wrapper page to reach the real hubcloud link.

    Tries, in order: a plaintext gamerxyt hubcloud.php URL, base64 blobs
    that decode to one, and finally the site's obfuscated payload
    (base64 -> base64 -> ROT13 -> base64 -> JSON holding the next URL).
    """
    # 1) Plain hubcloud.php URL embedded directly in the HTML.
    direct_match = re.search(
        r'(https?://(?:www\.)?gamerxyt\.com/hubcloud\.php\?[^"\'>\s]+)', text
    )
    if direct_match:
        return await self.resolve(direct_match.group(1), depth + 1)
    # 2) Any base64 blob (>= 40 chars) that decodes to a gamerxyt/hubcloud URL.
    for b64 in re.findall(r"[A-Za-z0-9+/=]{40,}", text):
        dec = safe_b64decode(b64)
        if "gamerxyt.com" in dec or "hubcloud.php" in dec:
            m = re.search(r'(https?://[^"\'>\s]+)', dec)
            if m:
                return await self.resolve(m.group(1), depth + 1)
    # 3) Obfuscated payloads stashed in inline JS calls / long string literals.
    patterns = [
        r"s\('o','([A-Za-z0-9+/=]+)'",
        r"ck\('_wp_http[^']*','([^']+)'",
        r"['\"]([A-Za-z0-9+/=]{100,})['\"]",
    ]
    combined = "".join(["".join(re.findall(pat, text)) for pat in patterns])
    if combined:
        # De-obfuscation chain used by the wrapper: b64 -> b64 -> ROT13 -> b64.
        step1 = safe_b64decode(combined)
        step2 = safe_b64decode(step1)
        rotated = codecs.encode(step2, "rot_13") if step2 else step2
        final_str = safe_b64decode(rotated)
        try:
            if final_str.strip().startswith("{"):
                data = json.loads(final_str)
                next_url = data.get("o") or data.get("url") or data.get("link")
                if next_url:
                    # The target field may itself be base64-wrapped.
                    decoded = safe_b64decode(next_url)
                    final = decoded if decoded.startswith("http") else next_url
                    return await self.resolve(final, depth + 1)
        except Exception:
            pass
    return {"error": "Wrapper failed to extract link", "url": url}
async def _fallback_search(self, text: str, depth: int) -> Dict:
for b64 in re.findall(r"[A-Za-z0-9+/=]{100,}", text):
dec = safe_b64decode(b64)
if dec.startswith(("http://", "https://")):
return await self.resolve(dec, depth + 1)
return {"error": "Unsupported page", "detail": "No recognizable pattern"}
# ==========================================
# SCRAPING MODULE (Home, Info, Search)
# ==========================================
async def get_home(self, cat_type: str = "latest_movie") -> Dict:
    """Fetch the homepage or a category listing and parse its movie cards.

    Accepts a few loose aliases for the category name and retries an
    alternate URL scheme once if the primary path 404s.
    """
    normalized = cat_type.lower().strip()
    if normalized in ("movies", "movie"):
        normalized = "movie"
    elif normalized in ("lates_movie", "latest_movies", "latest"):
        normalized = "latest_movie"

    primary_paths = {
        "movie": "/category/movies/",
        "anime": "/category/anime/",
        "ott": "/category/web-series/",
        "latest_movie": "/",
    }
    path = primary_paths.get(normalized, "/")
    url = f"{HDHUB_BASE_URL.rstrip('/')}{path}"
    try:
        resp = await self.client.get(url)
        # Some categories live off the /category/ prefix; retry once on 404.
        if resp.status_code == 404 and normalized != "latest_movie":
            alternates = {
                "movie": "/movies/",
                "anime": "/anime/",
                "ott": "/web-series/",
            }
            url = f"{HDHUB_BASE_URL.rstrip('/')}{alternates.get(normalized, path)}"
            resp = await self.client.get(url)
        return self._parse_movie_cards(resp.text, url, category=normalized)
    except Exception as e:
        return {"error": str(e)}
async def search(self, query: str) -> Dict:
    """Run a site search for *query* and parse the resulting movie cards."""
    base = f"{HDHUB_BASE_URL.rstrip('/')}/"
    try:
        resp = await self.client.get(base, params={"s": query})
        return self._parse_movie_cards(resp.text, str(resp.url), query=query)
    except Exception as e:
        return {"error": str(e), "query": query}
def _parse_movie_cards(
    self, html: str, source_url: str, category: str = None, query: str = None
) -> Dict:
    """Parse ``<a class="movie-card">`` entries into ``{title, url, image}``
    dicts, de-duplicated by URL. Shared by the home and search endpoints."""
    soup = BeautifulSoup(html, "html.parser")
    collected = []
    for card in soup.find_all("a", class_="movie-card"):
        link = card.get("href")
        if not link:
            continue
        if link.startswith("/"):
            # Site-relative link: absolutize against the configured base.
            link = HDHUB_BASE_URL.rstrip("/") + link
        heading = card.find("h3", class_="movie-card-title")
        title = heading.text.strip() if heading else "Unknown Title"
        image = card.find("img")
        image_src = image.get("src") if image else ""
        if title:
            collected.append({"title": title, "url": link, "image": image_src})

    # First occurrence of each URL wins; later duplicates are dropped.
    deduped = []
    visited = set()
    for entry in collected:
        if entry["url"] in visited:
            continue
        visited.add(entry["url"])
        deduped.append(entry)

    payload = {"source": source_url}
    if category:
        payload["category"] = category
    if query:
        payload["query"] = query
    payload["results"] = deduped
    return payload
async def get_info(self, url: str) -> Dict:
    """Scrape a single post page into structured metadata plus download links.

    Detects whether the post is a movie or a series and shapes
    ``download_links`` accordingly: series get season/episode groups
    (season "packs" are ignored), movies get quality groups. HubDrive
    server links are filtered out in both modes. Returns an
    ``{"error": ...}`` dict on any failure.
    """
    try:
        resp = await self.client.get(url)
        soup = BeautifulSoup(resp.text, "html.parser")
        # Basic details
        title_tag = soup.find("h1", class_="page-title") or soup.find("h1")
        title = title_tag.text.strip() if title_tag else "Unknown"
        # Posters are hot-linked from TMDB, so match on the image host.
        poster_tag = soup.find("img", src=re.compile(r"tmdb\.org"))
        poster = poster_tag.get("src") if poster_tag else ""
        # ---------------------------------------------------------
        # HTML SCRAPING: Score, Genres, Year, Cast, Trailer
        # ---------------------------------------------------------
        score = ""
        score_tag = soup.find("span", class_="imdb-score")
        if score_tag:
            score = score_tag.text.strip()
        genres = []
        # Category badges mix genres with quality/language tags; skip the latter.
        junk_tags = [
            "1080p",
            "2160p",
            "720p",
            "dv hdr",
            "sdr",
            "movies",
            "series",
            "hindi",
            "english",
        ]
        for a_tag in soup.select(".badge.badge-outline a[href^='/category/']"):
            tag_text = a_tag.text.strip()
            if tag_text.lower() not in junk_tags:
                genres.append(tag_text)
        # --- DETERMINE IF MOVIE OR SERIES ---
        # Heuristics applied in order: URL slug, genre badges, episode markup.
        content_type = "movie"
        url_lower = url.lower()
        if "-series-" in url_lower:
            content_type = "series"
        elif any(g.lower() in ["series", "web series", "tv show"] for g in genres):
            content_type = "series"
        elif soup.find("div", class_="episode-download-item") or soup.find(
            id="episodes"
        ):
            content_type = "series"
        trailer_url = ""
        trailer_btn = soup.find(id="trailer-btn")
        if trailer_btn and trailer_btn.get("data-trailer-url"):
            trailer_url = trailer_btn.get("data-trailer-url")
        # Extracting Year & Cast from the Metadata List
        year = ""
        cast = ""
        for item in soup.find_all("div", class_="metadata-item"):
            label = item.find("span", class_="metadata-label")
            value = item.find("span", class_="metadata-value")
            if label and value:
                lbl = label.text.strip().lower()
                val = value.text.strip()
                if "air" in lbl or "release" in lbl or "year" in lbl:
                    # Any 4-digit run in the value is treated as the year.
                    match = re.search(r"\d{4}", val)
                    if match:
                        year = match.group(0)
                if "stars" in lbl or "cast" in lbl:
                    cast = val
        description = ""
        # First sufficiently long paragraph that isn't a download blurb.
        for p in soup.find_all("p"):
            text_clean = p.text.strip()
            if (
                len(text_clean) > 50
                and "download" not in text_clean.lower()
                and (
                    "band together" in text_clean.lower()
                    or "young" in text_clean.lower()
                    or len(text_clean) > 80
                )
            ):
                description = text_clean
                break
        # ---------------------------------------------------------
        # PARSE LINKS BASED ON TYPE (Movie vs Series)
        # ---------------------------------------------------------
        download_links = {}
        if content_type == "series":
            # EPISODES PARSING (Ignores "Packs" completely)
            grouped_episodes = {}
            # Season/quality header most recently seen in document order.
            current_season_context = "Unknown Quality"
            seen_episode_links = set()
            # Headers and episode rows interleave in the page, so collect
            # both kinds in one ordered pass.
            elements = soup.find_all(
                lambda tag: (
                    tag.name == "div"
                    and tag.get("class")
                    and (
                        "episode-header" in tag.get("class")
                        or "episode-download-item" in tag.get("class")
                    )
                )
            )
            for element in elements:
                classes = element.get("class", [])
                if "episode-header" in classes:
                    current_season_context = re.sub(
                        r"\s+", " ", element.text.strip()
                    )
                    if current_season_context not in grouped_episodes:
                        grouped_episodes[current_season_context] = {}
                elif "episode-download-item" in classes:
                    ep_info_tag = element.find("span", class_="badge-psa")
                    ep_num_str = re.sub(
                        r"\s+",
                        " ",
                        ep_info_tag.text.strip()
                        if ep_info_tag
                        else "Unknown Episode",
                    )
                    if current_season_context not in grouped_episodes:
                        grouped_episodes[current_season_context] = {}
                    if ep_num_str not in grouped_episodes[current_season_context]:
                        grouped_episodes[current_season_context][ep_num_str] = []
                    links_div = element.find("div", class_="episode-links")
                    if links_div:
                        for btn in links_div.find_all("a", class_="btn"):
                            href, server_name = (
                                btn.get("href"),
                                btn.text.strip().replace("\xa0", "").strip(),
                            )
                            # EXCLUDE HUBDRIVE LINKS
                            if "hubdrive" in server_name.lower():
                                continue
                            uniq_key = (
                                current_season_context,
                                ep_num_str,
                                server_name,
                            )
                            if href and uniq_key not in seen_episode_links:
                                seen_episode_links.add(uniq_key)
                                grouped_episodes[current_season_context][
                                    ep_num_str
                                ].append({"server": server_name, "url": href})
            # Format array
            formatted_episodes = []
            for season_quality, episodes_dict in grouped_episodes.items():
                if not episodes_dict:
                    continue
                ep_list = []
                for ep_name, links in episodes_dict.items():
                    if links:  # Remove empty episodes if HubDrive was the only link
                        ep_list.append({"episode": ep_name, "links": links})
                if ep_list:
                    formatted_episodes.append(
                        {"season_quality": season_quality, "episodes": ep_list}
                    )
            download_links["episodes"] = formatted_episodes
        else:
            # MOVIE PARSING (Grabs direct files/qualities)
            movie_links = []
            seen_movie_links = set()
            for pack_item in soup.find_all("div", class_="download-item"):
                header = pack_item.find("div", class_="download-header")
                if not header:
                    continue
                quality_name = re.sub(
                    r"\s+", " ", header.text.replace("\n", " ").strip()
                )
                links_div = pack_item.find("div", class_="grid grid-cols-2 gap-2")
                if links_div:
                    btn_links = []
                    for btn in links_div.find_all("a", class_="btn"):
                        href, server_name = (
                            btn.get("href"),
                            btn.text.strip().replace("\xa0", "").strip(),
                        )
                        # EXCLUDE HUBDRIVE LINKS
                        if "hubdrive" in server_name.lower():
                            continue
                        uniq_key = (quality_name, server_name)
                        if href and uniq_key not in seen_movie_links:
                            seen_movie_links.add(uniq_key)
                            btn_links.append({"server": server_name, "url": href})
                    if btn_links:
                        movie_links.append(
                            {"quality": quality_name, "links": btn_links}
                        )
            download_links["movie_links"] = movie_links
        return {
            "title": title,
            "type": content_type,
            "year": year,
            "score": score,
            "genres": genres,
            "cast": cast,
            "trailer": trailer_url,
            "poster": poster,
            "description": description,
            "download_links": download_links,
            "page_url": url,
        }
    except Exception as e:
        return {"error": str(e), "url": url}
# ====================== FastAPI ======================
# Single shared engine instance reused across all requests.
engine = HDHubEngine()


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Nothing to set up; just ensure the HTTP client closes on shutdown.
    yield
    await engine.client.aclose()


app = FastAPI(lifespan=lifespan, title="HDHub Scraper & Resolver v4.4")
@app.get("/")
async def root_directory():
    """Self-documenting API index: service name, version, and per-endpoint
    usage (method, parameters, example call)."""
    return JSONResponse(
        {
            "name": "HDHub API Scraper & Resolver",
            "version": "4.4",
            "features": "Type Detection (Movie/Series). Removed Series Packs. Added Search. HubDrive Filtered. Pixeldrain Auto-Converter.",
            "endpoints": {
                "/home": {
                    "description": "Fetch homepage or category-specific movie/series lists.",
                    "method": "GET",
                    "parameters": {
                        "type": "(Optional) movie, anime, ott, latest_movie."
                    },
                    "example": "/home?type=anime",
                },
                "/search": {
                    "description": "Search for movies or series by title.",
                    "method": "GET",
                    "parameters": {"query": "(Required) The search keyword."},
                    "example": "/search?query=batman",
                },
                "/info": {
                    "description": "Scrape full info. Automatically detects Movie or Series and hides series 'Packs'. HubDrive links are hidden.",
                    "method": "GET",
                    "parameters": {
                        "url": "(Required) The full HDHub post URL obtained from /home or /search."
                    },
                    "example": "/info?url=https://4khdhub.dad/xo-kitty-series-1017/",
                },
                "/resolve": {
                    "description": "Bypass intermediate ad/wrapper links to get final download URL. Automatically converts Pixeldrain links.",
                    "method": "GET",
                    "parameters": {"url": "(Required) Wrapper URL from /info."},
                    "example": "/resolve?url=https://gadgetsweb.xyz/?id=...",
                },
            },
        }
    )
@app.get("/home")
async def api_home(
    # Python name avoids shadowing the `type` builtin; the HTTP query
    # parameter is still exposed as ?type= via the alias, so the public
    # API is unchanged.
    content_type: str = Query(
        "latest_movie",
        alias="type",
        description="Type of content available: movie, anime, ott, latest_movie",
    ),
):
    """Homepage / category listing endpoint; proxies to the scraping engine."""
    return await engine.get_home(content_type)
@app.get("/search")
async def api_search(query: str = Query(..., description="The search keyword")):
    """Search endpoint: forwards the keyword to the scraping engine."""
    return await engine.search(query)
@app.get("/info")
async def api_info(
    url: str = Query(..., description="Post URL (e.g. from /home or /search)"),
):
    """Scrape full post info; non-HTTP(S) URLs are rejected with a 400."""
    has_http_scheme = url.startswith(("http://", "https://"))
    if not has_http_scheme:
        raise HTTPException(400, "Invalid URL")
    return await engine.get_info(url)
@app.get("/resolve")
async def api_resolve(url: str = Query(..., description="Wrapper URL to resolve")):
    """Resolve a wrapper/ad link to its final download URL; 400 on bad scheme."""
    has_http_scheme = url.startswith(("http://", "https://"))
    if not has_http_scheme:
        raise HTTPException(400, "Invalid URL")
    return await engine.resolve(url)
if __name__ == "__main__":
    # Local/dev entry point; in production this is typically launched with
    # `uvicorn main:app` instead.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)