"""HDHub scraper & resolver API.

Scrapes listing/search/detail pages from 4khdhub and resolves its
ad-wrapper / hubcloud-style intermediate links down to direct download URLs.
Exposed as a small FastAPI app (see endpoints at the bottom of the file).
"""

import asyncio
import base64
import codecs
import datetime
import json
import logging
import re
from contextlib import asynccontextmanager
from typing import Dict, List, Optional

import httpx
from bs4 import BeautifulSoup
from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import JSONResponse

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("HDHub-Resolver")

# ==========================================
# CONFIGURATION
# ==========================================
HDHUB_BASE_URL = "https://4khdhub.dad"


def safe_b64decode(data: str) -> str:
    """Best-effort base64 decode.

    Tolerates missing ``=`` padding and undecodable bytes; returns the
    decoded UTF-8 text, or ``""`` when *data* is empty or not base64.
    Callers rely on the empty-string sentinel to mean "not base64".
    """
    if not data:
        return ""
    try:
        data = data.strip()
        # Re-pad to a multiple of 4 so b64decode does not reject the input.
        missing = len(data) % 4
        if missing:
            data += "=" * (4 - missing)
        return base64.b64decode(data, validate=False).decode("utf-8", errors="ignore")
    except Exception:
        # Deliberate swallow: any failure simply means "could not decode".
        return ""


class HDHubEngine:
    """Holds one shared async HTTP client plus all scraping/resolving logic."""

    def __init__(self) -> None:
        # Browser-like headers: the target site serves different (or no)
        # content to obvious bot user agents.
        self.client = httpx.AsyncClient(
            timeout=httpx.Timeout(30.0, connect=15.0),
            follow_redirects=True,
            headers={
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
                "(KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36",
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            },
        )

    # ==========================================
    # UTILS
    # ==========================================
    def _convert_pixeldrain(self, url: str) -> str:
        """Converts normal Pixeldrain viewer links to direct download API links."""
        match = re.search(
            r"(https?://pixeldrain\.[a-z]+)/u/([a-zA-Z0-9_-]+)", url, re.IGNORECASE
        )
        if match:
            base_url = match.group(1)
            file_id = match.group(2)
            return f"{base_url}/api/file/{file_id}?download"
        return url

    # ==========================================
    # RESOLVER MODULE
    # ==========================================
    async def resolve(self, url: str, depth: int = 0) -> Dict:
        """Follow an intermediate/wrapper URL until a direct link is found.

        Recurses through wrapper and hubcloud-style pages (bounded by
        *depth*) and returns a dict with either ``found_links`` or an
        ``error``/``detail`` key.
        """
        if depth > 12:
            return {"error": "Max recursion depth reached", "url": url}

        logger.info("Resolving (depth %s): %s", depth, url)

        # Pixeldrain links need no fetching — convert and return immediately.
        if "pixeldrain" in url.lower():
            return {
                "found_links": [
                    {
                        "server": "Pixeldrain Direct",
                        "link": self._convert_pixeldrain(url),
                    }
                ],
                "count": 1,
                "type": "direct",
            }

        try:
            resp = await self.client.get(url)
            # Surface non-2xx responses via the HTTPStatusError handler
            # below (without this call, that handler was unreachable).
            resp.raise_for_status()
            text = resp.text
            final_url = str(resp.url)

            # Redirect chain already landed on a file or pixeldrain host.
            if any(
                ext in final_url.lower()
                for ext in [".mkv", ".mp4", ".m3u8", ".zip", "pixeldrain"]
            ):
                return {
                    "found_links": [
                        {
                            "server": "Direct Link",
                            "link": self._convert_pixeldrain(final_url),
                        }
                    ],
                    "count": 1,
                    "type": "direct",
                }

            # Known ad-wrapper hosts hide the next hop in base64 blobs.
            if any(
                k in final_url.lower()
                for k in ["gadgetsweb", "cryptoinsights", "techly360", "?id="]
            ):
                return await self._handle_wrapper(final_url, text, depth)

            # Hubcloud-style pages list the actual mirror buttons.
            if any(
                k in final_url.lower()
                for k in ["hubcloud", "gamerxyt", "shikshakdaak", "drive/"]
            ):
                return await self._handle_hubcloud_style_page(final_url, text, depth)

            return await self._fallback_search(text, depth)
        except httpx.HTTPStatusError as e:
            return {"error": f"HTTP {e.response.status_code}", "url": url}
        except Exception as e:
            logger.exception("Resolution error")
            return {"error": str(e), "url": url}

    async def _handle_hubcloud_style_page(
        self, page_url: str, text: str, depth: int
    ) -> Dict:
        """Extract mirror buttons from a hubcloud/gamerxyt download page."""
        soup = BeautifulSoup(text, "html.parser")
        links_data = []
        seen = set()
        # The site appends the current minute to the s3/fsl hrefs via JS;
        # we replicate that token generation server-side.
        current_minute = str(datetime.datetime.now().minute).zfill(2)
        base_to_ignore = set()

        s3 = soup.find("a", id="s3")
        if s3 and s3.get("href"):
            base_s3 = s3["href"].strip()
            base_to_ignore.add(base_s3)
            dynamic_s3 = base_s3 + "_1" + current_minute
            seen.add(dynamic_s3)
            links_data.append(
                {
                    "server": s3.text.strip().replace("Download", "").strip()
                    or "FSLv2 Server",
                    "link": dynamic_s3,
                }
            )

        fsl = soup.find("a", id="fsl")
        if fsl and fsl.get("href"):
            base_fsl = fsl["href"].strip()
            base_to_ignore.add(base_fsl)
            dynamic_fsl = base_fsl + "1" + current_minute
            seen.add(dynamic_fsl)
            links_data.append(
                {
                    "server": fsl.text.strip().replace("Download", "").strip()
                    or "FSL Server",
                    "link": dynamic_fsl,
                }
            )

        # Ads, telegram invites, shorteners — never download mirrors.
        junk_keywords = [
            "hubcloud.fans",
            "drive/admin",
            "t.me",
            "tinyurl.com",
            "one.one.one.one",
            "google.com/search",
            "ampproject.org",
            "bloggingvector.shop",
        ]

        for a in soup.find_all("a", href=True):
            href = a["href"].strip()
            if not href or href.startswith(("#", "javascript:")):
                continue
            if href.startswith("/"):
                href = f"https://{httpx.URL(page_url).host}{href}"
            if href in base_to_ignore or any(j in href.lower() for j in junk_keywords):
                continue
            # A hubcloud.php?host= link supersedes this page: recurse into it.
            if "gamerxyt.com/hubcloud.php" in href.lower() and "host=" in href.lower():
                return await self.resolve(href, depth + 1)
            if href not in seen:
                seen.add(href)
                label = a.text.strip().replace("Download", "").strip()
                links_data.append({"server": label or "Unknown Server", "link": href})

        # Keep only links that look like real files or known file hosts.
        download_links = []
        for item in links_data:
            lower_link = item["link"].lower()
            if any(
                x in lower_link
                for x in [
                    ".mkv",
                    ".mp4",
                    ".zip",
                    "pixeldrain",
                    "fsl-buckets",
                    "toxix.buzz",
                    "hubcdn.fans",
                    "cloudserver",
                ]
            ):
                item["link"] = self._convert_pixeldrain(item["link"])
                download_links.append(item)

        if download_links:
            return {
                "found_links": download_links,
                "count": len(download_links),
                "source": "hubcloud_gamerxyt",
                "page_url": page_url,
            }
        return {"detail": "No usable download links found", "page_url": page_url}

    async def _handle_wrapper(self, url: str, text: str, depth: int) -> Dict:
        """Peel an ad-wrapper page that hides the next URL in base64 layers."""
        # Cheapest path: the gamerxyt hop is present in plain text.
        direct_match = re.search(
            r'(https?://(?:www\.)?gamerxyt\.com/hubcloud\.php\?[^"\'>\s]+)', text
        )
        if direct_match:
            return await self.resolve(direct_match.group(1), depth + 1)

        # Second path: the hop is a single-pass base64 blob somewhere on the page.
        for b64 in re.findall(r"[A-Za-z0-9+/=]{40,}", text):
            dec = safe_b64decode(b64)
            if "gamerxyt.com" in dec or "hubcloud.php" in dec:
                m = re.search(r'(https?://[^"\'>\s]+)', dec)
                if m:
                    return await self.resolve(m.group(1), depth + 1)

        # Last path: the site's JS obfuscation — base64 -> base64 -> rot13
        # -> base64 yields a JSON payload holding the next URL.
        patterns = [
            r"s\('o','([A-Za-z0-9+/=]+)'",
            r"ck\('_wp_http[^']*','([^']+)'",
            r"['\"]([A-Za-z0-9+/=]{100,})['\"]",
        ]
        combined = "".join(["".join(re.findall(pat, text)) for pat in patterns])
        if combined:
            step1 = safe_b64decode(combined)
            step2 = safe_b64decode(step1)
            rotated = codecs.encode(step2, "rot_13") if step2 else step2
            final_str = safe_b64decode(rotated)
            try:
                if final_str.strip().startswith("{"):
                    data = json.loads(final_str)
                    next_url = data.get("o") or data.get("url") or data.get("link")
                    if next_url:
                        decoded = safe_b64decode(next_url)
                        final = decoded if decoded.startswith("http") else next_url
                        return await self.resolve(final, depth + 1)
            except Exception:
                # Obfuscation format changed — fall through to the error below.
                pass
        return {"error": "Wrapper failed to extract link", "url": url}

    async def _fallback_search(self, text: str, depth: int) -> Dict:
        """Last resort: look for any long base64 blob that decodes to a URL."""
        for b64 in re.findall(r"[A-Za-z0-9+/=]{100,}", text):
            dec = safe_b64decode(b64)
            if dec.startswith(("http://", "https://")):
                return await self.resolve(dec, depth + 1)
        return {"error": "Unsupported page", "detail": "No recognizable pattern"}

    # ==========================================
    # SCRAPING MODULE (Home, Info, Search)
    # ==========================================
    async def get_home(self, cat_type: str = "latest_movie") -> Dict:
        """Fetch the homepage or a category listing and parse its movie cards."""
        cat_type = cat_type.lower().strip()
        # Normalize common aliases/typos to the canonical category keys.
        if cat_type in ["movies", "movie"]:
            cat_type = "movie"
        elif cat_type in ["lates_movie", "latest_movies", "latest"]:
            cat_type = "latest_movie"

        paths = {
            "movie": "/category/movies/",
            "anime": "/category/anime/",
            "ott": "/category/web-series/",
            "latest_movie": "/",
        }
        target_path = paths.get(cat_type, "/")
        url = f"{HDHUB_BASE_URL.rstrip('/')}{target_path}"
        try:
            resp = await self.client.get(url)
            # The site occasionally moves categories out of /category/ —
            # retry the legacy path layout on 404.
            if resp.status_code == 404 and cat_type != "latest_movie":
                fallback_paths = {
                    "movie": "/movies/",
                    "anime": "/anime/",
                    "ott": "/web-series/",
                }
                url = f"{HDHUB_BASE_URL.rstrip('/')}{fallback_paths.get(cat_type, target_path)}"
                resp = await self.client.get(url)
            return self._parse_movie_cards(resp.text, url, category=cat_type)
        except Exception as e:
            return {"error": str(e)}

    async def search(self, query: str) -> Dict:
        """Run the site's search (?s=) and parse the resulting movie cards."""
        url = f"{HDHUB_BASE_URL.rstrip('/')}/"
        try:
            resp = await self.client.get(url, params={"s": query})
            return self._parse_movie_cards(resp.text, str(resp.url), query=query)
        except Exception as e:
            return {"error": str(e), "query": query}

    def _parse_movie_cards(
        self,
        html: str,
        source_url: str,
        category: Optional[str] = None,
        query: Optional[str] = None,
    ) -> Dict:
        """Helper to parse movie cards for both home and search endpoints."""
        soup = BeautifulSoup(html, "html.parser")
        results = []
        for a_tag in soup.find_all("a", class_="movie-card"):
            href = a_tag.get("href")
            if not href:
                continue
            if href.startswith("/"):
                href = HDHUB_BASE_URL.rstrip("/") + href
            title_tag = a_tag.find("h3", class_="movie-card-title")
            title = title_tag.text.strip() if title_tag else "Unknown Title"
            img_tag = a_tag.find("img")
            img_url = img_tag.get("src") if img_tag else ""
            if title:
                results.append({"title": title, "url": href, "image": img_url})

        # De-duplicate by URL while preserving first-seen order.
        unique_results = []
        seen = set()
        for r in results:
            if r["url"] not in seen:
                seen.add(r["url"])
                unique_results.append(r)

        response = {"source": source_url}
        if category:
            response["category"] = category
        if query:
            response["query"] = query
        response["results"] = unique_results
        return response

    async def get_info(self, url: str) -> Dict:
        """Scrape a post page: metadata plus movie/episode download links.

        Detects movie vs. series, groups series links per season/quality,
        and filters out HubDrive mirrors entirely.
        """
        try:
            resp = await self.client.get(url)
            soup = BeautifulSoup(resp.text, "html.parser")

            # Basic details
            title_tag = soup.find("h1", class_="page-title") or soup.find("h1")
            title = title_tag.text.strip() if title_tag else "Unknown"
            poster_tag = soup.find("img", src=re.compile(r"tmdb\.org"))
            poster = poster_tag.get("src") if poster_tag else ""

            # ---------------------------------------------------------
            # HTML SCRAPING: Score, Genres, Year, Cast, Trailer
            # ---------------------------------------------------------
            score = ""
            score_tag = soup.find("span", class_="imdb-score")
            if score_tag:
                score = score_tag.text.strip()

            genres = []
            # Category badges mix genres with quality/language tags; drop the latter.
            junk_tags = [
                "1080p",
                "2160p",
                "720p",
                "dv hdr",
                "sdr",
                "movies",
                "series",
                "hindi",
                "english",
            ]
            for a_tag in soup.select(".badge.badge-outline a[href^='/category/']"):
                tag_text = a_tag.text.strip()
                if tag_text.lower() not in junk_tags:
                    genres.append(tag_text)

            # --- DETERMINE IF MOVIE OR SERIES ---
            content_type = "movie"
            url_lower = url.lower()
            if "-series-" in url_lower:
                content_type = "series"
            elif any(g.lower() in ["series", "web series", "tv show"] for g in genres):
                content_type = "series"
            elif soup.find("div", class_="episode-download-item") or soup.find(
                id="episodes"
            ):
                content_type = "series"

            trailer_url = ""
            trailer_btn = soup.find(id="trailer-btn")
            if trailer_btn and trailer_btn.get("data-trailer-url"):
                trailer_url = trailer_btn.get("data-trailer-url")

            # Extracting Year & Cast from the Metadata List
            year = ""
            cast = ""
            for item in soup.find_all("div", class_="metadata-item"):
                label = item.find("span", class_="metadata-label")
                value = item.find("span", class_="metadata-value")
                if label and value:
                    lbl = label.text.strip().lower()
                    val = value.text.strip()
                    if "air" in lbl or "release" in lbl or "year" in lbl:
                        match = re.search(r"\d{4}", val)
                        if match:
                            year = match.group(0)
                    if "stars" in lbl or "cast" in lbl:
                        cast = val

            # Heuristic synopsis pick: first long paragraph that isn't a
            # download blurb.
            description = ""
            for p in soup.find_all("p"):
                text_clean = p.text.strip()
                if (
                    len(text_clean) > 50
                    and "download" not in text_clean.lower()
                    and (
                        "band together" in text_clean.lower()
                        or "young" in text_clean.lower()
                        or len(text_clean) > 80
                    )
                ):
                    description = text_clean
                    break

            # ---------------------------------------------------------
            # PARSE LINKS BASED ON TYPE (Movie vs Series)
            # ---------------------------------------------------------
            download_links = {}

            if content_type == "series":
                # EPISODES PARSING (Ignores "Packs" completely)
                grouped_episodes = {}
                current_season_context = "Unknown Quality"
                seen_episode_links = set()

                # Walk headers and episode items in document order so each
                # episode lands under the season header that precedes it.
                elements = soup.find_all(
                    lambda tag: (
                        tag.name == "div"
                        and tag.get("class")
                        and (
                            "episode-header" in tag.get("class")
                            or "episode-download-item" in tag.get("class")
                        )
                    )
                )

                for element in elements:
                    classes = element.get("class", [])
                    if "episode-header" in classes:
                        current_season_context = re.sub(
                            r"\s+", " ", element.text.strip()
                        )
                        if current_season_context not in grouped_episodes:
                            grouped_episodes[current_season_context] = {}
                    elif "episode-download-item" in classes:
                        ep_info_tag = element.find("span", class_="badge-psa")
                        ep_num_str = re.sub(
                            r"\s+",
                            " ",
                            ep_info_tag.text.strip() if ep_info_tag else "Unknown Episode",
                        )
                        if current_season_context not in grouped_episodes:
                            grouped_episodes[current_season_context] = {}
                        if ep_num_str not in grouped_episodes[current_season_context]:
                            grouped_episodes[current_season_context][ep_num_str] = []

                        links_div = element.find("div", class_="episode-links")
                        if links_div:
                            for btn in links_div.find_all("a", class_="btn"):
                                href, server_name = (
                                    btn.get("href"),
                                    btn.text.strip().replace("\xa0", "").strip(),
                                )
                                # EXCLUDE HUBDRIVE LINKS
                                if "hubdrive" in server_name.lower():
                                    continue
                                uniq_key = (
                                    current_season_context,
                                    ep_num_str,
                                    server_name,
                                )
                                if href and uniq_key not in seen_episode_links:
                                    seen_episode_links.add(uniq_key)
                                    grouped_episodes[current_season_context][
                                        ep_num_str
                                    ].append({"server": server_name, "url": href})

                # Format array
                formatted_episodes = []
                for season_quality, episodes_dict in grouped_episodes.items():
                    if not episodes_dict:
                        continue
                    ep_list = []
                    for ep_name, links in episodes_dict.items():
                        if links:  # Remove empty episodes if HubDrive was the only link
                            ep_list.append({"episode": ep_name, "links": links})
                    if ep_list:
                        formatted_episodes.append(
                            {"season_quality": season_quality, "episodes": ep_list}
                        )
                download_links["episodes"] = formatted_episodes
            else:
                # MOVIE PARSING (Grabs direct files/qualities)
                movie_links = []
                seen_movie_links = set()
                for pack_item in soup.find_all("div", class_="download-item"):
                    header = pack_item.find("div", class_="download-header")
                    if not header:
                        continue
                    quality_name = re.sub(
                        r"\s+", " ", header.text.replace("\n", " ").strip()
                    )
                    links_div = pack_item.find("div", class_="grid grid-cols-2 gap-2")
                    if links_div:
                        btn_links = []
                        for btn in links_div.find_all("a", class_="btn"):
                            href, server_name = (
                                btn.get("href"),
                                btn.text.strip().replace("\xa0", "").strip(),
                            )
                            # EXCLUDE HUBDRIVE LINKS
                            if "hubdrive" in server_name.lower():
                                continue
                            uniq_key = (quality_name, server_name)
                            if href and uniq_key not in seen_movie_links:
                                seen_movie_links.add(uniq_key)
                                btn_links.append({"server": server_name, "url": href})
                        if btn_links:
                            movie_links.append(
                                {"quality": quality_name, "links": btn_links}
                            )
                download_links["movie_links"] = movie_links

            return {
                "title": title,
                "type": content_type,
                "year": year,
                "score": score,
                "genres": genres,
                "cast": cast,
                "trailer": trailer_url,
                "poster": poster,
                "description": description,
                "download_links": download_links,
                "page_url": url,
            }
        except Exception as e:
            return {"error": str(e), "url": url}


# ====================== FastAPI ======================
engine = HDHubEngine()


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Close the shared HTTP client when the app shuts down."""
    yield
    await engine.client.aclose()


app = FastAPI(lifespan=lifespan, title="HDHub Scraper & Resolver v4.4")


@app.get("/")
async def root_directory():
    """Self-describing API index."""
    return JSONResponse(
        {
            "name": "HDHub API Scraper & Resolver",
            "version": "4.4",
            "features": "Type Detection (Movie/Series). Removed Series Packs. Added Search. HubDrive Filtered. Pixeldrain Auto-Converter.",
            "endpoints": {
                "/home": {
                    "description": "Fetch homepage or category-specific movie/series lists.",
                    "method": "GET",
                    "parameters": {
                        "type": "(Optional) movie, anime, ott, latest_movie."
                    },
                    "example": "/home?type=anime",
                },
                "/search": {
                    "description": "Search for movies or series by title.",
                    "method": "GET",
                    "parameters": {"query": "(Required) The search keyword."},
                    "example": "/search?query=batman",
                },
                "/info": {
                    "description": "Scrape full info. Automatically detects Movie or Series and hides series 'Packs'. HubDrive links are hidden.",
                    "method": "GET",
                    "parameters": {
                        "url": "(Required) The full HDHub post URL obtained from /home or /search."
                    },
                    "example": "/info?url=https://4khdhub.dad/xo-kitty-series-1017/",
                },
                "/resolve": {
                    "description": "Bypass intermediate ad/wrapper links to get final download URL. Automatically converts Pixeldrain links.",
                    "method": "GET",
                    "parameters": {"url": "(Required) Wrapper URL from /info."},
                    "example": "/resolve?url=https://gadgetsweb.xyz/?id=...",
                },
            },
        }
    )


@app.get("/home")
async def api_home(
    # NOTE: the query-param name "type" is part of the public API, so the
    # builtin shadowing is kept for backward compatibility.
    type: str = Query(
        "latest_movie",
        description="Type of content available: movie, anime, ott, latest_movie",
    ),
):
    return await engine.get_home(type)


@app.get("/search")
async def api_search(query: str = Query(..., description="The search keyword")):
    return await engine.search(query)


@app.get("/info")
async def api_info(
    url: str = Query(..., description="Post URL (e.g. from /home or /search)"),
):
    if not url.startswith(("http://", "https://")):
        raise HTTPException(400, "Invalid URL")
    return await engine.get_info(url)


@app.get("/resolve")
async def api_resolve(url: str = Query(..., description="Wrapper URL to resolve")):
    if not url.startswith(("http://", "https://")):
        raise HTTPException(400, "Invalid URL")
    return await engine.resolve(url)


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)