Upload 3 files

This commit is contained in:
Aira Catapang
2026-04-04 07:34:08 +00:00
committed by system
parent 3d4135ca0e
commit 88cc85b98b
3 changed files with 662 additions and 0 deletions

23
Dockerfile Normal file
View File

@@ -0,0 +1,23 @@
# Use official Python lightweight image
FROM python:3.10-slim

# Set the working directory
WORKDIR /app

# Set up a non-root user early (required by Hugging Face Spaces) so that app
# files can be copied with the correct ownership in a single layer below.
RUN useradd -m -u 1000 user

# Copy requirements and install them (as root, into system site-packages,
# which is fine since the runtime user only needs read/execute access)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the rest of the application files already owned by the non-root user.
# This replaces the previous `RUN chown -R user:user /app`, which duplicated
# every application file into an extra image layer and bloated the image.
COPY --chown=user:user . .

USER user

# Expose the default Hugging Face Space port
EXPOSE 7860

# Command to run the FastAPI application
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "7860"]

635
main.py Normal file
View File

@@ -0,0 +1,635 @@
import asyncio
import base64
import codecs
import json
import re
import logging
import datetime
from typing import Dict, List, Optional
from contextlib import asynccontextmanager
import httpx
from fastapi import FastAPI, Query, HTTPException
from bs4 import BeautifulSoup
from fastapi.responses import JSONResponse
# Module-wide logger for the resolver; INFO level shows each resolution hop.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("HDHub-Resolver")
# ==========================================
# CONFIGURATION
# ==========================================
# Base URL of the scraped site. NOTE(review): such domains rotate frequently —
# expect to update this constant when the site moves.
HDHUB_BASE_URL = "https://4khdhub.dad"
def safe_b64decode(data: str) -> str:
    """Best-effort base64 decode.

    Tolerates missing padding and undecodable bytes; returns an empty
    string for empty input or on any decoding failure.
    """
    if not data:
        return ""
    try:
        cleaned = data.strip()
        # Base64 payloads must be a multiple of 4 chars; restore padding.
        remainder = len(cleaned) % 4
        if remainder:
            cleaned += "=" * (4 - remainder)
        raw = base64.b64decode(cleaned, validate=False)
        return raw.decode("utf-8", errors="ignore")
    except Exception:
        return ""
class HDHubEngine:
    """Async scraper and link resolver for the 4khdhub site.

    Responsibilities:
      * listing pages (``get_home``, ``search``) parsed via ``_parse_movie_cards``
      * detail pages (``get_info``) including download packs and episodes
      * recursive resolution of ad/wrapper links down to direct file URLs
        (``resolve`` and its ``_handle_*`` helpers)
    """

    def __init__(self):
        # Shared async HTTP client for all requests. Browser-like UA and
        # Accept headers reduce the chance of trivial bot blocking;
        # follow_redirects lets wrapper hops be observed via resp.url.
        self.client = httpx.AsyncClient(
            timeout=httpx.Timeout(30.0, connect=15.0),
            follow_redirects=True,
            headers={
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
                "(KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36",
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            },
        )

    # ==========================================
    # UTILS
    # ==========================================
    def _convert_pixeldrain(self, url: str) -> str:
        """Converts normal Pixeldrain viewer links to direct download API links."""
        match = re.search(
            r"(https?://pixeldrain\.[a-z]+)/u/([a-zA-Z0-9_-]+)", url, re.IGNORECASE
        )
        if match:
            base_url = match.group(1)
            file_id = match.group(2)
            # /u/<id> viewer page -> /api/file/<id>?download direct endpoint
            return f"{base_url}/api/file/{file_id}?download"
        return url

    # ==========================================
    # RESOLVER MODULE
    # ==========================================
    async def resolve(self, url: str, depth: int = 0) -> Dict:
        """Recursively follow wrapper/redirect pages until direct links emerge.

        Returns a dict containing either ``found_links`` (list of
        ``{"server", "link"}`` entries) or an ``error`` key.
        """
        # Hard cap on recursion to avoid loops between wrapper pages.
        if depth > 12:
            return {"error": "Max recursion depth reached", "url": url}
        logger.info(f"Resolving (depth {depth}): {url}")
        # Pixeldrain URLs are terminal: convert and return without fetching.
        if "pixeldrain" in url.lower():
            return {
                "found_links": [
                    {
                        "server": "Pixeldrain Direct",
                        "link": self._convert_pixeldrain(url),
                    }
                ],
                "count": 1,
                "type": "direct",
            }
        try:
            resp = await self.client.get(url)
            text = resp.text
            # follow_redirects=True, so resp.url is the post-redirect URL.
            final_url = str(resp.url)
            # Landed directly on a media file or pixeldrain after redirects.
            if any(
                ext in final_url.lower()
                for ext in [".mkv", ".mp4", ".m3u8", ".zip", "pixeldrain"]
            ):
                return {
                    "found_links": [
                        {
                            "server": "Direct Link",
                            "link": self._convert_pixeldrain(final_url),
                        }
                    ],
                    "count": 1,
                    "type": "direct",
                }
            # Known ad-wrapper domains: decode the embedded next-hop URL.
            if any(
                k in final_url.lower()
                for k in ["gadgetsweb", "cryptoinsights", "techly360", "?id="]
            ):
                return await self._handle_wrapper(final_url, text, depth)
            # HubCloud-style hosting pages: scrape anchor tags for servers.
            if any(
                k in final_url.lower()
                for k in ["hubcloud", "gamerxyt", "shikshakdaak", "drive/"]
            ):
                return await self._handle_hubcloud_style_page(final_url, text, depth)
            # Unknown page type: scan raw HTML for base64-encoded URLs.
            return await self._fallback_search(text, depth)
        except httpx.HTTPStatusError as e:
            # NOTE(review): raise_for_status() is never called on resp, so this
            # branch is unlikely to trigger — confirm whether it was intended.
            return {"error": f"HTTP {e.response.status_code}", "url": url}
        except Exception as e:
            logger.exception("Resolution error")
            return {"error": str(e), "url": url}

    async def _handle_hubcloud_style_page(
        self, page_url: str, text: str, depth: int
    ) -> Dict:
        """Extract server links from a hubcloud/gamerxyt-style download page."""
        soup = BeautifulSoup(text, "html.parser")
        links_data = []
        seen = set()
        # Zero-padded current minute, appended to some links below.
        # NOTE(review): this appears to mimic a token the site's own JS appends
        # to the #s3/#fsl anchors — confirm against the live page's script.
        current_minute = str(datetime.datetime.now().minute).zfill(2)
        base_to_ignore = set()
        # Special-cased anchor id="s3": base href gets "_1" + minute suffix.
        s3 = soup.find("a", id="s3")
        if s3 and s3.get("href"):
            base_s3 = s3["href"].strip()
            base_to_ignore.add(base_s3)
            dynamic_s3 = base_s3 + "_1" + current_minute
            seen.add(dynamic_s3)
            links_data.append(
                {
                    "server": s3.text.strip().replace("Download", "").strip()
                    or "FSLv2 Server",
                    "link": dynamic_s3,
                }
            )
        # Special-cased anchor id="fsl": base href gets "1" + minute suffix.
        fsl = soup.find("a", id="fsl")
        if fsl and fsl.get("href"):
            base_fsl = fsl["href"].strip()
            base_to_ignore.add(base_fsl)
            dynamic_fsl = base_fsl + "1" + current_minute
            seen.add(dynamic_fsl)
            links_data.append(
                {
                    "server": fsl.text.strip().replace("Download", "").strip()
                    or "FSL Server",
                    "link": dynamic_fsl,
                }
            )
        # Hosts/paths that are ads, trackers, or site chrome — never downloads.
        junk_keywords = [
            "hubcloud.fans",
            "drive/admin",
            "t.me",
            "tinyurl.com",
            "one.one.one.one",
            "google.com/search",
            "ampproject.org",
            "bloggingvector.shop",
        ]
        # Generic sweep over all remaining anchors on the page.
        for a in soup.find_all("a", href=True):
            href = a["href"].strip()
            if not href or href.startswith(("#", "javascript:")):
                continue
            # Make site-relative links absolute against the current host.
            if href.startswith("/"):
                href = f"https://{httpx.URL(page_url).host}{href}"
            # Skip the raw (un-suffixed) s3/fsl hrefs and known junk hosts.
            if href in base_to_ignore or any(j in href.lower() for j in junk_keywords):
                continue
            # A gamerxyt hubcloud.php?host=... link supersedes everything on
            # this page — recurse into it immediately.
            if "gamerxyt.com/hubcloud.php" in href.lower() and "host=" in href.lower():
                return await self.resolve(href, depth + 1)
            if href not in seen:
                seen.add(href)
                label = a.text.strip().replace("Download", "").strip()
                links_data.append({"server": label or "Unknown Server", "link": href})
        # Keep only links that look like actual file/CDN endpoints.
        download_links = []
        for item in links_data:
            lower_link = item["link"].lower()
            if any(
                x in lower_link
                for x in [
                    ".mkv",
                    ".mp4",
                    ".zip",
                    "pixeldrain",
                    "fsl-buckets",
                    "toxix.buzz",
                    "hubcdn.fans",
                    "cloudserver",
                ]
            ):
                item["link"] = self._convert_pixeldrain(item["link"])
                download_links.append(item)
        if download_links:
            return {
                "found_links": download_links,
                "count": len(download_links),
                "source": "hubcloud_gamerxyt",
                "page_url": page_url,
            }
        return {"detail": "No usable download links found", "page_url": page_url}

    async def _handle_wrapper(self, url: str, text: str, depth: int) -> Dict:
        """Decode an ad-wrapper page's obfuscated payload to find the next hop."""
        # Fast path: the gamerxyt target URL appears in plaintext.
        direct_match = re.search(
            r'(https?://(?:www\.)?gamerxyt\.com/hubcloud\.php\?[^"\'>\s]+)', text
        )
        if direct_match:
            return await self.resolve(direct_match.group(1), depth + 1)
        # Second chance: scan base64-looking runs for an encoded gamerxyt URL.
        for b64 in re.findall(r"[A-Za-z0-9+/=]{40,}", text):
            dec = safe_b64decode(b64)
            if "gamerxyt.com" in dec or "hubcloud.php" in dec:
                m = re.search(r'(https?://[^"\'>\s]+)', dec)
                if m:
                    return await self.resolve(m.group(1), depth + 1)
        # Known obfuscation carriers: s('o','<b64>'), ck('_wp_http...','<b64>'),
        # or any very long quoted base64 literal.
        patterns = [
            r"s\('o','([A-Za-z0-9+/=]+)'",
            r"ck\('_wp_http[^']*','([^']+)'",
            r"['\"]([A-Za-z0-9+/=]{100,})['\"]",
        ]
        combined = "".join(["".join(re.findall(pat, text)) for pat in patterns])
        if combined:
            # Observed obfuscation chain: base64 -> base64 -> ROT13 -> base64,
            # yielding a small JSON object with the next-hop URL.
            step1 = safe_b64decode(combined)
            step2 = safe_b64decode(step1)
            rotated = codecs.encode(step2, "rot_13") if step2 else step2
            final_str = safe_b64decode(rotated)
            try:
                if final_str.strip().startswith("{"):
                    data = json.loads(final_str)
                    next_url = data.get("o") or data.get("url") or data.get("link")
                    if next_url:
                        # The URL field may itself be base64-encoded one more time.
                        decoded = safe_b64decode(next_url)
                        final = decoded if decoded.startswith("http") else next_url
                        return await self.resolve(final, depth + 1)
            except Exception:
                # Malformed payload — fall through to the generic error below.
                pass
        return {"error": "Wrapper failed to extract link", "url": url}

    async def _fallback_search(self, text: str, depth: int) -> Dict:
        """Last resort: look for any long base64 run that decodes to a URL."""
        for b64 in re.findall(r"[A-Za-z0-9+/=]{100,}", text):
            dec = safe_b64decode(b64)
            if dec.startswith(("http://", "https://")):
                return await self.resolve(dec, depth + 1)
        return {"error": "Unsupported page", "detail": "No recognizable pattern"}

    # ==========================================
    # SCRAPING MODULE (Home, Info, Search)
    # ==========================================
    async def get_home(self, cat_type: str = "latest_movie") -> Dict:
        """Fetch a category listing (or the homepage) and parse movie cards.

        Accepted ``cat_type`` values (aliases normalized below):
        movie, anime, ott, latest_movie.
        """
        cat_type = cat_type.lower().strip()
        # Normalize common aliases (including the "lates_movie" typo variant).
        if cat_type in ["movies", "movie"]:
            cat_type = "movie"
        elif cat_type in ["lates_movie", "latest_movies", "latest"]:
            cat_type = "latest_movie"
        paths = {
            "movie": "/category/movies/",
            "anime": "/category/anime/",
            "ott": "/category/web-series/",
            "latest_movie": "/",
        }
        target_path = paths.get(cat_type, "/")
        url = f"{HDHUB_BASE_URL.rstrip('/')}{target_path}"
        try:
            resp = await self.client.get(url)
            # The site has used both /category/<x>/ and bare /<x>/ layouts;
            # retry with the alternate path on a 404.
            if resp.status_code == 404 and cat_type != "latest_movie":
                fallback_paths = {
                    "movie": "/movies/",
                    "anime": "/anime/",
                    "ott": "/web-series/",
                }
                url = f"{HDHUB_BASE_URL.rstrip('/')}{fallback_paths.get(cat_type, target_path)}"
                resp = await self.client.get(url)
            return self._parse_movie_cards(resp.text, url, category=cat_type)
        except Exception as e:
            return {"error": str(e)}

    async def search(self, query: str) -> Dict:
        """Run the site's WordPress-style ``?s=`` search and parse the results."""
        url = f"{HDHUB_BASE_URL.rstrip('/')}/"
        try:
            resp = await self.client.get(url, params={"s": query})
            return self._parse_movie_cards(resp.text, str(resp.url), query=query)
        except Exception as e:
            return {"error": str(e), "query": query}

    def _parse_movie_cards(
        self, html: str, source_url: str, category: str = None, query: str = None
    ) -> Dict:
        """Helper to parse movie cards for both home and search endpoints."""
        soup = BeautifulSoup(html, "html.parser")
        results = []
        for a_tag in soup.find_all("a", class_="movie-card"):
            href = a_tag.get("href")
            if not href:
                continue
            if href.startswith("/"):
                href = HDHUB_BASE_URL.rstrip("/") + href
            title_tag = a_tag.find("h3", class_="movie-card-title")
            title = title_tag.text.strip() if title_tag else "Unknown Title"
            img_tag = a_tag.find("img")
            img_url = img_tag.get("src") if img_tag else ""
            if title:
                results.append({"title": title, "url": href, "image": img_url})
        # De-duplicate by URL while preserving first-seen order.
        unique_results = []
        seen = set()
        for r in results:
            if r["url"] not in seen:
                seen.add(r["url"])
                unique_results.append(r)
        response = {"source": source_url}
        if category:
            response["category"] = category
        if query:
            response["query"] = query
        response["results"] = unique_results
        return response

    async def get_info(self, url: str) -> Dict:
        """Scrape a post page: metadata, description, download packs, episodes.

        HubDrive-labelled links are filtered out everywhere. Returns a dict of
        scraped fields, or ``{"error": ..., "url": ...}`` on failure.
        """
        try:
            resp = await self.client.get(url)
            soup = BeautifulSoup(resp.text, "html.parser")
            # Basic details
            title_tag = soup.find("h1", class_="page-title") or soup.find("h1")
            title = title_tag.text.strip() if title_tag else "Unknown"
            # Posters are hot-linked from TMDB on this site.
            poster_tag = soup.find("img", src=re.compile(r"tmdb\.org"))
            poster = poster_tag.get("src") if poster_tag else ""
            # ---------------------------------------------------------
            # HTML SCRAPING: Score, Genres, Year, Cast, Trailer
            # ---------------------------------------------------------
            score = ""
            score_tag = soup.find("span", class_="imdb-score")
            if score_tag:
                score = score_tag.text.strip()
            genres = []
            # Category badges mix real genres with quality/language tags;
            # drop the known non-genre values.
            junk_tags = [
                "1080p",
                "2160p",
                "720p",
                "dv hdr",
                "sdr",
                "movies",
                "series",
                "hindi",
                "english",
            ]
            for a_tag in soup.select(".badge.badge-outline a[href^='/category/']"):
                tag_text = a_tag.text.strip()
                if tag_text.lower() not in junk_tags:
                    genres.append(tag_text)
            trailer_url = ""
            trailer_btn = soup.find(id="trailer-btn")
            if trailer_btn and trailer_btn.get("data-trailer-url"):
                trailer_url = trailer_btn.get("data-trailer-url")
            # Extracting Year & Cast from the Metadata List
            year = ""
            cast = ""
            for item in soup.find_all("div", class_="metadata-item"):
                label = item.find("span", class_="metadata-label")
                value = item.find("span", class_="metadata-value")
                if label and value:
                    lbl = label.text.strip().lower()
                    val = value.text.strip()
                    # First 4-digit run in a release/air-date value is the year.
                    if "air" in lbl or "release" in lbl or "year" in lbl:
                        match = re.search(r"\d{4}", val)
                        if match:
                            year = match.group(0)
                    if "stars" in lbl or "cast" in lbl:
                        cast = val
            # Description heuristic: first long-ish <p> that isn't a download
            # blurb. NOTE(review): the "band together"/"young" keyword checks
            # look tuned to specific pages — confirm they generalize.
            description = ""
            for p in soup.find_all("p"):
                text_clean = p.text.strip()
                if (
                    len(text_clean) > 50
                    and "download" not in text_clean.lower()
                    and (
                        "band together" in text_clean.lower()
                        or "young" in text_clean.lower()
                        or len(text_clean) > 80
                    )
                ):
                    description = text_clean
                    break
            # ---------------------------------------------------------
            # PARSE PACKS & EPISODES
            # ---------------------------------------------------------
            download_links = {"packs": [], "episodes": []}
            seen_pack_links = set()
            seen_episode_links = set()
            # PACKS: season/quality bundles, one .download-item each.
            for pack_item in soup.find_all("div", class_="download-item"):
                header = pack_item.find("div", class_="download-header")
                if not header:
                    continue
                pack_name = re.sub(r"\s+", " ", header.text.replace("\n", " ").strip())
                links_div = pack_item.find("div", class_="grid grid-cols-2 gap-2")
                if links_div:
                    pack_links_list = []
                    for btn in links_div.find_all("a", class_="btn"):
                        href, server_name = (
                            btn.get("href"),
                            btn.text.strip().replace("\xa0", "").strip(),
                        )
                        # --- EXCLUDE HUBDRIVE LINKS ---
                        if "hubdrive" in server_name.lower():
                            continue
                        # De-dupe by (pack, server) pair across the page.
                        uniq_key = (pack_name, server_name)
                        if href and uniq_key not in seen_pack_links:
                            seen_pack_links.add(uniq_key)
                            pack_links_list.append({"server": server_name, "url": href})
                    if pack_links_list:
                        download_links["packs"].append(
                            {"title": pack_name, "links": pack_links_list}
                        )
            # EPISODES: headers and episode rows are interleaved siblings in
            # document order, so track the most recent header as context.
            grouped_episodes = {}
            current_season_context = "Unknown Quality"
            elements = soup.find_all(
                lambda tag: (
                    tag.name == "div"
                    and tag.get("class")
                    and (
                        "episode-header" in tag.get("class")
                        or "episode-download-item" in tag.get("class")
                    )
                )
            )
            for element in elements:
                classes = element.get("class", [])
                if "episode-header" in classes:
                    current_season_context = re.sub(r"\s+", " ", element.text.strip())
                    if current_season_context not in grouped_episodes:
                        grouped_episodes[current_season_context] = {}
                elif "episode-download-item" in classes:
                    ep_info_tag = element.find("span", class_="badge-psa")
                    ep_num_str = re.sub(
                        r"\s+",
                        " ",
                        ep_info_tag.text.strip() if ep_info_tag else "Unknown Episode",
                    )
                    # Episode rows may appear before any header.
                    if current_season_context not in grouped_episodes:
                        grouped_episodes[current_season_context] = {}
                    if ep_num_str not in grouped_episodes[current_season_context]:
                        grouped_episodes[current_season_context][ep_num_str] = []
                    links_div = element.find("div", class_="episode-links")
                    if links_div:
                        for btn in links_div.find_all("a", class_="btn"):
                            href, server_name = (
                                btn.get("href"),
                                btn.text.strip().replace("\xa0", "").strip(),
                            )
                            # --- EXCLUDE HUBDRIVE LINKS ---
                            if "hubdrive" in server_name.lower():
                                continue
                            uniq_key = (current_season_context, ep_num_str, server_name)
                            if href and uniq_key not in seen_episode_links:
                                seen_episode_links.add(uniq_key)
                                grouped_episodes[current_season_context][
                                    ep_num_str
                                ].append({"server": server_name, "url": href})
            # Format array
            formatted_episodes = []
            for season_quality, episodes_dict in grouped_episodes.items():
                if not episodes_dict:
                    continue
                ep_list = []
                for ep_name, links in episodes_dict.items():
                    # Only add the episode to the final array if there is at least one link (removes empty HubDrive-only episodes if any exist)
                    if links:
                        ep_list.append({"episode": ep_name, "links": links})
                if ep_list:
                    formatted_episodes.append(
                        {"season_quality": season_quality, "episodes": ep_list}
                    )
            download_links["episodes"] = formatted_episodes
            return {
                "title": title,
                "year": year,
                "score": score,
                "genres": genres,
                "cast": cast,
                "trailer": trailer_url,
                "poster": poster,
                "description": description,
                "download_links": download_links,
                "page_url": url,
            }
        except Exception as e:
            return {"error": str(e), "url": url}
# ====================== FastAPI ======================
# Single shared engine instance; its httpx client lives for the app's lifetime.
engine = HDHubEngine()


@asynccontextmanager
async def lifespan(app: FastAPI):
    """App lifespan: nothing to set up; close the HTTP client on shutdown."""
    yield
    await engine.client.aclose()


app = FastAPI(lifespan=lifespan, title="HDHub Scraper & Resolver v4.3")
@app.get("/")
async def root_directory():
    """Self-documenting API index describing every available endpoint."""
    endpoint_docs = {
        "/home": {
            "description": "Fetch homepage or category-specific movie/series lists.",
            "method": "GET",
            "parameters": {
                "type": "(Optional) movie, anime, ott, latest_movie."
            },
            "example": "/home?type=anime",
        },
        "/search": {
            "description": "Search for movies or series by title.",
            "method": "GET",
            "parameters": {"query": "(Required) The search keyword."},
            "example": "/search?query=batman",
        },
        "/info": {
            "description": "Scrape full info (Metadata, Packs, Episodes). HubDrive links are automatically hidden.",
            "method": "GET",
            "parameters": {
                "url": "(Required) The full HDHub post URL obtained from /home or /search."
            },
            "example": "/info?url=https://4khdhub.dad/xo-kitty-series-1017/",
        },
        "/resolve": {
            "description": "Bypass intermediate ad/wrapper links to get final download URL. Automatically converts Pixeldrain links.",
            "method": "GET",
            "parameters": {"url": "(Required) Wrapper URL from /info."},
            "example": "/resolve?url=https://gadgetsweb.xyz/?id=...",
        },
    }
    payload = {
        "name": "HDHub API Scraper & Resolver",
        "version": "4.3",
        "features": "Added Search Endpoint. Native extraction for Score/Year/Genres/Cast. Auto-Pixeldrain resolver. HubDrive filtered.",
        "endpoints": endpoint_docs,
    }
    return JSONResponse(payload)
@app.get("/home")
async def api_home(
    type: str = Query(
        "latest_movie",
        description="Type of content available: movie, anime, ott, latest_movie",
    ),
):
    """Return the parsed listing for the requested content category."""
    # `type` must keep its name: FastAPI derives the query-param name from it.
    listing = await engine.get_home(type)
    return listing
@app.get("/search")
async def api_search(query: str = Query(..., description="The search keyword")):
    """Search the site for titles matching the given keyword."""
    results = await engine.search(query)
    return results
@app.get("/info")
async def api_info(
    url: str = Query(..., description="Post URL (e.g. from /home or /search)"),
):
    """Scrape full metadata and download links for a post URL."""
    has_valid_scheme = url.startswith(("http://", "https://"))
    if not has_valid_scheme:
        raise HTTPException(400, "Invalid URL")
    return await engine.get_info(url)
@app.get("/resolve")
async def api_resolve(url: str = Query(..., description="Wrapper URL to resolve")):
    """Follow wrapper/ad pages until direct download links are found."""
    has_valid_scheme = url.startswith(("http://", "https://"))
    if not has_valid_scheme:
        raise HTTPException(400, "Invalid URL")
    return await engine.resolve(url)
if __name__ == "__main__":
    # Local development entry point; in Docker, uvicorn is launched by CMD
    # with the same host/port (7860, the Hugging Face Spaces default).
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)

4
requirements.txt Normal file
View File

@@ -0,0 +1,4 @@
fastapi
uvicorn[standard]
httpx
beautifulsoup4